我是Rust的新手,并且使用Tonic编写了一个简单的应用程序,该应用程序将在gRPC上传输一些值。这些值最初是作为BoxStream (Pin<Box<Stream>>
)从外部库中获取的,而requires的API需要一些实现Stream
的东西(当然Pin不需要)。
Tonic's streaming example使用ReceiverStream将mpsc频道转换为流,并派生出一个线程将值推入其中。这将要求流的生命周期为'static
,这不是我的实际实现的选项,因为我的流的生命周期与返回它的类相关联。
最好的方式是提供一些实现流的东西,我可以从我的Pin<Box<Stream>>
提供给Tonic
src/main.rs (这不会编译,因为BoxStream<'static,Entry>不实现IntoStreamingRequest)
use futures::prelude::stream::BoxStream;
use async_stream::stream;
use tonic::{IntoStreamingRequest};
struct Entry {
key: String,
}
fn main() {
// Create Request
let stream: BoxStream<'static, Entry> = api_function();
let request = stream.into_streaming_request();
// Send request
//let mut client = DataImporterClient::connect("http://[::1]:50051").await.unwrap();
//let response = client.grpc_function(request).await?;
}
fn api_function() -> BoxStream<'static, Entry> {
Box::pin(stream! {
let entries = vec!(
Entry {key: String::from("value1")},
Entry {key: String::from("value2")}
);
for entry in entries {
yield entry;
}
})
}
Cargo.toml
[package]
name = "tonic-streaming-minimum-example"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tonic = "0.5"
futures = "0.3"
tokio-stream = "0.1"
async-stream = "0.3"
提供的编译错误:
error[E0599]: the method `into_streaming_request` exists for struct `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>`, but its trait bounds were not satisfied
--> src\main.rs:12:26
|
12 | let request = stream.into_streaming_request();
| ^^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>` due to unsatisfied trait bounds
|
::: C:\Users\tmathews\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
|
408 | pub struct Pin<P> {
| -----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sync`
|
::: C:\Users\tmathews\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.17\src\stream.rs:27:1
|
27 | pub trait Stream {
| ----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sized`
| doesn't satisfy `_: Sync`
|
= note: the following trait bounds were not satisfied:
`Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: futures::Stream`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: std::marker::Send`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: Sync`
which is required by `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sized`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: futures::Stream`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: std::marker::Send`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): futures::Stream`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): Sync`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
发布于 2021-09-10 16:09:05
问题是,problem只为同时是Send
和Sync
的类型实现IntoStreamingRequest
impl<T> IntoStreamingRequest for T
where
T: Stream + Send + Sync + 'static
但BoxStream
并非如此:
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
您应该复制它的定义并添加一个绑定的额外+ Sync
,而不是使用BoxStream
:
fn api_function() -> Pin<Box<dyn Stream<Item = Entry> + Send + Sync + 'static>> {
Box::pin(stream! {
let entries = vec!(
Entry {key: String::from("value1")},
Entry {key: String::from("value2")}
);
for entry in entries {
yield entry;
}
})
}
而且,因为stream!()
宏返回的流已经是Send + Sync
,所以您的代码可以很好地编译。
PS:在以下位置删除不必要的类型提示:
let stream: BoxStream<'static, Entry> = api_function();
// should become:
let stream = api_function(); // after the above change it's not BoxStream anymore!
https://stackoverflow.com/questions/69134211
复制相似问题