以下是一个简单的Kafka Producer代码:
package com.bonc.rdpe.spark.kafka08
import java.io.{BufferedReader, FileReader}
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
/**
  * Author: YangYunhe
  * Description: 
  * Create: 2018/7/24 19:33
  */
object Kafka08Producer {
  /**
    * Reads a text file line by line and publishes every line as the value of a
    * record on the Kafka topic "topic001", pausing one second after each send.
    *
    * The reader and the producer are both closed when the method exits, whether
    * it finishes normally or an exception is thrown.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "jed:9095,jed:9096,jed:9097")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      val br = new BufferedReader(new FileReader("D:\\data\\news_profile_data.txt"))
      try {
        // Do NOT use the Java idiom `while ((line = br.readLine()) != null)` here:
        // in Scala an assignment evaluates to Unit, so that condition is always
        // true and the loop never stops at end of file. Instead, pull lines until
        // readLine() returns null.
        Iterator.continually(br.readLine()).takeWhile(_ != null).foreach { line =>
          val record = new ProducerRecord[String, String]("topic001", line)
          producer.send(record, new Callback {
            // Prints where the record landed when metadata is supplied, and the
            // stack trace when an exception is supplied.
            override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
              if (recordMetadata != null) {
                println(s"topic: ${recordMetadata.topic()}, partition: ${recordMetadata.partition()}, offset: ${recordMetadata.offset()}")
              }
              if (e != null) {
                e.printStackTrace()
              }
            }
          })
          // Throttle: one record per second.
          Thread.sleep(1000)
        }
      } finally {
        // Release the file handle even if sending fails midway.
        br.close()
      }
    } finally {
      // Always close the producer, including when reading or sending throws.
      producer.close()
    }
  }
}
运行后程序报错:
java.net.ConnectException: Connection timed out: no further information
解决办法: 关闭Linux防火墙
CentOS 7
[root@jed bin]# systemctl stop firewalld.service # 关闭防火墙
[root@jed bin]# systemctl disable firewalld.service # 禁止开机启动
CentOS 6
[root@jed bin]# service iptables stop # 临时关闭
[root@jed bin]# chkconfig iptables off # 永久关闭