首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用音调gRPC流解绑要发送的BoxStream

使用音调gRPC流解绑要发送的BoxStream
EN

Stack Overflow用户
提问于 2021-09-10 15:00:22
回答 1查看 253关注 0票数 0

我是Rust的新手,并且使用Tonic编写了一个简单的应用程序,该应用程序将在gRPC上传输一些值。这些值最初是作为BoxStream (Pin<Box<Stream>>)从外部库中获取的,而requires的API需要一些实现Stream的东西(当然Pin不需要)。

Tonic's streaming example使用ReceiverStream将mpsc频道转换为流,并派生出一个线程将值推入其中。这将要求流的生命周期为'static,这不是我的实际实现的选项,因为我的流的生命周期与返回它的类相关联。

最好的方式是提供一些实现流的东西,我可以从我的Pin<Box<Stream>>提供给Tonic

src/main.rs (这不会编译,因为BoxStream<'static,Entry>不实现IntoStreamingRequest)

代码语言:javascript
运行
复制
use futures::prelude::stream::BoxStream;
use async_stream::stream;
use tonic::{IntoStreamingRequest};

struct Entry {
    key: String,
}

fn main() {
    // Create Request
    let stream: BoxStream<'static, Entry> = api_function();
    let request = stream.into_streaming_request();

    // Send request
    //let mut client = DataImporterClient::connect("http://[::1]:50051").await.unwrap();
    //let response = client.grpc_function(request).await?;
}

fn api_function() -> BoxStream<'static, Entry> {
    Box::pin(stream! {
        let entries = vec!(
            Entry {key: String::from("value1")},
            Entry {key: String::from("value2")}
        );

        for entry in entries {
            yield entry;
        }
    })
}

Cargo.toml

代码语言:javascript
运行
复制
[package]
name = "tonic-streaming-minimum-example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tonic = "0.5"
futures = "0.3"
tokio-stream = "0.1"
async-stream = "0.3"

提供的编译错误:

代码语言:javascript
运行
复制
error[E0599]: the method `into_streaming_request` exists for struct `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>`, but its trait bounds were not satisfied
   --> src\main.rs:12:26
    |
12  |     let request = stream.into_streaming_request();
    |                          ^^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>` due to unsatisfied trait bounds
    |
   ::: C:\Users\tmathews\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
    |
408 | pub struct Pin<P> {
    | -----------------
    | |
    | doesn't satisfy `_: IntoStreamingRequest`
    | doesn't satisfy `_: Sync`
    |
   ::: C:\Users\tmathews\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.17\src\stream.rs:27:1
    |
27  | pub trait Stream {
    | ----------------
    | |
    | doesn't satisfy `_: IntoStreamingRequest`
    | doesn't satisfy `_: Sized`
    | doesn't satisfy `_: Sync`
    |
    = note: the following trait bounds were not satisfied:
            `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
            which is required by `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: futures::Stream`
            which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: std::marker::Send`
            which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
            which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: Sync`
            which is required by `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: IntoStreamingRequest`
            `dyn futures::Stream<Item = Entry> + std::marker::Send: Sized`
            which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
            which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&dyn futures::Stream<Item = Entry> + std::marker::Send: futures::Stream`
            which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&dyn futures::Stream<Item = Entry> + std::marker::Send: std::marker::Send`
            which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
            which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): futures::Stream`
            which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
            `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): Sync`
            which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-09-10 16:09:05

问题是,problem只为同时是SendSync的类型实现IntoStreamingRequest

代码语言:javascript
运行
复制
impl<T> IntoStreamingRequest for T
where
    T: Stream + Send + Sync + 'static

BoxStream并非如此:

代码语言:javascript
运行
复制
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;

您应该复制它的定义并添加一个绑定的额外+ Sync,而不是使用BoxStream

代码语言:javascript
运行
复制
fn api_function() -> Pin<Box<dyn Stream<Item = Entry> + Send + Sync + 'static>> {
    Box::pin(stream! {
        let entries = vec!(
            Entry {key: String::from("value1")},
            Entry {key: String::from("value2")}
        );

        for entry in entries {
            yield entry;
        }
    })
}

而且,因为stream!()宏返回的流已经是Send + Sync,所以您的代码可以很好地编译。

PS:在以下位置删除不必要的类型提示:

代码语言:javascript
运行
复制
    let stream: BoxStream<'static, Entry> = api_function();
// should become:
    let stream = api_function(); // after the above change it's not BoxStream anymore!
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69134211

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档