首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据实时处理实战

大数据实时处理实战

作者头像
CSDN技术头条
发布2018-02-12 16:01:17
2.2K0
发布2018-02-12 16:01:17
举报

随着互联网时代的发展,运营商作为内容传送的管道服务商,在数据领域具有巨大的优势,如何将这些数据转化为价值,越来越被运营商所重视。

运营商的大数据具有体量大,种类多的特点,如各类话单、信令等,通常一种话单每天的数据量就有上百亿条。随着业务分析需求对数据处理实时性的要求越来越高,也给我们的大数据处理架构带来了巨大的挑战,参照网络上可查的例子,运用到实际处理架构上,经常会因为实时数据流量大,造成系统运行不稳定及各种异常。从大数据实时处理架构开发到上线,耗时近2个月时间,经过大量优化,我们的系统才趋于稳定。最终我们使用10台服务器的集群,实时处理每天上百亿条的数据,这里每条数据的字段数量有100个,最长的字段内容超过1000字节。

下面就来分享一下我们在实时大数据处理大体量数据的过程中,总结出来的酸甜苦辣。

  • 项目目标

在有限服务器集群数量的基础上,实现对每天超过百亿条、体量超过20T的某话单进行实时处理。具体需求是FTP收集多台话单服务器上的详单,进行实时处理后将数据存储到Hbase数据库供用户即时详单查询,同时将话单存储到Hdfs供离线分析使用。

  • 硬件资源

10台x86服务器,单机配置16盒CPU,128G内存,2T硬盘*10,300G硬盘*2(系统盘)。

  • 系统架构

10台服务器组成hadoop集群,其中NameNode节点同时作为采集机安装FTP和Flume,选取其他5台服务器安装Kafka,Zookeeper和Storm实现大数据实时流处理架构,为了充分利用集群计算资源,这5台服务器也配置了少量的Yarn计算资源,参与日常的离线数据分析需求。剩下的4台服务器我们安装了Hbase满足大数据下的秒级查询需求,系统拓扑图如下:

图一 系统拓扑图

  • 项目实施

1.使用的相关技术

我们先来回顾一下相关的大数据架构和开源技术,大数据处理分离线分析架构和实时处理架构。离线分析架构(如Hive,Map/Reduce,Spark Sql等)可以满足数据后分析,数据挖掘的应用需求。对于实时性要求高的应用,如用户即时详单查询,业务量监控等,需要应用实时处理架构。目前大数据开源实时处理架构最常见的是Storm和Spark Streaming,相比Spark Streaming准实时批处理系统,Strom是更纯粹的实时处理系统,即来一条事件就处理一条,具有更高的实时性。

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume支持单机也支持集群,支持多种数据源,如不断写入的文件、Socket、不断生成新文件的文件夹等,支持多种输出,如Hdfs、Kafka、Mysql数据库等。Flume使用时仅需实现简单配置,无需开发程序。

Kafka是一种高吞吐量的分布式发布订阅消息系统,类似一个大数据量的缓存池,支持一份数据多用户消费。ZooKeeper是一个分布式的,开源的分布式应用程序协调服务,负责存储集群间部分组件的状态同步信息。Storm分布式实时计算系统,包含Nimbus主节点和Supervisor从节点(从storm1.0以后,增加了Nimbus备份节点),节点之间需要依靠Zookeeper做状态同步。Storm集群组件:

  • Nimbus:是Storm集群的master节点,负责资源分配和任务调度。
  • Supervisor:是Storm集群的slave节点,负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程,是真正意义上的分布式计算节点。

图二 Storm集群组件

Storm应用涉及到Java程序的开发,编程模型中涉及的概念:

  • Topology:Storm中运行的一个实时应用程序,各个组件间的消息流动形成逻辑上的一个拓扑结构,Topology一旦启动,就会常驻内存并占用worker资源。
  • Spout:在一个Topology中产生源数据流的组件。通常情况下Spout会从外部数据源中读取数据,然后转换为Topology内部的源数据。
  • Bolt:在一个Topology中接受数据然后执行处理的组件。Bolt可以执行过滤、函数操作、合并、写数据库等任何操作。
  • Tuple:一次消息传递的基本单元。

2.开源组件安装及配置

a)Flume安装及配置

从http://flume.apache.org/下载flume的安装包,解压缩;如果使用Cloudera Manager或者Ambari安装,仅需通过相应的管理页面安装配置。我们仅安装了单机的Flume,未安装Flume集群,单机Flume处理效率非常高,完全能够满足我们每天处理上百亿条数据的需求,但需要说明一点的是Flume鲁棒性非常差,经常出现进程在、但数据不处理的进程卡死状态,使用Flume时要注意以下几点:

  • flume监控目录中不能含有目录;
  • flume正在处理的文件,其他进程不能更改(如FTP正在传送中的文件,需要设置过滤条件,避免flume处理)。建议flume监控目录与FTP实时传送目录分开,避免flume处理FTP传送中的文件,导致异常,也可以设置正则表达式忽略正在传送的文件:
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
  • flume处理的文件中可能含有特殊字符,导致flume进程卡死。设置遇到不能识别的字符忽略跳过:
a1.sources.r1.decodeErrorPolicy = IGNORE
  • flume运行过程中出现GC over的内存溢出错误,配置flume-env.sh中内存配置(默认值很小);
export JAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote"
  • flume启动时-c后面要给全到详细flume配置文件目录,否则flume-env.sh中的配置不会加载,会使用默认配置,例如下面启动命令给全配置文件目录:
/hadoop/apache-flume-1.6.0-bin/bin/flume-ng agent -c /hadoop/apache-flume-1.6.0-bin/conf/
  • 如果使用内存队列,请注意内存队列消息数的配置,设置transactionCapacity队列大小必须大于等于batchSize;
a1.channels.c1.transactionCapacity = 2000a1.sinks.k1.batchSize = 2000
  • 增加batchSize可以提升flume处理速度,原理是flume处理的event都保存在transaction队列中,直到满足了batchSize的数量条件,才一次性批量向sink发送。但是要注意实际数据量的大小,如果实际数据量很小,batchSize就不能配置过大,否则数据达不到batchSize的数量条件,会长时间积压在transaction队列中,后面的实时处理程序反而得不到数据,导致实时性变差;
  • flume中读取的一条记录长度超过2048字符,也就是4096字节就会被截断,可以在配置文件中增加如下配置项解决:
producer.sources.s.deserializer.maxLineLength=65535
  • flume字符转换异常问题,java.nio.charset.MalformedInputException: Input length = 1,可以在配置文件中增加如下配置项解决:
a1.sources.r1.inputCharset = ISO8859-1
  • flume遇到乱码停止,报异常:java.nio.charset.MalformedInputException,可以在配置文件中增加如下配置,忽略错误数据(默认是FAIL,抛异常报错,flume会停止)解决;
producer.sources.s.decodeErrorPolicy=IGNORE
  • 默认情况下,Flume处理完成的文件会增加.completed后缀,在数据量很大的情况下,会很快撑满采集机硬盘,可以在配置文件中增加如下配置,让flume处理完后自动删除该数据文件解决。
a1.sources.r1.deletePolicy = immediate

Flume配置:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /ftpdata/xdr/HTTP_tmp
a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
a1.sources.r1.fileHeader = false
a1.sources.r1.deletePolicy = immediate
a1.sources.r1.inputCharset = ISO8859-1
a1.sources.r1.deserializer.maxLineLength = 8192
a1.sources.r1.decodeErrorPolicy = IGNORE

# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.batchSize = 10000
a1.sinks.k1.brokerList = stormmaster:9092,storm01:9092,storm02:9092,storm03:9092,storm04:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.requiredAcks = 0
a1.sinks.k1.producer.type = async
a1.sinks.k1.topic = sighttpnew

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 80000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.keep-alive = 30

Flume-env.sh配置:

# Enviroment variables can be set here.
export JAVA_HOME=/usr/java/jdk1.7.0_80
export FLUME_HOME=/hadoop/apache-flume-1.6.0-bin
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
export JAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote"
# Note that the Flume conf directory is always included in the classpath.
export FLUME_CLASSPATH="/hadoop/apache-flume-1.6.0-bin/lib"

Flume启动命令:

/hadoop/apache-flume-1.6.0-bin/bin/flume-ng agent -c /hadoop/apache-flume-1.6.0-bin/conf/ -f /hadoop/apache-flume-1.6.0-bin/conf/viewdata.conf -n producer –Dflume.root.logger=ERROR &

注意一定要给全Flume配置文件的路径,否则启动Flume不能正确加载Flume-env.sh的配置。

b)Kafka集群安装及配置

http://kafka.apache.org/下载kafka安装包:kafka_*.tgz,解压后,配置server.properties文件。

server.properties配置:

#本机在kafka集群中的idbroker.id=48#服务端口port=9092#主机名host.name=storm01# The number of threads handling network requestsnum.network.threads=3# The number of threads doing disk I/Onum.io.threads=8# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket serversocket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600#kafka数据存储位置(数据量大时,需要存储的目录大小也要充分)log.dirs=/data1/kafka-logs#默认topic创建partition的数量num.partitions=1# This value is recommended to be increased for installations with data dirs located in RAID array.num.recovery.threads.per.data.dir=1#kafka事件只有flash到硬盘才能被后续消费者消费,因此要配置flash时间参数,避免小数据量情况下数据刷新时间过久log.flush.interval.messages=10000log.flush.interval.ms=1000# 数据在kafka中保存的时间,单位小时,超时的数据kafka会自动删除log.retention.hours=48# The maximum size of a log segment file. When this size is reached a new log segment will be created.log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according to the retention policieslog.retention.check.interval.ms=300000# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.log.cleaner.enable=false# zookeeper集群配置zookeeper.connect=master:2181,storm01:2181,storm02:2181,storm03:2181,storm04:2181# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=6000#是否能够删除topic的配置,默认false不能删除topicdelete.topic.enable=true

Kafka服务启动:jps命令可以看到kafka的进程名,说明kafka已经成功启动。

nohup kafka-server-start.sh /home/hadoop/kafka_2.9.1-0.8.2.1/config/server.properties &

创建topic:创建复制因子2,有24个partition的topic,创建多个partition的目的是增加并行性,复制因子的目的是数据安全冗余。

kafka-topics.sh --create --zookeeper master:2181,storm01:2181,storm02:2181,storm03:2181,storm04:2181 --replication-factor 2 --partitions 24 --topic sighttp

kafka数据存储方式:在kafka数据存储目录下,可以看到以每个-方式命名的文件夹,例如sighttp-19表示topic:sighttp,partition:19,如下图所示:

图三

进入topic-partition目录,可以看到很多.index和.log结尾的文件。其中.log是数据文件,其中存储的是kafka缓存池中的数据,.index是索引文件,数据文件和索引文件成对出现,文件名为一串数字,标识了该文件中存储数据的起始序列号,如下:

图四

kafka数据消费状态查询:消费者从kafka消费数据状态是记录在zookeeper中的,使用zkCli.sh命令可以查看,如下图查询了消费topic:sighttp,partition:0的状态,offset表明已经处理到49259227840行,如下图所示:

图五

经验:通过消费到的行数与存储到的行数,可以判断数据处理程序的速度是否满足数据生成速度的需求。

kafka消费典型异常:

[2016-10-27 16:15:42,536] ERROR [Replica Manager on Broker 51]: Error when processing fetch request for partition [sighttp,3] offset 6535061966 from consumer with correlation id 0. Possible cause: Request for offset 6535061966 but we only have log segments in the range 6580106664 to 6797636149. (kafka.server.ReplicaManager)

异常原因:kafka中由于消息过期已经把序号是6535061966的消息删除了,目前kafka中只有范围是6580106664到6797636149的日志,但是消费者还要处理过期删除的消息,那就会出现此异常消息(通常是由于数据处理速度慢,无法满足数据生成速度的要求,导致消息积压,积压的消息到达kafka配置的过期时间,被kafka删除)。

c)Storm集群安装及配置

在http://storm.apache.org/下载Storm安装包,建议使用Storm 0.10.0 released以上版本,因为最新版本修正了很多bug,特别是STORM-935的问题(拓扑启动后会占用大量系统资源,导致Topology运行不稳定)。

storm.yaml文件配置:

#zookeeper集群服务器配置
storm.zookeeper.servers:
    - "master"
    - "storm01"
    - "storm02"
    - "storm03"
    - "storm04"
#storm主节点
nimbus.host: "master"
#strom管理页面服务端口
ui.port: 8081
#storm从节点服务端口配置,默认6700-6703共4个端口,意味着每台服务器可以提供4个worker插槽,这里增加了6704和6705端口,即为单台服务器增加了2个worker插槽,worker数增加意味着storm集群可以提供更多的计算资源。
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703
 - 6704
 - 6705
#状态信息存储位置,避免使用/tmp
storm.local.dir: "/home/hadoop/apache-storm-0.10.0/workdir"
#主节点的内存
nimbus.childopts: "-Xmx3072m"
#从节点的内存
supervisor.childopts: "-Xmx3072m"
#worker的内存,增加内存可以减少GC overload的问题
worker.childopts: "-Xmx3072m"
#默认为30,增加netty超时时长等参数,降低因Netty通信问题,造成worker不稳定
storm.messaging.netty.max_retries:60
#增加storm.messaging.netty.max_wait_ms设置,默认为1000
storm.messaging.netty.max_wait_ms:2000

启动服务:

  • 主节点:(启动主节点服务和管理页面) nohup storm nimbus & nohup storm ui &
  • 从节点: nohup storm supervisor &

Storm管理页面:

浏览器输入Storm UI所在服务器地址+8081端口号,打开Strom管理页面如下图:

图六

从图六Cluster Summary中可以看出Storm集群共有4个Supervisor节点,因每台Supervisor提供6个slot(如果在storm.yaml配置文件中不配置supervisor.slots.ports属性,则每个Supervisor默认提供4个slot),因此共有4*6=24个slot,已使用22个,还有2个空闲。需要注意的是每个拓扑一旦发布,将长久占用slot,如果没有足够的slot,最新发布的拓扑只会占用空闲的slot,不会抢占其他已经被占用的slot资源;如果没有slot,将无法发布新的拓扑,此时需要挖潜Storm集群服务器,通过配置文件增加slot资源或增加新的服务器。

从图六Topology Summary中可以看出,集群上已经发布了7个Topology,每个Topology占用的worker资源,启动的executor线程数,具体资源占用多少是在Storm Topology开发程序中指定的。

d)Kafka+Storm+Hdfs+Hbase拓扑开发

我们使用Eclipse创建MAVEN工程,在pom.xml配置文件中添加Storm及Hdfs的相关依赖,本例是Storm从Kafka中消费数据,经过ETL处理后存储到Hdfs和Hbase中,因此需要添加Storm-Kafka、Storm-Hdfs、Storm-Hbase等依赖,注意依赖包版本要与集群一致。

抽取过程继承BaseRichBolt类:

public class splitBolt extends BaseRichBolt {
    private static final String TAB = ",";
    private OutputCollector collector; 
    public void prepare(Map config,TopologyContext context,OutputCollector collector){
        this.collector=collector;
    }
    public void execute(Tuple input){
            String line=input.getString(0);
            String[] words=line.split(TAB);
            if (words.length>74)
            {
                String Account;
                if (words[0].length()>0) Account=words[0]; 
                else Account="NULL";
                String LocalIPv4;
                if (words[1].length()>0) LocalIPv4=words[1];
                else LocalIPv4="NULL";
                 String RemoteIPv4;
                if (words[2].length()>0) RemoteIPv4=words[2];
                else RemoteIPv4="NULL";
                String newline=Account+"|"+LocalIPv4+"|"+RemoteIPv4;
                collector.emit(input,new Values(newline));
            }
            collector.ack(input);
    }
     public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("newline"));
    }
}

写Hbase需要实现HBaseMapper类:

public class myHbaseMapper implements HBaseMapper {
    public ColumnList columns(Tuple tuple) {
        String line=tuple.getString(0);
        String[] words=line.split("\\|");
        ColumnList cols = new ColumnList();
         //参数依次是列族名,列名,值
        if (words[1].length()>0) cols.addColumn("content".getBytes(), "LocalIPv4".getBytes(), words[1].getBytes());
        if (words[2].length()>0) cols.addColumn("content".getBytes(), "RemoteIPv4".getBytes(), words[2].getBytes());
        return cols;
    }
     public byte[] rowKey(Tuple tuple) {
        String line=tuple.getString(0);
        String[] words=line.split("\\|");
        String key;
        //rowkey设置成Account的反字符串,便于hbase表内分区的数据均衡
        key=new StringBuilder(words[0]).reverse().toString();
        return key.getBytes();
    }
}

main函数:

public static void main(String[] args)
{  
    String zks = "master:2181,storm01:2181,storm02:2181 "; //zookeeper集群
    String topic = "topicname"; //kafka中topic名称
    String zkRoot = "/storm";//zookeeper中存储状态信息的根目录
    String id = "kafkatopicname";//zookeeper中存储本拓扑状态信息的子目录
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
    .withPath("/storm/tmp/").withPrefix("tmp_").withExtension(".dat");
    RecordFormat format = new DelimitedRecordFormat()
    .withFieldDelimiter("|"); //写到hdfs的目录文件名以’tmp_’开头,’.dat’结尾
    //每10分钟重写一个hdfs的新文件
    FileRotationPolicy rotationPolicy = new TimedRotationPolicy(10.0f, TimeUnit.MINUTES);
    BrokerHosts brokerHosts = new ZkHosts(zks);
    //配置storm拓扑的spout
    SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
    spoutConf.scheme = new SchemeAsMultiScheme(new MessageScheme());  
    spoutConf.zkServers = Arrays.asList(new String[] {"master", "storm01","storm02"});  
    spoutConf.zkPort = 2181;
    spoutConf.ignoreZkOffsets = false;//重启拓扑时,需要从zookeeper中读取偏移量
    //如果偏移量中的数据已经从kafka中删除,则从kafka中保存的最早数据开始处理。
    spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true;    //配置hdfs bolt
    HdfsBolt hdfsBolt = new HdfsBolt()
    .withFsUrl("hdfs://hdfsmaster:9000")
    .withFileNameFormat(fileNameFormat)
    .withRecordFormat(format)
    .withRotationPolicy(rotationPolicy)
    //hdfs数据文件写完后,move到新目录
    .addRotationAction(new MoveFileAction().toDestination("/storm/http/")); 
    //实例化HBaseMapper
    HBaseMapper mapper = new myHbaseMapper();
    //实例化HBaseBolt,指定hbase中的表名
    HBaseBolt hBolt = new HBaseBolt("hbasetable", mapper).withConfigKey("hbase.conf");
    TopologyBuilder builder = new TopologyBuilder();  
    //配置spout线程数为24,此数要与kafka中topic的partition数一致,partition数越多,则spout读取数据的并行性越高,处理速度越快
    builder.setSpout("kafka-reader", new KafkaSpout(spoutConf),24);    //配置bolt,此bolt开发处理逻辑,bolt可以串接多个
    builder.setBolt("etl", new splitBolt(), 24).shuffleGrouping("kafka-reader");  
    builder.setBolt("hdfs-bolt", hdfsBolt, 24).shuffleGrouping("etl");
    builder.setBolt("hbase-bolt", hBolt, 24).shuffleGrouping("etl");
    Config conf = new Config();
    //增加hbase配置,指定hbase在hdfs集群上的目录,zookeeper服务器集群
    Map<String, Object> hbConf = new HashMap<String, Object>();
    hbConf.put("hbase.rootdir", "hdfs://hdfsmaster:9000/hbase");
    hbConf.put("hbase.zookeeper.quorum","master,storm01,storm02");
    conf.put("hbase.conf", hbConf);    String name = sighttphdfs.class.getSimpleName();   
    if (args != null && args.length > 0) {  
        conf.put(Config.NIMBUS_HOST, args[0]);
        conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0); 
        //设置拓扑占用worker数为4,根据实时处理数据量大小按需配置
        conf.setNumWorkers(4); 
        StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());  
    }
}

上面程序实现了Storm读Kafka写Hdfs和Hbase的例子,抽取类中可以根据不同的业务需求,通过Java代码实现不同的逻辑。编译后的jar包上传到集群,使用storm命令行提交Topology:

storm jar ./kafkastream.jar sighdfs.sighttphdfs stormmaster

总结

经过几个月的实际运行,我们的大数据实时处理架构能够始终保持稳定,话单处理速度高于话单生成速度,有效的支撑了运营商大数据的各种分析查询需求。开发和优化过程充满挑战,经过各种研究和尝试,问题逐渐解决,在此我们也积累了大量的开发和优化经验。

最后再分享2个我们实际遇到的问题:

  • Zookeeper配置造成Storm拓扑运行不稳定

因Storm集群需要Zookeeper集群作状态同步,因此所有是Storm服务器worker进程都会不停连接Zookeeper节点,Zookeeper节点的默认连接数是60,当Storm计算拓扑数量较多时,需要修改Zookeeper配置maxClientCnxns=1000,增加Zookeeper连接数。

  • Hdfs节点磁盘I/O高造成Storm拓扑运行不稳定

由于Storm是实时计算,每个环节的拥塞都将引起Storm拓扑的不稳定,在开发中我们遇到Hdfs某个节点磁盘I/O高,导致Storm写Hdfs超时,最终引发Supervisor杀掉worker,造成拓扑不稳定的问题。究其原因是在某个Hdfs节点上,Yarn任务正在进行Reduce操作,用iostat -x 1 10命令查看,Yarn的中间盘I/O长时间被100%占用,同时Yarn的中间盘也是Hdfs的数据盘,导致写入请求无法响应,最终导致Storm写Hdfs的worker超时,引发拓扑运行不稳定。此处建议配置Yarn的中间盘时,不要使用操作系统根盘,不要使用Hdfs的数据盘,可以有效避免Storm写Hdfs超时的问题。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-03-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CSDN技术头条 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 总结
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档