OSTRACISM CO.

ScalaとOCamlとF#とPythonと...

並行処理その2

 「すごいEr本」(「すごいErlangゆかいに学ぼう!」)をナナメ読みした。勿論これでErlangプログラムが書けるとは全く思えない。加えてErlangでプログラムを書きたいとは全く思わなかった。なんでだろう。言語としてのErlangの解説というよりは軽量プロセスを旨く扱える特殊なVMでの特殊な専用スクリプトの使い方の解説にしか見えないからか。適切な応用を用意できなかったのがダメなのかな。分散システムで実績があるそうで、まぁ、電話屋の作ったモノだから。

 Erlangの軽量プロセスをScalaに持ってきたのがActor。多分他の言語に持ってきてもActor。並行計算の数学的モデルのアクターモデルはウィキペディアによると70年代に考えだされたそうな。結構古くからある。Erlangはかなり徹底的にアクターモデルを実装してるそうだ。なので、アクターモデルと言語仕様が一体化してるErlangにはキーワードとしてのActorはない。

 マルチスレッドプログラミングってのは大抵用意されてる道具がスレッドの起動と停止とロック機構程度で、割と職人技。ScalaのActor(Akkaではない以前のもの)を見て、色々と割り切ることでマルチスレッドプログラミングがスッキリするとの感触はある。で、OCamlでのActorごっこ。OCamlのスレッドも用意されてるのはスレッドの起動と停止とロック機構程度と思って良い。ConditionもEventも結局使い勝手が悪い。ConditionやEventを実際に使ってる人はいるんだろうかって程度に情報がない。多分皆使えないと判断したんだろう。

 いや、使えないのは俺だ。その後.NETでMonitorクラスを使って簡易Actorを実装。ConditionがMonitorと同じものと気付く。俺の使い方が間違ってただけだ。ロックで囲ったところで待ちに入ると他のスレッドからはロック解除になるところがミソ。

 Actorの中にロック機構や同期を封じ込めて、Actorを使う側では意識させないようにするのがポイント。Actor本体となるスレッドの関数には再入はない。競合に関して扱いやすくなるのが利点。

 分散システムどころかマルチコアすら扱えず、高信頼性も耐障害性も無いなんちゃってActorをどうぞ。


module Actor = struct
  type ('msg, 'result) t = { 
    cue: 'msg Queue.t;
    mutable res: 'result option;
    mc: Mutex.t; 
    mr: Mutex.t;
    ms: Mutex.t;
    cc: Condition.t;
    cr: Condition.t;
    mutable working: bool;
    mutable th: Thread.t option }
  
  exception ERROR_BUG
  
  let create fn = 
    let act = {
      cue = Queue.create ();
      res = None;
      mc = Mutex.create (); 
      mr = Mutex.create (); 
      ms = Mutex.create (); 
      cc = Condition.create ();
      cr = Condition.create ();
      working = true;
      th = None }
    in
    let _ = act.th <- Some (Thread.create fn act) in
    act
  
  let _lock m fn = 
    let _ = Mutex.lock m in
    let r = fn () in
    let _ = Mutex.unlock m in
    let _ = Thread.yield () in
    r
  
  let _post act msg =
    _lock act.mc
      (fun () -> 
        let _ = Queue.add msg act.cue in
        if act.working = false then
          let _ = act.working <- true in
          Condition.signal act.cc
      )
  
  let post act msg =
    _post act (Some msg)
  
  let send act msg =
    (* 複数スレッドからのSendの競合がある *)
    _lock act.mc
      (fun () ->
        _lock act.mr
          (fun () ->
            let _ = post act msg in
            let _ = Condition.wait act.cr act.mr in
            match act.res with
            | Some result -> result
            | _ -> raise ERROR_BUG
          )
      )
  
  (* recieveの戻り値はメッセージのOption *)
  (* Noneが戻ったらループを抜けること *)
  let receive act =
    _lock act.mc
      (fun () ->
        let _ =
          if Queue.is_empty act.cue then
            let _ = act.working <- false in
            Condition.wait act.cc act.mc
        in
        Queue.take act.cue
      )
  
  let reply act result = 
    _lock act.mr
      (fun () ->
        let _ = act.res <- Some result in
        Condition.signal act.cr
      )
    
  let quit act =
    _post act None
  
  let join act =
    match act.th with
    | Some th -> Thread.join th
    | _ -> ()
end

 なるべくシンプルにしたかったのだけど、キュー用とsendのリプライ用に2つMutexと2つのConditionを使う。最初の版ではyieldを使ったポーリングの_wait関数という妥協の産物があった。Conditionの使い方を把握したので、_wait関数は削除。sendも単純になった。receiveの仕様は変更。メッセージのOptionが返る。Noneが返ったらループを抜けるようにするのは利用者の責任。


let main() =
  let act1 = Actor.create
    (fun act ->
      let num = ref 0 in
      let max = ref 0 in
      let min = ref 0 in
      let break = ref false in
      while !break = false do
        match Actor.receive act with
        | None -> break := true
        | Some msg -> (
          match msg with
          | ("+", n) ->
            let _ = print_string "+" in
            let _ = flush stdout in
            let _ = num := !num + n in
            if !num > !max then max := !num
          | ("-", n) ->
            let _ = print_string "-" in
            let _ = flush stdout in
            let _ = num := !num - n in
            if !num < !min then min := !num
          | ("num", _) -> Actor.reply act !num
          | ("max", _) -> Actor.reply act !max
          | ("min", _) -> Actor.reply act !min
          | (op, _) -> print_endline ("unknown " ^ op)
        )
      done
    )
  in
  let shoot op act = 
      for i = 1 to 10000 do 
          Actor.post act1 (op, 1) 
      done 
  in
  let act2 = Actor.create (shoot "+") in
  let act3 = Actor.create (shoot "-") in
  let _ = Actor.join act2 in
  let _ = Actor.join act3 in
  let _ = print_endline (string_of_int (Actor.send act1 ("num", 0))) in
  let _ = print_endline (string_of_int (Actor.send act1 ("max", 0))) in
  let _ = print_endline (string_of_int (Actor.send act1 ("min", 0))) in
  let _ = Actor.quit act1 in
  let _ = Actor.join act1 in
  ()

let _ = main()

 動作確認として適当な利用例。1つのActorに2つのActor(単なるスレッド)が同時にpostする。最終的な合計値は必ず0になるが、最大値と最小値は実行する毎に変化する。昔保育園の運動会で親がやらされたパネル入れ替えゲームを思い出す。シジフォスというか賽の河原。


OCaml actor_sample_O.ml

C# actor_sample_CS.cs


2015.12.22

2016.02.11


「インデックス」へ戻る


OSTRACISM CO.

OSTRA / Takeshi Yoneki