Rust版gRPCのtonic 0.14のストリーミングのメモ。
|
|
Rustのtonic(tonic-prost)0.14のストリーミング(rpcの引数や戻り値をstreamにする)の例。
引数を大きなデータ(バイト列)にするときは、適当なサイズのチャンクに分け、順次送信する。
戻り値を大きなデータ(バイト列)にするときも同様。
streamでアップロードする関数と、streamでダウンロードする関数を作ってみる。
syntax = "proto3";
package example.grpc;
message UploadChank {
bytes data = 1;
}
message UploadResponse {
int64 size = 1;
}
message DownloadRequest {
int64 dummy = 1;
}
message DownloadResponse {
bytes data = 1;
}
service ExampleService {
rpc upload(stream UploadChank) returns (UploadResponse);
rpc download(DownloadRequest) returns (stream DownloadResponse);
}
〜
[dependencies]
tonic = "0.14.5"
tonic-prost = "0.14.5"
prost = "0.14"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1.18"
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, transport::Server};
use crate::example::grpc::{
DownloadRequest, DownloadResponse, UploadChank, UploadResponse,
example_service_server::{ExampleService, ExampleServiceServer},
};
#[allow(non_camel_case_types)]
pub mod example {
pub mod grpc {
tonic::include_proto!("example.grpc");
}
}
↑tonic::include_proto!マクロの引数には、protoファイルに定義されたパッケージ名を指定する。
protoファイルのreturnsにstreamが入っている場合、「関数名Stream」というtypeが定義される。
関数名の先頭が小文字だとtype名の先頭も小文字になるが、これはRustのtypeの命名規則に反するので、ビルド時に警告が出る。
そこで、#allowでnon_camel_case_typesを指定しておく。
↓生成されたExampleServiceトレイトの実装を作成する。
struct ExampleServiceImpl {}
#[tonic::async_trait]
impl ExampleService for ExampleServiceImpl {
async fn upload(
&self,
request: Request<tonic::Streaming<UploadChank>>,
) -> Result<Response<UploadResponse>, Status> {
let mut stream = request.into_inner();
let mut size = 0;
while let Some(chunk) = stream.message().await? {
size += chunk.data.len() as i64;
}
Ok(Response::new(UploadResponse { size }))
}
// 戻り値がstreamのときは、RPC関数名の後ろにStreamが付けられたtypeを実装する(親traitのオーバーライド)
type downloadStream = ReceiverStream<Result<DownloadResponse, Status>>;
async fn download(
&self,
_request: Request<DownloadRequest>,
) -> Result<Response<Self::downloadStream>, Status> {
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
for i in 0..5 {
let data = format!("chunk {}", i).into_bytes();
tx.send(Ok(DownloadResponse { data })).await.unwrap();
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let port = 50051;
let addr = format!("0.0.0.0:{port}").parse()?;
println!("address = {}", addr);
let service = ExampleServiceImpl {};
Server::builder()
.add_service(ExampleServiceServer::new(service))
.serve(addr)
.await?;
Ok(())
}
cargo run --bin server
Ctrl+Cで止める。
use tonic::Request;
use crate::example::grpc::{DownloadRequest, UploadChank, example_service_client::ExampleServiceClient};
#[allow(non_camel_case_types)]
pub mod example {
pub mod grpc {
tonic::include_proto!("example.grpc");
}
}
↑tonic::include_proto!マクロの引数には、protoファイルに定義されたパッケージ名を指定する。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let port = 50051;
let url = format!("http://localhost:{port}");
println!("address = {}", url);
let mut client = ExampleServiceClient::connect(url).await?;
// upload
let big_data: Vec<u8> = vec![0u8; 1024 * 1024]; // 1MB
const CHUNK_SIZE: usize = 64 * 1024;
let chunks: Vec<UploadChank> = big_data
.chunks(CHUNK_SIZE)
.map(|c| UploadChank { data: c.to_vec() })
.collect();
let stream = tokio_stream::iter(chunks);
let response = client.upload(Request::new(stream)).await?;
println!("uploaded size = {}", response.into_inner().size);
// download
let mut stream = client
.download(Request::new(DownloadRequest { dummy: 0 }))
.await?
.into_inner();
while let Some(chunk) = stream.message().await? {
println!("received: {}", String::from_utf8(chunk.data)?);
}
Ok(())
}
cargo run --bin stream_client
メソッドの外側からチャンク(1回に送信するデータ)を渡す例。[2026-04-25]
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Request;
use crate::example::grpc::{DownloadRequest, UploadChank, example_service_client::ExampleServiceClient};
// upload
let big_data: Vec<u8> = vec![0u8; 1024 * 1024]; // 1MB
let (tx, rx) = mpsc::channel(4);
let stream = ReceiverStream::new(rx);
// ストリームを先に渡しておく
let request_handle = tokio::spawn(async move { client.upload(Request::new(stream)).await });
// チャンクを逐次送信
const CHUNK_SIZE: usize = 64 * 1024;
for chunk in big_data.chunks(CHUNK_SIZE) {
upload_chunk(&tx, chunk.to_vec()).await?;
}
drop(tx); // ストリーム終了を通知
let response = request_handle.await??;
println!("uploaded size = {}", response.into_inner().size);
async fn upload_chunk(
tx: &mpsc::Sender<UploadChank>,
data: Vec<u8>,
) -> Result<(), mpsc::error::SendError<UploadChank>> {
let chunk = UploadChank { data };
tx.send(chunk).await
}
drop(tx)を実行しないと、続きのデータがあるものと思われて、送信が終了しない。
送信を途中でキャンセルするには、キャンセル通知用のtx/rx(Sender/Receiver)を用意する。
// upload
let big_data: Vec<u8> = vec![0u8; 1024 * 1024]; // 1MB
let (tx, rx) = mpsc::channel(4);
let stream = ReceiverStream::new(rx);
let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
// ストリームを先に渡しておく
let request_handle = tokio::spawn(async move {
tokio::select! {
result = client.upload(Request::new(stream)) => result,
_ = cancel_rx => Err(tonic::Status::cancelled("upload cancelled")),
}
});
// チャンクを逐次送信
const CHUNK_SIZE: usize = 64 * 1024;
for chunk in big_data.chunks(CHUNK_SIZE) {
upload_chunk(&tx, chunk.to_vec()).await?;
}
let _ = cancel_tx.send(()); // 送信キャンセルを通知
// drop(tx); // ストリーム終了を通知
let response_result = request_handle.await?;
match response_result {
Ok(response) => println!("uploaded size = {}", response.into_inner().size),
Err(status) if status.code() == tonic::Code::Cancelled => {
println!("upload was cancelled as expected. message: {}", status.message());
}
Err(status) => println!("upload failed with unexpected error: {:?}", status),
}
送信をキャンセルする際は、cancel_tx.send()を呼ぶ。
すると、request_handleの中でcancel_rxが呼ばれるので、Status::cancelledのエラーを返す。
これにより、サーバー側にキャンセルが通知される。
そして、サーバー側からの応答として、キャンセルのエラーが返ってくる。
なお、このエラーのメッセージは、Status::cancelledで渡したメッセージそのもの。(少なくとも、Rustのtonicで作ったgRPCサーバーの場合は)