Kafkaコンソールプロデューサを起点とした確認

参考

メモ

2019/6/23時点でのtrunkで再確認した内容。 ほぼ昔のメモのままコピペ・・・。

ConsoleProducer

エントリポイントとして、Kafkaのコンソールプロデューサを選ぶ。

bin/kafka-console-producer.sh:20

1
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

コンソールプロデューサの実態は、KafkaProducerである。

kafka/tools/ConsoleProducer.scala:45

1
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))

なお、 ConsoleProducerにはsendメソッドが定義されており、 以下のように標準入力からデータを読み出してはメッセージを送信する、を繰り返す。

kafka/tools/ConsoleProducer.scala:54

1
2
3
4
5
do {
record = reader.readMessage()
if (record != null)
send(producer, record, config.sync)
} while (record != null)

kafka/tools/ConsoleProducer.scala:70

1
2
3
4
5
6
7
private def send(producer: KafkaProducer[Array[Byte], Array[Byte]],
record: ProducerRecord[Array[Byte], Array[Byte]], sync: Boolean): Unit = {
if (sync)
producer.send(record).get()
else
producer.send(record, new ErrorLoggingCallback(record.topic, record.key, record.value, false))
}

KafkaProducer

それでは実態であるKafkaProducerを確認する。

コンストラクタを確認する

まずクライアントのIDが定義される。

org/apache/kafka/clients/producer/KafkaProducer.java:332

1
2
3
4
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;

続いてトランザクションIDが定義される。

org/apache/kafka/clients/producer/KafkaProducer.java:337

1
2
String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;

Kafkaのトランザクションについては、 Confluentのトランザクションに関する説明 を参照。

その後、ログ設定、メトリクス設定、キー・バリューのシリアライザ設定。 その後、各種設定値を定義する。

特徴的なところとしては、メッセージを送信するときにメッセージをいったんまとめこむ accumulator も このときに定義される。

org/apache/kafka/clients/producer/KafkaProducer.java:396

1
2
3
4
5
6
7
8
9
10
11
12
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

他には Sendor も挙げられる。 例えば、上記で示したaccumulatorも、Sendorのコンストラクタに渡され、内部で利用される。

org/apache/kafka/clients/producer/KafkaProducer.java:422

1
this.sender = newSender(logContext, kafkaClient, this.metadata);

org/apache/kafka/clients/producer/KafkaProducer.java:463

1
2
3
4
5
6
7
8
9
10
11
12
13
14
return new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);

その他にはトランザクション・マネージャなども。

共有

GradleプロジェクトをIntellijでインポート

参考

メモ

昔は gradlew idea とやってIdea用のプロジェクトファイルを生成してから インポートしていたけれども、最近のIntellijは「ディレクトリを指定」してインポートすると、 Gradleプロジェクトを検知して、ウィザードを開いてくれる。

例えばApache KafkaなどのGradleプロジェクトをインポートするときには、 メニューからインポートを選択し、ディレクトリをインポートすると良い。

共有

Kafkaコンソールコンシューマを起点とした確認

参考

メモ

2019/6/23時点でのtrunkで再確認した内容。 ほぼ昔のメモのままコピペ・・・。

ConsoleConsumer

コンシューマの実装を確認するにあたっては、コンソール・コンシューマが良いのではないか。

bin/kafka-console-consumer.sh:21

1
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

から、 kafka.tools.ConsoleConsumer クラスあたりを確認すると良いだろう。

mainを確認すると、ConsumerConfigのインスタンスを生成し、runメソッドに渡すことがわかる。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:54

1
2
3
val conf = new ConsumerConfig(args)
try {
run(conf)

ここで、ConsumerConfigはコンソールからパラメータを受け取るためのクラス。 実態はrunメソッドである。

まず大事な箇所として、 KafkaConsumer クラスのインスタンスを生成している箇所がある。 ★

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:67

1
val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)

この KafkaConsumer クラスがコンシューマの実態である。 なお、コンソール・コンシューマでは、これをラップした便利クラス ConsumerWrapper が定義されており、 そちらを通じて主に使う。

またシャットダウンフックを定義している。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:75

1
addShutdownHook(consumerWrapper, conf)

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:87

1
2
3
4
5
6
7
8
9
10
11
12
13
def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig) {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
consumer.wakeup()

shutdownLatch.await()

if (conf.enableSystestEventsLogging) {
System.out.println("shutdown_complete")
}
}
})
}

これはグレースフルにシャットダウンするには大切。 ★

またデータ処理の実態は、以下の process メソッドである。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:77

1
try process(conf.maxMessages, conf.formatter, consumerWrapper, System.out, conf.skipMessageOnError)

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:101

1
2
3
4
def process(maxMessages: Integer, formatter: MessageFormatter, consumer: ConsumerWrapper, output: PrintStream,
skipMessageOnError: Boolean) {

(snip)

処理するメッセージ最大数、出力ストリームに渡す形を整形するメッセージフォーマッタ、コンシューマのラッパ、出力ストリーム、 エラーのときに呼び飛ばすかどうかのフラグが渡される。

少し処理の中身を確認する。 まず以下のようにメッセージを取得する。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:104

1
2
3
4
    val msg: ConsumerRecord[Array[Byte], Array[Byte]] = try {
consumer.receive()

(snip)

上記の receive は、先程の通り、ラッパーのreceiveメソッドである。

receiveメソッドは以下のようにコンシューマのポール機能を使い、 メッセージを取得する。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:437

1
2
3
4
5
6
7
8
9
def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (!recordIter.hasNext) {
recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
if (!recordIter.hasNext)
throw new TimeoutException()
}

recordIter.next
}

メッセージを取得するためのイテレータが得られるので、nextメソッドで取得する。 →イテレータのnextを使って1件ずつ取り出していることがわかる。 ★

続いて、取り出されたメッセージを ConsumerRecord 化してフォーマッタに渡す。   core/src/main/scala/kafka/tools/ConsoleConsumer.scala:118

1
2
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers), output)

これにより出力が確定する。

KafkaConsumer

つづいて、KafkaConsumerクラスを確認する。

トピックアサイン

ここではトピックへの登録を確認する。 ★

コンシューマグループを使って自動で行うアサインメントを使用する場合。リバランスも行われる。

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:936

1
2
3
  public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {

(snip)

上記はトピックのコレクションを渡すメソッドだが、パターンを指定する方法もある。

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:1008

1
2
3
  public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {

(snip)

なお、何らかの理由(詳しくはJavadoc参照)により、リバランスが必要になったら、 第2引数に渡されているリスナがまず呼び出される。

コンシューマ・グループを利用せず、手動で行うアサインメントの場合は以下の通り。

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:1083

1
public void assign(Collection<TopicPartition> partitions) {

いずれの場合も内部的には SubscriptionState が管理のために用いられる。 タイプは以下の通り。 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:72

1
2
3
private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
}

またコンシューマの管理を行う ConsumerCoordinator には、 updatePatternSubscription メソッドがある。

org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:192

1
2
3
4
5
6
7
public void updatePatternSubscription(Cluster cluster) {
final Set<String> topicsToSubscribe = cluster.topics().stream()
.filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe))
metadata.requestUpdateForNewTopics();
}

matchesSubscribedPattern を用いて、現在クラスタが抱えているトピックの中から、 サブスクライブ対象のトピックをフィルタして取得し、SubscriptionState#subscribeFromPattern メソッドを呼ぶ。 これにより、当該コンシューマの購読するトピックが更新される。 この更新はいくつかのタイミングで発動するが、例えば KafkaConsumer#poll(java.time.Duration) の 中で呼ばれる updateAssignmentMetadataIfNeeded メソッドを通じて呼び出される。

コンシューマ故障の検知

基本的にはハートビート( heartbeat.interval.ms で設定)が session.timeout.ms を 超えて届かなくなると、故障したとみなされる。 その結果、当該クライアント(この場合はコンシューマ)がグループから外され、 リバランスが起きる。

なお、ハートビートはコーディネータに対して送られる。 コーディネータは以下のように定義されている。 org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:128

1
private Node coordinator = null;

org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:692

1
2
3
4
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.data().host(),
findCoordinatorResponse.data().port());

AbstractCoordinator#sendFindCoordinatorRequest メソッドが呼ばれる契機は複数あるが、 例えば、コンシューマがポールするときなどにコーディネータが更新される。

なお、コーディネータにクライアントを登録する際、 セッションタイムアウトの値も渡され、対応される。 予め定められた group.min.session.timeout.ms や group.max.session.timeout.ms を満たす セッションタイムアウトが用いられる。

セッションタイムアウトの値は、例えば以下のように設定される。

kafka/coordinator/group/GroupCoordinator.scala:146

1
doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)

この値は、最終的に MemberMetadata に渡されて用いられる。 例えばハートビートのデッドラインの計算などに用いられることになる。

kafka/coordinator/group/MemberMetadata.scala:56

1
2
3
4
5
6
7
8
9
private[group] class MemberMetadata(var memberId: String,
val groupId: String,
val groupInstanceId: Option[String],
val clientId: String,
val clientHost: String,
val rebalanceTimeoutMs: Int,
val sessionTimeoutMs: Int,
val protocolType: String,
var supportedProtocols: List[(String, Array[Byte])]) {

共有

The 8 Requirements of Real-Time Stream Processing

参照

メモ

MillWheel でも触れられているストリーム処理について書かれた論文。 2005年。 時代は古いが当時の考察は現在でも有用と考える。

1. Introduction

ウォール街のデータ処理量は大きい。 2005年時点で122,000msg/sec。年率2倍で増大。 しかも処理レイテンシが秒では遅い。 金融に限らず異常検知、センサーデータ活用などのユースケースでは同様の課題感だろう。

「メインメモリDBMS」、「ルールベースエンジン」などがこの手のユースケースのために再注目されたが、 その後いわゆる「ストリーム処理エンジン」も登場。

2. Eight rules for stream processing

Rule 1: Keep the Data Moving

ストレージに書き込むなどしてレイテンシを悪化させてはならない。

ポーリングもレイテンシを悪化させる。 そこでイベント駆動のアプローチを用いることがよく考えられる。

Rule 2: Query using SQL on Streams (StreamSQL)

ハイレベルAPIを用いることで、開発サイクルを加速し、保守性を上げる。

StreamSQL:ストリーム処理固有の機能を有するSQL リッチなウィンドウ関数、マージなど。

Rule 3: Handle Stream Imperfections (Delayed, Missing and Out-of-Order Data)

例えば複数のストリームからなるデータを処理する際、 あるストリームからのデータが到着しないとき、タイムアウトして結果を出す必要がある。

同じようにout-of-orderデータについてもウィンドウに応じてタイムアウトしないといけない。

Rule 4: Generate Predictable Outcomes

結果がdeterministicであり、再現性があること。

計算そのものだけではなく、対故障性の観点でも重要。

Rule 5: Integrate Stored and Streaming Data

過去データに基づき、何らかのパターンなどを見つける処理はよくある。 そのためステート管理の仕組み(データストア)は重要。

例えば金融のトレーディングで、あるパターンを見つける、など。 他にも異常検知も。

過去データを使いながら、実時間のデータにキャッチアップするケースもあるだろう。

このようにヒストリカルデータとライブデータをシームレスにつなぐ機能は重要。

Rule 6: Guarantee Data Safety and Availability

HAは悩ましい課題。 故障が発生してフェールオーバするとして、バックアップハードウェアを 立ち上げて処理可能な状態にするような待ち時間は許容できない。

そこでタンデム構成を取ることは現実的にありえる。

Rule 7: Partition and Scale Applications Automatically

ローレベルの実装を経ずに、分散処理できること。 またマルチスレッド処理可能なこと。

Rule 8: Process and Respond Instantaneously

数万メッセージ/secの量を処理する。 さらにレイテンシはマイクロ秒オーダから、ミリ秒オーダ。

そのためには極力コンポーネント間をまたぐ処理をしないこと。

3. SOFTWARE TECHNOLOGIES for STREAM PROCESSING

基本的なアーキテクチャは、DBMS、ルールベースエンジン、ストリーム処理エンジン。

DBMSはデータをストアしてから処理する。データを動かしつづけるわけではない。

またSPEはSQLスタイルでのストリームデータの処理を可能とする。

SPEとルールベースエンジンは、ブロックおよびタイムアウトを実現しやすい。 DBMSはそのあたりはアプリケーションに委ねられる。 仮にトリガを用いたとしても…。

いずれのアーキテクチャにおいても、予測可能な結果を得るためには、 deterministicな処理を実現しないといけない(例えばタイムスタンプの並びを利用、など) SPEとルールベースエンジンはそれを実現しやすいが、DBMSのACID特性は あくまでトラディショナルなデータベースのためのもであり、ストリームデータ処理のための ものではない。

状態を保存するデータストアとしてDBMSを見たとき、サーバ・クライアントモデルのDBMSは 主にレイテンシの面で不十分である。 唯一受け入れられるとしたら、アプリケーション内に埋め込めるDBMSの場合である。 一方ルールベースエンジンはストリームデータを扱うのは得意だが、 大規模な状態管理と状態に対する柔軟なクエリが苦手である。

3.3 Tabular results

このあたりにまとめ表が載っている。

共有

MillWheel Paper

参考

メモ

昔のメモをここに転記。

1. Introduction

故障耐性、ステート永続化、スケーラビリティが重要。 Googleでは多数の分散処理システムを運用しているので、何かしらおかしなことが置き続けている。

MillWheelはMapReduceと同様にフレームワーク。ただし、ストリーム処理やローレイテンシでの処理のため。

Spark StreamingとSonoraは、チェックポイント機能持つが、実装のためのオペレータが限られている。 S4は故障耐性が不十分。 Stormはexactly onceで動作するが、Tridentは厳密な順序保証が必要。

MapReduce的な考えをストリーム処理に持ち込んでも、妥協した柔軟性しか得られていない。 ストリーミングSQLは簡潔な手段をもたらしているが、ステートの抽象化、複雑なアプリケーションの実装しやすさという意味では、 オペレーショナルなフローに基づくアプローチのほうが有利。

  • 分散処理の専門家でなくても複雑なストリーム処理を実装できること
  • 実行可能性

2. Motivation and requirements

GoogleのZeitgeistを例にした動機の説明。

  • 永続化のためのストレージ
  • Low Watermark(遅れデータへの対応)
  • 重複への対応(Exactly Onceの実現)

3. System overview

Zeitgeistの例で言えば、検索クエリが入力データであり、データ流量がスパイクしたり、凹んだりしたことが出力データ。

データ形式:key、value、timestamp

4. Core concept

キーごとに計算され、並列処理化される。 また、キーがアグリゲーションや比較の軸になる。 Zeitgeistの例では、例えば検索クエリそのものをキーとすることが考えられる。 また、キー抽出のための関数を定義して用いる。

ストリーム:データの流れ。 コンピューテーションは複数のストリームを出力することもある。

ウィンドウ修正するときなどに用いられるステートを保持する。 そのための抽象化の仕組みがある。 キーごとのステート。

Low Watermarkの仕組みがあり、Wall timeが進むごとにWatermarkが進む。 時間経過とともにWatermarkを越したデータに対し、計算が行われる。

タイマー機能あり。 Low Watermarkもタイマーでキックされる、と考えて良い。

5. API

計算APIは、キーごとのステートをフェッチ、加工し、必要に応じてレコード生成し、タイマーを セットする。

Injector:MillWheelに外部データを入力する。 injector low watermarkも生成可能。

なお、low watermarkを逸脱するデータが生じた場合は、 ユーザアプリでは捨てるか、それとも現在の集計に組み込むか決められる。

6. Fault tolerance

到達保証の考え方としては、ユーザアプリで冪等性を考慮しなくて良いようにする、という点が挙げられる。

基本的にAckがなければデータが再送される設計に基づいているが、 受信者がAckを返す直前に何らかの理由で故障した場合、データが重複して処理される可能性がある。 そこでユニークIDを付与し、重複デーかどうかを確かめられるようにする。 判定にはブルームフィルタも利用する。 ID管理の仕組みにはガベージコレクションの仕組みもある。

チェックポイントの仕組みもある。 バックエンドストレージにはBigtableなどを想定。

なお、性能を重視しチェックポイントを無効化することもできるが、 そうすると何らかの故障が生じて、データ送信元へのAckが遅れることがある。 パイプラインが並列化すると、それだけシステム内のどこかで故障が生じる可能性は上がる。 そこで、滞留するデータについては部分的にチェックポイントすることで、 計算コストとエンドツーエンドレイテンシのバランスを保つ。 ★

永続化されたステートの一貫性を保つため、アトミックなオペレーションでラップする。 ただし、マシンのフェールオーバ時などにゾンビ化したWriterが存在する可能性を考慮し、 シークエンサートークンを用いるようにする。 つまり、新たに現れたWriterは古いシークエンサートークンを無効化してから、 動作を開始するようにしている。

7. System Implementation

MillWheelはステート管理のためにデータストア(BigTableなど)を使う。 故障発生時にはデータストアからメモリ情報を再現する。

low watermarkのジャーナルはデータストアに保存される。

感想:このあたりデータストアの性能は、最終的なパフォーマンスに大きく影響しそうだ。 ★

8. Evaluation

単純なシャッフル処理では、レイテンシの中央値は数ミリ秒。95パーセンタイルで30ミリ秒。 Exactly onceなどを有効化すると中央値は33.7ミリ秒。95パーセンタイルで93.8ミリ秒。

CPU数をスケールアウトしても、レイテンシに著しい劣化は見られない。(逆に言うと、99パーセンタイルではある程度の劣化が見られる)

low watermarkは処理のステージをまたぐと、実時間に対してラグが生じる。 このあたりは活発に改善を進められているところ。

ステート管理などにストレージ(BitTableなど)を使う。 これによりリード・ライトがたくさん発生する。 ワーカにおけるキャッシュは有効。

実際のユースケースは、広告関係。 そのほか、Google Street Viewでのパノラマ画像の生成など。 ★

9. Related work

ストリーム処理システムが必要とするものは、以下の論文に記載されている。 The 8 Requirements of Real-Time Stream Processing

Spark Streamingに対しては、MillWheelの方がgeneralであると主張。 RDDへの依存性がユーザに制約をもたらす、とも。 またチェックポイントの間隔はMillWheelの方が粒度が細かい。

共有

System monitor on Ubuntu18

参考

メモ

Ubuntu 18.04 LTSをインストールした直後に行う設定 & インストールするソフト に記載されているとおり、 GNOME Shell ExtensionsからSystem-monitorをインストールすると、タスクバー上にシステムモニターを表示できる。

共有

Alt Tab on Ubuntu18

参考

メモ

Ubuntu 18.04 LTSをインストールした直後に行う設定 & インストールするソフト に諸々記載されている。 設定から変えられる。

共有

Docker DesktopをWSLから利用している場合のマウントについて

参考

メモ

Docker for WindowsをWSLから使う時のVolumeの扱い方 に起債されている通りだが、 うっかり -v /tmp/hoge:/hoge とかやると、Docker Desktopで使用しているDocker起動用の仮想マシン中の /tmp/hoge をマウントすることになる。

共有

MLflow

参考

メモ

クイックスタートを試す

公式クイックスタート の通り、簡単な例を試す。 手元のAnaconda環境で仮想環境を構築し、実行する。

1
2
3
$ /opt/Anaconda/default/bin/conda create -n mlflow ipython jupyter
$ source activate mlflow
$ pip install mlflow

サンプルコードを実装。実験のために、もともと載っていた内容を修正。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ mkdir -p Sources/mlflow_quickstart
$ cd Sources/mlflow_quickstart
$ cat << EOF > quickstart.py
> import os
> from mlflow import log_metric, log_param, log_artifact
>
> if __name__ == "__main__":
> # Log a parameter (key-value pair)
> log_param("param1", 5)
>
> # Log a metric; metrics can be updated throughout the run
> log_metric("foo", 1)
> log_metric("foo", 2)
> log_metric("foo", 3)
>
> # Log an artifact (output file)
> with open("output.txt", "w") as f:
> f.write("Hello world! 1\n")
> with open("output.txt", "w") as f:
> f.write("Hello world! 2\n")
> log_artifact("output.txt")
> with open("output.txt", "w") as f:
> f.write("Hello world! 3\n")
> log_artifact("output.txt")
> EOF

以下のようなファイルが生成される。

1
2
$ ls
mlruns output.txt quickstart.py

つづいてUIを試す。

1
$ mlflow ui

ブラウザで、 http://localhost:5000/ にアクセスするとウェブUIを見られる。 メトリクスは複数回記録すると履歴となって時系列データとして見えるようだ。 一方、アーティファクトは複数回出力しても1回分(最後の1回?)分しか記録されない?

なお、複数回実行すると、時系列データとして登録される。 試行錯誤の履歴が残るようになる。

クイックスタートのプロジェクト

公式クイックスタート には、MLflowプロジェクトとして取り回す方法の例として、 GitHubに登録されたサンプルプロジェクトをロードして実行する例が載っている。

1
$ mlflow run https://github.com/mlflow/mlflow-example.git -P alpha=5

GitHubからロードされたプロジェクトが実行される。 なお、ローカルファイルシステムのmlrunsには今回実行したときの履歴が保存される。

チュートリアルの確認

公式チュートリアル を見ながら考える。 examples/sklearn_elasticnet_wine/train.py が取り上げられているのでまずは確認する。

27行目辺りからメイン関数。

最初にCSVファイルを読み込んでPandas DFを作る。 examples/sklearn_elasticnet_wine/train.py:32

1
2
wine_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "wine-quality.csv")
data = pd.read_csv(wine_path)

入力ファイルを適当に分割したあと、MLflowのセッション(と呼べばよいのだろうか)を起動する。 examples/sklearn_elasticnet_wine/train.py:47

1
with mlflow.start_run():

これにより、MLflowが動作に必要とするスタックなどが初期化される。 詳しくは mlflow/tracking/fluent.py:71 あたりの run メソッドの実装を参照。

セッション開始後、モデルを定義し学習を実行する。 その後推論結果を取得し、メトリクスを計算する。 このあたりは通常のアプリと同じ実装。

1
2
3
4
5
6
lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
lr.fit(train_x, train_y)

predicted_qualities = lr.predict(test_x)

(rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

その後パラメータ、メトリクス、モデルを記録する。

1
2
3
4
5
6
7
mlflow.log_param("alpha", alpha)
mlflow.log_param("l1_ratio", l1_ratio)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("r2", r2)
mlflow.log_metric("mae", mae)

mlflow.sklearn.log_model(lr, "model")

このあたりが、MLflowのトラッキング機能を利用している箇所。

試しにパラメータを色々渡して実行してみる。 以下のようなシェルスクリプトを用意した。

1
2
3
4
5
6
7
for alpha in 0 1 0.5
do
for l1_ratio in 1 0.5 0.2 0
do
python ~/Sources/mlflow/examples/sklearn_elasticnet_wine/train.py ${alpha} ${l1_ratio}
done
done

その後、mlflow ui コマンドでUIを表示すると、先程試行した実験の結果がわかる。 メトリクスでソートもできるので、 モデルを試行錯誤しながら最低なモデル、パラメータを探すこともできる。

パッケージング

チュートリアルでは、以下のようなMLprojectファイル、conda.yamlが紹介されている。

MLproject

1
2
3
4
5
6
7
8
9
10
name: tutorial

conda_env: conda.yaml

entry_points:
main:
parameters:
alpha: float
l1_ratio: {type: float, default: 0.1}
command: "python train.py {alpha} {l1_ratio}"

このファイルはプロジェクトの定義を表すものであり、 依存関係と実装へのエントリーポイントを表す。 見てのとおり、パラメータ定義やコマンド定義が記載されている。

conda.yaml

1
2
3
4
5
6
7
8
name: tutorial
channels:
- defaults
dependencies:
- python=3.6
- scikit-learn=0.19.1
- pip:
- mlflow>=1.0

このファイルは環境の依存関係を表す。

このプロジェクトを実行するには、以下のようにする。

1
$ mlflow run ~/Sources/mlflow/examples/sklearn_elasticnet_wine -P alpha=0.5

これを実行すると、conda環境を構築し、その上でアプリを実行する。 もちろん、run情報が残る。

Dockerでパッケージング

Dockerでパッケージング を見ると、Dockerをビルドしてその中でアプリを動かす例も載っている。

最初にDockerイメージをビルドしておく。

1
2
$ cd ~/Sources/mlflow/examples/docker
$ docker build -t mlflow-docker-example -f Dockerfile .

つづいて、プロジェクトを実行する。

1
$ sudo mlflow run ~/Sources/mlflow/examples/docker -P alpha=0.5

これでconda環境で実行するのと同じように、Docker環境で実行される。

ちなみにMLprojectファイルは以下の通り。

1
2
3
4
5
6
7
8
9
10
11
name: docker-example

docker_env:
image: mlflow-docker-example

entry_points:
main:
parameters:
alpha: float
l1_ratio: {type: float, default: 0.1}
command: "python train.py --alpha {alpha} --l1-ratio {l1_ratio}"

モデルサーブ

チュートリアルで学習したモデルをサーブできる。 mlflow ui でモデルの情報を確認すると、アーティファクト内にモデルが格納されていることがわかるので、 それを対象としてサーブする。

1
$ mlflow models serve -m /home/dobachi/Sources/mlflow_tutorials/mlruns/0/a87ee8c6c6f04f5c822a32e3ecae830e/artifacts/model -p 1234

サーブされたモデルを使った推論は、REST APIで可能。

1
$ curl -X POST -H "Content-Type:application/json; format=pandas-split" --data '{"columns":["fixed acidity","volatile acidity","citric acid","residual sugar","chlorides","free sulfur dioxide","total sulfur dioxide","density","pH","sulphates","alcohol"],"data":[[7,0.27,0.36,20.7,0.045,45,170,1.001,3,0.45,8.8]]}' http://127.0.0.1:1234/invocations

Runファイルが保存される場所

where-runs-are-recorded によると、以下の通り。

  • Local file path (specified as file:/my/local/dir), where data is just directly stored locally.
    • →ローカルファイルシステム
  • Database encoded as +://:@:/. Mlflow supports the dialects mysql, mssql, sqlite, and postgresql. For more details, see SQLAlchemy database uri.
    • →データベース(SQLAlchemyが対応しているもの)
  • HTTP server (specified as https://my-server:5000), which is a server hosting an MLFlow tracking server.
    • →MLflowトラッキングサーバ
  • Databricks workspace (specified as databricks or as databricks://, a Databricks CLI profile.

トラッキングサーバを試す

まずはトラッキングサーバを起動しておく。

1
$ mlflow server --backend-store-uri /tmp/hoge --default-artifact-root /tmp/fuga --host 0.0.0.0

つづいて、学習を実行する。

1
$ MLFLOW_TRACKING_URI=http://localhost:5000 mlflow run ~/Sources/mlflow/examples/sklearn_elasticnet_wine -P alpha=0.5

前の例と違い、環境変数MLFLOW_TRACKING_URIが利用され、上記で起動したトラッキングサーバが指定されていることがわかる。 改めてブラウザで、http://localhost:5000にアクセスすると、先程実行した学習の履歴を確認できる。

mlflowコマンドを確認する

mlflowコマンドの実態は以下のようなPythonスクリプトである。

1
2
3
4
5
6
7
8
import re
import sys

from mlflow.cli import cli

if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
sys.exit(cli())

mlflow.cliは、clickを使って実装されている。 コマンドとしては、

  • run
  • ui
  • server

あたりに加え、以下のようなものが定義されていた。

mlflow/cli.py:260

1
2
3
4
5
6
7
cli.add_command(mlflow.models.cli.commands)
cli.add_command(mlflow.sagemaker.cli.commands)
cli.add_command(mlflow.experiments.commands)
cli.add_command(mlflow.store.cli.commands)
cli.add_command(mlflow.azureml.cli.commands)
cli.add_command(mlflow.runs.commands)
cli.add_command(mlflow.db.commands)

start_runについて

mlflowを使うときには、以下のようにstart_runメソッドを呼び出す。

1
2
3
with mlflow.start_run():
lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
lr.fit(train_x, train_y)

start_runメソッドの実態は以下のように定義されている。

mlflow/init.py:54

1
start_run = mlflow.tracking.fluent.start_run

なお、start_runの戻り値は mlflow.ActiveRun とのこと。

mlflow/tracking/fluent.py:155

1
2
_active_run_stack.append(ActiveRun(active_run_obj))
return _active_run_stack[-1]

mlflow/tracking/fluent.py:61

1
2
3
4
5
6
7
8
9
10
11
12
13
class ActiveRun(Run):  # pylint: disable=W0223
"""Wrapper around :py:class:`mlflow.entities.Run` to enable using Python ``with`` syntax."""

def __init__(self, run):
Run.__init__(self, run.info, run.data)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
status = RunStatus.FINISHED if exc_type is None else RunStatus.FAILED
end_run(RunStatus.to_string(status))
return exc_type is None

ActiveRunクラスは上記のような実装なので、withステートメントで用いる際には、終了処理をするようになっている。 ステータスを変更し、アクティブな実行(ActiveRun)を終了させる。

上記の構造から見るに、アクティブな実行(ActiveRun)はネスト構造?が可能なように見える。

uiの実装

cli.pyのuiコマンドを確認すると、どうやら手元で気軽に確認するたのコマンドのようだった。

mlflow/cli.py:158

1
2
3
4
5
6
def ui(backend_store_uri, default_artifact_root, port):
"""
Launch the MLflow tracking UI for local viewing of run results. To launch a production
server, use the "mlflow server" command instead.

The UI will be visible at http://localhost:5000 by default.

実際には簡易設定で、serverを起動しているようだ。

mlflow/cli.py:184

1
_run_server(backend_store_uri, default_artifact_root, "127.0.0.1", port, 1, None, [])

mlflow/server/init.py:51

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def _run_server(file_store_path, default_artifact_root, host, port, workers, static_prefix,
gunicorn_opts):
"""
Run the MLflow server, wrapping it in gunicorn
:param static_prefix: If set, the index.html asset will be served from the path static_prefix.
If left None, the index.html asset will be served from the root path.
:return: None
"""
env_map = {}
if file_store_path:
env_map[BACKEND_STORE_URI_ENV_VAR] = file_store_path
if default_artifact_root:
env_map[ARTIFACT_ROOT_ENV_VAR] = default_artifact_root
if static_prefix:
env_map[STATIC_PREFIX_ENV_VAR] = static_prefix
bind_address = "%s:%s" % (host, port)
opts = shlex.split(gunicorn_opts) if gunicorn_opts else []
exec_cmd(["gunicorn"] + opts + ["-b", bind_address, "-w", "%s" % workers, "mlflow.server:app"],
env=env_map, stream_output=True)

なお、serverのヘルプを見ると以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Usage: mlflow server [OPTIONS]

Run the MLflow tracking server.

The server which listen on http://localhost:5000 by default, and only
accept connections from the local machine. To let the server accept
connections from other machines, you will need to pass --host 0.0.0.0 to
listen on all network interfaces (or a specific interface address).

Options:
--backend-store-uri PATH URI to which to persist experiment and run
data. Acceptable URIs are SQLAlchemy-compatible
database connection strings (e.g.
'sqlite:///path/to/file.db') or local
filesystem URIs (e.g.
'file:///absolute/path/to/directory'). By
default, data will be logged to the ./mlruns
directory.
--default-artifact-root URI Local or S3 URI to store artifacts, for new
experiments. Note that this flag does not
impact already-created experiments. Default:
Within file store, if a file:/ URI is provided.
If a sql backend is used, then this option is
required.
-h, --host HOST The network address to listen on (default:
127.0.0.1). Use 0.0.0.0 to bind to all
addresses if you want to access the tracking
server from other machines.
-p, --port INTEGER The port to listen on (default: 5000).
-w, --workers INTEGER Number of gunicorn worker processes to handle
requests (default: 4).
--static-prefix TEXT A prefix which will be prepended to the path of
all static paths.
--gunicorn-opts TEXT Additional command line options forwarded to
gunicorn processes.
--help Show this message and exit.

なお、上記ヘルプを見ると、runデータを保存するのはデフォルトではローカルファイルシステムだが、 SQLAlchemyでアクセス可能なRDBMSでも良いようだ。

サーバとして、gunicornを使っているようだ。 多数のリクエストをさばくため、複数のワーカを使うこともできるようだ。

mlflow/server/init.py:68

1
2
exec_cmd(["gunicorn"] + opts + ["-b", bind_address, "-w", "%s" % workers, "mlflow.server:app"],
env=env_map, stream_output=True)

上記の通り、 mlflow.server:app が実態なので確認する。 このアプリケーションはFlaskが用いられている。 いったん、 / の定義から。

mlflow/server/init.py:45

1
2
3
4
# Serve the index.html for the React App for all other routes.
@app.route(_add_static_prefix('/'))
def serve():
return send_from_directory(STATIC_DIR, 'index.html')

mlflow/server/init.py:16

1
2
3
4
REL_STATIC_DIR = "js/build"

app = Flask(__name__, static_folder=REL_STATIC_DIR)
STATIC_DIR = os.path.join(app.root_path, REL_STATIC_DIR)

以上のように、 mlflow/server/js 以下にアプリが存在するようだが、 そのREADME.mdを見ると、当該アプリは https://github.com/facebook/create-react-app を 使って開発されたように見える。

mlflow-exampleプロジェクトを眺めてみる

mlflow-example には、mlflow公式のサンプルプロジェクトが存在する。 この中身を軽く眺めてみる。

MLproject

ファイルの内容は以下の通り。

1
2
3
4
5
6
7
8
9
10
name: tutorial

conda_env: conda.yaml

entry_points:
main:
parameters:
alpha: float
l1_ratio: {type: float, default: 0.1}
command: "python train.py {alpha} {l1_ratio}"

プロジェクト名は tutorial であり、condaによる環境構成情報は別途 conda.yaml に定義されていることがわかる。

エントリポイントには複数を定義可能だが、ここでは1個のみ(mainのみ)定義されている。 パラメータは2個(alphal1_ratio)与えられている。 それらのパラメータは、実行コマンド定義内でコマンドライン引数として渡されることになっている。

なお、実行されるPythonスクリプト内では以下のように、コマンドライン引数を処理している。

train.py:44

1
2
alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5

conda.yaml

本ファイルには、condaを使ってインストールするライブラリが指定されている。

1
2
3
4
5
6
7
8
9
name: tutorial
channels:
- defaults
dependencies:
- numpy=1.14.3
- pandas=0.22.0
- scikit-learn=0.19.1
- pip:
- mlflow

numpy、pandas、scikit-learnあたりの基本的なライブラリをcondaで導入し、 最後にpipでmlflowを導入していることがわかる。

またチャンネルの設定もできるようであるが、ここではデフォルトのみ使用することになっている。

train.py

ここでは、ハイライトを確認する。

最初に入力データの読み出しと分割等。

train.py:31

1
2
3
4
5
6
7
8
9
10
11
12
# Read the wine-quality csv file (make sure you're running this from the root of MLflow!)
wine_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "wine-quality.csv")
data = pd.read_csv(wine_path)

# Split the data into training and test sets. (0.75, 0.25) split.
train, test = train_test_split(data)

# The predicted column is "quality" which is a scalar from [3, 9]
train_x = train.drop(["quality"], axis=1)
test_x = test.drop(["quality"], axis=1)
train_y = train[["quality"]]
test_y = test[["quality"]]

MLflowのセッションを開始

train.py:47

1
with mlflow.start_run():

モデルを定義し、学習。その後テストデータを用いて予測値を算出し、メトリクスを計算する。

train.py:48

1
2
3
4
5
6
lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
lr.fit(train_x, train_y)

predicted_qualities = lr.predict(test_x)

(rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

上記のメトリクスは標準出力にも出されるようにもなっている。 手元で試した例では、以下のような感じ。

1
2
3
4
Elasticnet model (alpha=5.000000, l1_ratio=0.100000):
RMSE: 0.8594260117338262
MAE: 0.6480675144220314
R2: 0.046025292604596424

最後に、メトリクスを記録し、モデルを保存する。

train.py:60

1
2
3
4
5
6
7
mlflow.log_param("alpha", alpha)
mlflow.log_param("l1_ratio", l1_ratio)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("r2", r2)
mlflow.log_metric("mae", mae)

mlflow.sklearn.log_model(lr, "model")

【ボツ】Dockeでクイックスタートを実行

※以下の手順は、UIを表示させようとしたところでエラーになっている。まだ原因分析していない。 -> おおかたバインドするアドレスが127.0.0.1になっており、外部から参照できなくなっているのでは、と。-> mlflow ui の実装をぱっと見る限り、そうっぽい。

公式GitHub からCloneし、Dockerイメージをビルドする。 これを実行環境とする。

1
2
3
$ cd Sources
$ git clone https://github.com/mlflow/mlflow.git
$ cd mlflow/examples/docker

なお、実行時に便利なようにDockerfileを以下のように修正し、ipython等をインストールしておく。

1
2
3
4
diff --git a/examples/docker/Dockerfile b/examples/docker/Dockerfileindex e436f49..686e0e2 100644--- a/examples/docker/Dockerfile+++ b/examples/docker/Dockerfile@@ -1,5 +1,7 @@ FROM continuumio/miniconda:4.5.4+RUN conda install ipython jupyter
+ RUN pip install mlflow>=1.0 \
&& pip install azure-storage==0.36.0 \
&& pip install numpy==1.14.3

ではビルドする。

1
$ sudo -i docker build -t "dobachi/mlflow:latest" `pwd`

チュートリアルのサンプルアプリを作成する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ cat << EOF > tutorial.py
> import os
> from mlflow import log_metric, log_param, log_artifact
>
> if __name__ == "__main__":
> # Log a parameter (key-value pair)
> log_param("param1", 5)
>
> # Log a metric; metrics can be updated throughout the run
> log_metric("foo", 1)
> log_metric("foo", 2)
> log_metric("foo", 3)
>
> # Log an artifact (output file)
> with open("output.txt", "w") as f:
> f.write("Hello world!")
> log_artifact("output.txt")
> EOF

起動する。 このとき、サンプルアプリも /apps 以下にマウントするようにする。

1
$ sudo -i docker run -v `pwd`:/apps --rm -it dobachi/mlflow:latest  /bin/bash

先程作成したアプリを実行する。

1
2
# cd apps
# python tutorial.py

結果は以下の通り。

1
2
# cat output.txt  
Hello world!

ここで mlflow ui を起動して見ようと持ったが、ウェブフロントエンドに繋がらなかった。 デバッグを試みる。

共有

HBase on Docker for test

参考

メモ

最初は、 dajobe/hbase-docker を試そうとしたが、後から HariSekhon/Dockerfiles を見つけた。 この中の hbase ディレクトリ以下にある手順に従ったところ、無事に起動した。

HariSekhon/Dockerfiles は、ほかにもビッグデータ系のプロダクトのDockerイメージを配布しているようだ。

ただ、Dockerfileを見ると割と作りこまれているようなので、動作確認にはよいがまじめな環境としてはきちんと見直してから使う方がよさそう。

共有