前回までで既存のプラグインを用いたEmbulkでのバッチ処理を実行するために必要な情報を見ていきました。
一度きりの実行の場合はこれまでの知識で十分ですが、定期的に実行したい場合、とくに一連の処理を定期的に実行したい場合は cron だけでは足りない場合もあります(Aの処理を実行した上でBの処理を実行するといった順序が必要な場合など)。
このようなフローが必要な定期実行やエラーハンドリングをしたい場合には digdag を用いると解決します。
今回はEmbulkではありませんが、Embulkとよく利用される digdag についてを簡単に書いていきます。
digdag とは
Digdag – Open Source Workflow Engine for the Multi-Cloud Era
OSSのワークフローエンジンです。
複数のリモートにまたがる場合でもPostgreSQLを用いるなどしてセッション管理をすることもできるため、汎用的なワークフローエンジンとして利用できます。
またcronなどでは設定ファイルと環境が密結合していることが多く、環境構築時点でスケジュールの設定もしなければならないことも多いですが、digdagでは必要に応じて設定を読み込ませることができ、VCSなどでの管理も容易になります。
digdagでEmbulkを定期実行してみよう
今回は今までのサンプルのEmbulkのバッチ処理をdigdagで定期実行して見るようにします。
digdag自体も設定や機能が豊富で詳細に説明すると1つの記事で収まらない部分があります。そのため、導入および入門的な設定に留めて見ていくことにします。詳しい設定方法やアーキテクチャなどはドキュメントを参照してみてください。
https://docs.digdag.io/getting_started.html
digdagの設定を見てみよう
いつもどおりリポジトリをクローンした上で、 example_digdag
をチェックアウトしてください。
GitHub - naokirin/embulk_examples
$ git clone https://github.com/naokirin/embulk_examples
$ cd embulk_examples
$ git checkout example_digdag
schedule.dig
がリポジトリに追加されています。この .dig
という拡張子のファイルがdigdagにおけるワークフローの設定ファイルとなります。YAMLがベースとなっていますが一部拡張されています。
timezone: Asia/Tokyo schedule: daily>: 12:00:00 +embulk_run: sh>: /embulk/embulk run config/example_4.yaml.liquid _error: sh>: | echo "Error!!!" +tar: sh>: tar -czvf ${session_date}.tar output.txt
かなりシンプルな設定ですが、一般的に利用されるパターンの多くの部分を抑えていると思いますので、この設定を見ていくことにします。
timezone
は digdag で設定された時刻をどのタイムゾーンと解釈して実行するかを指定できます。
schedule
の項目では、例のように毎日12時などの定期実行用の設定ができます。
Scheduling workflow — Digdag 0.9.5 documentation
項目 | 説明 |
---|---|
daily>: HH:MM:SS | 毎日 |
hourly>: MM:SS | 毎時 |
weekly>: DDD,HH:MM:SS | 週ごと |
monthly>: D,HH:MM:SS | 月ごと |
minutes_interval>: M | M分ごと |
cron>: CRON | cron形式の設定 |
+xxx>:
の項目は各ワークフローのステップになります。上から順に実行されていきます。この項目以下で各ステップで実行したい処理を記載するのですが、digdag自体でサポートされているものもありますが、 embulk
コマンドの実行にはシェルコマンド実行用の sh>:
を用いています。
Operators — Digdag 0.9.5 documentation
_error:
の項目では、ステップが失敗したときに実行する処理を書くことができます。これにより失敗時にSlackなどに通知することができます。今回は単に echo
を実行しています。
2つ目のステップでは、出力されたファイルをtar圧縮しています。このように複数の処理を順序で実行する場合には cron では難しいこともあり、digdagを使うことで解決する場面もあるかと思います。
${session_date}
の部分はセッションの時間から日付を取得して展開することができます。それ以外に利用できるものは以下のドキュメントを参照してみてください。
Workflow definition — Digdag 0.9.5 documentation
digdag を実行しよう
digdagにはおもに3つのモードがあります。server, scheduler, client の3つになります。おそらく業務で利用する場合は server モードを利用することが多いと思います。 また server モードで起動中の digdag にワークフローを登録するためには client モードを利用することになります。
とりあえず動作を確認するために client モードで1回だけ実行してみるようにしてみます。
$ docker build -t embulk_examples . $ docker run -it --rm -e ROWS=5 embulk_examples /bin/sh $ /digdag/digdag run schedule.dig
これにより、digdagを実行できます。ちなみに、もう一度そのまま同じコマンドを実行してみてもスキップされます。これはdigdagがセッションを管理しており、複数回実行されないようになっているためです。
それでは server モードで実行してみましょう。今回は検証用なのでデータベースではなくメモリ上での管理にしますが、本番ではh2dbかPostgreSQLを利用することをおすすめします。
$ /digdag/digdag server --memory
これにより server モードで実行できますが、フォアグラウンドで実行されてしまうため、以降の操作ができなくなります。そのため今回は &
をコマンドの最後につけてバックグラウンドで実行しておきます。
これだけではワークフローの登録ができていないため、次に client モードで登録をします。
$ /digdag/digdag push default
今回は default というプロジェクト名でワークフローを登録しています。以降は指定されたスケジュールでワークフローが実行されるようになります。
server モードのオプション
さて、digdagの実行方法はわかりましたが、digdagの server モードではオプションがいくつかあります。とくにデータベースの設定や最大スレッド数の設定は重要です。またパラメータを指定することもできます。
Command reference — Digdag 0.9.5 documentation
まとめ
digdag を利用することでワークフローの定期実行をきちんと管理しながら行うことができます。
Embulkを使う際に定期実行する際などは利用を検討してみると良いかもしれません。