60分钟

Kafka实战练习

Kafka实战练习

实验预计耗时:60分钟

1. 课程背景

1.1 课程目的

消息队列Kafka是一个分布式、高吞吐量、高可扩展性的消息系统,其也因为强大的分布式流数据处理能力,被广泛用于大数据数据传递场景。Kafka基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。Kafka具有数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合等场景。

通过此实验,您可以掌握Kafka基本功能的使用,掌握Kafka生产者与消费者的开发过程。

1.2 课前知识准备

学习本课程前,学员需要掌握以下前置知识:

1、能力基础

  • Linux基本操作:掌握Linux远程登录、文件与目录管理、vim编辑器使用等。
  • Java开发基础:掌握Java面向对象编程、Maven项目构建等。

2、相关技术

  • Kafka:是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。消息队列 Kafka的架构图如下所示:
    • 生产者Producer可能是网页活动产生的消息、服务日志等信息。生产者通过push模式将消息发布到 Cloud Kafka 的 Broker 集群。
    • 集群通过ZooKeeper管理集群配置,进行leader选举,故障容错等。
    • 消费者Consumer被划分为若干个Consumer Group。消费者通过pull模式从Broker中消费消息。

3、相关概念

  • Kafka相关术语介绍:
    • Broker:Kafka集群包含一个或多个服务器时,这种服务器被称为broker。
    • Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。
    • Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition。
    • Producer:负责发布消息到Kafka broker。
    • Consumer:消息消费者,向Kafka broker读取消息的客户端。
    • Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

2. 实验环境

2.1 实验操作环境

本课程需要以下实验操作环境:

  1. 可以接入互联网的笔记本电脑或者台式机,本实验使用的本地计算机为Windows系统。
  2. 实验环境:本地计算机(具备Java开发环境+PuTTY)+腾讯云控制台。
  3. Java软件开发工具包JDK(版本:1.8)
  4. Maven(版本:3.5及以上)
  5. Eclipse或者IDEA,此实验采用IDEA作为开发工具

2.2 实验架构图

本实验将使用EMR三节点集群(Master节点和两个Core节点),使用PuTTY连接Master节点的方式访问集群。实验任务通过Presto和Hive的协同使用,让学员了解Presto的基本使用流程。实验架构图如下:

2.3 实验的数据规划表

资源名称

数据

说明

腾讯云账号

账号:XXXXXXXX、密码:XXXXXXXX

涉及产品如下:VPC、EMR

PuTTY

版本:0.73

Putty下载

3. 实验流程

实验共包含三个阶段的任务:

  • 实验环境准备:需要在腾讯云云CVM上部署一个Kafka实例,步骤包括CVM实例购买,Java安装,ZooKeeper安装以及Kafka的安装与测试。
  • 生产者与消费者代码开发:在体验过简单的Kafka基本功能后,我们将通过Java分别开发Producer和Consumer,用于在Kafka实例上运行,实现消息的生产与消费。
  • 消息生产与消费:在Producer和Consumer打包成功后,我们将它们上传至云服务器,通过运行自己封装的Jar包实现Kafka基本功能的使用。

4. 实验步骤

任务1 实验环境准备

【任务目标】

在腾讯云通过云服务器CVM实例,快速安装Kafka,并在启动Kafka后,测试Kafka的基本功能。

【任务步骤】

1、云服务器购买

1.购买三台云服务器。登录腾讯云官网,进入云服务器主页:https://console.cloud.tencent.com/cvm/index

2.点击“新建“按钮购买云服务器

3.首先在选择机型页面,机型配置如下。

配置项

内容

计费模式

按量计费或竞价实例

地域

广州

可用区

广州三区

网络

新建或使用已有VPC

实例

S5.MEDIUM4(标准型S5,2核4GB)

镜像

公共镜像 CentOS 7.6 64位

系统盘

高性能云硬盘 50GB

公网带宽

免费分配独立公网IP,按使用流量

确认无误后,点击下一步:设置主机

4.设置主机页面,配置如下:

配置项

内容

所属项目

默认项目

安全组

使用使用放通全部端口安全组

实例名称

kafka

登陆方式

设置密码(学员可根据自己习惯选择)

注:如果没有新建安全组,可以通过安全组选项下方的“新建安全组”直接新建。

确认无误后,点击下一步:确认配置信息

5.确认开通

服务器信息如下图所示,点击开通

6.开通后,可以在云服务器实例列表中查看到实例信息。

2、安装Kafka环境

Kafka运行由Java语言编写,故运行依赖于JRE。Kafka依赖于ZooKpeer。因此本步骤安装Kafka之前,先要依次安装Java和ZooKeeper。

1.使用PuTTY登录CVM实例,安装Java开发环境

yum -y install java-1.8.0-openjdk.x86_64

测试是否安装成功:

java -version

2.安装ZooKeeper

切换到/opt目录下:

cd /opt

下载ZooKeeper压缩包:

wget https://course-public-resources-1252758970.cos.ap-chengdu.myqcloud.com/PracticalApplication/202001bigdata/9-kafka/apache-zookeeper-3.5.6-bin.tar.gz

解压缩文件到当前文件夹:

tar -xzvf apache-zookeeper-3.5.6-bin.tar.gz

目录重命名:

mv apache-zookeeper-3.5.6-bin zookeeper

3.配置ZooKeeper

复制默认配置:

cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg

配置zookeeper环境变量,首先打开profile文件;

vim /etc/profile

按i进入编辑模式,在文件末尾添加zookeeper环境变量;

#set zookeeper environment
export ZK_HOME=/opt/zookeeper
export PATH=$ZK_HOME/bin:$PATH

保存文件后,让该环境变量生效:

source /etc/profile

4.启动ZooKeeper:

/opt/zookeeper/bin/zkServer.sh start

全部启动后,查看启动结果,显示启动模式为:standalone;

/opt/zookeeper/bin/zkServer.sh status

5.安装Kafka

切换到/opt目录下:

cd /opt

下载Kafka:

wget https://course-public-resources-1252758970.cos.ap-chengdu.myqcloud.com/PracticalApplication/202001bigdata/9-kafka/kafka_2.11-2.4.0.tgz

解压并进入安装目录:

tar -xzf kafka_2.11-2.4.0.tgz

重命名文件夹名称:

mv kafka_2.11-2.4.0 kafka

启动Kafka服务器,使用默认配置;

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

3、测试Kafka基本功能

1.建立主题

让我们用一个分区和一个副本创建一个名为“ test”的主题:

/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

现在,如果我们运行list topic命令,我们可以看到该主题:

/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

2.发送一些消息

Kafka带有一个命令行客户端,它将获取输入,并将其作为消息发送到Kafka。默认情况下,每行将作为单独的消息发送。运行生产者,然后在控制台中键入一些消息以发送到服务器。

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

发送消息你想输入的一些消息内容,如:

This is a message

This is another message

Ctrl+C 结束消息的输入。

3.启动消费者

Kafka还有一个命令行消费者,它将消息转储到标准输出。

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

任务2 生产者与消费者代码开发

【任务目标】

使用IDEA创建Maven工程,开发Kafka生产者与消费者,并打包等待上传云服务器运行。

【任务步骤】

1、Idea中创建项目

1.打开IDEA后,点击Create New Project

2.选择项目类型为Maven:

3.GroupId为:com.test;ArtifactId为:kafka_project;点击Next

4.Project name为:kafka_project,点击Finish

5.项目创建成功后配置Maven的pom.xml,内容参考如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>kafka_project</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>Consumer</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
   implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.test.bigdata.Consumer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

编辑pom.xml后,导入并开始下载依赖。

2、项目代码编写

1.在java目录下,创建包com.test.bigdata;

2.在创建好的包内创建生产者Producer类,并编写生产者代码:

package com.test.bigdata;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 设置broker地址,请修改为CVM实例的内网IP
        props.put("bootstrap.servers", "172.16.16.3:9092");
        props.put("retries", 0);
        // 配置key-value允许使用参数化类型
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<>(props);
        //使用producer发送一条消息
        producer.send(new ProducerRecord("test", "key1", "这是一条消息"));
        System.out.println("发送成功");
        producer.close();
    }
}

注意:请自行修改bootstrap.servers的IP地址为您自己的实例IP。

3.编写消费者代码;

在com.test.bigdata包下创建Cosumer.java类,编写代码如下:

package com.test.bigdata;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties props=new Properties();
        // 设置broker地址,请修改为CVM实例的内网IP
        props.put("bootstrap.servers","172.16.16.3:9092");
        props.put("group.id","tp");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        //创建一个消费者客户端实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅主题
        consumer.subscribe(Collections.singletonList("test"));
        System.out.println("Subscribed to topic"+"test");
        //循环消费消息
        while(true){
            ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String,String> record:records){
                System.out.println("receiver a message from consumer client:"+record.value());
            }
        }
    }
}

注意:请自行修改bootstrap.servers的IP地址为您自己的实例IP。

3、项目代码打包

1.首先对Producer进行打包,注意pom.xml中的finalName为Producer,mainClass为com.test.bigdata.Producer;

2.双击Maven项目生命周期的package,项目打包后会在项目路径下的target目录下生成一个Producer.jar

3.接下来打包Consumer,需要先修改pom.xml中的finalName标签为Consumer,mainClass标签为com.test.bigdata.Consumer;

4.将项目目录下的target文件夹内的Producer.jar,Consumer.jar移动到D盘,等待上传云服务器。

任务3 消息生产与消费

【任务目标】

在云服务器Kafka实例上运行生产者和消费者的Jar包,实现消息的生产与消费。

【任务步骤】

1、jar包上传云服务器

1.使用mkdir命令在CVM中创建一个/test目录。

创建文件夹test;

mkdir /test

切换到test路径下;

cd /test

2.找到PuTTY的安装目录,在上方地址栏输入cmd并执行。

3.上传jar包

在弹出的黑窗口首先输入psftp,打开psftp工具用来传输文件;

psftp

接下来连接服务器,回车后需要输入用户名和密码;

open xxx.xxx.xxx.xxx 

用于切换远程Linux 服务器上的目录;

cd /test/

lcd命令用于切换本地的路径;

lcd D:\

上传生产者jar包;

put Producer.jar 

上传消费者jar包;

put Consumer.jar

上传需要一点时间,请耐心等待,命令使用可以参考下图:

2、执行消息生产与消费

1.执行消息生产

本步骤需同时打开两个PuTTY客户端链接实例,首先在第一个PuTTY上使用Java运行消费者,执行命令如下;

切换到/test目录:

cd /test

执行Consumer:

java -jar Consumer.jar

在打印Subscribed to topictest后,表示等待topic里的数据接收。

2.另启一个PuTTY客户端连接实例,执行生产者,执行命令如下:

切换到/test目录:

cd /test

执行Producer:

java -jar Producer.jar

3.再回看第一个PuTTY客户端,消费者此时已经接收到了信息,打印结果如下:

至此,您已经完成了本次实验的全部任务,相信您已经掌握了Kafka的基本编程与接本功能的使用。

5. 注意事项

如实验资源无需保留,请在实验结束后及时销毁,以免产生额外费用。