[Concurrent Haskell] 無限リストが止まらない
- 2009-11-03
- カテゴリ: Client Side
- タグ: Tips Haskell 並行計算 アルゴリズム
HaskellのControl.Concurrent.Chanを使って、簡単な並行処理プログラムを作ろうとして試行錯誤したのでメモ。
Control.Concurrent.ChanはHaskellで並行処理をするときに使うことができるFIFOキューライブラリ。複数のスレッドからキューにデータを流し込んで、別のスレッドがそのデータを読み取って処理をしたりすることができる。
Control.Concurrent.Chanを使えば、スレッド間の排他処理とかを全く考えずに済むので、簡単に並行処理プログラムを書けるかと思っていたけど、処理終了時の扱いに悩んだので、ちょこちょこと試行錯誤してみた。
詳細は続きで。
簡単な並行動作サンプル
まずは簡単なサンプル。
import Control.Concurrent
main = do
chan <- newChan -- creating a new channel
forkIO $ producer chan "hello" -- starting producer thread
products <- getChanContents chan -- getting contents as an infinite list
consumer products -- calling consumer with the list
producer chan x = do writeChan chan x
writeChan chan x
writeChan chan x
consumer ps = mapM_ print ps
↑のコードは、与えられた文字列を単純に3回チャネルに書き込むproducerと、チャネルに書き込まれた文字列を順番に出力するconsumerが並行して動作するだけの単純なプログラム。
これはこれで期待した通りに動いてくれるんだけど、残念ながらこのプログラムは終了しない。なぜなら、consumerがチャネルに新たな値が書き込まれるのを待ち続けてしまうから。
どうやら、Chanモジュールには「チャネルを閉じる」というような処理をする仕組みが備わっていないようなので、自力で実装しなければいけないっぽい。
番兵を使った解決
ということで、新たに書き加えてみたのが↓のコード。
import Control.Concurrent
import Data.Maybe
main = do
chan <- newChan
forkIO $ producer chan "hello"
products <- getChanContents chan
consumer $ takeWhile isJust products -- taking until Nothing appears
producer chan x = do writeChan chan (Just x)
writeChan chan (Just x)
writeChan chan (Just x)
writeChan chan Nothing -- sentinel
consumer ps = mapM_ (print . fromJust) ps
↑のコードでは、Maybe型のNothingを番兵として使うことで、チャネルの終わりを明示できるようにした。これで終了するプログラムを書くことができる。
複数producer対応
takeWhileを使った↑のコードでは、producerが複数になったケースに対応できないので、N回目のNothingが現れるまで値を取得する関数を書いてみた↓。
-- | Takes 'Just' values until nth 'Nothing' appears. takeUntilNthNothing :: Int -> [Maybe a] -> [a] takeUntilNthNothing 0 _ = [] takeUntilNthNothing n (Just x :xs) = x:takeUntilNthNothing n xs takeUntilNthNothing n (Nothing:xs) = takeUntilNthNothing (n - 1) xs takeUntilNthNothing _ [] = []
↑の関数ではついでにfromJust相当の処理も加えてある。
この関数を使ってproducerを多重化したサンプルコードが↓。
import Control.Concurrent
main = do
chan <- newChan
forkIO $ producer chan "hello"
forkIO $ producer chan "world" -- another producer thread
products <- getChanContents chan
consumer $ takeUntilNthNothing 2 products -- taking until second Nothing
producer chan x = do writeChan chan (Just x)
writeChan chan (Just x)
writeChan chan (Just x)
writeChan chan Nothing -- sentinel
consumer ps = mapM_ print ps
takeUntilNthNothing :: Int -> [Maybe a] -> [a]
これで何とかproducerの多重化には対応できそう。consumerを多重化するにはどうすればいいだろうとか、少し考えたけど面倒くさそうなのでまた今度にする。
補足
- Nothingを使って
producerとconsumerの同期を取る方法が正しい方法なのかはわからない。とりあえず期待通りに動いただけ。 - Control.Concurrentのリファレンスを見た感じだと、Control.Concurrent.MVarを使ってスレッドの終了を察知して、それで
producerとconsumerの同期を取るのが汎用的な方法っぽい。 - 他の言語なら番兵として
nullを使ったりするところだろうから、HaskellではMaybeを使ってみた。普通の値を書き込むのにわざわざJustとか書かなければいけなくなってちょっとわずらわしい気はする。 - そのわずらわしさを排除するために↓のようなコードも書いてみたけど、ちょっとやりすぎな気がしないでもない。
import Control.Concurrent
import Control.Exception
main = do
chan <- newChan
myForkIO chan $ producer "hello"
myForkIO chan $ producer "world"
products <- getChanContents chan
consumer $ takeUntilNthNothing 2 products
myForkIO chan io =
forkIO $ io ((writeChan chan) . Just) `finally` writeChan chan Nothing
producer x write = do write x
write x
write x
consumer ps = mapM_ print ps
takeUntilNthNothing :: Int -> [Maybe a] -> [a]
トラックバックURL
- http://liosk.blog103.fc2.com/tb.php/189-4d2b1645

