S-JIS[2026-03-16]

gRPC Rustストリーミング

Rust版gRPCのtonic 0.14のストリーミングのメモ。


概要

Rusttonic(tonic-prost)0.14で ストリーミング(rpcの引数や戻り値をstreamにする)の例。

引数を大きなデータ(バイト列)にするときは、適当なサイズのチャンクに分け、順次送信する。
戻り値が大きなデータ(バイト列)にするときも同様。


protoファイルの例

streamでアップロードする関数と、streamでダウンロードする関数を作ってみる。

プロジェクト/proto/example.proto

syntax = "proto3";

package example.grpc;

message UploadChank {
    bytes data = 1;
}

message UploadResponse {
    int64 size = 1;
}

message DownloadRequest {
    int64 dummy = 1;
}

message DownloadResponse {
    bytes data = 1;
}

service ExampleService {
    rpc upload(stream UploadChank) returns (UploadResponse);
    rpc download(DownloadRequest) returns (stream DownloadResponse);
}

gRPCサーバーの例

プロジェクト/Cargo.toml:

〜
[dependencies]
tonic = "0.14.5"
tonic-prost = "0.14.5"
prost = "0.14"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1.18"

プロジェクト/src/bin/server.rs

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, transport::Server};

use crate::example::grpc::{
    DownloadRequest, DownloadResponse, UploadChank, UploadResponse,
    example_service_server::{ExampleService, ExampleServiceServer},
};
#[allow(non_camel_case_types)]
pub mod example {
    pub mod grpc {
        tonic::include_proto!("example.grpc");
    }
}

↑tonic::include_proto!マクロの引数には、protoファイルに定義されたパッケージ名を指定する。

protoファイルのreturnsにstreamが入っている場合、「関数名Stream」というtypeが定義される。
関数名の先頭が小文字だとtype名の先頭も小文字になるが、これはRustのtypeの命名規則に反するので、ビルド時に警告が出る。
そこで、#allowでnon_camel_case_typesを指定しておく。

↓生成されたExampleServiceトレイトの実装を作成する。

struct ExampleServiceImpl {}

#[tonic::async_trait]
impl ExampleService for ExampleServiceImpl {
    async fn upload(
        &self,
        request: Request<tonic::Streaming<UploadChank>>,
    ) -> Result<Response<UploadResponse>, Status> {
        let mut stream = request.into_inner();

        let mut size = 0;
        while let Some(chunk) = stream.message().await? {
            size += chunk.data.len() as i64;
        }

        Ok(Response::new(UploadResponse { size }))
    }
    // 戻り値がstreamのときは、RPC関数名の後ろにStreamが付けられたtypeを実装する(親traitのオーバーライド)
    type downloadStream = ReceiverStream<Result<DownloadResponse, Status>>;

    async fn download(
        &self,
        _request: Request<DownloadRequest>,
    ) -> Result<Response<Self::downloadStream>, Status> {
        let (tx, rx) = mpsc::channel(4);

        tokio::spawn(async move {
            for i in 0..5 {
                let data = format!("chunk {}", i).into_bytes();
                tx.send(Ok(DownloadResponse { data })).await.unwrap();
            }
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let port = 50051;
    let addr = format!("0.0.0.0:{port}").parse()?;
    println!("address = {}", addr);

    let service = ExampleServiceImpl {};

    Server::builder()
        .add_service(ExampleServiceServer::new(service))
        .serve(addr)
        .await?;

    Ok(())
}

サーバーの実行方法

cargo run --bin server

Ctrl+Cで止める。


gRPCクライアントの例

プロジェクト/src/bin/stream_client.rs

use tonic::Request;

use crate::example::grpc::{DownloadRequest, UploadChank, example_service_client::ExampleServiceClient};
#[allow(non_camel_case_types)]
pub mod example {
    pub mod grpc {
        tonic::include_proto!("example.grpc");
    }
}

↑tonic::include_proto!マクロの引数には、protoファイルに定義されたパッケージ名を指定する。

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let port = 50051;
    let url = format!("http://localhost:{port}");
    println!("address = {}", url);

    let mut client = ExampleServiceClient::connect(url).await?;
    // upload
    let big_data: Vec<u8> = vec![0u8; 1024 * 1024]; // 1MB

    const CHUNK_SIZE: usize = 64 * 1024;

    let chunks: Vec<UploadChank> = big_data
        .chunks(CHUNK_SIZE)
        .map(|c| UploadChank { data: c.to_vec() })
        .collect();
    let stream = tokio_stream::iter(chunks);

    let response = client.upload(Request::new(stream)).await?;

    println!("uploaded size = {}", response.into_inner().size);
    // download
    let mut stream = client
        .download(Request::new(DownloadRequest { dummy: 0 }))
        .await?
        .into_inner();

    while let Some(chunk) = stream.message().await? {
        println!("received: {}", String::from_utf8(chunk.data)?);
    }

    Ok(())
}

クライアントの実行方法

cargo run --bin stream_client

gRPC Rustへ戻る / gRPCへ戻る / 技術メモへ戻る
メールの送信先:ひしだま