What is it, naokirin?

Embulkでバッチ処理をしてみよう 其の4

前回までで既存のプラグインを用いたEmbulkでのバッチ処理を実行するために必要な情報を見ていきました。

一度きりの実行の場合はこれまでの知識で十分ですが、定期的に実行したい場合、とくに一連の処理を定期的に実行したい場合は cron だけでは足りない場合もあります(Aの処理を実行した上でBの処理を実行するといった順序が必要な場合など)。

このようなフローが必要な定期実行やエラーハンドリングをしたい場合には digdag を用いると解決します。

今回はEmbulkではありませんが、Embulkとよく利用される digdag についてを簡単に書いていきます。

digdag とは

Digdag – Open Source Workflow Engine for the Multi-Cloud Era

OSSのワークフローエンジンです。

複数のリモートにまたがる場合でもPostgreSQLを用いるなどしてセッション管理をすることもできるため、汎用的なワークフローエンジンとして利用できます。

またcronなどでは設定ファイルと環境が密結合していることが多く、環境構築時点でスケジュールの設定もしなければならないことも多いですが、digdagでは必要に応じて設定を読み込ませることができ、VCSなどでの管理も容易になります。

digdagでEmbulkを定期実行してみよう

今回は今までのサンプルのEmbulkのバッチ処理をdigdagで定期実行して見るようにします。

digdag自体も設定や機能が豊富で詳細に説明すると1つの記事で収まらない部分があります。そのため、導入および入門的な設定に留めて見ていくことにします。詳しい設定方法やアーキテクチャなどはドキュメントを参照してみてください。

https://docs.digdag.io/getting_started.html

digdagの設定を見てみよう

いつもどおりリポジトリをクローンした上で、 example_digdag をチェックアウトしてください。

GitHub - naokirin/embulk_examples

$ git clone https://github.com/naokirin/embulk_examples
$ cd embulk_examples
$ git checkout example_digdag

schedule.digリポジトリに追加されています。この .dig という拡張子のファイルがdigdagにおけるワークフローの設定ファイルとなります。YAMLがベースとなっていますが一部拡張されています。

timezone: Asia/Tokyo

schedule:
  daily>: 12:00:00 

+embulk_run:
  sh>: /embulk/embulk run config/example_4.yaml.liquid
  _error:
     sh>: |
       echo "Error!!!"

+tar:
  sh>: tar -czvf ${session_date}.tar output.txt

かなりシンプルな設定ですが、一般的に利用されるパターンの多くの部分を抑えていると思いますので、この設定を見ていくことにします。

timezone は digdag で設定された時刻をどのタイムゾーンと解釈して実行するかを指定できます。

schedule の項目では、例のように毎日12時などの定期実行用の設定ができます。

Scheduling workflow — Digdag 0.9.5 documentation

項目 説明
daily>: HH:MM:SS 毎日
hourly>: MM:SS 毎時
weekly>: DDD,HH:MM:SS 週ごと
monthly>: D,HH:MM:SS 月ごと
minutes_interval>: M M分ごと
cron>: CRON cron形式の設定

+xxx>: の項目は各ワークフローのステップになります。上から順に実行されていきます。この項目以下で各ステップで実行したい処理を記載するのですが、digdag自体でサポートされているものもありますが、 embulk コマンドの実行にはシェルコマンド実行用の sh>: を用いています。

Operators — Digdag 0.9.5 documentation

_error: の項目では、ステップが失敗したときに実行する処理を書くことができます。これにより失敗時にSlackなどに通知することができます。今回は単に echo を実行しています。

2つ目のステップでは、出力されたファイルをtar圧縮しています。このように複数の処理を順序で実行する場合には cron では難しいこともあり、digdagを使うことで解決する場面もあるかと思います。

${session_date} の部分はセッションの時間から日付を取得して展開することができます。それ以外に利用できるものは以下のドキュメントを参照してみてください。

Workflow definition — Digdag 0.9.5 documentation

digdag を実行しよう

digdagにはおもに3つのモードがあります。server, scheduler, client の3つになります。おそらく業務で利用する場合は server モードを利用することが多いと思います。 また server モードで起動中の digdag にワークフローを登録するためには client モードを利用することになります。

とりあえず動作を確認するために client モードで1回だけ実行してみるようにしてみます。

$ docker build -t embulk_examples .
$ docker run -it --rm -e ROWS=5 embulk_examples /bin/sh

$ /digdag/digdag run schedule.dig

これにより、digdagを実行できます。ちなみに、もう一度そのまま同じコマンドを実行してみてもスキップされます。これはdigdagがセッションを管理しており、複数回実行されないようになっているためです。

それでは server モードで実行してみましょう。今回は検証用なのでデータベースではなくメモリ上での管理にしますが、本番ではh2dbかPostgreSQLを利用することをおすすめします。

$ /digdag/digdag server --memory

これにより server モードで実行できますが、フォアグラウンドで実行されてしまうため、以降の操作ができなくなります。そのため今回は & をコマンドの最後につけてバックグラウンドで実行しておきます。

これだけではワークフローの登録ができていないため、次に client モードで登録をします。

$ /digdag/digdag push default

今回は default というプロジェクト名でワークフローを登録しています。以降は指定されたスケジュールでワークフローが実行されるようになります。

server モードのオプション

さて、digdagの実行方法はわかりましたが、digdagの server モードではオプションがいくつかあります。とくにデータベースの設定や最大スレッド数の設定は重要です。またパラメータを指定することもできます。

Command reference — Digdag 0.9.5 documentation

まとめ

digdag を利用することでワークフローの定期実行をきちんと管理しながら行うことができます。

Embulkを使う際に定期実行する際などは利用を検討してみると良いかもしれません。