首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >将json作为bytearray发送给使用Python的kafka

将json作为bytearray发送给使用Python的kafka
EN

Stack Overflow用户
提问于 2018-07-21 22:28:57
回答 1查看 4.4K关注 0票数 0

我最近尝试使用python向Kafka发送消息。当使用简单的字节消息时,它可以工作。但是现在,我有了一个json数据,我需要将其发送到Kafka主题,然后Java应用程序将使用它。

我试图找出如何将json转换为byteArray (这是Java应用程序期望的有效负载)。所以,我想出了下面的python脚本。但它失败了,因为json中有一些布尔变量,并且我得到了一个类型错误,因为Json true和Python True的大小写不同。我尝试用单引号将json括起来,但再次收到错误消息'EOL while scanning string literal‘。只有我修复了这个错误,我才能知道我是否能够将这些数据发送到Kafka,所以到目前为止,我正在努力处理转换部分。下面是我的代码和json。

Json:

代码语言:javascript
复制
{
"header": {
"activityId": "550",
"timeStamp": "1490093093000",
"sequencingId": 1
},
"queueId": "604",
"contextRef": "SLIP.UPDATE"
,
"state": {
"slips": [{
  "id": "550",
  "creationDate": "2017-01-30T14:14:14.000+0000",
  "accountRef": "1",
  "customerRef": "2",
  "source": {
    "channelRef": "K"
  },
  "receipt": "O/0000002/0000487",
  "isSettled": true,
  "isConfirmed": true,
  "lines": {
    "number": 1,
    "win": 1,
    "lose": 0,
    "voided": 0
  }
}]
}
}

Python脚本:

代码语言:javascript
复制
#!/usr/bin/python

from kafka import KafkaProducer

KAFKA_TOPIC = 'slips'
KAFKA_BROKERS = '172.17.0.1:9092'

producer = KafkaProducer(value_serializer=lambda v:json.dumps(v).encode('utf-8'),bootstrap_servers=KAFKA_BROKERS)

messages = '{
"header": {
"activityId": "550",
"timeStamp": "1490093093000",
"sequencingId": 1
},
"queueId": "604",
"contextRef": "SLIP.UPDATE"
},
"state": {
"slips": [{
"id": "550",
"creationDate": "2017-01-30T14:14:14.000+0000",
"accountRef": "1",
"customerRef": "2",
"source": {
"channelRef": "K"
},
"receipt": "O/0000002/0000487",
"isSettled": true,
"isConfirmed": true,
"lines": {
"number": 1,
"win": 1,
"lose": 0,
"voided": 0
 }
}]
}
}'

info_as_json = json.loads(messages)

producer.send(KAFKA_TOPIC, info_as_json)

在我发布消息之前,消费者一直在使用消息,比如:

代码语言:javascript
复制
messages = [b'hello kafka', b'I am sending', b'3 test messages']

消费者:

代码语言:javascript
复制
#!/usr/bin/python

import sys
from kafka import KafkaConsumer

KAFKA_TOPIC = 'slips'
KAFKA_BROKERS = '172.17.0.1:9092'

consumer = KafkaConsumer(bootstrap_servers=KAFKA_BROKERS,auto_offset_reset='earliest')

consumer.subscribe([KAFKA_TOPIC])
try:
    for message in consumer:
        print(message.value)
except KeyboardInterrupt:
    sys.exit()

更新:

我在json字符串中添加了三重引号,现在生产者代码没有给出任何错误。但是消费者并没有使用这些消息。至少,它没有像我预期的那样打印它们。

EN

回答 1

Stack Overflow用户

发布于 2018-07-23 05:20:44

最后,我可以使用消息了。制片人好像出了点问题。我浏览了StackOverflow上的一些帖子,然后在我的生产者代码中添加了以下两个更改,它就这样工作了。

1)初始化producer时的linger_ms=10

代码语言:javascript
复制
producer = KafkaProducer(value_serializer=lambda v:json.dumps(v).encode('utf-8'),bootstrap_servers=KAFKA_BROKERS, linger_ms=10)

2)发送消息后刷新

代码语言:javascript
复制
producer.flush()

我还不知道为什么我的生产者在没有这些改变的情况下工作,而不是json。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51456992

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档