前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用云函数实现消息流转

使用云函数实现消息流转

原创
作者头像
haimingli
修改2021-01-04 20:10:24
6680
修改2021-01-04 20:10:24
举报
文章被收录于专栏:腾讯云中间件专家服务

背景

在使用消息队列时,消息流转是常见的需求,比如消息需要从ckafka的实例转储到另一个ckafka实例。消息流转的目的通常是为了能够访问不同网络的消息队列,这是因为云上的消息队列通常只开放内网访问。在很多情况下,用户往往是在云服务器cvm中部署一个中转程序。这种方案存在以下不足:

  • 资源可能过度供给。云服务器只用于消息转储太浪费。
  • 资源可能严重不足。在生产或者消费处于高峰时,云服务器无法自动扩展资源。

从一点出发,云函数可以很好地克服云服务器在消息流传上的不足。

原理

云函数的特点是

> 无服务器云函数可以让用户无需关心服务器的部署运营,只需开发最核心的业务逻辑,即可实现上线运营,具备分布容灾能力,可依据负载自动扩缩容,按照实际调用次数与时长计费

与云服务器相比,云函数的优势在于:

  • 便宜。按量付费,用多少资源交多少钱。
  • 省心。全自动运维,资源自动伸缩。

详细方案

TDMQ为例,要实现消息在不同网络的消息队列中流转,需要在消息队列之间使用云函数作为中间件,如下图所示:

这里的难点在于需要将消息队列的生产者和消费者程序按云函数的模板改写,下面使用golang语言,以生产者为例进行说明。参考了云函数的golang文档

一个hello程序如下所示:

package main import ( "context" "fmt" "github.com/tencentyun/scf-go-lib/cloudfunction" ) type DefineEvent struct { // test event define Key1 string `json:"key1"` Key2 string `json:"key2"` } func hello(ctx context.Context, event DefineEvent) (string, error) { fmt.Println("key1:", event.Key1) fmt.Println("key2:", event.Key2) return fmt.Sprintf("Hello %s!", event.Key1), nil } func main() { // Make the handler available for Remote Procedure Call by Cloud Function cloudfunction.Start(hello) }

这里按照TDMQ golang模板把上面改写为:

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package main import ( "context" "log" "strconv" "github.com/TencentCloud/tdmq-go-client/pulsar" "github.com/tencentyun/scf-go-lib/cloudfunction" ) type DefineEvent struct { // test event define Key1 string `json:"key1"` Key2 string `json:"key2"` } func produce(ctx context.Context, event DefineEvent) { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://ip:6000", ListenerName: "custom:pulsar-*********/vpc-*********/subnet-*********", Authentication: pulsar.NewAuthenticationToken("eyJrZX*********4XRTqs"), }) if err != nil { log.Fatal(err) } defer client.Close() producer, err := client.CreateProducer(pulsar.ProducerOptions{ DisableBatching: true, Topic: "persistent://pulsar-*********/*********/*********", }) if err != nil { log.Fatal(err) } defer producer.Close() //ctx := context.Background() for j := 0; j < 10; j++ { if msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{ Payload: []byte("Hello " + strconv.Itoa(j)), }); err != nil { log.Fatal(err) } else { log.Println("Published message: ", msgID) } } } func main() { cloudfunction.Start(produce) }

接着就可以编译,然后上传验证了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 原理
  • 详细方案
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档