The 8 Requirements of Real-Time Stream Processing

参照

メモ

MillWheel でも触れられているストリーム処理について書かれた論文。 2005年。 時代は古いが当時の考察は現在でも有用と考える。

1. Introduction

ウォール街のデータ処理量は大きい。 2005年時点で122,000msg/sec。年率2倍で増大。 しかも処理レイテンシが秒では遅い。 金融に限らず異常検知、センサーデータ活用などのユースケースでは同様の課題感だろう。

「メインメモリDBMS」、「ルールベースエンジン」などがこの手のユースケースのために再注目されたが、 その後いわゆる「ストリーム処理エンジン」も登場。

2. Eight rules for stream processing

Rule 1: Keep the Data Moving

ストレージに書き込むなどしてレイテンシを悪化させてはならない。

ポーリングもレイテンシを悪化させる。 そこでイベント駆動のアプローチを用いることがよく考えられる。

Rule 2: Query using SQL on Streams (StreamSQL)

ハイレベルAPIを用いることで、開発サイクルを加速し、保守性を上げる。

StreamSQL:ストリーム処理固有の機能を有するSQL リッチなウィンドウ関数、マージなど。

Rule 3: Handle Stream Imperfections (Delayed, Missing and Out-of-Order Data)

例えば複数のストリームからなるデータを処理する際、 あるストリームからのデータが到着しないとき、タイムアウトして結果を出す必要がある。

同じようにout-of-orderデータについてもウィンドウに応じてタイムアウトしないといけない。

Rule 4: Generate Predictable Outcomes

結果がdeterministicであり、再現性があること。

計算そのものだけではなく、対故障性の観点でも重要。

Rule 5: Integrate Stored and Streaming Data

過去データに基づき、何らかのパターンなどを見つける処理はよくある。 そのためステート管理の仕組み(データストア)は重要。

例えば金融のトレーディングで、あるパターンを見つける、など。 他にも異常検知も。

過去データを使いながら、実時間のデータにキャッチアップするケースもあるだろう。

このようにヒストリカルデータとライブデータをシームレスにつなぐ機能は重要。

Rule 6: Guarantee Data Safety and Availability

HAは悩ましい課題。 故障が発生してフェールオーバするとして、バックアップハードウェアを 立ち上げて処理可能な状態にするような待ち時間は許容できない。

そこでタンデム構成を取ることは現実的にありえる。

Rule 7: Partition and Scale Applications Automatically

ローレベルの実装を経ずに、分散処理できること。 またマルチスレッド処理可能なこと。

Rule 8: Process and Respond Instantaneously

数万メッセージ/secの量を処理する。 さらにレイテンシはマイクロ秒オーダから、ミリ秒オーダ。

そのためには極力コンポーネント間をまたぐ処理をしないこと。

3. SOFTWARE TECHNOLOGIES for STREAM PROCESSING

基本的なアーキテクチャは、DBMS、ルールベースエンジン、ストリーム処理エンジン。

DBMSはデータをストアしてから処理する。データを動かしつづけるわけではない。

またSPEはSQLスタイルでのストリームデータの処理を可能とする。

SPEとルールベースエンジンは、ブロックおよびタイムアウトを実現しやすい。 DBMSはそのあたりはアプリケーションに委ねられる。 仮にトリガを用いたとしても…。

いずれのアーキテクチャにおいても、予測可能な結果を得るためには、 deterministicな処理を実現しないといけない(例えばタイムスタンプの並びを利用、など) SPEとルールベースエンジンはそれを実現しやすいが、DBMSのACID特性は あくまでトラディショナルなデータベースのためのもであり、ストリームデータ処理のための ものではない。

状態を保存するデータストアとしてDBMSを見たとき、サーバ・クライアントモデルのDBMSは 主にレイテンシの面で不十分である。 唯一受け入れられるとしたら、アプリケーション内に埋め込めるDBMSの場合である。 一方ルールベースエンジンはストリームデータを扱うのは得意だが、 大規模な状態管理と状態に対する柔軟なクエリが苦手である。

3.3 Tabular results

このあたりにまとめ表が載っている。

共有

MillWheel Paper

参考

メモ

昔のメモをここに転記。

1. Introduction

故障耐性、ステート永続化、スケーラビリティが重要。 Googleでは多数の分散処理システムを運用しているので、何かしらおかしなことが置き続けている。

MillWheelはMapReduceと同様にフレームワーク。ただし、ストリーム処理やローレイテンシでの処理のため。

Spark StreamingとSonoraは、チェックポイント機能持つが、実装のためのオペレータが限られている。 S4は故障耐性が不十分。 Stormはexactly onceで動作するが、Tridentは厳密な順序保証が必要。

MapReduce的な考えをストリーム処理に持ち込んでも、妥協した柔軟性しか得られていない。 ストリーミングSQLは簡潔な手段をもたらしているが、ステートの抽象化、複雑なアプリケーションの実装しやすさという意味では、 オペレーショナルなフローに基づくアプローチのほうが有利。

  • 分散処理の専門家でなくても複雑なストリーム処理を実装できること
  • 実行可能性

2. Motivation and requirements

GoogleのZeitgeistを例にした動機の説明。

  • 永続化のためのストレージ
  • Low Watermark(遅れデータへの対応)
  • 重複への対応(Exactly Onceの実現)

3. System overview

Zeitgeistの例で言えば、検索クエリが入力データであり、データ流量がスパイクしたり、凹んだりしたことが出力データ。

データ形式:key、value、timestamp

4. Core concept

キーごとに計算され、並列処理化される。 また、キーがアグリゲーションや比較の軸になる。 Zeitgeistの例では、例えば検索クエリそのものをキーとすることが考えられる。 また、キー抽出のための関数を定義して用いる。

ストリーム:データの流れ。 コンピューテーションは複数のストリームを出力することもある。

ウィンドウ修正するときなどに用いられるステートを保持する。 そのための抽象化の仕組みがある。 キーごとのステート。

Low Watermarkの仕組みがあり、Wall timeが進むごとにWatermarkが進む。 時間経過とともにWatermarkを越したデータに対し、計算が行われる。

タイマー機能あり。 Low Watermarkもタイマーでキックされる、と考えて良い。

5. API

計算APIは、キーごとのステートをフェッチ、加工し、必要に応じてレコード生成し、タイマーを セットする。

Injector:MillWheelに外部データを入力する。 injector low watermarkも生成可能。

なお、low watermarkを逸脱するデータが生じた場合は、 ユーザアプリでは捨てるか、それとも現在の集計に組み込むか決められる。

6. Fault tolerance

到達保証の考え方としては、ユーザアプリで冪等性を考慮しなくて良いようにする、という点が挙げられる。

基本的にAckがなければデータが再送される設計に基づいているが、 受信者がAckを返す直前に何らかの理由で故障した場合、データが重複して処理される可能性がある。 そこでユニークIDを付与し、重複デーかどうかを確かめられるようにする。 判定にはブルームフィルタも利用する。 ID管理の仕組みにはガベージコレクションの仕組みもある。

チェックポイントの仕組みもある。 バックエンドストレージにはBigtableなどを想定。

なお、性能を重視しチェックポイントを無効化することもできるが、 そうすると何らかの故障が生じて、データ送信元へのAckが遅れることがある。 パイプラインが並列化すると、それだけシステム内のどこかで故障が生じる可能性は上がる。 そこで、滞留するデータについては部分的にチェックポイントすることで、 計算コストとエンドツーエンドレイテンシのバランスを保つ。 ★

永続化されたステートの一貫性を保つため、アトミックなオペレーションでラップする。 ただし、マシンのフェールオーバ時などにゾンビ化したWriterが存在する可能性を考慮し、 シークエンサートークンを用いるようにする。 つまり、新たに現れたWriterは古いシークエンサートークンを無効化してから、 動作を開始するようにしている。

7. System Implementation

MillWheelはステート管理のためにデータストア(BigTableなど)を使う。 故障発生時にはデータストアからメモリ情報を再現する。

low watermarkのジャーナルはデータストアに保存される。

感想:このあたりデータストアの性能は、最終的なパフォーマンスに大きく影響しそうだ。 ★

8. Evaluation

単純なシャッフル処理では、レイテンシの中央値は数ミリ秒。95パーセンタイルで30ミリ秒。 Exactly onceなどを有効化すると中央値は33.7ミリ秒。95パーセンタイルで93.8ミリ秒。

CPU数をスケールアウトしても、レイテンシに著しい劣化は見られない。(逆に言うと、99パーセンタイルではある程度の劣化が見られる)

low watermarkは処理のステージをまたぐと、実時間に対してラグが生じる。 このあたりは活発に改善を進められているところ。

ステート管理などにストレージ(BitTableなど)を使う。 これによりリード・ライトがたくさん発生する。 ワーカにおけるキャッシュは有効。

実際のユースケースは、広告関係。 そのほか、Google Street Viewでのパノラマ画像の生成など。 ★

9. Related work

ストリーム処理システムが必要とするものは、以下の論文に記載されている。 The 8 Requirements of Real-Time Stream Processing

Spark Streamingに対しては、MillWheelの方がgeneralであると主張。 RDDへの依存性がユーザに制約をもたらす、とも。 またチェックポイントの間隔はMillWheelの方が粒度が細かい。

共有

System monitor on Ubuntu18

参考

メモ

Ubuntu 18.04 LTSをインストールした直後に行う設定 & インストールするソフト に記載されているとおり、 GNOME Shell ExtensionsからSystem-monitorをインストールすると、タスクバー上にシステムモニターを表示できる。

共有

Alt Tab on Ubuntu18

参考

メモ

Ubuntu 18.04 LTSをインストールした直後に行う設定 & インストールするソフト に諸々記載されている。 設定から変えられる。

共有

Docker DesktopをWSLから利用している場合のマウントについて

参考

メモ

Docker for WindowsをWSLから使う時のVolumeの扱い方 に起債されている通りだが、 うっかり -v /tmp/hoge:/hoge とかやると、Docker Desktopで使用しているDocker起動用の仮想マシン中の /tmp/hoge をマウントすることになる。

共有

MLflow

参考

メモ

クイックスタートを試す

公式クイックスタート の通り、簡単な例を試す。 手元のAnaconda環境で仮想環境を構築し、実行する。

1
2
3
$ /opt/Anaconda/default/bin/conda create -n mlflow ipython jupyter
$ source activate mlflow
$ pip install mlflow

サンプルコードを実装。実験のために、もともと載っていた内容を修正。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ mkdir -p Sources/mlflow_quickstart
$ cd Sources/mlflow_quickstart
$ cat << EOF > quickstart.py
> import os
> from mlflow import log_metric, log_param, log_artifact
>
> if __name__ == "__main__":
> # Log a parameter (key-value pair)
> log_param("param1", 5)
>
> # Log a metric; metrics can be updated throughout the run
> log_metric("foo", 1)
> log_metric("foo", 2)
> log_metric("foo", 3)
>
> # Log an artifact (output file)
> with open("output.txt", "w") as f:
> f.write("Hello world! 1\n")
> with open("output.txt", "w") as f:
> f.write("Hello world! 2\n")
> log_artifact("output.txt")
> with open("output.txt", "w") as f:
> f.write("Hello world! 3\n")
> log_artifact("output.txt")
> EOF

以下のようなファイルが生成される。

1
2
$ ls
mlruns output.txt quickstart.py

つづいてUIを試す。

1
$ mlflow ui

ブラウザで、 http://localhost:5000/ にアクセスするとウェブUIを見られる。 メトリクスは複数回記録すると履歴となって時系列データとして見えるようだ。 一方、アーティファクトは複数回出力しても1回分(最後の1回?)分しか記録されない?

なお、複数回実行すると、時系列データとして登録される。 試行錯誤の履歴が残るようになる。

クイックスタートのプロジェクト

公式クイックスタート には、MLflowプロジェクトとして取り回す方法の例として、 GitHubに登録されたサンプルプロジェクトをロードして実行する例が載っている。

1
$ mlflow run https://github.com/mlflow/mlflow-example.git -P alpha=5

GitHubからロードされたプロジェクトが実行される。 なお、ローカルファイルシステムのmlrunsには今回実行したときの履歴が保存される。

チュートリアルの確認

公式チュートリアル を見ながら考える。 examples/sklearn_elasticnet_wine/train.py が取り上げられているのでまずは確認する。

27行目辺りからメイン関数。

最初にCSVファイルを読み込んでPandas DFを作る。 examples/sklearn_elasticnet_wine/train.py:32

1
2
wine_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "wine-quality.csv")
data = pd.read_csv(wine_path)

入力ファイルを適当に分割したあと、MLflowのセッション(と呼べばよいのだろうか)を起動する。 examples/sklearn_elasticnet_wine/train.py:47

1
with mlflow.start_run():

これにより、MLflowが動作に必要とするスタックなどが初期化される。 詳しくは mlflow/tracking/fluent.py:71 あたりの run メソッドの実装を参照。

セッション開始後、モデルを定義し学習を実行する。 その後推論結果を取得し、メトリクスを計算する。 このあたりは通常のアプリと同じ実装。

1
2
3
4
5
6
lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
lr.fit(train_x, train_y)

predicted_qualities = lr.predict(test_x)

(rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

その後パラメータ、メトリクス、モデルを記録する。

1
2
3
4
5
6
7
mlflow.log_param("alpha", alpha)
mlflow.log_param("l1_ratio", l1_ratio)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("r2", r2)
mlflow.log_metric("mae", mae)

mlflow.sklearn.log_model(lr, "model")

このあたりが、MLflowのトラッキング機能を利用している箇所。

試しにパラメータを色々渡して実行してみる。 以下のようなシェルスクリプトを用意した。

1
2
3
4
5
6
7
for alpha in 0 1 0.5
do
for l1_ratio in 1 0.5 0.2 0
do
python ~/Sources/mlflow/examples/sklearn_elasticnet_wine/train.py ${alpha} ${l1_ratio}
done
done

その後、mlflow ui コマンドでUIを表示すると、先程試行した実験の結果がわかる。 メトリクスでソートもできるので、 モデルを試行錯誤しながら最低なモデル、パラメータを探すこともできる。

パッケージング

チュートリアルでは、以下のようなMLprojectファイル、conda.yamlが紹介されている。

MLproject

1
2
3
4
5
6
7
8
9
10
name: tutorial

conda_env: conda.yaml

entry_points:
main:
parameters:
alpha: float
l1_ratio: {type: float, default: 0.1}
command: "python train.py {alpha} {l1_ratio}"

このファイルはプロジェクトの定義を表すものであり、 依存関係と実装へのエントリーポイントを表す。 見てのとおり、パラメータ定義やコマンド定義が記載されている。

conda.yaml

1
2
3
4
5
6
7
8
name: tutorial
channels:
- defaults
dependencies:
- python=3.6
- scikit-learn=0.19.1
- pip:
- mlflow>=1.0

このファイルは環境の依存関係を表す。

このプロジェクトを実行するには、以下のようにする。

1
$ mlflow run ~/Sources/mlflow/examples/sklearn_elasticnet_wine -P alpha=0.5

これを実行すると、conda環境を構築し、その上でアプリを実行する。 もちろん、run情報が残る。

Dockerでパッケージング

Dockerでパッケージング を見ると、Dockerをビルドしてその中でアプリを動かす例も載っている。

最初にDockerイメージをビルドしておく。

1
2
$ cd ~/Sources/mlflow/examples/docker
$ docker build -t mlflow-docker-example -f Dockerfile .

つづいて、プロジェクトを実行する。

1
$ sudo mlflow run ~/Sources/mlflow/examples/docker -P alpha=0.5

これでconda環境で実行するのと同じように、Docker環境で実行される。

ちなみにMLprojectファイルは以下の通り。

1
2
3
4
5
6
7
8
9
10
11
name: docker-example

docker_env:
image: mlflow-docker-example

entry_points:
main:
parameters:
alpha: float
l1_ratio: {type: float, default: 0.1}
command: "python train.py --alpha {alpha} --l1-ratio {l1_ratio}"

モデルサーブ

チュートリアルで学習したモデルをサーブできる。 mlflow ui でモデルの情報を確認すると、アーティファクト内にモデルが格納されていることがわかるので、 それを対象としてサーブする。

1
$ mlflow models serve -m /home/dobachi/Sources/mlflow_tutorials/mlruns/0/a87ee8c6c6f04f5c822a32e3ecae830e/artifacts/model -p 1234

サーブされたモデルを使った推論は、REST APIで可能。

1
$ curl -X POST -H "Content-Type:application/json; format=pandas-split" --data '{"columns":["fixed acidity","volatile acidity","citric acid","residual sugar","chlorides","free sulfur dioxide","total sulfur dioxide","density","pH","sulphates","alcohol"],"data":[[7,0.27,0.36,20.7,0.045,45,170,1.001,3,0.45,8.8]]}' http://127.0.0.1:1234/invocations

Runファイルが保存される場所

where-runs-are-recorded によると、以下の通り。

  • Local file path (specified as file:/my/local/dir), where data is just directly stored locally.
    • →ローカルファイルシステム
  • Database encoded as +://:@:/. Mlflow supports the dialects mysql, mssql, sqlite, and postgresql. For more details, see SQLAlchemy database uri.
    • →データベース(SQLAlchemyが対応しているもの)
  • HTTP server (specified as https://my-server:5000), which is a server hosting an MLFlow tracking server.
    • →MLflowトラッキングサーバ
  • Databricks workspace (specified as databricks or as databricks://, a Databricks CLI profile.

トラッキングサーバを試す

まずはトラッキングサーバを起動しておく。

1
$ mlflow server --backend-store-uri /tmp/hoge --default-artifact-root /tmp/fuga --host 0.0.0.0

つづいて、学習を実行する。

1
$ MLFLOW_TRACKING_URI=http://localhost:5000 mlflow run ~/Sources/mlflow/examples/sklearn_elasticnet_wine -P alpha=0.5

前の例と違い、環境変数MLFLOW_TRACKING_URIが利用され、上記で起動したトラッキングサーバが指定されていることがわかる。 改めてブラウザで、http://localhost:5000にアクセスすると、先程実行した学習の履歴を確認できる。

mlflowコマンドを確認する

mlflowコマンドの実態は以下のようなPythonスクリプトである。

1
2
3
4
5
6
7
8
import re
import sys

from mlflow.cli import cli

if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
sys.exit(cli())

mlflow.cliは、clickを使って実装されている。 コマンドとしては、

  • run
  • ui
  • server

あたりに加え、以下のようなものが定義されていた。

mlflow/cli.py:260

1
2
3
4
5
6
7
cli.add_command(mlflow.models.cli.commands)
cli.add_command(mlflow.sagemaker.cli.commands)
cli.add_command(mlflow.experiments.commands)
cli.add_command(mlflow.store.cli.commands)
cli.add_command(mlflow.azureml.cli.commands)
cli.add_command(mlflow.runs.commands)
cli.add_command(mlflow.db.commands)

start_runについて

mlflowを使うときには、以下のようにstart_runメソッドを呼び出す。

1
2
3
with mlflow.start_run():
lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
lr.fit(train_x, train_y)

start_runメソッドの実態は以下のように定義されている。

mlflow/init.py:54

1
start_run = mlflow.tracking.fluent.start_run

なお、start_runの戻り値は mlflow.ActiveRun とのこと。

mlflow/tracking/fluent.py:155

1
2
_active_run_stack.append(ActiveRun(active_run_obj))
return _active_run_stack[-1]

mlflow/tracking/fluent.py:61

1
2
3
4
5
6
7
8
9
10
11
12
13
class ActiveRun(Run):  # pylint: disable=W0223
"""Wrapper around :py:class:`mlflow.entities.Run` to enable using Python ``with`` syntax."""

def __init__(self, run):
Run.__init__(self, run.info, run.data)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
status = RunStatus.FINISHED if exc_type is None else RunStatus.FAILED
end_run(RunStatus.to_string(status))
return exc_type is None

ActiveRunクラスは上記のような実装なので、withステートメントで用いる際には、終了処理をするようになっている。 ステータスを変更し、アクティブな実行(ActiveRun)を終了させる。

上記の構造から見るに、アクティブな実行(ActiveRun)はネスト構造?が可能なように見える。

uiの実装

cli.pyのuiコマンドを確認すると、どうやら手元で気軽に確認するたのコマンドのようだった。

mlflow/cli.py:158

1
2
3
4
5
6
def ui(backend_store_uri, default_artifact_root, port):
"""
Launch the MLflow tracking UI for local viewing of run results. To launch a production
server, use the "mlflow server" command instead.

The UI will be visible at http://localhost:5000 by default.

実際には簡易設定で、serverを起動しているようだ。

mlflow/cli.py:184

1
_run_server(backend_store_uri, default_artifact_root, "127.0.0.1", port, 1, None, [])

mlflow/server/init.py:51

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def _run_server(file_store_path, default_artifact_root, host, port, workers, static_prefix,
gunicorn_opts):
"""
Run the MLflow server, wrapping it in gunicorn
:param static_prefix: If set, the index.html asset will be served from the path static_prefix.
If left None, the index.html asset will be served from the root path.
:return: None
"""
env_map = {}
if file_store_path:
env_map[BACKEND_STORE_URI_ENV_VAR] = file_store_path
if default_artifact_root:
env_map[ARTIFACT_ROOT_ENV_VAR] = default_artifact_root
if static_prefix:
env_map[STATIC_PREFIX_ENV_VAR] = static_prefix
bind_address = "%s:%s" % (host, port)
opts = shlex.split(gunicorn_opts) if gunicorn_opts else []
exec_cmd(["gunicorn"] + opts + ["-b", bind_address, "-w", "%s" % workers, "mlflow.server:app"],
env=env_map, stream_output=True)

なお、serverのヘルプを見ると以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Usage: mlflow server [OPTIONS]

Run the MLflow tracking server.

The server which listen on http://localhost:5000 by default, and only
accept connections from the local machine. To let the server accept
connections from other machines, you will need to pass --host 0.0.0.0 to
listen on all network interfaces (or a specific interface address).

Options:
--backend-store-uri PATH URI to which to persist experiment and run
data. Acceptable URIs are SQLAlchemy-compatible
database connection strings (e.g.
'sqlite:///path/to/file.db') or local
filesystem URIs (e.g.
'file:///absolute/path/to/directory'). By
default, data will be logged to the ./mlruns
directory.
--default-artifact-root URI Local or S3 URI to store artifacts, for new
experiments. Note that this flag does not
impact already-created experiments. Default:
Within file store, if a file:/ URI is provided.
If a sql backend is used, then this option is
required.
-h, --host HOST The network address to listen on (default:
127.0.0.1). Use 0.0.0.0 to bind to all
addresses if you want to access the tracking
server from other machines.
-p, --port INTEGER The port to listen on (default: 5000).
-w, --workers INTEGER Number of gunicorn worker processes to handle
requests (default: 4).
--static-prefix TEXT A prefix which will be prepended to the path of
all static paths.
--gunicorn-opts TEXT Additional command line options forwarded to
gunicorn processes.
--help Show this message and exit.

なお、上記ヘルプを見ると、runデータを保存するのはデフォルトではローカルファイルシステムだが、 SQLAlchemyでアクセス可能なRDBMSでも良いようだ。

サーバとして、gunicornを使っているようだ。 多数のリクエストをさばくため、複数のワーカを使うこともできるようだ。

mlflow/server/init.py:68

1
2
exec_cmd(["gunicorn"] + opts + ["-b", bind_address, "-w", "%s" % workers, "mlflow.server:app"],
env=env_map, stream_output=True)

上記の通り、 mlflow.server:app が実態なので確認する。 このアプリケーションはFlaskが用いられている。 いったん、 / の定義から。

mlflow/server/init.py:45

1
2
3
4
# Serve the index.html for the React App for all other routes.
@app.route(_add_static_prefix('/'))
def serve():
return send_from_directory(STATIC_DIR, 'index.html')

mlflow/server/init.py:16

1
2
3
4
REL_STATIC_DIR = "js/build"

app = Flask(__name__, static_folder=REL_STATIC_DIR)
STATIC_DIR = os.path.join(app.root_path, REL_STATIC_DIR)

以上のように、 mlflow/server/js 以下にアプリが存在するようだが、 そのREADME.mdを見ると、当該アプリは https://github.com/facebook/create-react-app を 使って開発されたように見える。

mlflow-exampleプロジェクトを眺めてみる

mlflow-example には、mlflow公式のサンプルプロジェクトが存在する。 この中身を軽く眺めてみる。

MLproject

ファイルの内容は以下の通り。

1
2
3
4
5
6
7
8
9
10
name: tutorial

conda_env: conda.yaml

entry_points:
main:
parameters:
alpha: float
l1_ratio: {type: float, default: 0.1}
command: "python train.py {alpha} {l1_ratio}"

プロジェクト名は tutorial であり、condaによる環境構成情報は別途 conda.yaml に定義されていることがわかる。

エントリポイントには複数を定義可能だが、ここでは1個のみ(mainのみ)定義されている。 パラメータは2個(alphal1_ratio)与えられている。 それらのパラメータは、実行コマンド定義内でコマンドライン引数として渡されることになっている。

なお、実行されるPythonスクリプト内では以下のように、コマンドライン引数を処理している。

train.py:44

1
2
alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5

conda.yaml

本ファイルには、condaを使ってインストールするライブラリが指定されている。

1
2
3
4
5
6
7
8
9
name: tutorial
channels:
- defaults
dependencies:
- numpy=1.14.3
- pandas=0.22.0
- scikit-learn=0.19.1
- pip:
- mlflow

numpy、pandas、scikit-learnあたりの基本的なライブラリをcondaで導入し、 最後にpipでmlflowを導入していることがわかる。

またチャンネルの設定もできるようであるが、ここではデフォルトのみ使用することになっている。

train.py

ここでは、ハイライトを確認する。

最初に入力データの読み出しと分割等。

train.py:31

1
2
3
4
5
6
7
8
9
10
11
12
# Read the wine-quality csv file (make sure you're running this from the root of MLflow!)
wine_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "wine-quality.csv")
data = pd.read_csv(wine_path)

# Split the data into training and test sets. (0.75, 0.25) split.
train, test = train_test_split(data)

# The predicted column is "quality" which is a scalar from [3, 9]
train_x = train.drop(["quality"], axis=1)
test_x = test.drop(["quality"], axis=1)
train_y = train[["quality"]]
test_y = test[["quality"]]

MLflowのセッションを開始

train.py:47

1
with mlflow.start_run():

モデルを定義し、学習。その後テストデータを用いて予測値を算出し、メトリクスを計算する。

train.py:48

1
2
3
4
5
6
lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
lr.fit(train_x, train_y)

predicted_qualities = lr.predict(test_x)

(rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

上記のメトリクスは標準出力にも出されるようにもなっている。 手元で試した例では、以下のような感じ。

1
2
3
4
Elasticnet model (alpha=5.000000, l1_ratio=0.100000):
RMSE: 0.8594260117338262
MAE: 0.6480675144220314
R2: 0.046025292604596424

最後に、メトリクスを記録し、モデルを保存する。

train.py:60

1
2
3
4
5
6
7
mlflow.log_param("alpha", alpha)
mlflow.log_param("l1_ratio", l1_ratio)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("r2", r2)
mlflow.log_metric("mae", mae)

mlflow.sklearn.log_model(lr, "model")

【ボツ】Dockeでクイックスタートを実行

※以下の手順は、UIを表示させようとしたところでエラーになっている。まだ原因分析していない。 -> おおかたバインドするアドレスが127.0.0.1になっており、外部から参照できなくなっているのでは、と。-> mlflow ui の実装をぱっと見る限り、そうっぽい。

公式GitHub からCloneし、Dockerイメージをビルドする。 これを実行環境とする。

1
2
3
$ cd Sources
$ git clone https://github.com/mlflow/mlflow.git
$ cd mlflow/examples/docker

なお、実行時に便利なようにDockerfileを以下のように修正し、ipython等をインストールしておく。

1
2
3
4
diff --git a/examples/docker/Dockerfile b/examples/docker/Dockerfileindex e436f49..686e0e2 100644--- a/examples/docker/Dockerfile+++ b/examples/docker/Dockerfile@@ -1,5 +1,7 @@ FROM continuumio/miniconda:4.5.4+RUN conda install ipython jupyter
+ RUN pip install mlflow>=1.0 \
&& pip install azure-storage==0.36.0 \
&& pip install numpy==1.14.3

ではビルドする。

1
$ sudo -i docker build -t "dobachi/mlflow:latest" `pwd`

チュートリアルのサンプルアプリを作成する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ cat << EOF > tutorial.py
> import os
> from mlflow import log_metric, log_param, log_artifact
>
> if __name__ == "__main__":
> # Log a parameter (key-value pair)
> log_param("param1", 5)
>
> # Log a metric; metrics can be updated throughout the run
> log_metric("foo", 1)
> log_metric("foo", 2)
> log_metric("foo", 3)
>
> # Log an artifact (output file)
> with open("output.txt", "w") as f:
> f.write("Hello world!")
> log_artifact("output.txt")
> EOF

起動する。 このとき、サンプルアプリも /apps 以下にマウントするようにする。

1
$ sudo -i docker run -v `pwd`:/apps --rm -it dobachi/mlflow:latest  /bin/bash

先程作成したアプリを実行する。

1
2
# cd apps
# python tutorial.py

結果は以下の通り。

1
2
# cat output.txt  
Hello world!

ここで mlflow ui を起動して見ようと持ったが、ウェブフロントエンドに繋がらなかった。 デバッグを試みる。

共有

HBase on Docker for test

参考

メモ

最初は、 dajobe/hbase-docker を試そうとしたが、後から HariSekhon/Dockerfiles を見つけた。 この中の hbase ディレクトリ以下にある手順に従ったところ、無事に起動した。

HariSekhon/Dockerfiles は、ほかにもビッグデータ系のプロダクトのDockerイメージを配布しているようだ。

ただ、Dockerfileを見ると割と作りこまれているようなので、動作確認にはよいがまじめな環境としてはきちんと見直してから使う方がよさそう。

共有

Spark Summit 2019

セッションメモ

Accelerate Your Apache Spark with Intel Optane DC Persistent Memory

Accelerate Your Apache Spark with Intel Optane DC Persistent Memory

p.12あたりにアーキイメージがある。 キャッシュを中心としたイメージ。 DCPMMを用いる。

p.14 Optimized Analytics Package(OAP)はDCMPPを用いてSpark SQLを加速する。

p.16あたりからキャシュデザインに関する課題感や考え方が記載されている。 例えば…

  • 手動によるメモリ管理はフラグメンテーションを生じさせる
  • libvmemcacheのリンクドリストによりロック待ちを起因としたボトルネックが生じる
    • そこでリングバッファを利用し、できるだけロック機会を減らすことにした

p.26あたりからSpark SQLを題材とした検証結果が載っている。 DRAMに乗り切るサイズであえてやってみた例、乗り切らさないサイズでやってみた例。

p.30あたりからSpark MLlibのK-meansを題材とした検証結果が載っている。 ★ DCPMMのデータに関する階層の定義がある。 DRAM+Diskの場合と比べ、間にOptain DC Persistent Memoryが入ることで性能向上を狙う。 K-meansのワークロードでは最初にHDFSからデータをロードしたあと、 エグゼキュータのローカルディスクを含むストレージを使用してイテレーティブに計算する。 このときDCPMMを用いることで大きなサイズのデータを扱うときでも、ディスクにスピルさせることなく動作可能になり、処理時間の縮小になった。(つまり、性能向上効果はデータサイズによる) ★

Building Robust Production Data Pipelines with Databricks Delta

Building Robust Production Data Pipelines with Databricks Delta

p.5 データレイクがあっても、信頼するデータがない(データを信頼できない)ことで 多くのデータサイエンスや機械学習が失敗している。

p.7 なぜデータが信頼できなくなるのかというと、

  • 失敗したジョブが破損したデータを生成する
  • スキーマを強制するすべがなく、一貫性が崩れたり、 データの品質が悪くなる。
  • 追記・読み込み、バッチとストリーム処理の混在が許されない (補足:これだけデータの信頼性の話とは異なるような気がした)

p.10 鍵となる特徴が載っている。 ACID(トランザクション)、スキーマ強制、バッチとストリーム処理の混在、タイムトラベル(スナップショット)

Deploying Enterprise Scale Deep Learning in Actuarial Modeling at Nationwide

Deploying Enterprise Scale Deep Learning in Actuarial Modeling at Nationwide

ユースケース。

自動車保険を中心としたサービスを提供するNationwide社の事例。

p.7 ★ CDOを筆頭とし、Enterprise Data Officeを構成。 EDOは完全なデータと分析サービスをもとにソリューションを提供し、ビジネスを強化する。

p.8 Analytical Lab。 Databricksのサービスを用いて分析パイプラインを構成?

p.11 Enterprise Analytics Officeというコンセプトの下、データから叡智を取り出す方法論を提供。

p.17 Sparkを用いている理由は、並列化可能な内容がたくさんあるから。例えば以下の通り。 * データ変換がたくさん * スコアリングも並列化可能 * ハイパーパラメータチューニングなども並列化可能

Improving Apache Spark's Reliability with DataSourceV2

Improving Apache Spark's Reliability with DataSourceV2

Neflixの事例。

S3が結果整合性。 Hiveがファイルのリスティングに依存。たまに失敗する。 Netflixの規模だと、「たまに」というのが毎日になる。

p.7 2016年当時の工夫。

p.10〜 DataFrameのDataSourceに関連するつらみ。

DataFrameWriterでは振る舞いが規定されていない。

p.17 Icebergの採用(2019) Hive等で課題となっていたテーブルの物理構造を見直した。 くわしくは、 Strata NY 2018でのIcebergの説明 参照。

p.20 DataSourcedV2が登場したが、一部の課題は引き継がれた。

  • 書き込みのバリデーションがない
  • SaveModeの挙動がデータソースまかせ

p.21' DSv2での活動紹介

Lessons Learned Using Apache Spark for Self-Service Data Prep in SaaS World

Lessons Learned Using Apache Spark for Self-Service Data Prep in SaaS World

ユースケース。

workday における事例。

p.13 「Prism」分析のワークフロー。

p.14 分析前の前処理はSparkで実装される。 ウェブフロントエンドを通じて処理を定義する。

インタラクティブな処理とバッチ処理の両方に対応。

p.25 Sparkのデータ型にさらにデータ型を追加。 StructType、StructFieldを利用して実装。

p.28〜 lessons learnedをいくつか。

ネストされたSQL、self joinやself unionなどの二重オペレータ、ブロードキャストJoinの効率の悪さ、 Caseセンシブなグループ化

Using Spark Mllib Models in a Production Training and Serving Platform: Experiences and Extensions ★

Using Spark Mllib Models in a Production Training and Serving Platform: Experiences and Extensions

ユースケース。

Uberの機械学習基盤 Michelangelo についてのセッション。 UberではSpark MLlibをベースとしつつも、独自の改造を施して用いている。

p.2〜 Spark MLlibのパイプラインに関する説明。 パイプラインを用いることで、一貫性をもたせることができる。

  • データ変換
  • 特徴抽出と前処理
  • 機械学習モデルによる推論
  • 推論後の処理

p.8〜 パイプラインにより複雑さを隠蔽できる。 ★ 例:異なるワークフローを含むことによる複雑さ、異なるユーザニーズを含むことによる複雑さ。

p.10〜 達成したいこと。

まずは性能と一貫性の観点。

  • Spark SQLをベースとした高い性能
  • リアルタイムサービング(P99レイテンシ < 10ms、高いスループット)
  • バッチとリアルタイム両対応

柔軟性とVelocityの観点。

  • モデル定義:ライブラリやフレームワークの柔軟性
  • Michelangeloの構造
  • Sparkアップグレード

プロトコルバッファによるモデル表現の改善の観点。 MLeap、PMML、PFA、Spark PipelineModel。 Spark PiplelineModelを採用したかったのだが以下の観点が難点だった。

  • モデルロードが遅い
  • サービングAPIがオンラインサービング向けには遅い

p.15〜 改善活動。例えば、SparkのAPIを使ってメタデータをロードしていたところを シンプルなJava APIでロードするようにする、など細かな改善を実施。 Parquet関連では、ParquetのAPIを直接利用するように変えた。

A Journey to Building an Autonomous Streaming Data PlatformScaling to Trillion Events Monthly at Nvidia

A Journey to Building an Autonomous Streaming Data PlatformScaling to Trillion Events Monthly at Nvidia

NVIDIAの考えるアーキテクチャの変遷。 V1からV2へ、という形式で解説。

p.10〜 V1アーキテクチャ。 KafkaやS3を中心としたデータフロー。 p.12にワークフローイメージ。

p.13 V1で学んだこと。データ量が増えた、セルフサービス化が必要、など。 ★ 状況が変わったこともある。

p.14〜 V2アーキテクチャ。 セルフサービス化のために、NV Data Bots、NV Data Appsを導入。

p.23 NV Data Appsによる機械化で課題になったのは、 スキーマ管理。 Elastic Beatsではネストされたスキーマも用いれるが、そうするとスキーマの推論が難しい。

p.29 V2でのワークフロー。 ワークフローの大部分が機械化された。 ★

p.32 スケーラビリティの課題を解くため、Databricks Deltaを導入。 PrestoからSpark + Deltaに移行。

Apache Spark on K8S Best Practice and Performance in the Cloud

Apache Spark on K8S Best Practice and Performance in the Cloud

TencentにおけるSpark on k8sに関するナレッジの紹介。

p.6 「Sparkling」について。アーキテクチャなど。

p.18〜 YARNとk8sでの性能比較結果など。 TeraSortでの動作確認では、YARNおほうがスループットが高い結果となり、 データサイズを大きくしたところk8sは処理が途中で失敗した。 ディスクへの負荷により、Evictされたため、tmpfsを使用するようにして試す例も記載されている。

p.23〜 spark-sql-perfによるベンチマーク結果。 そのままでは圧倒的に性能が悪い。これはYARNではデフォルトでyarn.local.dirにしていされた 複数のディスクを使用するのに対し、on k8sでは一つのディレクトリがマウントされるから。 これを複数ディスクを使うようにした結果も記載されている。YARNには届かないが随分改善された。

Deep Learning on Apache Spark at CERN’s Large Hadron Collider with Intel Technologies ★

Deep Learning on Apache Spark at CERN’s Large Hadron Collider with Intel Technologies

p.6 CERNで扱うデータ規模。 PB/secのデータを生成することになる。

p.9〜 Sparkを実行するのにYARNとk8sの両方を使っている。 JupyterからSparkにつなぎ、その裏で様々なデータソース(データストア)にアクセスする。

p.13 物理データにアクセスする際には、Hadoop APIからxrootdプロトコルでアクセス。

p.18 Apache Spark、Analytics Zoo、BigDLを活用。 Analytics Zooを利用することでTensorFlow、BigDL、Sparkを結びつける。

また、このあとワークフロー(データフロー)に沿って個別に解説。

Kerasなどを用いてモデル開発し、その後BigDLでスケールアウトしながら分散学習。 BigDLは、よくスケールする。

p.31 推論はストリーム処理ベースで行う。 Kafka経由でデータをフィードし、サーブされたモデルをSparkで読み込んで推論。 また、FPGAも活用する。

Elastify Cloud-Native Spark Application with Persistent Memory

Elastify Cloud-Native Spark Application with Persistent Memory

Tencentにおけるアーキテクチャの説明。

p.4 データ規模など。 100PB+

p.5 TencentにおけるBig Data / AI基盤

p.6 MemVerge:メモリ・コンバージド基盤(MCI)

p.7〜 MapReduceの時代は、データを移動するのではなくプログラムを移動する。 その後ネットワークは高速化(高速なネットワークをDC内に導入する企業の増加) SSD導入の割合は増加。

p.10 ただし未だにDRAM、SSD、HDD/TAPEの階層は残る。 そこでIntel Optain DC Persistent Memoryが登場。(DCPMM)

p.14 MemVergeによるPMEMセントリックなデータプラットフォーム。 クラスタワイドにPMEMを共有。

p.17〜 Sparkのシャッフルとブロックマネージャについて。 現状の課題をうけ、シャッフルを再検討。

p.20 エグゼキュータでのシャッフル処理に関し、データの取扱を改善。 ストレージと計算を分離し、さらにプラガブルにした。 さらにストレージ層に、Persistent Memoryを置けるようにした。

Geospatial Analytics at Scale with Deep Learning and Apache Spark

Geospatial Analytics at Scale with Deep Learning and Apache Spark

p.7 新たなチャレンジ

  • 立地なデータの増加(ドローンの導入など)
  • トラディショナルなツールがスケーラビリティない
  • パイプラインの構成

p.10 Apache Spark : glue of Big Data

p.13〜 Sparkにおける画像の取扱。 Spark2.3でImageSchemaの登場。

Spark Join 画像をXMLと結合する。 イメージのチップ作成。

p.18 Deep Learningフレームワーク。 Spark Deep Learning Pipelines

p.22 Magellan

Improving Call Center Operations and Marketing ROI with Real-Time AI/ML Streams

Improving Call Center Operations and Marketing ROI with Real-Time AI/ML Streams

スライドが公開されていなかった(2019/05/18時点)

Large-Scale Malicious Domain Detection with Spark AI

Large-Scale Malicious Domain Detection with Spark AI

DDoS、暗号マイニングマルウェア。 Tencentの事例。

p.17 シーケンスを使って検知する。

p.19〜 Domain2Vec、Domainクラスタリング

p.24 Word2Vecを使って、ドメインからベクトルを生成する。 (ドメインのつながりを文字列結合してから実施?)

p.28あたり LSHをつかった例も載っている。

Migrating to Apache Spark at Netflix

Migrating to Apache Spark at Netflix

現在Netflixでは90%以上のジョブがSparkで構成されている。

p.11〜 アップストリームへの追従 複数バージョンを並行利用するようにしている

p.17〜 各バージョンで安定性の課題があり、徐々に解消されていった

p.22 メモリ管理はNetflixにおいても課題だった。 教育よって是正していた面もある。

p.23〜 Sparkを使うに当たってのベストプラクティスを列挙 設定方針、心構え、ルールなど。

Apache Arrow-Based Unified Data Sharing and Transferring Format Among CPU and Accelerators

Apache Arrow-Based Unified Data Sharing and Transferring Format Among CPU and Accelerators

p.6 FPGAでオフロード。

p.8〜 オフロードによるオーバーヘッドは無視できない。

Apache Spark and Sights at Speed: Streaming, Feature Management, and Execution

Apache Spark and Sights at Speed: Streaming, Feature Management, and Execution

スライドが公開されていなかった(2019/05/18時点)

Data-Driven Transformation: Leveraging Big Data at Showtime with Apache Spark

Data-Driven Transformation: Leveraging Big Data at Showtime with Apache Spark

ユースケース。

SHOWTIME の事例。

SHOWTIMEはスタンドアローンのストリーミングサービス。 そのため、顧客のインタラクションデータが集まる。

p.6 ビジネス上生じる疑問の例。

疑問は、答えられる量を上回って増加。

簡単な質問だったら通常のレポートで答えられるが、 複雑な質問に答えるには時間と専門的なスキルが必要。

p.10 データストラテジーチーム。 ★ データと分析を民主化する、購買者の振る舞いを理解し予測する、データ駆動のプログラミングやスケジューリングに対応する。

p.12 ★ 数千のメトリクスや振る舞いを観測する。 ユーザとシリーズの関係性をトラックする。

p.14 機械学習を用いてユーザの振る舞いをモデル化する。

p.20〜 Airflowでパイプラインを最適化。 AirflowとDatabricksを組み合わせる。 ★

p.29 Databricks Deltaも利用している。 ★

In-Memory Storage Evolution in Apache Spark

In-Memory Storage Evolution in Apache Spark

スライドが公開されていなかった(2019/05/18時点)

SparkML: Easy ML Productization for Real-Time Bidding

SparkML: Easy ML Productization for Real-Time Bidding

リアルタイムでの広告へのBid。 動機:機械学習を用いることで、マーケットをスマートにしたい。

p.6 スケールに関する数値。例:300億件/secのBid意思決定。

p.7 ゴール。

p.8〜 9年前からHadoopを使い、4年ほど前にSparkにTry。

p.12 SparkのMLlibにおけるパイプラインに対し拡張を加えた。 「RowModel」を扱う機能を追加。

p.13 StringIndexerを用いたカテゴリ値の変換が遅いので、独自に開発。

p.14 リソースボトルネックが、キャンペーンによって変わる。そしてうまくリソースを使い切れない。 そこでジョブを並列で立ち上げることにした。

p.15〜 モデルデプロイ(最新モデルへの切り替え)が難しい。 モデルを生成したあと、部分的にデプロイしたあと全体に反映させる。(A/Bテスト後の切り替え)

レイテンシの制約が厳しかった。 ★

補足:モデルを頻繁にデプロイするケースにおいてのレイテンシ保証は難しそうだ ★

p.18 「セルフチューニング」のためのパイプラン ★

Best Practices for Hyperparameter Tuning with MLflow

Best Practices for Hyperparameter Tuning with MLflow

p.4〜 ハイパーパラメータチューニングについて。

p.6 チューニングにおける挑戦★

p.9 データサイエンスにおけるチューニングのフロー ★

p.10 AutoMLは、ハイパーパラメータチューニングを含む。

p.13〜 チューニング方法の例 ★

  • マニュアルサーチ
  • グリッドサーチ
  • ランダムサーチ
  • ポピュレーションベースのアルゴリズム
  • ベイジアンアルゴリズム
    • ハイパーパラメータをlossとして最適化?

p.37 様々なツールのハイパーパラメータチューニング手段のまとめ ★

p.40〜 MLflowの機能概要

p.42 単一のモデルではなく、パイプライン全体をチューニングすること。 ★

p.49 ハイパーパラメータチューニングの並列化。 Hyperopt、Apache Spark、MLflowインテグレーション ★

Scaling Apache Spark on Kubernetes at Lyft

Scaling Apache Spark on Kubernetes at Lyft

p.6 Liftにおけるバッチ処理アーキテクチャの進化。 ★

p.7 アーキテクチャ図。 Druidも含まれている。

p.8 初期のバッチ処理アーキテクチャ図。

p.9〜 SQLでは複雑になりすぎる処理が存在する。またPythonも用いたい。 例:PythonからGeoに関するライブラリを呼び出して用いたい。

Sparkを中心としたアーキテクチャ。

p.19 残ったチャレンジ

  • Spark on k8sは若い
  • 単一クラスタのスケール限界
  • コントロールプレーン
  • ポッドChurnやIPアロケーションのスロットリング

p.22 最新アーキテクチャ★

Spark本体、ヒストリサーバ、Sparkのオペレータ、ジョブ管を含め、すべてk8s上に構成。 他にもゲートウェイ機能をk8s上に実現。

p.23〜 複数クラスタによるアーキテクチャ。 クラスタごとにラベルをつけて、ゲートウェイ経由で使い分ける。 バックエンドのストレージ(ここではS3?)、スキーマ等は共通化。

クラスタ間でローテーション。

複数のネームスペース。

ジョブ管から見たとき、ポッドを共有するアーキテクチャもありえる。

p.28 DDLをDMLから分離する。 Spark Thrift Server経由でDDLを実行。

p.29 プライオリティとプリエンプションはWIP。

p.32 Sparkジョブのコンフィグをオーバレイ構成にする。 ★

p.36 モニタリングのツールキット。★

p.38 Terraformを使った構成。Jinjaテンプレートでパラメータを扱うようだ。

p.39 今後の課題 ★ サーバレス、セルフサービス可能にする、など。 Spark3系への対応もある。

p.40

  • Sparkは様々なワークロードに適用可能
  • k8sを使うことで複数バージョンへの対応や依存関係への対応がやりやすくなる
  • マルチクラスタ構成のメッシュアーキテクチャにすることでSpark on k8sはスケールさせられる

A "Real-Time" Architecture for Machine Learning Execution with MLeap ★

A "Real-Time" Architecture for Machine Learning Execution with MLeap

p.2 機械学習におけるリアルタイムのユースケースはごく一部。

p.5 1999年から異常検知に取り組んできた企業。

p.7 データフローのイメージ図。 MLeap。

p.10 リアルタイムのアーキテクチャのオーバービュー。 ヒストリカルデータストアからHDFSにデータを取り込み、教師あり機械学習モデルを作成する。 モデルはモデル管理用のデータストアに格納され、推論のシステムに渡される。

モデルを作るところはMLeapのパイプラインで構成される。

p.12 並列処理は、メッセージバス(Kafkaなど)によって実現される。 推論結果の生データをログに書き出すこと。

p.13 モデル管理のフロー。 学習したモデルをデプロイするときには、ブルー・グリーンデプロイメントのように実施する。

p.16〜 メトリクスについて。 平均と分散。 MLeapにより、レイテンシの99パーセンタイルが改善。

Managing Apache Spark Workload and Automatic Optimizing

Managing Apache Spark Workload and Automatic Optimizing

Optimizing Delta/Parquet Data Lakes for Apache Spark

スライドが公開されていなかった。

Creating an Omni-Channel Customer Experience with ML, Apache Spark, and Azure Databricks

Creating an Omni-Channel Customer Experience with ML, Apache Spark, and Azure Databricks

Optimizing Performance and Computing Resource Efficiency of In-Memory Big Data Analytics with Disaggregated Persistent Memory

Optimizing Performance and Computing Resource Efficiency of In-Memory Big Data Analytics with Disaggregated Persistent Memory

Self-Service Apache Spark Structured Streaming Applications and Analytics

Self-Service Apache Spark Structured Streaming Applications and Analytics

Building Resilient and Scalable Data Pipelines by Decoupling Compute and Storage

Building Resilient and Scalable Data Pipelines by Decoupling Compute and Storage Building Resilient and Scalable Data Pipelines by Decoupling Compute and Storage: https://www.slideshare.net/databricks/building-resilient-and-scalable-data-pipelines-by-decoupling-compute-and-storage

Automating Real-Time Data Pipelines into Databricks Delta

Automating Real-Time Data Pipelines into Databricks Delta Automating Real-Time Data Pipelines into Databricks Delta: https://databricks.com/session/automating-real-time-data-pipelines-into-databricks-delta

資料が公開されていなかった。

Reimagining Devon Energy’s Data Estate with a Unified Approach to Integrations, Analytics, and Machine Learning

Reimagining Devon Energy’s Data Estate with a Unified Approach to Integrations, Analytics, and Machine Learning Reimagining Devon Energy’s Data Estate with a Unified Approach to Integrations, Analytics, and Machine Learning: https://www.slideshare.net/databricks/reimagining-devon-energys-data-estate-with-a-unified-approach-to-integrations-analytics-and-machine-learning

Data Prep for Data Science in MinutesA Real World Use Case Study of Telematics

Data Prep for Data Science in MinutesA Real World Use Case Study of Telematics Data Prep for Data Science in MinutesA Real World Use Case Study of Telematics: https://www.slideshare.net/databricks/data-prep-for-data-science-in-minutesa-real-world-use-case-study-of-telematics

A Deep Dive into Query Execution Engine of Spark SQL

A Deep Dive into Query Execution Engine of Spark SQL A Deep Dive into Query Execution Engine of Spark SQL: https://www.slideshare.net/databricks/a-deep-dive-into-query-execution-engine-of-spark-sql

Balancing Automation and Explanation in Machine Learning

Balancing Automation and Explanation in Machine Learning Balancing Automation and Explanation in Machine Learning: https://www.slideshare.net/databricks/balancing-automation-and-explanation-in-machine-learning

Enabling Data Scientists to bring their Models to Market

Enabling Data Scientists to bring their Models to Market Enabling Data Scientists to bring their Models to Market: https://databricks.com/session/enabling-data-scientists-to-bring-their-models-to-market

Nikeの事例? 資料が公開されていなかった。

TensorFlow Extended: An End-to-End Machine Learning Platform for TensorFlow

TensorFlow Extended: An End-to-End Machine Learning Platform for TensorFlow TensorFlow Extended: An End-to-End Machine Learning Platform for TensorFlow: https://www.slideshare.net/databricks/tensorflow-extended-an-endtoend-machine-learning-platform-for-tensorflow

Continuous Applications at Scale of 100 Teams with Databricks Delta and Structured Streaming

Continuous Applications at Scale of 100 Teams with Databricks Delta and Structured Streaming Continuous Applications at Scale of 100 Teams with Databricks Delta and Structured Streaming: https://www.slideshare.net/databricks/continuous-applications-at-scale-of-100-teams-with-databricks-delta-and-structured-streaming

How Australia’s National Health Services Directory Improved Data Quality, Reliability, and Integrity with Databricks Delta and Structured Streaming

How Australia’s National Health Services Directory Improved Data Quality, Reliability, and Integrity with Databricks Delta and Structured Streaming How Australia’s National Health Services Directory Improved Data Quality, Reliability, and Integrity with Databricks Delta and Structured Streaming: https://www.slideshare.net/databricks/how-australias-national-health-services-directory-improved-data-quality-reliability-and-integrity-with-databricks-delta-and-structured-streaming

共有

Alluxio Security

参考

メモ

Alluxio公式ドキュメントのセキュリティに関する説明 を眺めてみる。

提供機能は以下の通りか。

  • 認証
  • 認可
    • POSIXパーミッション相当の認可機能を提供する
  • Impersonation
    • システムユーザなどが複数のユーザを演じることが可能
  • 監査ログの出力

ユーザ認証

基本的には、2019/05/10現在ではSIMPLEモードが用いられるようである。 alluxio.security.login.username プロパティで指定されたユーザ名か、そうでなければOS上のユーザ名がログインユーザ名として用いられる。

公式ドキュメントのAuthenticationの説明 によると、

JAAS (Java Authentication and Authorization Service) is used to determine who is currently executing the process.

とのこと。設定をどうするのか、という点はあまり記載されていない。

Kerberos認証はサポートされていない

なお、 Kerberosがサポートされていない? を見る限り、2019/05/10現在でKerberosがサポートされていない。

SIMPLEモードの認証

また、 AlluxioのJAAS実装では、login時に必ずTrueを返す? を見ると、2019/05/10現在ではユーザ・パスワード認証がloginメソッド内で定義されていないように見える。 また、念のために commitでも特に認証をしていないように見える? を見ても、commitメソッド内でも何かしら認証しているように見えない?

ただ、 mSubject.getPrincipalsしている箇所 があるので、そこを確認したほうが良さそう。

関連する実装を確認する。

そこでまずは、 alluxio.security.LoginUser#login メソッドを確認する。 当該メソッドではSubjectインスタンスを生成し、 alluxio.security.LoginUser#createLoginContext メソッド 内部で javax.security.auth.login.LoginContext クラスのインスタンス生成に用いている。

alluxio/security/LoginUser.java:80

1
2
3
4
5
6
7
8
9
Subject subject = new Subject();

try {
// Use the class loader of User.class to construct the LoginContext. LoginContext uses this
// class loader to dynamically instantiate login modules. This enables
// Subject#getPrincipals to use reflection to search for User.class instances.
LoginContext loginContext = createLoginContext(authType, subject, User.class.getClassLoader(),
new LoginModuleConfiguration(), conf);
loginContext.login();

ちなみに、 alluxio.security.authentication.AuthType を確認している中で、 SIMPLEモードのときはクライアント側、サーバ側ともにVerify処理をしない旨のコメントを見つけた。

alluxio/security/authentication/AuthType.java:26

1
2
3
4
5
6
/**
* User is aware in Alluxio. On the client side, the login username is determined by the
* "alluxio.security.login.username" property, or the OS user upon failure.
* On the server side, the verification of client user is disabled.
*/
SIMPLE,

現時点でわかったことをまとめると、SIMPLEモード時はプロパティで渡された情報や、そうでなければ OSユーザから得られた情報を用いてユーザ名を用いるようになっている。

CUSTOMモードの認証

一方、CUSTOMモード時はサーバ側で任意のVersify処理を実行する旨のコメントが記載されていた。

1
2
3
4
5
6
7
/**
* User is aware in Alluxio. On the client side, the login username is determined by the
* "alluxio.security.login.username" property, or the OS user upon failure.
* On the server side, the user is verified by a Custom authentication provider
* (Specified by property "alluxio.security.authentication.custom.provider.class").
*/
CUSTOM,

ユーザをVerifyしたいときは、CUSTOMモードを使い、サーバ側で確認せよ、ということか。 ただ、CUSTOMモードは Alluxio公式ドキュメントのセキュリティに関する説明 において、

CUSTOM Authentication is enabled. Alluxio file system can know the user accessing it, and use customized AuthenticationProvider to verify the user is the one he/she claims.

Experimental. This mode is only used in tests currently.

と記載されており、「Experimental」であることから、積極的には使いづらい状況に見える。(2019/05/10現在)

関連事項として、 alluxio.security.authentication.AuthenticationProvider を見ると、 alluxio.security.authentication.custom.provider.class プロパティで渡された クラス名を用いて CustomAuthenticationProvider をインスタンス生成するようにしているように見える。

alluxio/security/authentication/AuthenticationProvider.java:44

1
2
3
4
5
6
7
8
9
switch (authType) {
case SIMPLE:
return new SimpleAuthenticationProvider();
case CUSTOM:
String customProviderName =
conf.get(PropertyKey.SECURITY_AUTHENTICATION_CUSTOM_PROVIDER_CLASS);
return new CustomAuthenticationProvider(customProviderName);
default:
throw new AuthenticationException("Unsupported AuthType: " + authType.getAuthName());

なお、テストには以下のような実装が見られ、CUSTOMモードの使い方を想像できる。

  • alluxio.security.authentication.PlainSaslServerCallbackHandlerTest.NameMatchAuthenticationProvider
  • alluxio.security.authentication.GrpcSecurityTest.ExactlyMatchAuthenticationProvider
  • alluxio.server.auth.MasterClientAuthenticationIntegrationTest.NameMatchAuthenticationProvider
  • alluxio.security.authentication.CustomAuthenticationProviderTest.MockAuthenticationProvider

ExactlyMatchAuthenticationProviderを用いたテストは以下の通り。

alluxio/security/authentication/GrpcSecurityTest.java:78

1
2
3
4
5
6
7
8
9
10
11
12
13
public void testCustomAuthentication() throws Exception {

mConfiguration.set(PropertyKey.SECURITY_AUTHENTICATION_TYPE, AuthType.CUSTOM.getAuthName());
mConfiguration.set(PropertyKey.SECURITY_AUTHENTICATION_CUSTOM_PROVIDER_CLASS,
ExactlyMatchAuthenticationProvider.class.getName());
GrpcServer server = createServer(AuthType.CUSTOM);
server.start();
GrpcChannelBuilder channelBuilder =
GrpcChannelBuilder.newBuilder(getServerConnectAddress(server), mConfiguration);
channelBuilder.setCredentials(ExactlyMatchAuthenticationProvider.USERNAME,
ExactlyMatchAuthenticationProvider.PASSWORD, null).build();
server.shutdown();
}

alluxio/security/authentication/GrpcSecurityTest.java:93

1
2
3
4
5
6
7
8
9
10
11
12
13
public void testCustomAuthenticationFails() throws Exception {

mConfiguration.set(PropertyKey.SECURITY_AUTHENTICATION_TYPE, AuthType.CUSTOM.getAuthName());
mConfiguration.set(PropertyKey.SECURITY_AUTHENTICATION_CUSTOM_PROVIDER_CLASS,
ExactlyMatchAuthenticationProvider.class.getName());
GrpcServer server = createServer(AuthType.CUSTOM);
server.start();
GrpcChannelBuilder channelBuilder =
GrpcChannelBuilder.newBuilder(getServerConnectAddress(server), mConfiguration);
mThrown.expect(UnauthenticatedException.class);
channelBuilder.setCredentials("fail", "fail", null).build();
server.shutdown();
}

参考までに、SIMPLEモードで用いられる alluxio.security.authentication.plain.SimpleAuthenticationProvider では、 実際に以下のように何もしないauthenticateメソッドが定義されている。

1
2
3
public void authenticate(String user, String password) throws AuthenticationException {
// no-op authentication
}

監査ログ

Alluxio公式ドキュメントのセキュリティに関する説明 の「AUDITING」にログのエントリが記載されている。 2019/05/10時点では、以下の通り。

  • succeeded:
    • True if the command has succeeded. To succeed, it must also have been allowed.
  • allowed:
    • True if the command has been allowed. Note that a command can still fail even if it has been allowed.
  • ugi:
    • User group information, including username, primary group, and authentication type.
  • ip:
    • Client IP address.
  • cmd:
    • Command issued by the user.
  • src:
    • Path of the source file or directory.
  • dst:
    • Path of the destination file or directory. If not applicable, the value is null.
  • perm:
    • User:group:mask or null if not applicable.

HDFSにおける監査ログ相当の内容が出力されるようだ。 これが、下層にあるストレージによらずに出力されるとしたら、 Alluxioによる抽象化層でAuditログを取る、という方針も悪くないか?

ただ、そのときにはどのパス(URI?)に、どのストレージをマウントしたか、という情報もセットで保存しておく必要があるだろう。

共有

Storage Layer ? Storage Engine ?

参考

データベースエンジン あるいは ストレージエンジン

Apache Parquet

Delta Lake

Apache Hudi

Alluxio

AWS Aurora

Apache HBase

Apache Kudu

メモ

動機

単にPut、Getするだけではなく、例えばデータを内部的に構造化したり、トランザクションに対応したりする機能を持つ ストレージ機能のことをなんと呼べば良いのかを考えてみる。

どういうユースケースを想定するか?

  • データがストリームとして届く
    • できる限り鮮度の高い状態で扱う
  • 主に分析および分析結果を用いたビジネス
  • 大規模なデータを取り扱う
    • 大規模なデータを一度の分析で取り扱う
    • 大規模なデータの中から、一部を一度の分析で取り扱う

どういう特徴を持つストレージを探るか?

  • 必須
    • Put、Getなど(あるいは、それ相当)の基本的なAPIを有する
      • 補足:POSIXでなくてもよい?
    • 大規模データを扱える。スケーラビリティを有する。 大規模の種類には、サイズが大きいこと、件数が多いことの両方の特徴がありえる
      • データを高効率で圧縮できることは重要そう
    • クエリエンジンと連係する
  • あると望ましい
    • トランザクションに対応
      • ストリーム処理の出力先として用いる場合、Exactly Onceセマンティクスを 達成するためには出力先ストレージ側でトランザクション対応していることで ストリーム処理アプリケーションをシンプルにできる、はず
    • ストリームデータを効率的に永続化し、オンデマンドの分析向けにバックエンドで変換。 あるいは分析向けにストア可能
      • 高頻度での書き込み + 一括での読み出し
    • サービスレス
      • 複雑なサービスを運用せずに済むなら越したことはない。
    • 読み出しについて、プッシュダウンフィルタへの対応
      • 更にデータ構造によっては、基本的な集計関数に対応していたら便利ではある
    • 更新のあるデータについて、過去のデータも取り出せる

データベースエンジン あるいは ストレージエンジン

jp.wikipediaのデータベースエンジン によると、

データベース管理システム (DBMS)がデータベースに対しデータを 挿入、抽出、更新および削除(CRUD参照)するために使用する基礎となる ソフトウェア部品

とある。

MySQLの場合には、 MySQLの代替ストレージエンジン に載っているようなものが該当する。 なお、デフォルトはInnoDB。 InnoDBイントロダクション に「表 14.1 InnoDB ストレージエンジンの機能」という項目がある。 インデックス、キャッシュ、トランザクション、バックアップ・リカバリ、レプリケーション、 圧縮、地理空間情報の取扱、暗号化対応などが大まかに挙げられる。

Apache Parquet

Parquetの公式ウェブサイト によると以下のように定義されている。

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.

ストレージフォーマット単体でも、バッチ処理向けにはある程度有用であると考えられる。

Delta Lake

Delta Lake公式ウェブサイト には、以下のような定義が記載されている。

Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.

またウェブサイトには、「Key Features」が挙げられている。2019/05/10時点では以下の通り。

  • ACID Transactions
  • Scalable Metadata Handling
  • Time Travel (data versioning)
  • Open Format
  • Unified Batch and Streaming Source and Sink
  • Schema Enforcement
  • Schema Evolution
  • 100% Compatible with Apache Spark API

データベースの「データベースエンジン」、「ストレージエンジン」とは異なる特徴を有することから、 定義上も「Storage Layer」と読んでいるのだろうか。

Apache Hudi

Apache Hudiの公式ウェブサイト によると、

Hudi (pronounced “Hoodie”) ingests & manages storage of large analytical datasets over DFS (HDFS or cloud stores) and provides three logical views for query access.

とのこと。

また特徴としては、2019/05/10現在

  • Read Optimized View
  • Incremental View
  • Near-Real time Table

が挙げられていた。 なお、機能をイメージするには、 Hudiのイメージを表す図 がちょうどよい。

Apache Hudiでは、「XXなYYである」のような定義は挙げられていない。

Alluxio

ストレージそのものというより、ストレージの抽象化や透過的アクセスの仕組みとして考えられる。

Alluxio公式ウェブサイト には以下のように定義されている。

Data orchestration for analytics and machine learning in the cloud

Alluxio公式ウェブサイト には、「KEY TECHNICAL FEATURES」というのが記載されている。 気になったものを列挙すると以下の通り。

  • Compute
    • Flexible APIs
    • Intelligent data caching and tiering
  • Storage
    • Built-in data policies
    • Plug and play under stores
    • Transparent unified namespace for file system and object stores
  • Enterprise
    • Security
    • Monitoring and management

AWS Auroraのバックエンドストレージ(もしくはストレージエンジン)

kumagiさんの解説記事 あたりが入り口としてとてもわかり易い。 また、元ネタは、 ACMライブラリの論文 あたりか。

語弊を恐れずにいえば、上記でも触れられていたとおり、「Redo-logリプレイ機能付き分散ストレージ」という 名前が妥当だろうか。

Apache HBase

HBaseの公式ウェブサイト によると、

Apache HBase is the Hadoop database, a distributed, scalable, big data store.

のように定義されている。 また、挙げられていた特徴は以下のとおり。(2019/05/12現在)

  • Linear and modular scalability.
  • Strictly consistent reads and writes.
  • Automatic and configurable sharding of tables
  • Automatic failover support between RegionServers.
  • Convenient base classes for backing Hadoop MapReduce jobs with Apache HBase tables.
  • Easy to use Java API for client access.
  • Block cache and Bloom Filters for real-time queries.
  • Query predicate push down via server side Filters
  • Thrift gateway and a REST-ful Web service that supports XML, Protobuf, and binary data encoding options
  • Extensible jruby-based (JIRB) shell
  • Support for exporting metrics via the Hadoop metrics subsystem to files or Ganglia; or via JMX

いわゆるスケールアウト可能なKVSだが、これを分析向けのストレージとして用いる選択肢もありえる。 ただし、いわゆるカラムナフォーマットでデータを扱うわけではない点と、 サービスを運用する必要がある点は懸念すべき点である。

Apache Kudu

Apache Kuduの公式ウェブサイト を見ると、以下のように定義されている。

A new addition to the open source Apache Hadoop ecosystem, Apache Kudu completes Hadoop's storage layer to enable fast analytics on fast data.

またKuduの利点は以下のように記載されている。

  • Fast processing of OLAP workloads.
  • Integration with MapReduce, Spark and other Hadoop ecosystem components.
  • Tight integration with Apache Impala, making it a good, mutable alternative to using HDFS with Apache Parquet.
  • Strong but flexible consistency model, allowing you to choose consistency requirements on a per-request basis, including the option for strict-serializable consistency.
  • Strong performance for running sequential and random workloads simultaneously.
  • Easy to administer and manage.
  • High availability. Tablet Servers and Masters use the Raft Consensus Algorithm, which ensures that as long as more than half the total number of replicas is available, the tablet is available for reads and writes. For instance, if 2 out of 3 replicas or 3 out of 5 replicas are available, the tablet is available.
  • Reads can be serviced by read-only follower tablets, even in the event of a leader tablet failure.
  • Structured data model.

クエリエンジンImpalaと連係することを想定されている。 また、「ストレージレイヤ」という呼び方が、Delta Lake同様に用いられている。

共有