我正在尝试计算文本中的单词数,并将结果保存到Cassandra数据库中。Producer从文件中读取数据并将其发送给kafka。消费者使用spark streaming读取和处理数据,然后将计算结果发送到表。
我的producer看起来像这样:
object ProducerPlayground extends App {
val topicName = "test"
/**
 * Builds the configuration used to construct the Kafka producer.
 *
 * Sets the bootstrap server to localhost:9092 and configures Integer keys
 * and String values via the corresponding serializer class names.
 *
 * @return a fresh `Properties` instance holding the three producer settings
 */
private def createProducer: Properties = {
  // Setting name -> value, applied in the order listed.
  val settings = Seq(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[IntegerSerializer].getName,
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
  )
  val props = new Properties()
  settings.foreach { case (name, value) => props.setProperty(name, value) }
  props
}
// Send every line of the input file to Kafka, keyed by its zero-based line
// number. Both the file handle and the producer are released even when
// reading or sending fails.
val producer = new KafkaProducer[Int, String](createProducer)
try {
  val source = Source.fromFile("G:\\text.txt", "UTF-8")
  try {
    source.getLines().zipWithIndex.foreach { case (line, key) =>
      producer.send(new ProducerRecord[Int, String](topicName, key, line))
    }
  } finally {
    source.close()
  }
  // send() is asynchronous; push out everything still buffered.
  producer.flush()
} finally {
  // Releases the producer's network and buffer resources.
  producer.close()
}
}
消费者看起来像这样:
object BatchLayer {
/**
 * Entry point: consumes text lines from the Kafka topic "test" in 3-second
 * micro-batches, counts the words of each batch and saves the counts to the
 * Cassandra table batch_layer.test.
 *
 * The keyspace and table are created when missing, and the table is
 * truncated on every start.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val brokers = "localhost:9092"
  val topics = "test"
  val groupId = "groupId-1"

  val sparkConf = new SparkConf()
    .setAppName("BatchLayer")
    .setMaster("local[*]")
  val ssc = new StreamingContext(sparkConf, Seconds(3))
  val sc = ssc.sparkContext
  sc.setLogLevel("OFF")

  val topicsSet = topics.split(",").toSet
  // NOTE(review): the producer in this file writes keys with IntegerSerializer.
  // The key is never read below, so decoding it as a String is harmless, but
  // the two sides should be aligned if the key is ever used.
  val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
    ConsumerConfig.GROUP_ID_CONFIG -> groupId,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
  )

  val stream =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
    )

  // Prepare the target schema and start from an empty table.
  val cass = CassandraConnector(sparkConf)
  cass.withSessionDo { session =>
    session.execute(
      "CREATE KEYSPACE IF NOT EXISTS batch_layer WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"
    )
    session.execute("CREATE TABLE IF NOT EXISTS batch_layer.test (key VARCHAR PRIMARY KEY, value INT)")
    session.execute("TRUNCATE batch_layer.test")
  }

  // NOTE(review): reduceByKey aggregates within a single micro-batch, so each
  // batch's count replaces the stored row for that word rather than adding to
  // it. Confirm whether running totals are expected.
  stream
    .map(record => record.value())
    // Split on any run of whitespace so no token carries a tab or newline.
    .flatMap(line => line.split("\\s+"))
    // Blank lines and leading whitespace yield empty tokens; an empty string
    // is not a valid Cassandra primary key, so drop them before saving.
    .filter(word => word.nonEmpty)
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .saveToCassandra("batch_layer", "test", SomeColumns("key", "value"))

  ssc.start()
  ssc.awaitTermination()
}
}
启动producer后,程序会因此错误而停止工作。我做错了什么?
发布于 2021-05-09 16:42:37
在2021年使用旧版的Spark Streaming(DStream)API几乎没有意义——用起来非常麻烦,而且还需要自己跟踪Kafka的偏移量等。最好改用Structured Streaming——它会通过检查点跟踪偏移量,你还可以使用高级的Dataset API,等等。
在您的示例中,代码可能如下所示(未经测试,但改编自一个可运行的示例):
// Read the records of Kafka topic "test" as a streaming DataFrame.
val streamingInputDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()

// Decode each message as text, split it into words and count per word.
val wordsCountsDF = streamingInputDF.selectExpr("CAST(value AS STRING) as value")
  // Split on runs of NON-word characters (\W+). The backslash is escaped
  // twice: once for the Scala literal and once more because Spark SQL
  // unescapes string literals before the regex is compiled.
  .selectExpr("split(value, '\\\\W+', -1) as words")
  .selectExpr("explode(words) as word")
  .filter("word != ''")
  .groupBy($"word")
  .count()
  .select($"word", $"count")

// create table ...

// Write the updated counts to Cassandra; offsets are tracked through the
// checkpoint location. The checkpoint path and table name are placeholders.
val query = wordsCountsDF.writeStream
  .outputMode(OutputMode.Update)
  .format("org.apache.spark.sql.cassandra")
  .option("checkpointLocation", "path_to_checkpoint")
  .option("keyspace", "test")
  .option("table", "<table_name>")
  .start()
query.awaitTermination()
附注:在你的例子中,最可能的错误是你试图直接在DStream上使用.saveToCassandra——它不是这样工作的。
https://stackoverflow.com/questions/67451827
复制相似问题