前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka 入门

kafka 入门

作者头像
句小芒
发布2022-12-29 18:07:55
4110
发布2022-12-29 18:07:55
举报
简介

kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。 Kafka安装配置

注: 新版本的kafka 里面集成了zookeeper ,所以不需要再单独下载安装。

下载

代码语言:javascript
复制
wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
解压
代码语言:javascript
复制
 tar -zxvf kafka_2.12-2.5.0.tgz
修改 kafka-server 的配置文件

cd kafka_2.11-2.2.0

vim config/server.properties 修改其中的:

代码语言:javascript
复制
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs

启动zk【默认端口2181】

代码语言:javascript
复制
bin/zookeeper-server-start.sh config/zookeeper.properties

出现下图,说明启动成功

启动命令我们可能会想要以守护进程的方式启动进程:在命令行末尾加上 &即可;

接下来启动 kafka
启动Kafka

使用 kafka-server-start.sh 启动 kafka 服务: cd 到解压包的目录下,使用命令

代码语言:javascript
复制
bin/kafka-server-start.sh config/server.properties &

如果不需要以守护进程方式启动,可以不加&

如果报错:

代码语言:javascript
复制
/usr/local/kafka/kafka_2.11-1.1.0/bin/kafka-run-class.sh:行271: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-5.b12.el7_4.x86_64//bin/java: 没有那个文件或目录

通过echo $PATH 查看环境变量得知:原因我们在这里配置了jdk环境变量,没找到执行命令java而导致报错。

解决办法: 打开环境变量文件,删掉或注释掉自己配置的jdk配置,使用系统默认的即可正常启动

测试使用

创建 topic 使用 kafka-topics.sh 创建单分区单副本的 topic demo:

代码语言:javascript
复制
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo

查看 topic 列表:

代码语言:javascript
复制
bin/kafka-topics.sh --list --zookeeper localhost:2181
发送消息【生产者】

开启一个终端,使用 kafka-console-producer.sh 发送消息:

代码语言:javascript
复制
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
读取消息【消费者】

再开启一个终端,使用 kafka-console-consumer.sh 接收消息并在终端打印:

代码语言:javascript
复制
 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning

接下来,使用PHP来生产数据:

首先我们要安装kafka 扩展

1、安装kafka的扩展php-rdkafka 1.1、在安装php-rdkafka之前,需要先安装librdkafka

代码语言:javascript
复制
git clone https://github.com/edenhill/librdkafka.git
 ./configure
 make
 sudo make install

1.2、安装php-rdkafka

代码语言:javascript
复制
$ git clone https://github.com/arnaud-lb/php-rdkafka.git
 
#生成configure文件
$ /Users/shiyibo/LNMP/php/bin/phpize   //这个根据自己路径
 
#编译安装
$ ./configure --with-php-config=/Users/shiyibo/LNMP/php/bin/php-config   //这里是自己的扩展路径
$ make
$ make install 
 
#在php.ini 文件中配置 rdkafka扩展
$ vim /Users/shiyibo/LNMP/php/etc/php.ini   //这里是自己的PHP配置文件地址
extension=rdkafka.so
 
#查看扩展是否生效
$php -m | grep kafka
生产者代码实现
代码语言:javascript
复制
<?php
/**
 * 消息生产者
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Producer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("demo");

// 从终端接收输入 
$oInputHandler = fopen('php://stdin', 'r');

while (true) {
    echo "\nEnter  messages:\n";
    $sMsg = trim(fgets($oInputHandler));

   // 空消息意味着退出
    if (empty($sMsg)) {
        break;
    }

    // 发送消息
    $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}

echo "done\n";
终端开启一个消费者窗口
代码语言:javascript
复制
# 因为生产者会往demo的topic中发送消息,消费者直接消费demo即可
kafka-console-consumer --bootstrap-server localhost:9092 --topic demo

终端执行生产者php 文件:

代码语言:javascript
复制
php producer.php

进行生产,然后查看消费者终端。

消费者也可通过PHP 代码实现:

代码语言:javascript
复制
<?php

/**
 * 消费者消费消息
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("demo");

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
        continue;
    }

    if ($oMsg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $oMsg->payload, "\n";
    }
}

此时只需执行consumer.php 文件即可看到生产数据

代码语言:javascript
复制
php consumer.php

kafka 函数参考地址

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-04-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 下载
    • 解压
      • 修改 kafka-server 的配置文件
        • 接下来启动 kafka
          • 启动Kafka
            • 测试使用
              • 发送消息【生产者】
                • 读取消息【消费者】
                  • 首先我们要安装kafka 扩展
                    • 生产者代码实现
                      • 终端开启一个消费者窗口
                      相关产品与服务
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档