技術メモ

技術メモ

ラフなメモ

ワークフローエンジンをGoで実装した

ワークフローエンジンとは?

  • 定期的なタスクの実行し、結果を管理
  • タスク間の依存関係を定義して、適切な順序関係に制御してタスクを実行
  • タスクの実行状況/結果を管理

するソフトウェアのことです。例えば、日々の業務で発生する大量のデータを収集し、加工して、集計するようなデータプラットフォームに役に立つでしょう。他にも日時で発生する定型的な業務をワークフローとして管理したいシチュエーションでの利用などがあります。

  • [ ] TODO: 図を入れる

お試しで何かをするだけであれば、ワークフローエンジンは不要です。単にアドホックになにかをするだけであれば、シェルなりなんらかのスクリプトを手動で実行するだけで十分です。 cronでスクリプトを実行するのも一つの方法でしょう。しかしcronでは以下の理由で継続的な業務を遂行するのは難しいです。

  • タスクの依存関係の管理
    • 何らかの理由(例えば、ある日に処理するデータ量が多くて時間がかかるなど)でタスクの実行に時間がかかっている場合に、後続のタスクの実行を制御したい場合があります。これはcronで管理するのは難しいです。
  • エラー時のハンドリング/リラン
    • エラー時が発生したときに後続のタスクを実行しない、あるいはエラーから容易にリランなどをすることによって回復できることが必要です。cronでは運用が複雑になるでしょう。
  • タスクの依存関係の管理
    • cronでは、今どのサーバでどのタスクが動いているのか、どのタスクが完了したのかという、実行状況を管理することが難しい。

JP1などのジョブ管理ツールを用いて、1つの処理に対して1つのPジョブを割り当てるようなナイーブな構成も考えられます。しかし、この場合はジョブの構成が複雑になり運用が大変です。タスクを新たに追加したり、タスクの順番を容易に入れ替えることができるような仕組みが必要です。

そこでワークフローエンジンが必要になります。

既存のワークフローエンジン

以下にあるように、OSSとして実装されているワークフローエンジンはたくさん存在します。以下のリポジトリに記載されているものの一部を抜粋してみます。

https://github.com/meirwah/awesome-workflow-engines

Project Name Stars Forks Open Issues Description Last Commit
airflow 15288 5785 266 Apache Airflow - A platform to programmatically author, schedule, and monitor workflows 2020-01-14 19:51:31
argo 4434 683 346 Argo Workflows: Get stuff done with Kubernetes. 2020-01-14 18:34:04
rundeck 3540 704 1107 Enable Self-Service Operations: Give specific users access to your existing tools, services, and scripts 2020-01-14 18:53:46
cadence 3277 274 290 Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way. 2020-01-14 04:52:04
azkaban 3023 1244 646 Azkaban workflow manager. 2020-01-10 21:09:14
conductor 2356 728 169 Conductor is a microservices orchestration engine - https://netflix.github.io/conductor/ 2020-01-10 04:14:10
brigade 1854 195 103 Event-based Scripting for Kubernetes. 2019-12-23 21:04:11
workflow-core 1851 447 78 Lightweight workflow engine for .NET Standard 2020-01-12 04:12:50
Wexflow 1738 417 58 A high-performance, extensible, modular and cross-platform workflow engine. Built for automation and optimized for SaaS integration, Wexflow runs on Windows, Linux, macOS and the cloud. 2019-12-20 20:21:53
prefect 1572 122 87 The Prefect Core workflow engine 2020-01-14 19:44:00
kiba 1273 75 6 Data processing & ETL framework for Ruby 2019-12-23 08:27:50
zeebe 1045 151 274 Distributed Workflow Engine for Microservices Orchestration 2020-01-14 18:08:46
nextflow 969 226 243 A DSL for data-driven computational pipelines 2020-01-14 21:08:57
digdag 845 146 182 Workload Automation System 2020-01-14 09:57:48
cromwell 511 176 638 Scientific workflow engine designed for simplicity & scalability. Trivially transition between one off use cases to massive scale production environments 2020-01-13 15:56:54
titanoboa 305 18 2 community version of fully distributed, highly scalable and fault tolerant workflow orchestration platform for JVM 2020-01-11 12:38:31
piper 277 43 11 piper - distributed workflow engine 2018-11-05 15:48:49
fission-workflows 264 30 51 Workflows for Fission: Fast, reliable and lightweight function composition for serverless functions 2019-07-03 15:00:24
fireworks 182 117 33 The Fireworks Workflow Management Repo. 2019-12-06 15:32:05
imixs-workflow 174 38 40 The open source workflow technology for business applications 2020-01-14 15:44:24
flor 163 15 1 a workflow engine 2020-01-07 12:42:02
copper-engine 152 55 30 COPPER - a high performance Java workflow engine 2019-12-09 12:27:02
utask 123 11 16 µTask is an automation engine that models and executes business processes declared in yaml. ✏️📋 2020-01-14 13:21:35
cylc-flow 108 70 220 Cylc: a workflow engine for cycling systems. Repository master branch: core meta-scheduler component of cylc-8 (in development); Repository 7.8.x branch: full cylc-7 system. 2020-01-14 06:54:12

ワークフローエンジンの分類

ワークフローエンジンの特徴を簡単に分類すると以下のように考えることができます。OSSを選定する際にはツールの特徴を理解して、チームにフィットして運用可能なプロダクトを選定する必要があります。

  • タスク定義方法
  • ルーティング
    • 静的:予めワークフローのタスクを登録しておいて、DAGのフローにしたがってタスクを逐次実行する
    • 動的:タスクを処理した後に何かをルックアップして、次に実行するタスクを特定し実行する

Flower

https://github.com/d-tsuji/flower

ところでワークフローは性質上、個々の業務と密結合することが多いと考えています。上記のようなOSSは便利ですが、必要以上に機能があって導入が難しかったり、あるいは運用が複雑になることがあります。必要以上にリッチなエンジンを用いる必要はなく、必要十分で運用可能な小さなワークフローエンジンがほしい。という思いでPoC実装をはじめたのが Flower です。

アーキテクチャ

タスクを逐次処理するシンプルなDAGのワークフローを管理するエンジンです。ワーカープールを用いた並行処理がしやすい、ということからGoで実装しました。アーキテクチャは以下のようになっています。

  • ジョブはHTTPで実行登録
  • ワーカーはGoroutineとしてワーカープールを実装
  • 実行登録されたジョブはpollingで監視して、ワーカーにディスパッチ

system_overview

特徴

以下の特徴があります。

  • ワークフローを構成するタスクをマスタで一括管理
    • パラメータなどの埋め込みが可能
  • タスクを構成する処理はComponentとしてGoで実装
  • エラー時にどのタスクでエラーになったかわかる仕組みを提供
  • テーブルのレコード更新でリランが可能
  • 重複実行制御
  • 流量制御

ワークフローを構成するタスクをマスタで一括管理

ワークフローをどのように定義するか?という方法についてはいくつか方法があります。ファイル形式にするのがOSSでは良く見ます。AirflowではPythonファイルとして定義しています。Digdagではdigファイルという形式で定義しています。

FlowerではマスタにレコードをInsertして定義するという方法で実装しました。静的なルーティングをサポートしています。現在終了したタスクのIDからルックアップテーブルを引いて、次のタスクを探索して、そのタスクをタスク管理テーブルに実行待ちとして登録、ディスパッチャーが取得するのを待つ、というアプローチも考えられます。ただ動的なルーティングはタスクのフローが複雑になりがちで運用が難しくなりがちということと、そもそも非常にシンプルなDAG ( 1 -> 2 -> 3 -> 4 というイメージ) のみをサポートしているので動的なルーティングは必要ありません。

また、タスクの定義方法に関して、タスク定義はなるべくシンプルなインターフェースにしたかったため、スクリプトで定義する方法を選択しませんでした。Yamlファイルなどの形式は十分ありえます。ユーザがワークフローを定義するインターフェースはYamlファイル形式で、アプリケーションとしては内部的にデータベースに永続化して保存しておく、Yamlが更新された場合はデータベースのレコードを更新するアプローチも考えられます。

タスクを構成する処理はComponentとしてGoで実装

タスクの実装はコンポーネントとしてGoで実装します。ワークフローの構成するジョブはあるタスクが複数のコンポーネントから使われることがあるでしょう。一部のパラメータだけ変更して、実際に処理するロジックは同一、というタスク定義はよく見ます。

PoCの実装としてコンポーネントとして実装したのはHTTPリクエストでジョブを実行するコンポーネントのみです。実際のタスクを実行するサーバが別に存在して、そのサーバに対してHTTPでパラメータを渡してタスクを実行するという方法です。 他のコンポーネントとして考えられるのは、データベースに対して、単にSQLで加工処理をしたい、といった用途でSQLを実行するコンポーネントなどがあります。あるいは、インプットのソースとなるサーバからデータを収集するロジックを実装したりなど。前述したとおりタスクはワークフローを扱うドメインと密結合する事が多いので、ドメインで必要な処理をコンポーネントとして拡張していくことになります。

一度コンポーネントを揃えてしまえば、それを用いたワークフロー定義がマスタ定義のみでできるので、便利ではないでしょうか。

エラー時にどのタスクでエラーになったかわかる仕組みを提供/テーブルのレコード更新でリランが可能

ワークフローエンジンで重要なポイントとして、エラーハンドリングがあります。ジョブは常に成功するわけではなく、なんからの内部/外部要因によってエラーになりえます。エラー発生時に容易にリカバリできるようなワークフローエンジンでなければ運用に耐えられないでしょう。Flowerでは実行登録されたタスクはタスク管理テーブルのレコードとして永続化しています。今どこまで実行されていて、どのタスクでエラーになったかひと目でわかるようになっています。

リランに関しては、基本的に自動でのリカバリは考えていないです。何からの対応が必要になることが多いためです。手動でリカバリを実施して中断されているレコードのステータスを実行待ちにすることでリランすることが可能です。

重複実行制御

基本的にはディスパッチャーは1つのディスパッチャーのみで動く想定していますが、複数のディスパッチャーがタスク管理テーブルを参照してワーカーにジョブをディスパッチャするシチュエーションも存在します。実行待ちになっているジョブが複数のディスパッチャから参照されて、二重起動されることは避けなければなりません。

排他制御PostgreSQLの行ロックを用いています。行ロックは性能上の問題になりやすいため、1度に更新する対象とするレコードは1レコードのみにしています。行ロックでロックしてから、実行待ちタスクを実行中に更新するため、仮に複数のディスパッチャから同時に実行待ちのタスクをFETCHしたとしても、同時に更新されることはありません。SELECT FOR UPDATE でロックが取得できるのを待つため、一方のディスパッチャでは実行待ちのタスクをFETCHすることはできません。動作は常に1回のみ動作することを保証しています。

流量制御

リクエストの流量制御をサポートしています。Flowerはあくまでもワークフローエンジンなので、実際のタスクを実行する役割は別のサーバにオフロードすることが多くなります。同時にタスクを実行すると、タスクを実行するサーバのリソース(CPU/Memory/IO)などがボトルネックになって、タスクが失敗することがあります。流量制御をするポイントは2つあります。

  1. タスク管理テーブルから実行可能なタスクをFETCHして、ワーカーにputする数の制御
  2. ワーカー数の制御

GoではGoroutineを用いて軽量なスレッドを立ち上げることができるので、ワーカーを多めに立ち上げたとしてもリソースを多く必要とすることはないと考えています。よって、流量制御としては主に1点目が重要と考えています。実行中のタスクと実行待ちかつ実行可能なタスクの一覧を取得し、同時に実行されるタスク数を制御するアプローチです。実装としてはSQLに寄せていて、実行可能タスクを取得するときに、row_number() を使って適切に順序付けをし、予め指定した同時実行数以下に絞ることで、ワーカーにputして動作しているタスク数を制御しています。

その他の検討事項

  • ディスパッチャーのポーリング
  • 管理コンソール

ディスパッチャーのポーリング

タスク管理テーブルから、実行可能なタスクを取得するためにデータベースをポーリングしています。現状、あるタスクの完了して次のタスクが実行されるまでに最大ポーリング間隔の時間だけWaitされることになります。完了したことを契機に次のタスクを実行させたい場合は、イベント通知を検討する必要があります。これはディスパッチャーにchannelを持たせて、完了時にディスパッチャーのchannelにputすることでイベント通知することができます。

管理コンソール

単純にこれはあれば良いなぁ、と思っているが実装していないものです。OSSではたいていなにかしらのUIが存在することが多く、運用性の向上のためにあればよいな、と思います。