前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >启动kafka服务并用golang发送和接受消息

启动kafka服务并用golang发送和接受消息

作者头像
用户7962184
发布2020-11-20 14:21:54
2.7K0
发布2020-11-20 14:21:54
举报
文章被收录于专栏:没事多喝水没事多喝水

kafka系列分为两个篇幅,分别是实用篇,讲使用命令和一些使用中会遇到的概念名词,理论篇,讲kafka为了实现高可用和高性能做了哪些努力。这篇我们从搭建开始,然后用kafka脚本去发送和接受信息,最后用go语言展示在代码之中怎么使用。

大家可以在kafka官网上面下载最新包。要是嫌弃网速太慢的话可以用一下我这个包,我下载了传到了百度云,提取码是:klei。

使用的系统是linux,要是没有服务器,我特别推荐windows10的linux子系统,在上面也可以运行,好用又舒服,这里我用了一个测试虚拟机。

启动kafka服务

下载好了压缩包之后,从本地scp到服务器上

代码语言:javascript
复制
root@DESKTOP-888:/mnt/e/BaiduNetdiskDownload#
scp -C -i /root/curt/id_rsa_zelin.huang -P 33335 kafka_2.11-1.0.0.tgz zelin.huang@dhzltest01.***:/tmp

然后登陆到服务器上,把/tmp/kafka_2.11-1.0.0.tgz 弄到你想存放的目录上,然后

代码语言:javascript
复制
[zelin.huang@dhzltest01.*** app]$ ls
cron   kafka_2.11-1.0.0.tgz  
[zelin.huang@dhzltest01.*** app]$ sudo tar -xzf kafka_2.11-1.0.0.tgz
[zelin.huang@dhzltest01.*** app]$ ls
cron   kafka_2.11-1.0.0  kafka_2.11-1.0.0.tgz  
[zelin.huang@dhzltest01.*** app]$ ls kafka_2.11-1.0.0
bin  config  libs  LICENSE  NOTICE  site-docs

首先kafka的启动是需要ZooKeeper来托管的,至于为什么需要,理论篇我们再提一下,现在要是自己有机子起了ZooKeeper服务的话,可以跳过下面这一步。

代码语言:javascript
复制
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/zookeeper-server-start.sh config/zookeeper.properties
/home/app/kafka_2.11-1.0.0/bin/kafka-run-class.sh: line 270: exec: java: not found

但是我在运行的时候发现这台虚拟机竟然没有java环境(公司后台语言是golang+php),没办法,只能装java。

代码语言:javascript
复制
[zelin.huang@dhzltest01.*** ~]$ sudo yum install java-1.8.0-openjdk
.......(安装输出)
[zelin.huang@dhzltest01.*** ~]$ java -version                      
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-b07)
OpenJDK 64-Bit Server VM (build 25.242-b07, mixed mode)

安装好了再来,我们可以在linux中挂载运行zookeeper,这样当我们只是暂时练下手而不是真正使用还是很好的,这样我们退出shell之后,我们启动的服务也会关闭,不会占用到系统资源(要是后台运行想关了,请用ps+kill)。如果真的是想用在生产或者测试环境,而不是顺便玩玩的话,虚拟机可以托管在 supervisor或者是以nohub模式运行。

这里我们起多个终端,可以更好地看到各个工具的输出。

代码语言:javascript
复制
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/zookeeper-server-start.sh config/zookeeper.properties 
[2020-02-29 11:57:10,691] INFO Reading configuration from: config/zookeeper.properties 
。。。
[2020-02-29 11:57:10,739] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
......

需要注意的是zookeeper占用的端口号是2181 然后起另一个终端

代码语言:javascript
复制
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-server-start.sh config/server.properties
[2020-02-29 11:59:36,346] INFO KafkaConfig values: 
。。。
        port = 9092
。。。
[2020-02-29 11:59:36,451] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
。。。

其中可以看到kafka连接的zookeeper是上面所启动的2181端口号,所以kafka是依赖zookeeper启动的,如果我们要启动多个kafka形成一个集群,那么我们设定的连接zookeeper的服务是同一个。 kafka占用的端口号是,9092。 好,执行到这一步,我们的kafka是启动起来了。 接下来,我们使用kafka来实现一个消息队列的功能。 首先该创建一个topic,topic相当于kafka的一个消息类型,通过选择不同的topic发送,或者是监听某个topic,就可以实现消息队列。发消息的时候是需要指定topic的。 或者,您也可将topic配置为:发消息指定的topic不存在时,自动创建topic,而不是手动创建。(ps:我们公司的测试环境是不需要创建topic的,但是正式环境需要,所以曾经导致测试环境跑得好好的代码,到了正式环境就不行了)

代码语言:javascript
复制
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo  bin/kafka-topics.sh --list --zookeeper localhost:2181 
test

创建消费者和生产者

这里创建了一个topic和查看所有的topic。 然后我们创建生产者和消费者,尝试发送一些消息。 一个终端做消费者,一个终端做生产者

代码语言:javascript
复制
#生产者终端
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>huangzelin
>huangzliyong
>
# 消费者终端
[zelin.huang@dhzltest01.*** kafka_2.11-1.0.0]$ sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
huangzelin
huangzliyong

这里我们可以看到可以正常的通讯信息。

在go语言中使用

go创建生产者

代码语言:javascript
复制
package easy_kafka

import (
    "fmt"
    "github.com/pkg/errors"
    "gopkg.in/Shopify/sarama.v1"
)

type KafkaProducer struct {
    producer sarama.SyncProducer
}

func (self *KafkaProducer) Init(ip string,port int) error {
    //这里可以初始化多个kafka的,因为是集群,最好多传几个,但是只传一个也可以使用
    servers := []string{fmt.Sprintf("%s:%d", ip, port)}
    p, err := sarama.NewSyncProducer(servers, sarama.NewConfig())
    if err != nil {
        return err
    }
    self.producer = p
    return nil
}

func (self *KafkaProducer) SendMessage(topic string, data []byte) error {
    if self.producer == nil {
        return errors.New("no producer while send message")
    }
    kafkaMsg := &sarama.ProducerMessage{
        Topic: topic,
        Key:   nil,
        Value: sarama.ByteEncoder(data),
    }
    _, _, err := self.producer.SendMessage(kafkaMsg)
    return err
}

func (self *KafkaProducer) Close() error {
    if self.producer != nil {
        return self.producer.Close()
    }
    return nil
}

go启动一个监听卡夫卡对象

代码语言:javascript
复制
package easy_kafka

import (
    "fmt"
    "github.com/Shopify/sarama"
    "github.com/bsm/sarama-cluster"
    "github.com/golang/glog"
    "github.com/pkg/errors"
)

type KafkaConsumer struct {
    consumer *cluster.Consumer
}

func (self *KafkaConsumer) Init(brokersIp string, brokersPort int, topic []string, group string) error {
    brokersServers := []string{fmt.Sprintf("%s:%d", brokersIp, brokersPort)}
    config := cluster.NewConfig()
    //配置是否接受错误信息
    config.Consumer.Return.Errors = true
    //配置是否接受注意消息
    config.Group.Return.Notifications = true
    //配置是否接受最新消息
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    //这个消费者是谁,同一个消费者如果对一条信息确认了,则不会重复发送
    config.ClientID = group
    //topic是指要收到的消息对象
    cg, err := cluster.NewConsumer(brokersServers, group, topic, config)
    if err != nil {
        return err
    }
    self.consumer = cg
    return nil
}


//注意该方法是非阻塞的,如果调用了该方法,并且没有其他的阻塞方法,记得手动阻塞他
func (self *KafkaConsumer) StartKafkaListen(listenMsg func(*sarama.ConsumerMessage)) error {
    if self.consumer == nil {
        return errors.New("还没初始化消费者对象")
    }
    go func(cg *cluster.Consumer) {
        for message := range cg.Messages() {
            go listenMsg(message)
            //确认这条消息收到
            cg.MarkOffset(message, "")
        }
    }(self.consumer)
    go func(cg *cluster.Consumer) {
        for ntf := range cg.Notifications() {
            glog.Infof("%+v", *ntf)
        }
    }(self.consumer)
    go func(cg *cluster.Consumer) {
        for err := range cg.Errors() {
            glog.Errorf("%+v", err)
        }
    }(self.consumer)
    return nil
}

小结

kafka的安装包十分友好,启动服务过程相当简单,但是可配置内容还是很多的,不过简单使用直接默认的配置文件去启动过程就可以啦。

利用

代码语言:javascript
复制
启动服务
sudo bin/zookeeper-server-start.sh config/zookeeper.properties
sudo bin/kafka-server-start.sh config/server.properties

创建topic
sudo bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

启动消费者脚本和消费者脚本
sudo bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

更多操作命令可以去(kafka中文文档官网)查看 还有用go语言展示了在写代码的时候怎么使用kafka,可以直接拿去用的没问题。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 启动kafka服务
  • 创建消费者和生产者
  • 在go语言中使用
  • 小结
相关产品与服务
消息队列 CMQ
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档