Pythonで標準Queueを使わずdequeとConditionで高速キューを実装する技術

Pythonでマルチスレッド処理を行う際、通常は標準の queue.Queue を使用します。しかし、より細かい制御やパフォーマンスチューニングを求めて、collections.dequethreading.Condition を組み合わせた「自作キュー」の実装に挑むケースがあります。

この記事では、上級者が必ず押さえておくべき「Condition(条件変数)の正しい使い方」と、よくある疑問について解説します。「なぜ if ではなく while なのか?」「通知のすれ違いは起きないのか?」といった、マルチスレッド特有の落とし穴を完全に理解しましょう。

【実装の定石】deque + Condition の基本形

まずは、スレッド間でデータをやり取りする「Producer-Consumerパターン」の正しい実装コードです。これ以外の書き方はバグの温床になります。

from collections import deque
import threading

d = deque()
cv = threading.Condition()

# Consumer(取り出し側)
def consumer():
    with cv:
        while not d:      # 【重要】ifではなくwhileを使う
            cv.wait()
        item = d.popleft()
    # ここでitemを使った処理を行う
#;

# Producer(挿入側)
def producer(item):
    with cv:
        d.append(item)
        cv.notify()       # 1つ入れたので1人を起こす
#;

疑問1:なぜ空判定を if ではなく while にするのか?

やりがちなのが、while not d:if len(d) == 0: と書いてしまうことです。

A. 「虚偽の目覚め(Spurious Wakeup)」対策のため

マルチスレッドの世界には「誰も起こしていないのに、スレッドが勝手に目を覚ます」という現象が存在します(OSやハードウェアの仕様による)。

  • if の場合: 勝手に目が覚めた時、データがないのに popleft() を実行してしまい、IndexError でクラッシュする
  • while の場合: 目が覚めても「本当にデータある?」と再確認する。なければ再び wait() で眠りにつくため安全

また、複数のスレッドが同時に起きた際、他のスレッドにデータを横取りされた場合でも、while なら再確認して待機に戻れるため、堅牢性が高まります。

疑問2:notify()notify_all() はどう使い分ける?

A. 基本は notify()、終了時は notify_all()

  • notify()
    • データが1個入った時などに使う
    • 待機中のスレッドから1つだけを選んで起す。データが1個しかないのに全員起こすと、CPUリソースの無駄(Thundering Herd問題)になるため、通常はこちらが効率的
  • notify_all()
    • 「プログラム終了」や「設定変更」など、全員に知らせる必要がある時に使います。
    • 終了フラグを立てて全員を起こし、ループを抜けさせる処理などで必須となります。

疑問3:誰も待っていない時に notify() したらエラーになる?

A. なりません(問題なし)

wait() しているスレッドがいない時に notify() を実行しても、その通知は単に消滅するだけです。

「通知が消えたら、後から来たスレッドが困るのでは?」と思うかもしれませんが、Consumer側は必ず「寝る前に中身を確認(while not d)」します。 通知を受け取れなくても、データさえ入っていれば wait() をスキップして処理を進めるため、ロジックは破綻しません。

疑問4:超高速(隙間なし)で実行すると遅延するのはなぜか?

キューへの挿入が極めて頻繁に行われる場合、Consumer側がロックを取得できず処理が進まないことがあります。

A. ロックの奪い合い(Lock Contention)とオーバーヘッド

Python(CPython)の実装上、ロックを手放した直後のスレッドが、待機中のスレッドよりも優先して再度ロックを獲得してしまう傾向があります。Producerが高速すぎると、Consumerがロックを取る隙間がなくなり、飢餓状態(Starvation)に陥ります。

対策:

  1. 標準の queue.Queue を使う: 内部がC言語で最適化されており、この問題が起きにくい
  2. バッチ処理にする: 毎回ロックせず、リストにまとめてから一気に処理する
  3. time.sleep(0) Consumer側で処理後に明示的にスレッドを譲る
    def consumer():
        with cv:
            while not d:      # 【重要】ifではなくwhileを使う
                cv.wait()
            item = d.popleft()
        # ここでitemを使った処理を行う
        time.sleep(0)  # 他スレッドに実行権を譲る
    #;

疑問5:毎秒100回程度の処理は問題ないか?

A. 全く問題ありません(余裕です)

毎秒100回(10msに1回)という頻度は、コンピュータにとっては非常に低負荷です。 Pythonのロック取得・解放や通知にかかる時間はマイクロ秒単位です。10msあれば99%以上の時間はロックが解放されている状態となるため、スレッド間の競合も起きず、スムーズに動作します。

「標準Queue」か「自作deque」かで迷う必要はありません。どちらの実装でも十分なパフォーマンスが出ます。

まとめ

この記事の要点をまとめます。

  1. Condition を使う時は、必ず while ループ で wait() を囲む(鉄の掟)
  2. 通常のデータ追加は notify()、終了通知は notify_all()
  3. 通知は「空振り」しても問題ない構造になっている
  4. 毎秒数千回レベルの超高負荷でなければ、Pythonのスレッド同期によるオーバーヘッドは無視できる
このエントリーをはてなブックマークに追加
にほんブログ村 IT技術ブログへ

コメント

メールアドレスが公開されることはありません。 が付いている欄は必須項目です