如何通过crate lapin (RabbitMQ客户端)获取AMQP消息的消息头中的值?
我正在尝试从lapin::message::Delivery结构中获取消息头的值。我使用的是Delivery.properties.headers(),它返回的是&Option<FieldTable>。
如何读取FieldTable中的值?
有没有什么例子可以说明如何做到这一点?
// Subscribe to the "hello" queue under the consumer tag "my_consumer".
let mut consumer = channel
    .basic_consume(
        "hello",
        "my_consumer",
        BasicConsumeOptions::default(),
        FieldTable::default(),
    )
    .await?;

// Handle deliveries one at a time until the consumer stream ends.
while let Some(delivery) = consumer.next().await {
    let (_, delivery2) = delivery.expect("error in consumer");
    message_cnt += 1;

    // delivery2.data is a Vec<u8>; a payload that is not valid UTF-8 aborts with a panic.
    let payload_str: String = match String::from_utf8(delivery2.data.to_owned()) {
        Ok(v) => v,
        Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
    };

    // Summary of the delivery; `properties` (lapin::BasicProperties) holds both the
    // properties and the headers of the message. The string is only built here and
    // is not used further in this snippet.
    let log_message: String = format!(
        "message_cnt is:{}, delivery_tag is:{}, exchange is:{}, routing_key is:{}, redelivered is:{}, properties is:'{:?}', received data is:'{:?}'",
        &message_cnt,
        &delivery2.delivery_tag,
        &delivery2.exchange,
        &delivery2.routing_key,
        &delivery2.redelivered,
        &delivery2.properties,
        &payload_str
    );

    // headers() yields &Option<FieldTable>; a message without a header table is
    // logged and reported to the caller as an error.
    let amqp_msg_headers: &amq_protocol_types::FieldTable = match delivery2.properties.headers() {
        Some(amqp_msg_headers) => amqp_msg_headers,
        None => {
            let bt = backtrace::Backtrace::new();
            let log_message = format!(">>>>>At receive_message_from_amqp(), message received has no headers, backtrace is '{:?}'", &bt);
            slog::error!(my_slog_logger, "{}", log_message);
            let custom_error = std::io::Error::new(std::io::ErrorKind::Other, log_message.as_str());
            return std::result::Result::Err(Box::new(custom_error));
        }
    };

    // The header table is a BTreeMap keyed by ShortString, and BTreeMap::get takes a
    // *borrowed* key, so the lookup is done with a plain &str rather than an owned
    // AMQPValue. A single get() also replaces the separate contains_key() check: it
    // is None when the header is absent.
    // NOTE(review): relies on ShortString implementing Borrow<str> - confirm against
    // the amq_protocol_types version in use.
    let worker_id2_option: Option<&lapin::types::AMQPValue> =
        amqp_msg_headers.inner().get("worker_id");

    // Acknowledge the message once it has been handled.
    delivery2
        .ack(BasicAckOptions::default())
        .await?;
}
发布于 2021-10-25 03:27:31
好吧,我找到了一个解决方案,它需要用到serde_json。我先从lapin::message::Delivery的properties字段中取得amq_protocol_types::FieldTable对象,再用serde_json把它转换成serde_json::Value对象。然后,我遍历这个serde_json::Value,从中取出各个标头的键和值。下面是我为上述逻辑编写的函数。
fn extract_amqp_msg_headers_values(
my_slog_logger:&slog::Logger
,amqp_msg_headers_basic_properties:&lapin::BasicProperties
)->std::result::Result<std::collections::HashMap<String,String>, Box<std::io::Error>> {
let mut amqp_msg_headers_hm:std::collections::HashMap<String,String>=std::collections::HashMap::new();
let amqp_msg_headers_option=amqp_msg_headers_basic_properties.headers();
let amqp_msg_headers=match amqp_msg_headers_option{
None=>{
let bt=backtrace::Backtrace::new();
let log_message=format!(">>>>>At extract_amqp_msg_headers_values(), message received has no headers, backtrace is '{:?}'",&bt);
slog::error!(my_slog_logger,"{}",log_message);
let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
return std::result::Result::Err(Box::new(custom_error));
}
,Some(amqp_msg_headers)=>{amqp_msg_headers}
};
//let mut worker_id2:String="".to_owned();
let amqp_msg_headers_btm:&std::collections::BTreeMap<amq_protocol_types::ShortString, lapin::types::AMQPValue>=amqp_msg_headers.inner();
let amqp_msg_headers_serde_value_option=serde_json::to_value(&amqp_msg_headers_btm);
let amqp_msg_headers_serde_value:serde_json::value::Value=match amqp_msg_headers_serde_value_option{
Err(err)=>{
let bt=backtrace::Backtrace::new();
let log_message=format!(">>>>>At extract_amqp_msg_headers_values(), pos 2b, some error has been encountered transforming amqp_msg_headers to json, amqp_msg_headers is:'{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_msg_headers,&err,&bt);
slog::error!(my_slog_logger,"{}",log_message);
let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
return std::result::Result::Err(Box::new(custom_error));
}
,Ok(serde_json_value)=>{serde_json_value}
};
let serde_json_map:&serde_json::Map<String,serde_json::Value>=amqp_msg_headers_serde_value.as_object().unwrap();
let amqp_msg_headers_serde_value_key_vec2:Vec<String>=serde_json_map.keys().cloned().collect();
for amqp_msg_headers_serde_value_key2 in amqp_msg_headers_serde_value_key_vec2{
let amqp_msg_headers_serde_value2:&serde_json::value::Value=&serde_json_map[&amqp_msg_headers_serde_value_key2];
let serde_json_map3:&serde_json::Map<String,serde_json::Value>=amqp_msg_headers_serde_value2.as_object().unwrap();
let some_header_key:String=remove_quotes(&amqp_msg_headers_serde_value_key2).to_owned();
let amqp_msg_headers_serde_value_key_vec3:Vec<String>=serde_json_map3.keys().cloned().collect();
for amqp_msg_headers_serde_value_key3 in amqp_msg_headers_serde_value_key_vec3{
let some_header_value:String=remove_quotes(&serde_json_map3[&amqp_msg_headers_serde_value_key3].to_string()).to_owned();
amqp_msg_headers_hm.insert(some_header_key.to_owned(),some_header_value.to_owned());
}
}
//slog::info!(my_slog_logger," amqp_msg_headers_hm is:'{:?}'",&amqp_msg_headers_hm);
Ok(amqp_msg_headers_hm)
}
/// Strips every leading and trailing double quote, space and comma from `some_str`.
///
/// The result is a sub-slice of the input, so nothing is allocated. Characters in
/// the interior of the string are left untouched.
pub fn remove_quotes(some_str: &str) -> &str {
    some_str.trim_matches(|c: char| matches!(c, '"' | ' ' | ','))
}
https://stackoverflow.com/questions/69681016
复制相似问题