前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka在php中的使用----生产者与消费者

Kafka在php中的使用----生产者与消费者

作者头像
美团骑手
发布2021-01-18 11:13:25
1.9K0
发布2021-01-18 11:13:25
举报
文章被收录于专栏:技术进阶技术进阶
安装扩展

安装教程 kafka和php的rdkafka扩展教程网上有很多,大家可以自行查询,例如:Kafka-php-使用 PHP 编写的 Kafka 客户端

Kafka文档推荐

不清楚里面的api的可以在文档中查询 kafka中文文档

composer 依赖

创建 composer.json填写内容

代码语言:javascript
复制
{
  "require": {
        "nmred/kafka-php": "v0.2.0.8"
  }
}
异步调用生产者
代码语言:javascript
复制
<?php
require_once __DIR__ . '/Vendor/autoload.php';

use KafkaProducerConfig;
use KafkaProducer;

//异步方式调用
$config = ProducerConfig::getInstance();
//设置数据刷新时间(毫秒)
$config->setMetadataRefreshIntervalMs(10);
//地址
$config->setMetadataBrokerList('localhost:9092');
//设置代理版本
$config->setBrokerVersion('0.9.0.1');
//开启消息确认
$config->setRequiredAck(1);
$config->setIsAsyn(false);
//设置生产间隔
$config->setProduceInterval(500);
//生产者
$producer = new Producer(function () {
    return array([
        'topic' => 'test',//主题
        'value' => 'test message',
        'key' => 'testKey',//key
    ]);
});
$producer->success(function ($result) {
    echo '投递成功' . json_encode($result, 256) . PHP_EOL;
});
$producer->error(function ($result) {
    echo '投递失败' . json_encode($result, 256) . PHP_EOL;
});
$producer->send(true);
同步调用生产者
代码语言:javascript
复制
<?php
require_once __DIR__ . '/Vendor/autoload.php';

use KafkaProducerConfig;
use KafkaProducer;

$config = ProducerConfig::getInstance();
//这是元组数据刷新间隔毫秒
$config->setMetadataRefreshIntervalMs(10);
//代理地址
$config->setMetadataBrokerList('localhost:9092');
//设置代理版本
$config->setBrokerVersion('0.9.0.1');
//开启消息确认
$config->setRequiredAck(1);
$config->setIsAsyn(false);
//生产间隔
$config->setProduceInterval(10);
$producer = new Producer();
for ($i = 0; $i < 100; $i++) {
    $result = $producer->send(array(
        [
            'topic' => 'test',//主题
            'value' => 'test message',
            'key' => '',//key
        ]
    ));
    echo '投递成功' . json_encode($result, 256) . PHP_EOL;
}
消费者
代码语言:javascript
复制
<?php
require_once __DIR__ . '/Vendor/autoload.php';
use KafkaConsumerConfig;
use KafkaConsumer;
$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10);
$config->setMetadataBrokerList('localhost:9092');
//设置分组分区
$config->setGroupId('test');
$config->setBrokerVersion('0.9.0.1');
$config->setTopics(['test']);
//设置偏移量
$config->setOffsetReset('earliest');
$consumer = new Consumer();
$consumer->start(function ($topic, $part, $message) {
    var_dump($message);
});
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-12-21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装扩展
  • Kafka文档推荐
  • composer 依赖
  • 异步调用生产者
  • 同步调用生产者
  • 消费者
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档