[Go] 並行処理を用いたモンテカルロ法

Go言語による並行処理の第4章4.6パイプラインをもとにモンテカルロ法で円周率を計算する。 www.amazon.co.jp

乱数による円周率の計算

以下の積分モンテカルロ法を適用し、円周率をモンテカルロ法で求める。 $$ \int_{0}^{1} \frac{4}{ 1 + x^{2} } dx = \pi $$

サンプル数 $ M $ を$ 10,10^{3},10^{5},10^{7} $として、

$$ \bar{f_i} = \frac{1}{M} \sum_{t = 1, ..., M} \frac{4}{1 + {X_t}^{2}} $$

を求め、推定値

$$ \bar{ \bar{f} } = \frac{\sum_i \bar{f}_i}{N_M} $$

を計算する。ただし、$ N_{M}$は独立なモンテカルロ計算の回数である。

並行処理

パイプライン: データを受け取って、何らかの処理をし、どこかに渡すという一連の処理。

本に書いてあるrepeatFn関数やtake関数を用いて実装する。

実装

func repeatFn(done <-chan interface{}, fn func() float64) <-chan float64 {
    valueStream := make(chan float64)
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

func take(done <-chan interface{}, valueStream <-chan float64, num int) <-chan float64 {
    takeStream := make(chan float64)
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

func calc(done <-chan interface{}, valueStream <-chan float64, fn func(float64) float64) <-chan float64 {
    calcStream := make(chan float64)
    go func() {
        defer close(calcStream)
        for x := range valueStream {
            select {
            case <-done:
                return
            case calcStream <- fn(x):
            }
        }
    }()
    return calcStream
}

func calculatePi(n int) float64 {
    var piApprox float64
    for i := 0; i < n; i++ {
        x := rand.Float64()
        piApprox += (4 / (1 + x*x))
    }
    piApprox /= float64(n)
    return piApprox
}

func main() {
    done := make(chan interface{})
    defer close(done)

    var piApprox float64
    var n float64 = math.Pow(10, 6)
    rand := func() float64 { return rand.Float64() }
    f := func(x float64) float64 { return 4 / ((1 + x*x) * n) }

    start := time.Now()
    loop := func() <-chan float64 {
        valueStream := make(chan float64)
        go func() {
            defer close(valueStream)
            for i := 0; i < 10; i++ {
                var mean float64
                for num := range calc(done, take(done, repeatFn(done, rand), int(n)), f) {
                    mean += num
                }
                valueStream <- mean
            }
        }()
        return valueStream
    }

    for num := range loop() {
        piApprox += num
    }

    fmt.Printf("calculated pi: %v, time: %v s\n", piApprox/10, time.Since(start).Seconds())

    start = time.Now()

    piApprox = 0
    for i := 0; i < 10; i++ {
        piApprox += calculatePi(int(n))
    }
    fmt.Printf("calculated pi: %v, time: %v s\n", piApprox/10, time.Since(start).Seconds())

}

calc関数では乱数$ x$に対して$ \frac{4}{1 + x^{2}}$を計算し、それをcalcStreamに追加している。

また、calculatePiは並行処理をせずに単純にモンテカルロ計算をするための関数である。

結果

  • 並行処理
サンプル数 計算された円周率 実行時間(s)
$10$ $ 3.15711$ $ 4.36634 \times 10^{-4}$
$10^{3}$ $ 3.14847$ $0.018$
$10^{5}$ $ 3.14215$ $1.771$
$10^{7}$ $ 3.14151$ $174.374$
  • 単純に処理
サンプル数 計算された円周率 実行時間(s)
$10$ $ 3.23199$ $2.757 \times 10^{-6}$
$10^{3}$ $ 3.14217$ $ 1.75575 \times 10^{-4}$
$10^{5}$ $ 3.14065$ $0.019$
$10^{7}$ $ 3.141571$ $1.772$

考察

明らかに単純にcalculatePi関数を用いて計算した方が速いです。何重にもするのではなく、もっとパイプラインをまとめたほうが速くなりそうです。

また、パイプラインの使い方を本を元に実装する上でちょうど良い例になったのではないかと思います。

[Go] Go言語による並行処理 4章メモ1

Go言語による並行処理の第4章をまとめていく。 www.amazon.co.jp

4.1 拘束

並行プロセスを安全にする方法として、

  • データをイミュータブルにする(作成後に変更できないようにする)
  • データを拘束によって保護する

などが挙げられる。 拘束について、

  • アドホック拘束: 規約によって拘束を達成する
  • レキシカル拘束: レキシカルスコープを使って適切なデータと並行処理のプリミティブだけを複数の並行処理プロセスで使えるように公開する

の2つがある。

レキシカル拘束

3章でみた、チャネルの読み書きの必要な権限だけを公開するというのもその1つ

次の例では、printDatadataスライスには直接アクセスできず、引数として渡さねばならない。 また、printDataの中ではdataスライスの一部しか見ることができない。

printData := func(wg *sync.WaitGroup, data []byte) {
    defer wg.Done()

    var buff bytes.Buffer
    for _, b := range data {
        fmt.Fprintf(&buff, "%c", c)
    }
    fmt.Println(buff.String())
}

var wg sync.WaitGroup
wg.Add(2)
data := []byte("golang")
go printData(&wg, data[:3])
go printData(&wg, data[3:])

wg.Wait()

for-selectループ

for-selectループを用いるパターンを挙げる。

  • チャネルから繰り返しの変数を送出する
for _, s := range []string{"a", "b", "c"} {
    select {
    case <- done:
        return
    case stringStream <- s:
    }
}
  • 停止シグナルを待つ無限ループ
for {
    select {
    case <-done:
        return
    default:
    }
    // 割り込みできない処理
}

または、

for {
    select {
    case <-done:
        return
    default:
        // 割り込みできない処理
    }
}

4.3 ゴルーチンリークを避ける

ゴルーチンが終了するのは、

  1. 処理を完了する場合
  2. 回復不可能なエラーで処理が続けられない場合
  3. 停止するよう命令された場合

あり、3のようにゴルーチンを終了するには、doneという読み込み専用チャネルを使用する。 ゴルーチンがゴルーチンを生成したのなら、その生成したゴルーチンを停止できるようにするべきである。

doWork := func(done <-chan interface{}, strings <-chan string) <- chan interface{} {
    terminated := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited")
        defer close(terminated)
        for {
            select {
            case s := <- strings:
                // 処理
                fmt.Println(s)
            case <- done:
                return
            }
        }
    }()
    return terminated
}

done := make(chan interface{})
terminated := doWork(done, nil)

go func() {
    // 1秒後に操作をキャンセル
    time.Sleep(1 * time.Second)
    fmt.Println("Canceling doWork goroutine...")
    close(done)
}

<-terminated
fmt.Println("done")

4.4 orチャネル

1つ以上のチャネルを1つのチャネルにまとめ、どれか1つが閉じられたら全部のチャネルを閉じるようにする。 再帰をつかったorチャネルで実現できる(実装略)

4.5 エラーハンドリング

取得される結果とエラーを対にする。 http.Get(url)をするプログラムであれば、

type Result struct {
    Error error
    Response *http.Response
}

を作り、2つをまとめる。

4.6 パイプライン

  • パイプライン: データを受け取って、何らかの処理を行い、どこかに渡すという操作をまとめたもの
  • ステージ: パイプラインで行う操作1つ1つのこと
    • 受け取る型と返す型が同じ
    • 具体化されていない
    • バッチ処理とストリーム処理
      • バッチ処理: データを一気に処理
      • 要素を1つ受け取って、1つ返す

[Go] Go言語による並行処理 3章メモ2

Go言語による並行処理の第3章をまとめていく。 www.amazon.co.jp

3.3 チャネル

宣言、初期化

値をchan型の変数に渡し、プログラムの別の場所でその値をチャネルから読み取る。 以下で宣言、初期化ができる。

var dataStream chan interface{}
dataStream = make(chan interface{})

読み込み、書き込み

送信の場合は->をチャネルの右側に、受診の場合は->を左側に置く。

stringStream := make(chan string)
go func() {
    stringStream <- "hello" // 送信
}

fmt.Println(<-stringStream) // 受診
}

上の例で送信を行うゴルーチンが終了する前にメインゴルーチンが終了しないのは、チャネルがブロックをするからである。

  • チャネルのキャパシティがいっぱいのとき: チャネルに空きができるまで書き込みを待機
  • チャネルが空: 少なくとも1つの要素が入るまで読み込みを待機

チャネルを閉じる

s, ok := <- stringStreamとしたとき、2つ目の戻り値は読み込みができたかどうか、もしくは閉じたチャネルから生成されたデフォルト値のどちらかを示す。チャネルを閉じるのは以下で行う。

valueStream := make(chan interface{})
close(valueStream)

これにより、チャネルをループで処理することができる。

intStream := make(chan int)
go func() {
    defer close(intStream)
    for i := 1; i <= 5; i++ {
        intStream <- i
    }
}()

for integer := range intStream {
    fmt.Println("%v ", integer)
}

また、3.2節のCond型で複数のゴルーチンに同時にシグナルを送信する方法もあったが、それもチャネルで代用できる。(コード略)

バッファ付きチャネル

初期化の際にキャパシティ(nとする)が与えられたチャネルを生成することで、一度も読み込みが行われなくてもn回書き込みが可能。 バッファ付きチャネルはFIFOキューと同じ動作をする。

var dataStream chan interface{}
dataStream = make(chan interface{}, 4) // キャパシティが4

チャネルの所有権

所有権: チャネルを初期化し、書き込み、閉じるゴルーチン 単方向チャネルを宣言するとチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できる。 プログラム内ではチャネルの所有権のスコープを小さくする。

3.4 select文

読み込みの場合、チャネルに書き込みがあったか、閉じられたかを、書き込みの場合キャパシティいっぱいになっていないかを確認する。どのチャネルも当てはまらない場合はブロックする。 case文全体に対して擬似乱数による一様選択をしており、複数のチャネルがあった場合、どのcaseが実行されるかはランダムとなる。

var c1, c2 <- chan interface{}
var c3 chan<- interface{}

select {
case <- c1:
    ...
case <- c2:
    ...
case c3 <- struct{}{}:
    ...
}

すべてのチャネルがブロックしているときに何かするにはdefault節を用いる。 通常、default節はfor-selectループの中で用いられる。

[Go] Go言語による並行処理 3章メモ1

Go言語による並行処理の第3章をまとめていく。 www.amazon.co.jp

3.1 ゴルーチン

関数呼び出しの前にgoキーワードを置くことでゴルーチンを起動できる。

func main() {
    go f()
    ...
}

func f {
    ...
}

3.2 syncパッケージ

3.2.1 WaitGroup

Addでカウンターを1増やし、Doneで1減らす。Waitを呼び出すとカウンターがゼロになるまでブロックする。

var wg sync.WaitGroup

wg.Add(1)
go func() {
    defer wg.Done()
    ...
}

wg.Wait()

3.2.2 MutexとRWMutex

プログラム内のクリティカルセクションを保護する。 以下の例の場合、変数aを保護、その解除を行っている。

var a int
var lock sync.Mutex

... 
go func() {
    lock.Lock()
    defer lock.Unlock()
    a++
}
...

RWMutexは読み込みと書き込みを区別することができる。Lockは上のMutexでのLockと同様、読み書き両方をロックする。RLockは書き込みのみをロックし、読み込みは可能となる。

var lock sync.RWMutex

lock.RLock()
lock.RUnlock()

lock.Lock()
lock.Unlock()

3.3 Cond

イベントの待機や発生を知らせる。

c := sync.NewCond(&sync.Mutex{})

func f() {
    c.L.Lock()
    // クリティカルセクションを操作する処理
    c.L.Unlock()
    c.Signal()
}

func main() {
    c.L.Lock()
    for conditionTrue() == false {
        c.Wait()
    }
    // クリティカルセクションを操作する処理
    go f()
    c.L.Unlock()
}

sync - Go 言語において、Waitは

自動的に c.L のロックを解除し,呼び出し側のゴルーチンの実行を中断します。 後で実行を再開した後, Wait は c.L をロックしてから戻ります。

これを上の例で見ると以下のようになる。

  1. conditionTrue()がfalseになったら、ループに入り、c.Wait()を呼び出すことで呼び出し側(main関数)の処理を一旦停止し、c.Lのロックを解除する。
  2. 関数fでクリティカルセクションを操作する処理が行われ、c.Signal()を実行することで、c.Wait()で停止されていたmain関数の処理が再開される。その際、main関数でc.Lが再びロックされる。
  3. conditionTrue()がfalseのままであったら、1に戻り、trueであったらループから抜け出し、その後の処理を行う。

c.Broadcast()ではc.Wait()となっているすべてのゴルーチンにシグナルを伝える。

3.2.4 Once

sync.Once.Doは一度しか呼び出されない。

Pool

使うものを決まった分だけ作る方法。 Getメソッドではプールに必要なインスタンスがあるか確認し、あれば呼び出し元にそれを返す。なければ、Newを呼び出した結果を返す。作業が終わるとPutメソッドを用いて使っていたインスタンスをプールに返す。

[Python] Import cyclesを解消する

はじめに

テスト駆動開発javaで書かれた部分をpythonで実装しているとき、

ImportError: cannot import name 'Money' from partially initialized module 'moneys.money' (most likely due to a circular import)

というエラーに遭遇しました。 これは循環参照と呼ばれるものであり、

# money.py
from __future__ import annotations
from .expression import Expression

class Money(Expression):
    ...
    def plus(self, added: Money) -> Expression:
        from .total import Total
        return Total(self, added)
   ...
# expression.py
from __future__ import annotations
from abc import ABCMeta, abstractmethod
from .money import Money


class Expression(metaclass=ABCMeta):
    @abstractmethod
    def reduce(self, to: str) -> Money:
        pass

というような形で、money.pyでExpressionをインポートしているのに、expression.pyではMoneyをインポートするという循環参照と呼ばれる状態に陥っているため生じているエラーです。

このエラーはfrom typing import TYPE_CHECKINGを用いて解消できます。

解消法

# expression.py
from __future__ import annotations
from abc import ABCMeta, abstractmethod

# 追加
if TYPE_CHECKING:
    from .money import Money

class Expression(metaclass=ABCMeta):
    @abstractmethod
    def reduce(self, to: str) -> Money:
        pass

とするだけでエラーは解消されます。

より詳しく

どうしてこの2行のコードで解消されるのかをみていきます。

string literal types

pythonの関数で型を指定するとき、その型をstring型で書くことができます。

例えば、下のコードではdef f(a: A) -> None: ...と引数aの型をAとしてしまうと、pythonはまだclass Aを読み込んでいないことからエラーを出します。 しかし、引数の型をstring型の'A'としてやるとエラーを解消できます。

def f(a: 'A') -> None: ...

class A:
    pass

annotations

同様のことがfrom __future__ import annotationsにより可能となります。

以下のコードでは関数fの引数はstring型ではありませんが、annotationsをインポートしていることでエラーなしで実行できます。これは自動で型アノテーションをstring literal化するようなものだとドキュメントに書かれています。

from __future__ import annotations
def f(a: A) -> None: ...

class A:
    pass

typing.TYPE_CHECKING

typingモジュールはTYPE_CHECKINGという変数を定義しており、この変数はランタイム中にはTYPE_CHECKING=Falseであり、型チェック中にはTYPE_CHECKING=Trueとなります。

このことから、if TYPE_CHECKING:内に循環参照の原因となるモジュールをインポートするとランタイム中には実行されないため、エラーを起こさず実行することができます。

したがって、解消法で示したように循環参照のもととなっているモジュールのインポートをif TYPE_CHECKING:内に書くことでエラーが解消できることがわかります。

最後に

参考にしたドキュメントは以下になります。 mypy.readthedocs.io

こちらはTDDの本のコードをpythonで実装したときのレポジトリです。 github.com