What is it, naokirin?

Embulkでバッチ処理をしてみよう 其の1

かなり昔から使われているツールですが、最近Embulkを利用することがあったので忘れないうちに入門の情報を書いていきます。

よくEmbulk は Fluentd のバッチ版などと呼ばれて紹介されています。

Fluentdとは違い、コマンドなどで実行したときのみデータソースからアウトプット先へデータを流すバッチ処理を行います。 類似の機能としてGoogle Dataflowなどがありますが、Embulkは自分で立てられる点やプラグインの自由度があり、Google Cloudを使っていない場合やすでに決まったデータソースがあったりする場合などは重宝します。

今回は基本的な機能やDockerによる実行までを確認していきます。

Embulkとは

Embulk — Embulk 0.8 documentation

Embulkはデータソースからデータを取得、様々なフィルタやバリデーションなどをかけた上で、アウトプット先へデータをロードするためのバッチ処理用のツールになります。データソースからの取得やフィルタ、アウトプットへのロード処理などは対応したプラグインを利用し、設定をすることで簡単に実行させることができるようになります。

f:id:naokirin:20181231130816p:plain

ちなみにEmbulkはJavaJRubyがメインで実行されており、Java7, 8のバージョンでしか実行できません。そのため他でJavaを利用している環境で併用することはおすすめしません。どうしても利用する場合はDockerなどのVM上で利用するなどを検討しましょう。

EmbulkをDockerで実行できるようにしよう

それでは詳細な説明より前に、まずはDockerでEmbulkを実行できるようにして各機能を都度実行して確認できるようにしてみましょう。 Dockerを利用できる環境がない場合は先にそちらの準備をしてみてください。

サンプルのDockerfileを利用してembulkを実行しよう

まずは以下のリポジトリをクローンしてきて、 example_1 のタグにチェックアウトしましょう。

GitHub - naokirin/embulk_examples

$ git clone https://github.com/naokirin/embulk_examples
$ cd embulk_examples
$ git checkout example_1

それではこのクローンしたリポジトリに移動した上で以下を実行し、dockerでコンテナを実行しましょう。

$ docker build -t embulk_examples .
$ docker run -it --rm embulk_examples sh -c "/embulk/embulk --version"

これでEmbulkのバージョンが表示されれば準備は成功です。

記事の時点での最新バージョンとして embulk 0.9.12 を利用するようにしていますが、もしより最新のバージョンを利用したい場合には、リポジトリのDockerfileの ENV EMBULK_VERSION 0.9.12 の項目のバージョンを変更して docker build から再実行してみてください。

サンプルの設定を実行してみよう

それではEmbulkを実行するために必要な設定を見ていきましょう。

Embulkを実行する上で基本となる設定は、以下の2つです。

  • プラグインのインストールのための Gemfile
  • インプット、アウトプットなどの実際の処理を設定をするYAMLファイル

それではサンプルの設定を参考にしつつ、設定の中身を見ていきます。

プラグインのインストールのためのGemfile

Gemfileは簡単に言うと、Ruby用のgemと呼ばれるパッケージを管理するためのBundlerで設定として利用するファイルのことです。 embulkではサブコマンドとしてこのBundlerを実行して必要なプラグインをインストールすることができます。基本的な記法は一般的なRubyでのGemfileと同じため、より詳しいバージョン指定方法などはそちらを参考にしてください。

それではサンプルの設定がリポジトリにあるので、見てみましょう。

source 'https://rubygems.org/'

gem 'embulk'

gem 'embulk-input-randomj'
gem 'embulk-output-command'
gem 'embulk-formatter-fast_jsonl'

このうち、 source 'https://rubygems.org/'gem 'embulk' については一旦おまじないと思って無視してください。

これ以降は使用するプラグインのgemパッケージを指定しています。今回は簡単なサンプルということでランダムなデータを生成できる embulk-input-randomj 、コマンド実行によりアウトプットできる embulk-output-command を利用します。実際にテストする際にもこのようなシンプルなインプットやアウトプット用のプラグインを利用することでその他プラグインの挙動のチェックなどを簡単に行うことができます。

最後の embulk-formatter-fast_jsonl は、アウトプット時に行単位のjsonに変換するために利用します。CSV以外の形式で出力する場合にはよく利用される形式かと思います。各プラグインについてはまた改めて説明をします。

どのようなプラグインがあるかについては以下を参考にしてください。ただし、実際の業務などで利用する場合にはリンク先をきちんと確認してプラグインの機能を確認して利用することをおすすめします。一部機能が正しく動かないなどもあるため、その点には注意して利用しましょう。

List of Embulk Plugins by Category

インプット、アウトプットなどの実際の処理を設定をするYAMLファイル

それでは次に実際のバッチ処理の設定を行うYAMLファイルについてです。

リポジトリでは config/example_1.yaml にあります。

in:
  type: randomj
  rows: 10
  threads: 1
  primary_key: id
  schema:
    - {name: id, type: long}
    - {name: value, type: string, length: 10}
out:
  type: command
  command: "cat - > '/workdir/output.txt'"
  formatter:
    type: fast_jsonl

まずは inout がトップに並んでいます。これはデータソースからのインプットで利用するプラグインとその設定、アウトプットに利用するプラグインとその設定を書く部分となります。 typeプラグイン名を指定し、それ以降では各プラグインの仕様に沿った設定を記載します。

また out の項目にある formatter は各データのフォーマットを変換するプラグインの設定を行う部分です。

Gemfile に書いていたプラグインの名称が type の項目に書かれているのがわかるかと思います。

今回は、ランダムな数字の id と10文字の文字列の value のデータを10個用意してそれを output.txt に出力します。

# Dockerでマウントする都合上、先に空の出力ファイルを作成します
$ touch output.txt
$ docker run -it --rm -v $(pwd)/output.txt:/workdir/output.txt embulk_examples sh -c "/embulk/embulk run config/example_1.yaml"

output.txt に 1〜10の id と10文字のランダムな value をキーにしたデータが作成されたでしょうか?

このように、インプットしたデータを1行単位で処理してアウトプットに出力していくのがEmbulkの基本的な挙動となります。これだけではまだEmbulkなしでもできるようなスクリプト実行レベルの機能に見えますが、この時点でもEmbulkの利点が見えてきます。

先程実行した際のEmbulkのログ出力を見てみましょう(出力内容は環境ごとにやや異なると思います)

Embulk v0.9.12
[WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.                                                    
[INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"                                                               
[INFO] (main): Started Embulk v0.9.12
[INFO] (0001:transaction): Loaded plugin embulk-input-randomj (0.5.1)                                                                         
[INFO] (0001:transaction): Loaded plugin embulk-output-command (0.1.4)                                                                        
[INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4                                
[INFO] (0001:transaction): Loaded plugin embulk-formatter-fast_jsonl (0.1.2)                                                                  
[INFO] (0001:transaction): {done:  0 / 1, running: 0}
[INFO] (0015:task-0000): Using command [sh, -c, cat - > '/workdir/output.txt']                                                                
[INFO] (0015:task-0000): Using command [sh, -c, cat - > '/workdir/output.txt']                                                                
[INFO] (0015:task-0000): Using command [sh, -c, cat - > '/workdir/output.txt']                                                                
[INFO] (0015:task-0000): Using command [sh, -c, cat - > '/workdir/output.txt']                                                                
[INFO] (0001:transaction): {done:  1 / 1, running: 0}
[INFO] (main): Committed.
[INFO] (main): Next config diff: {"in":{},"out":{}}

よく見ていくと、スレッドを利用した並列、並行処理を自動的に行なってくれていることがわかります。このように、Embulkを用いることでマルチスレッドなデータ処理を手軽に実行できるのも大きな利点の1つです。またここまでコード類を一切書かずに実行することができているのも大きな利点の1つと言えます。

今回はEmbulkの雰囲気を掴むところまでとなりましたが、次からはコマンド類やより実用的な話ができればと思います。