Rust版gRPCのtonic 0.14のストリーミングのメモ。
Rustのtonic(tonic-prost)0.14で ストリーミング(rpcの引数や戻り値をstreamにする)の例。
引数を大きなデータ(バイト列)にするときは、適当なサイズのチャンクに分け、順次送信する。
戻り値が大きなデータ(バイト列)にするときも同様。
streamでアップロードする関数と、streamでダウンロードする関数を作ってみる。
syntax = "proto3";
package example.grpc;
message UploadChank {
bytes data = 1;
}
message UploadResponse {
int64 size = 1;
}
message DownloadRequest {
int64 dummy = 1;
}
message DownloadResponse {
bytes data = 1;
}
service ExampleService {
rpc upload(stream UploadChank) returns (UploadResponse);
rpc download(DownloadRequest) returns (stream DownloadResponse);
}
〜
[dependencies]
tonic = "0.14.5"
tonic-prost = "0.14.5"
prost = "0.14"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1.18"
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, transport::Server};
use crate::example::grpc::{
DownloadRequest, DownloadResponse, UploadChank, UploadResponse,
example_service_server::{ExampleService, ExampleServiceServer},
};
#[allow(non_camel_case_types)]
pub mod example {
pub mod grpc {
tonic::include_proto!("example.grpc");
}
}
↑tonic::include_proto!マクロの引数には、protoファイルに定義されたパッケージ名を指定する。
protoファイルのreturnsにstreamが入っている場合、「関数名Stream」というtypeが定義される。
関数名の先頭が小文字だとtype名の先頭も小文字になるが、これはRustのtypeの命名規則に反するので、ビルド時に警告が出る。
そこで、#allowでnon_camel_case_typesを指定しておく。
↓生成されたExampleServiceトレイトの実装を作成する。
struct ExampleServiceImpl {}
#[tonic::async_trait]
impl ExampleService for ExampleServiceImpl {
async fn upload(
&self,
request: Request<tonic::Streaming<UploadChank>>,
) -> Result<Response<UploadResponse>, Status> {
let mut stream = request.into_inner();
let mut size = 0;
while let Some(chunk) = stream.message().await? {
size += chunk.data.len() as i64;
}
Ok(Response::new(UploadResponse { size }))
}
// 戻り値がstreamのときは、RPC関数名の後ろにStreamが付けられたtypeを実装する(親traitのオーバーライド)
type downloadStream = ReceiverStream<Result<DownloadResponse, Status>>;
async fn download(
&self,
_request: Request<DownloadRequest>,
) -> Result<Response<Self::downloadStream>, Status> {
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
for i in 0..5 {
let data = format!("chunk {}", i).into_bytes();
tx.send(Ok(DownloadResponse { data })).await.unwrap();
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let port = 50051;
let addr = format!("0.0.0.0:{port}").parse()?;
println!("address = {}", addr);
let service = ExampleServiceImpl {};
Server::builder()
.add_service(ExampleServiceServer::new(service))
.serve(addr)
.await?;
Ok(())
}
cargo run --bin server
Ctrl+Cで止める。
use tonic::Request;
use crate::example::grpc::{DownloadRequest, UploadChank, example_service_client::ExampleServiceClient};
#[allow(non_camel_case_types)]
pub mod example {
pub mod grpc {
tonic::include_proto!("example.grpc");
}
}
↑tonic::include_proto!マクロの引数には、protoファイルに定義されたパッケージ名を指定する。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let port = 50051;
let url = format!("http://localhost:{port}");
println!("address = {}", url);
let mut client = ExampleServiceClient::connect(url).await?;
// upload
let big_data: Vec<u8> = vec![0u8; 1024 * 1024]; // 1MB
const CHUNK_SIZE: usize = 64 * 1024;
let chunks: Vec<UploadChank> = big_data
.chunks(CHUNK_SIZE)
.map(|c| UploadChank { data: c.to_vec() })
.collect();
let stream = tokio_stream::iter(chunks);
let response = client.upload(Request::new(stream)).await?;
println!("uploaded size = {}", response.into_inner().size);
// download
let mut stream = client
.download(Request::new(DownloadRequest { dummy: 0 }))
.await?
.into_inner();
while let Some(chunk) = stream.message().await? {
println!("received: {}", String::from_utf8(chunk.data)?);
}
Ok(())
}
cargo run --bin stream_client