[Concurrent Haskell] 無限リストが止まらない

[Concurrent Haskell] 無限リストが止まらない

HaskellのControl.Concurrent.Chanを使って、簡単な並行処理プログラムを作ろうとして試行錯誤したのでメモ。

Control.Concurrent.ChanはHaskellで並行処理をするときに使うことができるFIFOキューライブラリ。複数のスレッドからキューにデータを流し込んで、別のスレッドがそのデータを読み取って処理をしたりすることができる。

Control.Concurrent.Chanを使えば、スレッド間の排他処理とかを全く考えずに済むので、簡単に並行処理プログラムを書けるかと思っていたけど、処理終了時の扱いに悩んだので、ちょこちょこと試行錯誤してみた。

詳細は続きで。

簡単な並行動作サンプル

まずは簡単なサンプル。

import Control.Concurrent

main = do
    chan <- newChan                     -- creating a new channel
    forkIO $ producer chan "hello"      -- starting producer thread
    products <- getChanContents chan    -- getting contents as an infinite list
    consumer products                   -- calling consumer with the list

producer chan x = do writeChan chan x
                     writeChan chan x
                     writeChan chan x

consumer ps = mapM_ print ps

↑のコードは、与えられた文字列を単純に3回チャネルに書き込むproducerと、チャネルに書き込まれた文字列を順番に出力するconsumerが並行して動作するだけの単純なプログラム。

これはこれで期待した通りに動いてくれるんだけど、残念ながらこのプログラムは終了しない。なぜなら、consumerがチャネルに新たな値が書き込まれるのを待ち続けてしまうから。

どうやら、Chanモジュールには「チャネルを閉じる」というような処理をする仕組みが備わっていないようなので、自力で実装しなければいけないっぽい。

番兵を使った解決

ということで、新たに書き加えてみたのが↓のコード。

import Control.Concurrent
import Data.Maybe

main = do
    chan <- newChan
    forkIO $ producer chan "hello"
    products <- getChanContents chan
    consumer $ takeWhile isJust products        -- taking until Nothing appears

producer chan x = do writeChan chan (Just x)
                     writeChan chan (Just x)
                     writeChan chan (Just x)
                     writeChan chan Nothing     -- sentinel

consumer ps = mapM_ (print . fromJust) ps

↑のコードでは、Maybe型のNothingを番兵として使うことで、チャネルの終わりを明示できるようにした。これで終了するプログラムを書くことができる。

複数producer対応

takeWhileを使った↑のコードでは、producerが複数になったケースに対応できないので、N回目のNothingが現れるまで値を取得する関数を書いてみた↓。

-- | Takes 'Just' values until nth 'Nothing' appears.
takeUntilNthNothing :: Int -> [Maybe a] -> [a]
takeUntilNthNothing 0 _ = []
takeUntilNthNothing n (Just x :xs) = x:takeUntilNthNothing n xs
takeUntilNthNothing n (Nothing:xs) = takeUntilNthNothing (n - 1) xs
takeUntilNthNothing _ [] = []

↑の関数ではついでにfromJust相当の処理も加えてある。

この関数を使ってproducerを多重化したサンプルコードが↓。

import Control.Concurrent

main = do
    chan <- newChan
    forkIO $ producer chan "hello"
    forkIO $ producer chan "world"              -- another producer thread
    products <- getChanContents chan
    consumer $ takeUntilNthNothing 2 products   -- taking until second Nothing

producer chan x = do writeChan chan (Just x)
                     writeChan chan (Just x)
                     writeChan chan (Just x)
                     writeChan chan Nothing     -- sentinel

consumer ps = mapM_ print ps

takeUntilNthNothing :: Int -> [Maybe a] -> [a]

これで何とかproducerの多重化には対応できそう。consumerを多重化するにはどうすればいいだろうとか、少し考えたけど面倒くさそうなのでまた今度にする。

補足

  • Nothingを使ってproducerconsumerの同期を取る方法が正しい方法なのかはわからない。とりあえず期待通りに動いただけ。
  • Control.Concurrentのリファレンスを見た感じだと、Control.Concurrent.MVarを使ってスレッドの終了を察知して、それでproducerconsumerの同期を取るのが汎用的な方法っぽい。
  • 他の言語なら番兵としてnullを使ったりするところだろうから、HaskellではMaybeを使ってみた。普通の値を書き込むのにわざわざJustとか書かなければいけなくなってちょっとわずらわしい気はする。
  • そのわずらわしさを排除するために↓のようなコードも書いてみたけど、ちょっとやりすぎな気がしないでもない。
import Control.Concurrent
import Control.Exception

main = do
    chan <- newChan
    myForkIO chan $ producer "hello"
    myForkIO chan $ producer "world"
    products <- getChanContents chan
    consumer $ takeUntilNthNothing 2 products

myForkIO chan io =
    forkIO $ io ((writeChan chan) . Just) `finally` writeChan chan Nothing

producer x write = do write x
                      write x
                      write x

consumer ps = mapM_ print ps

takeUntilNthNothing :: Int -> [Maybe a] -> [a]
スポンサーサイト

関連記事

トラックバック URL

http://liosk.blog103.fc2.com/tb.php/189-4d2b1645

トラックバック

コメント

コメントの投稿

お名前
コメント
編集キー