GPars大きいですね…
データ並列の部分で書くだけでこんな量になるとは。しかも機能が他にもいろいろあって、全然終わらない。
これは腰を据えてやっていかないと全容把握は難しいかも。
今回はGroovy 1.8.1とGPars 0.12を使いました。
GParsPool
import groovyx.gpars.GParsPool GParsPool.withPool { ['H','e','l','l','o',' ','W','o','r','l','d'].eachParallel { print it } }
result(example):
rldWolloeH
GParsPool.withPool内でeachなどのイテレーティブなメソッドを並列に処理してくれるxxxParallel()というメソッドが使える。詳しくはJSR-166y(ParallelArray)を基にした並列処理をするらしい。
結果の出力の順序は並列処理されているために実行のたびに変わる。
ちなみにwithPoolに引数でスレッド数を指定できる。デフォルトは(コア数 + 1)です。
GParsExecutorsPool
import groovyx.gpars.GParsExecutorsPool GParsExecutorsPool.withPool { ['H','e','l','l','o',' ','W','o','r','l','d'].eachParallel { print it } }
result(example):
Hloe oWrlld
GParsExecutorsPool.withPool内でeachなどのイテレーティブなメソッドを並列に処理してくれるxxxParallel()というメソッドが使える。ただし、GparsPoolよりは対応しているメソッドが少ない模様。詳しくはJava Executorをもとにしている模様。
結果の出力の順序は並列処理されているために実行のたびに変わる。
Parallel Enchantor
import groovyx.gpars.ParallelEnhancer def list = 'Hello World'.toCharArray() ParallelEnhancer.enhanceInstance(list) println list.collectParallel{it.toString() * 2}.inject(""){i, j -> i + j}
result:
HHeelllloo WWoorrlldd
ParallelEnchantorを使うとCollectionをGPars(Executors)Pool.withPool内で無くとも、xxxParallel()が使えるようになる。
Memoize
import groovyx.gpars.GParsPool // 処理時間計測用のメソッド def elapsed(Closure c) { def start = System.currentTimeMillis() c.call() println ("\ntime=" + (System.currentTimeMillis() - start)) } GParsPool.withPool { def data = ['H','e','l','l','o',' ','W','o','r','l','d'] Closure clos = {index -> sleep(100) data[index] }.gmemoize() elapsed{data.size().times {print clos(it)}} println "" elapsed{data.size().times {print clos(it)}} }
result(example):
Hello World time=1111 Hello World time=4
GParsPool.withPool内でクロージャに対してgmemoize()を呼び出しておくと、次に呼び出されたときにその結果をキャッシュしておき、2回目以降に同じ引数で呼び出されたときにそのキャッシュを使うようになります。
今回はクロージャ内でsleep(100)を呼び出しているため、一度目は時間がかかっていますが、二回目はキャッシュのデータを使っているためsleep()が呼び出されず、高速に処理されています。
ネットからのダウンロードや、ファイル入力でキャッシュにしておいても問題ない場合などには重宝しそうです。ですが、キャッシュされる分メモリが使われるということを考慮しなければならないので注意してください。一応、キャッシュの最大、最小を引数で渡せるメソッドもあるのでその辺りも活用するのがいいと思います。
Map/Reduce
import groovyx.gpars.GParsPool GParsPool.withPool { print("Hello World".parallel .map {[it, 1]} .groupBy {it[0]}.getParallel() .map {it.value = it.value.size; it} .sort {-it.value}.collection ) }
result:
[l=3, o=2, H=1, e=1, W=1, =1, d=1, r=1]
Map/Reduceアルゴリズムを関数型プログラミングのように扱える機能です。
対応していないメソッドはgetParallel()を呼ぶこと、最後のデータはラッピングされているのでcollectionを呼ぶことが必要です。
Parallel Arrays
import groovyx.gpars.GParsPool import extra166y.Ops.Reducer import extra166y.Ops GParsPool.withPool { println 'Hello World'.parallelArray .withMapping( { if(it.matches(/[a-z]/)) it.toUpperCase() else if(it.matches(/[A-Z]/)) it.toLowerCase() else it } as Ops.Op ) .reduce({a, b -> "$a$b"} as Reducer, "") }
result:
hELLO wORLD
JSR-166yを直接使う方法のようです(たぶん…)。
Composable Asyncronous Function
import groovyx.gpars.AsyncFun import groovyx.gpars.GParsPool class SearchCombine{ def array1 = ["Good night", "Good evening", "Hello", "Good morning", "Hero"] def array2 = ["Scala", "Java", "Ruby", "Clojure", "Groovy"] @AsyncFun Closure searchWord ={List<String> texts, String word -> texts.find { it.contains(word) } } @AsyncFun Closure comb = {String first, String second -> first + " " + second } String result() { return comb(searchWord(array1, "Hel"), searchWord(array2, "G")).get() } } GParsPool.withPool { def result = new SearchCombine().result() println result }
result:
Hello Groovy
@AsyncFunをつけたクロージャを並列処理できます。
asyncFun()を呼び出して作ったクロージャも使えるはずですが、正直分かりません(ぉ
Asyncronous Invocation
import groovyx.gpars.GParsPool GParsPool.withPool { def results = [] 'Hello'.each {word -> results << [{it}, {"lower: " + it.toLowerCase()}, {"upper: " + it.toUpperCase()}, {"ascii: " + it.bytes}]*.callAsync(word) } results.each {print it*.get()} }
result:
[H, lower: h, upper: H, ascii: [72]][e, lower: e, upper: E, ascii: [101]][l, lower: l, upper: L, ascii: [108]][l, lower: l, upper: L, ascii: [108]][o, lower: o, upper: O, ascii: [111]]
callAsync()メソッドを呼び出すことで、クロージャを非同期実行します。
Parallel Speculation
import groovyx.gpars.GParsPool def array = ["Hello Java", "Hello Scala", "Hello Clojure", "Hello Groovy"] def alternative0 = {array[0]} def alternative1 = {array[1]} def alternative2 = {array[2]} def alternative3 = {array[3]} GParsPool.withPool(4) { println GParsPool.speculate([alternative0, alternative1, alternative2, alternative3]) }
result(example):
Hello Groovy
GParsPool.speculationを使うと、配列で渡したクロージャを並列に処理して、そのうちでもっとも早く処理が終わったものを結果として使用します。アルゴリズムをその都度プログラマが選んだりせず、並列処理を有効に利用して結果的に早いアルゴリズムを使用できるようにできたりするみたいです。
今回は単純に配列にアクセスするだけなので、配列の要素のうちのどれかが出てくる乱数処理みたいになってますw
Fork/Join
import groovyx.gpars.GParsPool Closure hello = {List<String> names -> if(names.size() <= 1) return "hello " + names[0] names.each { forkOffChild([it]) } getChildrenResults().each{println it} } GParsPool.withPool { GParsPool.runForkJoin(["Anna", "John", "Ken"], hello) }
result:
hello Anna hello John hello Ken
GParsPoolではFork/Joinを使うことができます。
forkOffChild()でFork、getChildrenResults()でJoinです。
実際にFork/Joinを走らせるにはrunForkJoin()を使います。
書いてみたのですが、ほとんどの機能が使えていません…
GParsには今回書いたデータ並列用の他にアクター、データフロー、エージェント、Stmなどがあり、かなり大きなライブラリです。
全部は使える気がしませんね。このサンプルを書いているときに実はmakeConcurrent()というメソッドが上手く機能してくれなかったりしています。よくわからないです…