What is it, naokirin?

GParsのデータ並列のハロワ的コードを書いてみた

GPars大きいですね…

データ並列の部分で書くだけでこんな量になるとは。しかも機能が他にもいろいろあって、全然終わらない。

これは腰を据えてやっていかないと全容把握は難しいかも。

今回はGroovy 1.8.1とGPars 0.12を使いました。

GParsPool

import groovyx.gpars.GParsPool

GParsPool.withPool {
  ['H','e','l','l','o',' ','W','o','r','l','d'].eachParallel {
     print it
  }
}

result(example):

 rldWolloeH

GParsPool.withPool内でeachなどのイテレーティブなメソッドを並列に処理してくれるxxxParallel()というメソッドが使える。詳しくはJSR-166y(ParallelArray)を基にした並列処理をするらしい。
結果の出力の順序は並列処理されているために実行のたびに変わる。

ちなみにwithPoolに引数でスレッド数を指定できる。デフォルトは(コア数 + 1)です。

GParsExecutorsPool

import groovyx.gpars.GParsExecutorsPool

GParsExecutorsPool.withPool {
  ['H','e','l','l','o',' ','W','o','r','l','d'].eachParallel {
     print it
  }
}

result(example):

Hloe oWrlld

GParsExecutorsPool.withPool内でeachなどのイテレーティブなメソッドを並列に処理してくれるxxxParallel()というメソッドが使える。ただし、GparsPoolよりは対応しているメソッドが少ない模様。詳しくはJava Executorをもとにしている模様。
結果の出力の順序は並列処理されているために実行のたびに変わる。

Parallel Enchantor

import groovyx.gpars.ParallelEnhancer

def list = 'Hello World'.toCharArray()
ParallelEnhancer.enhanceInstance(list)

println list.collectParallel{it.toString() * 2}.inject(""){i, j -> i + j}

result:

HHeelllloo  WWoorrlldd

ParallelEnchantorを使うとCollectionをGPars(Executors)Pool.withPool内で無くとも、xxxParallel()が使えるようになる。

Memoize

import groovyx.gpars.GParsPool

// 処理時間計測用のメソッド
def elapsed(Closure c) {
  def start = System.currentTimeMillis()
  c.call()
  println ("\ntime=" + (System.currentTimeMillis() - start))
}

GParsPool.withPool {
  def data = ['H','e','l','l','o',' ','W','o','r','l','d']
  Closure clos = {index ->
    sleep(100)
    data[index]
  }.gmemoize()

  elapsed{data.size().times {print clos(it)}}
  println ""
  elapsed{data.size().times {print clos(it)}}
}

result(example):

Hello World
time=1111

Hello World
time=4

GParsPool.withPool内でクロージャに対してgmemoize()を呼び出しておくと、次に呼び出されたときにその結果をキャッシュしておき、2回目以降に同じ引数で呼び出されたときにそのキャッシュを使うようになります。

今回はクロージャ内でsleep(100)を呼び出しているため、一度目は時間がかかっていますが、二回目はキャッシュのデータを使っているためsleep()が呼び出されず、高速に処理されています。
ネットからのダウンロードや、ファイル入力でキャッシュにしておいても問題ない場合などには重宝しそうです。ですが、キャッシュされる分メモリが使われるということを考慮しなければならないので注意してください。一応、キャッシュの最大、最小を引数で渡せるメソッドもあるのでその辺りも活用するのがいいと思います。

Map/Reduce

import groovyx.gpars.GParsPool

GParsPool.withPool {
  print("Hello World".parallel
        .map {[it, 1]}
        .groupBy {it[0]}.getParallel()
        .map {it.value = it.value.size; it}
        .sort {-it.value}.collection
  )
}

result:

[l=3, o=2, H=1, e=1, W=1,  =1, d=1, r=1]

Map/Reduceアルゴリズム関数型プログラミングのように扱える機能です。

対応していないメソッドはgetParallel()を呼ぶこと、最後のデータはラッピングされているのでcollectionを呼ぶことが必要です。

Parallel Arrays

import groovyx.gpars.GParsPool
import extra166y.Ops.Reducer
import extra166y.Ops

GParsPool.withPool {
  println 'Hello World'.parallelArray
  .withMapping( {
    if(it.matches(/[a-z]/)) it.toUpperCase()
    else if(it.matches(/[A-Z]/)) it.toLowerCase()
    else it
    } as Ops.Op
  )
  .reduce({a, b -> "$a$b"} as Reducer, "")
}

result:

hELLO wORLD

JSR-166yを直接使う方法のようです(たぶん…)。

Composable Asyncronous Function

import groovyx.gpars.AsyncFun
import groovyx.gpars.GParsPool

class SearchCombine{
  def array1 = ["Good night", "Good evening", "Hello", "Good morning", "Hero"]
  def array2 = ["Scala", "Java", "Ruby", "Clojure", "Groovy"]

  @AsyncFun Closure searchWord ={List<String> texts, String word ->
    texts.find { it.contains(word) }
  }

  @AsyncFun Closure comb = {String first, String second ->
    first + " " + second
  }

  String result() {
    return comb(searchWord(array1, "Hel"), searchWord(array2, "G")).get()
  }
}

GParsPool.withPool {
  def result = new SearchCombine().result()
  println result
}

result:

Hello Groovy

@AsyncFunをつけたクロージャを並列処理できます。
asyncFun()を呼び出して作ったクロージャも使えるはずですが、正直分かりません(ぉ

Asyncronous Invocation

import groovyx.gpars.GParsPool

GParsPool.withPool {
  def results = []
  'Hello'.each {word ->
    results << [{it}, {"lower: " + it.toLowerCase()}, {"upper: " + it.toUpperCase()}, {"ascii: " + it.bytes}]*.callAsync(word)
  }
  results.each {print it*.get()}
}

result:

[H, lower: h, upper: H, ascii: [72]][e, lower: e, upper: E, ascii: [101]][l, lower: l, upper: L, ascii: [108]][l, lower: l, upper: L, ascii: [108]][o, lower: o, upper: O, ascii: [111]]

callAsync()メソッドを呼び出すことで、クロージャを非同期実行します。

Parallel Speculation

import groovyx.gpars.GParsPool

def array = ["Hello Java", "Hello Scala", "Hello Clojure", "Hello Groovy"]
def alternative0 = {array[0]}
def alternative1 = {array[1]}
def alternative2 = {array[2]}
def alternative3 = {array[3]}

GParsPool.withPool(4) { println GParsPool.speculate([alternative0, alternative1, alternative2, alternative3]) }

result(example):

Hello Groovy

GParsPool.speculationを使うと、配列で渡したクロージャを並列に処理して、そのうちでもっとも早く処理が終わったものを結果として使用します。アルゴリズムをその都度プログラマが選んだりせず、並列処理を有効に利用して結果的に早いアルゴリズムを使用できるようにできたりするみたいです。
今回は単純に配列にアクセスするだけなので、配列の要素のうちのどれかが出てくる乱数処理みたいになってますw

Fork/Join

import groovyx.gpars.GParsPool

Closure hello = {List<String> names ->
  if(names.size() <= 1)
    return "hello " + names[0]
  names.each { forkOffChild([it]) }
  getChildrenResults().each{println it}
}

GParsPool.withPool { GParsPool.runForkJoin(["Anna", "John", "Ken"], hello) }

result:

hello Anna
hello John
hello Ken

GParsPoolではFork/Joinを使うことができます。
forkOffChild()でFork、getChildrenResults()でJoinです。
実際にFork/Joinを走らせるにはrunForkJoin()を使います。


書いてみたのですが、ほとんどの機能が使えていません…
GParsには今回書いたデータ並列用の他にアクター、データフロー、エージェント、Stmなどがあり、かなり大きなライブラリです。

全部は使える気がしませんね。このサンプルを書いているときに実はmakeConcurrent()というメソッドが上手く機能してくれなかったりしています。よくわからないです…