现场大数据项目Kafka组件大多数使用的语言集中在Java、Python、Go、C++。最近现场需求使用Erlang对接,遂有本文。Erlang是一种函数式并发编程语言,专为高并发、分布式、高容错的实时系统设计。rebar3是Erlang生态的标准构建工具,解决Erlang原生开发中依赖管理、编译流程、测试运行等环节的碎片化问题,类似于Java的Maven。
当前Erlang/OTP最新版本为27.2.2,本文使用该版本进行实践,如下:
下载对应版本的源码:
wget https://github.com/erlang/otp/releases/download/OTP-27.2.4/otp_src_27.2.4.tar.gz
tar -xvf otp_src_27.2.4.tar.gz
cd otp_src_27.2.4
源码编译、安装:
./configure
make -j8
make install
验证效果,如下:
erl -version
下载安装包:
wget https://s3.amazonaws.com/rebar3/rebar3
chmod +x rebar3
执行安装命令、验证效果如下:
./rebar3 local install
/root/.cache/rebar3/bin/rebar3 –v
通过rebar3命令构建一个新项目,项目名称为myErlangKafkaApp
/root/.cache/rebar3/bin/rebar3 new app myErlangKafkaApp
构建完成以后,可以查看项目目录结构
cd myErlangKafkaApp && tree
可以看到配置文件rebar.config及src代码目录
配置文件中添加依赖库,brod使用最新版本4.4.0,与其配套kafka_protoco为4.2.3:l
vim rebar.config
{erl_opts, [debug_info]}.
{deps, [
{brod, "4.4.0"},
{kafka_protocol, "4.2.3"}
]}.
{erl_opts, [debug_info]}.
{shell, [
%% {config, "config/sys.config"},
{apps, [myErlangKafkaApp]}
]}.
配置完成,执行命令下载依赖库:
/root/.cache/rebar3/bin/rebar3 deps get
在项目src目录新建自己的代码文件:
vim src/kafka_producer.erl
-module(kafka_producer).
-export([start/0, send_message/2, send_message_async/2]).
-define(KAFKA_BOOTSTRAP, [{"10.121.198.221", 9092}]).
-define(TOPIC, <<"felixzh_topic">>).
start() ->
{ok, _} = application:ensure_all_started(brod),
ok = brod:start_client(?KAFKA_BOOTSTRAP, my_client),
ok = brod:start_producer(my_client, ?TOPIC, []).
send_message(Key, Value) ->
brod:produce_sync(
my_client,
?TOPIC,
0,
Key,
Value
),
io:format("Message sent successfully~n").
send_message_async(Key, Value) ->
brod:produce(
my_client,
?TOPIC,
0,
Key,
Value
).
/root/.cache/rebar3/bin/rebar3 compile
/root/.cache/rebar3/bin/rebar3 shell
%% 启动客户端
kafka_producer:start().
%% 同步发送消息
kafka_producer:send_message(<<"key">>, <<"Hello Kafka!">>).
%% 异步发送消息
kafka_producer:send_message_async(<<"key">>, <<"Hello Kafka!">>).
发送数据完成之后,查看Kafka数据如下:
至此,整个Erlang与brod操作Kafka的入门案例整理完成!