Git 项目推荐 | 基于go+protobuff 实现的分布式

eQ

基于go+protobuff实现的多种持久化方案的mq框架

Client For KiteQ

Go:    https://github.com/blackbeans/kiteq-client-go
Java : https://github.com/blackbeans/kiteq-client-java
PHP:   https://github.com/blackbeans/kiteq-client-php
C++: https://github.com/quguangjie/kiteq-client-cpp

简介

* 基于zk维护发送方、订阅方、broker订阅发送关系、支持水平、垂直方面的扩展
* 基于与topic以及第二级messageType订阅消息
* 基于mysql、文件存储方式多重持久层消息存储
* 保证可靠异步投递
* 支持两阶段提交分布式事务
* 自定义group内的Topic级别的流控措施,保护订阅方安全
* kiteserver的流量保护

工程结构

kiteq/
├── README.md
├── conf              配置信息
├── log               log4go的配置
├── build.sh          安装脚本
├── doc               文档
├── handler           KiteQ所需要的处理Handler
├── kiteq.go          KiteQ对外启动入口        
└── server             KiteQ的Server端组装需要的组件
概念:
* Binding:订阅关系,描述订阅某种消息类型的数据结构
* Consumer : 消息的消费方
* Producer : 消息的发送方
* Topic: 消息的主题比如 Trade则为消息主题,一般可以定义为某种业务类型
* MessageType: 第二级别的消息类型,比如Trade下存在支付成功的pay-succ-200的消息类型

架构图

Zookeeper数据结构

    KiteServer : /kiteq/server/${topic}/ip:port
    Producer   : /kiteq/pub/${topic}/${groupId}/ip:port
    Consumer   : /kiteq/sub/${topic}/${groupId}-bind/#$data(bind)
流程:
1. KiteQ启动会将自己可以接受和投递的Topics列表给到zookeeper
2. KiteQ拉取Zookeeper上的Topics下的订阅关系(Bingding:订阅方推送上来的订阅消息信息)。
3. Consumer推送自己需要订阅的Topic+messageType的消息的订阅关系(Binding)到Zookeeper
4. Consumer拉取当前提供推送Topics消息的KiteQ地址列表,并发起TCP长连接
5. Producer推送自己可以发布消息Topics列表到Zookeeper
6. Producer拉取当前提供接受Topics消息的KiteQ地址列表,并发起TCP长连接
订阅方式:
Direct (直接订阅): 明确的Topic+MessageType订阅消息
Regx(正则式订阅):  Topic级别下,对MessageType进行正则匹配方式订阅消息
Fanout(广播式订阅): Topic级别下,订阅所有的MessageType的消息
两阶段提交:
因为引入了异步投递方案,所以在有些场景下需要本地执行某个事务成功的时候,本条消息才可以被订阅方消费。
例如:
    用户购买会员支付成功成功需要修改本地用户账户Mysql的余额、并且告知会员系统为用户的会员期限延长。
    这个时候就会碰到、必须在保证mysql操作成功的情况下,会员系统才可以接收到会员延期的消息。

对于以上的问题,KiteQ的处理和ali的Notify系统一样,
    1. 发送一个UnCommit的消息到KiteQ ,KiteQ 不会对Uncommite的消息做投递操作
    2. KiteQ定期对UnCommit的消息向Producer发送TxAck的询问
    3. 直到Producer明确告诉Commit或者Rollback该消息
    4. Commit会走正常投递流程、Rollback会对当前消息回滚即删除操作。
QuickStart
1.编译:sh build.sh 
2.安装装Zookeeper:省略
3.启动KiteQ:
    命令参数:
        ./kiteq -bind=172.30.3.124:13800 -pport=13801 -db="memory://initcap=10000&maxcap=20000" -topics=trade,feed -zkhost=localhost:2181
        -bind  //绑定本地IP:Port
        -pport //pprof的Http端口
        -db //存储的协议地址  mock:// 启动mock模式 mysql:// mmap:// 
        -topics //本机可以处理的topics列表逗号分隔
        -zkhost //zk的地址
        -logxml=./log.xml //log4go的配置
        -deliveryFirst=false //是否投递优先

    toml配置启动(推荐)
        ./kiteq -clusterName=集群名称 -configPath=配置文件路径
        文件样例见[conf/cluster.toml]
启动客户端:

参考:

github.com/blackbeans/kiteq-client-go

原文发布于微信公众号 - 码云Gitee(mayunOSC)

原文发表时间:2016-06-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏游戏杂谈

express:node throwing error on mongodb

与此类似node throwing error on mongodb,一直报Db.open那里出错,查源代码,发现应该是有err参数传入,因为之前从来没接触过m...

701
来自专栏乐沙弥的世界

快速体验Percona XtraDB Cluster(PXC)

Percona XtraDB Cluster(简称PXC)集群是基于Galera 2.x library,事务型应用下的通用的多主同步复制插件,主要用于解决强一...

1122
来自专栏云计算教程系列

如何在Ubuntu 14.04上的主代理安装程序中安装Puppet 4

来自Puppet Labs的Puppet是一种配置管理工具,可帮助系统管理员自动化服务器基础架构的配置,配置和管理。提前规划并使用Puppet等配置管理工具可以...

1283
来自专栏浪淘沙

Kafka学习笔记

Apache Kafka 是分布式发布-订阅消息系统(消息中间件)。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka ...

2433
来自专栏精讲JAVA

RocketMQ 源码学习 1 : 整体结构

为什么是 RocketMQ,而不是 ActiveMQ/RabbitMQ/Kafka 呢?这不是技术选型,我只是想找一个业界比较好的、开源的 MQ 系统,学习一下...

1463
来自专栏MessageQueue

Pulsar-Producer实现分析

“Pulsar is a distributed pub-sub messaging platform with a very flexible messagi...

1562
来自专栏zhisheng

RocketMQ 安装及快速入门

如果你对 RocketMQ 还没了解,建议先看下上一篇文章:RocketMQ 初探 安装条件 64位操作系统,建议使用 Linux / Unix / Mac; ...

42411
来自专栏码神联盟

ActiveMQ入门篇一入门实例1

上一期,我们讲解了ActiveMQ的原理和概念,以及用它来实现MQ的一些优势,今天我们来写一个入门级的示例。 首先,在写示例之前,我们先了解下ActiveMQ关...

3356

使用CoreOs,Docker和Nirmata来部署微服务风格的应用程序

随着应用程序容器(application container)的技术越来越被大众接受,设计用于运行容器的“最轻量级”操作系统正在变得非常流行。CoreOS便是这...

3967

使用CoreOS,Docker和Nirmata部署微服务风格的应用程序

随着应用容器的运用持续火热,设计用于运行容器的“最小”操作系统也悄然而生。CoreOS就是其中一个操作系统,它被设计用于运行现代原生云应用,并且支持Docker...

49111

扫码关注云+社区

领取腾讯云代金券