[Go] 並行処理を用いたモンテカルロ法
Go言語による並行処理の第4章4.6パイプラインをもとにモンテカルロ法で円周率を計算する。 www.amazon.co.jp
乱数による円周率の計算
以下の積分にモンテカルロ法を適用し、円周率をモンテカルロ法で求める。 $$ \int_{0}^{1} \frac{4}{ 1 + x^{2} } dx = \pi $$
サンプル数 $ M $ を$ 10,10^{3},10^{5},10^{7} $として、
$$ \bar{f_i} = \frac{1}{M} \sum_{t = 1, ..., M} \frac{4}{1 + {X_t}^{2}} $$
を求め、推定値
$$ \bar{ \bar{f} } = \frac{\sum_i \bar{f}_i}{N_M} $$
を計算する。ただし、$ N_{M}$は独立なモンテカルロ計算の回数である。
並行処理
パイプライン: データを受け取って、何らかの処理をし、どこかに渡すという一連の処理。
本に書いてあるrepeatFn
関数やtake
関数を用いて実装する。
実装
func repeatFn(done <-chan interface{}, fn func() float64) <-chan float64 { valueStream := make(chan float64) go func() { defer close(valueStream) for { select { case <-done: return case valueStream <- fn(): } } }() return valueStream } func take(done <-chan interface{}, valueStream <-chan float64, num int) <-chan float64 { takeStream := make(chan float64) go func() { defer close(takeStream) for i := 0; i < num; i++ { select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream } func calc(done <-chan interface{}, valueStream <-chan float64, fn func(float64) float64) <-chan float64 { calcStream := make(chan float64) go func() { defer close(calcStream) for x := range valueStream { select { case <-done: return case calcStream <- fn(x): } } }() return calcStream } func calculatePi(n int) float64 { var piApprox float64 for i := 0; i < n; i++ { x := rand.Float64() piApprox += (4 / (1 + x*x)) } piApprox /= float64(n) return piApprox } func main() { done := make(chan interface{}) defer close(done) var piApprox float64 var n float64 = math.Pow(10, 6) rand := func() float64 { return rand.Float64() } f := func(x float64) float64 { return 4 / ((1 + x*x) * n) } start := time.Now() loop := func() <-chan float64 { valueStream := make(chan float64) go func() { defer close(valueStream) for i := 0; i < 10; i++ { var mean float64 for num := range calc(done, take(done, repeatFn(done, rand), int(n)), f) { mean += num } valueStream <- mean } }() return valueStream } for num := range loop() { piApprox += num } fmt.Printf("calculated pi: %v, time: %v s\n", piApprox/10, time.Since(start).Seconds()) start = time.Now() piApprox = 0 for i := 0; i < 10; i++ { piApprox += calculatePi(int(n)) } fmt.Printf("calculated pi: %v, time: %v s\n", piApprox/10, time.Since(start).Seconds()) }
calc
関数では乱数$ x$に対して$ \frac{4}{1 + x^{2}}$を計算し、それをcalcStream
に追加している。
また、calculatePi
は並行処理をせずに単純にモンテカルロ計算をするための関数である。
結果
- 並行処理
サンプル数 | 計算された円周率 | 実行時間(s) |
---|---|---|
$10$ | $ 3.15711$ | $ 4.36634 \times 10^{-4}$ |
$10^{3}$ | $ 3.14847$ | $0.018$ |
$10^{5}$ | $ 3.14215$ | $1.771$ |
$10^{7}$ | $ 3.14151$ | $174.374$ |
- 単純に処理
サンプル数 | 計算された円周率 | 実行時間(s) |
---|---|---|
$10$ | $ 3.23199$ | $2.757 \times 10^{-6}$ |
$10^{3}$ | $ 3.14217$ | $ 1.75575 \times 10^{-4}$ |
$10^{5}$ | $ 3.14065$ | $0.019$ |
$10^{7}$ | $ 3.141571$ | $1.772$ |
考察
明らかに単純にcalculatePi
関数を用いて計算した方が速いです。何重にもするのではなく、もっとパイプラインをまとめたほうが速くなりそうです。
また、パイプラインの使い方を本を元に実装する上でちょうど良い例になったのではないかと思います。
[Go] Go言語による並行処理 4章メモ1
Go言語による並行処理の第4章をまとめていく。 www.amazon.co.jp
4.1 拘束
並行プロセスを安全にする方法として、
- データをイミュータブルにする(作成後に変更できないようにする)
- データを拘束によって保護する
などが挙げられる。 拘束について、
- アドホック拘束: 規約によって拘束を達成する
- レキシカル拘束: レキシカルスコープを使って適切なデータと並行処理のプリミティブだけを複数の並行処理プロセスで使えるように公開する
の2つがある。
レキシカル拘束
3章でみた、チャネルの読み書きの必要な権限だけを公開するというのもその1つ
次の例では、printData
はdata
スライスには直接アクセスできず、引数として渡さねばならない。
また、printDataの中ではdata
スライスの一部しか見ることができない。
printData := func(wg *sync.WaitGroup, data []byte) { defer wg.Done() var buff bytes.Buffer for _, b := range data { fmt.Fprintf(&buff, "%c", c) } fmt.Println(buff.String()) } var wg sync.WaitGroup wg.Add(2) data := []byte("golang") go printData(&wg, data[:3]) go printData(&wg, data[3:]) wg.Wait()
for-selectループ
for-selectループを用いるパターンを挙げる。
- チャネルから繰り返しの変数を送出する
for _, s := range []string{"a", "b", "c"} { select { case <- done: return case stringStream <- s: } }
- 停止シグナルを待つ無限ループ
for { select { case <-done: return default: } // 割り込みできない処理 }
または、
for { select { case <-done: return default: // 割り込みできない処理 } }
4.3 ゴルーチンリークを避ける
ゴルーチンが終了するのは、
- 処理を完了する場合
- 回復不可能なエラーで処理が続けられない場合
- 停止するよう命令された場合
あり、3のようにゴルーチンを終了するには、done
という読み込み専用チャネルを使用する。
ゴルーチンがゴルーチンを生成したのなら、その生成したゴルーチンを停止できるようにするべきである。
doWork := func(done <-chan interface{}, strings <-chan string) <- chan interface{} { terminated := make(chan interface{}) go func() { defer fmt.Println("doWork exited") defer close(terminated) for { select { case s := <- strings: // 処理 fmt.Println(s) case <- done: return } } }() return terminated } done := make(chan interface{}) terminated := doWork(done, nil) go func() { // 1秒後に操作をキャンセル time.Sleep(1 * time.Second) fmt.Println("Canceling doWork goroutine...") close(done) } <-terminated fmt.Println("done")
4.4 orチャネル
1つ以上のチャネルを1つのチャネルにまとめ、どれか1つが閉じられたら全部のチャネルを閉じるようにする。 再帰をつかったorチャネルで実現できる(実装略)
4.5 エラーハンドリング
取得される結果とエラーを対にする。
http.Get(url)
をするプログラムであれば、
type Result struct { Error error Response *http.Response }
を作り、2つをまとめる。
4.6 パイプライン
[Go] Go言語による並行処理 3章メモ2
Go言語による並行処理の第3章をまとめていく。 www.amazon.co.jp
3.3 チャネル
宣言、初期化
値をchan
型の変数に渡し、プログラムの別の場所でその値をチャネルから読み取る。
以下で宣言、初期化ができる。
var dataStream chan interface{} dataStream = make(chan interface{})
読み込み、書き込み
送信の場合は->
をチャネルの右側に、受診の場合は->
を左側に置く。
stringStream := make(chan string) go func() { stringStream <- "hello" // 送信 } fmt.Println(<-stringStream) // 受診 }
上の例で送信を行うゴルーチンが終了する前にメインゴルーチンが終了しないのは、チャネルがブロックをするからである。
- チャネルのキャパシティがいっぱいのとき: チャネルに空きができるまで書き込みを待機
- チャネルが空: 少なくとも1つの要素が入るまで読み込みを待機
チャネルを閉じる
s, ok := <- stringStream
としたとき、2つ目の戻り値は読み込みができたかどうか、もしくは閉じたチャネルから生成されたデフォルト値のどちらかを示す。チャネルを閉じるのは以下で行う。
valueStream := make(chan interface{}) close(valueStream)
これにより、チャネルをループで処理することができる。
intStream := make(chan int) go func() { defer close(intStream) for i := 1; i <= 5; i++ { intStream <- i } }() for integer := range intStream { fmt.Println("%v ", integer) }
また、3.2節のCond
型で複数のゴルーチンに同時にシグナルを送信する方法もあったが、それもチャネルで代用できる。(コード略)
バッファ付きチャネル
初期化の際にキャパシティ(nとする)が与えられたチャネルを生成することで、一度も読み込みが行われなくてもn回書き込みが可能。 バッファ付きチャネルはFIFOキューと同じ動作をする。
var dataStream chan interface{} dataStream = make(chan interface{}, 4) // キャパシティが4
チャネルの所有権
所有権: チャネルを初期化し、書き込み、閉じるゴルーチン 単方向チャネルを宣言するとチャネルを所有するゴルーチンとチャネルを利用するだけのゴルーチンを区別できる。 プログラム内ではチャネルの所有権のスコープを小さくする。
3.4 select文
読み込みの場合、チャネルに書き込みがあったか、閉じられたかを、書き込みの場合キャパシティいっぱいになっていないかを確認する。どのチャネルも当てはまらない場合はブロックする。 case文全体に対して擬似乱数による一様選択をしており、複数のチャネルがあった場合、どのcaseが実行されるかはランダムとなる。
var c1, c2 <- chan interface{} var c3 chan<- interface{} select { case <- c1: ... case <- c2: ... case c3 <- struct{}{}: ... }
すべてのチャネルがブロックしているときに何かするにはdefault
節を用いる。
通常、default
節はfor-selectループの中で用いられる。
[Go] Go言語による並行処理 3章メモ1
Go言語による並行処理の第3章をまとめていく。 www.amazon.co.jp
3.1 ゴルーチン
関数呼び出しの前にgo
キーワードを置くことでゴルーチンを起動できる。
func main() { go f() ... } func f { ... }
3.2 syncパッケージ
3.2.1 WaitGroup
Addでカウンターを1増やし、Doneで1減らす。Waitを呼び出すとカウンターがゼロになるまでブロックする。
var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() ... } wg.Wait()
3.2.2 MutexとRWMutex
プログラム内のクリティカルセクションを保護する。
以下の例の場合、変数a
を保護、その解除を行っている。
var a int var lock sync.Mutex ... go func() { lock.Lock() defer lock.Unlock() a++ } ...
RWMutexは読み込みと書き込みを区別することができる。Lockは上のMutexでのLockと同様、読み書き両方をロックする。RLockは書き込みのみをロックし、読み込みは可能となる。
var lock sync.RWMutex
lock.RLock()
lock.RUnlock()
lock.Lock()
lock.Unlock()
3.3 Cond
イベントの待機や発生を知らせる。
c := sync.NewCond(&sync.Mutex{}) func f() { c.L.Lock() // クリティカルセクションを操作する処理 c.L.Unlock() c.Signal() } func main() { c.L.Lock() for conditionTrue() == false { c.Wait() } // クリティカルセクションを操作する処理 go f() c.L.Unlock() }
sync - Go 言語において、Waitは
自動的に c.L のロックを解除し,呼び出し側のゴルーチンの実行を中断します。 後で実行を再開した後, Wait は c.L をロックしてから戻ります。
これを上の例で見ると以下のようになる。
conditionTrue()
がfalseになったら、ループに入り、c.Wait()
を呼び出すことで呼び出し側(main関数)の処理を一旦停止し、c.Lのロックを解除する。- 関数fでクリティカルセクションを操作する処理が行われ、
c.Signal()
を実行することで、c.Wait()
で停止されていたmain関数の処理が再開される。その際、main関数でc.Lが再びロックされる。 conditionTrue()
がfalseのままであったら、1に戻り、trueであったらループから抜け出し、その後の処理を行う。
c.Broadcast()
ではc.Wait()
となっているすべてのゴルーチンにシグナルを伝える。
3.2.4 Once
sync.Once.Do
は一度しか呼び出されない。
Pool
使うものを決まった分だけ作る方法。
Get
メソッドではプールに必要なインスタンスがあるか確認し、あれば呼び出し元にそれを返す。なければ、New
を呼び出した結果を返す。作業が終わるとPut
メソッドを用いて使っていたインスタンスをプールに返す。
[Python] Import cyclesを解消する
はじめに
テスト駆動開発のjavaで書かれた部分をpythonで実装しているとき、
ImportError: cannot import name 'Money' from partially initialized module 'moneys.money' (most likely due to a circular import)
というエラーに遭遇しました。 これは循環参照と呼ばれるものであり、
# money.py from __future__ import annotations from .expression import Expression class Money(Expression): ... def plus(self, added: Money) -> Expression: from .total import Total return Total(self, added) ...
# expression.py from __future__ import annotations from abc import ABCMeta, abstractmethod from .money import Money class Expression(metaclass=ABCMeta): @abstractmethod def reduce(self, to: str) -> Money: pass
というような形で、money.py
でExpressionをインポートしているのに、expression.py
ではMoneyをインポートするという循環参照と呼ばれる状態に陥っているため生じているエラーです。
このエラーはfrom typing import TYPE_CHECKING
を用いて解消できます。
解消法
# expression.py from __future__ import annotations from abc import ABCMeta, abstractmethod # 追加 if TYPE_CHECKING: from .money import Money class Expression(metaclass=ABCMeta): @abstractmethod def reduce(self, to: str) -> Money: pass
とするだけでエラーは解消されます。
より詳しく
どうしてこの2行のコードで解消されるのかをみていきます。
string literal types
pythonの関数で型を指定するとき、その型をstring型で書くことができます。
例えば、下のコードではdef f(a: A) -> None: ...
と引数aの型をA
としてしまうと、pythonはまだclass A
を読み込んでいないことからエラーを出します。
しかし、引数の型をstring型の'A'
としてやるとエラーを解消できます。
def f(a: 'A') -> None: ... class A: pass
annotations
同様のことがfrom __future__ import annotations
により可能となります。
以下のコードでは関数fの引数はstring型ではありませんが、annotationsをインポートしていることでエラーなしで実行できます。これは自動で型アノテーションをstring literal化するようなものだとドキュメントに書かれています。
from __future__ import annotations def f(a: A) -> None: ... class A: pass
typing.TYPE_CHECKING
typingモジュールはTYPE_CHECKINGという変数を定義しており、この変数はランタイム中にはTYPE_CHECKING=False
であり、型チェック中にはTYPE_CHECKING=True
となります。
このことから、if TYPE_CHECKING:
内に循環参照の原因となるモジュールをインポートするとランタイム中には実行されないため、エラーを起こさず実行することができます。
したがって、解消法で示したように循環参照のもととなっているモジュールのインポートをif TYPE_CHECKING:
内に書くことでエラーが解消できることがわかります。
最後に
参考にしたドキュメントは以下になります。 mypy.readthedocs.io
こちらはTDDの本のコードをpythonで実装したときのレポジトリです。 github.com