What is it, naokirin?

Embulk の input plugin を書いてみよう

今回は少し前に Embulk を使う際に input plugin を用途に合わせて書き換えを行ったりしたので、そのあたりの簡単な input plugin の実装の方法について書いていきます。

Embulk 自体については正月あたりに書いた記事を参照してみてください。

Embulkでバッチ処理をしてみよう 其の1 - what is it, naokirin?
Embulkでバッチ処理をしてみよう 其の2 - what is it, naokirin?
Embulkでバッチ処理をしてみよう 其の3 - what is it, naokirin?
Embulkでバッチ処理をしてみよう 其の4 - what is it, naokirin?

ちなみに今回は実装が簡単で、今回書き換えを行う際にも使用したRubyによる実装を行います。ただし、パフォーマンスを気にする場合には Java を利用することが推奨されます。理由としては、JRubyにおけるオブジェクトデータの変換部分にコストがかかるためのようです。

参考: embulk で pure java と jruby でプラグインを作ったときの速度比較 - Qiita

(参考元の記事が古い情報なのと、個人的に実際に動かしてみたりはしていないので必要な場合は調べてみてください)

固定のデータを流すプラグインを作成しますが、すでに command プラグインなどがあるため、実際の利用についてはそちらを検討してください。

また gem として公開する部分については記載しませんので、そちらを知りたい方は別の記事を参照してみてください。

プラグインテンプレートの作成

まずはプラグインテンプレートの作成をします。

今回は example という名前のInputプラグインを作成します。

$ embulk new ruby-input example

これで embulk-input-example というディレクトリが作成されて、その下にテンプレートが作成されます。

$ ls embulk-input-example
Gemfile  LICENSE.txt  README.md  Rakefile  embulk-input-example.gemspec  lib

プラグインの実装をする

ソースコードlib 以下にあります。

$ head -n 6 lib/embulk/input/example.rb

module Embulk
  module Input

    class Example < InputPlugin
      Plugin.register_input("example", self)

まず、プラグインの実装は Embulk::Input モジュール以下に作成する必要があること、 InputPlugin を継承することが必要になります。

次にメソッドを見てみましょう。生成された時点で下記のメソッドが存在することがわかります(コメントアウト分も含めて記載しています)。

  • self.transaction
  • self.resume
  • self.guess
  • init
  • run

これらのうち、 self.transactioninitrun については必須になります。

それ以外については基本的に実装を変更する必要がないため、今回は割愛します。

self.transaction

まずは self.transaction を実装します。

ここでは yaml ファイルのプラグイン向けの設定から受け取ったパラメータを config として受け取り、 taskcolumnsthreads という3つを定義して self.resume に渡して、レジュームのための処理と後続の処理の実行をします。

最終的には以下のような実装になりますが、各項目について見ていきます。

  def self.transaction(config, &control)
    inputs = config[:inputs]
    task = {
      inputs: inputs,
      count: config.param(:count, :integer, default: 1)
    }

    columns = []
    config[:columns].each_with_index do |c, i|
      columns << Column.new(i, c['name'], c['type'].to_sym)
    end

    threads = 1

    resume(task, columns, threads, &control)
  end

task

task では実行する際に利用するパラメータをハッシュにして後続の処理に渡します。

今回は設定として固定値の入力データとそれを繰り返す回数を受け取るようにしておきます。

    inputs = config[:inputs]
    task = {
      inputs: inputs,
      count: config.param(:count, :integer, default: 1)
    }
  ...

config.param() により設定を読み込むことができます。デフォルト値も設定できますが、デフォルト値を設定しない場合に設定項目が存在しない場合は実行時に例外となります。

config.param(<設定項目名>, <型名>, default: <デフォルト値>)

もしくは [] によるアクセスもできます。こちらではyamlに設定項目が存在しない場合、 nil となります。

columns

columns では読み込むデータのカラムのスキーマを定義して後続の処理に渡します。

固定されている場合もあるとは思いますが、多くの場合でスキーマ自体は固定されていないと思いますので、今回は yaml の設定から読み込んだものを利用するようにします。

  columns = []
  config[:columns].each_with_index do |column, i|
    columns << Column.new(i, column['name'], column['type'].to_sym)
  end

利用できる型名は以下になります。

  • :boolean
  • :long
  • :double
  • :string
  • :timestamp
  • :json

embulk/column.rb at master · embulk/embulk · GitHub

threads

スレッド数は今回固定の 1 としますが、必要な場合は変更するようにします。

  threads = 1

init

init では実際の実行を行う前の前処理をすることができます。

InputPlugin で以下のような実装がされていることからも、どのような挙動になるかわかるかと思います。

embulk/input_plugin.rb at master · embulk/embulk · GitHub

def initialize(task, schema, index, page_builder)
  @task = task
  @schema = schema
  @index = index
  @page_builder = page_builder
  init
end

今回は利用しないため、実装はしません。

run

run では実際の実行処理を記述します。

@page_builder.addself.transaction で登録した順序で実際のデータを追加していき、 @page_builder.finish で処理を終了します。

def run
  inputs = @task[:inputs]
  count = @task[:count]
  count.times do |i|
    inputs.each do |input|
      @page_builder.add(input)
    end
  end
  @page_builder.finish
end

実行してみよう

ローカルにあるプラグインの実装をもとに Embulk を実行してみます。

まずはテスト実行用の Gemfile と yamlファイルを作成します。

# Gemfile
source 'https://rubygems.org/'

gem 'embulk'

gem 'embulk-input-example', path: '/path/to/embulk-input-example'
gem 'embulk-output-command'
gem 'embulk-formatter-fast_jsonl'
# example.yml
in:
  type: example
  count: 2
  inputs:
    - [ "bob", 19 ]
    - [ "alice", 18 ]
  columns:
    - { name: name, type: string }
    - { name: age, type: long }
out:
  type: command
  command: "cat - > '/workdir/output.txt'"
  formatter:
    type: fast_jsonl

そして、上記のGemfile、example.yml のあるディレクトリで以下を実行します。

$ embulk bundle
$ embulk preview -L <プラグインのディレクトリ> example.yml
2019-01-28 04:43:05.853 +0000: Embulk v0.9.12
2019-01-28 04:43:07.065 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.                                                     
2019-01-28 04:43:10.608 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"                                                                
2019-01-28 04:43:11.617 +0000 [INFO] (main): Started Embulk v0.9.12
2019-01-28 04:43:11.680 +0000 [INFO] (0001:preview): Loaded plugin embulk/input/example from a load path                                                                     
+-------------+----------+
| name:string | age:long |
+-------------+----------+
|         bob |       19 |
|       alice |       18 |
|         bob |       19 |
|       alice |       18 |
+-------------+----------+

無事、動作を確認することができました。

まとめ

今回は Embulk の input plugin の簡易な実装をやってみました。

より複雑なものについては、gemspec ファイルに依存関係を記述したり、gemの登録をしたりすることをおすすめしますが、個人的に利用するものなどはこのようなローカルでの作成、実行もできるので、ちょっとしたカスタマイズをしてやりたいなどは上記を参考にしてみると良いかと思います。

最後に今回実装したものを GitHub に公開しているので、不明点があればそちらを参考にしてみてください(最後のサンプル実行用のデータは example ディレクトリにあります)。

github.com