以下是一个简单的Kafka Producer代码:
package com.bonc.rdpe.spark.kafka08
import java.io.{BufferedReader, FileReader}
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
/**
* Author: YangYunhe
* Description:
* Create: 2018/7/24 19:33
*/
object Kafka08Producer {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "jed:9095,jed:9096,jed:9097")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val br = new BufferedReader(new FileReader("D:\\data\\news_profile_data.txt"))
var line = ""
while((line = br.readLine()) != null) {
val record = new ProducerRecord[String, String]("topic001", line)
producer.send(record, new Callback {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if(recordMetadata != null) {
println(s"topic: ${recordMetadata.topic()}, partition: ${recordMetadata.partition()}, offset: ${recordMetadata.offset()}")
}
if(e != null) {
e.printStackTrace()
}
}
})
Thread.sleep(1000)
}
producer.close()
}
}
运行后程序报错:
java.net.ConnectException: Connection timed out: no further information
解决办法: 关闭Linux防火墙
CentOS 7
[root@jed bin]# systemctl stop firewalld.service # 关闭防火墙
[root@jed bin]# systemctl disable firewalld.service # 禁止开机启动
CentOS 6
[root@jed bin]# servcie iptables stop # 临时关闭
[root@jed bin]# chkconfig iptables off # 永久关闭