この記事は http://www.yesodweb.com/blog/2012/06/conduit-0-5 の翻訳です。


conduit-0.5 をリリースしました。 conduitはストリームデータを扱うためのライブラリです。 conduitを用いると、様々な形のデータを生成、変形、消費するような処理を、 簡単に組み合わせることができるようになります。 enumerator/iterateeパラダイムと同じ問題を解決することを目的に作られましたが、 アプローチはこれらのものとは異なります。 conduitは簡単に理解して利用できるものになることを一番の目的としています。 遅延I/Oとは異なり、リソースの即時開放を保証し、 また、純粋なコードに例外を持ち込みません。

今回のリリースでSourceSinkConduitのそれぞれを作るための、 シンプルで効率の良い、高レベルのインターフェースが提供されるようになりました。 コンストラクタ(Haskellのデータコンストラクタ)をダイレクトに用いる必要は もう殆どなくなったはずです。 床下(訳注:Conduitの持つ機能のこと)には、 上流の結果を受け取ったり、Leftover(訳注:データの食い残し)を明示的に無視するといった、 より強力な機能が組み込まれました。 これらの変更は、ConduitをCategory(訳注:圏を表すHaskellの型クラス) として振る舞うことを可能にします(訳注:今のところ実装はされていない模様)。

新しいバージョンでは、 ひと通りのチュートリアルがHaddock自身に含まれています。 使い始めるにあたっては、まずこれを読むのをお勧めします。 以前のバージョンのConduitを使ったことがある人も、 このチュートリアルを読めば以前と変わった部分がはっきりすると思います (finalizationの動作の詳細とか)。

最後に一つ注意。このリリースにはconduitとそのデフォルト付属パッケージ、 attoparsec-conduitzlib-conduitしか含まれていません。 wai, parsistent, yesodあるいはもっといろいろな古いconduitに依存したパッケージは含まれません。 それらは十分にテストしてリリース準備が整い次第リリースされる予定です。 これらの問題や、リリースについての質問をGithubのIssueに挙げないこと。 Yesodチームがリリースの準備が十分にできたと判断した時に、リリースは行われます。

この記事の残りの部分で、conduitの使い方の洗練された例を紹介します。 ネットワークサーバの作成についての MLでの質問 がベースになっています。


network-conduit はネットワークサーバとクライアントを作るための高レベルのライブラリです。 基本的なサンプルから、違うアプリケーションを作っていきます。 まず、次のコードから始めましょう。

import Data.Conduit
import Data.Conduit.Network

main :: IO ()
main = runTCPServer (ServerSettings 4000 HostAny) app

app :: Application IO
app = ...

Applicationとはなんじゃらホイ? これはソースとシンクを受け取って、それらに何か処理をする単なる関数です (訳注:Applicationの型を補足)。

type Application m = Source m ByteString -> Sink ByteString m () -> m ()

一番シンプルな例、echoでも実装してみましょうか。

app src sink = src $$$$ sink

ここで必要になるのは、ソースとシンクをつなぐことだけ。 あとはconduitが全部面倒を見てくれます。 受け取ったデータは即座に正しくシンクに送られます。 ここには明示的なループがないこと、 中間データを触るコードがないこと、 そして、終了についての明示的なコードが無いこと。 これらに注意されたい。

訳注: TCPサーバのオプションにはポート番号とホストの設定以外含まれていないが、 これで十分実用的なのである。 このTCPサーバは完全に並行にリクエストを処理する実装になる。 つまり接続ごとに独立したスレッドが作られる。 Haskellのスレッドは非常に軽量で、スレッドを一つ作るコストは オブジェクトをひとつ作るコストと大して変わらない。 したがって、サーバオプションとしての並行度の指定は必要がない。 タイムアウトの設定も必要ない。 Haskellのスレッドは(FFIなどを使わない限り)いつでも安全にキャンセルできる。 標準が提供するI/Oライブラリはすべてキャンセル可能なAPIの呼び出しによって行われる。 つまりタイムアウトが必要であれば System.Timeout を使えばよい。

これはあまりにもつまらない例ですね。 エコーバックする際に大文字化するものでも書いてみましょう。 このタスクに必要となるものを列挙します。 まず最初に、バイナリをテキストデータにデコードする必要があります (ascii文字列を仮定することによってこれをインチキすることもできるけど、 それは正しくも楽しくもないので、ちゃんとやることにします)。 次に1文字ずつ大文字に変換します。 最後に、文字データをバイナリに再エンコードして出力に送ります。

幾つか追加のimportが必要になります。 で、これがそのコードです。

import qualified Data.Conduit.List as CL
import Data.Conduit.Text
import Data.Text (toUpper)

app src sink = src
            $$$$ decode utf8
            =$ CL.map toUpper
            =$ encode utf8
            =$ sink

なんという宣言的なアプローチでしょう。 上で列挙した各ステップが、 次のステップにつながれているだけ!

別の例を考えましょう。 受け取った各チャンクに対して、そのサイズを表示します。

import qualified Data.ByteString.Char8 as S8
app src sink = src
            $$$$ CL.map (\bs -> S8.pack $ show (S8.length bs) ++ "\n")
            =$ sink

なにやらパターンが見えてきました。 ソースとシンクを端において、その間をconduitのパイプラインでつなぐ。 ではその考えを進めて、パターンを抽象化してみましょう。

app src sink = src $$$$ conduit =$ sink
conduit :: Conduit ByteString IO ByteString
conduit = CL.map (\bs -> S8.pack $ show (S8.length bs) ++ "\n")

最初の例だと、こうなります。

conduit = decode utf8 =$= CL.map toUpper =$= encode utf8

さてこうなると、一つ疑問が生じます。 なんでnetwork-conduitはソースとシンクを提供しているのでしょうか? Applicationは単にConduitではだめなのか? その答え、ソースとシンクに分けて提供しているのは、 それがConduitよりも真に強力だからです。 あとで見ていきますが、それによってconnect-and-resumeのような 強力な機能が実現できます。

制御フロー

今までの例はどれも無限のパイプライン、 つまり、同じ処理を接続が切れるまで繰り返すものでした。 なにか制御フローを追加してみましょう。 次のコードは先頭に "quit" の4文字が含まれるチャンクを受信するまで、 エコーを繰り返すというものです。

conduit = do
    mbs <- await
    case mbs of
        Nothing -> return ()
        Just bs
            | "quit" `S8.isPrefixOf` bs -> return ()
            | otherwise -> do
                yield bs
                conduit -- loop

単なる無限ループではなく、 明示的に awaityield を読んでデータを読み書きするようになりました。

他のI/Oとインターリーブ

簡単なファイルサーバを作りましょう。 ファイル名を送ると、サーバがそのファイルの内容を返してくるようなものです。 これを行うためには、わずかにプログラムの型を変更する必要があります。 IOモナドの代わりに、ResourceT IOモナドを使うようにします。 そうすれば、例外安全にファイルアクセスができるようになります。

import qualified Data.Conduit.Binary as CB

main :: IO ()
main = runResourceT $ runTCPServer (ServerSettings 4000 HostAny) app

app :: Application (ResourceT IO)
app src sink = src $$$$ conduit =$ sink

conduit :: Conduit ByteString (ResourceT IO) ByteString
conduit = CB.lines =$=
          awaitForever (CB.sourceFile . S8.unpack . S8.takeWhile (/= '\r'))

ここでは意図的にファイル名のエンコーディングを無視しています。 普通は(訳注:モダンなHaskellコードでは、次のようなイケてる) system-filepathfilesystem-conduitパッケージを使うのだけど、 説明が億劫なので、単にascii文字列としてアンパックしています。 また、Data.Conduit.Binary.lines関数は’\n’しか取り除かないので、 ’\r’を手動で取り除いています。 ほとんどのtelnetクライアントは改行コードとしてCRLFを送信するので、 これを取り除く処理を入れるべきでしょう。

awaitForever は 入力がある限り内部の関数を実行する、便利関数です。 もちろん、さっきみたいな“quitアプローチ”と組み合わせての、手動ループ制御もできます。

conduit =
    CB.lines =$= loop
  where
    loop = do
        mbs <- await
        case mbs of
            Nothing -> return ()
            Just bs
                | "quit" `S8.isPrefixOf` bs -> return ()
                | otherwise -> do
                    CB.sourceFile $ S8.unpack $ S8.takeWhile (/= '\r') bs
                    loop

クライアントサイド

network-conduit には、サーバのインターフェースにとても良く似た、 クライアントのためのインターフェースもあります。 例を見てみます。

{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.Network
import Data.ByteString.Char8 ()

main :: IO ()
main = runTCPClient (ClientSettings 4000 "localhost") client

client :: Application IO
client src sink =
    src $$$$ conduit =$ sink
  where
    conduit = do
        yield "hello"
        await >>= liftIO . print

        yield "world"
        await >>= liftIO . print

        yield "goodbye"
        await >>= liftIO . print

ここには特に見所はありません。 このセクションの目的は、最後の例のための準備です。

プロキシサーバ

conduitのモチベーションの一つとして、 HTTPプロキシサーバを作れるようにするというのがあります。 enumeratorでは、私(訳注:もちろん原文の著者)を含む多くの人が、 ちゃんと動作するプロキシサーバを、 与えられたピースの組み合わせで作るのが とてつもなく難しいということを発見していました (多段にネストしたIterateeを使えば、実際にはできます。でもそれは恐るべき苦痛でもあります)。

じゃあ、簡単なネットワークプロキシサーバを作ってみましょうか。 これは次のように動作します。

  • クライアントが接続する
  • クライアントはサーバのポート番号を一行で送ってくる
  • クライアントはサーバのホスト名を一行で送ってくる
  • プロキシがサーバに接続する
  • プロキシはクライアントに “Successful connection” と返答する
  • 次を繰り返す

    • クライアントはプロキシにチャンクを送る
    • プロキシは同じチャンクをサーバに送る
    • サーバはプロキシにチャンクを送る
    • プロキシはクライアントにそのチャンクを送る

標準のソケットベース(あるいはハンドルベース)の関数を使えば、 これは別にそんなに難しくありません。 2つの別々のソケットに対して、sendとrecvをすれば良いだけです。 ここでのポイントは、自分自身がアプリケーションの実行フローを制御するので、 違うソースを使う処理に簡単にインターリーブできるということです。 Conduit(やEnumerator)は制御フローの面倒を見てもらう代わりに、 インターリーブが難しくなるという、ちょうど逆転が起こるのです。

そこでConduitはコントロールフローをアプリケーションに返す「抜け道」を 用意することにしました。 それがconnect-and-resumeと呼ばれるものです。 なんだか恐ろしげに聞こえますが、大丈夫、怖くないですよ。 ソースにシンクを接続し、シンクが終了するまで実行して、結果を返します。 ここで、その結果と一緒に新しい resumable source を返します。 それから、そのresumable sourceを再度ソースとして使います。 それの繰り返しになります。

最初に、main関数を見ていきましょう。 クライアントからの接続を受け付けます。

main = forkIO $ runTCPServer (ServerSettings 5000 HostAny) proxy

プロキシの処理では、ポートとホスト名が必要になります。 次に、そのサーバへ接続します。 一行読み込みを行う関数と、 ポートとホスト名のペアを読み込むヘルパ関数を定義しましょう。

takeLine = do
    let linefeed = 10
    bss <- CB.takeWhile (/= linefeed) =$ CL.consume
    CB.drop 1 -- drop the newline
    return $ S8.takeWhile (/= '\r') $ S8.concat bss

getPortHost = do
    portBS <- takeLine
    hostBS <- takeLine
    return $ ClientSettings (read $ S8.unpack portBS) (S8.unpack hostBS)

次に、connect-and-resume ($$$$+ 演算子) を用いてproxy関数を定義しましょう。 ソースをシンク、getPortHostに接続して、クライアントの設定と新しいResumableSourceを受け取ります。 これをリードループ(proxyLoop)に渡します。

proxy :: Application IO
proxy fromClient0 toClient = do
    (fromClient, clientSettings) <- fromClient0 $$$$+ getPortHost
    runTCPClient clientSettings (proxyLoop fromClient toClient)

proxyLoopからは、まず接続成功メッセージをクライアントに送ります。 サーバから受け取るためのResumableSourceを作り、ループを開始します。

proxyLoop fromClient toClient fromServer0 toServer = do
    yield "Connected to server" $$$$ toClient
    -- convert fromServer0 from a normal Source to a ResumableSource
    (fromServer, ()) <- fromServer0 $$$$+ return ()
    loop fromClient fromServer
  where

内部のループは(訳注:ちょっとややこしそうに見えるけど)とても直接的です。 上の4ステップを忠実にこなしているだけです。

    loop fromClient fromServer = do
        (fromClient', mbs) <- fromClient $$$$++ await
        case mbs of
            Nothing -> close fromClient' fromServer
            Just bs -> do
                yield bs $$$$ toServer
                (fromServer', mbs) <- fromServer $$$$++ await
                case mbs of
                    Nothing -> do
                        yield "Server closed connection" $$$$ toClient
                        close fromClient' fromServer'
                    Just bs -> do
                        yield bs $$$$ toClient
                        loop fromClient' fromServer'

ここでは2つのトリックが用いられています。 1つ目は、$$$$++ 演算子。 これは $$$$+ 演算子と同じですが、普通のSourceの代わりに すでにあるResumableSourceに対して動作します。 “continue resuming”ということです。 2つ目は、close 関数の呼び出し。 普通のSourceSinkを使っているときは、 これらは使い終わると自動的に閉じられ(訳注:リソース解放など)ます。 しかしResumableSourceは、あとでの利用のためにすぐには閉じません。 これを行うためには明示的に閉じてやる必要があります。 これをやるのは簡単です。$$$$+-演算子を使います。

    close x y = do
        x $$$$+- return ()
        y $$$$+- return ()

connect-and-resumeは常に必要になるとは限りませんが、 やりたいことが所謂「コーナーケース」にあるような場合には、とても便利に使えます。

完全なソース

Gistで完全なソースが見られます。 このチュートリアルが、Conduitの強力さと、その使い方を示す助けになれば幸いです。 何か質問や、こうすべきだという点があれば、ぜひお知らせ願いたい!

ところで、Felipeの指摘で、 proxyLoopを、スレッドを用いて実装するというのがありました。 私はconnect-and-resumeのデモのために、故意にproxyLoopをこのように書きました。 でも、スレッドを用いれば次のようにも実装できます。

proxyLoop fromClient0 toClient fromServer0 toServer = do
    yield "Connected to server" $$$$ toClient
    m <- M.newEmptyMVar
    tid1 <- forkIO $ do
        fromServer0 $$$$ toClient
        M.putMVar m True
    tid2 <- forkIO $ do
        fromClient0 $$$$+- toServer
        M.putMVar m False
    x <- M.takeMVar m
    if x
        then killThread tid2
        else killThread tid1

訳注: 並行処理のための便利なライブラリ async を使うと恐ろしくシンプルに。 (サーバのSource -> クライアントのSink) と (クライアントのSource -> サーバのSink) の接続を並行に行うというやりたい処理をまさにそのまま書けてしまった。 全体のソースはこちら

proxyLoop fromClient0 toClient fromServer0 toServer = do
  yield "Connected to server" $$$$ toClient
  (fromServer0 $$$$ toClient) `race_` (fromClient0 $$$$+- toServer)