S-JIS[2006-04-15/2008-06-20] 変更履歴

プログラミング言語比較

マルチスレッド・排他制御

  • VBScript [2007-04-06]
  • VB.NET [2006-04-15/2006-07-24]
  • C言語(gcc, WIN32) [2005-01-16/2006-10-18]
  • Visual C++(MFC) [2005-01-16/2006-08-23]
  • Java [2005-01-16/2008-05-19]
  • C# [2006-03-04/2006-07-24]

スレッド

処理を並行に実行したい場合、1つの処理を1つのスレッド(と呼ばれる単位)に処理させるようにし、1プロセス内で複数のスレッドを並行に(同時に)実行させる。
という機構が最近のコンピューターでは用意されている。

これをマルチスレッドと呼ぶ。(対義語はシングルスレッド[2008-06-20]

スレッド作成方法


マルチスレッドプログラミング

マルチスレッド(複数スレッド)で並列に処理させる場合、同期や排他に気をつける必要がある。[2008-06-20]
同期:順序性を気にする場合。別スレッドの処理が終わるまで待つ必要があるとか。処理が終わったことをどうやって知る/知らせるか?
排他:ある変数を更新する際は、更新中に別スレッドも同時期に更新して変な値にならないよう、排他する(変更中に他スレッドが変更しないように止める)必要がある。

マルチスレッドで動いても大丈夫なことをマルチスレッドセーフ(MTセーフ・スレッドセーフ)と言う。
逆に安全でないことをMTアンセーフ(MT非セーフ)と言う。
安全とは、マルチスレッドで動作させても処理が異常終了したり値の更新が変になったりしないこと。

MTセーフなプログラムを作るには、基本的に以下の点に注意してコーディングする。

本気でMTセーフを追求する場合、「MTセーフ」と明記されている関数だけがMTセーフ。
どこかの有名な格言(?)に、「MTセーフになるように作った関数だけがMTセーフであり、たまたまMTセーフになるということはない」てな感じの言葉があった。
つまり、あるライブラリーのソースを見てMTセーフのように見えたとしても、その関数の仕様に「MTセーフである」と書かれていない以上、MTセーフとは見なせない。(偶然MTセーフになっているだけかもしれない。だからライブラリーのバージョンアップでMTセーフでなくなるかもしれない。仕様にMTセーフと書かれていない以上、そうなってもおかしくない)
逆にMTセーフなプログラムを作るには、それだけ注意を払えということでもある。

MTセーフなプログラムを作る場合、CPUレベル(低水準)の視点で考えなければならない。
CPU上の1ステップで実行されるものは不可分(他スレッドが割り込めない)なので排他する必要がない。
つまり高級言語のソース上で1ステップに見えるものであっても、実際に実行されるCPU上では複数ステップになるかもしれず、したがって排他をかける必要が生じることが多い。
排他をかけるとその分処理が重くなるのは当然だが、「それを嫌って排他しない」という人間にはマルチスレッドプログラミングをする資格は無い。
マルチスレッドプログラミングをミスると、不具合はタイミングの問題が多いので、すぐに発覚するとは限らない。 したがって「試してみたら上手くいった」という程度では通用しない。えてして再現性も低いので、デバッグも難しくなる。
また、排他しないプログラムは、単一のプロセッサーをタイムスライシングで擬似並列に実行するのと、実際に複数プロセッサーがあって本当に並列に実行されるのとでは、細かい動きが異なることがある。
よくある話は、テスト機(単一プロセッサー)で実行しても問題は生じないが、本番機(複数プロセッサー)で負荷をかけて実行すると不具合が出る、というもの。


排他にはそれなりに負荷がかかるので、それを軽減できるよう、目的に応じた色々な種類の排他制御が考案されている。

排他制御名? 説明
クリティカルセクション その範囲のプログラム(セクション)が、複数スレッドで同時に実行されない(1スレッドだけが実行できる)、という状態にする為の排他。
セマフォ 一定の個数までは実行可能で、それ以上になると実行不可、という状態にする為の排他。
読み書きロック 読み込み頻度が多いが書き込み頻度は少ない、という状況に有利なように作られている排他。
排他カウンター 他スレッドからの更新を気にせずに増減できるカウンター。(値を取り出して演算して格納するまでが「一息に」と表現される)

MTセーフな関数は、そのまま呼び出してよい
MTセーフでない関数を呼び出したい場合、基本的には排他して呼び出せば大丈夫なはず。

	1.排他開始
	2.MTセーフでない関数の呼び出し
	3.排他終了

こうすれば、その関数はシングルスレッドで順番に複数回呼ばれるときの動作と同じになる。
(その動作が「マルチスレッドの各スレッドが期待している動作」かどうか、というのは別問題だが…)


オブジェクト指向言語では、関数(メソッド)はクラスに属する。
クラスは静的な変数クラス変数:そのクラスの全インスタンスから共通)と、インスタンス毎に独立した変数インスタンス変数)を持つことが出来る。

クラスのインスタンスメソッドがMTセーフな場合は、同一インスタンスに対し複数スレッドからそのまま呼び出してよい。

処理順 スレッドA 共通のインスタンス変数 スレッドB 備考
1 buf = new Buffer 共通なインスタンスを生成
2 buf.print(123) → buf ← buf.append(456) appendメソッドがMTセーフであれば、問題なし
appendがMTセーフなBufferクラス(例1)
属性 変数・メソッド 説明
インスタンス変数 temp バイト配列
インスタンス変数 append(引数) tempに引数を追加する。(その際、tempに対して排他をかける)
インスタンス変数 get() tempをコピーした新しいバイト配列を作り、それを返す。(その際、tempに対して排他をかける)

MTセーフでない場合、インスタンスを別々に作れば安全かというと、そうとも限らない。

処理順 スレッドA 共通のインスタンス変数 スレッドB 備考
1 buf = new Buffer   buf = new Buffer 共通なインスタンスは無し
2 buf.append(123)   buf.append(456)  
個別にインスタンス化すれば安全なBufferクラス(例2)
属性 変数・メソッド 説明 クラスのイメージ
インスタンス変数 temp バイト配列 class Buffer {
  private byte[] temp;
  public void append(〜) {〜}
  public byte[] get() {〜}
}
インスタンス変数 append(引数) tempに引数を追加する。(排他しない)
インスタンス変数 get() tempをコピーした新しいバイト配列を作り、それを返す。(排他しない)
個別にインスタンス化しても安全でないBufferクラス(例3)
属性 変数・メソッド 説明 クラスのイメージ
クラス変数 temp バイト配列 class Buffer {
  private static byte[] temp;
  public void append(〜) {〜}
  public byte[] get() {〜}
}
インスタンス変数 append(引数) tempに引数を追加する。(排他しない)
インスタンス変数 get() tempをコピーした新しいバイト配列を作り、それを返す。(排他しない)

Bufferクラス(例3)では、内部で配列を保持する変数tempがクラス変数なので、Bufferの全インスタンスで共通。
したがって、排他をかけてやらないとtempに追加される順番がまちまちになってしまう。
というか、この例だと、両方のメソッドの値が追加されてしまうので、スレッド毎に独立した値にならない。

MTセーフでないメソッドを排他して呼び出せば「MTセーフ」という意味では安全になるが、意図した動作になるかどうかは別、ということ。

Javaで例えれば、Buffer1に当たるのがStringBufferBuffer2に当たるのがStringBuilder


 

 
        #include <process.h>      
スレッド生成   Dim thread As New Thread(New ThreadStart(AddressOf スレッドルーチン)) int rc;
pthread_attr_t attr;
rc=pthread_attr_init(&attr);
if(rc!=0) fprintf(stderr,"属性初期化 error");
rc=pthread_attr_setstacksize(&attr, 2*1024*1024);
if(rc!=0) fprintf(stderr,"スタックサイズ error");
    Thread thread=new スレッドクラス(); Thread thread=new Thread(new ThreadStart(スレッド関数));
スレッド開始   thread.Start() pthread_t tid;
rc=pthread_create(&tid, &attr, スレッド関数, NULL);
if(rc!=0) fprintf(stderr,"スレッド開始 error");
uintptr_t tid = _beginthread(スレッド関数, 0, NULL);
if(tid == -1L) perror("beginthread error");
AfxBeginThread(スレッド関数, NULL); thread.start(); thread.Start();
終了待ち           thread.isAlive() thread.IsAlive
  thread.Join() rc=pthread_join(tid, NULL);
if(rc!=0) fprintf(stderr,"スレッド合流 error");
if(WaitForSingleObject((HANDLE)tid, INFINITE)==WAIT_FAILED)
 fprintf(stderr, "wait error:%d\n", GetLastError());
  thread.join(); thread.Join();
始末     rc=pthread_attr_destroy(&attr);
if(rc!=0) fprintf(stderr,"属性破棄 error");
       
スレッド本体   Sub スレッドルーチン()
  Do While 条件
    〜
  Loop
End Sub
void* スレッド関数(void *arg)
{
  while(条件){
    〜
  }
  return NULL;
}
void __cdecl スレッド関数(void *arg)
{
  while(条件){
    〜
  }
  _endthread();
}
UINT AFX_CDECL スレッド関数(LPVOID pParam)
{
  while(条件){
    〜
  }
  return 0;
}
class スレッドクラス extends Thread {
  public void run() {
    while(条件){
      〜
    }
  }
}
void スレッド関数()
{
  while(条件)
  {
    〜
  }
}
    pthread_exit(NULL);   AfxEndThread(0);    
自分     pthread_t tid=pthread_self(); HANDLE h=GetCurrentThread(); CWinThread *p=AfxGetThread(); Thread t=Thread.currentThread(); Thread thread = Thread.CurrentThread;
スリープ WScript.sleep ミリ秒 Thread.Sleep(ミリ秒) sleep(秒);
usleep(マイクロ秒); usleep()は古いのでnanosleep()が推奨らしい

struct timespec ts={秒, ナノ秒};
int rc=nanosleep(&ts, NULL);
if(rc<0 && errno==EINTR){
 //割り込まれた
}

↓selectの本来の使い方とは違うので間違いと言われる方法
struct timeval tv={秒, マイクロ秒};
int rc=select(0, NULL, NULL, NULL, &tv);
if(rc<0 && errno==EINTR){
 //割り込まれた
}
Sleep(ミリ秒); ::Sleep(ミリ秒); try{
 Thread.sleep(ミリ秒);
}catch(InterruptedException e){
 //割り込まれた
}
Thread.Sleep(ミリ秒);
 
クリティカルセクション   Dim lk As New Object 'スレッド共有 pthread_mutex_t lk; /*スレッド共有*/
pthread_mutex_init(&lk, NULL);
CRITICAL_SECTION mutex; //スレッド共有
InitializeCriticalSection(&mutex);
CCriticalSection mutex; //スレッド共有 Object lk=new Object(); //スレッド共有 Object lk=new Object(); //スレッド共有
    rc=pthread_mutex_trylock(&lk);
if(rc==EBUSY) fprintf(stderr,"ロック中");
else if(rc!=0) fprintf(stderr,"ミューテックスロック試行 error");
if(!TryEnterCriticalSection(mutex))
 printf("ロック中"\n");

.NETのみ使用可能
CSingleLock lk(&mutex); //スレッドのローカル変数
if(lk.IsLocked()) TRACE0("ロック中");
   
  SyncLock lk rc=pthread_mutex_lock(&lk);
if(rc!=0) fprintf(stderr,"ミューテックスロック error");
EnterCriticalSection(&mutex); lk.Lock(); synchronized(lk){ lock (lk)
{
  End SyncLock rc=pthread_mutex_unlock(&lk);
if(rc!=0) fprintf(stderr,"ミューテックスアンロック error");
LeaveCriticalSection(&mutex); lk.Unlock(); } }
    rc=pthread_mutex_destroy(&lk);
if(rc!=0) fprintf(stderr,"ミューテックス破棄 error");
DeleteCriticalSection(&mutex);      
セマフォ     sem_t obj;
sem_t *sem=&obj;
rc=sem_init(sem, 0, 1);
if(rc!=0) perror("セマフォ初期化");
↓プロセス間セマフォ新規
rc=sem_unlink("/セマフォ名");
if(rc!=0){
  if(errno==ENOENT){
    /*No such file or directory*/
  }else perror("セマフォ削除");
}
sem_t *sem=sem_open("/セマフォ名", O_CREAT, 0600, 1);
if(sem==SEM_FAILED) perror("セマフォ作成");

↓プロセス間セマフォ既存
sem_t *sem=sem_open("/セマフォ名", 0);
if(sem==SEM_FAILED) perror("セマフォオープン");
なお、セマフォの実体は「/tmp/.セマフォ名」に在る
HANDLE sem = CreateSemaphore(NULL, 1, 1, NULL);
if(sem==NULL)
 fprintf(stderr, "CreateSemaphore error:%d\n", GetLastError());
CSemaphore sem; Semaphore sem = new Semaphore(1);  
    int val;
rc=sem_getvalue(sem,&val);
if(rc!=0) perror("セマフォ取得");
    int val = sem.drainPermits();  
    rc=sem_trywait(sem);
if(rc!=0){
  if(rc==EAGAIN) fprintf(stderr,"ロック中");
  else perror("セマフォロック試行");
}
  CSingleLock lk(&sem); //スレッドのローカル変数
if(lk.IsLocked()) TRACE0("ロック中");
if (!sem.tryAcquire()) System.out.println("ロック中");  
    rc=sem_wait(sem);
if(rc!=0) perror("セマフォロック");
if(WaitForSingleObject(sem, INFINITE)==WAIT_FAILED)
 fprintf(stderr, "sem_wait error:%d\n", GetLastError());
lk.Lock(); sem.acquire();  
    rc=sem_post(sem);
if(rc!=0) perror("セマフォアンロック");
if(!ReleaseSemaphore(sem, 1, NULL))
 fprintf(stderr, "sem_post error:%d\n", GetLastError());
lk.Unlock(); sem.release();  
    rc=sem_destroy(sem);
if(rc!=0) perror("セマフォ破棄");
rc=sem_close(sem);
if(rc!=0) perror("セマフォクローズ");
if(!CloseHandle(sem))
 fprintf(stderr, "sem_term error:%d\n", GetLastError());
     
読み書きロック   lk As New ReaderWriterLock() 'スレッド共有 pthread_rwlock_t lk; /*スレッド共有*/
rc=pthread_rwlock_init(&lk, NULL);
if(rc!=0) fprintf(stderr,"読み書きロック初期化 error");
    ReadWriteLock lk = new ReentrantReadWriteLock(); ReaderWriterLock lk = new ReaderWriterLock(); //スレッド共有
  Try
 lk.AcquireReaderLock(1) '読み
 lk.AcquireWriterLock(1) '書き
Catch e As ApplicationException
 'ロック中
rc=pthread_rwlock_tryrdlock(&lk); //読み
rc=pthread_rwlock_trywrlock(&lk); //書き

if(rc==EBUSY) fprintf(stderr,"ロック中");
else if(rc!=0) fprintf(stderr,"読み書きロック試行 error");
    if (!lk.readLock().tryLock()) {
 //ロック中
}
if (!lk.writeLock().tryLock()) {
 //ロック中
}
try
{
 lk.AcquireReaderLock(1); //読み
 lk.AcquireWriterLock(1); //書き
}
catch (ApplicationException
)
{
 //ロック中
}
  Try
 lk.AcquireReaderLock(Timeout.Infinite) '読み
 lk.AcquireWriterLock(Timeout.Infinite) '書き
rc=pthread_rwlock_rdlock(&lk); //読み
rc=pthread_rwlock_wrlock(&lk); //書き

if(rc!=0) fprintf(stderr,"読み書きロック error");
    try {
 lk.readLock().lock(); //読み
 lk.writeLock().lock(); //書き
try
{
 lk.AcquireReaderLock(Timeout.Infinite); //読み
 lk.AcquireWriterLock(Timeout.Infinite); //書き
  Finally
 lk.ReleaseReaderLock() '読み
 lk.ReleaseWriterLock() '書き
End Try
rc=pthread_rwlock_unlock(&lk); //読み書き
if(rc!=0) fprintf(stderr,"読み書きアンロック error");
    } finally {
 lk.readLock().unlock(); //読み
 lk.writeLock().unlock(); //書き
}
}
finally
{
 lk.ReleaseReaderLock(); //読み
 lk.ReleaseWriterLock(); //書き
}
    rc=pthread_rwlock_destroy(&lk);
if(rc!=0) fprintf(stderr,"読み書きロック破棄 error");
       
 
カウンター           AtomicBoolean
AtomicInteger
AtomicLong
 

言語比較全般へ戻る / 参考文献 / プログラム記号比較 / 技術メモへ戻る
メールの送信先:ひしだま