首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >如何使用lapin获取AMQP消息头的值?

如何使用lapin获取AMQP消息头的值?
EN

Stack Overflow用户
提问于 2021-10-22 09:35:26
回答 1查看 106关注 0票数 0

如何通过crate lapin (RabbitMQ客户端)获取AMQP消息的消息头中的值?

我正在尝试从lapin:: message ::Delivery结构中获取邮件头的值。我使用的是返回Option的Delivery.properties.headers()

如何读取FieldTable中的值?

有没有什么例子可以说明如何做到这一点?

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    let mut consumer = channel
        .basic_consume(
            "hello",
            "my_consumer",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;

    while let Some(delivery) = consumer.next().await {
        let (_, delivery2) = delivery.expect("error in consumer");
        message_cnt+=1;
        let payload_str:String = match String::from_utf8(delivery2.data.to_owned()) {//delivery.data is of type Vec<u8>
            Ok(v) => v,
            Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
        };          
        let log_message:String=format!("message_cnt is:{}, delivery_tag is:{}, exchange is:{}, routing_key is:{}, redelivered is:{}, properties is:'{:?}', received data is:'{:?}'"
            ,&message_cnt
            ,&delivery2.delivery_tag
            ,&delivery2.exchange
            ,&delivery2.routing_key
            ,&delivery2.redelivered
            ,&delivery2.properties//lapin::BasicProperties Contains the properties and the headers of the message.
            ,&payload_str
        );
        let amqp_msg_headers_option:&Option<amq_protocol_types::FieldTable>=delivery2.properties.headers();
        let amqp_msg_headers:&amq_protocol_types::FieldTable=match amqp_msg_headers_option{
            None=>{
                let bt=backtrace::Backtrace::new();
                let log_message=format!(">>>>>At receive_message_from_amqp(), message received has no headers, backtrace is '{:?}'",&bt);
                slog::error!(my_slog_logger,"{}",log_message);
                let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
                return std::result::Result::Err(Box::new(custom_error));
            }
            ,Some(amqp_msg_headers)=>{amqp_msg_headers}
        };
        if amqp_msg_headers.contains_key("worker_id"){
            //let worker_id2:String=amqp_msg_headers.get("worker_id").into();
            let amqp_msg_headers_btm:&std::collections::BTreeMap<amq_protocol_types::ShortString, lapin::types::AMQPValue>=amqp_msg_headers.inner();
            let worker_id2_option=amqp_msg_headers_btm.get(lapin::types::AMQPValue::ShortString("worker_id".into()));
        }
        delivery2
            .ack(BasicAckOptions::default())
            .await?;
    }
EN

回答 1

Stack Overflow用户

发布于 2021-10-25 03:27:31

好吧,我找到了一个解决方案。这就需要使用serde_json。我使用serde_json从从lapin::message::Delivery的属性字段中获得的amq_protocol_types::FieldTable的对象中获取serde_json::Value对象。然后,我对serde_json::Value进行了处理,以获得标头键和值。下面是我为上述逻辑编写的函数。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
fn extract_amqp_msg_headers_values(
    my_slog_logger:&slog::Logger
    ,amqp_msg_headers_basic_properties:&lapin::BasicProperties
)->std::result::Result<std::collections::HashMap<String,String>, Box<std::io::Error>> {
    let mut amqp_msg_headers_hm:std::collections::HashMap<String,String>=std::collections::HashMap::new();
    let amqp_msg_headers_option=amqp_msg_headers_basic_properties.headers();
    let amqp_msg_headers=match amqp_msg_headers_option{
        None=>{
            let bt=backtrace::Backtrace::new();
            let log_message=format!(">>>>>At extract_amqp_msg_headers_values(), message received has no headers, backtrace is '{:?}'",&bt);
            slog::error!(my_slog_logger,"{}",log_message);
            let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
            return std::result::Result::Err(Box::new(custom_error));
        }
        ,Some(amqp_msg_headers)=>{amqp_msg_headers}
    };
    //let mut worker_id2:String="".to_owned();
    let amqp_msg_headers_btm:&std::collections::BTreeMap<amq_protocol_types::ShortString, lapin::types::AMQPValue>=amqp_msg_headers.inner();
    let amqp_msg_headers_serde_value_option=serde_json::to_value(&amqp_msg_headers_btm);
    let amqp_msg_headers_serde_value:serde_json::value::Value=match amqp_msg_headers_serde_value_option{
        Err(err)=>{
            let bt=backtrace::Backtrace::new();
            let log_message=format!(">>>>>At extract_amqp_msg_headers_values(), pos 2b, some error has been encountered transforming amqp_msg_headers to json, amqp_msg_headers is:'{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_msg_headers,&err,&bt);
            slog::error!(my_slog_logger,"{}",log_message);
            let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
            return std::result::Result::Err(Box::new(custom_error));
        }
        ,Ok(serde_json_value)=>{serde_json_value}
    };
    
    let serde_json_map:&serde_json::Map<String,serde_json::Value>=amqp_msg_headers_serde_value.as_object().unwrap();
    
    let amqp_msg_headers_serde_value_key_vec2:Vec<String>=serde_json_map.keys().cloned().collect();
    for amqp_msg_headers_serde_value_key2 in amqp_msg_headers_serde_value_key_vec2{
        let amqp_msg_headers_serde_value2:&serde_json::value::Value=&serde_json_map[&amqp_msg_headers_serde_value_key2];
        let serde_json_map3:&serde_json::Map<String,serde_json::Value>=amqp_msg_headers_serde_value2.as_object().unwrap();
        
        let some_header_key:String=remove_quotes(&amqp_msg_headers_serde_value_key2).to_owned();
        
        let amqp_msg_headers_serde_value_key_vec3:Vec<String>=serde_json_map3.keys().cloned().collect();
        for amqp_msg_headers_serde_value_key3 in amqp_msg_headers_serde_value_key_vec3{
            let some_header_value:String=remove_quotes(&serde_json_map3[&amqp_msg_headers_serde_value_key3].to_string()).to_owned();
            
            amqp_msg_headers_hm.insert(some_header_key.to_owned(),some_header_value.to_owned());
        }
    }
    //slog::info!(my_slog_logger," amqp_msg_headers_hm is:'{:?}'",&amqp_msg_headers_hm);
    Ok(amqp_msg_headers_hm)
}
/// This is a general purpose function to strip double quotes surrounding the supplied &str value.
pub fn remove_quotes(some_str:&str)->&str
{
    // The trimmed string is a slice to the original string, hence no new
    // allocation is performed
    let chars_to_trim: &[char] = &['"',' ', ','];
    let some_str: &str = some_str.trim_matches(chars_to_trim);
    //println!("some_str is:'{}'", some_str);
    return some_str;
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69681016

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文