今回は少し前に Embulk を使う際に input plugin を用途に合わせて書き換えを行ったりしたので、そのあたりの簡単な input plugin の実装の方法について書いていきます。
Embulk 自体については正月あたりに書いた記事を参照してみてください。
Embulkでバッチ処理をしてみよう 其の1 - what is it, naokirin?
Embulkでバッチ処理をしてみよう 其の2 - what is it, naokirin?
Embulkでバッチ処理をしてみよう 其の3 - what is it, naokirin?
Embulkでバッチ処理をしてみよう 其の4 - what is it, naokirin?
ちなみに今回は実装が簡単で、今回書き換えを行う際にも使用したRubyによる実装を行います。ただし、パフォーマンスを気にする場合には Java を利用することが推奨されます。理由としては、JRubyにおけるオブジェクトデータの変換部分にコストがかかるためのようです。
参考: embulk で pure java と jruby でプラグインを作ったときの速度比較 - Qiita
(参考元の記事が古い情報なのと、個人的に実際に動かしてみたりはしていないので必要な場合は調べてみてください)
固定のデータを流すプラグインを作成しますが、すでに command プラグインなどがあるため、実際の利用についてはそちらを検討してください。
また gem として公開する部分については記載しませんので、そちらを知りたい方は別の記事を参照してみてください。
プラグインテンプレートの作成
まずはプラグインテンプレートの作成をします。
今回は example という名前のInputプラグインを作成します。
$ embulk new ruby-input example
これで embulk-input-example というディレクトリが作成されて、その下にテンプレートが作成されます。
$ ls embulk-input-example
Gemfile LICENSE.txt README.md Rakefile embulk-input-example.gemspec lib
プラグインの実装をする
ソースコードは lib
以下にあります。
$ head -n 6 lib/embulk/input/example.rb module Embulk module Input class Example < InputPlugin Plugin.register_input("example", self)
まず、プラグインの実装は Embulk::Input
モジュール以下に作成する必要があること、 InputPlugin
を継承することが必要になります。
次にメソッドを見てみましょう。生成された時点で下記のメソッドが存在することがわかります(コメントアウト分も含めて記載しています)。
- self.transaction
- self.resume
- self.guess
- init
- run
これらのうち、 self.transaction
、 init
、 run
については必須になります。
それ以外については基本的に実装を変更する必要がないため、今回は割愛します。
self.transaction
まずは self.transaction を実装します。
ここでは yaml ファイルのプラグイン向けの設定から受け取ったパラメータを config
として受け取り、 task
、 columns
、 threads
という3つを定義して self.resume
に渡して、レジュームのための処理と後続の処理の実行をします。
最終的には以下のような実装になりますが、各項目について見ていきます。
def self.transaction(config, &control) inputs = config[:inputs] task = { inputs: inputs, count: config.param(:count, :integer, default: 1) } columns = [] config[:columns].each_with_index do |c, i| columns << Column.new(i, c['name'], c['type'].to_sym) end threads = 1 resume(task, columns, threads, &control) end
task
task
では実行する際に利用するパラメータをハッシュにして後続の処理に渡します。
今回は設定として固定値の入力データとそれを繰り返す回数を受け取るようにしておきます。
inputs = config[:inputs] task = { inputs: inputs, count: config.param(:count, :integer, default: 1) } ...
config.param()
により設定を読み込むことができます。デフォルト値も設定できますが、デフォルト値を設定しない場合に設定項目が存在しない場合は実行時に例外となります。
config.param(<設定項目名>, <型名>, default: <デフォルト値>)
もしくは []
によるアクセスもできます。こちらではyamlに設定項目が存在しない場合、 nil
となります。
columns
columns
では読み込むデータのカラムのスキーマを定義して後続の処理に渡します。
固定されている場合もあるとは思いますが、多くの場合でスキーマ自体は固定されていないと思いますので、今回は yaml の設定から読み込んだものを利用するようにします。
columns = [] config[:columns].each_with_index do |column, i| columns << Column.new(i, column['name'], column['type'].to_sym) end
利用できる型名は以下になります。
- :boolean
- :long
- :double
- :string
- :timestamp
- :json
embulk/column.rb at master · embulk/embulk · GitHub
threads
スレッド数は今回固定の 1 としますが、必要な場合は変更するようにします。
threads = 1
init
init
では実際の実行を行う前の前処理をすることができます。
InputPlugin
で以下のような実装がされていることからも、どのような挙動になるかわかるかと思います。
embulk/input_plugin.rb at master · embulk/embulk · GitHub
def initialize(task, schema, index, page_builder) @task = task @schema = schema @index = index @page_builder = page_builder init end
今回は利用しないため、実装はしません。
run
run
では実際の実行処理を記述します。
@page_builder.add
で self.transaction
で登録した順序で実際のデータを追加していき、 @page_builder.finish
で処理を終了します。
def run inputs = @task[:inputs] count = @task[:count] count.times do |i| inputs.each do |input| @page_builder.add(input) end end @page_builder.finish end
実行してみよう
ローカルにあるプラグインの実装をもとに Embulk を実行してみます。
まずはテスト実行用の Gemfile と yamlファイルを作成します。
# Gemfile source 'https://rubygems.org/' gem 'embulk' gem 'embulk-input-example', path: '/path/to/embulk-input-example' gem 'embulk-output-command' gem 'embulk-formatter-fast_jsonl'
# example.yml in: type: example count: 2 inputs: - [ "bob", 19 ] - [ "alice", 18 ] columns: - { name: name, type: string } - { name: age, type: long } out: type: command command: "cat - > '/workdir/output.txt'" formatter: type: fast_jsonl
そして、上記のGemfile、example.yml のあるディレクトリで以下を実行します。
$ embulk bundle $ embulk preview -L <プラグインのディレクトリ> example.yml 2019-01-28 04:43:05.853 +0000: Embulk v0.9.12 2019-01-28 04:43:07.065 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. 2019-01-28 04:43:10.608 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems" 2019-01-28 04:43:11.617 +0000 [INFO] (main): Started Embulk v0.9.12 2019-01-28 04:43:11.680 +0000 [INFO] (0001:preview): Loaded plugin embulk/input/example from a load path +-------------+----------+ | name:string | age:long | +-------------+----------+ | bob | 19 | | alice | 18 | | bob | 19 | | alice | 18 | +-------------+----------+
無事、動作を確認することができました。
まとめ
今回は Embulk の input plugin の簡易な実装をやってみました。
より複雑なものについては、gemspec ファイルに依存関係を記述したり、gemの登録をしたりすることをおすすめしますが、個人的に利用するものなどはこのようなローカルでの作成、実行もできるので、ちょっとしたカスタマイズをしてやりたいなどは上記を参考にしてみると良いかと思います。
最後に今回実装したものを GitHub に公開しているので、不明点があればそちらを参考にしてみてください(最後のサンプル実行用のデータは example ディレクトリにあります)。