The following is a simple Kafka producer:
package com.bonc.rdpe.spark.kafka08

import java.io.{BufferedReader, FileReader}
import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

/**
 * Author: YangYunhe
 * Description:
 * Create: 2018/7/24 19:33
 */
object Kafka08Producer {

  def main(args: Array[String]): Unit = {

    val props = new Properties()
    props.put("bootstrap.servers", "jed:9095,jed:9096,jed:9097")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val br = new BufferedReader(new FileReader("D:\\data\\news_profile_data.txt"))

    // Read the file line by line. In Scala an assignment evaluates to Unit,
    // so the read cannot be embedded in the loop condition as in Java.
    var line = br.readLine()
    while (line != null) {
      val record = new ProducerRecord[String, String]("topic001", line)
      producer.send(record, new Callback {
        // Called once the broker acknowledges the record or the send fails
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (recordMetadata != null) {
            println(s"topic: ${recordMetadata.topic()}, partition: ${recordMetadata.partition()}, offset: ${recordMetadata.offset()}")
          }
          if (e != null) {
            e.printStackTrace()
          }
        }
      })
      Thread.sleep(1000)
      line = br.readLine()
    }

    br.close()
    producer.close()
  }
}
Running the program fails with the following error:
java.net.ConnectException: Connection timed out: no further information
Solution: stop the Linux firewall on the broker host.
CentOS 7
[root@jed bin]# systemctl stop firewalld.service      # stop the firewall
[root@jed bin]# systemctl disable firewalld.service   # do not start it at boot

CentOS 6
[root@jed bin]# service iptables stop     # stop temporarily (until the next reboot)
[root@jed bin]# chkconfig iptables off    # disable permanently (across reboots)
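Before rerunning the producer, it can help to confirm that the broker ports are actually reachable from the client machine. The following is a minimal sketch, not part of the original program: the object name `BrokerPortCheck` and the helper `isReachable` are hypothetical, and the host and ports are simply copied from `bootstrap.servers` in the producer above. It only tests whether a plain TCP connection succeeds, which is what the firewall was blocking; it does not verify anything about Kafka itself.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of the original program.
object BrokerPortCheck {

  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def isReachable(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers timeouts, refused connections, and unresolvable host names
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: same brokers as in bootstrap.servers of the producer
    val brokers = Seq("jed" -> 9095, "jed" -> 9096, "jed" -> 9097)
    brokers.foreach { case (host, port) =>
      val status = if (isReachable(host, port)) "reachable" else "NOT reachable"
      println(s"$host:$port is $status")
    }
  }
}
```

If a port is still reported as not reachable after the firewall has been stopped, the cause probably lies elsewhere, for example the broker not running or not listening on that port.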