[Concurrent Haskell] 無限リストが止まらない
- 2009-11-03
- カテゴリ: Client Side
- タグ: Tips Haskell 並行計算 アルゴリズム
HaskellのControl.Concurrent.Chanを使って、簡単な並行処理プログラムを作ろうとして試行錯誤したのでメモ。
Control.Concurrent.ChanはHaskellで並行処理をするときに使うことができるFIFOキューライブラリ。複数のスレッドからキューにデータを流し込んで、別のスレッドがそのデータを読み取って処理をしたりすることができる。
Control.Concurrent.Chanを使えば、スレッド間の排他処理とかを全く考えずに済むので、簡単に並行処理プログラムを書けるかと思っていたけど、処理終了時の扱いに悩んだので、ちょこちょこと試行錯誤してみた。
詳細は続きで。
簡単な並行動作サンプル
まずは簡単なサンプル。
import Control.Concurrent main = do chan <- newChan -- creating a new channel forkIO $ producer chan "hello" -- starting producer thread products <- getChanContents chan -- getting contents as an infinite list consumer products -- calling consumer with the list producer chan x = do writeChan chan x writeChan chan x writeChan chan x consumer ps = mapM_ print ps
↑のコードは、与えられた文字列を単純に3回チャネルに書き込むproducer
と、チャネルに書き込まれた文字列を順番に出力するconsumer
が並行して動作するだけの単純なプログラム。
これはこれで期待した通りに動いてくれるんだけど、残念ながらこのプログラムは終了しない。なぜなら、consumer
がチャネルに新たな値が書き込まれるのを待ち続けてしまうから。
どうやら、Chanモジュールには「チャネルを閉じる」というような処理をする仕組みが備わっていないようなので、自力で実装しなければいけないっぽい。
番兵を使った解決
ということで、新たに書き加えてみたのが↓のコード。
import Control.Concurrent import Data.Maybe main = do chan <- newChan forkIO $ producer chan "hello" products <- getChanContents chan consumer $ takeWhile isJust products -- taking until Nothing appears producer chan x = do writeChan chan (Just x) writeChan chan (Just x) writeChan chan (Just x) writeChan chan Nothing -- sentinel consumer ps = mapM_ (print . fromJust) ps
↑のコードでは、Maybe型のNothingを番兵として使うことで、チャネルの終わりを明示できるようにした。これで終了するプログラムを書くことができる。
複数producer
対応
takeWhile
を使った↑のコードでは、producer
が複数になったケースに対応できないので、N回目のNothingが現れるまで値を取得する関数を書いてみた↓。
-- | Takes 'Just' values until nth 'Nothing' appears. takeUntilNthNothing :: Int -> [Maybe a] -> [a] takeUntilNthNothing 0 _ = [] takeUntilNthNothing n (Just x :xs) = x:takeUntilNthNothing n xs takeUntilNthNothing n (Nothing:xs) = takeUntilNthNothing (n - 1) xs takeUntilNthNothing _ [] = []
↑の関数ではついでにfromJust
相当の処理も加えてある。
この関数を使ってproducer
を多重化したサンプルコードが↓。
import Control.Concurrent main = do chan <- newChan forkIO $ producer chan "hello" forkIO $ producer chan "world" -- another producer thread products <- getChanContents chan consumer $ takeUntilNthNothing 2 products -- taking until second Nothing producer chan x = do writeChan chan (Just x) writeChan chan (Just x) writeChan chan (Just x) writeChan chan Nothing -- sentinel consumer ps = mapM_ print ps takeUntilNthNothing :: Int -> [Maybe a] -> [a]
これで何とかproducer
の多重化には対応できそう。consumer
を多重化するにはどうすればいいだろうとか、少し考えたけど面倒くさそうなのでまた今度にする。
補足
- Nothingを使って
producer
とconsumer
の同期を取る方法が正しい方法なのかはわからない。とりあえず期待通りに動いただけ。 - Control.Concurrentのリファレンスを見た感じだと、Control.Concurrent.MVarを使ってスレッドの終了を察知して、それで
producer
とconsumer
の同期を取るのが汎用的な方法っぽい。 - 他の言語なら番兵として
null
を使ったりするところだろうから、HaskellではMaybeを使ってみた。普通の値を書き込むのにわざわざJustとか書かなければいけなくなってちょっとわずらわしい気はする。 - そのわずらわしさを排除するために↓のようなコードも書いてみたけど、ちょっとやりすぎな気がしないでもない。
import Control.Concurrent import Control.Exception main = do chan <- newChan myForkIO chan $ producer "hello" myForkIO chan $ producer "world" products <- getChanContents chan consumer $ takeUntilNthNothing 2 products myForkIO chan io = forkIO $ io ((writeChan chan) . Just) `finally` writeChan chan Nothing producer x write = do write x write x write x consumer ps = mapM_ print ps takeUntilNthNothing :: Int -> [Maybe a] -> [a]
関連記事
- [Windows] NTFSハードリンクで増分スナップショットをとるバックアップツールを作りました!
- [JavaScript] ランダムなひらがな列を出力するBookmarklet
- [Concurrent Haskell] 無限リストが止まらない
- NTFSについて調べたのでメモ
- XML (HTML) の特殊文字をエスケープするVimスクリプトを書いた
トラックバック URL
- http://liosk.blog103.fc2.com/tb.php/189-4d2b1645