Init git log of an existing repository

参考

メモ

ある文書群をクラスタリングする際、TF-IDFの代わりにWord2Vecを使っていることにした。

共有

Singularization with Python

参考

メモ

condaコマンドではインストールできなかったので、pipコマンドでインストールした。 使い方は、公式ウェブサイトの通り。

なお、関係ない単語にもそのまま適用してみたが動くは動くようだ。

共有

Stopword with Python

参考

メモ

今回はConda環境だったので、 conda コマンドでインストールした。

1
$ conda install nltk

ストップワードをダウンロードする。

1
2
import nltk
nltk.download("stopwords")

ここでは、仮に対象となる単語のリストall_wordsがあったとする。 そのとき、以下のようにリストからストップワードを取り除くと良い。

1
2
symbol = ["'", '"', ':', ';', '.', ',', '-', '!', '?', "'s"]
words_wo_stopwords = [w.lower() for w in all_words if not w in stop_words + symbol]

ストップワードの中には記号が含まれていないので、ここでは、symbolを定義して一緒に取り除いた。 次に頻度の高い単語を30件抽出してみる。

1
clean_frequency = nltk.FreqDist(words_wo_stopwords)

これを可視化する。

1
2
plt.figure(figsize=(10, 7))
clean_frequency.plot(30,cumulative=True)
共有

Hydrosphere.io

Hydrosphere.ioについて

参考

メモ

Hydroshpereブログの最も古い記事 が投稿されたのは、2016/6/14である。 上記ブログで強調されていたのは、Big DataプロジェクトのDevOps対応。 DockerやAnsibleといった道具を使いながら。

Hydrosphere Serving

参考情報

開発状況

2019/03/21現在でも、わりと活発に開発されているようだ。 https://github.com/Hydrospheredata/hydro-serving/graphs/commit-activity

開発元は、hydrosphere.io。パロアルトにある企業らしい。

hydrosphere.ioのプロダクト

ここで取り上げているServingを含む、以下のプロダクトがある。

  • Serving: モデルのサーブ、アドホック分析への対応
  • Sonar: モデルやパイプラインの品質管理
  • Mist: Sparkの計算リソースをREST API経由で提供する仕組み(マルチテナンシーの実現)

概要

GitHubのREADMEに特徴が書かれていが、その中でも個人的にポイントと思ったのは以下の通り。

  • Envoyプロキシを用いてサービスメッシュ型のサービングを実現
  • 複数の機械学習モデルに対応し、パイプライン化も可能
  • UIがある

Hydro Servingの公式ウェブサイト に掲載されていた動画を見る限り、 コマンドライン経由でモデルを登録することもでき、モデルを登録したあとは、 処理フロー(ただし、シリアルなフローに見える)を定義可能。 例えば、機械学習モデルの推論器に渡す前処理・後処理も組み込めるようだ。

セットアップ方法

Docker Composeを使う方法とk8sを使う方法があるようだ。

利用方法

モデルを学習させ、出力する。(例では、h5形式で出力していた) モデルや必要なライブラリを示したテキストなどを、ペイロードとして登楼する。

必要なライブラリの指定は以下のようにする。

requirements.txt

1
2
3
Keras==2.2.0
tensorflow==1.8.0
numpy==1.13.3

また、それらの規約事項は、contract(定義ファイル)として保存する。 フォルダ内の構成は以下のようになる。

公式ドキュメントから引用

1
2
3
4
5
6
7
linear_regression
├── model.h5
├── model.py
├── requirements.txt
├── serving.yaml
└── src
└── func_main.py

上記のようなファイル群を作り、 hs uploadコマンドを使ってアップロードする。 アップロードしたあとは、ウェブフロントエンドから処理フローを定義する。

クエリはREST APIで以下のように投げる。

公式ウェブサイトから引用

1
2
$ curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
"x": [[1, 1],[1, 1]]}' 'http://localhost/gateway/applications/linear_regression/infer'

また上記のREST APIの他にも、gRPCを用いて推論結果を取得することも可能。 また、データ構造としてはTensorProtoを使える。

なお、TensorFlowモデルのサーブについては、 公式ウェブサイトの Hydro ServingでTensorFlowモデルをサーブ がわかりやすい。 また、単純にPython関数を渡すこともできる。(必ずしもTensorFlowにロックインではない)

コンセプト

  • モデル
    • Hydro Servingに渡されたモデルはバージョン管理される。
    • フレーワークは多数対応
      • ただしフレームワークによって、出力されるメタデータの情報量に差がある。
  • アプリケーション
    • 単一で動かす方法とパイプラインを構成する方法がある
  • ランタイム
    • 予め実行環境を整えたDockerイメージが提供されている
    • Python、TensorFlow、Spark

動作確認

Hydro Servingの公式ドキュメント に従って動作確認する。

1
2
3
4
5
$ mkdir HydroServing
$ cd HydroServing/
$ git clone https://github.com/Hydrospheredata/hydro-serving
$ hydro-serving
$ sudo docker-compose up -d

起動したコンテナを確認する。

1
2
3
4
5
6
7
8
$ sudo docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------
gateway /hydro-serving/app/start.sh Up 0.0.0.0:29090->9090/tcp, 0.0.0.0:29091->9091/tcp
manager /hydro-serving/app/start.sh Up 0.0.0.0:19091->9091/tcp
managerui /bin/sh -c envsubst '${MAN ... Up 80/tcp, 9091/tcp
postgres docker-entrypoint.sh postgres Up
sidecar /hydro-serving/start.sh Up 0.0.0.0:80->8080/tcp, 0.0.0.0:8082->8082/tcp

その他CLIを導入する。

1
2
3
4
$ conda create -n hydro-serving python=3.6 python
$ source activate hydro-serving
$ conda install keras scikit-learn
$ pip install hs

エラーが生じた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
$ hs cluster add --name local --server http://localhost
--- Logging error ---
Traceback (most recent call last):
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/logging/__init__.py", line 992, in emit
msg = self.format(record)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/logging/__init__.py", line 838, in format
return fmt.format(record)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/logging/__init__.py", line 575, in format
record.message = record.getMessage()
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/logging/__init__.py", line 338, in getMessage
msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
File "/home/centos/.conda/envs/hydro-serving/bin/hs", line 11, in <module>
sys.exit(hs_cli())
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 722, in __call__
return self.main(*args, **kwargs)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 697, in main
rv = self.invoke(ctx)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 1063, in invoke
Command.invoke(self, ctx)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 895, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 535, in invoke
return callback(*args, **kwargs)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/decorators.py", line 17, in new_func
return f(get_current_context(), *args, **kwargs)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/hydroserving/cli/hs.py", line 18, in hs_cli
ctx.obj.services = ContextServices.with_config_path(HOME_PATH_EXPANDED)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/hydroserving/models/context_object.py", line 44, in with_config_path
config_service = ConfigService(path)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/hydroserving/services/config.py", line 17, in __init__
logging.error("{} is not an existing directory", home_path)
Message: '{} is not an existing directory'
Arguments: ('/home/centos/.hs-home',)
WARNING:root:Using local as current cluster
Cluster 'local' @ http://localhost added successfully

エラーが出ているのに登録されたように見える。 念の為、クラスタ情報を確認する。

1
2
$ hs cluster
Current cluster: {'cluster': {'server': 'http://localhost'}, 'name': 'local'}

いったんこのまま進める。 まずはアプリを作成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ mkdir -p ~/Sources/linear_regression
$ cd ~/Sources/linear_regression
$ cat << EOF > model.py
from keras.models import Sequential
from keras.layers import Dense
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# initialize data
n_samples = 1000
X, y = make_regression(n_samples=n_samples, n_features=2, noise=0.5, random_state=112)

scallar_x, scallar_y = MinMaxScaler(), MinMaxScaler()
scallar_x.fit(X)
scallar_y.fit(y.reshape(n_samples, 1))
X = scallar_x.transform(X)
y = scallar_y.transform(y.reshape(n_samples, 1))

# create a model
model = Sequential()
model.add(Dense(4, input_dim=2, activation='relu'))
model.add(Dense(4, activation='relu'))
model.add(Dense(1, activation='linear'))

model.compile(loss='mse', optimizer='adam')
model.fit(X, y, epochs=100)

# save model
model.save('model.h5')
EOF

学習し、モデルを出力。

1
$ python model.py

サーブ対象となる関数のアプリを作成する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
$ mkdir src
$ cd src
$ cat << EOF > func_main.py
import numpy as np
import hydro_serving_grpc as hs
from keras.models import load_model

# 0. Load model once
model = load_model('/model/files/model.h5')

def infer(x):
# 1. Retrieve tensor's content and put it to numpy array
data = np.array(x.double_val)
data = data.reshape([dim.size for dim in x.tensor_shape.dim])

# 2. Make a prediction
result = model.predict(data)

# 3. Pack the answer
y_shape = hs.TensorShapeProto(dim=[hs.TensorShapeProto.Dim(size=-1)])
y_tensor = hs.TensorProto(
dtype=hs.DT_DOUBLE,
double_val=result.flatten(),
tensor_shape=y_shape)

# 4. Return the result
return hs.PredictResponse(outputs={"y": y_tensor})
EOF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ cd ..
$ cat << EOF > serving.yaml
kind: Model
name: linear_regression
model-type: python:3.6
payload:
- "src/"
- "requirements.txt"
- "model.h5"

contract:
infer: # Signature function
inputs:
x: # Input field
shape: [-1, 2]
type: double
profile: numerical
outputs:
y: # Output field
shape: [-1]
type: double
profile: numerical
EOF

つづいて必要ライブラリを定義する。

1
2
3
4
5
$ cat << EOF > requirements.txt
Keras==2.2.0
tensorflow==1.8.0
numpy==1.13.3
EOF

最終的に以下のような構成になった。

1
2
3
4
5
6
7
8
9
10
$ tree
.
├── model.h5
├── model.py
├── requirements.txt
├── serving.yaml
└── src
└── func_main.py

1 directory, 5 files

つづいてモデルなどをアップロード。

1
$ hs upload

ここで以下のようなエラーが生じた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Using 'local' cluster
['/home/centos/Sources/linear_regression/src', '/home/centos/Sources/linear_regression/requirements.txt', '/home/centos/Sources/linear_regression/model.h5']
Packing the model [####################################] 100%
Assembling the model [####################################] 100%
Uploading to http://localhost
Uploading model assembly [####################################] 100%
Traceback (most recent call last):
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/hydroserving/httpclient/remote_connection.py", line 82, in postprocess_response
response.raise_for_status()
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/requests/models.py", line 940, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http://localhost/api/v1/model/upload

During handling of the above exception, another exception occurred:

(snip)

軽く調べると ISSUE255 が 関係していそうである。

ここでは動作確認のため、pip install hs==2.0.0rc2としてインストールして試した。

1
2
$ pip uninstall hs
$ pip install hs==2.0.0rc2

また、 serving.yaml に以下のエントリを追加した。

1
runtime: "hydrosphere/serving-runtime-python:3.6-latest"

再び hs upload したところ完了のように見えた。

ウェブUIを確認したところ以下の通り。

ウェブUIに登録されたモデルが表示される

それではテストを実行してみる。

1
$ curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{   "x": [     [       1,       1     ]   ] }' 'http://10.0.0.209/gateway/application/linear_regression'

以下のようなエラーが生じた。

1
{"error":"InternalUncaught","information":"UNKNOWN: Exception calling application: No module named 'numpy'"}

ランタイム上にnumpyがインストールされなかったのだろうか・・・

WIP

Hydro sonar

参考

メモ

公式のSonarのGUI画面 を見ると、モデルの性能を観測するための仕組みに見える。 モデル選択、メンテナンス、リアイアメントのために用いられる。 例えば、入力データ変化を起因したモデル精度の劣化など。

Hydro Mist

参考

メモ

公式のMistのアーキテクチャイメージ を見ると、 Sparkのマルチテナンシーを実現するものに見える。

特徴として挙げられていたものの中で興味深いのは以下。

  • Spark Function as a Service
  • ユーザのAPIをSparkのコンフィグレーションから分離する
  • HTTP、Kafka、MQTTによるやりとり

Hydro Mistの公式クイックスタートを見ると、Scala、Java、Pythonのサンプルアプリが掲載されている。 Mistはライブラリとしてインポートし、フレームワークに則ってアプリを作成すると、 アプリ内で定義された関数を実行するジョブをローンチできるようになる。

また実際にローンチするときには、REST API等で起動することになるが、 そのときに引数を渡すこともできる。

動作確認

Hydro Mistの公式クイックスタート に従い動作を確認する。 Dockerで実行するか、バイナリで実行するかすれば良い。

Dockerでの起動

Dockerの場合は以下の通り。

1
2
3
$ sudo docker run -p 2004:2004 \
-v /var/run/docker.sock:/var/run/docker.sock \
hydrosphere/mist:1.1.1-2.3.0

バイナリを配備しての起動

バイナリをダウンロードして実行する場合は以下の通り。 まずSparkをダウンロードする。(なお、JDKは予めインストールされていることを前提とする)

1
2
3
4
5
$ mkdir ~/Spark
$ cd ~/Spark
$ wget http://ftp.riken.jp/net/apache/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
$ tar xvzf spark-2.3.3-bin-hadoop2.7.tgz
$ ln -s spark-2.3.3-bin-hadoop2.7 default

これで、~/Spark/default以下にSparkが配備された。 つづいて、Hydro Mistのバイナリをダウンロードし、配備する。

1
2
3
4
5
6
$ mkdir ~/HydroMist
$ cd ~/HydroMist
$ wget http://repo.hydrosphere.io/hydrosphere/static/mist-1.1.1.tar.gz
$ tar xvfz mist-1.1.1.tar.gz
$ ln -s mist-1.1.1 default
$ cd default

以上でHydro Mistが配備された。 それではHydro Mistのマスタを起動する。

1
$ SPARK_HOME=${HOME}/Spark/default ./bin/mist-master start --debug true

以上で、バイナリを配備したHydro Mistが起動する。

Mistの動作確認

以降、サンプルアプリを実行している。

まずmistのCLIを導入する。

1
2
3
$ conda create -n mist python=3.6 python
$ source activate mist
$ pip install mist-cli

続いて、サンプルプロジェクトをcloneする。

1
2
3
$ mkdir -p ~/Sources
$ cd ~/Sources
$ git clone https://github.com/Hydrospheredata/hello_mist.git

試しにScala版を動かす。 (なお、SBTがインストールされていることを前提とする)

1
2
3
$ cd hello_mist/scala
$ sbt package
$ mist-cli apply -f conf -u ''

上記コマンドの結果、以下のような出力が得られる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Process 5 file entries
updating Artifact hello-mist-scala_0.0.1.jar
Success: Artifact hello-mist-scala_0.0.1.jar
updating Context emr_ctx
Success: Context emr_ctx
updating Context emr_autoscale_ctx
Success: Context emr_autoscale_ctx
updating Context standalone
Success: Context standalone
updating Function hello-mist-scala
Success: Function hello-mist-scala


Get context info
--------------------------------------------------------------------------------
curl -H 'Content-Type: application/json' -X POST http://localhost:2004/v2/api/contexts/emr_ctx


Get context info
--------------------------------------------------------------------------------
curl -H 'Content-Type: application/json' -X POST http://localhost:2004/v2/api/contexts/emr_autoscale_ctx


Get context info
--------------------------------------------------------------------------------
curl -H 'Content-Type: application/json' -X POST http://localhost:2004/v2/api/contexts/standalone


Get info of function resource
--------------------------------------------------------------------------------
curl -H 'Content-Type: application/json' -X GET http://localhost:2004/v2/api/functions/hello-mist-scala

Start job via mist-cli
--------------------------------------------------------------------------------
mist-cli --host localhost --port 2004 start job hello-mist-scala '{"samples": 7}'

Start job via curl
--------------------------------------------------------------------------------
curl --data '{"samples": 7}' -H 'Content-Type: application/json' -X POST http://localhost:2004/v2/api/functions/hello-mist-scala/jobs?force=true

試しにstandaloneのコンテキストを確認してみる。 (なお、上記メッセージではPOSTを使うよう書かれているが、GETでないとエラーになった)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ curl -H 'Content-Type: application/json' -X GET http://localhost:2004/v2/api/contexts/standalone | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 252 100 252 0 0 26537 0 --:--:-- --:--:-- --:--:-- 28000
{
"name": "standalone",
"maxJobs": 1,
"maxConnFailures": 1,
"workerMode": "exclusive",
"precreated": false,
"sparkConf": {
"spark.submit.deployMode": "cluster",
"spark.master": "spark://holy-excalibur:7077"
},
"runOptions": "",
"downtime": "1200s",
"streamingDuration": "1s"
}

なお、サンプルアプリは以下の通り。 Piの値を簡易的に計算するものである。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import mist.api._
import mist.api.dsl._
import mist.api.encoding.defaults._
import org.apache.spark.SparkContext

object HelloMist extends MistFn with Logging {

override def handle = {
withArgs(
arg[Int]("samples", 10000)
)
.withMistExtras
.onSparkContext((n: Int, extras: MistExtras, sc: SparkContext) => {
import extras._

logger.info(s"Hello Mist started with samples: $n")

val count = sc.parallelize(1 to n).filter(_ => {
val x = math.random
val y = math.random
x * x + y * y < 1
}).count()

val pi = (4.0 * count) / n
pi
}).asHandle
}
}

結果として、以下のようにジョブが登録される。

ウェブUIに登録されたアプリ

登録されたジョブを実行する。

ジョブを走らせる

また上記で表示されていたcurl ...をコマンドで実行することでも、ジョブを走らせることができる。 ジョブの一覧は、ウェブUIから以下のように確認できる。

走らせたジョブ一覧

ジョブ一覧からジョブを選ぶと、そのメタデータ、渡されたパラメータ、結果などを確認できる。 さらにジョブ実行時のログもウェブUIから確認できる。

ジョブ実行時のログ

Sparkのドライバログと思われるものも含まれている。

アーキテクチャと動作

Hydro Mistがどうやってジョブを起動するか によると以下の通り。

Mistで実行する関数は、mist-workerにラップされており、当該ワーカがSparkContextを保持しているようだ。 またワーカを通じてジョブを実行する。 (参考:Hydro Mistのジョブのステート

ユーザがジョブを実行させようとしたとき、Mistのマスタはそのリクエストをキューに入れ、ワーカが空くのを待つ。 ワーカのモードには2種類がある。

  • exclusive
    • ジョブごとにワーカを立ち上げる
  • shared
    • ひとつのジョブが完了してもワーカを立ち上げたままにし再利用する。

これにより、ジョブの並列度、ジョブの耐障害性の面で有利になる。

また、 Hydro Mistのジョブローンチの流れ からジョブ登録から実行までの大まかな流れがわかる。

コンテキストの管理

Sparkのコンテキスト管理やワーカのモード設定は、 mist-cliを通じてできるようだ。 Hydro Mistのコンテキスト管理 参照。

mist-cli

Hydro MIstのmist-cliのGitHub 参照。 mist-cli は、コンフィグファイル(やコンフィグファイルが配備されたディレクトリ)を指定しながら実行する。 ディレクトリにコンフィグを置く場合は、ファイル名の先頭に2桁の数字を入れることで、 プライオリティを指定することができる。

コンフィグには、Artifact、Context、Functionの設定を記載する。 参考:Hydro Mistのコンフィグの例

Hydro Mistの公式クイックスタート を実行したあとの状態で確認してみる。

1
2
3
4
5
6
7
8
9
10
11
12
$ mist-cli list contexts
ID WORKER MODE
default exclusiveemr_ctx exclusiveemr_autoscale_ctx exclusivestandalone exclusive

$ mist-cli list functions
FUNCTION DEFAULT CONTEXT PATH CLASS NAME
hello-mist-scala default hello-mist-scala_0.0.1.jar HelloMist$

$ mist-cli status
Mist version: 1.1.1
Spark version: 2.4.0
Java version: 1.8.0_201-b09

いくつかのコンテキストと、関数が登録されていることがわかる。 なお、GitHub上のバイナリを用いたので使用するSparkのバージョンが2.4.0になっていた。

Scala API

Hydro MistのScala API とサンプルアプリ(mist\examples\examples\src\main\scala\PiExample.scala)を確認してみる。

エントリポイント

MistFnがエントリポイントのようだ。

PiExample.scala:6

1
2
3
4
5
6
object PiExample extends MistFn {

override def handle: Handle = {
val samples = arg[Int]("samples").validated(_ > 0, "Samples should be positive")

(snip)

引数定義

また関数の引数設定はmist.api.ArgsInstances#arg[A]などで行う。 下記にサンプルに記載の例を示す。

PiExample.scala:9

1
val samples = arg[Int]("samples").validated(_ > 0, "Samples should be positive")

[Int]により引数として渡される値の型を指定する。argメソッドの戻り値はNamedUserArgクラスだが、 当該クラスはUserArg[A]トレートを拡張している。 UserArg#validatedメソッドを用いることで、値の検証を行う。

複数の引数を渡すには、withArgsを使うか、combine&を使う。

例:

1
2
3
val three = withArgs(arg[Int]("n"), arg[String]("str"), arg[Boolean]("flag"))

val three = arg[Int]("n") & arg[String]("str") & arg[Boolean]("flag")

またドキュメントには、case classと各種Extractorを用いることで、JSON形式の入力データを 扱う方法が説明されていた。

Sparkのコンテキスト

引数を定義したのちは、Mistのコンテキスト管理のAPIを利用し、 SparkSessionなどを取得する。 onSparkContextonSparkSessionなどを利用可能。

mist/api/MistFnSyntax.scala:91

1
2
3
4
5
def onSparkSession[F, Cmb, Out](f: F)(
implicit
cmb: ArgCombiner.Aux[A, SparkSession, Cmb],
fnT: FnForTuple.Aux[Cmb, F, Out]
): RawHandle[Out] = args.combine(sparkSessionArg).apply(f)

サンプルでは以下のような使い方を示している。

PiExample.scala:10

1
2
3
4
    withArgs(samples).onSparkContext((n: Int, sc: SparkContext) => {
val count = sc.parallelize(1 to n).filter(_ => {

(snip)

もしSparkSessionを使うならば以下の通りか。

1
2
3
4
5
6
7
8
import org.apache.spark.sql.SparkSession

(snip)

withArgs(samples).onSparkSession((n: Int, spark: SparkSession) => {
val count = spark.sparkContext.parallelize(1 to n).filter(_ => {

(snip)

なお、onStreamingContextというのもあり、Spark Streamingも実行可能のようだ。

結果のハンドリング

上記onSparkContextメソッドなどの戻り値の型はRawHandle[Out]である。

mist/api/MistFnSyntax.scala:58

1
2
3
4
5
def onSparkContext[F, Cmb, Out](f: F)(
implicit
cmb: ArgCombiner.Aux[A, SparkContext, Cmb],
fnT: FnForTuple.Aux[Cmb, F, Out]
): RawHandle[Out] = args.combine(sparkContextArg).apply(f)

最終的に結果をJSON形式で返すために、asHandleメソッドを利用する。

PiExample.scala:10

1
2
3
4
5
withArgs(samples).onSparkContext((n: Int, sc: SparkContext) => {

(snip)

}).asHandle

asHandleメソッドは以下の通り。

mist/api/MistFnSyntax.scala:48

1
2
3
implicit class AsHandleOps[A](raw: RawHandle[A]) {
def asHandle(implicit enc: JsEncoder[A]): Handle = raw.toHandle(enc)
}

余談:implicitクラスの使用

mist.api以下で複数のクラスがimplicit定義されて利用されており、一見して解析しづらい印象を覚えた。 例えばonSparkContextonSparkSessionがimplicitクラスContextsOpsクラスに定義されている。

考察:フレームワークの良し悪し

Sparkの単純なプロキシではなく、フレームワーク(ライブラリ)化されていることで、 ユーザはSparkのコンテキストの管理から開放される、という利点がある。

一方で、Hydro Mistのフレームワークに従って実装する必要があり、多少なり

  • Sparkに加えて、Hydro Mistの学習コストがある
  • Mistのフレームワークでは実装しづらいケースが存在するかもしれない
  • トラブルシュートの際に、Mistの実装まで含めて確認が必要になるかもしれない

という懸念点・欠点が挙げられる。

HTTP API

Hydro MistのHTTP API に一覧が載っている。

例えば関数一覧を取得する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ curl -X GET 'http://10.0.0.209:2004/v2/api/functions' | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 208 100 208 0 0 13256 0 --:--:-- --:--:-- --:--:-- 13866
[
{
"name": "hello-mist-scala",
"execute": {
"samples": {
"type": "MOption",
"args": [
{
"type": "MInt"
}
]
}
},
"path": "hello-mist-scala_0.0.1.jar",
"tags": [],
"className": "HelloMist$",
"defaultContext": "default",
"lang": "scala"
}
]

続いて、当該関数についてジョブ一覧を取得する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ curl -X GET 'http://10.0.0.209:2004/v2/api/functions/hello-mist-scala/jobs' | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1682 100 1682 0 0 29484 0 --:--:-- --:--:-- --:--:-- 30035
[
{
"source": "Http",
"startTime": 1553358512167,
"createTime": 1553358507406,
"context": "default",
"params": {
"filePath": "hello-mist-scala_0.0.1.jar",
"className": "HelloMist$",
"arguments": {
"samples": 9
},
"action": "execute"
},
"endTime": 1553358513162,
"jobResult": 2.2222222222222223,
"status": "finished",
"function": "hello-mist-scala",
"jobId": "b044d08f-9554-4be9-8e22-1d687e58c52e",
"workerId": "default_1cb3b66d-99c3-400b-ac3a-f11d72ab8124_4"
},

(snip)

上記のように、ジョブのメタデータと結果が取得される。 ジョブの情報であれば、直接jobs APIを用いても取得可能。

1
$ curl -X GET 'http://10.0.0.209:2004/v2/api/jobs' | jq

またジョブのログを出力できる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ curl -X GET 'http://10.0.0.209:2004/v2/api/jobs/a87197c8-1692-48bc-b151-978ea89b058a/logs' | head -n 20
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0INFO 2019-03-23T16:22:44.178 [a87197c8-1692-48bc-b151-978ea89b058a] Waiting worker connectionINFO 2019-03-23T16:22:44.183 [a87197c8-1692-48bc-b151-978ea89b058a] InitializedEvent(externalId=None)
INFO 2019-03-23T16:22:44.183 [a87197c8-1692-48bc-b151-978ea89b058a] QueuedEventINFO 2019-03-23T16:22:47.991 [a87197c8-1692-48bc-b151-978ea89b058a] WorkerAssigned(workerId=default_1cb3b66d-99c3-400b-ac3a-f11d72ab8124_2)
INFO 2019-03-23T16:22:48.027 [a87197c8-1692-48bc-b151-978ea89b058a] JobFileDownloadingEvent
INFO 2019-03-23T16:22:48.885 [a87197c8-1692-48bc-b151-978ea89b058a] StartedEvent
INFO 2019-03-23T16:22:48.882 [a87197c8-1692-48bc-b151-978ea89b058a] Added JAR /home/centos/HydroMist/default/worker-default_1cb3b66d-99c3-400b-ac3a-f11d72ab8124_2/hello-mist-scala_0.0.1.jar at spark://dev:46260/jars/hello-mist-scala_0.0.1.jar with timestamp 1553358168882
INFO 2019-03-23T16:22:48.965 [a87197c8-1692-48bc-b151-978ea89b058a] Hello Mist started with samples: 8
INFO 2019-03-23T16:22:49.204 [a87197c8-1692-48bc-b151-978ea89b058a] Starting job: count at HelloMist.scala:22
INFO 2019-03-23T16:22:49.218 [a87197c8-1692-48bc-b151-978ea89b058a] Got job 0 (count at HelloMist.scala:22) with 16 output partitions
INFO 2019-03-23T16:22:49.219 [a87197c8-1692-48bc-b151-978ea89b058a] Final stage: ResultStage 0 (count at HelloMist.scala:22)
INFO 2019-03-23T16:22:49.22 [a87197c8-1692-48bc-b151-978ea89b058a] Parents of final stage: List()
INFO 2019-03-23T16:22:49.221 [a87197c8-1692-48bc-b151-978ea89b058a] Missing parents: List()
INFO 2019-03-23T16:22:49.229 [a87197c8-1692-48bc-b151-978ea89b058a] Submitting ResultStage 0 (MapPartitionsRDD[1] at filter at HelloMist.scala:18), which has
no missing parents
INFO 2019-03-23T16:22:49.438 [a87197c8-1692-48bc-b151-978ea89b058a] Block broadcast_0 stored as values in memory (estimated size 1808.0 B, free 366.3 MB)
INFO 2019-03-23T16:22:49.471 [a87197c8-1692-48bc-b151-978ea89b058a] Block broadcast_0_piece0 stored as bytes in memory (estimated size 1232.0 B, free 366.3 MB)
INFO 2019-03-23T16:22:49.473 [a87197c8-1692-48bc-b151-978ea89b058a] Added broadcast_0_piece0 in memory on dev:39976 (size: 1232.0 B, free: 366.3 MB)
INFO 2019-03-23T16:22:49.476 [a87197c8-1692-48bc-b151-978ea89b058a] Created broadcast 0 from broadcast at DAGScheduler.scala:1039
10INFO 2019-03-23T16:22:49.494 [a87197c8-1692-48bc-b151-978ea89b058a] Submitting 16 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at filter at HelloMist.scala:18) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
0INFO 2019-03-23T16:22:49.495 [a87197c8-1692-48bc-b151-978ea89b058a] Adding task set 0.0 with 16 tasks

(snip)

Reactie API

Hydro MistのReactive API を見ると、MQTTやKafkaと連携して動くAPIがあるようだが、 まだドキュメントが成熟していない。 デフォルトでは無効になっている。

EMRとの連係

Hydro MistのEMR連係 を眺めるとhello_mistプロジェクト等でEMRとの連係の仕方を示してくれているようだが、まだ情報が足りない。

共有

Corne Chocolate

参考

メモ

ビルドについて

基本的には、 Corne Chocolateのビルドガイド の通りで問題なかった。

キーマップ

もともと使用していたLet's SplitやHelixとは、デフォルトのキー配置が異なる。 イメージとしては、

  • Raiseレイヤに記号系が集まっている
  • Lowerレイヤに数字やファンクションキーが集まっている
  • 矢印キーがない?
  • Ctrl、ESC、Tabあたりは好みでカスタマイズ必要そう。
  • 親指エンターも好みが分かれそう

という感じ。 個人的な感想として、記号をRaiseレイヤに集めるのは良いと思った。 一方、矢印キー、Ctrlなど、エンターあたりを中心にいじった。

dobachiのキーマップ を参照されたし。

OLED表示

押されたキー1個までならまだしも、入力のログがしばらく表示されるのは気になったので、 keymap.cを以下のように修正して表示しないようにした。

1
2
3
4
5
6
7
8
9
10
11
12
13
diff --git a/keyboards/crkbd/keymaps/dobachi/keymap.c b/keyboards/crkbd/keymaps/dobachi/keymap.c
index f80eff9..44a9137 100644
--- a/keyboards/crkbd/keymaps/dobachi/keymap.c
+++ b/keyboards/crkbd/keymaps/dobachi/keymap.c
@@ -158,7 +158,7 @@ void matrix_render_user(struct CharacterMatrix *matrix) {
// If you want to change the display of OLED, you need to change here
matrix_write_ln(matrix, read_layer_state());
matrix_write_ln(matrix, read_keylog());
- matrix_write_ln(matrix, read_keylogs());
+ //matrix_write_ln(matrix, read_keylogs());
//matrix_write_ln(matrix, read_mode_icon(keymap_config.swap_lalt_lgui));
//matrix_write_ln(matrix, read_host_led_state());
//matrix_write_ln(matrix, read_timelog());
共有

Kafkaのグレースフルシャットダウン

参考

メモ

公式ドキュメント

Graceful Shutdownに関する公式ドキュメントの記述によると、グレースフルシャットダウンの強みは以下の通り。

  • ログリカバリ処理をスキップする
  • シャットダウンするブローカから予めリーダを移動する

実装確認

以下のあたりから、グレースフルシャットダウンの実装。

kafka/server/KafkaServer.scala:422

1
2
3
4
5
  private def controlledShutdown() {

def node(broker: Broker): Node = broker.node(config.interBrokerListenerName)

(snip)

コントローラを取得して接続し、コントローラに対してグレースフルシャットダウンの依頼を投げる。 なお、本メソッドはKafkaServer#shutdownメソッド内で呼び出される。

なお、当該メソッドは(KafkaServerStartableクラスでラップされているが)シャットダウンフックで適用される。

kafka/Kafka.scala:70

1
2
3
4
// attach shutdown handler to catch terminating signals as well as normal termination
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})

共有

KafkaConnectでTwitterデータを取り込む

参考

メモ

ここでは、 confluent-hub コマンドでインストールする。

1
$ confluent-hub install jcustenborder/kafka-connect-twitter

以下の設定ファイルを作る。

/etc/kafka/connect-twitter-source.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector

# Set these required values
twitter.oauth.accessTokenSecret=hoge
process.deletes=false
filter.keywords=kafka
kafka.status.topic=twitter-status
kafka.delete.topic=twitter-delete
twitter.oauth.consumerSecret=hoge
twitter.oauth.accessToken=hoge
twitter.oauth.consumerKey=hoge

キーのところは、適宜TwitterのDeveloper向けページで生成して記載すること。

スタンドアローンモードで実行する。

1
$ connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka/connect-twitter-source.properties

なお、もし分散モードだったら、以下のようにする。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
"name": "twitter",
"config": {
"tasks.max":1,
"connector.class":"com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
"twitter.oauth.accessTokenSecret":"hoge",
"process.deletes":"false",
"filter.keywords":"kafka",
"kafka.status.topic":"twitter-status",
"kafka.delete.topic":"twitter-delete",
"twitter.oauth.consumerSecret":"hoge",
"twitter.oauth.accessToken":"hoge",
"twitter.oauth.consumerKey":"hoge",
}
}
'

最後に、入力されるメッセージを確認する。

1
$ kafka-console-consumer --bootstrap-server broker:9092 --topic twitter-status | jq .

結果は以下のような形式である。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"doc": "Return the created_at",
"field": "CreatedAt"
},

(snip)

"payload": {
"CreatedAt": XXXXXXXXXXXXX,
"Id": XXXXXXXXXXXXXXXXXXX,
"Text": "hoge",
"Source": "<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>",
"Truncated": false,

(snip)

共有

Kafkaのログ周りの調査メモ

参考

メモ

How Kafka’s Storage Internals Work にログファイルの中身の読み方が載っているので試してみる。

予め、 kafka-connect-twitter のKafka Connectコネクタを用い、Twitterデータを投入しておいた。

トピック(パーティション)の確認

1
2
$ ls /var/lib/kafka/data/twitter-status-0
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint

インデックスの確認。

1
2
3
4
5
6
7
8
9
$ kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /var/lib/kafka/data/twitter-status-0/00000000000000000000.index | head

Dumping /var/lib/kafka/data/twitter-status-0/00000000000000000000.index
offset: 1 position: 16674
offset: 2 position: 33398
offset: 3 position: 50562
offset: 4 position: 67801

(snip)

上記の通り、オフセットと位置が記載されている。

続いて、ログ本体の確認。

1
2
3
4
5
6
7
8
9
10
11
$ kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /var/lib/kafka/data/twitter-status-0/00000000000000000000.log | head -n 4

Dumping /var/lib/kafka/data/twitter-status-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1552055018454 isvalid: true keysize: 239 valuesize: 16362 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: {"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"Id"}],"optional":false,"name":"com.github.jcustenborder.kafka.connect.twitter.StatusKey","doc":"Key for a twitter status."

(snip)

offset: 1 position: 16674 CreateTime: 1552055018465 isvalid: true keysize: 239 valuesize: 16412 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: {"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"Id"}],"optional":false,"name":"com.github.jcustenborder.kafka.connect.twitter.StatusKey","doc":"Key for a twitter status."},"payload":{"Id":1104024860143968257}

(snip)

1
2
3
4
5
$ kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /var/lib/kafka/data/twitter-status-0/00000000000000000000.timeindex | head -n 10

Dumping /var/lib/kafka/data/twitter-status-0/00000000000000000000.timeindextimestamp: 1552055018465 offset: 1timestamp: 1552055529166 offset: 2timestamp: 1552055536284 offset: 3timestamp: 1552055626862 offset: 4timestamp: 1552055652086 offset: 5timestamp: 1552055717443 offset: 6timestamp: 1552055788403 offset: 7timestamp: 1552055789505 offset: 8

(snip)
共有

Apache KafkaにおけるZooKeeper

参考

メモ

前提

  • kafka-docker を使って環境を立てた
  • docker-compose.yml内で環境変数で指定し、ZooKeeperじょうでは、 /kafka以下のパスを用いるようにした。
  • いったんtrunkで確認

ZooKeeperには何が置かれるのか?

実機で確認してみる。

1
2
bash-4.4# zookeeper-shell.sh zookeeper:2181
Connecting to zookeeper:2181
1
2
ls /kafka
[log_dir_event_notification, isr_change_notification, admin, consumers, cluster, config, latest_producer_id_block, controller, brokers, controller_epoch]

トピック準備

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bash-4.4#  kafka-topics.sh --create --topic topic --partitions 1 --zookeeper zookeeper:2181/kafka --replication-factor 1

bash-4.4# kafka-topics.sh --topic topic --zookeeper zookeeper:2181/kafka --describeTopic:topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001

bash-4.4# kafka-console-producer.sh --topic=topic --broker-list=kafka:9092
>hoge
>fuga
>hoge
>fuga

bash-4.4# kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic topic
hoge
fuga
hoge
fuga

log_dir_event_notification

handleLogDirFailureメソッド内でオフラインとなったディレクトリを取り扱うために用いられる。

kafka/server/ReplicaManager.scala:203

1
2
3
4
5
6
7
8
9
10
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
Override def doWork() {
val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
if (haltBrokerOnDirFailure) {
fatal(s"Halting broker because dir $newOfflineLogDir is offline")
Exit.halt(1)
}
handleLogDirFailure(newOfflineLogDir)
}
}

isr_change_notification

ISRに変化があったことを確認する。

kafka/server/ReplicaManager.scala:269

1
2
3
4
5
6
7
8
9
10
11
12
def maybePropagateIsrChanges() {
val now = System.currentTimeMillis()
isrChangeSet synchronized {
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
zkClient.propagateIsrChanges(isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
}
}
}

brokers

以下のように、ブローカに関するいくつかの情報を保持する。

1
2
ls /kafka/brokers
[seqid, topics, ids]

例えば、ブローカ情報を記録するのは以下の通り。

kafka/zk/KafkaZkClient.scala:95

1
2
3
4
5
6
def registerBroker(brokerInfo: BrokerInfo): Long = {
val path = brokerInfo.path
val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}, czxid (broker epoch): ${stat.getCzxid}")
stat.getCzxid
}

例えば、トピック・パーティション情報は以下の通り。

1
2
get /kafka/brokers/topics/topic/partitions/0/state
{"controller_epoch":1,"leader":1001,"version":1,"leader_epoch":0,"isr":[1001]}

controller

例えば、コントローラ情報は以下の通り。

1
2
get /kafka/controller
{"version":1,"brokerid":1001,"timestamp":"1551794212551"}

KafkaZKClient#registerControllerAndIncrementControllerEpochメソッドあたり。

updateLeaderAndIsrメソッド

リーダとISRの情報を受けとり、ZooKeeper上の情報を更新する。

getLogConfigsメソッド

ローカルのデフォルトの設定値と、ZooKeeper上のトピックレベルの設定値をマージする。

setOrCreateEntityConfigsメソッド

トピックを作成する際に呼ばれるメソッドだが、これ自体は何かロックを取りながら、 トピックの情報を編集するわけではないようだ。★要確認 したがって、同じトピックを作成する処理が同時に呼ばれた場合、後勝ちになる。

ただしトピックが作成された後は、トピック作成時に当該トピックが存在するかどうかの確認が行われるので問題ない。

kafka/zk/AdminZkClient.scala:101

1
2
3
def validateTopicCreate(topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties): Unit = {

kafka/server/AdminManager.scala:109

1
2
3
createTopicPolicy match {
case Some(policy) =>
adminZkClient.validateTopicCreate(topic.name(), assignments, configs)

BrokerのEpochについて

以下の通り、BrokerのEpochとしては、ZooKeeperのznodeのcZxid(※)が用いられる。

※znodeの作成に関するZooKeeper Transaction ID

kafka/zk/KafkaZkClient.scala:417

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def getAllBrokerAndEpochsInCluster: Map[Broker, Long] = {
val brokerIds = getSortedBrokerList
val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
val getDataResponses = retryRequestsUntilConnected(getDataRequests)
getDataResponses.flatMap { getDataResponse =>
val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
getDataResponse.resultCode match {
case Code.OK =>
Some((BrokerIdZNode.decode(brokerId, getDataResponse.data).broker, getDataResponse.stat.getCzxid))
case Code.NONODE => None
case _ => throw getDataResponse.resultException.get
}
}.toMap
}

getAllLogDirEventNotificationsメソッド

ログディレクトリの変化に関する情報を取得する。 コントローラのイベントハンドラ内で、呼び出されるLogDirEventNotification#processメソッドで用いられる。 何か変化のあったログ(ディレクトリ)を確認し、当該ログを保持するブローカのレプリカの情報を最新化する。★要確認

setOrCreatePartitionReassignmentメソッド

パーティションリアサインメントの情報をZooKeeperに書き込む。 このメソッドは、パーティションリアサインメントの必要があるときに呼び出される。 例えばコントローラフェールオーバ時などにも呼び出される。

共有

Clipper

参考

メモ

Strata NY 2018でのClipper が講演のようだが、スライドが公開されていない。

Clipper: A Low-Latency Online Prediction Serving System

Clipperの論文によると、「A Low-Latency Online Prediction Serving System」と定義されている。 上記論文の図1を見るかぎり、モデルサービングを担い、複数の機械学習モデルをまとめ上げる「中層化」層の 役割も担うようだ。 また、モデル選択の機能、キャッシングなどの機能も含まれているようである。

これまでの研究では、学習フェーズに焦点が当たることが多かったが、 この研究では推論フェーズに焦点を当てている。

アーキ概要

  • モデル抽象化層
  • モデル選択層

論文Figure 1参照。

モデル選択について

複数の競合するモデルから得られる結果を扱い、動的にモデルを選択していく。 これにより、モデルの精度とロバストネスを高める。 またバンディットアルゴリズムを採用しており、複数のモデルの結果を組み合わせる。

スループットとレイテンシを保つため、キャッシングやバッチ処理を採用。 所定の(?)レイテンシを保持しながら、スループットを高めるためにバッチ処理化するようだ。

また、このあたりは、モデル抽象化層の上になりたっているから、 異なるフレームワーク発のモデルが含まれている場合でも動作するようである。

TensorFlow Servingとの比較

機能面で充実し、スループットやレイテンシの面でも有利。

ターゲットとなるアプリ

  • 物体検知
  • 音声認識
  • 自動音声認識

チャレンジポイント

  • 機械学習フレームワーク、ライブラリの乱立への対応
    • モデル抽象化レイヤを設けて差異を吸収
  • 高アクセス数と低レイテンシへの対応
    • スループットのレイテンシの両面を考慮してバッチ化
  • A/Bテストの煩わしさと不確かさへの対応
    • 機械的なモデル選択。アンサンブル化。

アーキテクチャと大まかな処理フロー

リクエストが届くまで 結果が帰る流れ フィードバック

実際にはキューは、モデルの抽象化レイヤ側にあるようにも思う。 追って実装確認必要。

キャッシュ

頻度の高いクエリのキャッシュが有効なのはわかりやすいが、頻度の低いクエリの キャッシュも有効な面がある。 予測結果を使ったあとのフィードック(★要確認)はすぐに生じるからである。

キャッシュはrequestとfetchのAPIを持ち、ノンブロッキングである。

LRUポリシーを用いる。

バッチ化

レイテンシに関するSLOを指定すると、それを守る範囲内で、 バッチ化を試みる。これによりスループットの向上を狙う。

バッチ化で狙うのはRPCやメモリコピーの回数削減、 フレームワークのデータ並列の仕組みの活用。

なお、バックオフを設けながら、バッチサイズを変えることで、 最適なバッチサイズを探す。 AIMD=Additive-Increase-Multiplicative-decrease に基づいてサイズ変更。

また、キューに入っているバッチが少ないケースでは、 ある程度貯まるのを待ち、スループットを上げる工夫もする。

モデルコンテナ

C++、Java、Pythonの言語バインディングが提供されている。 各言語バインディングでラッパーを実装すれば良い。

モデルコンテナのレプリカを作り、スケールアウトさせられる。

モデル選択層

アプリケーションのライフサイクル全体に渡り、 フィードバックを考慮しながら、複数のモデルを取り扱う。 これにより、一部のモデルが失敗しても問題ない。 また複数のモデルから得られた結果を統合することでaccuracyを向上させる。

モデルへの選択の基本的なAPIは、select、combine、observe。 selectとcombineはそのままの意味だが、observeは、アプリケーションからの フィードバックを受取り、ステートを更新するために用いるAPIである。

モデル選択については予めgeneralなアルゴリズムが実装されているが、 ユーザが実装することも可能。 ただし、計算量と精度はトレードオフになりがちなので注意。

  • バンディットアルゴリズム(多腕バンディット問題)
    • ClipperではExp3アルゴリズムに対応
    • 単一の結果を利用
  • アンサンブル
    • Clipperでは線形アンサンブルに対応
    • 重み計算にはExp4(バンディットアルゴリズム)を利用

なお、バンディットアルゴリズムについては バンディットアルゴリズムについての説明 などを参照されたし。

信頼度とデフォルト動作

モデルから得られた推測値の信頼度が定められた閾値よりも低い場合、 所定のデフォルト動作をさせられる。 複数のモデルを取り扱う場合、信頼度の指標としてそれらのモデルが 最終的な解を採用するかどうかとする方法が挙げられる。 要は、アンサンブルを利用する、ということである。

落伍者モデルの影響の軽減

アンサンブルの歳、モデル抽象化層でスケールアウトさせられる。 しかし弱点として、モデルの中に落伍者がいると、それに足を引っ張られレイテンシが悪化することである。(計算結果の取得が遅くなる)

そこでClipperでは、モデル選択層で待ち時間(SLO)を設け、ベストエフォートで 結果をアグリゲートすることとした。

ステートの管理

ステート管理には現在の実装ではRedisを用いているようだ。 なお、DockerContainerMangerでは、外部のRedisサービスを利用することもできるし、 開発用に内部でRedisのDockerコンテナを起動させることもできる。

内部的には、コンストラクタ引数にredis_ipを渡すかどうかで管理されているようだ。

clipper_admin/clipper_admin/docker/docker_container_manager.py:77

1
2
if redis_ip is None:
self.external_redis = False

TensorFlow Servingとの比較

TF Servingは、TFと密結合。 Clipper同様にスループット向上のための工夫が施されている。 バッチ化も行う。 ただし、基本的に1個のモデルで推論するようになっており、 アプリケーションからのフィードバックを受取り、 モデル選択するような仕組みは無い。

TF Servingと比べると、多少スループットが劣るものの健闘。

制約

Clipperはモデルをブラックボックスとして扱うので、 モデル内に踏み込んだ最適化は行わない。

類似技術

  • TF Serving
  • LASER
  • Velox

公式ドキュメント

アーキテクチャ

http://clipper.ai/images/clipper-structure.png に公式ドキュメント上の構成イメージが記載されている。 論文と比較して、モデル選択層の部分が「Clipper Query Processor」と表現されているように見える。

またClipperは、コンテナ群をオーケストレーションし、環境を構成する。

またClipperは、実質的に「ファンクション・サーバ」であると考えられる。 実のところ、ファンクションは機械学習モデルでなくてもよい。 (公式ドキュメントのクイックスタートでも、簡単な四則演算をデプロイする例が載っている)

モデル管理

モデルは、リストを入力し、値かJSONを出力する。 入力がリストなのは、機械学習モデルによってはデータ並列で処理し、性能向上を狙うものがあるため。

またClipperはモデル管理のためのライブラリを提供する。 これにより、ある程度決まったモデルであれば、コンテナの作成やモデルの保存などを 自前で作り込む必要がない。 現在は以下の内容に対応する。

  • One to deploy arbitrary Python functions (within some constraints)
  • One to deploy PySpark models along with pre- and post-processing logic
  • One to deploy R models

アプリケーションとモデルの間は、必ずしも1対1でなくてもよい。

参考) * http://clipper.ai/images/link_model.png * http://clipper.ai/images/update_model.png

以上の通り、モデルをデプロイしながら、複数のモデルをアプリケーションにリンクして切り替えることができる。 例えばデプロイした新しいモデルが想定通り動作しなかったとき、もとのモデルに切り戻す、など。

なお、モデルを登録するときには、いくつかの引数を渡す。

  • 入力型
  • レイテンシのSLO(サービスレベルオブジェクト)
  • デフォルトの出力

さらに、 ClipperConnection.set_num_replicas() を用いて、モデルのレプリカ数を決められる。

コンテナマネージャ

Container Managers によると、Dockerコンテナを自前のライブラリか、k8sでオーケストレーションすることが可能。 自前のライブラリは開発向けのようである。

なお、ステートを保存するRedisもコンテナでローンチするようになっているが、 勝手にローンチしてくれるものはステートを永続化するようになっていない。 したがってコンテナが落ちるとステートが消える。 プロダクションのケースでは、きちんと可用性を考慮した構成で予めローンチしておくことが推奨されている。

注意点として、2019/2/27時点でのClipperでは、クエリフロントエンドのオートスケールには対応していない。 これは、クエリフロントエンドとモデル抽象化層のコンテナの間を長命なコネクション(TCPコネクション)で 結んでいるからである。クエリフロントエンドをスケールアウトする時に、これをリバランスする機能が まだ存在していない。

APIドキュメント

クライアントのサンプル

example_client.py

コネクションの生成とエンドポイント定義

1
2
3
4
clipper_conn = ClipperConnection(DockerContainerManager())
clipper_conn.start_clipper()
python_deployer.create_endpoint(clipper_conn, "simple-example", "doubles",
feature_sum)

ここで渡しているファンクションfeature_sumは以下の通り。

1
2
def feature_sum(xs):
return [str(sum(x)) for x in xs]

推論

1
2
3
4
5
6
7
8
9
while True:
if batch_size > 1:
predict(
clipper_conn.get_query_addr(),
[list(np.random.random(200)) for i in range(batch_size)],
batch=True)
else:
predict(clipper_conn.get_query_addr(), np.random.random(200))
time.sleep(0.2)

predict関数の定義は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def predict(addr, x, batch=False):
url = "http://%s/simple-example/predict" % addr

if batch:
req_json = json.dumps({'input_batch': x})
else:
req_json = json.dumps({'input': list(x)})

headers = {'Content-type': 'application/json'}
start = datetime.now()
r = requests.post(url, headers=headers, data=req_json)
end = datetime.now()
latency = (end - start).total_seconds() * 1000.0
print("'%s', %f ms" % (r.text, latency))

与えられた入力をJSONに変換し、REST APIで渡している。

XGBOOSTの例

1
2
(clipper) $ pip install xgboost
(clipper) $ ipython

Clipperを起動。

1
2
3
4
import logging, xgboost as xgb, numpy as np
from clipper_admin import ClipperConnection, DockerContainerManager
cl = ClipperConnection(DockerContainerManager())
cl.start_clipper()

結果

1
2
19-02-27:22:49:09 INFO     [docker_container_manager.py:119] Starting managed Redis instance in Docker
19-02-27:22:49:14 INFO [clipper_admin.py:126] Clipper is running

アプリを登録。

1
cl.register_application('xgboost-test', 'integers', 'default_pred', 100000)

結果

1
19-02-27:22:49:55 INFO     [clipper_admin.py:201] Application xgboost-test was successfully registered

ファンクション定義

1
2
3
4
5
6
7
8
9
10
11
12
def get_test_point():
return [np.random.randint(255) for _ in range(784)]

# Create a training matrix.
dtrain = xgb.DMatrix(get_test_point(), label=[0])

# We then create parameters, watchlist, and specify the number of rounds
# This is code that we use to build our XGBoost Model, and your code may differ.
param = {'max_depth': 2, 'eta': 1, 'silent': 1, 'objective': 'binary:logistic'}
watchlist = [(dtrain, 'train')]
num_round = 2
bst = xgb.train(param, dtrain, num_round, watchlist)

結果

1
2
[0]     train-error:0
[1] train-error:0

推論用の関数定義

1
2
def predict(xs):
return bst.predict(xgb.DMatrix(xs))

推論用のコンテンをビルドし、ローンチ。

1
2
3
4
5
6
from clipper_admin.deployers import python as python_deployer
# We specify which packages to install in the pkgs_to_install arg.
# For example, if we wanted to install xgboost and psycopg2, we would use
# pkgs_to_install = ['xgboost', 'psycopg2']
python_deployer.deploy_python_closure(cl, name='xgboost-model', version=1,
input_type="integers", func=predict, pkgs_to_install=['xgboost'])

なお、ここではコンテナをビルドするときに、xgboostをインストールするように指定している。

結果

1
2
3
4
5
6
7
8
9
19-02-27:22:54:35 INFO     [deployer_utils.py:44] Saving function to /tmp/clipper/tmpincj4sg2
19-02-27:22:54:35 INFO [deployer_utils.py:54] Serialized and supplied predict function
19-02-27:22:54:35 INFO [python.py:192] Python closure saved

(snip)

19-02-27:22:54:53 INFO [docker_container_manager.py:257] Found 0 replicas for xgboost-model:1. Adding 1
19-02-27:22:55:00 INFO [clipper_admin.py:635] Successfully registered model xgboost-model:1
19-02-27:22:55:00 INFO [clipper_admin.py:553] Done deploying model xgboost-model:1.

モデルをアプリにリンク。

1
cl.link_model_to_app('xgboost-test', 'xgboost-model')

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import requests, json
# Get Address
addr = cl.get_query_addr()
# Post Query
response = requests.post(
"http://%s/%s/predict" % (addr, 'xgboost-test'),
headers={"Content-type": "application/json"},
data=json.dumps({
'input': get_test_point()
}))
result = response.json()
if response.status_code == requests.codes.ok and result["default"]:
print('A default prediction was returned.')
elif response.status_code != requests.codes.ok:
print(result)
raise BenchmarkException(response.text)
else:
print('Prediction Returned:', result)

結果

1
Prediction Returned: {'query_id': 2, 'output': 0.3266071, 'default': False}

実装確認

開発言語

1
C++ 56.0%	 Python 24.1%	 CMake 8.1%	 Scala 3.0%	 Shell 2.5%	 Java 2.2%	 Other 4.1%

コンテナの起動の流れを確認してみる

ClipperConnection#start_clipperメソッドを確認する。

ContainerManager#start_clipperメソッドが中で呼ばれる。

clipper_admin/clipper_admin/clipper_admin.py:123

1
2
3
self.cm.start_clipper(query_frontend_image, mgmt_frontend_image,
frontend_exporter_image, cache_size,
num_frontend_replicas)

clipper_admin/clipper_admin/container_manager.py:63

1
2
3
4
5
class ContainerManager(object):
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
def start_clipper(self, query_frontend_image, mgmt_frontend_image,

ContainerManagerは親クラスであり、KubernetesContainerManagerやDockerContainerManagerのstart_clipperが実行される。 例えばDockerContainerManagerを見てみる。

Docker SDKを使い、docker networkを作る。

clipper_admin/clipper_admin/docker/docker_container_manager.py:128

1
2
self.docker_client.networks.create(
self.docker_network, check_duplicate=True)

Redisを起動する。

clipper_admin/clipper_admin/docker/docker_container_manager.py:156

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if not self.external_redis:
self.logger.info("Starting managed Redis instance in Docker")
self.redis_port = find_unbound_port(self.redis_port)
redis_labels = self.common_labels.copy()
redis_labels[CLIPPER_DOCKER_PORT_LABELS['redis']] = str(
self.redis_port)
redis_container = self.docker_client.containers.run(
'redis:alpine',
"redis-server --port %s" % CLIPPER_INTERNAL_REDIS_PORT,
name="redis-{}".format(random.randint(
0, 100000)), # generate a random name
ports={
'%s/tcp' % CLIPPER_INTERNAL_REDIS_PORT: self.redis_port
},
labels=redis_labels,
**self.extra_container_kwargs)
self.redis_ip = redis_container.name

マネジメントフロントエンドを起動。

clipper_admin/clipper_admin/docker/docker_container_manager.py:168

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
mgmt_cmd = "--redis_ip={redis_ip} --redis_port={redis_port}".format(
redis_ip=self.redis_ip, redis_port=CLIPPER_INTERNAL_REDIS_PORT)
self.clipper_management_port = find_unbound_port(
self.clipper_management_port)
mgmt_labels = self.common_labels.copy()
mgmt_labels[CLIPPER_MGMT_FRONTEND_CONTAINER_LABEL] = ""
mgmt_labels[CLIPPER_DOCKER_PORT_LABELS['management']] = str(
self.clipper_management_port)
self.docker_client.containers.run(
mgmt_frontend_image,
mgmt_cmd,
name="mgmt_frontend-{}".format(random.randint(
0, 100000)), # generate a random name
ports={
'%s/tcp' % CLIPPER_INTERNAL_MANAGEMENT_PORT:
self.clipper_management_port
},
labels=mgmt_labels,
**self.extra_container_kwargs)

なお、コンテナ起動のコマンドに、上記で起動された(もしくは外部のサービスとして与えられた)Redisの IPアドレス、ポート等の情報が渡されていることがわかる。

クエリフロントエンドの起動。

1
2
3
4
5
6
7
8
9
10
11
self.docker_client.containers.run(
query_frontend_image,
query_cmd,
name=query_name,
ports={
'%s/tcp' % CLIPPER_INTERNAL_QUERY_PORT:
self.clipper_query_port,
'%s/tcp' % CLIPPER_INTERNAL_RPC_PORT: self.clipper_rpc_port
},
labels=query_labels,
**self.extra_container_kwargs)

その他、メトリクスのコンテナを起動する。

1
2
3
run_metric_image(self.docker_client, metric_labels,
self.prometheus_port, self.prom_config_path,
self.extra_container_kwargs)

コンテナを起動したあとは、ポートの情報を更新する。

create_endpointメソッドを確認

公式の例でも、create_endpointメソッドがよく用いられるので確認する。

メソッド引数は以下の通り。

clipper_admin/clipper_admin/deployers/python.py:16

1
2
3
4
5
6
7
8
9
10
11
12
13
def create_endpoint(clipper_conn,
name,
input_type,
func,
default_output="None",
version=1,
slo_micros=3000000,
labels=None,
registry=None,
base_image="default",
num_replicas=1,
batch_size=-1,
pkgs_to_install=None):

基本的には、ClipperConnectionインスタンス、関数名、入力データのタイプ、関数を指定しながら利用する。

メソッド内部の処理は以下のとおり。

clipper_admin/clipper_admin/deployers/python.py:87

1
2
3
4
5
6
7
clipper_conn.register_application(name, input_type, default_output,
slo_micros)
deploy_python_closure(clipper_conn, name, version, input_type, func,
base_image, labels, registry, num_replicas,
batch_size, pkgs_to_install)

clipper_conn.link_model_to_app(name, name)

最初にregister_applicationメソッドで、アプリケーションを Clipperに登録する。 なお、register_applicatoin メソッドは、Clipperの マネジメントフロントエンドに対してリクエストを送る。

マネジメントフロントエンドのURLの指定は以下の通り。

clipper_admin/clipper_admin/clipper_admin.py:201

1
2
url = "http://{host}/admin/add_app".format(
host=self.cm.get_admin_addr())

マネジメントフロントエンド自体は、 \src\management\src\management_frontend.hpp が自体と思われる。 以下の通り、エンドポイント add_app が定義されている。

src/management/src/management_frontend.hpp:52

1
const std::string ADD_APPLICATION = ADMIN_PATH + "/add_app$";

src/management/src/management_frontend.hpp:181

1
2
3
4
5
6
7
server_.add_endpoint(
ADD_APPLICATION, "POST",
[this](std::shared_ptr<HttpServer::Response> response,
std::shared_ptr<HttpServer::Request> request) {
try {
clipper::log_info(LOGGING_TAG_MANAGEMENT_FRONTEND,
"Add application POST request");

create_endopintの流れ確認に戻る。

アプリケーションの登録が完了したあとは、deploy_python_closure メソッドを使って、

引数は以下の通り。

clipper_admin/clipper_admin/deployers/python.py:96

1
2
3
4
5
6
7
8
9
10
11
def deploy_python_closure(clipper_conn,
name,
version,
input_type,
func,
base_image="default",
labels=None,
registry=None,
num_replicas=1,
batch_size=-1,
pkgs_to_install=None):

このメソッドでは、最初に save_python_function を使って 関数をシリアライズする。

clipper_admin/clipper_admin/deployers/python.py:189

1
serialization_dir = save_python_function(name, func)

つづいて、Pyhonのバージョンに従いながらベースとなるイメージを選択する。 以下にPython3.6の場合を載せる。

clipper_admin/clipper_admin/deployers/python.py:205

1
2
3
4
elif py_minor_version == (3, 6):
logger.info("Using Python 3.6 base image")
base_image = "{}/python36-closure-container:{}".format(
__registry__, __version__)

最後にClipperConnection#build_and_deploy_modelメソッドを使って 関数を含むコンテナイメージを作成する。

clipper_admin/clipper_admin/deployers/python.py:220

1
2
3
clipper_conn.build_and_deploy_model(
name, version, input_type, serialization_dir, base_image, labels,
registry, num_replicas, batch_size, pkgs_to_install)

build_and_deploy_model メソッドは以下の通り。

clipper_admin/clipper_admin/clipper_admin.py:352

1
2
3
4
5
6
if not self.connected:
raise UnconnectedException()
image = self.build_model(name, version, model_data_path, base_image,
container_registry, pkgs_to_install)
self.deploy_model(name, version, input_type, image, labels,
num_replicas, batch_size)

build_model メソッドでDockerイメージをビルドし、 レポジトリに登録する。

deploy_model メソッドで登録されたDockerイメージからコンテナを起動する。 このとき指定された個数だけコンテナを起動する。 なお、起動時には、抽象クラス ContainerManagerdeploy_model メソッドが呼ばれる。 実際には具象クラスであるKubernetesContainerManagerやDockerContainerManagerクラスのメソッドが実行される。 ここに抽象化が施されていると考えられる。

動作確認

クイックスタート

公式ウェブサイト のクイックスタートの通り、実行してみる。 上記サイトでAnaconda上で環境構成するのを推奨する記述があったためそれに従う。 また、Pythonバージョンに指定があったので、指定された中で最も新しい3.6にした。

1
2
3
4
5
$ sudo yum install gcc
$ conda create -n clipper python=3.6 python
$ conda activate clipper
$ conda install ipython
$ pip install clipper_admin

ipythonを起動する。

1
(clipper) $ ipython

Docker環境を立てる。

1
2
3
from clipper_admin import ClipperConnection, DockerContainerManager
clipper_conn = ClipperConnection(DockerContainerManager())
clipper_conn.start_clipper()

結果の例

1
2
19-02-26:22:24:42 INFO     [docker_container_manager.py:119] Starting managed Redis instance in Docker
19-02-26:22:26:50 INFO [clipper_admin.py:126] Clipper is running

簡単な例を登録。これにより、エンドポイントが有効になるようだ。 実際の処理の登録は後ほど。

1
clipper_conn.register_application(name="hello-world", input_type="doubles", default_output="-1.0", slo_micros=100000)

登録処理は以下の通り。

1
2
def feature_sum(xs):
return [str(sum(x)) for x in xs]

デプロイ。

1
2
from clipper_admin.deployers import python as python_deployer
python_deployer.deploy_python_closure(clipper_conn, name="sum-model", version=1, input_type="doubles", func=feature_sum)

ここから、モデル抽象化層のDockerイメージが作られる。

1
2
3
4
5
6
7
8
9
10
11
19-02-26:22:30:59 INFO     [deployer_utils.py:44] Saving function to /tmp/clipper/tmp67eliqhx
19-02-26:22:30:59 INFO [deployer_utils.py:54] Serialized and supplied predict function
19-02-26:22:30:59 INFO [python.py:192] Python closure saved
19-02-26:22:30:59 INFO [python.py:206] Using Python 3.6 base image

(snip)

19-02-26:22:31:26 INFO [docker_container_manager.py:257] Found 0 replicas for sum-model:1. Adding 1
19-02-26:22:31:33 INFO [clipper_admin.py:635] Successfully registered model sum-model:1
19-02-26:22:31:33 INFO [clipper_admin.py:553] Done deploying model sum-model:1.
19-02-26:22:30:59 INFO [clipper_admin.py:452] Building model Docker image with model data from /tmp/clipper/tmp67eliqhx

モデルをアプリケーションにリンクさせる。

1
clipper_conn.link_model_to_app(app_name="hello-world", model_name="sum-model")

以上で、エンドポイントhttp://localhost:1337/hello-world/predictを用いて、 推論結果(計算結果)を受け取れるようになる。

curlで結果の取得

1
$ curl -X POST --header "Content-Type:application/json" -d '{"input": [1.1, 2.2, 3.3]}' 127.0.0.1:1337/hello-world/predict

結果の例

1
{"query_id":0,"output":6.6,"default":false}

Pythonから取得するパターン。

1
2
3
import requests, json, numpy as np
headers = {"Content-type": "application/json"}
requests.post("http://localhost:1337/hello-world/predict", headers=headers, data=json.dumps({"input": list(np.random.random(10))})).json()

結果の例

1
Out[12]: {'query_id': 1, 'output': 4.710181343957851, 'default': False}

参考までに、この時点で実行されているDockerコンテナは以下の通り。

1
2
3
4
5
6
7
8
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b041228b2a4d sum-model:1 "/container/containe…" 25 minutes ago Up 25 minutes (healthy) sum-model_1-37382
6f93447357bd prom/prometheus:v2.1.0 "/bin/prometheus --c…" 30 minutes ago Up 30 minutes 0.0.0.0:9090->9090/tcp metric_frontend-37801
b3034a7352b8 clipper/frontend-exporter:0.3.0 "python /usr/src/app…" 30 minutes ago Up 30 minutes query_frontend_exporter-33443
6b775a49e1ff clipper/query_frontend:0.3.0 "/clipper/release/sr…" 30 minutes ago Up 30 minutes 0.0.0.0:1337->1337/tcp, 0.0.0.0:7000->7000/tcp query_frontend-33443
2c75406fda84 clipper/management_frontend:0.3.0 "/clipper/release/sr…" 30 minutes ago Up 30 minutes 0.0.0.0:1338->1338/tcp mgmt_frontend-39690
ff14d91f313e redis:alpine "docker-entrypoint.s…" 32 minutes ago Up 32 minutes 0.0.0.0:6379->6379/tcp redis-28775

最後にコンテナを停止しておく。

1
clipper_conn.stop_all()

画像取扱のサンプル

image_query の通りに実行してみる。 また、 上記のノートブックが image_query/example.ipynb にある。

Clipperでは、REST APIでクエリが渡される。データはJSONにラップされて渡される。 そのため、ノートブック image_query/example.ipynb を見ると、 画像ファイルがバイト列で渡される場合と、BASE64でエンコードされて渡される場合の2種類の例が載っていた。

この例では、画像のサイズを返すような関数を定義して使っているが、 渡された画像に何らかの判定処理を加えて戻り値を返すような関数を定義すればよいだろう。

また、例では、python_deployer#create_endpointメソッドが用いられている。

共有