前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang实现的持久化消息队列-OPQ

Golang实现的持久化消息队列-OPQ

原创
作者头像
levinllin
发布2019-02-21 10:02:50
2.3K0
发布2019-02-21 10:02:50
举报

这是个搁置了有段时间了的个人兴趣项目,还有不少完善工作需要做,这里记录下基本思路和实现,欢迎拍砖。

一、OPQ是什么

  • An Open sourced Persistent message Queue
  • 一款开源的持久化消息队列
  • 基于go 1.4.2实现
  • 功能
  1. 消息持久化
  2. 采用推送模式
  3. 易用,无需集成客户端,调用服务API即可
  4. 消息重放
  5. 高性能(目标)
  6. 运维友好——平滑重启/高可用(todo)/可视化控制台(todo)等
  • 性能(机器 - 单台mac pro)
  1. 当消息体大小为2K Bytes时,>20,000Message/Second
  2. 当消息体大小为1K Bytes时,>30,000Message/Second
  3. 当消息体大小为128Bytes时,>60,000Message/Second

二、如何使用

下载源码

代码语言:javascript
复制
go get -u github.com/LevinLin/OPQ

编译安装

代码语言:javascript
复制
cd /path/to/OPQ
go build

运行服务

代码语言:javascript
复制
cd /path/to/OPQ
nohup ./OPQ &>/dev/null &

参数说明:

-debug

当debug=yes时,服务运行在debug模式,主要用于log/output

-port

监听端口,默认8999

-syslog

系统日志,默认为system.log

平滑重启
代码语言:javascript
复制
kill -1 %{PID}

提交消息

url

http://<HOST>[:<PORT>]/opq/push

post fields

url: 目标url topic: topic名称,每条消息必须指定一个topic message: 消息具体内容

headers

头部,如果需要

请求示例(PHP)
代码语言:javascript
复制
<?php
  $url = "http://localhost:8999/opq/push";
  
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, $url);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
  curl_setopt($ch, CURLOPT_POST, 1);

  $data = array(
      'url' => 'http://127.0.0.1/Comment/addComment?comment=nny&user=q18',
      'topic'=> 'comment',
      'message'=> 'this is message body',
  );
  curl_setopt($ch, CURLOPT_POSTFIELDS, $data);

  $response = curl_exec($ch);
  var_dump($response);
  curl_close($ch);

回放消息

url

http://<HOST>[:<PORT>]/opq/replay

post fields

topic: 指定需要回放消息所属topic cmd: 消息序号

headers

头部,如果需要

请求示例(PHP)
代码语言:javascript
复制
<?php
  $url = "http://localhost:8999/opq/replay";
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, $url);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); 
  curl_setopt($ch, CURLOPT_POST, 1); 
  
  $data = array(
      'topic'=> 'comment',
      'cmd'=> '30',
  );  
  curl_setopt($ch, CURLOPT_POSTFIELDS, $data);

  $response = curl_exec($ch);
  var_dump($response);
  curl_close($ch);

三、设计思路

总体上借鉴了kafka的设计(topic/消息定位等),但是没有照搬具体实现,同时舍掉了客户端代码的需求。

总体架构图:

listener

或者称为dispatcher,负责监听对消息队列的请求,将请求处理任务加到任务队列(task queue)里

recorder(s)

多个recoder,并发获取任务队列里的任务进行处理(主要是数据序列化),然后通知record service进行持久化操作

record service

record service负责数据在持久化过程中的串行写入,根据消息所属topic,分别更新对应路径下的文件:索引(<N>.idx,文件按固定数目进行切分,N为切分区间最小的消息序号),消息(<N>.msg),总数(cmd)

deliverer(s)

从dlv文件获取需要发送的消息起始序号M,根据M从索引文件查找比该消息更早的最近一条消息的索引信息S,根据S从消息文件查找到序号M的消息内容,依次顺序发送后面的消息到对应目标地址,同时更新已发送序号到dlv文件

代码结构:

四、性能方面的一些考虑

  • 带缓冲的写入,即累计一定数目的消息或达到一定数据量才写入文件,减少磁盘IO
  • 采用稀疏索引,保证索引文件足够小
  • 采用内存映射,将用到的索引文件放入内存,减少磁盘IO
  • 采用二分查找+顺序查找的方式定位索引
  • 采用FlatBuffers进行数据的序列化/反序列化
  • 基于Goroutine的dispatcher/workers模型,worker数目可调

五、待完善的事项

  • 解决单点问题,实现HA
  • 友好的管理平台Admin Portal
  • 性能优化
  • 全面的测试,目前未经过生产环境验证,请慎用( ̄▽ ̄)"

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、OPQ是什么
  • 二、如何使用
    • 下载源码
      • 编译安装
        • 运行服务
          • -debug
          • -port
          • -syslog
          • 平滑重启
        • 提交消息
          • url
          • post fields
          • headers
          • 请求示例(PHP)
        • 回放消息
          • url
          • post fields
          • headers
          • 请求示例(PHP)
          • listener
          • recorder(s)
          • record service
          • deliverer(s)
      • 三、设计思路
      • 四、性能方面的一些考虑
      • 五、待完善的事项
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档