前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka的生产者模式(四)

Kafka的生产者模式(四)

作者头像
无涯WuYa
发布2021-04-20 11:27:41
6320
发布2021-04-20 11:27:41
举报

Kafka系统作为MQ的中间件,都是基于生产者和消费者的模式,思维生产者可以简单的理解就是把应用程序的log信息写入到Kafka的集群,因为有了生产者写入的数据,也就有了消费者对数据的消费。Kafka系统的核心组件主要是生产者,消费者,数据流,连接器。其实这也符合逻辑,也就是说信息的输入,中间是处理过程,最后是信息输出的过程,如下所示:

对于Kafka的生产者写入数据的过程,简单的描述主要为:Kafka系统实时读取原始数据(可能是log数据,也可能是应用程序其他的数据),然后把实时读取到的原始数据写入到Kafka的集群中,当然这过程也会涉及到对原始数据的清洗(这些不在本认真的范畴内),Kafka系统生产者的交互具体如下所示:

一般的方式是通过Kafka系统的bin目录下kafka-console-producer.sh来写入数据,然后使用消费端的工具就能够看到往生产者写入数据的过程。下面主要演示下使用代码的方式,也就是单线程的方式往Kafka的生产者里面写入数据,实现代码具体如下:

代码语言:javascript
复制
package MQ;


import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Properties;

/*
* 模拟Kafka生产者客户端
* */
public class KafkaMqProducer extends Thread
{
   //创建日志对象
   private  final  Logger logger=LoggerFactory.getLogger(KafkaMqProducer.class);


   public Properties configure()
   {
      Properties properties=new Properties();
      //指定kafka的集群地址
      properties.put("bootstrap.servers","localhost:9092");
      //设置应答机制
      properties.put("acks","1");
      //批量提交大小
      properties.put("batch.size",16384);
      //延时提交
      properties.put("linger.ms",1);
      //缓充大小
      properties.put("buffer.memory",33554432);
      //序列化主键
      properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
      //序列化值
      properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
      return properties;
   }

   public  void run()
   {
      Producer<String, String> objProducer=new org.apache.kafka.clients.producer.KafkaProducer<String, String>(this.configure());
      //模拟发送数据
      JSONObject jsonObject=new JSONObject();
      jsonObject.put("username","无涯");
      jsonObject.put("city","西安");
      jsonObject.put("age",18);
      jsonObject.put("date",new Date().toString());
      //异步发送,调用回调函数,给主题login写入数据
      objProducer.send(new ProducerRecord<String, String>("login", jsonObject.toJSONString()), new Callback() {
         @Override
         public void onCompletion(RecordMetadata recordMetadata, Exception e)
         {
            if(e!=null)
            {
               logger.error("发送错误,信息具体为:"+e.getMessage());
            }
            else
            {
               logger.info("写入的数据为:"+recordMetadata.offset());
            }
         }
      });

      try{
         Thread.sleep(3000);
      }catch(Exception e){
         e.printStackTrace();
      }
      //关闭生产者的对象
      objProducer.close();
   }

   public static void main(String[] args)
   {
      KafkaMqProducer kafkaMqProducer=new KafkaMqProducer();
      kafkaMqProducer.start();
   }
}

执行代码后,查看主题“login”消费者,就可以看到数据被写入到了生产者,消费者在执行代码后输出的信息如下:

代码语言:javascript
复制
kafka-console-consumer.sh  --bootstrap-server localhost:9092 -topic login
{"date":"Sat Apr 10 19:42:42 CST 2021","city":"西安","age":18,"username":"无涯"}

在Maven的pom.xml文件里面得引入kafka的信息,具体为:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>2.7.0</version>
</dependency>

如果使用Python来操作Kafka,首先需要安装操作Kafka的第三方的库,库的安装方式为:

代码语言:javascript
复制
pip3 install kafka-python

我们实现把拉钩网搜索测试开发职位的数据写入到Kafka的生产者,那么整体思路就是获取拉勾网测试开发职位的数据,然后Kafka读取数据写入到生产者,实现代码如下:

代码语言:javascript
复制
#!/usr/bin/env python
#!coding:utf-8
from kafka import  KafkaProducer
import  json
import  requests

def laGou():
   r=requests.post(
      url='https://www.lagou.com/jobs/positionAjax.json?needAddtionalResult=false',
      data={'first':False,'pn':2,'kd':'测试开发工程师','sid':'850031016ddf4030a88f6754e5dc006a'},
      headers={
         'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36',
         'Content-Type':'application/x-www-form-urlencoded; charset=UTF-8',
         'referer':'https://www.lagou.com/jobs/list_%E6%B5%8B%E8%AF%95%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88?labelWords=&fromSearch=true&suginput=',
         'cookie':'JSESSIONID=ABAAAECAAEBABII00AC62E013C0625CA93D0EB4398C66D6; WEBTJ-ID=20210410%E4%B8%8B%E5%8D%887:57:51195751-178bba53a39117-08cbdf3583191a-33697c08-1296000-178bba53a3ae9b; RECOMMEND_TIP=true; PRE_UTM=; PRE_HOST=; PRE_LAND=https%3A%2F%2Fwww.lagou.com%2F; user_trace_token=20210410195751-f07d0d66-519e-48d4-877c-5387150e2a09; LGSID=20210410195751-db63c8f7-870f-4cbe-ac29-02dc960ab170; PRE_SITE=https%3A%2F%2Fwww.lagou.com; LGUID=20210410195751-67a56d41-e0b1-4cde-ba97-e277061bba86; privacyPolicyPopup=false; _ga=GA1.2.962131978.1618055871; _gat=1; Hm_lvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1618055871; sajssdk_2015_cross_new_user=1; sensorsdata2015session=%7B%7D; _gid=GA1.2.1436722793.1618055871; index_location_city=%E5%85%A8%E5%9B%BD; __lg_stoken__=923813bee6e58b745829da50ad9441f719dc153ffd1937f393cf3fa8fbafe370e01d13c554827a78150d672f8697c19d5842821d051bcd2ee1ce37be887549bcbd1f7469c674; X_MIDDLE_TOKEN=b6b1f78324fc9dd4509291efd5b2504d; SEARCH_ID=4b5c97dd7b084167b3007a6d87b7d95e; X_HTTP_TOKEN=3676ec642119da6d878550816138d25526c88f2008; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22178bba53b758db-02a6a23b110e22-33697c08-1296000-178bba53b76cc0%22%2C%22first_id%22%3A%22%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%2C%22%24os%22%3A%22MacOS%22%2C%22%24browser%22%3A%22Chrome%22%2C%22%24browser_version%22%3A%2289.0.4389.114%22%7D%2C%22%24device_id%22%3A%22178bba53b758db-02a6a23b110e22-33697c08-1296000-178bba53b76cc0%22%7D; Hm_lpvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1618055879; TG-TRACK-CODE=search_code; LGRID=20210410195804-8c8f7df0-646e-4133-98c0-63f0266fae20'
      })
   return r.json()

def sendData():
   producer=KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda m: json.dumps(m).encode('ascii'))
   producer.send("login",laGou())
   producer.flush()
   producer.close()

if __name__ == '__main__':
   sendData()

代码执行后,消费端消费的数据信息如下所示:

如上可以看到,数据写入到了生产者,消费者这边就能够看到生产者生产的数据。批量执行代码,见Kafka监控面板里面生产者的性能数据:

感谢您的关注,后续会持续更新!您也可购买我的书籍和视频课程!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档