前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MQ Kafka

MQ Kafka

原创
作者头像
vanguard
修改2021-01-04 10:57:35
1.4K0
修改2021-01-04 10:57:35
举报
文章被收录于专栏:vanguard

Message Queue/消息队列/分布式消息中间件,

异步通信/解耦/冗余/扩展/过载保护/可恢复性/顺序保证/缓冲/数据流处理

Options: Kafka,ActiveMQ,RabbitMQ, WebSphere MQ*(IBM),RocketMQ(阿里系) ...

Protocal 概念和细节比较多,自动略过文字直奔脚本,或者概念参考-> https://www.cnblogs.com/sea520/p/11125174.html

AMQP/Advanced Message Queuing Protocol/先进//高级消息队列协议,应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。可靠、通用

MQTT/Message Queuing Telemetry Transport/消息队列遥测传输是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统

STOMP/Streaming Text Orientated Message Protocol/流文本定向消息协议,为MOM/Message Oriented Middleware/面向消息的中间件设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。 命令模式(非topic\queue模式)

XMPP/Extensible Messaging and Presence Protocol/可扩展消息处理现场协议,基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大

redis、kafka、zeroMq等根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现MQ功能

Details

Broker/消息服务器/server,提供消息核心服务;

Producer/消息生产者/producer,业务的发起方产生消息 -> broker;

Consumer/消息消费者,业务的处理方负责从broker获取消息并进行业务逻辑处理;

Topic/主题,发布订阅模式下消息汇集地,不同生产者向其发送消息,由MQ服务器分发到不同订阅者,实现消息广播/broadcast;

Queue/队列,PTP Point To Point/点对点模式下特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收;

Message/消息体,根据不同通信协议定义的固定格式进行编码的数据包封装业务数据;

Kafka -> http://kafka.apache.org/quickstart

快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化;

高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;

高堆积:支持topic下消费者较长时间离线,消息堆积量大;

完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeper自动实现复杂均衡;

支持Hadoop数据并行加载:对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,是个可行的解决方案。

代码语言:shell
复制
# 0. JVM prepared
# 1. download
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -xzf kafka_2.12-2.4.0.tgz && cd kafka_2.12-2.4.0
# 2. start server
bin/zookeeper-server-start.sh config/zookeeper.properties
# INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
bin/kafka-server-start.sh config/server.properties
# INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
# 3. create topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 4. send a message
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# send xxx
# 5. start a consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# echo xxx
# 6. Setting up a multi-broker cluster
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dirs=/tmp/kafka-logs-1
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dirs=/tmp/kafka-logs-2
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test

#########################################################################################
# server started # ps -ef | grep kafka
# bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper_log.file 2>&1 &
# bin/kafka-server-start.sh config/server.properties > kafka_log.file 2>&1 &
# bin/kafka-topics.sh --create --bootstrap-server 10.170.15.54:9092 --replication-factor 1 --partitions 1 --topic hello
# bin/kafka-topics.sh --list --bootstrap-server 10.170.15.54:9092

# library installed
# pip install kafka
# pip install kafka-python

from kafka import KafkaProducer
from kafka import KafkaConsumer

KAFAKA_TOPIC = "hello"
KAFAKA_SERVERS = ['10.170.15.54:9092']
 
def kfk_producer_send():
    producer = KafkaProducer(bootstrap_servers=KAFAKA_SERVERS)
    producer.send(KAFAKA_TOPIC, bytes("学而不思则罔",encoding="utf-8"))
    producer.send(KAFAKA_TOPIC, bytes("思而不学则殆",encoding="utf-8"))
    print("XXX SENT TWO MESSAGE XXX")
    producer.flush()
    producer.close()
    print("PRODUCER END")

def kfk_consumer_receive():
    consumer = KafkaConsumer(bootstrap_servers=KAFAKA_SERVERS)
    consumer.subscribe(KAFAKA_TOPIC)
    for msg in consumer: print(msg.value) 
    consumer.close()
    print("CONSUMER END")

if __name__ == "__main__":
    kfk_producer_send()
    kfk_consumer_receive()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档