Rustのpyo3でのPythonマルチプロセッシングについて。
|
|
Python(Python3.10付近のバージョン)で並列処理を行うには、マルチプロセッシングを使う。
つまり、新しくプロセスを起動して処理を分岐させる。
マルチプロセッシングで分岐するプロセスにオブジェクトを渡すには、そのオブジェクト(のクラス)がpickleによるシリアライズ/デシリアライズに対応する必要がある。
PyO3でMyClassというPythonクラスを用意し、そのオブジェクトをマルチプロセッシングの分岐先プロセスに渡すことを考えてみる。
pyo3 0.27のmultiprocessing-exampleというプロジェクト(モジュール名はmultiprocessing_example)の例。
まず、マルチプロセッシングを使用するPythonプログラムは以下のような感じ。
(MyClassオブジェクトを分岐先プロセスに渡す)
from multiprocessing import Pool import multiprocessing_example as example def main(): my_object = example.MyClass() my_object.name = "Multiprocessing Example" my_object.value = 42 print(f"Main process created: {my_object}") # プロセスを分岐 with Pool(4) as pool: pool.map(worker, [my_object] * 4) def worker(my_object): print(f"Worker received: {my_object}") if __name__ == "__main__": main()
MyClassが単なるPythonクラスの場合(pickleに対応していない場合)、上記のサンプルを実行すると以下のようなエラーが発生する。
multiprocessing-example> uv run python examples/example.py
File "C:\Users\user\AppData\Roaming\uv\python\cpython-3.10.19-windows-x86_64-none\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Users\user\AppData\Roaming\uv\python\cpython-3.10.19-windows-x86_64-none\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'multiprocessing_example.MyClass' object
MyClassをpickleに対応する例。
use pyo3::{IntoPyObjectExt, prelude::*, types::PyTuple};
#[pyclass(module = "multiprocessing_example")]
pub struct MyClass {
#[pyo3(get, set)]
pub name: Option<String>,
#[pyo3(get, set)]
pub value: Option<i32>,
}
pickleに対応するには、#pyclass属性にきちんとモジュール名を指定する必要がある。
(モジュール名を付けないと「builtins.MyClass」として扱われ、そんなクラスは無いというエラーになってしまう)
#[pymethods]
impl MyClass {
#[new]
pub fn new() -> Self {
MyClass {
name: None,
value: None,
}
}
pub fn __repr__(&self) -> String {
format!(
"MyClass(name: {}, value: {})",
self.name.as_deref().unwrap_or("None"),
self.value.map_or("None".to_string(), |v| v.to_string())
)
}
pickleから呼ばれるので、コンストラクター(#new属性のメソッド)を用意する。
(ちなみに、__repr__メソッドはデバッグ出力用で、pickleとは関係ない)
pub fn __reduce__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let cls = py.get_type::<MyClass>();
let args = PyTuple::empty(py);
let state = self.__getstate__();
(cls, args, state).into_bound_py_any(py)
}
pub fn __getstate__(&self) -> (Option<String>, Option<i32>) {
(self.name.clone(), self.value)
}
pub fn __setstate__(&mut self, state: (Option<String>, Option<i32>)) {
self.name = state.0;
self.value = state.1;
}
}
マルチプロセッシングでオブジェクトを分岐先プロセスに渡す際に、分岐元でpickleによってシリアライズされて、分岐先でpickleによってデシリアライズされる。
シリアライズ時に__reduce__メソッドが呼ばれる。
このメソッドからは、デシリアライズの際に呼ばれるコンストラクター(クラスcls)とその引数(args)、および設定値(state)を返す。
コンストラクターが引数無しの場合は、引数(args)は空のタプルになる。
__reduce__メソッドの戻り値の型は「(Bound<'py, PyType>, Bound<'py,
PyTuple>, (Option<String>, Option<i32>))」でもいいのだが、(特にstate部分が)ちょっと長いので、PyAnyにしておくのが楽そう。
デシリアライズ時には、まずコンストラクターが呼ばれてオブジェクトが生成され、次にそのオブジェクトに対して__setstate__メソッドが呼ばれる。
つまり、__reduce__メソッドの戻り値のタプルの第3要素(__getstate__メソッドの戻り値)が__setstate__メソッドに渡される。
受け渡しデータの形式は両者で統一されていれば何でも良い。
(上記の例はタプルで受け渡したが、dictでもバイト列でも構わない)
serde-pickleクレートを使うと、pickle用の__getstate__, __setstate__メソッドの実装をserde-pickleに移譲できる。
(Pythonのマルチプロセッシングがpickleを利用していると言っても、__getstate__, __setstate__メソッド間の受け渡しデータの形式をpickleにする必要性は無い。変換・復元できればserde-pickleでなくても構わない。とはいえ、serde-pickleの使用方法は簡単なので便利)
〜
[dependencies]
pyo3 = "0.27.0"
serde = { version="1.0.228", features=["derive"]}
serde-pickle = "1.2.0"
serde-pickleはserdeのSerialize/Deserializeを利用するので、serdeも追加する。
また、Serialize/Deserializeを使うためにserdeのfeaturesにderiveを指定しておく必要がある。
use pyo3::{
exceptions::PyRuntimeError,
prelude::*,
types::{PyTuple, PyType},
};
use serde::{Deserialize, Serialize};
#[pyclass(module = "multiprocessing_example")]
#[derive(Serialize, Deserialize)]
pub struct MyClass {
#[pyo3(get, set)]
pub name: Option<String>,
#[pyo3(get, set)]
pub value: Option<i32>,
}
対象クラス(struct)の#derive属性でSerialize, Deserializeを指定する。
なお、フィールドに他の構造体や列挙型を含む場合は、それらにもSerialize, Deserializeを指定されている必要がある。
#[pymethods]
impl MyClass {
〜
pub fn __reduce__<'py>(
&self,
py: Python<'py>,
) -> PyResult<(Bound<'py, PyType>, Bound<'py, PyTuple>, Vec<u8>)> {
let cls = py.get_type::<MyClass>();
let args = PyTuple::empty(py);
let state = self.__getstate__()?;
Ok((cls, args, state))
}
pub fn __getstate__(&self) -> PyResult<Vec<u8>> {
serde_pickle::to_vec(self, Default::default())
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
pub fn __setstate__(&mut self, state: &[u8]) -> PyResult<()> {
let state: MyClass = serde_pickle::from_slice(state, Default::default())
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
*self = state;
Ok(())
}
}
__getstate__, __setstate__メソッドでserde_pickleを使う。
この際、対象クラスがserdeのSerialize, Deserializeを実装している必要がある。
Pythonのpickleを使ってシリアライズ/デシリアライズを呼ぶ例。
import multiprocessing_example as example import pickle def main(): my_object = example.MyClass() my_object.name = "Pickle Example" my_object.value = 42 print(f"Main process created: {my_object}") dump = pickle.dumps(my_object) load = pickle.loads(dump) print(f"Unpickled object: {load}") if __name__ == "__main__": main()
multiprocessing-example> uv run python examples/pickle-example.py
PyO3の列挙型をPythonのpickleでシリアライズ/デシリアライズする例。[2026-03-25]
クラスの場合はコンストラクターがあるので__reduce__メソッドでコンストラクター(クラス)を返すが、
列挙型の場合はコンストラクターが無いので、変換用関数を返すようにする。
use serde::{Deserialize, Serialize};
#[pyclass(module = "multiprocessing_example")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum OptionType {
OptionA = 1,
OptionB = 2,
}
#derive属性にSerialize,
Deserializeを加えるのは、この列挙型をフィールドに保持している構造体をserde-pickleでシリアライズ/デシリアライズするため。
この列挙型自身の__reduce__メソッドには関係ない。
#[pymethods]
impl OptionType {
〜
pub fn __reduce__<'py>(&self, py: Python<'py>) -> PyResult<(Bound<'py, PyAny>, (i32,))> {
let callable = py.get_type::<OptionType>().getattr("_from_value")?;
let args = (*self as i32,);
Ok((callable, args))
}
#[staticmethod]
pub fn _from_value(value: i32) -> PyResult<OptionType> {
match value {
1 => Ok(OptionType::OptionA),
2 => Ok(OptionType::OptionB),
_ => Err(PyRuntimeError::new_err("Invalid OptionType value")),
}
}
}
単純な列挙型は「*self」でi32等の数値に変換できるので、シリアライズした値としてはそれを使う。
デシリアライズ(復元)するために、専用の関数(上記の_from_value)を用意する。
__reduce__メソッドの戻り値であるcallableには復元用の関数を指定し、argsにはその関数の引数を指定する。
callableはPythonから呼べる関数オブジェクトである必要があるため、py.get_type().getattr()でPythonとしての関数オブジェクトを取得する。