Storm是Twitter开源的分布式实时大数据处理框架,最早开源于github. 2013年,Storm进入Apache社区进行孵化. 2014年9月,晋级成为了Apache顶级项目. 国内外各大网站使用,例如雅虎、阿里、度 官网 http://storm.apache.org/
特点
注意:
组件说明
拓扑图
架构 详细说明见第四章第一节
数据结构
有向无环图(DAG,directed acyclic graph): 起始点一定是spout, 终点一定是 bolt, 拓扑有方向, 如下图
1.Topology(译为拓扑结构) – DAG有向无环图的实现
2.Tuple – 元组
3.Stream – 数据流
4.Spout – 数据源
5.Bolt – 数据流处理组件
6.Stream Grouping – 数据流分组(即数据分发策略)
注意: 1,4,5,6 在Storm开发中经常用到
环境准备, 案例用到的jar在底部分享, 下载后在项目下创建一个lib目录, 然后右击bulild path全部即可
用于数据的推送 这里是将每个i 的值推送给 bolt 进行处理
package ah.szxy.storm.bolt; import java.util.List; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; /** * 手动继承 BaseRichSpout, 实现它的未实现方法 * @author chy */ public class WsSpout extends BaseRichSpout{ private Map map; private TopologyContext context; private SpoutOutputCollector collector; int i=0;//nextTuple方法会被循环调用,因此i应该是成员变量 /** * 1.配置初始化spout * 将局部变量赋值给成员变量, 目的是提升局部变量的作用域 */ @Override public void open(Map map, TopologyContext context, SpoutOutputCollector collector) { this.map=map; this.context=context; this.collector=collector; } /** * 2.采集并且向后推送数据 */ @Override public void nextTuple() { /** * 这里体现了面向接口的核心思想 * 如果声明直接使用Values, 接收数据的类型就会被限制死了 */ List list = new Values(i++); this.collector.emit(list); System.err.println("num==========="+list); Utils.sleep(1000);//和线程休眠效果一样,storm包提供 } /** * 3.向接收的数据的逻辑处理单元声明发送数据的字段名称 */ @Override public void declareOutputFields(OutputFieldsDeclarer declare) { declare.declare(new Fields("num")); } }
用于对spout的数据进行逻辑处理 这里是对数据进行求和
package ah.szxy.storm.spout; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; /** * 继承BaseRichBolt, 实现相关方法 * @author chy * */ public class WsBolt extends BaseRichBolt{ //成员变量 private Map stormConf; private TopologyContext comtext; private OutputCollector collector; //求和 int sum=0; /** * 准备阶段(提供逻辑运算的环境) * 将局部变量赋值给成员变量, 目的是提升局部变量的作用域 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.stormConf=stormConf; this.comtext=context; this.collector=collector; } /** * 获取数据 ( 有必要的话, 向后继续发送数据 ) */ @Override public void execute(Tuple input) { // input.getInteger(0); int num=input.getIntegerByField("num");//接收的是spout类中declareOutputFields方法声明的字段名称 sum+=num; System.err.println("sum========================="+sum); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
构建拓扑结构模型 测试程序是否正常运行
package ah.szxy.storm.test; import ah.szxy.storm.bolt.WsSpout; import ah.szxy.storm.spout.WsBolt; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; public class TestWs { /** * 建立拓扑结构, 加入集群中运行 * @param args 命令行参数 */ public static void main(String[] args) { //构建storm拓扑结构( Topology: 拓扑结构) TopologyBuilder tb=new TopologyBuilder(); tb.setSpout("wsspout", new WsSpout()); tb.setBolt("wsbolt", new WsBolt()).shuffleGrouping("wsspout"); //创建本地storm集群 LocalCluster lc=new LocalCluster(); //将任务布置到集群中运行 lc.submitTopology("wordsum", new Config(), tb.createTopology()); } }
注意:
需要注意的是这里采取了随机的方式推送数据 因此下面在结果打印时, 打印的数据可能相同
/** * spout数据推送 * @author chy * */ public class WcSpout extends BaseRichSpout{ private Map conf; private TopologyContext context; private SpoutOutputCollector collector; //定义需要被统计字符串数据 String[] text= { "I am a walker", "I like play computer and comic", "I like study and sing", "My nickname is TimePause", "TimePause is not simple history" }; //定义一个随机数变量r Random r=new Random(); /** * 初始化方法 */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf=conf; this.context=context; this.collector=collector; } /** * 采集并向后推送数据 */ @Override public void nextTuple() { //从数组中随机取出一行,放到list集合中 List line=new Values(text[r.nextInt(text.length)]); //推送数据 this.collector.emit(line); System.err.println("spout emit line========"+line); Utils.sleep(1000); } /** * 向接收的数据的逻辑处理单元声明发送数据的字段名称 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } }
/** * 第一个Bolt---进行分词 * @author chy * */ public class WcSplitBolt extends BaseRichBolt{ Map stormConf; TopologyContext context; OutputCollector collector; /** * 准备阶段(提供逻辑运算的环境) * 将局部变量赋值给成员变量, 目的是提升局部变量的作用域 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.stormConf=stormConf; this.context=context; this.collector=collector; } /** * 获取tuple元祖中每一行数据并切割 * @param input */ @Override public void execute(Tuple input) { //input.getString(0);//通过偏移量获取 String line=input.getStringByField("line"); //切割 String[] words = line.split(" "); for (String word : words) { List wordList=new Values(word); this.collector.emit(wordList);//发送数据 } } /** * 向接收的数据的逻辑处理单元声明发送数据的字段名称 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("wordList")); } }
/** * 第二个Bolt---分词后的统计 * @author chy * */ public class WcCountBolt extends BaseRichBolt{ //用来存放,单词,以及单词出现的个数 Map<String, Integer> map=new HashMap<String, Integer>(); /** * 准备阶段(提供逻辑运算的环境) */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } /** * 获取tuple元祖中每一个单词, 并且按照单词统计出出现的次数 * @param input */ @Override public void execute(Tuple input) { String word=input.getStringByField("wordList");//到这里获取的方式时一个一个的获取 //存放单词数量,之所以不设置为全局是因为每次key的值都不一样 int count=1; if (map.containsKey(word)) {//如果出现,则count+1 count=(int)map.get(word)+1;//map.get(key)获取map的值 } map.put(word, count); System.err.println("WcCountBolt emit===key:"+word+"==count:"+count); } /** * 向接收的数据的逻辑处理单元声明发送数据的字段名称 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
/** * 测试类 * @author chy * */ public class TestWc { public static void main(String[] args) { //创建拓扑结构 TopologyBuilder tb=new TopologyBuilder(); tb.setSpout("WcSpout", new WcSpout()); tb.setBolt("WcSplitBolt",new WcSplitBolt()).shuffleGrouping("WcSpout"); //fieldsGrouping:根据单词属性名称进行分组 tb.setBolt("WcCountBolt", new WcCountBolt(), 3).fieldsGrouping("WcSplitBolt", new Fields("wordList")); //创建本地集群 LocalCluster lc=new LocalCluster(); //发布任务到集群 lc.submitTopology("WordCount", new Config(), tb.createTopology()); } }
结果展示
因为spout采取随机推送, 因此数据重复的可能性非常大
由上面两个案例的test方法中我们可以看到Storm Grouping的作用,下面我们来具体学习一下它吧~~~
随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。 轮询,平均分配
按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。
广播发送,对于每一个tuple,所有的bolts都会收到
全局分组,把tuple分配给task id最低的task 。
不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)
本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致
自定义,相当于mapreduce 自己去实现一个partition一样。
单一节点安装, 但是具备分布式所具备的所有组件
## 单机模式 ## 上传解压,资料分享至末尾 $ tar xf apache-storm-0.10.0.tar.gz $ cd apache-storm-0.10.0 $ storm安装目录下创建log: mkdir logs $ ./bin/storm --help 下面分别启动ZooKeeper、Nimbus、UI、supervisor、logviewer ##错误信息放到标准输入中, $ ./bin/storm dev-zookeeper >> ./logs/zk.out 2>&1 & $ ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 & $ ./bin/storm ui >> ./logs/ui.out 2>&1 & $ ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 & $ ./bin/storm logviewer >> ./logs/logviewer.out 2>&1 & # 需要等一会儿 $ jps 6966 Jps 6684 logviewer 6680 dev_zookeeper 6681 nimbus 6682 core 6683 supervisor # 访问图形化界面( 图1 ) http://nodex:8080 # 提交任务到Storm集群当中运行: ## 首先将WrodCount程序打包成 WrodCount.jar 放到/root/chy/software ,需要阅读下方的注意事项 ## 在Strom根目录下运如下命令 ./bin/storm jar jar全路径 主类/启动类的全路径( 图2 ) ./bin/storm jar /root/chy/software/WrodCount.jar ah.szxy.storm.tesTestWc wc
注意: 在将项目打包放到伪分布式环境中时, 修改了主类如下的代码, 使其能够依靠集群环境下运行
//提交任务,输入了额外的参数,在集群环境下运行;否则在项目自身的环境下运行 Config config = new Config(); if (args.length>0) { try { StormSubmitter.submitTopology(args[0], config, tb.createTopology()); } catch (Exception e) { } }else { //创建本地集群 LocalCluster lc=new LocalCluster(); //发布任务到集群 lc.submitTopology("WordCount", config, tb.createTopology()); }
图1
图2
环境要求
java -version JDK 1.6+ python -V (系统内置) Python 2.6.6+ ZooKeeper3.4.5+ storm 0.9.4+
各节点分配情况 | Nimbus | Supervisor | Zookeeper |
---|---|---|---|
node2 | * | * | |
node3 | * | * | |
node4 | * | * |
具体步骤
思路: 首先在node2配置storm, 配置完成后分发给node3,node4
node1作为nimbus, # 1. 开始配置storm.yaml $ vim conf/storm.yaml -------------------------------------- storm.zookeeper.servers: - "node2" - "node3" - "node4" # 任务的存储目录 storm.local.dir: "/tmp/storm" # 声明主节点在哪里 nimbus.host: "node2" # 指定从节点的槽位,一个从节点对应四个槽位,一个槽位对应一个worker,一个worker对应一个端口 supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 ------------------------------- # 2.在storm目录中创建logs目录 $ mkdir logs # 3. (分发)集群其他服务器node3,node4 ## 启动ZooKeeper集群(node2,3,4) zkServer.sh start # 4. node1上启动Nimbus ## 2>&1的意思就是将标准错误重定向到标准输出, & 为后台输出 $ ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 & $ tail -f logs/nimbus.log $ ./bin/storm ui >> ./logs/ui.out 2>&1 & $ tail -f logs/ui.log # 5. 节点node2和node3启动supervisor,按照配置,每启动一个supervisor就有了4个slots $ ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 & $ tail -f logs/supervisor.log (当然node1也可以启动supervisor) # 6.访问图形化界面(图1),至此安装完成 http://node2:8080/ # 集群测试 ## 上传jar任务到Storm集群当中运行(可以从Supervisor节点提交,但是会汇总到nimbus的/tmp/storm目录下, 图2,图3): $ ./bin/storm jar /root/chy/software/WrodCount2.jar ah.szxy.storm.test.TestWc wc ## 观察关闭一个supervisor后,nimbus的重新调度 ## 再次启动一个新的supervisor后,观察,并rebalance, 可以通过图形化页面来操作
注意: 在打包前, 修改了主类的相关代码 , 设置了相关的进程和线程数, 以及worker的数目
public class TestWc { /** * 建立拓扑结构, 加入集群中运行 * @param args 命令行参数 */ public static void main(String[] args) { //创建拓扑结构 TopologyBuilder tb=new TopologyBuilder(); tb.setSpout("WcSpout", new WcSpout(),2); tb.setBolt("WcSplitBolt",new WcSplitBolt(),4).shuffleGrouping("WcSpout"); //fieldsGrouping:根据单词属性名称进行分组 tb.setBolt("WcCountBolt", new WcCountBolt(),2).setNumTasks(4).fieldsGrouping("WcSplitBolt", new Fields("wordList")); //提交任务,输入了额外的参数,在集群环境下运行;否则在项目自身的环境下运行 Config config = new Config(); config.setNumWorkers(2); if (args.length>0) { try { StormSubmitter.submitTopology(args[0], config, tb.createTopology()); } catch (Exception e) { } }else { //创建本地集群 LocalCluster lc=new LocalCluster(); //发布任务到集群 lc.submitTopology("WordCount", config, tb.createTopology()); } } }
图1
图2
图3
Worker – 进程
Executor – 线程
Task
设置Worker进程数
Config.setNumWorkers(int workers)
设置Executor线程数
TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
:其中, parallelism_hint即为executor线程数
设置Task数量
ComponentConfigurationDeclarer.setNumTasks(Number val)
例:
Config conf = new Config() ; conf.setNumWorkers(2); TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("spout", new MySpout(), 1); topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping("blue-spout);
复杂情况下的配置图与代码截图
该图5进程6任务的原因是: 有一个进程分配了两个任务(GreenBolt)
配置图
代码截图
因为有两个worker, 因此进程数是原来的两倍, 可知原来进程为5个
Rebalance – 再平衡 即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量
支持两种调整方式: 1、通过Storm UI 2、通过Storm CLI
通过Storm CLI动态调整:
例:
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 可以通过 help rebalance 将mytopology拓扑worker进程数量调整为5个 “ blue-spout ” 所使用的线程数量调整为3个 “ yellow-bolt ”所使用的线程数量调整为10个
Worker进程间的数据通信
Worker内部的数据通信
1、集群节点宕机
2、进程挂掉
3、消息的完整性
Acker – 消息完整性的实现机制
DRPC设计目的:
为了充分利用Storm的计算能力实现高密度的并行实时计算。 (Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)
Drpc 流程介绍
定义DRPC拓扑:
1、本地模式
2、远程模式(集群模式)
# 1. 修改配置文件conf/storm.yaml(指定为当前主节点nimbus即可) ----------将该更改分发到集群的其他节点----------------- drpc.servers: - "node2“ ---------------------------------------------------- # 2. 启动DRPC Server bin/storm drpc & # 3. 通过StormSubmitter.submitTopology提交拓扑
事务性拓扑(Transactional Topologies):保证消息(tuple)被且仅被处理一次 官网介绍
强顺序流(强有序)
两种情况: 1、当前transaction id与数据库中的transaction id不一致( 表示新的事务, 往里面存) 2、两个transaction id相同( 覆盖或者让新的变量指向原来的数据库)
缺点: 一次只能处理一个tuple,无法实现分布式计算
强顺序的Batch流
一个关键的认识是,并非所有处理批处理元组的工作都需要有序地进行。例如,在计算全局计数时,计算分为两个部分:
#2的计算需要在批之间进行严格排序,但是没有理由您不应该通过为多个批并行计算#1 来流水线化批的计算。因此,当批次1正在更新数据库时,批次2至10可以计算其部分计数。
Storm通过将批处理的计算分为两个阶段来实现这一区别:
这两个阶段一起称为“交易”。在给定的时刻,许多批次可以处于处理阶段,但是只有一个批次可以处于提交阶段。如果批处理或提交阶段发生任何故障,则将重播整个事务(两个阶段)。
Design details(设计细节)
三种事务: 三种分区介绍
前提: 安装了Flume,Kafka,以及Storm Flume介绍以及安装 Kafka介绍以及安装
该过程实现了数据的清洗
1.启动zk集群,kafka集群,flume
启动zk zkServer.sh start 启动kafka kafka-server-start.sh /opt/kafka/config/server.properties 启动flume( flume-kafka.conf为flume的启动脚本,见本人Kafka博文介绍第三章 ) flume-ng agent -n a1 -c conf -f /opt/flume/conf/flume-kafka.conf -Dflume.root.logger=DEBUG,console
2.启动kafka的消费者端进程
监听testflume 数据流转 kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic testflume 监听LogError数据流转 kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic LogError
3.运行代码测试 a.运行RpcClientDemo , 查看testflume监听的数据流转情况(图1)
/** * Flume官网案例 * http://flume.apache.org/FlumeDeveloperGuide.html * @author root */ public class RpcClientDemo { public static void main(String[] args) { MyRpcClientFacade client = new MyRpcClientFacade(); // Initialize client with the remote Flume agent's host and port client.init("node2", 41414); // Send 10 events to the remote Flume agent. That agent should be // configured to listen with an AvroSource. for (int i = 100; i < 150; i++) { String sampleData = "Hello Flume!ERROR" + i; client.sendDataToFlume(sampleData); System.out.println("发送数据:" + sampleData); } client.cleanUp(); } } class MyRpcClientFacade { private RpcClient client; private String hostname; private int port; public void init(String hostname, int port) { // Setup the RPC connection this.hostname = hostname; this.port = port; this.client = RpcClientFactory.getDefaultInstance(hostname, port); // Use the following method to create a thrift client (instead of the // above line): // this.client = RpcClientFactory.getThriftInstance(hostname, port); } public void sendDataToFlume(String data) { // Create a Flume Event object that encapsulates the sample data Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); // Send the event try { client.append(event); } catch (EventDeliveryException e) { // clean up and recreate the client client.close(); client = null; client = RpcClientFactory.getDefaultInstance(hostname, port); // Use the following method to create a thrift client (instead of // the above line): // this.client = RpcClientFactory.getThriftInstance(hostname, port); } } public void cleanUp() { // Close the RPC connection client.close(); } }
b.运行LogFilterTopology ,过滤数据,并将数据发送给kafka集群中的 LogError主题,效果如图2
/** * This topology demonstrates Storm's stream groupings and multilang * capabilities. */ public class LogFilterTopology { public static class FilterBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String line = tuple.getString(0); System.err.println("Accept: " + line); // 包含ERROR的行留下 if (line.contains("ERROR")) { System.err.println("Filter: " + line); collector.emit(new Values(line)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 定义message提供给后面FieldNameBasedTupleToKafkaMapper使用 declarer.declare(new Fields("message")); } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); // https://github.com/apache/storm/tree/master/external/storm-kafka // config kafka spout,话题 String topic = "testflume"; ZkHosts zkHosts = new ZkHosts("node2:2181,node3:2181,node4:2181"); // /MyKafka,偏移量offset的根目录,记录队列取到了哪里 SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");// 对应一个应用 List<String> zkServers = new ArrayList<String>(); System.out.println(zkHosts.brokerZkStr); for (String host : zkHosts.brokerZkStr.split(",")) { zkServers.add(host.split(":")[0]); } spoutConfig.zkServers = zkServers; spoutConfig.zkPort = 2181; // 是否从头开始消费 spoutConfig.forceFromStart = true; spoutConfig.socketTimeoutMs = 60 * 1000; // StringScheme将字节流转解码成某种编码的字符串 spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); // set kafka spout builder.setSpout("kafka_spout", kafkaSpout, 3); // set bolt builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout"); // 数据写出 // set kafka bolt // withTopicSelector使用缺省的选择器指定写入的topic: LogError // withTupleToKafkaMapper tuple==>kafka的key和message KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError")) .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter"); Config conf = new Config(); // set producer properties. Properties props = new Properties(); props.put("metadata.broker.list", "node2:9092,node3:9092,node4:9092"); /** * Kafka生产者ACK机制 0 : 生产者不等待Kafka broker完成确认,继续发送下一条数据 1 : * 生产者等待消息在leader接收成功确认之后,继续发送下一条数据 -1 : * 生产者等待消息在follower副本接收到数据确认之后,继续发送下一条数据 */ props.put("request.required.acks", "1"); props.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put("kafka.broker.properties", props); conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node2", "node3", "node4" })); // 本地方式运行 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } }
c.修改 RpcClientDemo 中的循环语句,验证 FilterBolt是否起到了过滤的作用 查看testflume, 图3; 查看LogError, 图4 可以看到数据流转到了testflume主题, 而没有流转到LogError,由此可以看出 FilterBolt起到了过滤的作用
// Send 10 events to the remote Flume agent. That agent should be // configured to listen with an AvroSource. for (int i = 200; i < 250; i++) { String sampleData = "Hello Flume!" + i; client.sendDataToFlume(sampleData); System.out.println("发送数据:" + sampleData); }
图1
图2
图3
如果自己想应聘大公司, 一定要去别人技术分享网站看一看,就像美团技术团队官网
链接:https://pan.baidu.com/s/1wu9qYQZPxkqOdiY5QGR2cg 点赞私聊获取资料~~~ 提取码:m8kh
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句