前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka相关操作

kafka相关操作

作者头像
超级大猪
发布2019-11-22 00:12:09
8670
发布2019-11-22 00:12:09
举报
文章被收录于专栏:大猪的笔记

调试工具操作

代码语言:javascript
复制
- 启动
bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

- 列出topicc
./kafka-topics.sh  --zookeeper 9.43.186.132:2181,9.43.186.152:2181,9.43.186.176:2181 --list 

- 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1

- 查看topic的状态
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

- 消费者 读数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group superpig

- 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

- 删除topic
./kafka-topics --delete --zookeeper 10.0.8.23:2181 --topic PedesJobResult

go客户端,读消息

代码语言:javascript
复制
/*
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
*/
ctx, cancel = context.WithCancel(context.Background())

config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Version = sarama.V2_0_0_0

// 这里很有迷惑性,实际上,这个选项只有第一次new consumer的时候才会有效,当partion已经存在offset,这是没用的
// 如果想每次重启,都忽略中间产生的消息,必须更换group_ip
config.Consumer.Offsets.Initial = sarama.OffsetNewest

var topicArr = []string{topic}
// KafkaAddresses=["kafka.service.consul:9092"]
consumer, err := cluster.NewConsumer(kafkaAddress,
    kafkaGroupID, topicArr, config)

if err != nil {
    logging.Errorf("cluster.NewConsumer  err:%s", err)
    return nil
}

go func() {
    for err := range consumer.Errors() {
        logging.Errorf("consumer.Error: groupId:%s:Error: %s\n", kafkaGroupID, err.Error())
    }
}()
go func() {
    for ntf := range consumer.Notifications() {
        logging.Infof("consumer.Notification: groupId:%s Rebalanced: %+v \n", kafkaGroupID, ntf)
    }
}()

logging.Infof("NewKafka loop before")
Loop:
    for {
        select {
        case msgc, ok := <-consumer.Messages():
            if ok {
                //logging.Debugf("read msg %v", msgc)
                // do sth
                // 如果sarama.OffsetNewest ,commit意义不大
                consumer.MarkOffset(msg, "")

            } else {
                logging.Errorf("read msg not ok %v", topic)
            }
        case <-ctx.Done():
            logging.Infof("kafka done %v", topic)
            break Loop
        case <-time.After(time.Second * 3):
            //logging.Debugf("NewKafka %v timeout", topic)
        }
    }
    logging.Infof("NewKafka kafka exit %v", topic)
    consumer.Close()

go客户端,写消息

代码语言:javascript
复制
// producer config
config := sarama.NewConfig()
config.Producer.Retry.Max = 5
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true

// sync producer
producer, err := sarama.NewSyncProducer(addr, config)

i := 0
for {
    i++
    msg := &sarama.ProducerMessage{
        Topic: topics[0],
        Value: sarama.StringEncoder(strconv.Itoa(i)),
    }
    _, _, err = producer.SendMessage(msg)
    checkerr.CheckError(err)
    time.Sleep(time.Millisecond * 500)
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-04-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 调试工具操作
  • go客户端,读消息
  • go客户端,写消息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档