首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

文盘 Rust -- tonic-Rust grpc 初体验

gRPC 是开发中常用的开源高性能远程过程调用(RPC)框架,tonic 是基于 HTTP/2 的 gRPC 实现,专注于高性能、互操作性和灵活性。该库的创建是为了对 async/await 提供一流的支持,并充当用 Rust 编写的生产系统的核心构建块。今天我们聊聊通过使用 tonic 调用 grpc 的的具体过程。

工程规划

rpc 程序一般包含 server 端和 client 端,为了方便我们把两个程序打包到一个工程里面 新建 tonic_sample 工程

cargo new tonic_sample

Cargo.toml 如下

[package]name = "tonic_sample"version = "0.1.0"edition = "2021"

[[bin]] # Bin to run the gRPC servername = "stream-server"path = "src/stream_server.rs"

[[bin]] # Bin to run the gRPC clientname = "stream-client"path = "src/stream_client.rs"

[dependencies]tokio.workspace = truetonic = "0.9"tonic-reflection = "0.9.2"prost = "0.11"tokio-stream = "0.1"async-stream = "0.2"serde = { version = "1.0", features = ["derive"] }serde_json = "1.0"rand = "0.7"h2 = { version = "0.3" }anyhow = "1.0.75"futures-util = "0.3.28"

[build-dependencies]tonic-build = "0.9"

tonic 的示例代码还是比较齐全的,本次我们参考 tonic 的 streaming example:https://github.com/hyperium/tonic/tree/master/examples/src/streaming。

首先编写 proto 文件,用来描述报文。proto/echo.proto

syntax = "proto3";

package stream;

// EchoRequest is the request for echo.message EchoRequest { string message = 1; }

// EchoResponse is the response for echo.message EchoResponse { string message = 1; }

// Echo is the echo service.service Echo { // UnaryEcho is unary echo. rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} // ServerStreamingEcho is server side streaming. rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} // ClientStreamingEcho is client side streaming. rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} // BidirectionalStreamingEcho is bidi streaming. rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}}

文件并不复杂,只有两个 message 一个请求一个返回,之所以选择这个示例是因为该示例包含了 rpc 中的流式处理,包括了 server 流、client 流以及双向流的操作。 编辑 build.rs 文件

use std::{env, path::PathBuf};

fn main() -> Result { tonic_build::compile_protos("proto/echo.proto")?; Ok(())}

该文件用来通过 tonic-build 生成 grpc 的 rust 基础代码

完成上述工作后就可以构建 server 和 client 代码了

stream_server.rs

pub mod pb { tonic::include_proto!("stream");}

use anyhow::Result;use futures_util::FutureExt;use pb::{EchoRequest, EchoResponse};use std::{ error::Error, io::ErrorKind, net::{SocketAddr, ToSocketAddrs}, pin::Pin, thread, time::Duration,};use tokio::{ net::TcpListener, sync::{ mpsc, oneshot::{self, Receiver, Sender}, Mutex, }, task::{self, JoinHandle},};use tokio_stream::{ wrappers::{ReceiverStream, TcpListenerStream}, Stream, StreamExt,};use tonic::{transport::Server, Request, Response, Status, Streaming};type EchoResult = Result;type ResponseStream = Pin;

fn match_for_io_error(err_status: &Status) -> Option { let mut err: &(dyn Error + 'static) = err_status;

loop { if let Some(io_err) = err.downcast_ref::() { return Some(io_err); }

// h2::Error do not expose std::io::Error with `source()` // https://github.com/hyperium/h2/pull/462 if let Some(h2_err) = err.downcast_ref::() { if let Some(io_err) = h2_err.get_io() { return Some(io_err); } }

err = match err.source() { Some(err) => err, None => return None, }; }}

#[derive(Debug)]pub struct EchoServer {}

#[tonic::async_trait]impl pb::echo_server::Echo for EchoServer { async fn unary_echo(&self, req: Request) -> EchoResult { let req_str = req.into_inner().message;

let response = EchoResponse { message: req_str }; Ok(Response::new(response)) }

type ServerStreamingEchoStream = ResponseStream;

async fn server_streaming_echo( &self, req: Request, ) -> EchoResult { println!("EchoServer::server_streaming_echo"); println!("\tclient connected from: {:?}", req.remote_addr());

// creating infinite stream with requested message let repeat = std::iter::repeat(EchoResponse { message: req.into_inner().message, }); let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));

let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { while let Some(item) = stream.next().await { match tx.send(Result::::Ok(item)).await { Ok(_) => { // item (server response) was queued to be send to client } Err(_item) => { // output_stream was build from rx and both are dropped break; } } } println!("\tclient disconnected"); });

let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ServerStreamingEchoStream )) }

async fn client_streaming_echo( &self, _: Request, ) -> EchoResult { Err(Status::unimplemented("not implemented")) }

type BidirectionalStreamingEchoStream = ResponseStream;

async fn bidirectional_streaming_echo( &self, req: Request, ) -> EchoResult { println!("EchoServer::bidirectional_streaming_echo");

let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128);

tokio::spawn(async move { while let Some(result) = in_stream.next().await { match result { Ok(v) => tx .send(Ok(EchoResponse { message: v.message })) .await .expect("working rx"), Err(err) => { if let Some(io_err) = match_for_io_error(&err) { if io_err.kind() == ErrorKind::BrokenPipe { eprintln!("\tclient disconnected: broken pipe"); break; } }

match tx.send(Err(err)).await { Ok(_) => (), Err(_err) => break, // response was droped } } } } println!("\tstream ended"); });

// echo just write the same data that was received let out_stream = ReceiverStream::new(rx);

Ok(Response::new( Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream )) }}

#[tokio::main]async fn main() -> Result { // 基础server let server = EchoServer {}; Server::builder() .add_service(pb::echo_server::EchoServer::new(server)) .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap()) .await .unwrap(); Ok(())}

server 端的代码还是比较清晰的,首先通过 tonic::include_proto! 宏引入 grpc 定义,参数是 proto 文件中定义的 package 。我们重点说说 server_streaming_echo function 。这个 function 的处理流程明白了,其他的流式处理大同小异。首先 通过 std::iter::repeat function 定义一个迭代器;然后构建 tokio_stream 在本示例中 每 200 毫秒产生一个 repeat;最后构建一个 channel ,tx 用来发送从 stream 中获取的内容,rx 封装到 response 中返回。最后 main 函数 拉起服务。

client 代码如下

pub mod pb { tonic::include_proto!("stream");}

use std::time::Duration;use tokio_stream::{Stream, StreamExt};use tonic::transport::Channel;

use pb::{echo_client::EchoClient, EchoRequest};

fn echo_requests_iter() -> impl Stream { tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest { message: format!("msg {:02}", i), })}

async fn unary_echo(client: &mut EchoClient, num: usize) { for i in 0..num { let req = tonic::Request::new(EchoRequest { message: "msg".to_string() + &i.to_string(), }); let resp = client.unary_echo(req).await.unwrap(); println!("resp:{}", resp.into_inner().message); }}

async fn streaming_echo(client: &mut EchoClient, num: usize) { let stream = client .server_streaming_echo(EchoRequest { message: "foo".into(), }) .await .unwrap() .into_inner();

// stream is infinite - take just 5 elements and then disconnect let mut stream = stream.take(num); while let Some(item) = stream.next().await { println!("\treceived: {}", item.unwrap().message); } // stream is droped here and the disconnect info is send to server}

async fn bidirectional_streaming_echo(client: &mut EchoClient, num: usize) { let in_stream = echo_requests_iter().take(num);

let response = client .bidirectional_streaming_echo(in_stream) .await .unwrap();

let mut resp_stream = response.into_inner();

while let Some(received) = resp_stream.next().await { let received = received.unwrap(); println!("\treceived message: `{}`", received.message); }}

async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient, dur: Duration) { let in_stream = echo_requests_iter().throttle(dur);

let response = client .bidirectional_streaming_echo(in_stream) .await .unwrap();

let mut resp_stream = response.into_inner();

while let Some(received) = resp_stream.next().await { let received = received.unwrap(); println!("\treceived message: `{}`", received.message); }}

#[tokio::main]async fn main() -> Result { let mut client = EchoClient::connect("http://127.0.0.1:50051").await.unwrap(); println!("Unary echo:"); unary_echo(&mut client, 10).await; tokio::time::sleep(Duration::from_secs(1)).await;

println!("Streaming echo:"); streaming_echo(&mut client, 5).await; tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions

// Echo stream that sends 17 requests then graceful end that connection println!("\r\nBidirectional stream echo:"); bidirectional_streaming_echo(&mut client, 17).await;

// Echo stream that sends up to `usize::MAX` requests. One request each 2s. // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from // graceful client disconnection (above example) on the server side. println!("\r\nBidirectional stream echo (kill client with CTLR+C):"); bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;

Ok(())}

测试一下,分别运行 server 和 client

cargo run --bin stream-servercargo run --bin stream-client

在开发中,我们通常不会再 client 和 server 都开发好的情况下才开始测试。通常在开发 server 端的时候采用 grpcurl 工具进行测试工作

grpcurl -import-path ./proto -proto echo.proto listgrpcurl -import-path ./proto -proto echo.proto describe stream.Echogrpcurl -plaintext -import-path ./proto -proto echo.proto -d '{"message":"1234"}' 127.0.0.1:50051 stream.Echo/UnaryEcho

此时,如果我们不指定 -import-path 参数,执行如下命令

grpcurl -plaintext 127.0.0.1:50051 list

会出现如下报错信息

Failed to list services: server does not support the reflection API

让服务端程序支持 reflection API

首先改造 build.rs

use std::{env, path::PathBuf};

fn main() -> Result { let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); tonic_build::configure() .file_descriptor_set_path(out_dir.join("stream_descriptor.bin")) .compile(&["proto/echo.proto"], &["proto"]) .unwrap(); Ok(())}

file_descriptor_set_path 生成一个文件,其中包含为协议缓冲模块编码的 prost_types::FileDescriptorSet 文件。这是实现 gRPC 服务器反射所必需的。

接下来改造一下 stream-server.rs,涉及两处更改。

新增 STREAM_DESCRIPTOR_SET 常量

pub mod pb { tonic::include_proto!("stream"); pub const STREAM_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("stream_descriptor");}

修改 main 函数

#[tokio::main]async fn main() -> Result { // 基础server // let server = EchoServer {}; // Server::builder() // .add_service(pb::echo_server::EchoServer::new(server)) // .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap()) // .await // .unwrap();

// tonic_reflection let service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(pb::STREAM_DESCRIPTOR_SET) .with_service_name("stream.Echo") .build() .unwrap();

let addr = "0.0.0.0:50051".parse().unwrap();

let server = EchoServer {};

Server::builder() .add_service(service) .add_service(pb::echo_server::EchoServer::new(server)) .serve(addr) .await?; Ok(())}

register_encoded_file_descriptor_set 将包含编码的 prost_types::FileDescriptorSet 的 byte slice 注册到 gRPC Reflection 服务生成器注册。

再次测试

grpcurl -plaintext 127.0.0.1:50051 listgrpcurl -plaintext 127.0.0.1:50051 describe stream.Echo

返回正确结果。

以上完整代码地址:https://github.com/jiashiwen/wenpanrust/tree/main/tonic_sample

-end-

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OVPXPEuT6_wpC_rzugN0Yhpg0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券