この記事は http://www.yesodweb.com/blog/2012/06/conduit-0-5 の翻訳です。
conduit-0.5 をリリースしました。 conduit
はストリームデータを扱うためのライブラリです。 conduit
を用いると、様々な形のデータを生成、変形、消費するような処理を、 簡単に組み合わせることができるようになります。 enumerator
/iteratee
パラダイムと同じ問題を解決することを目的に作られましたが、 アプローチはこれらのものとは異なります。 conduit
は簡単に理解して利用できるものになることを一番の目的としています。 遅延I/Oとは異なり、リソースの即時開放を保証し、 また、純粋なコードに例外を持ち込みません。
今回のリリースでSource
、Sink
、Conduit
のそれぞれを作るための、 シンプルで効率の良い、高レベルのインターフェースが提供されるようになりました。 コンストラクタ(Haskellのデータコンストラクタ)をダイレクトに用いる必要は もう殆どなくなったはずです。 床下(訳注:Conduitの持つ機能のこと)には、 上流の結果を受け取ったり、Leftover
(訳注:データの食い残し)を明示的に無視するといった、 より強力な機能が組み込まれました。 これらの変更は、ConduitをCategory
(訳注:圏を表すHaskellの型クラス) として振る舞うことを可能にします(訳注:今のところ実装はされていない模様)。
新しいバージョンでは、 ひと通りのチュートリアルがHaddock自身に含まれています。 使い始めるにあたっては、まずこれを読むのをお勧めします。 以前のバージョンのConduitを使ったことがある人も、 このチュートリアルを読めば以前と変わった部分がはっきりすると思います (finalizationの動作の詳細とか)。
最後に一つ注意。このリリースにはconduit
とそのデフォルト付属パッケージ、 attoparsec-conduit
やzlib-conduit
しか含まれていません。 wai
, parsistent
, yesod
あるいはもっといろいろな古いconduitに依存したパッケージは含まれません。 それらは十分にテストしてリリース準備が整い次第リリースされる予定です。 これらの問題や、リリースについての質問をGithubのIssueに挙げないこと。 Yesodチームがリリースの準備が十分にできたと判断した時に、リリースは行われます。
この記事の残りの部分で、conduit
の使い方の洗練された例を紹介します。 ネットワークサーバの作成についての MLでの質問 がベースになっています。
network-conduit はネットワークサーバとクライアントを作るための高レベルのライブラリです。 基本的なサンプルから、違うアプリケーションを作っていきます。 まず、次のコードから始めましょう。
import Data.Conduit
import Data.Conduit.Network
main :: IO ()
main = runTCPServer (ServerSettings 4000 HostAny) app
app :: Application IO
app = ...
Application
とはなんじゃらホイ? これはソースとシンクを受け取って、それらに何か処理をする単なる関数です (訳注:Application
の型を補足)。
type Application m = Source m ByteString -> Sink ByteString m () -> m ()
一番シンプルな例、echoでも実装してみましょうか。
app src sink = src $$$$ sink
ここで必要になるのは、ソースとシンクをつなぐことだけ。 あとはconduit
が全部面倒を見てくれます。 受け取ったデータは即座に正しくシンクに送られます。 ここには明示的なループがないこと、 中間データを触るコードがないこと、 そして、終了についての明示的なコードが無いこと。 これらに注意されたい。
訳注: TCPサーバのオプションにはポート番号とホストの設定以外含まれていないが、 これで十分実用的なのである。 このTCPサーバは完全に並行にリクエストを処理する実装になる。 つまり接続ごとに独立したスレッドが作られる。 Haskellのスレッドは非常に軽量で、スレッドを一つ作るコストは オブジェクトをひとつ作るコストと大して変わらない。 したがって、サーバオプションとしての並行度の指定は必要がない。 タイムアウトの設定も必要ない。 Haskellのスレッドは(FFIなどを使わない限り)いつでも安全にキャンセルできる。 標準が提供するI/Oライブラリはすべてキャンセル可能なAPIの呼び出しによって行われる。 つまりタイムアウトが必要であれば System.Timeout を使えばよい。
これはあまりにもつまらない例ですね。 エコーバックする際に大文字化するものでも書いてみましょう。 このタスクに必要となるものを列挙します。 まず最初に、バイナリをテキストデータにデコードする必要があります (ascii文字列を仮定することによってこれをインチキすることもできるけど、 それは正しくも楽しくもないので、ちゃんとやることにします)。 次に1文字ずつ大文字に変換します。 最後に、文字データをバイナリに再エンコードして出力に送ります。
幾つか追加のimport
が必要になります。 で、これがそのコードです。
import qualified Data.Conduit.List as CL
import Data.Conduit.Text
import Data.Text (toUpper)
app src sink = src
$$$$ decode utf8
=$ CL.map toUpper
=$ encode utf8
=$ sink
なんという宣言的なアプローチでしょう。 上で列挙した各ステップが、 次のステップにつながれているだけ!
別の例を考えましょう。 受け取った各チャンクに対して、そのサイズを表示します。
import qualified Data.ByteString.Char8 as S8
app src sink = src
$$$$ CL.map (\bs -> S8.pack $ show (S8.length bs) ++ "\n")
=$ sink
なにやらパターンが見えてきました。 ソースとシンクを端において、その間をconduit
のパイプラインでつなぐ。 ではその考えを進めて、パターンを抽象化してみましょう。
app src sink = src $$$$ conduit =$ sink
conduit :: Conduit ByteString IO ByteString
conduit = CL.map (\bs -> S8.pack $ show (S8.length bs) ++ "\n")
最初の例だと、こうなります。
conduit = decode utf8 =$= CL.map toUpper =$= encode utf8
さてこうなると、一つ疑問が生じます。 なんでnetwork-conduit
はソースとシンクを提供しているのでしょうか? Application
は単にConduit
ではだめなのか? その答え、ソースとシンクに分けて提供しているのは、 それがConduit
よりも真に強力だからです。 あとで見ていきますが、それによってconnect-and-resume
のような 強力な機能が実現できます。
制御フロー
今までの例はどれも無限のパイプライン、 つまり、同じ処理を接続が切れるまで繰り返すものでした。 なにか制御フローを追加してみましょう。 次のコードは先頭に "quit"
の4文字が含まれるチャンクを受信するまで、 エコーを繰り返すというものです。
conduit = do
mbs <- await
case mbs of
Nothing -> return ()
Just bs
| "quit" `S8.isPrefixOf` bs -> return ()
| otherwise -> do
yield bs
conduit -- loop
単なる無限ループではなく、 明示的に await
と yield
を読んでデータを読み書きするようになりました。
他のI/Oとインターリーブ
簡単なファイルサーバを作りましょう。 ファイル名を送ると、サーバがそのファイルの内容を返してくるようなものです。 これを行うためには、わずかにプログラムの型を変更する必要があります。 IO
モナドの代わりに、ResourceT IO
モナドを使うようにします。 そうすれば、例外安全にファイルアクセスができるようになります。
import qualified Data.Conduit.Binary as CB
main :: IO ()
main = runResourceT $ runTCPServer (ServerSettings 4000 HostAny) app
app :: Application (ResourceT IO)
app src sink = src $$$$ conduit =$ sink
conduit :: Conduit ByteString (ResourceT IO) ByteString
conduit = CB.lines =$=
awaitForever (CB.sourceFile . S8.unpack . S8.takeWhile (/= '\r'))
ここでは意図的にファイル名のエンコーディングを無視しています。 普通は(訳注:モダンなHaskellコードでは、次のようなイケてる) system-filepathと filesystem-conduitパッケージを使うのだけど、 説明が億劫なので、単にascii文字列としてアンパックしています。 また、Data.Conduit.Binary.lines
関数は’\n’しか取り除かないので、 ’\r’を手動で取り除いています。 ほとんどのtelnetクライアントは改行コードとしてCRLFを送信するので、 これを取り除く処理を入れるべきでしょう。
awaitForever
は 入力がある限り内部の関数を実行する、便利関数です。 もちろん、さっきみたいな“quitアプローチ”と組み合わせての、手動ループ制御もできます。
conduit =
CB.lines =$= loop
where
loop = do
mbs <- await
case mbs of
Nothing -> return ()
Just bs
| "quit" `S8.isPrefixOf` bs -> return ()
| otherwise -> do
CB.sourceFile $ S8.unpack $ S8.takeWhile (/= '\r') bs
loop
クライアントサイド
network-conduit
には、サーバのインターフェースにとても良く似た、 クライアントのためのインターフェースもあります。 例を見てみます。
{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.Network
import Data.ByteString.Char8 ()
main :: IO ()
main = runTCPClient (ClientSettings 4000 "localhost") client
client :: Application IO
client src sink =
src $$$$ conduit =$ sink
where
conduit = do
yield "hello"
await >>= liftIO . print
yield "world"
await >>= liftIO . print
yield "goodbye"
await >>= liftIO . print
ここには特に見所はありません。 このセクションの目的は、最後の例のための準備です。
プロキシサーバ
conduit
のモチベーションの一つとして、 HTTPプロキシサーバを作れるようにするというのがあります。 enumerator
では、私(訳注:もちろん原文の著者)を含む多くの人が、 ちゃんと動作するプロキシサーバを、 与えられたピースの組み合わせで作るのが とてつもなく難しいということを発見していました (多段にネストしたIterateeを使えば、実際にはできます。でもそれは恐るべき苦痛でもあります)。
じゃあ、簡単なネットワークプロキシサーバを作ってみましょうか。 これは次のように動作します。
- クライアントが接続する
- クライアントはサーバのポート番号を一行で送ってくる
- クライアントはサーバのホスト名を一行で送ってくる
- プロキシがサーバに接続する
- プロキシはクライアントに “Successful connection” と返答する
次を繰り返す
- クライアントはプロキシにチャンクを送る
- プロキシは同じチャンクをサーバに送る
- サーバはプロキシにチャンクを送る
- プロキシはクライアントにそのチャンクを送る
標準のソケットベース(あるいはハンドルベース)の関数を使えば、 これは別にそんなに難しくありません。 2つの別々のソケットに対して、sendとrecvをすれば良いだけです。 ここでのポイントは、自分自身がアプリケーションの実行フローを制御するので、 違うソースを使う処理に簡単にインターリーブできるということです。 Conduit(やEnumerator)は制御フローの面倒を見てもらう代わりに、 インターリーブが難しくなるという、ちょうど逆転が起こるのです。
そこでConduitはコントロールフローをアプリケーションに返す「抜け道」を 用意することにしました。 それがconnect-and-resume
と呼ばれるものです。 なんだか恐ろしげに聞こえますが、大丈夫、怖くないですよ。 ソースにシンクを接続し、シンクが終了するまで実行して、結果を返します。 ここで、その結果と一緒に新しい resumable source を返します。 それから、そのresumable source
を再度ソースとして使います。 それの繰り返しになります。
最初に、main関数を見ていきましょう。 クライアントからの接続を受け付けます。
main = forkIO $ runTCPServer (ServerSettings 5000 HostAny) proxy
プロキシの処理では、ポートとホスト名が必要になります。 次に、そのサーバへ接続します。 一行読み込みを行う関数と、 ポートとホスト名のペアを読み込むヘルパ関数を定義しましょう。
takeLine = do
let linefeed = 10
bss <- CB.takeWhile (/= linefeed) =$ CL.consume
CB.drop 1 -- drop the newline
return $ S8.takeWhile (/= '\r') $ S8.concat bss
getPortHost = do
portBS <- takeLine
hostBS <- takeLine
return $ ClientSettings (read $ S8.unpack portBS) (S8.unpack hostBS)
次に、connect-and-resume
($$$$+
演算子) を用いてproxy
関数を定義しましょう。 ソースをシンク、getPortHost
に接続して、クライアントの設定と新しいResumableSource
を受け取ります。 これをリードループ(proxyLoop
)に渡します。
proxy :: Application IO
proxy fromClient0 toClient = do
(fromClient, clientSettings) <- fromClient0 $$$$+ getPortHost
runTCPClient clientSettings (proxyLoop fromClient toClient)
proxyLoop
からは、まず接続成功メッセージをクライアントに送ります。 サーバから受け取るためのResumableSource
を作り、ループを開始します。
proxyLoop fromClient toClient fromServer0 toServer = do
yield "Connected to server" $$$$ toClient
-- convert fromServer0 from a normal Source to a ResumableSource
(fromServer, ()) <- fromServer0 $$$$+ return ()
loop fromClient fromServer
where
内部のループは(訳注:ちょっとややこしそうに見えるけど)とても直接的です。 上の4ステップを忠実にこなしているだけです。
loop fromClient fromServer = do
(fromClient', mbs) <- fromClient $$$$++ await
case mbs of
Nothing -> close fromClient' fromServer
Just bs -> do
yield bs $$$$ toServer
(fromServer', mbs) <- fromServer $$$$++ await
case mbs of
Nothing -> do
yield "Server closed connection" $$$$ toClient
close fromClient' fromServer'
Just bs -> do
yield bs $$$$ toClient
loop fromClient' fromServer'
ここでは2つのトリックが用いられています。 1つ目は、$$$$++
演算子。 これは $$$$+
演算子と同じですが、普通のSource
の代わりに すでにあるResumableSource
に対して動作します。 “continue resuming”ということです。 2つ目は、close
関数の呼び出し。 普通のSource
、Sink
を使っているときは、 これらは使い終わると自動的に閉じられ(訳注:リソース解放など)ます。 しかしResumableSource
は、あとでの利用のためにすぐには閉じません。 これを行うためには明示的に閉じてやる必要があります。 これをやるのは簡単です。$$$$+-
演算子を使います。
close x y = do
x $$$$+- return ()
y $$$$+- return ()
connect-and-resume
は常に必要になるとは限りませんが、 やりたいことが所謂「コーナーケース」にあるような場合には、とても便利に使えます。
完全なソース
Gistで完全なソースが見られます。 このチュートリアルが、Conduitの強力さと、その使い方を示す助けになれば幸いです。 何か質問や、こうすべきだという点があれば、ぜひお知らせ願いたい!
ところで、Felipeの指摘で、 proxyLoop
を、スレッドを用いて実装するというのがありました。 私はconnect-and-resume
のデモのために、故意にproxyLoop
をこのように書きました。 でも、スレッドを用いれば次のようにも実装できます。
proxyLoop fromClient0 toClient fromServer0 toServer = do
yield "Connected to server" $$$$ toClient
m <- M.newEmptyMVar
tid1 <- forkIO $ do
fromServer0 $$$$ toClient
M.putMVar m True
tid2 <- forkIO $ do
fromClient0 $$$$+- toServer
M.putMVar m False
x <- M.takeMVar m
if x
then killThread tid2
else killThread tid1
訳注: 並行処理のための便利なライブラリ async を使うと恐ろしくシンプルに。 (サーバのSource -> クライアントのSink) と (クライアントのSource -> サーバのSink) の接続を並行に行うというやりたい処理をまさにそのまま書けてしまった。 全体のソースはこちら。
proxyLoop fromClient0 toClient fromServer0 toServer = do yield "Connected to server" $$$$ toClient (fromServer0 $$$$ toClient) `race_` (fromClient0 $$$$+- toServer)