我最近尝试使用python向Kafka发送消息。当使用简单的字节消息时,它可以工作。但是现在,我有了一个json数据,我需要将其发送到Kafka主题,然后Java应用程序将使用它。
我试图找出如何将json转换为byteArray (这是Java应用程序期望的有效负载)。所以,我想出了下面的python脚本。但它失败了,因为json中有一些布尔变量,并且我得到了一个类型错误,因为Json true和Python True的大小写不同。我尝试用单引号将json括起来,但再次收到错误消息'EOL while scanning string literal‘。只有我修复了这个错误,我才能知道我是否能够将这些数据发送到Kafka,所以到目前为止,我正在努力处理转换部分。下面是我的代码和json。
Json:
{
"header": {
"activityId": "550",
"timeStamp": "1490093093000",
"sequencingId": 1
},
"queueId": "604",
"contextRef": "SLIP.UPDATE"
,
"state": {
"slips": [{
"id": "550",
"creationDate": "2017-01-30T14:14:14.000+0000",
"accountRef": "1",
"customerRef": "2",
"source": {
"channelRef": "K"
},
"receipt": "O/0000002/0000487",
"isSettled": true,
"isConfirmed": true,
"lines": {
"number": 1,
"win": 1,
"lose": 0,
"voided": 0
}
}]
}
}
Python脚本:
#!/usr/bin/python
from kafka import KafkaProducer
KAFKA_TOPIC = 'slips'
KAFKA_BROKERS = '172.17.0.1:9092'
producer = KafkaProducer(value_serializer=lambda v:json.dumps(v).encode('utf-8'),bootstrap_servers=KAFKA_BROKERS)
messages = '{
"header": {
"activityId": "550",
"timeStamp": "1490093093000",
"sequencingId": 1
},
"queueId": "604",
"contextRef": "SLIP.UPDATE"
},
"state": {
"slips": [{
"id": "550",
"creationDate": "2017-01-30T14:14:14.000+0000",
"accountRef": "1",
"customerRef": "2",
"source": {
"channelRef": "K"
},
"receipt": "O/0000002/0000487",
"isSettled": true,
"isConfirmed": true,
"lines": {
"number": 1,
"win": 1,
"lose": 0,
"voided": 0
}
}]
}
}'
info_as_json = json.loads(messages)
producer.send(KAFKA_TOPIC, info_as_json)
在我发布消息之前,消费者一直在使用消息,比如:
messages = [b'hello kafka', b'I am sending', b'3 test messages']
消费者:
#!/usr/bin/python
import sys
from kafka import KafkaConsumer
KAFKA_TOPIC = 'slips'
KAFKA_BROKERS = '172.17.0.1:9092'
consumer = KafkaConsumer(bootstrap_servers=KAFKA_BROKERS,auto_offset_reset='earliest')
consumer.subscribe([KAFKA_TOPIC])
try:
for message in consumer:
print(message.value)
except KeyboardInterrupt:
sys.exit()
更新:
我在json字符串中添加了三重引号,现在生产者代码没有给出任何错误。但是消费者并没有使用这些消息。至少,它没有像我预期的那样打印它们。
发布于 2018-07-23 05:20:44
最后,我可以使用消息了。制片人好像出了点问题。我浏览了StackOverflow上的一些帖子,然后在我的生产者代码中添加了以下两个更改,它就这样工作了。
1)初始化producer时的linger_ms=10
producer = KafkaProducer(value_serializer=lambda v:json.dumps(v).encode('utf-8'),bootstrap_servers=KAFKA_BROKERS, linger_ms=10)
2)发送消息后刷新
producer.flush()
我还不知道为什么我的生产者在没有这些改变的情况下工作,而不是json。
https://stackoverflow.com/questions/51456992
复制相似问题