前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang中使用Kafka实现消息队列发布订阅

Golang中使用Kafka实现消息队列发布订阅

原创
作者头像
Petrochor
修改2022-08-09 09:15:05
1.3K1
修改2022-08-09 09:15:05
举报
文章被收录于专栏:StephenStephen

安装JDK1.8

1、搜索jdk安装包

代码语言:shell
复制
yum search java|grep jdk

2、下载jdk1.8,下载之后默认的目录为: /usr/lib/jvm/

代码语言:shell
复制
yum install java-1.8.0-openjdk

安装zookeeper

安装zookeeper

kafka依赖zookeeper,所以需要下载安装zookeeper

代码语言:shell
复制
# 下载压缩包
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
# 解压
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz

修改配置文件

代码语言:shell
复制
cd apache-zookeeper-3.7.0-bin/conf/
mv zoo_sample.cfg zoo.cfg

启动zookeeper

代码语言:shell
复制
cd ../bin/
./zkServer.sh start

出现以下信息表示启动成功

代码语言:shell
复制
[root@localhost apache-zookeeper-3.7.0-bin]# bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /root/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

启动异常

如果出现 already running as process 错误,这个一般是因为机器异常关闭缓存目录中残留PID文件导致的(为关闭进程强行关机等导致的)

解决方案:到配置文件 conf/zoo.cfg 查找 dataDir 配置的目录

代码语言:shell
复制
dataDir=/tmp/zookeeper

dataDir 目录下,清理缓存文件

代码语言:shell
复制
cd /tmp/zookeeper
rm -rf zookeeper_server.pid

安装kafka

下载并解压

代码语言:shell
复制
wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -zxvf kafka_2.13-3.2.1.tgz
cd kafka_2.13-3.2.1

修改配置 config/server.propertieslisteners 配置项

代码语言:shell
复制
# 默认为:
#listeners=PLAINTEXT://:9092
# 修改为:
listeners=PLAINTEXT://192.168.10.232:9092

这里需要修改监听地址,否则无法在另外的主机中连接kafka 修改后,监听地址需改为:IP地址:端口 ,否则会出现如下错误:

代码语言:shell
复制
[2022-08-05 10:40:56,361] WARN [Consumer clientId=console-consumer, groupId=console-consumer-65957] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-08-05 10:40:56,362] WARN [Consumer clientId=console-consumer, groupId=console-consumer-65957] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

启动kafka

代码语言:shell
复制
bin/kafka-server-start.sh config/server.properties

创建主题

代码语言:shell
复制
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic topic1 --bootstrap-server 192.168.10.232:9092

生产者发送消息

代码语言:shell
复制
bin/kafka-console-producer.sh --topic topic1 --bootstrap-server 192.168.10.232:9092

消费者接收消息

代码语言:shell
复制
bin/kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server 192.168.10.232:9092

golang中使用kafka

安装golang客户端

代码语言:shell
复制
go get github.com/Shopify/sarama
go get github.com/bsm/sarama-cluster

使用golang创建同步消息生产者

代码语言:go
复制
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"time"
)

var address = []string{"192.168.10.232:9092"}

func main() {
	// 配置
	config := sarama.NewConfig()
	// 设置属性
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	producer, err := sarama.NewSyncProducer(address, config)
	if err != nil {
		log.Printf("new sync producer error: %s \n", err.Error())
		return
	}
	// 关闭生产者
	defer producer.Close()
	// 循环发送消息
	for i := 0; i < 10; i++ {
		// 创建消息
		value := fmt.Sprintf("sync message, index = %d", i)
		msg := &sarama.ProducerMessage{
			Topic: "topic1",                  // 主题名称
			Value: sarama.ByteEncoder(value), // 消息内容
		}
		// 发送消息
		part, offset, err := producer.SendMessage(msg)
		if err != nil {
			log.Printf("send message error: %s \n", err.Error())
		} else {
			fmt.Printf("SUCCESS: value=%s, partition=%d, offset=%d \n", value, part, offset)
		}
		// 每隔两秒发送一条消息
		time.Sleep(2 * time.Second)
	}
}

使用golang创建异步消息生产者

代码语言:go
复制
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"time"
)

var address = []string{"192.168.10.232:9092"}

func main() {
	// 配置
	config := sarama.NewConfig()
	// 等待服务器所有副本都保存成功后的响应
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 随机向partition发送消息
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	// 设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用,需要消费和生产同时配置
	// 注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
	config.Version = sarama.V0_10_0_1

	fmt.Println("start make producer")
	//使用配置,新建一个异步生产者
	producer, err := sarama.NewAsyncProducer(address, config)
	if err != nil {
		log.Printf("new async producer error: %s \n", err.Error())
		return
	}
	defer producer.AsyncClose()

	// 循环判断哪个通道发送过来数据
	fmt.Println("start goroutine")
	go func(p sarama.AsyncProducer) {
		for {
			select {
			case suc := <-p.Successes():
				fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
			case fail := <-p.Errors():
				fmt.Println("error: ", fail.Error())
			}
		}
	}(producer)

	var value string
	for i := 0; ; i++ {
		// 每隔两秒发送一条消息
		time.Sleep(2 * time.Second)

		// 创建消息
		value = fmt.Sprintf("async message, index = %d", i)
		// 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系
		msg := &sarama.ProducerMessage{
			Topic: "topic1",
			Value: sarama.ByteEncoder(value),
		}

		// 使用通道发送
		producer.Input() <- msg
	}
}

使用golang创建消息消费者

代码语言:go
复制
package main

import (
	"fmt"
	"os"
	"os/signal"
	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	// 配置
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = -2
	config.Consumer.Offsets.CommitInterval = 1 * time.Second
	config.Group.Return.Notifications = true

	// 创建消费者
	brokers := []string{"192.168.10.232:9092"}
	topics := []string{"topic1"}
	consumer, err := cluster.NewConsumer(brokers, "consumer-group", topics, config)
	if err != nil {
		fmt.Printf("new consumer error: %s\n", err.Error())
		return
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	go func() {
		for err := range consumer.Errors() {
			fmt.Printf("consumer error: %s", err.Error())
		}
	}()

	go func() {
		for ntf := range consumer.Notifications() {
			fmt.Printf("consumer notification error: %v \n", ntf)
		}
	}()

	// 循环从通道中获取消息
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Printf("%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "") // 上报offset
			} else {
				fmt.Println("监听服务失败")
			}
		case <-signals:
			return
		}
	}
}

链接

DEMO:https://github.com/cqcqs/go-kafka-demo

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装JDK1.8
  • 安装zookeeper
  • 安装kafka
  • golang中使用kafka
  • 链接
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档