前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pulsar学习笔记1:helloworld

pulsar学习笔记1:helloworld

作者头像
菩提树下的杨过
发布2019-07-02 16:59:10
1.1K0
发布2019-07-02 16:59:10
举报

pulsar号称是下一代的消息系统,这二年风光无限,大有干掉kafka的势头,如果想快速体验下,可以按以下步骤在本地搭建一个单机版本:(mac环境+jdk8)

一、 下载

代码语言:javascript
复制
wget https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.3.2/apache-pulsar-2.3.2-bin.tar.gz

目前最新版本是2.3.2 

二、解压

代码语言:javascript
复制
tar -zxvf apache-pulsar-2.3.2-bin.tar.gz

三、单机模式启动

代码语言:javascript
复制
cd apache-pulsar-2.3.2
bin/pulsar standalone

四、 测试收发消息

pulsar自带的client工具,可以直接测试收发消息。

收消息命令如下:

代码语言:javascript
复制
bin/pulsar-client consume my-topic -s "first-subscription"

表示从“my-topic”这个topic上消费消息,并且指定订阅名称为“first-subscription”

发消息命令如下:

代码语言:javascript
复制
bin/pulsar-client produce my-topic --messages "hello pulsar"

表示发送消息到my-topic这个topic上。

注:测试时,可以单独开2个terminal,先在其中1个运行consumer,然后在另1个运行produce

附:如果希望写java代码实现收发消息,可参考https://github.com/yjmyzz/pulsar-sample

五、 Function测试

function是一个极有前途的功能,可以把一个topic中喷出的消息,实时接收并处理后,再把处理结果发到另一个topic,相当于轻量级的流式计算。

./examples目录下有一个api-examples.jar包,里面自带了一些Function示例。

5.1 部署Function

部署的过程,其实就是把带处理逻辑的jar包,放到集群上,命令如下:

代码语言:javascript
复制
bin/pulsar-admin functions create \
--jar examples/api-examples.jar \
--className org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/exclamation-input \
--output persistent://public/default/exclamation-output \
--name exclamation

大致是创建一个function,来源是examples/api-examples.jar这个文件,并指定了具体的类名(因为一个jar包中,可以写多个function,必须指定具体的className), 然后这个function的入参是exclamation-input这个topic,处理完的结果,将输出到exclamation-output,最后这个function在pulsar中的名字是exclamation - 注:如果上述命令执行失败,可以尝试把className,换成classname. (不同版本的pulsar这个参数的大小写略有不同)

附:ExclamationFunction的java源码如下,逻辑很简单,只是在输入参数后加一个!

代码语言:javascript
复制
package org.apache.pulsar.functions.api.examples;

import java.util.function.Function;

public class ExclamationFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        return String.format("%s!", input);
    }
}

5.2 查看已部署的function列表

代码语言:javascript
复制
bin/pulsar-admin functions list \
--tenant public \
--namespace default

5.3 启动消费者,查看实时处理结果   

代码语言:javascript
复制
bin/pulsar-client consume persistent://public/default/exclamation-output \
--subscription-name my-subscription \
--num-messages 0

5.4 启动生产者,产生实时处理所需的素材

代码语言:javascript
复制
bin/pulsar-client produce persistent://public/default/exclamation-input \
--num-produce 1 \
--messages "Hello world"

参考文章:

http://pulsar.apache.org/docs/en/functions-quickstart/

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-06-23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档