前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >go使用消息队列优化接口

go使用消息队列优化接口

原创
作者头像
陈杪秋
发布2024-07-22 20:33:31
890
发布2024-07-22 20:33:31

前言

在我们编写后端接口时,通常有些接口对于实时性的要求并不是那么高,但其中有些函数却相当占用接口调用时间,如调用第三方接口、发送短信、发送邮件等等。为了提升用户的体验感、系统的稳定性,此时我们就可以使用消息队列对于接口进行优化,对于实时性要求不高的接口使用消息队列来进行处理,提高api响应速度,优化用户体验。本文将以go语言使用rabbitMQ来演示如何对于一个接口进行优化。

RabbitMQ安装

此处给出docker-compose文件方便各位安装rabbtMQ环境,当然也可以选择自行安装

代码语言:yaml
复制
services:
  rabbitMQ:
    container_name: learn-rabbitMQ
    image: rabbitmq:3.13-management
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    restart: always

定义好docker-compose.yaml文件填入以上代码,然后使用docker-compose命令快速启动 rabbitMQ

代码语言:shell
复制
docker-compose up -d

Go环境安装

使用到的Go依赖包

Gin: 本文将使用Gin来进行api注册官方教程

RabbitMQ: Rabbit官方提供的Go依赖包官方教程

代码语言:go
复制
go get github.com/rabbitmq/amqp091-go
go get -u github.com/gin-gonic/gin

前置代码

为方便演示,我们先使用Gin实现一个速度较慢的接口

代码语言:go
复制
package main

import (
	"fmt"
	"time"

	"github.com/gin-gonic/gin"
)

func SendMessage(c *gin.Context) {
	message := c.Param("message")
    // 假设此为必要操作需要700ms
	time.Sleep(700 * time.Millisecond)

	fmt.Println("hello,", message)
	c.JSON(200, "success")
}

func main() {
	server := gin.Default()

	server.POST("/send/:message", SendMessage)

	server.Run(":8080")
}

通过以上代码我们实现了一个至少耗时700ms,且返回参数固定的一个接口。在实际情况中可能由于某些各种各样的原因导致接口较慢,此处为方便演示直接使用了sleep函数。

让我们使用Postman调用一下接口

耗时700ms+,确实很慢

RabbitMQ函数

接下来我们先来写出使用rabbitMQ进行收发消息的函数

RabbitMQ连接函数

代码语言:go
复制
package main

import (
	"fmt"
	"strings"

	amqp "github.com/rabbitmq/amqp091-go"
)

var RabbitMQ *amqp.Connection

// 根据自己的RabbitMQ环境进行填写
var (
	RabbitMQConnection = "amqp"
	RabbitMQUser       = "admin"
	RabbitMQPassword   = "123456"
	RabbitMQHost       = "127.0.0.1"
	RabbitMQPort       = "5672"
)
// 需要发送的队列名称
var RabbitMQSendMessageQueue = "test-send-message-queue"

func InitRabbitMQ() {
	connString := strings.Join([]string{RabbitMQConnection, "://", RabbitMQUser, ":", RabbitMQPassword, "@", RabbitMQHost, ":", RabbitMQPort, "/"}, "")
	conn, err := amqp.Dial(connString)
	if err != nil {
		panic(fmt.Sprintf("Failed to connect to RabbitMQ: %s", err))
	}
	RabbitMQ = conn
}

RabbitMQ发送消息函数

代码语言:go
复制
func SendMessageToMQ(ctx context.Context, queueName string, body []byte) (err error) {
	ch, err := RabbitMQ.Channel()
	if err != nil {
		return
	}
    // 设置为消息持久化,方便重启时消息不丢失
	q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
	err = ch.PublishWithContext(ctx, "", q.Name, false, false, amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "application/json",
		Body:         body,
	})
	if err != nil {
		return
	}
	return
}

RabbitMQ接收消息函数

代码语言:go
复制
func ConsumerMessage(ctx context.Context, queueName string) (msgs <-chan amqp.Delivery, err error) {
	ch, err := RabbitMQ.Channel()
	if err != nil {
		return nil, err
	}
	q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
	// RabbitMQ负载均衡
	err = ch.Qos(1, 0, false)
	if err != nil {
		return nil, err
	}
	return ch.Consume(q.Name, "", false, false, false, false, nil)
}

具体优化实现

首先我们先将业务逻辑过慢的代码抽离,并通过发送消息队列的方法,发送给指定函数进行具体业务异步处理。

代码语言:go
复制
type SendMessageReq struct {
	Message string `json:"message"`
}

func SendMessage(c *gin.Context) {
	message := c.Param("message")

	err := sendMessageToMQ(message)
	if err != nil {
		c.JSON(200, "error")
		return
	}

	c.JSON(200, "success")
}
// 发送消息给对应的消息队列
func sendMessageToMQ(message string) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()
	sendMessageReq := SendMessageReq{
		Message: message,
	}

	body, err := json.Marshal(sendMessageReq)
	if err != nil {
		return err
	}
	err = SendMessageToMQ(ctx, RabbitMQSendMessageQueue, body)
	if err != nil {
		return err
	}
	return nil
}

定义一个新函数从消息队列中读取消息并进行具体业务处理

代码语言:go
复制
func SendConfirmEmailSync(ctx context.Context) {
	err := RunSendConfirmEmail(ctx)
	if err != nil {
		log.Println(err)
	}
}

func RunSendConfirmEmail(ctx context.Context) error {
    // 获取消息队列,并开始监听消费
	msgs, err := ConsumerMessage(ctx, RabbitMQSendMessageQueue)
	if err != nil {
		return err
	}
	var forever chan struct{}

	go func() {
		for msg := range msgs {
			sendMessageReq := SendMessageReq{}
			err = json.Unmarshal(msg.Body, &sendMessageReq)

            // 假设此为具体业务逻辑
			time.Sleep(700 * time.Millisecond)
			fmt.Println("hello,", sendMessageReq.Message)

			msg.Ack(false)
		}
	}()

	<-forever
	return nil
}

初始化

最后,让我们将具体函数使用Goroutine运行起来

代码语言:go
复制
func loadScript() {
	ctx := context.Background()
	go SendConfirmEmailSync(ctx)
}

func main() {
	InitRabbitMQ()
	loadScript()

	server := gin.Default()

	server.POST("/send/:message", SendMessage)

	server.Run(":8080")
}

调用接口

让我们使用postman再次调用一下我们优化完成的接口

我们可以发现,现在调用接口仅需2ms!!!

我们成功了!!!

结尾&完整代码示例

虽然使用消息队列可以大幅度优化接口响应时间,但是我们还是需要根据具体业务需求、逻辑进行相对应的优化,以免变成了负面优化,写出了屎山代码。

如果各位想尝试一下接口优化,可以试试优化我的邮箱API接口,如果想知道我是如何进行邮件接口优化的,可以来学习或者参与进我的开源项目

愿这篇文章能帮助到你!!!

完整代码

代码语言:go
复制
// main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/gin-gonic/gin"
)

type SendMessageReq struct {
	Message string `json:"message"`
}

func SendMessage(c *gin.Context) {
	message := c.Param("message")

	err := sendMessageToMQ(message)
	if err != nil {
		c.JSON(200, "error")
		return
	}

	c.JSON(200, "success")
}

func sendMessageToMQ(message string) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()
	sendMessageReq := SendMessageReq{
		Message: message,
	}

	body, err := json.Marshal(sendMessageReq)
	if err != nil {
		return err
	}
	err = SendMessageToMQ(ctx, RabbitMQSendMessageQueue, body)
	if err != nil {
		return err
	}
	return nil
}

func SendConfirmEmailSync(ctx context.Context) {
	err := RunSendConfirmEmail(ctx)
	if err != nil {
		log.Println(err)
	}
}

func RunSendConfirmEmail(ctx context.Context) error {
	msgs, err := ConsumerMessage(ctx, RabbitMQSendMessageQueue)
	if err != nil {
		return err
	}
	var forever chan struct{}

	go func() {
		for msg := range msgs {
			sendMessageReq := SendMessageReq{}
			err = json.Unmarshal(msg.Body, &sendMessageReq)

			time.Sleep(700 * time.Millisecond)
			fmt.Println("hello,", sendMessageReq.Message)

			msg.Ack(false)
		}
	}()

	<-forever
	return nil
}

func loadScript() {
	ctx := context.Background()
	go SendConfirmEmailSync(ctx)
}

func main() {
	InitRabbitMQ()
	loadScript()

	server := gin.Default()

	server.POST("/send/:message", SendMessage)

	server.Run(":8080")
}
代码语言:go
复制
// rabbtmq.go
package main

import (
	"context"
	"fmt"
	"strings"

	amqp "github.com/rabbitmq/amqp091-go"
)

var RabbitMQ *amqp.Connection

// 根据自己的RabbitMQ环境进行填写
var (
	RabbitMQConnection = "amqp"
	RabbitMQUser       = "admin"
	RabbitMQPassword   = "123456"
	RabbitMQHost       = "127.0.0.1"
	RabbitMQPort       = "5672"
)

var RabbitMQSendMessageQueue = "test-send-message-queue"

func InitRabbitMQ() {
	connString := strings.Join([]string{RabbitMQConnection, "://", RabbitMQUser, ":", RabbitMQPassword, "@", RabbitMQHost, ":", RabbitMQPort, "/"}, "")
	conn, err := amqp.Dial(connString)
	if err != nil {
		panic(fmt.Sprintf("Failed to connect to RabbitMQ: %s", err))
	}
	RabbitMQ = conn
}

func SendMessageToMQ(ctx context.Context, queueName string, body []byte) (err error) {
	ch, err := RabbitMQ.Channel()
	if err != nil {
		return
	}

	q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
	err = ch.PublishWithContext(ctx, "", q.Name, false, false, amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "application/json",
		Body:         body,
	})
	if err != nil {
		return
	}
	return
}

func ConsumerMessage(ctx context.Context, queueName string) (msgs <-chan amqp.Delivery, err error) {
	ch, err := RabbitMQ.Channel()
	if err != nil {
		return nil, err
	}
	q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
	// RabbitMQ负载均衡
	err = ch.Qos(1, 0, false)
	if err != nil {
		return nil, err
	}
	return ch.Consume(q.Name, "", false, false, false, false, nil)
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • RabbitMQ安装
  • Go环境安装
  • 前置代码
  • RabbitMQ函数
    • RabbitMQ连接函数
      • RabbitMQ发送消息函数
        • RabbitMQ接收消息函数
        • 具体优化实现
          • 初始化
            • 调用接口
            • 结尾&完整代码示例
            • 愿这篇文章能帮助到你!!!
            相关产品与服务
            消息队列
            腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档