KafkaConnectでTwitterデータを取り込む

参考

メモ

ここでは、 confluent-hub コマンドでインストールする。

1
$ confluent-hub install jcustenborder/kafka-connect-twitter

以下の設定ファイルを作る。

/etc/kafka/connect-twitter-source.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector

# Set these required values
twitter.oauth.accessTokenSecret=hoge
process.deletes=false
filter.keywords=kafka
kafka.status.topic=twitter-status
kafka.delete.topic=twitter-delete
twitter.oauth.consumerSecret=hoge
twitter.oauth.accessToken=hoge
twitter.oauth.consumerKey=hoge

キーのところは、適宜TwitterのDeveloper向けページで生成して記載すること。

スタンドアローンモードで実行する。

1
$ connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka/connect-twitter-source.properties

なお、もし分散モードだったら、以下のようにする。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
"name": "twitter",
"config": {
"tasks.max":1,
"connector.class":"com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
"twitter.oauth.accessTokenSecret":"hoge",
"process.deletes":"false",
"filter.keywords":"kafka",
"kafka.status.topic":"twitter-status",
"kafka.delete.topic":"twitter-delete",
"twitter.oauth.consumerSecret":"hoge",
"twitter.oauth.accessToken":"hoge",
"twitter.oauth.consumerKey":"hoge",
}
}
'

最後に、入力されるメッセージを確認する。

1
$ kafka-console-consumer --bootstrap-server broker:9092 --topic twitter-status | jq .

結果は以下のような形式である。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"doc": "Return the created_at",
"field": "CreatedAt"
},

(snip)

"payload": {
"CreatedAt": XXXXXXXXXXXXX,
"Id": XXXXXXXXXXXXXXXXXXX,
"Text": "hoge",
"Source": "<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>",
"Truncated": false,

(snip)

共有

Kafkaのログ周りの調査メモ

参考

メモ

How Kafka’s Storage Internals Work にログファイルの中身の読み方が載っているので試してみる。

予め、 kafka-connect-twitter のKafka Connectコネクタを用い、Twitterデータを投入しておいた。

トピック(パーティション)の確認

1
2
$ ls /var/lib/kafka/data/twitter-status-0
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint

インデックスの確認。

1
2
3
4
5
6
7
8
9
$ kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /var/lib/kafka/data/twitter-status-0/00000000000000000000.index | head

Dumping /var/lib/kafka/data/twitter-status-0/00000000000000000000.index
offset: 1 position: 16674
offset: 2 position: 33398
offset: 3 position: 50562
offset: 4 position: 67801

(snip)

上記の通り、オフセットと位置が記載されている。

続いて、ログ本体の確認。

1
2
3
4
5
6
7
8
9
10
11
$ kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /var/lib/kafka/data/twitter-status-0/00000000000000000000.log | head -n 4

Dumping /var/lib/kafka/data/twitter-status-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1552055018454 isvalid: true keysize: 239 valuesize: 16362 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: {"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"Id"}],"optional":false,"name":"com.github.jcustenborder.kafka.connect.twitter.StatusKey","doc":"Key for a twitter status."

(snip)

offset: 1 position: 16674 CreateTime: 1552055018465 isvalid: true keysize: 239 valuesize: 16412 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: {"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"Id"}],"optional":false,"name":"com.github.jcustenborder.kafka.connect.twitter.StatusKey","doc":"Key for a twitter status."},"payload":{"Id":1104024860143968257}

(snip)

1
2
3
4
5
$ kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /var/lib/kafka/data/twitter-status-0/00000000000000000000.timeindex | head -n 10

Dumping /var/lib/kafka/data/twitter-status-0/00000000000000000000.timeindextimestamp: 1552055018465 offset: 1timestamp: 1552055529166 offset: 2timestamp: 1552055536284 offset: 3timestamp: 1552055626862 offset: 4timestamp: 1552055652086 offset: 5timestamp: 1552055717443 offset: 6timestamp: 1552055788403 offset: 7timestamp: 1552055789505 offset: 8

(snip)
共有

Apache KafkaにおけるZooKeeper

参考

メモ

前提

  • kafka-docker を使って環境を立てた
  • docker-compose.yml内で環境変数で指定し、ZooKeeperじょうでは、 /kafka以下のパスを用いるようにした。
  • いったんtrunkで確認

ZooKeeperには何が置かれるのか?

実機で確認してみる。

1
2
bash-4.4# zookeeper-shell.sh zookeeper:2181
Connecting to zookeeper:2181
1
2
ls /kafka
[log_dir_event_notification, isr_change_notification, admin, consumers, cluster, config, latest_producer_id_block, controller, brokers, controller_epoch]

トピック準備

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bash-4.4#  kafka-topics.sh --create --topic topic --partitions 1 --zookeeper zookeeper:2181/kafka --replication-factor 1

bash-4.4# kafka-topics.sh --topic topic --zookeeper zookeeper:2181/kafka --describeTopic:topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001

bash-4.4# kafka-console-producer.sh --topic=topic --broker-list=kafka:9092
>hoge
>fuga
>hoge
>fuga

bash-4.4# kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic topic
hoge
fuga
hoge
fuga

log_dir_event_notification

handleLogDirFailureメソッド内でオフラインとなったディレクトリを取り扱うために用いられる。

kafka/server/ReplicaManager.scala:203

1
2
3
4
5
6
7
8
9
10
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
Override def doWork() {
val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
if (haltBrokerOnDirFailure) {
fatal(s"Halting broker because dir $newOfflineLogDir is offline")
Exit.halt(1)
}
handleLogDirFailure(newOfflineLogDir)
}
}

isr_change_notification

ISRに変化があったことを確認する。

kafka/server/ReplicaManager.scala:269

1
2
3
4
5
6
7
8
9
10
11
12
def maybePropagateIsrChanges() {
val now = System.currentTimeMillis()
isrChangeSet synchronized {
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
zkClient.propagateIsrChanges(isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
}
}
}

brokers

以下のように、ブローカに関するいくつかの情報を保持する。

1
2
ls /kafka/brokers
[seqid, topics, ids]

例えば、ブローカ情報を記録するのは以下の通り。

kafka/zk/KafkaZkClient.scala:95

1
2
3
4
5
6
def registerBroker(brokerInfo: BrokerInfo): Long = {
val path = brokerInfo.path
val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}, czxid (broker epoch): ${stat.getCzxid}")
stat.getCzxid
}

例えば、トピック・パーティション情報は以下の通り。

1
2
get /kafka/brokers/topics/topic/partitions/0/state
{"controller_epoch":1,"leader":1001,"version":1,"leader_epoch":0,"isr":[1001]}

controller

例えば、コントローラ情報は以下の通り。

1
2
get /kafka/controller
{"version":1,"brokerid":1001,"timestamp":"1551794212551"}

KafkaZKClient#registerControllerAndIncrementControllerEpochメソッドあたり。

updateLeaderAndIsrメソッド

リーダとISRの情報を受けとり、ZooKeeper上の情報を更新する。

getLogConfigsメソッド

ローカルのデフォルトの設定値と、ZooKeeper上のトピックレベルの設定値をマージする。

setOrCreateEntityConfigsメソッド

トピックを作成する際に呼ばれるメソッドだが、これ自体は何かロックを取りながら、 トピックの情報を編集するわけではないようだ。★要確認 したがって、同じトピックを作成する処理が同時に呼ばれた場合、後勝ちになる。

ただしトピックが作成された後は、トピック作成時に当該トピックが存在するかどうかの確認が行われるので問題ない。

kafka/zk/AdminZkClient.scala:101

1
2
3
def validateTopicCreate(topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties): Unit = {

kafka/server/AdminManager.scala:109

1
2
3
createTopicPolicy match {
case Some(policy) =>
adminZkClient.validateTopicCreate(topic.name(), assignments, configs)

BrokerのEpochについて

以下の通り、BrokerのEpochとしては、ZooKeeperのznodeのcZxid(※)が用いられる。

※znodeの作成に関するZooKeeper Transaction ID

kafka/zk/KafkaZkClient.scala:417

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def getAllBrokerAndEpochsInCluster: Map[Broker, Long] = {
val brokerIds = getSortedBrokerList
val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
val getDataResponses = retryRequestsUntilConnected(getDataRequests)
getDataResponses.flatMap { getDataResponse =>
val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
getDataResponse.resultCode match {
case Code.OK =>
Some((BrokerIdZNode.decode(brokerId, getDataResponse.data).broker, getDataResponse.stat.getCzxid))
case Code.NONODE => None
case _ => throw getDataResponse.resultException.get
}
}.toMap
}

getAllLogDirEventNotificationsメソッド

ログディレクトリの変化に関する情報を取得する。 コントローラのイベントハンドラ内で、呼び出されるLogDirEventNotification#processメソッドで用いられる。 何か変化のあったログ(ディレクトリ)を確認し、当該ログを保持するブローカのレプリカの情報を最新化する。★要確認

setOrCreatePartitionReassignmentメソッド

パーティションリアサインメントの情報をZooKeeperに書き込む。 このメソッドは、パーティションリアサインメントの必要があるときに呼び出される。 例えばコントローラフェールオーバ時などにも呼び出される。

共有

Clipper

参考

メモ

Strata NY 2018でのClipper が講演のようだが、スライドが公開されていない。

Clipper: A Low-Latency Online Prediction Serving System

Clipperの論文によると、「A Low-Latency Online Prediction Serving System」と定義されている。 上記論文の図1を見るかぎり、モデルサービングを担い、複数の機械学習モデルをまとめ上げる「中層化」層の 役割も担うようだ。 また、モデル選択の機能、キャッシングなどの機能も含まれているようである。

これまでの研究では、学習フェーズに焦点が当たることが多かったが、 この研究では推論フェーズに焦点を当てている。

アーキ概要

  • モデル抽象化層
  • モデル選択層

論文Figure 1参照。

モデル選択について

複数の競合するモデルから得られる結果を扱い、動的にモデルを選択していく。 これにより、モデルの精度とロバストネスを高める。 またバンディットアルゴリズムを採用しており、複数のモデルの結果を組み合わせる。

スループットとレイテンシを保つため、キャッシングやバッチ処理を採用。 所定の(?)レイテンシを保持しながら、スループットを高めるためにバッチ処理化するようだ。

また、このあたりは、モデル抽象化層の上になりたっているから、 異なるフレームワーク発のモデルが含まれている場合でも動作するようである。

TensorFlow Servingとの比較

機能面で充実し、スループットやレイテンシの面でも有利。

ターゲットとなるアプリ

  • 物体検知
  • 音声認識
  • 自動音声認識

チャレンジポイント

  • 機械学習フレームワーク、ライブラリの乱立への対応
    • モデル抽象化レイヤを設けて差異を吸収
  • 高アクセス数と低レイテンシへの対応
    • スループットのレイテンシの両面を考慮してバッチ化
  • A/Bテストの煩わしさと不確かさへの対応
    • 機械的なモデル選択。アンサンブル化。

アーキテクチャと大まかな処理フロー

リクエストが届くまで 結果が帰る流れ フィードバック

実際にはキューは、モデルの抽象化レイヤ側にあるようにも思う。 追って実装確認必要。

キャッシュ

頻度の高いクエリのキャッシュが有効なのはわかりやすいが、頻度の低いクエリの キャッシュも有効な面がある。 予測結果を使ったあとのフィードック(★要確認)はすぐに生じるからである。

キャッシュはrequestとfetchのAPIを持ち、ノンブロッキングである。

LRUポリシーを用いる。

バッチ化

レイテンシに関するSLOを指定すると、それを守る範囲内で、 バッチ化を試みる。これによりスループットの向上を狙う。

バッチ化で狙うのはRPCやメモリコピーの回数削減、 フレームワークのデータ並列の仕組みの活用。

なお、バックオフを設けながら、バッチサイズを変えることで、 最適なバッチサイズを探す。 AIMD=Additive-Increase-Multiplicative-decrease に基づいてサイズ変更。

また、キューに入っているバッチが少ないケースでは、 ある程度貯まるのを待ち、スループットを上げる工夫もする。

モデルコンテナ

C++、Java、Pythonの言語バインディングが提供されている。 各言語バインディングでラッパーを実装すれば良い。

モデルコンテナのレプリカを作り、スケールアウトさせられる。

モデル選択層

アプリケーションのライフサイクル全体に渡り、 フィードバックを考慮しながら、複数のモデルを取り扱う。 これにより、一部のモデルが失敗しても問題ない。 また複数のモデルから得られた結果を統合することでaccuracyを向上させる。

モデルへの選択の基本的なAPIは、select、combine、observe。 selectとcombineはそのままの意味だが、observeは、アプリケーションからの フィードバックを受取り、ステートを更新するために用いるAPIである。

モデル選択については予めgeneralなアルゴリズムが実装されているが、 ユーザが実装することも可能。 ただし、計算量と精度はトレードオフになりがちなので注意。

  • バンディットアルゴリズム(多腕バンディット問題)
    • ClipperではExp3アルゴリズムに対応
    • 単一の結果を利用
  • アンサンブル
    • Clipperでは線形アンサンブルに対応
    • 重み計算にはExp4(バンディットアルゴリズム)を利用

なお、バンディットアルゴリズムについては バンディットアルゴリズムについての説明 などを参照されたし。

信頼度とデフォルト動作

モデルから得られた推測値の信頼度が定められた閾値よりも低い場合、 所定のデフォルト動作をさせられる。 複数のモデルを取り扱う場合、信頼度の指標としてそれらのモデルが 最終的な解を採用するかどうかとする方法が挙げられる。 要は、アンサンブルを利用する、ということである。

落伍者モデルの影響の軽減

アンサンブルの歳、モデル抽象化層でスケールアウトさせられる。 しかし弱点として、モデルの中に落伍者がいると、それに足を引っ張られレイテンシが悪化することである。(計算結果の取得が遅くなる)

そこでClipperでは、モデル選択層で待ち時間(SLO)を設け、ベストエフォートで 結果をアグリゲートすることとした。

ステートの管理

ステート管理には現在の実装ではRedisを用いているようだ。 なお、DockerContainerMangerでは、外部のRedisサービスを利用することもできるし、 開発用に内部でRedisのDockerコンテナを起動させることもできる。

内部的には、コンストラクタ引数にredis_ipを渡すかどうかで管理されているようだ。

clipper_admin/clipper_admin/docker/docker_container_manager.py:77

1
2
if redis_ip is None:
self.external_redis = False

TensorFlow Servingとの比較

TF Servingは、TFと密結合。 Clipper同様にスループット向上のための工夫が施されている。 バッチ化も行う。 ただし、基本的に1個のモデルで推論するようになっており、 アプリケーションからのフィードバックを受取り、 モデル選択するような仕組みは無い。

TF Servingと比べると、多少スループットが劣るものの健闘。

制約

Clipperはモデルをブラックボックスとして扱うので、 モデル内に踏み込んだ最適化は行わない。

類似技術

  • TF Serving
  • LASER
  • Velox

公式ドキュメント

アーキテクチャ

http://clipper.ai/images/clipper-structure.png に公式ドキュメント上の構成イメージが記載されている。 論文と比較して、モデル選択層の部分が「Clipper Query Processor」と表現されているように見える。

またClipperは、コンテナ群をオーケストレーションし、環境を構成する。

またClipperは、実質的に「ファンクション・サーバ」であると考えられる。 実のところ、ファンクションは機械学習モデルでなくてもよい。 (公式ドキュメントのクイックスタートでも、簡単な四則演算をデプロイする例が載っている)

モデル管理

モデルは、リストを入力し、値かJSONを出力する。 入力がリストなのは、機械学習モデルによってはデータ並列で処理し、性能向上を狙うものがあるため。

またClipperはモデル管理のためのライブラリを提供する。 これにより、ある程度決まったモデルであれば、コンテナの作成やモデルの保存などを 自前で作り込む必要がない。 現在は以下の内容に対応する。

  • One to deploy arbitrary Python functions (within some constraints)
  • One to deploy PySpark models along with pre- and post-processing logic
  • One to deploy R models

アプリケーションとモデルの間は、必ずしも1対1でなくてもよい。

参考) * http://clipper.ai/images/link_model.png * http://clipper.ai/images/update_model.png

以上の通り、モデルをデプロイしながら、複数のモデルをアプリケーションにリンクして切り替えることができる。 例えばデプロイした新しいモデルが想定通り動作しなかったとき、もとのモデルに切り戻す、など。

なお、モデルを登録するときには、いくつかの引数を渡す。

  • 入力型
  • レイテンシのSLO(サービスレベルオブジェクト)
  • デフォルトの出力

さらに、 ClipperConnection.set_num_replicas() を用いて、モデルのレプリカ数を決められる。

コンテナマネージャ

Container Managers によると、Dockerコンテナを自前のライブラリか、k8sでオーケストレーションすることが可能。 自前のライブラリは開発向けのようである。

なお、ステートを保存するRedisもコンテナでローンチするようになっているが、 勝手にローンチしてくれるものはステートを永続化するようになっていない。 したがってコンテナが落ちるとステートが消える。 プロダクションのケースでは、きちんと可用性を考慮した構成で予めローンチしておくことが推奨されている。

注意点として、2019/2/27時点でのClipperでは、クエリフロントエンドのオートスケールには対応していない。 これは、クエリフロントエンドとモデル抽象化層のコンテナの間を長命なコネクション(TCPコネクション)で 結んでいるからである。クエリフロントエンドをスケールアウトする時に、これをリバランスする機能が まだ存在していない。

APIドキュメント

クライアントのサンプル

example_client.py

コネクションの生成とエンドポイント定義

1
2
3
4
clipper_conn = ClipperConnection(DockerContainerManager())
clipper_conn.start_clipper()
python_deployer.create_endpoint(clipper_conn, "simple-example", "doubles",
feature_sum)

ここで渡しているファンクションfeature_sumは以下の通り。

1
2
def feature_sum(xs):
return [str(sum(x)) for x in xs]

推論

1
2
3
4
5
6
7
8
9
while True:
if batch_size > 1:
predict(
clipper_conn.get_query_addr(),
[list(np.random.random(200)) for i in range(batch_size)],
batch=True)
else:
predict(clipper_conn.get_query_addr(), np.random.random(200))
time.sleep(0.2)

predict関数の定義は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def predict(addr, x, batch=False):
url = "http://%s/simple-example/predict" % addr

if batch:
req_json = json.dumps({'input_batch': x})
else:
req_json = json.dumps({'input': list(x)})

headers = {'Content-type': 'application/json'}
start = datetime.now()
r = requests.post(url, headers=headers, data=req_json)
end = datetime.now()
latency = (end - start).total_seconds() * 1000.0
print("'%s', %f ms" % (r.text, latency))

与えられた入力をJSONに変換し、REST APIで渡している。

XGBOOSTの例

1
2
(clipper) $ pip install xgboost
(clipper) $ ipython

Clipperを起動。

1
2
3
4
import logging, xgboost as xgb, numpy as np
from clipper_admin import ClipperConnection, DockerContainerManager
cl = ClipperConnection(DockerContainerManager())
cl.start_clipper()

結果

1
2
19-02-27:22:49:09 INFO     [docker_container_manager.py:119] Starting managed Redis instance in Docker
19-02-27:22:49:14 INFO [clipper_admin.py:126] Clipper is running

アプリを登録。

1
cl.register_application('xgboost-test', 'integers', 'default_pred', 100000)

結果

1
19-02-27:22:49:55 INFO     [clipper_admin.py:201] Application xgboost-test was successfully registered

ファンクション定義

1
2
3
4
5
6
7
8
9
10
11
12
def get_test_point():
return [np.random.randint(255) for _ in range(784)]

# Create a training matrix.
dtrain = xgb.DMatrix(get_test_point(), label=[0])

# We then create parameters, watchlist, and specify the number of rounds
# This is code that we use to build our XGBoost Model, and your code may differ.
param = {'max_depth': 2, 'eta': 1, 'silent': 1, 'objective': 'binary:logistic'}
watchlist = [(dtrain, 'train')]
num_round = 2
bst = xgb.train(param, dtrain, num_round, watchlist)

結果

1
2
[0]     train-error:0
[1] train-error:0

推論用の関数定義

1
2
def predict(xs):
return bst.predict(xgb.DMatrix(xs))

推論用のコンテンをビルドし、ローンチ。

1
2
3
4
5
6
from clipper_admin.deployers import python as python_deployer
# We specify which packages to install in the pkgs_to_install arg.
# For example, if we wanted to install xgboost and psycopg2, we would use
# pkgs_to_install = ['xgboost', 'psycopg2']
python_deployer.deploy_python_closure(cl, name='xgboost-model', version=1,
input_type="integers", func=predict, pkgs_to_install=['xgboost'])

なお、ここではコンテナをビルドするときに、xgboostをインストールするように指定している。

結果

1
2
3
4
5
6
7
8
9
19-02-27:22:54:35 INFO     [deployer_utils.py:44] Saving function to /tmp/clipper/tmpincj4sg2
19-02-27:22:54:35 INFO [deployer_utils.py:54] Serialized and supplied predict function
19-02-27:22:54:35 INFO [python.py:192] Python closure saved

(snip)

19-02-27:22:54:53 INFO [docker_container_manager.py:257] Found 0 replicas for xgboost-model:1. Adding 1
19-02-27:22:55:00 INFO [clipper_admin.py:635] Successfully registered model xgboost-model:1
19-02-27:22:55:00 INFO [clipper_admin.py:553] Done deploying model xgboost-model:1.

モデルをアプリにリンク。

1
cl.link_model_to_app('xgboost-test', 'xgboost-model')

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import requests, json
# Get Address
addr = cl.get_query_addr()
# Post Query
response = requests.post(
"http://%s/%s/predict" % (addr, 'xgboost-test'),
headers={"Content-type": "application/json"},
data=json.dumps({
'input': get_test_point()
}))
result = response.json()
if response.status_code == requests.codes.ok and result["default"]:
print('A default prediction was returned.')
elif response.status_code != requests.codes.ok:
print(result)
raise BenchmarkException(response.text)
else:
print('Prediction Returned:', result)

結果

1
Prediction Returned: {'query_id': 2, 'output': 0.3266071, 'default': False}

実装確認

開発言語

1
C++ 56.0%	 Python 24.1%	 CMake 8.1%	 Scala 3.0%	 Shell 2.5%	 Java 2.2%	 Other 4.1%

コンテナの起動の流れを確認してみる

ClipperConnection#start_clipperメソッドを確認する。

ContainerManager#start_clipperメソッドが中で呼ばれる。

clipper_admin/clipper_admin/clipper_admin.py:123

1
2
3
self.cm.start_clipper(query_frontend_image, mgmt_frontend_image,
frontend_exporter_image, cache_size,
num_frontend_replicas)

clipper_admin/clipper_admin/container_manager.py:63

1
2
3
4
5
class ContainerManager(object):
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
def start_clipper(self, query_frontend_image, mgmt_frontend_image,

ContainerManagerは親クラスであり、KubernetesContainerManagerやDockerContainerManagerのstart_clipperが実行される。 例えばDockerContainerManagerを見てみる。

Docker SDKを使い、docker networkを作る。

clipper_admin/clipper_admin/docker/docker_container_manager.py:128

1
2
self.docker_client.networks.create(
self.docker_network, check_duplicate=True)

Redisを起動する。

clipper_admin/clipper_admin/docker/docker_container_manager.py:156

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if not self.external_redis:
self.logger.info("Starting managed Redis instance in Docker")
self.redis_port = find_unbound_port(self.redis_port)
redis_labels = self.common_labels.copy()
redis_labels[CLIPPER_DOCKER_PORT_LABELS['redis']] = str(
self.redis_port)
redis_container = self.docker_client.containers.run(
'redis:alpine',
"redis-server --port %s" % CLIPPER_INTERNAL_REDIS_PORT,
name="redis-{}".format(random.randint(
0, 100000)), # generate a random name
ports={
'%s/tcp' % CLIPPER_INTERNAL_REDIS_PORT: self.redis_port
},
labels=redis_labels,
**self.extra_container_kwargs)
self.redis_ip = redis_container.name

マネジメントフロントエンドを起動。

clipper_admin/clipper_admin/docker/docker_container_manager.py:168

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
mgmt_cmd = "--redis_ip={redis_ip} --redis_port={redis_port}".format(
redis_ip=self.redis_ip, redis_port=CLIPPER_INTERNAL_REDIS_PORT)
self.clipper_management_port = find_unbound_port(
self.clipper_management_port)
mgmt_labels = self.common_labels.copy()
mgmt_labels[CLIPPER_MGMT_FRONTEND_CONTAINER_LABEL] = ""
mgmt_labels[CLIPPER_DOCKER_PORT_LABELS['management']] = str(
self.clipper_management_port)
self.docker_client.containers.run(
mgmt_frontend_image,
mgmt_cmd,
name="mgmt_frontend-{}".format(random.randint(
0, 100000)), # generate a random name
ports={
'%s/tcp' % CLIPPER_INTERNAL_MANAGEMENT_PORT:
self.clipper_management_port
},
labels=mgmt_labels,
**self.extra_container_kwargs)

なお、コンテナ起動のコマンドに、上記で起動された(もしくは外部のサービスとして与えられた)Redisの IPアドレス、ポート等の情報が渡されていることがわかる。

クエリフロントエンドの起動。

1
2
3
4
5
6
7
8
9
10
11
self.docker_client.containers.run(
query_frontend_image,
query_cmd,
name=query_name,
ports={
'%s/tcp' % CLIPPER_INTERNAL_QUERY_PORT:
self.clipper_query_port,
'%s/tcp' % CLIPPER_INTERNAL_RPC_PORT: self.clipper_rpc_port
},
labels=query_labels,
**self.extra_container_kwargs)

その他、メトリクスのコンテナを起動する。

1
2
3
run_metric_image(self.docker_client, metric_labels,
self.prometheus_port, self.prom_config_path,
self.extra_container_kwargs)

コンテナを起動したあとは、ポートの情報を更新する。

create_endpointメソッドを確認

公式の例でも、create_endpointメソッドがよく用いられるので確認する。

メソッド引数は以下の通り。

clipper_admin/clipper_admin/deployers/python.py:16

1
2
3
4
5
6
7
8
9
10
11
12
13
def create_endpoint(clipper_conn,
name,
input_type,
func,
default_output="None",
version=1,
slo_micros=3000000,
labels=None,
registry=None,
base_image="default",
num_replicas=1,
batch_size=-1,
pkgs_to_install=None):

基本的には、ClipperConnectionインスタンス、関数名、入力データのタイプ、関数を指定しながら利用する。

メソッド内部の処理は以下のとおり。

clipper_admin/clipper_admin/deployers/python.py:87

1
2
3
4
5
6
7
clipper_conn.register_application(name, input_type, default_output,
slo_micros)
deploy_python_closure(clipper_conn, name, version, input_type, func,
base_image, labels, registry, num_replicas,
batch_size, pkgs_to_install)

clipper_conn.link_model_to_app(name, name)

最初にregister_applicationメソッドで、アプリケーションを Clipperに登録する。 なお、register_applicatoin メソッドは、Clipperの マネジメントフロントエンドに対してリクエストを送る。

マネジメントフロントエンドのURLの指定は以下の通り。

clipper_admin/clipper_admin/clipper_admin.py:201

1
2
url = "http://{host}/admin/add_app".format(
host=self.cm.get_admin_addr())

マネジメントフロントエンド自体は、 \src\management\src\management_frontend.hpp が自体と思われる。 以下の通り、エンドポイント add_app が定義されている。

src/management/src/management_frontend.hpp:52

1
const std::string ADD_APPLICATION = ADMIN_PATH + "/add_app$";

src/management/src/management_frontend.hpp:181

1
2
3
4
5
6
7
server_.add_endpoint(
ADD_APPLICATION, "POST",
[this](std::shared_ptr<HttpServer::Response> response,
std::shared_ptr<HttpServer::Request> request) {
try {
clipper::log_info(LOGGING_TAG_MANAGEMENT_FRONTEND,
"Add application POST request");

create_endopintの流れ確認に戻る。

アプリケーションの登録が完了したあとは、deploy_python_closure メソッドを使って、

引数は以下の通り。

clipper_admin/clipper_admin/deployers/python.py:96

1
2
3
4
5
6
7
8
9
10
11
def deploy_python_closure(clipper_conn,
name,
version,
input_type,
func,
base_image="default",
labels=None,
registry=None,
num_replicas=1,
batch_size=-1,
pkgs_to_install=None):

このメソッドでは、最初に save_python_function を使って 関数をシリアライズする。

clipper_admin/clipper_admin/deployers/python.py:189

1
serialization_dir = save_python_function(name, func)

つづいて、Pyhonのバージョンに従いながらベースとなるイメージを選択する。 以下にPython3.6の場合を載せる。

clipper_admin/clipper_admin/deployers/python.py:205

1
2
3
4
elif py_minor_version == (3, 6):
logger.info("Using Python 3.6 base image")
base_image = "{}/python36-closure-container:{}".format(
__registry__, __version__)

最後にClipperConnection#build_and_deploy_modelメソッドを使って 関数を含むコンテナイメージを作成する。

clipper_admin/clipper_admin/deployers/python.py:220

1
2
3
clipper_conn.build_and_deploy_model(
name, version, input_type, serialization_dir, base_image, labels,
registry, num_replicas, batch_size, pkgs_to_install)

build_and_deploy_model メソッドは以下の通り。

clipper_admin/clipper_admin/clipper_admin.py:352

1
2
3
4
5
6
if not self.connected:
raise UnconnectedException()
image = self.build_model(name, version, model_data_path, base_image,
container_registry, pkgs_to_install)
self.deploy_model(name, version, input_type, image, labels,
num_replicas, batch_size)

build_model メソッドでDockerイメージをビルドし、 レポジトリに登録する。

deploy_model メソッドで登録されたDockerイメージからコンテナを起動する。 このとき指定された個数だけコンテナを起動する。 なお、起動時には、抽象クラス ContainerManagerdeploy_model メソッドが呼ばれる。 実際には具象クラスであるKubernetesContainerManagerやDockerContainerManagerクラスのメソッドが実行される。 ここに抽象化が施されていると考えられる。

動作確認

クイックスタート

公式ウェブサイト のクイックスタートの通り、実行してみる。 上記サイトでAnaconda上で環境構成するのを推奨する記述があったためそれに従う。 また、Pythonバージョンに指定があったので、指定された中で最も新しい3.6にした。

1
2
3
4
5
$ sudo yum install gcc
$ conda create -n clipper python=3.6 python
$ conda activate clipper
$ conda install ipython
$ pip install clipper_admin

ipythonを起動する。

1
(clipper) $ ipython

Docker環境を立てる。

1
2
3
from clipper_admin import ClipperConnection, DockerContainerManager
clipper_conn = ClipperConnection(DockerContainerManager())
clipper_conn.start_clipper()

結果の例

1
2
19-02-26:22:24:42 INFO     [docker_container_manager.py:119] Starting managed Redis instance in Docker
19-02-26:22:26:50 INFO [clipper_admin.py:126] Clipper is running

簡単な例を登録。これにより、エンドポイントが有効になるようだ。 実際の処理の登録は後ほど。

1
clipper_conn.register_application(name="hello-world", input_type="doubles", default_output="-1.0", slo_micros=100000)

登録処理は以下の通り。

1
2
def feature_sum(xs):
return [str(sum(x)) for x in xs]

デプロイ。

1
2
from clipper_admin.deployers import python as python_deployer
python_deployer.deploy_python_closure(clipper_conn, name="sum-model", version=1, input_type="doubles", func=feature_sum)

ここから、モデル抽象化層のDockerイメージが作られる。

1
2
3
4
5
6
7
8
9
10
11
19-02-26:22:30:59 INFO     [deployer_utils.py:44] Saving function to /tmp/clipper/tmp67eliqhx
19-02-26:22:30:59 INFO [deployer_utils.py:54] Serialized and supplied predict function
19-02-26:22:30:59 INFO [python.py:192] Python closure saved
19-02-26:22:30:59 INFO [python.py:206] Using Python 3.6 base image

(snip)

19-02-26:22:31:26 INFO [docker_container_manager.py:257] Found 0 replicas for sum-model:1. Adding 1
19-02-26:22:31:33 INFO [clipper_admin.py:635] Successfully registered model sum-model:1
19-02-26:22:31:33 INFO [clipper_admin.py:553] Done deploying model sum-model:1.
19-02-26:22:30:59 INFO [clipper_admin.py:452] Building model Docker image with model data from /tmp/clipper/tmp67eliqhx

モデルをアプリケーションにリンクさせる。

1
clipper_conn.link_model_to_app(app_name="hello-world", model_name="sum-model")

以上で、エンドポイントhttp://localhost:1337/hello-world/predictを用いて、 推論結果(計算結果)を受け取れるようになる。

curlで結果の取得

1
$ curl -X POST --header "Content-Type:application/json" -d '{"input": [1.1, 2.2, 3.3]}' 127.0.0.1:1337/hello-world/predict

結果の例

1
{"query_id":0,"output":6.6,"default":false}

Pythonから取得するパターン。

1
2
3
import requests, json, numpy as np
headers = {"Content-type": "application/json"}
requests.post("http://localhost:1337/hello-world/predict", headers=headers, data=json.dumps({"input": list(np.random.random(10))})).json()

結果の例

1
Out[12]: {'query_id': 1, 'output': 4.710181343957851, 'default': False}

参考までに、この時点で実行されているDockerコンテナは以下の通り。

1
2
3
4
5
6
7
8
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b041228b2a4d sum-model:1 "/container/containe…" 25 minutes ago Up 25 minutes (healthy) sum-model_1-37382
6f93447357bd prom/prometheus:v2.1.0 "/bin/prometheus --c…" 30 minutes ago Up 30 minutes 0.0.0.0:9090->9090/tcp metric_frontend-37801
b3034a7352b8 clipper/frontend-exporter:0.3.0 "python /usr/src/app…" 30 minutes ago Up 30 minutes query_frontend_exporter-33443
6b775a49e1ff clipper/query_frontend:0.3.0 "/clipper/release/sr…" 30 minutes ago Up 30 minutes 0.0.0.0:1337->1337/tcp, 0.0.0.0:7000->7000/tcp query_frontend-33443
2c75406fda84 clipper/management_frontend:0.3.0 "/clipper/release/sr…" 30 minutes ago Up 30 minutes 0.0.0.0:1338->1338/tcp mgmt_frontend-39690
ff14d91f313e redis:alpine "docker-entrypoint.s…" 32 minutes ago Up 32 minutes 0.0.0.0:6379->6379/tcp redis-28775

最後にコンテナを停止しておく。

1
clipper_conn.stop_all()

画像取扱のサンプル

image_query の通りに実行してみる。 また、 上記のノートブックが image_query/example.ipynb にある。

Clipperでは、REST APIでクエリが渡される。データはJSONにラップされて渡される。 そのため、ノートブック image_query/example.ipynb を見ると、 画像ファイルがバイト列で渡される場合と、BASE64でエンコードされて渡される場合の2種類の例が載っていた。

この例では、画像のサイズを返すような関数を定義して使っているが、 渡された画像に何らかの判定処理を加えて戻り値を返すような関数を定義すればよいだろう。

また、例では、python_deployer#create_endpointメソッドが用いられている。

共有

Tellus

参考

  • https://www.tellusxdp.com/ja/

メモ

1
2
Tellusとは
Tellus(テルース)」は、政府衛星データを利用した新たなビジネスマーケットプレイスを創出することを目的とした、日本初のオープン&フリーな衛星データプラットフォームです。複数のデータをかけ合わせ、新たなビジネス創出を促進するためのあらゆるファンクションを提供します。
共有

PostgreSQLにおけるfsyncに関するバグフィックス

参考

メモ

基本的に 澤田さんによる説明 がわかりやすいのでそれを参照されたし。 より原典を当たる意では、FOSDEM'19の講演 PostgreSQL vs. fsync How is it possible that PostgreSQL used fsync incorrectly for 20 years, and what we'll do about it. を参照されたし。

fsyncに期待することの認識誤りのため発生していたバグとのこと。

共有

デブサミでの澤田さん講演メモ

カンファレンスメモ

日頃の過ごし方

  • NTTOSSセンタにて、業務としてOSS活動
    • 問題解決を水から実施することで、プロダクトの改善と組織の技術力向上の両方に貢献
  • PostgreSQLについて説明
  • PostgreSQLへの貢献活動は、問題を見つけて解決する流れもあれば、研究開発の一環として、 大きめの機能を追加していくこともある。
    • 透過的な暗号化を入れる、など

やりがいについて

  • 一番は、自分の書いたコードが本体にマージされると嬉しい、というのが 単純だけど大事
  • 他にも、社外での登壇。製品の開発どうこうに詳しくなる。英語が少し上手になった。も。
  • 注意していること
    • 英語は辛い。評価してもらえる仕組みが必要。成果がでる前に時間がかかることがある。会社上司の理解が必要。
  • PostgreSQLは問題管理(?)、開発管理もメールベースで実施する。(実はGit等を使っていない)
  • 秘蔵のパッチ、眠っていませんか?
  • 人生最初のパッチは、pgbenchに関する困りごとの解消
    • ベンチマークで流すSQLが最大1024文字だったのを改善。
  • PostgreSQLの大きい機能を入れて、それのメンテナとしてコミッターになることが多い。
    • 早い人手も3年単位。10年くらい活動してからなった人もいる。
  • 貢献の方法を公開しているので気になる人はぜひ見てください。
共有

Ansibleでコマンド実行結果のJSONを辞書型に変換して用いる

参考

メモ

Ansible で task の実行結果の json を dict オブジェクトとして後続の処理で利用する に記載の内容で問題ない。 自分の場合は、AnsibleでWindows PowerShellの実行結果を受け取るときに、オブジェクトをJSONに変換し、 それを辞書型に変換した上で後々when構文で使いたかった。

例)

1
2
3
4
5
6
7
8
9
10
- name: check_state_of_wsl
win_shell: Get-WindowsOptionalFeature -Online | ? FeatureName -Match "Microsoft-Windows-Subsystem-Linux" | ConvertTo-Json
register: wsl_check_json

- set_fact:
wsl_check: "{{ wsl_check_json.stdout }}"

- name: enable_wsl
win_shell: Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Windows-Subsystem-Linux -NoRestart
when: wsl_check.State != 2
共有

Ansible for Windows

参考

メモ

WindowsをAnsibleで管理する方法を説明するブログで基本的に問題ない。

最初の環境のはじめ方

Ansibleのインストール、実行環境

Windows10 に ゼロから Ansible をインストールする(Ansible for Windows) ではDocker for Windowsで環境構築をしているが、 WSLを使う方法はどうだろうか?

パッケージ管理

chocolateyを利用すればよいだろう。

注意点

Ansible環境

自身の環境では、UbuntuのレポジトリからインストールしたAnsibleを使用していたが、 pywinrmを有効にする手間がかかりそうだったので、condaで環境を作った。

1
2
3
$ conda create -n ansible python
$ conda activate ansible
$ pip install ansible pywinrm

とした。

Windows環境での実行ユーザ

MSアカウントに対しての実行が手間取りそうだったので、 ここではAdministratorで実験した。

Ansibleのインベントリは以下のようなものを用意した。

1
2
3
4
5
6
7
8
9
[win]
<対象となるWin環境>

[win:vars]
ansible_ssh_user=Administrator
ansible_ssh_port=5986
ansible_connection=winrm
ansible_winrm_transport=ntlm
ansible_winrm_server_cert_validation=ignore

Pingコマンドは以下の通り。 (パスワードを聞かれるので予め設定したAdministratorのパスワードを入れる)

1
$ ansible -i hosts.win -m win_ping win -k

パッケージインストールの動作確認は以下の通り。

1
2
$ ansible -i hosts.win -m win_chocolatey -a "name=googlechrome state=present" win -k
$ ansible -i hosts.win -m win_chocolatey -a "name=vim state=present" win -k

シェルの実行

AnsibleでWindowsのシェル実行 によると、win_shellwin_commandscriptモジュールを使うと良いようだ。

WSL導入

WSL導入手順の説明のブログ の通り、PowerShellをAnsibleのwin_shellモジュール使ってインストールする。

共有

Machine Learning Model Management Tools

参考

Strata

studio.ml

ModelDB

MLflow

Clipper

Azure Machine Learning Studio

Hydrosphere Serving

Hydrosphere.io のページを参照。

studio.ml

Pythonベースの機械学習モデル管理のフレームワーク。 対応しているのは、Keras、TensorFlow、scikit-learnあたり。

注意事項

studio.mlの公式GitHubのREADMEを読むと「Authentication」の項目があり、 studio.mlに何らかの手段で認証しないといけないようである。 ゲストモードがあるようだが極力アカウント情報を渡さずに使おうとしたが うまく動作しなかった。

インストールと動作確認

1
2
3
$ /opt/Anaconda/default/bin/conda create -n studioml python
$ conda activate stduioml
$ pip install studioml

以下のようなエラーが生じた。

1
awscli 1.16.101 has requirement rsa<=3.5.0,>=3.1.2, but you'll have rsa 4.0 which is incompatible.
1
2
3
4
$ cd ~/Sources
$ git clone https://github.com/studioml/studio.git
$ cd studio
$ cd examples/keras/

ここでGitHub上のREADMEに記載のとおり、~/.studioml/config.yamlのdatabaseセクションに guest: trueを追加した。

いったんUIを起動してみる。

1
$ studio ui

以下のようなエラーが出た。

1
2
3
  File "/home/******/.conda/envs/studioml/lib/python3.7/site-packages/flask/app.py", line 1813, in full_dispatch_request    rv = self.dispatch_request()  File "/home/******/.conda/envs/studioml/lib/python3.7/site-packages/flask/app.py", line 1799, in dispatch_request    return self.view_functions[rule.endpoint](**req.view_args)  File "/home/******/.conda/envs/studioml/lib/python3.7/site-packages/studio/apiserver.py", line 37, in dashboard    return _render('dashboard.html')  File "/home/******/.conda/envs/studioml/lib/python3.7/site-packages/studio/apiserver.py", line 518, in _render    auth = get_auth(get_auth_config())  File "/home/******/.conda/envs/studioml/lib/python3.7/site-packages/studio/apiserver.py", line 511, in get_auth_config    return get_config()['server']['authentication']

KeyError: 'server'

参考: studio.ml Issue-330

設定を以下のように変えてみた。

1
2
3
4
5
6
7
8
9
10
11
12
--- /home/dobachi/.studioml/config.yaml.2019021001      2019-02-10 01:04:10.751282800 +0900
+++ /home/dobachi/.studioml/config.yaml 2019-02-10 01:17:20.178761700 +0900
@@ -2,6 +2,10 @@
type: http serverUrl: https://zoo.studio.ml
authentication: github
+ guest: true
+
+server:
+ authentication: None

storage:
type: gcloud

ブラウザでhttp://localhost:5000にアクセスした所、開けたような気がしたが、 GitHubの404ページが表示された。

コンソール上のエラーは以下のとおり。

1
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='zoo.studio.ml', port=443): Max retries exceeded with url: /api/get_user_experiments (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7ff5a64a3da0>: Failed to establisha new connection: [Errno -2] Name or service not known'))

ModelDB

注意点

ModelDBの公式GitHubを見ると、20190211時点で最近更新されていないようだ。(最新が6ヶ月前の更新)

概要

モデルの情報を構造的に管理するための仕組み。 「ModelDB's Light API」を通じて、モデルのメトリクスとメタデータを管理できるようになる。 spark.mlとscikit-learnはネイティブクライアントを利用する。 ★要確認

ModelDBの公式GitHubのREADMEにわかりやすいフロントエンドの紹介がある。

Light APIの例

ModelDB BasicWorkflow.pyがワークフローの基本が分かる例。 基本は、 Syncerを使ってモデルの情報を登録する。 この例では、

  • データセット
  • モデル
  • モデルのコンフィグ
  • モデルのメトリクス

をModelDBに登録する。

ModelDB BasicSyncAll.pyがYAMLで登録する例。

Native API利用の例

ModelDB Clientの例modeldb.sklearn_native.SyncableMetricsの利用例が記載されている。

ウェブUIの例

ModelDB ウェブUIの例に例が載っている。

MLflow

概要

MLflowのO'Reilly記事に動機と機能概要が書かれている。 また動画でウェブUIの使い方が示されている。

動機

  • 無数のツールを組み合わせるのが難しい
  • 実験をトレースするのが難しい
  • 結果の再現性担保が難しい
  • デプロイが面倒

機能

  • トラッキング
    • パラメータ、コードバージョン、メトリクス、アウトプットファイルなどを管理する
  • プロジェクト
    • コードをパッケージングし、結果の再現性を担保する。またユーザ間でプロジェクトを流通できる
    • conda形式で依存関係を記述可能であり、実行コマンドも定義可能
    • もしMLflowの機能でトラッキングしていれば、バージョンやパラメータをトラックする
  • モデル
    • モデルをパッケージングし、デプロイできるようにする

Clipper

Strata NY 2018でのClipper が講演のようだが、スライドが公開されていない。

Clipper: A Low-Latency Online Prediction Serving System

Clipperの論文によると、「A Low-Latency Online Prediction Serving System」と定義されている。 上記論文の図1を見るかぎり、モデルサービングを担い、複数の機械学習モデルをまとめ上げる「中層化」層の 役割も担うようだ。 また、モデル選択の機能、キャッシングなどの機能も含まれているようである。

これまでの研究では、学習フェーズに焦点が当たることが多かったが、 この研究では推論フェーズに焦点を当てている。

アーキ概要

  • モデル抽象化層
  • モデル選択層

論文Figure 1参照。

Azure Machine Learning Studio

Azure Machine Learning Studioには、「完全に管理されたクラウド サービスで、ユーザーは簡単に予測分析ソリューションを構築、デプロイ、共有できます。」と 記載されている。

Hydrosphere Serving

Hydrosphere.io のページを参照。

共有