What is OpML

参考

メモ

OpMLについて

USENIX OpML'19のテックジャイアントたちが話題にしている主要なトピックからスコープを推測。

  • ヘテロジーニアスアーキテクチャの活用(エッジ・クラウドパラダイム)
  • モデルサービング、レイテンシの改善
  • 機械学習パイプライン
  • 機械学習による運用改善(AIOps)
  • 機械学習のデバッガビリティ改善

USENIX OpML'19 の定義

USENIX OpML'19 によると、 OpML = Operational Machine Learning の定義。

カンファレンスの定義は以下の通り。

The 2019 USENIX Conference on Operational Machine Learning (OpML '19) provides a forum for both researchers and industry practitioners to develop and bring impactful research advances and cutting edge solutions to the pervasive challenges of ML production lifecycle management. ML production lifecycle is a necessity for wide-scale adoption and deployment of machine learning and deep learning across industries and for businesses to benefit from the core ML algorithms and research advances.

上記では、機械学習の生産ライフサイクルの管理は、機械学習や深層学習が産業により広く用いられるため、またコアとなる機械学習アルゴリズムや研究からビジネス上のメリットを享受するために必要としている。

コミッティー

カンファレンスサイトの情報から集計すると、以下のような割合だった。

多様な企業、組織からコミッティを集めているが、 Googleなど一部企業からのコミッティが多い。

セッション

カンファレンスのセッションカテゴリは以下の通り。

カテゴリとしては、今回は実経験に基づく知見を披露するセッションが件数多めだった。

一方で、同じカテゴリでも話している内容がかなり異なる印象は否めない。 もう数段階だけサブ・カテゴライズできるようにも見え、まだトピックが体系化されていないことを伺わせた。

Google、LinkedIn、Microsoft等のテックジャイアントの主要なセッションを見ていて気になったキーワードを並べる。

  • ヘテロジーニアスアーキテクチャの活用(エッジ・クラウドパラダイム)
  • モデルサービング、レイテンシの改善
  • 機械学習パイプライン
  • 機械学習による運用改善(AIOps)
  • 機械学習のデバッガビリティ改善

その他アルゴリズムの提案も存在。

小さめの企業では、ML as a Serviceを目指した取り組みがいくつか見られた。

セッションをピックアップして紹介

実経験にもとづく知見のセッションを一部メモ。

Opportunities and Challenges Of Machine Learning Accelerators In Production

  • タイトル: Opportunities and Challenges Of Machine Learning Accelerators In Production
  • キーワード: TPU, Google, ヘテロジーニアスアーキテクチャ, 性能
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_ananthanarayanan.pdf

CPUでは近年の深層学習ワークロードの計算アルゴリズム、量において不十分(ボトルネックになりがち)だが、 TPU等のアーキテクチャにより改善。

しかしTPUが入ってくると「ヘテロジーニアスなアーキテクチャ」になる。 これをうまく使うための工夫が必要。

加えて近年のTPUについて紹介。

Accelerating Large Scale Deep Learning Inference through DeepCPU at Microsoft

  • タイトル: Accelerating Large Scale Deep Learning Inference through DeepCPU at Microsoft
  • キーワード: モデルサービング, レイテンシの改善, Microsoft, 性能
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_zhang-minjia.pdf

Microsoft。 CPU上でRNNを高速にサーブするためのライブラリ: DeepCPU 目標としていたレイテンシを実現するためのもの。 例えばTensorFlow Servingで105msかかっていた「質問に答える」という機能に関し、 目標を超えるレイテンシ4.1msを実現した。 スループットも向上。

A Distributed Machine Learning For Giant Hogweed Eradication

  • タイトル: A Distributed Machine Learning For Giant Hogweed Eradication
  • キーワード: Big Data基盤, モデルサービング, ドローン, 事例, NTTデータ
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19-nttd.pdf

NTTデータ。 デンマークにおいて、ドローンで撮影した画像を利用し、危険外来種を見つける。 Big Data基盤と機械学習基盤の連係に関する考察。

AIOps: Challenges and Experiences in Azure

  • タイトル: AIOps: Challenges and Experiences in Azure
  • キーワード: Microsoft, 運用, 運用改善, AIOps
  • スライド: 非公開

スライド非公開のため、概要から以下の内容を推測。 Microsoft。 Azureにて、AIOps(AIによるオペレーション?)を実現したユースケースの紹介。 課題と解決方法を例示。

Quasar: A High-Performance Scoring and Ranking Library

  • タイトル: Quasar: A High-Performance Scoring and Ranking Library
  • キーワード: LinkedIn, スコアリング, ランキング, 性能, フレームワーク
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_birkmanis.pdf

スコアリングやランキング生成のためにLinkedInで用いられているライブラリ。 Quasar.

AI from Labs to Production - Challenges and Learnings

  • タイトル: AI from Labs to Production - Challenges and Learnings
  • キーワード: AIの商用適用
  • スライド: 非公開

エンタープライズ向けにAIを使うのはB2CにAIを適用するときとは異なる。

MLOp Lifecycle Scheme for Vision-based Inspection Process in Manufacturing

  • タイトル: MLOp Lifecycle Scheme for Vision-based Inspection Process in Manufacturing
  • キーワード: Samsung Research, 運用, 深層学習, 製造業, 運用スキーム
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_lim.pdf

Samsung Research。 製造業における画像検知?における運用スキームの提案。 異なるステークホルダが多数存在するときのスキーム。 学習、推論、それらをつなぐ管理機能を含めて提案。 ボルトの検査など。

Deep Learning Vector Search Service

  • タイトル: Deep Learning Vector Search Service
  • キーワード: Microsoft, Bing, 検索, ベクトルサーチ, ANN(Approximae Nearest Neighbor), 深層学習
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_zhu.pdf

深層学習を用いてコンテンツをベクトル化する。 ベクトル間の関係がコンテンツの関係性を表現。 スケーラビリティを追求。

Signal Fabric An AI-assisted Platform for Knowledge Discovery in Dynamic System

  • タイトル: Signal FabricAn AI-assisted Platform for Knowledge Discovery in Dynamic System
  • キーワード: Microsoft, Azure, 運用改善のためのAI活用
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_aghajanyan.pdf

Microsoft。Azure。 動的に変化するパブリッククラウドのサービスの振る舞いを把握するためのSignal Fabric。 「AI」も含めて様々な手段を活用。

Relevance Debugging and Explaining at LinkedIn

  • タイトル: Relevance Debugging and Explaining at LinkedIn
  • キーワード: LinkedIn, AIのデバッグ, 機械学習基盤, デバッガビリティの改善
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_qiu.pdf

LinkedIn。 分散処理のシステムは複雑。AIエンジニアたちはそれを把握するのが難しい。 そこでそれを支援するツールを開発した。 様々な情報をKafkaを通じて集める。 それを可視化したり、クエリしたり。

Shooting the moving target: machine learning in cybersecurity

  • タイトル: Shooting the moving target: machine learning in cybersecurity
  • キーワード: セキュリティへの機械学習適用, 機械学習のモデル管理
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_arun.pdf

セキュリティへの機械学習適用。 基盤はモジューライズされ?、拡張可能で、 モデルを繰り返し更新できる必要がある。

Deep Learning Inference Service at Microsoft

  • タイトル: Deep Learning Inference Service at Microsoft
  • キーワード: 深層学習, 推論, Microsoft, ヘテロジーニアスアーキテクチャ
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_soifer.pdf

Microsoft。 深層学習の推論に関する工夫。インフラ。 ヘテロジーニアス・リソースの管理、リソース分離、ルーティングなど。 数ミリ秒単位のレイテンシを目指す。 モデルコンテナをサーブする仕組み。

ソリューション関係

KnowledgeNet: Disaggregated and Distributed Training and Serving of Deep Neural Networks

  • タイトル: KnowledgeNet: Disaggregated and Distributed Training and Serving of Deep Neural Networks
  • キーワード: エッジ・クラウドパラダイム, DNN, 分散処理, ヘテロジーニアスアーキテクチャ
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_biookaghazadeh.pdf

DNNは強化学習、物体検知、ビデオ処理、VR・ARなどに活用されている。 一方でそれらの処理はクラウド中心のパラダイムから、エッジ・クラウドパラダイムに 移りつつある。 そこでDNNの学習とサービングをエッジコンピュータにまたぐ形で提供するようにする、 KnwledgeNetを提案する。 ヘテロジーニアスアーキテクチャ上で学習、サービングする。

クラウドでTeacherモデルを学習し、エッジでStudentモデルを学習する。

Continuous Training for Production ML in the TensorFlow Extended (TFX) Platform

  • タイトル: Continuous Training for Production ML in the TensorFlow Extended (TFX) Platform
  • キーワード: Google, 継続的な機械学習, TFX(TensorFlow Extended), 機械学習のパイプライン
  • スライド: https://www.usenix.org/system/files/opml19papers-baylor.pdf

大企業は、継続的な機械学習パイプラインに支えられている。 それが止まりモデルが劣化すると、下流のシステムが影響を受ける。 TensorFlow Extendedを使ってパイプラインを構成する。

Reinforcement Learning Based Incremental Web Crawling

  • タイトル: Reinforcement Learning Based Incremental Web Crawling
  • キーワード: ウェブクロール, 機械学習?
  • スライド: 非公開

Katib: A Distributed General AutoML Platform on Kubernetes

  • タイトル: Katib: A Distributed General AutoML Platform on Kubernetes
  • キーワード: AutoML, Kubernetes, ハイパーパラメータサーチ, ニューラルアーキテクチャサーチ
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_zhou.pdf

Kubernetes上に、ハイパーパラメータサーチ、ニューラルアーキテクチャサーチのためのパイプラインをローンチするAutoMLの仕組み。

Machine Learning Models as a Service

  • タイトル: Machine Learning Models as a Service
  • キーワード: 中央集権的な機械学習基盤, 機械学習基盤のセキュリティ, Kafkaを使ったビーコン
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_wenzel.pdf

データサイエンティストがモデル設計や学習よりも、デプロイメントに時間をかけている状況を受け、 中央集権的な機械学習基盤を開発。 PyPIに悪意のあるライブラリが混入した件などを考慮し、中央集権的にセキュリティを担保。 Kafkaにビーコンを流す。リアルタイムに異常検知するフロー、マイクロバッチでデータレイクに格納し、再学習をするフロー。 その他、ロギング、トレーシング(トラッキング)、モニタリング。

Stratum: A Serverless Framework for the Lifecycle Management of Machine Learning-based Data Analytics Tasks

  • タイトル: Stratum: A Serverless Framework for the Lifecycle Management of Machine Learning-based Data Analytics Tasks
  • キーワード: 機械学習向けのサーバレスアーキテクチャ, Machine Learning as a Service, 推論, エッジ・クラウド
  • スライド: https://www.usenix.org/sites/default/files/conference/protected-files/opml19_slides_bhattacharjee.pdf

Stratumというサーバレスの機械学基盤を提案。 モデルのデプロイ、スケジュール、データ入力ツールの管理を提供。 機械学習ベースの予測分析のアーキテクチャ。

その他のセッション傾向

キーワード:

  • スケーラビリティ
  • 監視・観測、診断
  • 最適化・チューニング

SYSML

SYSMLの定義

公式ウェブサイトには、以下のように記載されている。

The Conference on Systems and Machine Learning (SysML) targets research at the intersection of systems and machine learning. The conference aims to elicit new connections amongst these fields, including identifying best practices and design principles for learning systems, as well as developing novel learning methods and theory tailored to practical machine learning workflows.

以上から、「システムと機械学習の両方に関係するトピックを扱う」と解釈して良さそうである。

SYSML'19

コミッティなど

チェアたちの所属する組織の構成は以下の通り。

プログラムコミッティの所属する組織の構成は以下の通り。

チェア、コミッティともに、幅広い層組織からの参画が見受けられるが、大学関係者(教授等)が多い。 僅かな偏りではあるが、Stanford Univ.、Carnegie Mellon Univ.、UCBあたりの多さが目立つ。 コミッティに限れば、MITとGoogleが目立つ。

トピック

トピックは以下の通り。

  • Parallel & Distributed Learning
  • Security and Privacy
  • Video Analytics
  • Hardware
  • Hardware & Debbuging
  • Efficient Training & Inference
  • Programming Models

概ねセッション数に偏りは無いようだが、1個目のParallel & Distributed Learningだけは他と比べて2倍のセッション数だった。

TicTac: Accelerating Distributed Deep Learning with Communication Scheduling

  • タイトル: TicTac: Accelerating Distributed Deep Learning with Communication Scheduling
  • キーワード: 深層学習, DNNの高速化, TensorFlow, PyTorch, 学習高速化, 推論高速化
  • スライド: https://www.sysml.cc/doc/2019/199.pdf

パラメータ交換を高速化。その前提となる計算モデルを考慮して、パラメータ交換の順番を制御する。 TensorFlowを改造。パラメータサーバを改造。 モデル自体を変更することなく使用可能。 推論で37.7%、学習で19.2%の改善。

共有

ideavimrc

参考

メモ

IntelliJ(Android Studio)のVimプラグイン「IdeaVim」の使い方と設定 に.ideavimrcの書き方の例が記載されている。 ただし、リンクで書かれているGitHubレポジトリは存在しないようだた。 

上記ブログを参考にしつつ、とりあえず初期バージョンを作成。→ https://github.com/dobachi/ideavimrc このあと色々と付け足す予定。

共有

sort by timestamp with NERDTree

参考

メモ

vimでNERDTreeを使うとき、直近ファイルを参照したいことが多いため、デフォルトのソート順を変えることにした。

Support sorting files and directories by modification time. #901 を参照し、タイムスタンプでソートする機能が取り込まれていることがわかったので、 NERDTree.txt を参照して設定した。 具体的には、 NERDTreeSortOrder の節を参照。

vimrcには、

1
let NERDTreeSortOrder=['\/$', '*', '\.swp$',  '\.bak$', '\~$', '[[-timestamp]]']

を追記した。デフォルトのオーダに対し、 [[-timestamp]] を追加し、タイムスタンプ降順でソートするようにした。

共有

Spark Docker Image

参考

メモ

Spark公式のDockerfileを活用

Spark公式ドキュメントの Running Spark on Kubernetes 内にある Docker Images によると、 SparkにはDockerfileが含まれている。

以下のパッケージで確認。

1
2
3
$ cat RELEASE
Spark 2.4.3 built for Hadoop 2.7.3
Build flags: -B -Pmesos -Pyarn -Pkubernetes -Pflume -Psparkr -Pkafka-0-8 -Phadoop-2.7 -Phive -Phive-thriftserver -DzincPort=3036

Dockerfileは、パッケージ内の kubernetes/dockerfiles/spark/Dockerfile に存在する。 また、これを利用した bin/docker-image-tool.sh というスクリプトが存在し、 Dockerイメージをビルドして、プッシュできる。 今回はこのスクリプトを使ってみる。

ヘルプを見ると以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
$ ./bin/docker-image-tool.sh --help
Usage: ./bin/docker-image-tool.sh [options] [command]
Builds or pushes the built-in Spark Docker image.

Commands:
build Build image. Requires a repository address to be provided if the image will be
pushed to a different registry.
push Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
-f file Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
-p file Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
-R file Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
-n Build docker image with --no-cache
-b arg Build arg to build or push the image. For multiple build args, this option needs to
be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
available when running applications inside the minikube cluster.

Check the following documentation for more information on using the minikube Docker daemon:

https://kubernetes.io/docs/getting-started-guides/minikube/#reusing-the-docker-daemon

Examples:
- Build image in minikube with tag "testing"
./bin/docker-image-tool.sh -m -t testing build

- Build and push image with tag "v2.3.0" to docker.io/myrepo
./bin/docker-image-tool.sh -r docker.io/myrepo -t v2.3.0 build
./bin/docker-image-tool.sh -r docker.io/myrepo -t v2.3.0 push

つづいて最新版Sparkのバージョンをタグ付けて試す。

1
2
$ sudo ./bin/docker-image-tool.sh -r docker.io/dobachi -t v2.4.3 build
$ sudo ./bin/docker-image-tool.sh -r docker.io/dobachi -t v2.4.3 push

これは成功。 早速起動して試してみる。 なお、Sparkのパッケージは、 /opt/spark 以下に含まれている。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ sudo docker run --rm -it dobachi/spark:v2.4.3 /bin/bash

(snip)

bash-4.4# /opt/spark/bin/spark-shell

(snip)

Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.3
/_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

データを軽く読み込んでみる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
scala> val path = "/opt/spark/data/mllib/kmeans_data.txt"
scala> val textFile = spark.read.textFile(path)
scala> textFile.first
res1: String = 0.0 0.0 0.0

scala> textFile.count
res2: Long = 6

scala> val csvLike = spark.read.format("csv").
option("sep", " ").
option("inferSchema", "true").
option("header", "false").
load(path)

scala> csvLike.show
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|0.0|0.0|0.0|
|0.1|0.1|0.1|
|0.2|0.2|0.2|
|9.0|9.0|9.0|
|9.1|9.1|9.1|
|9.2|9.2|9.2|
+---+---+---+

scala> csvLike.where($"_c0" > 5).show
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|9.0|9.0|9.0|
|9.1|9.1|9.1|
|9.2|9.2|9.2|
+---+---+---+

なお、PySparkが含まれているのは、違うレポジトリであり、 dobachi/spark-py を用いる必要がある。

1
2
3
4
5
6
7
8
9
10
11
$ sudo docker run --rm -it dobachi/spark-py:v2.4.3 /bin/bash

(snip)

bash-4.4# /opt/spark/bin/pyspark
Python 2.7.16 (default, May 6 2019, 19:35:26)
[GCC 8.3.0] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Could not open PYTHONSTARTUP
IOError: [Errno 2] No such file or directory: '/opt/spark/python/pyspark/shell.py'
>>>

この状態だと、OS標準付帯のPythonのみ使える状態に見えた。 実用を考えると、もう少し分析用途のPythonライブラリを整えた方が良さそう。

Anaconda等も利用するとして?

上記の通り、Spark公式のDockerfileで作るDockerイメージには、Python関係の分析用のライブラリ、ツールが不足している。 そこでこれを補うために、Anaconda等を利用しようとするとして、パッと見でほんの少し議論なのは、 なにをベースにするかという点。

どっちに合わせるか悩むところだが、Debianベースに置き換えでも良いか、という気持ちではある。 Debianベースのほうが慣れているという気持ちもあり。

また、Anacondaを合わせて導入するようにしたら、環境変数 PYSPARK_PYTHON も設定しておいたほうが良さそう。

共有

Databricks AutoML Toolkit

参考

メモ

ブログ?記事?

Databricks launches AutoML Toolkit for model building and deployment の内容を確認する。

MLFlowなどをベースに開発された。 ハイパーパラメタータチューニング、バッチ推論、モデルサーチを自動化する。

TensorFlowやSageMakerなどと連携。

データサイエンティストとエンジニアが連携可能という点が他のAutoMLと異なる要素。 またコード実装能力が高い人、そうでない人混ざっているケースに対し、 高い抽象度で実装なく動かせると同時に、細かなカスタマイズも可能にする。

DatabricksのAutoMLは、Azureの機械学習サービスとも連携する。(パートナーシップあり)

GitHub

GitHub databrickslabs/automl-toolkit を軽く確認する。

Readme(2019/08/23現在)

「non-supported end-to-end supervised learning solution for automating」と定義されている。 以下の内容をautomationするようだ。

  • Feature clean-up(特徴選択?特徴の前処理?)
  • Feature vectorization(特徴のデータのベクトル化)
  • Model selection and training(モデル選択と学習)
  • Hyper parameter optimization and selection(ハイパーパラメータ最適化と選択)
  • Batch Prediction(バッチ処理での推論)
  • Logging of model results and training runs (using MLFlow)(MLFlowを使ってロギング、学習実行)

対応している学習モデルは以下の通り。 Sparkの機械学習ライブラリを利用しているようなので、それに準ずることになりそう。

  • Decision Trees (Regressor and Classifier)
  • Gradient Boosted Trees (Regressor and Classifier)
  • Random Forest (Regressor and Classifier)
  • Linear Regression
  • Logistic Regression
  • Multi-Layer Perceptron Classifier
  • Support Vector Machines
  • XGBoost (Regressor and Classifier)

開発言語はScala(100%)

DBFS APIを前提としているように見えることから、Databricks Cloud前提か?

ハイレベルAPI(FamilyRunner)について載っている例から、ポイントを抜粋する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// パッケージを見る限り、Databricksのラボとしての活動のようだ?
import com.databricks.labs.automl.executor.config.ConfigurationGenerator
import com.databricks.labs.automl.executor.FamilyRunner

val data = spark.table("ben_demo.adult_data")


// MLFlowの設定値など、最低限の上書き設定を定義する
val overrides = Map("labelCol" -> "income",
"mlFlowExperimentName" -> "My First AutoML Run",

// Databrciks Cloud上で起動するDeltaの場所を指定
"mlFlowTrackingURI" -> "https://<my shard address>",
"mlFlowAPIToken" -> dbutils.notebook.getContext().apiToken.get,
"mlFlowModelSaveDirectory" -> "/ml/FirstAutoMLRun/",
"inferenceConfigSaveLocation" -> "ml/FirstAutoMLRun/inference"
)

// コンフィグを適切に?自動生成。合わせてどのモデルを使うかを指定している。
val randomForestConfig = ConfigurationGenerator.generateConfigFromMap("RandomForest", "classifier", overrides)
val gbtConfig = ConfigurationGenerator.generateConfigFromMap("GBT", "classifier", overrides)
val logConfig = ConfigurationGenerator.generateConfigFromMap("LogisticRegression", "classifier", overrides)

// 生成されたコンフィグでランナーを生成し、実行
val runner = FamilyRunner(data, Array(randomForestConfig, gbtConfig, logConfig)).execute()

上記実装では、以下のようなことが裏で行われる。

  • 前処理
  • 特徴データのベクタライズ
  • 3種類のモデルについてパラメータチューニング

実装を確認

エントリポイントとしては、 com.databricks.labs.automl.executor.FamilyRunner としてみる。

com.databricks.labs.automl.executor.FamilyRunner

executeメソッドが例として載っていたので確認。 当該メソッド全体は以下の通り。

com/databricks/labs/automl/executor/FamilyRunner.scala:115

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def execute(): FamilyFinalOutput = {

val outputBuffer = ArrayBuffer[FamilyOutput]()

configs.foreach { x =>
val mainConfiguration = ConfigurationGenerator.generateMainConfig(x)

val runner: AutomationRunner = new AutomationRunner(data)
.setMainConfig(mainConfiguration)

val preppedData = runner.prepData()

val preppedDataOverride = preppedData.copy(modelType = x.predictionType)

val output = runner.executeTuning(preppedDataOverride)

outputBuffer += new FamilyOutput(x.modelFamily, output.mlFlowOutput) {
override def modelReport: Array[GenericModelReturn] = output.modelReport
override def generationReport: Array[GenerationalReport] =
output.generationReport
override def modelReportDataFrame: DataFrame =
augmentDF(x.modelFamily, output.modelReportDataFrame)
override def generationReportDataFrame: DataFrame =
augmentDF(x.modelFamily, output.generationReportDataFrame)
}
}

unifyFamilyOutput(outputBuffer.toArray)

}

また、再掲ではあるが、当該メソッドは以下のように呼び出されている。

1
2
// 生成されたコンフィグでランナーを生成し、実行
val runner = FamilyRunner(data, Array(randomForestConfig, gbtConfig, logConfig)).execute()

第1引数はデータ、第2引数は採用するアルゴリズムごとのコンフィグを含んだArray。

では、executeメソッドを少し細かく見る。

まずループとなっている箇所があるが、ここはアルゴリズムごとのコンフィグをイテレートしている。

com/databricks/labs/automl/executor/FamilyRunner.scala:119

1
configs.foreach { x =>

つづいて、渡されたコンフィグを引数に取りながら、パラメータを生成する。

com/databricks/labs/automl/executor/FamilyRunner.scala:120

1
val mainConfiguration = ConfigurationGenerator.generateMainConfig(x)

次は上記で生成されたパラメータを引数に取りながら、処理の自動実行用のAutomationRunnerインスタンスを生成する。

com/databricks/labs/automl/executor/FamilyRunner.scala:122

1
2
val runner: AutomationRunner = new AutomationRunner(data)
.setMainConfig(mainConfiguration)

つづいてデータを前処理する。

com/databricks/labs/automl/executor/FamilyRunner.scala:125

1
2
3
val preppedData = runner.prepData()

val preppedDataOverride = preppedData.copy(modelType = x.predictionType)

つづいて生成されたAutomationRunnerインスタンスを利用し、パラメータチューニングを実施する。

com/databricks/labs/automl/executor/FamilyRunner.scala:129

1
val output = runner.executeTuning(preppedDataOverride)

その後、出力内容が整理される。 レポートの類にmodelFamilyの情報を付与したり、など。

com/databricks/labs/automl/executor/FamilyRunner.scala:131

1
2
3
4
5
6
7
8
9
outputBuffer += new FamilyOutput(x.modelFamily, output.mlFlowOutput) {
override def modelReport: Array[GenericModelReturn] = output.modelReport
override def generationReport: Array[GenerationalReport] =
output.generationReport
override def modelReportDataFrame: DataFrame =
augmentDF(x.modelFamily, output.modelReportDataFrame)
override def generationReportDataFrame: DataFrame =
augmentDF(x.modelFamily, output.generationReportDataFrame)
}

最後に、モデルごとのチューニングのループが完了したあとに、 それにより得られた複数の出力内容をまとめ上げる。 (Arrayをまとめたり、DataFrameをunionしたり、など)

com/databricks/labs/automl/executor/FamilyRunner.scala:142

1
unifyFamilyOutput(outputBuffer.toArray)

com.databricks.labs.automl.AutomationRunner#executeTuning

指定されたモデルごとにうチューニングを行う部分を確認する。

当該メソッド内では、modelFamilyごとの処理の仕方が定義されている。 以下にRandomForestの場合を示す。

com/databricks/labs/automl/AutomationRunner.scala:1597

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  protected[automl] def executeTuning(payload: DataGeneration): TunerOutput = {

val genericResults = new ArrayBuffer[GenericModelReturn]

val (resultArray, modelStats, modelSelection, dataframe) =
_mainConfig.modelFamily match {
case "RandomForest" =>
val (results, stats, selection, data) = runRandomForest(payload)
results.foreach { x =>
genericResults += GenericModelReturn(
hyperParams = extractPayload(x.modelHyperParams),
model = x.model,
score = x.score,
metrics = x.evalMetrics,
generation = x.generation
)
}
(genericResults, stats, selection, data)


(snip)

なお、2019/8/25現在対応しているモデルは以下の通り。(caseになっていたのを抜粋)

1
2
3
4
5
6
7
8
case "RandomForest" =>
case "XGBoost" =>
case "GBT" =>
case "MLPC" =>
case "LinearRegression" =>
case "LogisticRegression" =>
case "SVM" =>
case "Trees" =>

さて、モデルごとのチューニングの様子を確認する。 RandomForestを例にすると、実態は以下の runRandomForest メソッドに見える。

com/databricks/labs/automl/AutomationRunner.scala:1604

1
val (results, stats, selection, data) = runRandomForest(payload)

runRandomForest メソッドは、以下のようにペイロードを引数にとり、 その中でチューニングを実施する。

com/databricks/labs/automl/AutomationRunner.scala:46

1
2
3
4
5
  private def runRandomForest(
payload: DataGeneration
): (Array[RandomForestModelsWithResults], DataFrame, String, DataFrame) = {

(snip)

特徴的なのは、非常にきめ細やかに初期設定している箇所。 これらの設定を上書きしてチューニングできる、はず。

com/databricks/labs/automl/AutomationRunner.scala:58

1
2
3
4
5
6
7
    val initialize = new RandomForestTuner(cachedData, payload.modelType)
.setLabelCol(_mainConfig.labelCol)
.setFeaturesCol(_mainConfig.featuresCol)
.setRandomForestNumericBoundaries(_mainConfig.numericBoundaries)
.setRandomForestStringBoundaries(_mainConfig.stringBoundaries)

(snip)

実際にチューニングしているのは、以下の箇所のように見える。 evolveWithScoringDF 内では、 RandomForestTuner クラスのメソッドが呼び出され、 ストラテジ(実装上は、バッチ方式か、continuous方式か)に従ったチューニングが実施される。

com/databricks/labs/automl/AutomationRunner.scala:169

1
val (modelResultsRaw, modelStatsRaw) = initialize.evolveWithScoringDF()

共有

Manage the flow of hexo documentation with CircleCI

参考

メモ

HexoでPandasレンダラーを使うために最新版PandocをインストールしたDockerイメージを用意

ドキュメント等では、circleciのDockerイメージを使うことが記載されているが、 ここでは予め最新版Pandocをインストールしておきたかったので、circleciのDockerイメージをベースにしつつ、 インストーラをwget、dpkgでインストールしたDockerイメージを自作して利用した。

Dockerfileは以下の通り。

1
2
3
4
5
FROM circleci/node:8.10.0

RUN wget -P /tmp https://github.com/jgm/pandoc/releases/download/2.7.3/pandoc-2.7.3-1-amd64.deb

RUN sudo dpkg -i /tmp/pandoc-2.7.3-1-amd64.deb

今回は、これを使って作ったDockerイメージを、予めDocker Hubに登録しておく。 ここでは hoge/fuga:1.0 と登録したものとする。

HexoをCircleCIでビルドするための設定ファイルの作成

上記で作ったDockerイメージを使う、以下のようなCircleCi設定ファイルを作り、 Hexoプロジェクトのトップディレクトリ以下 .circleci/config.yml に保存する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
defaults: &defaults 
docker:
- image: hoge/fuga:1.0
environment:
TZ: Asia/Tokyo
working_directory: ~/work

version: 2
jobs:
build:
<<: *defaults
steps:
- checkout
- run: git submodule init
- run: git submodule update
- run: npm install --save
- run: node_modules/.bin/hexo clean
- run: sed -i -e "s#http://cdn.mathjax.org/mathjax#https://cdn.mathjax.org/mathjax#" node_modules/hexo-renderer-mathjax/mathjax.html
- run: cat node_modules/hexo-renderer-mathjax/mathjax.html
- run: node_modules/.bin/hexo generate
- run: git config --global user.name <your name>
- run: git config --global user.email <your email address>
- run: node_modules/.bin/hexo deploy
- persist_to_workspace:
root: .
paths: [ '*' ]

上記 config.yml は、今後の拡張性を踏まえて記載してあるので、やや冗長な表現になっている。

なお、

1
- run: sed -i -e "s#http://cdn.mathjax.org/mathjax#https://cdn.mathjax.org/mathjax#" node_modules/hexo-renderer-mathjax/mathjax.html

としている箇所があるが、これは現在の hexo-renderer-mathjax をインストールすると、httpsではなくhttpを使用するようになっており、 結果としてgithub.ioでサイト公開するとmathjax部分が意図通りに表示されないからである。 HexoのレンダリングエンジンとしてPandocを使う にもその旨説明がある。 mathjax.html を見る限り、GitHub上のmasterブランチでは修正済みのようだ。

CircleCI上でプロジェクトを作る

GitHubアカウントを連携している場合、既存のレポジトリが見えるので、 プロジェクトを作成する。

GitHubへのPush用のSSH鍵を登録する

LaTeXファイルをGithubにpushしたらCircle CIでPDF生成するようにした あたりを参考に、SSH鍵の登録を実施。 プロジェクトの設定から、「Checkout SSH keys」を開き、「Add user key」から鍵を生成→登録した。

GitHub側のアカウント設定から鍵が登録されたことが確認できる。

以上で手順終了。 ブログを追記して、プロジェクトにpushするとCircleCIがhexo generateし、hexo deployするようになる。

共有

Chi: a scalable and programmable control plane for distributed stream processing systems

参考

メモ

2018/6にVLDBに掲載された論文。Microsoftが中心となって執筆?

多数のパラメータがあり、調整可能性がある場合に、動的な設定を可能にするChiというControll Planeを提案。

昨今のストリーム処理エンジン(やアプリケーション?)は、様々なSLO(Service Level Objective)を目指し、 多数の調整用パラメータが存在する。 これをチューニングするのは玄人でも一筋縄ではない。

重要なのは、動的に設定できることに加え、常にモニタリングしフィードバックすること。

本論文では、非同期的にコントロールポリシーを適用するモデル(Control Plane)を提案。 ストリームデータを伝搬する機能を利用し、コントロールメッセージを伝搬する。 またコントロールメッセージを扱うためのリアクティブなプログラミングモデルを合わせて提案。これにより、ポリシーを実装するのにハイレベルAPIを利用すればよくなる。 グローバルな同期が不要であることが特徴。

過去の技術としては、Heron、Flink、Spark Streamingを例に挙げ、限定的な対応にとどまっている旨を指摘。

Chiは、 Orleans の上に構築されたFlare(★補足:Microsoft内部のシステムのようだ)という仕組み上に実装し、試した。 Trillも利用しているとのこと。

★補足:GitHub等に実装が公開されていないか軽く確認したが存在しないようだ。

2. MOTIVATION

メジャーなパブリッククラウド(★補足:ここではAzureのことを指しているのだと思われる)で生じるログを集めて分析したい。 サーバ台数は20万台、数十PB/日。 これはGoogleのログと同様の規模。

ワークロードを予測しがたい

生じるログはバラエティに富んでいる。これはそれを生成するプロセス、コンポーネントがバラエティ豊かだからである。 例えば、何か障害が起きるとそのとたんにバーストトラフィックが発生する。 デバッグモードを有効にするとログ量が増える。

特徴として、ひとつの種類のストリームの量も、バラエティ豊かであることが挙げられる。 例えば、あるストリームについては1分の間に数千万イベントまで到達するケースもあった。(つまり、バーストトラフィック)

これはトラフィックが予測困難であることを示す。

データの多様性

Skewが存在し、多様である。

マルチテナントなコントロールポリシー

ログにはError、Infoなどのレベルがあり、要件に応じて多様である。 またそれぞれ異なるSLOを求められることがある。 例えばInfoレベルはExactly Onceだが、それ以外はベストエフォート、など。

これをひとつのストリーム処理システムで実現しようとする。

またControl Policyは、リソース使用の最適化に用いられることもある。 ある処理の中間データがほかの処理で使われるために保持されたり、 データレイアウトやストレージの使い方を最適化したり…など。

これらの個別の先行研究は存在するが、まとめて管理しようとすると、 Control Planeに柔軟性とスケーラビリティが必要になる。

3. BACKGROUND

ここでは、Naiad、StreamScope、Apache Flinkを例にとって説明。 いずれも、オペレータのDAG構造のモデル。

ひとつの例として、 LINQスタイルのクエリでWordCountを表現。

LINQ風の表現とオペレータのDAG表現の例

計算モデルの前提は以下の通り。

DAGを\(\displaystyle G( V,\ E)\)で表す。

\(\displaystyle E\)はEdgeのセット、\(\displaystyle V\)がOperatorのセット。

\(\displaystyle u\ \rightharpoonup v\)はOperator \(\displaystyle u\)から\(\displaystyle v\)への有向Edgeを表す。

\(\displaystyle \{\cdot \rightharpoonup v\}\)\(\displaystyle v\)への入力Edgeを表す。

\(\displaystyle \{v\rightharpoonup \cdot \}\)\(\displaystyle v\)からの出力Edgeを表す。

Operator \(\displaystyle v\ \in V\)はトリプル\(\displaystyle ( s_{v} \ ,\ f_{v} \ ,\ p_{v})\)で表す。 この時、\(\displaystyle s_{v}\)\(\displaystyle v\)のステート、\(\displaystyle f_{v}\)\(\displaystyle v\)で実行される関数、\(\displaystyle p_{v}\)はステートに含まれないプロパティを表す。

\(\displaystyle f_{v}\)の例としては、 \[\begin{equation*} f:\ s_{v} \ ,\ m_{e_{i} \in \{\cdot \ \rightharpoonup \ v\}} \rightharpoonup s'_{v} ,\ m'_{e_{i} \in \{v\ \rightharpoonup \ \cdot \}} \end{equation*}\] が挙げられていた。ここで\(\displaystyle m\)はメッセージを表す。

つまり、\(\displaystyle m_{e_{i} \in \{\cdot \ \rightharpoonup \ v\}}\)は、入力Edge \(\displaystyle e_{i}\)から入ってくるメッセージを表す。 また入力を受け取った時点でステート\(\displaystyle s_{v}\)だったものを、出力時点で\(\displaystyle s'_{v}\)に変える関数を表す。 また、\(\displaystyle \ m'_{e_{i} \in \{v\ \rightharpoonup \ \cdot \}}\)は出力メッセージを表す。

4. DESIGN

データ処理と同様のAPIを設け、データ処理のパイプライン(data-plane)を使う。 グローバルな同期を必要とせず、非同期的なControl Operationを可能にする。

4.1 Overview

前提は以下の通り。

  • Operator間のチャンネルはExactly Onceであること
  • メッセージはFIFOで処理されること
  • 一度に処理されるメッセージは1個ずつ
  • 基盤となるSPEはOperatorに関する基本的なオペレーション(起動、停止、Kill)に対応すること

Control Loopを通じて、Control Messageが伝搬される。 Control Loopの流れは以下の通り。

  • Phase 1:
    • Controllerにより意思決定され、ユニークなIDが生成される
    • Control Operationはシリアライズされる
  • Phase 2:
    • Source OperatorにOperationが渡される
    • データフローに沿ってControl Messageが伝搬される
    • 途中で情報が付与されることもある
    • 各OperatorはControl Messageに沿って反応する
  • Phase 3:
    • SinkからControllerにフィードバックされる

[LINQ風の表現とオペレータのDAG表現の例] のDAG表現を用いた例でいえば、 もともと Reducer \(\{R_1, R_2\}\) で処理していたのを \(\{R_1, R_2, R_3\}\) で処理するように変更する例が載っている。 Control Messageが \(\{R_1, R_2\}\) に届くと、ルーティング情報を更新しつつ、ステート情報をCheckpointする。 ステート情報は再分配され、 \(R_3\) にも分配される。 \(R_3\) は、Control Messageを受け取ると入力チャンネルをブロックし、 \(\{R_1, R_2\}\) からのステートがそろうのを待つ。 ステートがそろったら、マージし、新しい関数を動かし始める。

ControllerがすべてのControl Messageを受け取ったら、スケールアウトは完了である。

4.2 Control Mechanism

4.2.1 Graph Transitions through Meta Topology

★この辺を読んでいる

<WIP>

共有

How to become a (Throughput) Billionaire The Stream Processing Engine PipeFabric

参考

メモ

2019年の論文

Reliable stream data processing for elastic distributed stream processing systems が引用されているため気になった。

ドイツのイルメナウ大学?で開発されているストリーム処理エンジン。 論文アブストラクトでは、「開発中」とされていたが、PipeFabricのGitHub を見ると、2019/8/14現在で最終更新が7か月前(2019/1)だった。

モダンなHWをどう生かすか?ということがテーマのように見えた。

共有

On-the-fly Reconfiguration of Query Plans for Stateful Stream Processing Engines

参考

メモ

Bartnik, A., Del Monte, B., Rabl, T. & Markl, V., らの論文。2019年。

多くのストリーム処理エンジンが、データ量の変動に対して対応するためにクエリの再起動を必要とし、 ステートの再構成(再度の分散化?)にコストを必要とする点に着目し、新たな手法を提案。 Apache Flinkへのアドオンとして対応し、外部のトリガを用いてオペレータの置換を行えることを示した。

Modificaton Coordinatorを導入し、RPC経由でメッセージ伝搬と同一の方法でModificationに関する情報を 各オペレータインスタンスに伝える。 またステートの保持にはCheckopintの仕組みを導入し、各オペレータインスタンスごとに ステートを永続化する。上流・下流オペレータはその状況を見ながら動作する。

多くの先行研究に対し、以下の点が特徴的であると言える。

  • 並列度の変更だけではなく、データフロー変更、オペレータの追加に対応
  • ステートサイズが小さいケースに加え、大きい場合も考慮し実証
  • ステートサイズが小さい場合で数秒、ステートサイズが大きい倍で数十秒のオーダで オペレータインスタンスのマイグレーション、オペレータの追加などに対応
  • Exactly onceセマンティクスを考慮

1 Introduction

ストリームデータの流量について。 ソーシャルメディアの昼夜での差のように予測可能なものもあるが、 スポーツや天気の影響のように予測不可能なものもある。

最近のSPE(Stream Processing Engine)は、実行中の設定変更に一部対応し始めているが、 基本的には設定変更のためには再実行が必要。 (ここではApache Flink、Apache Storm、Apache Sparkを例にとっている)

そこで、提案手法では、実行中の設定変更を可能にする。

  • ネットワークバッファのサイズ変更
  • ステートフル/レスの両オペレーションのマイグレーション(オペレータをほかのノードに移す)
  • 並列度の変更
  • 新しいオペレーションの追加
  • UDFの変更

Apache Flinkに上記内容をプロトタイプし、動作を検証。

2 Background

2.1 Data Stream Processing

近年のSPEのターゲットは、大量データの並列分散処理。 アプローチは2種類:マイクロバッチ、tuple-at-a-time。

マイクロバッチはスループット重視。

tuple-at-a-timeは入力レコードに対して、より細かな粒度での処理を可能にするが、 実際のところ物理レベルではバッチ処理のメカニズムを採用している。 1

SPEでは、基本的にはデータフローの表現にDAGを利用。 Source、Processing、Sink、それをつなぐEdgeで構成される。 Processingノードはステートフル、ステートレスのどちらもあえりえる。

2.2 Apache Flink

FlinkはParallelization Contract (PACT)プログラミングモデルを採用。 Flinkは投稿されたクエリをコンパイル(最適化含む)したうえで実行する。

Flinkはパイプライニングとバッファリングに依存する。 オペレータがデータを送信するときにはバッファを使用しバッファが一杯になったら送るようになっている。 これによりレイテンシを抑えつつ、スループットを稼ぐ。

Flinkはオペレータのフュージョンにも対応。 フュージョンされたオペレータはプッシュベースで通信する。

バックプレッシャもある。

2.3 Fault Tolerance and Checkpointing in Apache Flink

チェックポイントは、状態情報を保存し、故障の際には復旧させる。 このおかげで、メッセージの送達保証の各セマンティクスを選択できるようになる。 また、セーブポイントのメカニズムを築くことになる。 これにより、計画的に停止、再起動させることができるようになる。

Flinkの場合、セーブポイントから再開する際に、並列度を変えるだけではなく、 オペレータを追加・削除可能。

定期的なチェックポイントにより、状態情報が保存される。 状態情報が小さい場合は性能影響は小さい。(逆もしかり)

Flinkでは、プログラムエラーの際は実行を停止・再開し、最後にチェックポイントした 状態情報から再開する。 このとき、データソースはApache Kafkaのように再取得可能なものを想定している。 そうでないと、クエリの再開時にデータが一部失われることになる。

3 Protocol Description

3.1 System Model

システムの前提

Operatorはそれぞれの並列度を持つ。 論理プランは、Source、Sink、Processing OperatorでDAGを構成する。 実際にSPE上で動作する物理プランをJobと呼ぶ。

Operatorは

  • 単一の下流のOperatorにレコードを送る
  • すべての下流のOperatorにブロードキャストする
  • 何らかのパーティションルールにより送る

のいずれかの動作をする。

CheckpointのためにMarkerも送る。 Markerは定期的か、ユーザ指定により送られる。 i は、i番目のCheckpointであることを示す。 内部通信経路でMarkerを受け取ったオペレータは、スナップショットを作成して下流のオペレータにMarkerを送る。 また、スナップショットは非同期的にバックグラウンドの永続化ストレージに保存される。

ここで、SPEはExactly Onceでの処理を保証するものとする。 SPEはControllerとWorkerで構成される。

3.2 Migration Protocol Description

Modification Markerを送ることで、マイグレーションを開始する。 2

各Operatorは、eventualにMarkerを受けとり、マイグレーションに備えるが、 このとき枚グレート対象となるOperatorの上流・下流のOperatorにも影響することがポイント。 上流のOperatorは、バッファを駆使しつつ、Tupleの順序性を担保する。

3.3 Modification Protocols Description

Operatorをマイグレートするだけでなく、Operatorの追加、更新が可能。 ただし、その場合はUDFの配布が必要となる。

3.3.1 Introduction of New Operators

上流OperatorはMarkerを受け取るとバッファリングし始め、その旨下流に通知する。 すべての上流Operatorのバッファリングが開始されたら、新しいOperatorのインスタンスが起動する。 ただし、実際に起動させる前に、あらかじめUDFを配布しておく。 下流Operatorは、新しく起動されたOperatorに接続を試みる。

4 System Architecture

試したFlinkは1.3.2。 ★

4.1 Vanilla Components Overview

まずクライアントがModification ControlメッセージをCoordinatorに送る。 CoordinatorはWorkerに当該情報をいきわたらせる。 このメッセージは、通常のデータとは異なり、RPCとして送られる。 ここではActorモデルに基づく。

Flinkの場合、2個のOperator間の通信はProducer/Consumer関係の下やり取りされる。

各Operatorのインスタンスは、上流からデータを取得するための通信路に責任を持つ。

システムアーキテクチャのイメージ

4.2 Our Changes on the Coordinator Side

Modification Coordinatorは、Modificatoinに関する一切を取り仕切る。 バリデーションも含む。 例えば、現在走っているジョブに対して適用可能か?の面など。

Modificationの大まかな状態遷移は以下の通り。

Modificationのステート

Modificationに関係し、Taskの大まかな状態遷移は以下の通り。

Taskのステート

4.3 Our Changes on the Worker Side

オペレータ間の通信は、オペレータの関係に依存する。 例えば、同一マシンで動ているProducerインスタンスとConsumerインスタンスの場合はメモリを使ってデータをやり取りする。 一方、異なるマシンで動ている場合はネットワーク通信を挟む。

さて、Modificationが生じた場合、新しいConsumerが動き始めるまで、上流のインスタンスはバッファリングしないといけない。 提案手法では、ディスクへのスピル機能を含むバッファリングの仕組みを提案。それ専用のキューを設けることとした。

4.4 Query Plan Modifications

あるOperatorがModificationを実現するには、上流と下流のOperatorの合わせた対応も必要。 そこでModification CoordinatorがModificationメッセージに、関連情報全体を載せ、RPCを使って各Operatorに伝搬する。

4.4.1 Upstream Operators

Checkpointの間、上流から下流に向けて、Checkpointマーカーを直列に並べることで 故障耐性を実現する。 各オペレータは上流からのマーカーがそろうまでバッファリングを続ける。

もしオペレータをマイグレートしようとすると、このバッファもマイグレートする必要がある。 しかしこのバッファインの仕組みは、内部的な機能ではない(★要確認)ため、一定の手続きが必要。 Modificationメッセージには次のCheckpointのIDが含まれている。 このIDに該当するCheckpointが発動されたときには、マイグレート対象のオペレータの上流オペレータは CheckpointバリアのメッセージをModificationメッセージと一緒に送る。 このイベント情報は、上流オペレータがレコードをストレージにフラッシュしていることを下流に示すものとなる。 また、これを通じて、マイグレート対象となるオペレータと上流オペレータの間には仕掛中のレコードがないことを確認できる。

以上を踏まえると、Modificationを安全に進めるためには、Checkpointを待つことになる。 Checkpointインターバルは様々な要因で決まるが、例えばオペレータ数とステートの大きさに依存する。 ステートが大きく、Checkpointインターバルが大きい場合は、それだけModification開始を待たなくてはならない、ということである。

4.4.2 Target and Downstream Operators

下流のオペレータは、基本的には上流のオペレータの新しい情報を待つのみ。

5 Protocol Implementation

5.1 Operator Migration

Modification Coordinatorがトリガーとなるメッセージをソースオペレータから発出。 対象オペレータに加え、上流オペレータも特定する。(上流オペレータは、レコードをディスクにスピルする)

オペレータはcheckpointのマーカを待つ。 Checkpoint Coordinatorがマーカを発出し、マイグレート対象のオペレータの上流オペレータは 送信を止める。

各オペレータはPausing状態に移行するとともに、現在の状態情報をModification Coordinatorに送る。

さらに、下流のオペレータに新しいロケーションを伝える。

各オペレータがPaused状態に移行。 すべてのオペレータがPaused状態に移行したら、オペレータを再起動する。

その後、Modification Coordinatorが状態ロケーション?をアタッチし、タスクを実行開始する。

提案手法では、FlinkのCheckpointの仕組みを使用し、各オペレータの状態情報を取得し、アサインする。

5.2 Introduction of new Operators

オペレータのModificationと同様の流れで、新しいオペレーションの挿入にも対応する。 上流のオペレータがスピルした後、新しいオペレータが挿入される。 デプロイのペイロードには、コンパイル済のコードが含まれる。

5.3 Changing the Operator Function

Modification CoordinatorがModificationメッセージと一緒に、 新しいUDFを配布する。 Task Managerは非同期的にUFDを取得する。

またcallbackを用いて、グローバルCheckpointが完了したときに、新しいUDFを用いるようにする。

6 Evaluation

ここから先は評価となるが、ここではポイントのみ紹介する。

6.3 Workloads

3種類のワークロード、

  • 小さなステートの場合を確認するため、要素数をカウントするワークロード(SMQ)
  • 大きなステートの場合を確認するため、Nexmarkベンチマーク 3 のクエリ8(NBQ8)。(オンラインオークションのワークロード)
  • 上記SMQ、NBQ8についてオペレータのマイグレートを実施

2個めのワークロードではステートサイズが大きいので、インクリメンタルCheckpointを利用。 Flinkの場合は、埋め込みのRocksDB。

6.4 Migration Protocol Benchmark

6.4.1 Stateful Map Job Performance Drill Down

レイテンシで見るとスパイクが生じるのは、ストリーム処理のジョブがリスタートするタイミング。

オペレータのMigration中、1度3500msecのレイテンシのスパイクが生じた。 またコミュニケーションオーバヘッドもあるようだ。

概ね、秒オーダ。

6.4.2 Nexmark Benchmark Performance Drill Down

概ね、100~200秒オーダ。

ステートサイズは全部で13.5GBで、そのうち2.7GBがステート用のバックエンドに格納され、再現された。

80個のジェネレータのスループットは、Migration発生時も大きくは変わらなかった。

6.5 Introducing new operators at runtime

SMQのワークロードを用いた検証。 概ね、秒オーダ。

6.6 Replacing the operator function at runtime

SMQのワークロードを用いた検証。 概ね、秒オーダ。

スループットへの影響は小さい。

6.7 Discussion

Checkpointの同期は課題になりがち。 ステートのサイズが小さいときは高頻度で同期も可能かもしれないが、大きいときは頻度高くCheckpointすると、処理に影響が出る。 例えばステートサイズが小さい時には6秒以内にModificationを開始できたが、大きい時には60秒程度になった。

NBQ8の場合、従来のシャットダウンを伴うsavepointの仕組みと比べ、性能上の改善が見られた。

データソースが永続的であれば再取得することでExactly Onceを実現する。

実行中のジョブに対し、新しいオペレータを挿入することもできた。概ね10秒程度。

オペレータの関数を変更することもできた。概ね9秒程度。 これを突き詰めていくと、内部・外部状態に応じて挙動を変える、ということもできるようになるはず。

★補足: とはいえ、そのような挙動変更は、最初からUDF内に組み込んでおくべきとも考える。(普通に条件文を内部に入れておけば?と) 条件分岐が問題になるほどシビアなレイテンシ要求があるユースケースで、ここで挙げられているようなストリーム処理エンジンを使うとは思えない。 ★補足終わり:

7 Related work

★補足:特に気になった関連研究を以下に並べる。

  • Schneider, S.; Hirzel, M.; Gedik, B.; Wu, K.: Auto-parallelizing stateful distributed streaming applications. ACM PACT, 2012.
    • 並列度の変更に関する先行研究
    • 本論文の提案手法では、大きなステートサイズの際のexactly onceを対象としている点が異なる
  • Wu, Y.; Tan, K. L.: ChronoStream: Elastic stateful stream computation in the cloud. IEEE ICDE, 2015.
    • 弾力性の実現に関する先行研究
    • ただし別の論文の指摘によると、同期に関する課題がある
    • データフロー変更には対応していない。あくまでオペレータのマイグレーションのみ。
  • Heinze, T.; Pappalardo, V.; Jerzak, Z.; Fetzer, C.: Auto-scaling techniques for elastic data stream processing. In: IEEE ICDE Workshops. 2014. および、 Heinze, T.; Ji, Y.; Roediger, L.; Pappalardo, V.; Meister, A.; Jerzak, Z.; Fetzer, C.: FUGU: Elastic Data Stream Processing with Latency Constraints. IEEE Data Eng. Bull., 2015.
    • オートスケールのタイミングを判断する。オンライン機械学習を利用。
    • 簡単なマイグレーションのシナリオを想定。
  • Nasir, M.; Morales, G.; Kourtellis, N.; Serafini, M.: When Two Choices Are not Enough: Balancing at Scale in Distributed Stream Processing. CoRR, abs/1510.05714, 2015.
    • 並列度の調整に関する先行研究
    • 特にホットキーが存在する場合、そこにオペレータインスタンスを割り当てるように動く。
    • データフロー変更などには対応しない。
  • Mai, L.; Zeng, K.; Potharaju, R.; Xu, L.; Venkataraman, S.; Costa, P.; Kim, T.; Muthukrishnan, S.; Kuppa, V.; Dhulipalla, S.; Rao, S.: Chi: A Scalable and Programmable Control Plane for Distributed Stream Processing Systems. VLDB, 2018.
    • 本論文で扱っているのに近い
    • ただしステートサイズが大きなケースは対象としていない

  1. Carbone, P.; Ewen, S.; Fora, G.; Haridi, S.; Richter, S.; Tzoumas, K.: State Management in Apache Flink: Consistent Stateful Distributed Stream Processing. VLDB, 2017.↩︎

  2. Del Monte, B.: Efficient Migration of Very Large Distributed State for Scalable Stream Processing. VLDB PhD Workshop, 2017.↩︎

  3. Tucker, P.; Tufte, K.; Papadimos, V.; Maier, D.: NEXMark - A Benchmark for Queries over Data Streams. 2018.↩︎

共有

Reliable stream data processing for elastic distributed stream processing systems

参考

メモ

Reliable stream data processing for elastic distributed stream processing systems では、 弾力性を有するストリーム処理エンジン(Distributed Stream Processing System: DSPS)の さらなる課題について触れている。

  • 動的にオペレータがスケールアップ、ダウンする中で、データのバックアップの一貫性を保つ故障耐性が必要がある
  • チェックポイントへのロールバックが、直近のオートスケール調整を元に戻らせる可能性がある
共有