Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用

前言

Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。 在Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。 本文的目标是写一个基于kafka的scala工程,在一个spark standalone的集群环境中运行。

项目结构和文件说明

说明

这个工程包含了两个应用。 一个Consumer应用:CusomerApp - 实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。 一个Producer应用:ProducerApp - 实现了向Kafka集群发消息的功能。

文件结构

KafkaSampleApp   # 项目目录
|-- build.bat    # build文件    
|-- src
    |-- main
        |-- scala
            |-- ConsumerApp.scala  # Consumer应用
            |-- ProducerApp.scala  # Producer应用

构建工程目录

可以运行:

mkdir KafkaSampleApp
mkdir -p /KafkaSampleApp/src/main/scala

代码

build.sbt

name := "kafka-sample-app"
 
version := "1.0"
 
scalaVersion := "2.11.8"

scalacOptions += "-feature"
scalacOptions += "-deprecation"
scalacOptions += "-language:postfixOps"
 
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0",
  "org.apache.spark" %% "spark-streaming" % "2.0.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0",
  "org.apache.kafka" %% "kafka" % "0.8.2.1"
)

CusomerApp.scala

这个例子中使用了Spark自带的Stream+Kafka结合的技术,有个限制的绑定了kafka的8.x版本。 我个人建议只用Kafka的技术,写一个Consomer,或者使用其自带的Consumer,来接受消息。 然后再使用Spark的技术。 这样可以跳过对kafak版本的限制。

import java.util.Properties
import _root_.kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object ConsumerApp {
  def main(args: Array[String]) {
    val brokers = "localhost:9092"
    val topics = "test-topic"

    // Create context with 10 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    println("============== Start ==============")
    wordCounts.print
    println("============== End   ==============")

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

ProducerApp.scala

import java.util.Arrays
import java.util.List
import java.util.Properties
import org.apache.kafka.clients.producer._

object ProducerApp {
  def main(args: Array[String]): Unit = {

    val props = new Properties()
    // Must-have properties
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // Optional properties
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none")
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, (1024*100).toString)
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, (100).toString)
    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, (5*60*1000L).toString)
    //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, (60*1000l).toString)
    props.put(ProducerConfig.ACKS_CONFIG, (0).toString)
    //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, (1500).toString)
    props.put(ProducerConfig.RETRIES_CONFIG, (3).toString)
    props.put(ProducerConfig.LINGER_MS_CONFIG, (1000).toString)
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (32 * 1024 * 1024L).toString)
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, (200).toString)
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-app-producer")

    val producer = new KafkaProducer[String, String](props)

    // Thread hook to close produer
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run() {
        producer.close()
      }
    })

    // send 10 messages
    var i = 0
    for( i <- (1 to 10)) {
      val data = new ProducerRecord[String, String]("test-topic", "test-key", s"test-message $i")
      producer.send(data)
    }

    // Reduce package lost
    Thread.sleep(1000)
    producer.close()
  }
}

构建工程

进入目录KafkaSampleApp。运行:

sbt package

第一次运行时间会比较长。

测试应用

启动Kafka服务

# Start zookeeper server
gnome-terminal -x sh -c '$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties; bash'

# Wait zookeeper server is started.
sleep 8s

# Start kafka server
gnome-terminal -x sh -c '$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties; bash'

# Wait kafka server is started.
sleep 5s

注:使用Ctrl+C可以中断服务。

  • 创建一个topic
# Create a topic
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

# List topics
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181

启动Spark服务

  • 启动spark集群master server
$SPARK_HOME/sbin/start-master.sh

master服务,默认会使用7077这个端口。可以通过其日志文件查看实际的端口号。

  • 启动spark集群slave server
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077

启动Consumer应用

新起一个终端,来运行:

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --master spark://$(hostname):7077 --class ConsumerApp target/scala-2.11/kafka-sample-app_2.11-1.0.jar

注:如果定义的topic没有create,第一次运行会失败,再运行一次就好了。 如果出现java.lang.NoClassDefFoundError错误, 请参照Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境, 确保kafka的包在Spark中设置好了。

启动Producer应用

java -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp
# or
# $KAFKA_HOME/bin/kafka-run-class.sh -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp

然后:看看Consumer应用是否收到了消息。

总结

建议写一个Kafka的Consumer,然后调用Spark功能,而不是使用Spark的Stream+Kafka的编程方式。 好处是可以使用最新版本的Kafka。

Kafka的包中带有一个Sample代码,可以从中学习一些编写程序的方法。

参照

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

在前面的文章Fayson介绍了一些关于Spark2Streaming的示例如《Spark2Streaming读Kerberos环境的Kafka并写数据到HBas...

33110
来自专栏Hadoop实操

Spark2Streaming读非Kerberos环境的Kafka并写数据到Kudu

在前面的文章Fayson介绍了在Kerberos环境下《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》,本篇文章Fayso...

40010
来自专栏行者悟空

Spark RDD的Action

16560
来自专栏Albert陈凯

3.4 RDD的计算

3.4 RDD的计算 3.4.1 Ta s k简介 原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为...

372100
来自专栏斑斓

大数据 | 理解Spark的核心RDD

与许多专有的大数据处理平台不同,Spark建立在统一抽象的RDD之上,使得它可以以基本一致的方式应对不同的大数据处理场景,包括MapReduce,Streami...

40690
来自专栏芋道源码1024

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业执行

Lite调度作业( LiteJob ),作业被调度后,调用 #execute() 执行作业。

69620
来自专栏Albert陈凯

Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint

Cache 和 Checkpoint 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应...

502120
来自专栏个人分享

Spark作业调度

    Spark在任务提交时,主要存在于Driver和Executor的两个节点.

21010
来自专栏岑玉海

Spark Streaming编程指南

Overview Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 它可以接受来自Kafka, Flume,...

74450
来自专栏行者悟空

Spark DAG调度

16230

扫码关注云+社区

领取腾讯云代金券