的连接参数,如集群地址,主题,消费者组名称,是否自动提交,offset重置位置,kv序列化
val kafkaParams = Map[String, Object](
"bootstrap.servers...的连接参数,如集群地址,主题,消费者组名称,是否自动提交,offset重置位置,kv序列化
val kafkaParams = Map[String, Object](
"bootstrap.servers...中消费到的value
//手动提交偏移量的时机:
//1.每隔一段时间提交一次:可以,但是和自动提交一样了,那还不如直接自动提交!
...的连接参数,如集群地址,主题,消费者组名称,是否自动提交,offset重置位置,kv序列化
val kafkaParams = Map[String, Object](
"bootstrap.servers...0-10中的Direct模式连接Kafka
//连接kafka之前,要先去MySQL看下有没有该消费者组的offset记录,如果有从记录的位置开始消费,如果没有从"auto.offset.reset