Kafka Streams with ML

参考

kaiwaehner kafka-streams-machine-learning-examples

メモ

kaiwaehner/kafka-streams-machine-learning-examples

kaiwaehner kafka-streams-machine-learning-examples には、TensorFlow、Keras、H2O、Python、DL4JをKafka Streamsと併用するサンプルが含まれている。

上記レポジトリのREADMEには、いくつか参考文献が記載されている。

How to Build and Deploy Scalable Machine Learning in Production with Apache Kafka には、

  • デザインの概略みたいなものが載っていた
  • 学習と推論のモデルやりとりをどうつなぐのか?の考察

が記載されていた。

Using Apache Kafka to Drive Cutting-Edge Machine Learning にはモデルの組み込み方の種類、 AutoMLとの組み合わせについて考察(Not 具体例)が掲載されている。

Machine Learning with Python, Jupyter, KSQL and TensorFlow には、以下のような記述がある。 いつもの論文とセット。

Impedance mismatch between data scientists, data engineers and production engineers

これを解決する手段としていくつか例示。

  • ONNX、PMMLなどを利用
  • DL4Jなど開発者視点の含まれるプロダクトの利用
  • AutoML
  • モデルを出力してアプリに埋め込んで利用(TensorFlowだったらJava APIでモデルを利用する、など)
  • パブリッククラウドのマネージドサービス利用

ConfluentのKafka Python KSQL APIを使い、Jupyter上でKafkaからデータロードし分析する例も記載されていた。

kaiwaehner ksql-udf-deep-learning-mqtt-iot には、UDF内でTensorFlowを使う例が記載されている。

kaiwaehner ksql-fork-with-deep-learning-function には、エンドツーエンドで動作を確認してみるためのサンプル実装が載っている。

kaiwaehner tensorflow-serving-java-grpc-kafka-streams には、gRPC経由でKafka StreamsとTensorFlow Servingを連係する例が記載されている。

Convolutional Neural Network (CNN) with TensorFlow for Image Recognition

com.github.megachucky.kafka.streams.machinelearning.Kafka_Streams_TensorFlow_Image_Recognition_Example クラスを確認する。

概要

  • 本クラスは、予め学習されたTensorFlowのモデルを読み出して利用する
  • 上記モデルを利用するときには、TensorFlowのJava APIを利用する
  • 画像はどこかのストレージに保存されている前提となっており、そのPATHがメッセージとしてKafkaに入ってくるシナリオである
  • 画像に対するラベルのProbabilityを計算し、最大のProbabilityを持つラベルを戻り値として返す

詳細メモ

main内では特別なことはしていない。トポロジを組み立てるための getStreamTopology が呼ばれる。

getStreamTopology メソッドを確認する。

当該メソッドでは、最初にモデル本体やモデルの定義が読み込まれる。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:83

1
2
3
4
5
6
7
String modelDir = "src/main/resources/generatedModels/CNN_inception5h";

Path pathGraph = Paths.get(modelDir, "tensorflow_inception_graph.pb");
byte[] graphDef = Files.readAllBytes(pathGraph);

Path pathModel = Paths.get(modelDir, "imagenet_comp_graph_label_strings.txt");
List<String> labels = Files.readAllLines(pathModel, Charset.forName("UTF-8"));

続いてストリームのインスタンスが生成される。 その後、ストリーム処理の内容が定義される。

最初はメッセージ内に含まれる画像のPATHを用いて、実際の画像をバイト列で読み出す。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:104

1
2
3
4
5
6
7
8
9
10
11
12
KStream<String, Object> transformedMessage =
imageInputLines.mapValues(value -> {

String imageClassification = "unknown";
String imageProbability = "unknown";

String imageFile = value;

Path pathImage = Paths.get(imageFile);
byte[] imageBytes;
try {
imageBytes = Files.readAllBytes(pathImage);

つづいていくつかのヘルパメソッドを使って、画像に対するラベル(推論結果)を算出する。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:117

1
2
3
4
5
6
7
8
9
10
11
try (Tensor image = constructAndExecuteGraphToNormalizeImage(imageBytes)) {
float[] labelProbabilities = executeInceptionGraph(graphDef, image);
int bestLabelIdx = maxIndex(labelProbabilities);

imageClassification = labels.get(bestLabelIdx);

imageProbability = Float.toString(labelProbabilities[bestLabelIdx] * 100f);

System.out.println(String.format("BEST MATCH: %s (%.2f%% likely)", imageClassification,
labelProbabilities[bestLabelIdx] * 100f));
}

constructAndExecuteGraphToNormalizeImage メソッドは、グラフを構成し、前処理を実行する。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:171

1
2
3
4
5
6
7
8
9
final Output input = b.constant("input", imageBytes);
final Output output = b
.div(b.sub(
b.resizeBilinear(b.expandDims(b.cast(b.decodeJpeg(input, 3), DataType.FLOAT),
b.constant("make_batch", 0)), b.constant("size", new int[] { H, W })),
b.constant("mean", mean)), b.constant("scale", scale));
try (Session s = new Session(g)) {
return s.runner().fetch(output.op().name()).run().get(0);
}

executeInceptionGraph メソッドは、予め学習済みのモデルと画像を引数にとり、 ラベルごとのProbabilityを算出する。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:184

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
try (Graph g = new Graph()) {

// Model loading: Using Graph.importGraphDef() to load a pre-trained Inception
// model.
g.importGraphDef(graphDef);

// Graph execution: Using a Session to execute the graphs and find the best
// label for an image.
try (Session s = new Session(g);
Tensor result = s.runner().feed("input", image).fetch("output").run().get(0)) {
final long[] rshape = result.shape();
if (result.numDimensions() != 2 || rshape[0] != 1) {
throw new RuntimeException(String.format(
"Expected model to produce a [1 N] shaped tensor where N is the number of labels, instead it produced one with shape %s",
Arrays.toString(rshape)));
}
int nlabels = (int) rshape[1];
return result.copyTo(new float[1][nlabels])[0];
}
}

executeInceptionGraph メソッドにより、ある画像に対するラベルごとのProbabilityが得られた後、 最大のProbabilityを持つラベルを算出。 それを戻り値とする。

Iris Prediction using a Neural Network with DeepLearning4J (DL4J)

Iris Prediction using a Neural Network with DeepLearning4J (DL4J) を確認する。

com.github.megachucky.kafka.streams.machinelearning.models.DeepLearning4J_CSV_Model_Inference クラスを確認したが、これはKafka Streamsアプリには見えなかった。 中途半端な状態で止まっている? これをベースに Kafka Streams のアプリを作ってみろ、ということか。

もしくはunitテスト側を見ろ、ということか? -> そのようだ。 Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java

クラスの内容を確認する。

まずテストのセットアップ。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:46

1
2
3
4
5
6
7
8
9
10
11
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new TestEmbeddedKafkaCluster(1);

private static final String inputTopic = "IrisInputTopic";
private static final String outputTopic = "IrisOutputTopic";

// Generated DL4J model
private File locationDL4JModel = new File("src/main/resources/generatedModels/DL4J/DL4J_Iris_Model.zip");

// Prediction Value
private static String irisPrediction = "unknown";

TestEmbeddedKafkaClusterはテスト用のKafkaクラスタを起動するヘルパークラス。 内部的には、Kafka Streamsのテストに用いられている補助機能である org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster クラスを継承して作られている。

機械学習モデルは、予め学習済みのものが含まれているのでそれを読み込んで用いる

テストコードの実態は、 com.github.megachucky.kafka.streams.machinelearning.test.Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest#shouldPredictIrisFlowerType である。 以降、当該メソッド内を確認する。

メソッドの冒頭で、Kafka Streamsの設定を定義している。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:67

1
2
3
4
5
6
7
8
		// Iris input data (the model returns probabilities for input being each of Iris
// Type 1, 2 and 3)
List<String> inputValues = Arrays.asList("5.4,3.9,1.7,0.4", "7.0,3.2,4.7,1.4", "4.6,3.4,1.4,0.3");

(snip)

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

このサンプルは動作確認用のため簡易な設定になっている。 実際のアプリケーション開発時にはきちんと設定必要。

なお、途中でDL4Jのモデルを読み込んでいる。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:86

1
2
// Create DL4J object (see DeepLearning4J_CSV_Model.java)
MultiLayerNetwork model = ModelSerializer.restoreMultiLayerNetwork(locationDL4JModel);

その後、ビルダのインスタンスを生成し、Irisデータを入力とするストリームを定義する。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:97, 104

1
2
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> irisInputLines = builder.stream(inputTopic);

その後はストリームのメッセージに対し、DL4Jのモデルによる推論を実行する。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:108

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
irisInputLines.foreach((key, value) -> {

if (value != null && !value.equals("")) {
System.out.println("#####################");
System.out.println("Iris Input:" + value);

// TODO Easier way to map from String[] to double[] !!!
String[] stringArray = value.split(",");
Double[] doubleArray = Arrays.stream(stringArray).map(Double::valueOf).toArray(Double[]::new);
double[] irisInput = Stream.of(doubleArray).mapToDouble(Double::doubleValue).toArray();

// Inference
INDArray input = Nd4j.create(irisInput);
INDArray result = model.output(input);

System.out.println("Probabilities: " + result.toString());

irisPrediction = result.toString();

}

});

ここでは入力されたテキストデータをDouble型の配列に変換し、さらにND4JのINDArrayに変換する。 それをモデルに入力し、推論を得る。

その後、テキストを整形し、出力用トピックに書き出し。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:132

1
2
3
4
5
6
// Transform message: Add prediction information
KStream<String, Object> transformedMessage = irisInputLines
.mapValues(value -> "Prediction: Iris Probability => " + irisPrediction);

// Send prediction information to Output Topic
transformedMessage.to(outputTopic);

ビルダを渡し、ストリーム処理をスタート。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:140

1
2
3
4
5
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();
System.out.println("Iris Prediction Microservice is running...");
System.out.println("Input to Kafka Topic 'IrisInputTopic'; Output to Kafka Topic 'IrisOutputTopic'");

その後、これはテストコードなので、入力となるデータをアプリケーション内で生成し、入力トピックに書き込む。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:149

1
2
3
4
5
6
7
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig, new MockTime());

ここでは結合テスト用のヘルパーメソッドを利用。

その後、出力トピックから結果を取り出し、確認する。(実装解説は省略)

Python + Keras + TensorFlow + DeepLearning4j

例のごとく、テスト側が実装本体。 Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest クラスを確認する。

実態は shouldPredictValues メソッド。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:64

1
public void shouldPredictValues() throws Exception {

上記メソッド内では、最初にモデルを読み込む。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:69

1
2
3
4
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
System.out.println(simpleMlp.toString());

MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);

上記では、HDF形式で予め保存されたモデルを読み込む。 読み込みの際にはDeeplearning4Jの KerasModelImport#importKerasSequentialModelAndWeights メソッドが用いられる。

続いて、Kafka Streamsのコンフィグを定める。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:81

1
2
3
4
5
6
7
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
"kafka-streams-tensorflow-keras-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

次にKafka Streamsのビルダを定義し、入力トピックを渡して入力ストリームを定義する。

1
2
3
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> inputEvents = builder.stream(inputTopic);

以降、メッセージを入力とし、推論を行う処理の定義が続く。

先程定義したストリームの中は、カンマ区切りのテキストになっている。 これをカンマで区切り、配列に変換する。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:104

1
2
3
4
5
6
inputEvents.foreach((key, value) -> {

// Transform input values (list of Strings) to expected DL4J parameters (two
// Integer values):
String[] valuesAsArray = value.split(",");
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1]));

配列への変換には、Nd4Jを用いる。

配列の定義が完了したら、それを入力としてモデルを用いた推論を行う処理を定義する。

1
2
3
4
5

output = model.output(input);
prediction = output.toString();

});

最後は出力メッセージに変換し、出力トピックへの書き出しを定義する。 その後、独自のユーティリティクラスを使って、ビルダーに基づいてストリームをビルド。 ストリーム処理を開始する。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:118

1
2
3
4
5
6
7
8
9
10
11
12
13
// Transform message: Add prediction result
KStream<String, Object> transformedMessage = inputEvents.mapValues(value -> "Prediction => " + prediction);

// Send prediction result to Output Topic
transformedMessage.to(outputTopic);

// Start Kafka Streams Application to process new incoming messages from
// Input Topic
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();
System.out.println("Prediction Microservice is running...");
System.out.println("Input to Kafka Topic " + inputTopic + "; Output to Kafka Topic " + outputTopic);
共有

Studying Software Engineering Patterns for Designing Machine Learning Systems

参考

メモ

スライド に大まかな内容が記載されている。

機械学習に関するソフトウェアエンジニアリングやアーキテクチャデザインパターンに関する調査。 良い/悪いソフトウェアエンジニアリングデザインパターンを調査。 「リサーチ質問」(RQ)という形でいくつか確認事項を用意し、 ヒアリングや検索エンジンによる調査を実施。 SLR(Systematic literature review)手法にのっとり、調査結果を検証。

ヒアリングは760超の開発者を対処に実施し、1%の回答者を得た。

調査は、23個の論文、追加の6個の論文、48個のグレードキュメントを調査。 アカデミックではシステムエンジニアリングデザインパターンの文献は少ない。 グレードキュメントでは多数。データマネジメント観点が多い。

パターン整理では以下の軸を用いた。

  • Microsoftのパイプライン(9ステージ)
  • ISO/IEC/IEEE 12207:2008 standard

ドキュメント群から69個のパターンを抽出。MLのアーキテクチャやデザインパターン関係は33個。

興味深かった図

業務ロジックとMLロジックを分離。 メンテナンス性が上がるだろうが、果たしてロジックを現実的に分離可能かどうかはやや疑問。

Fig. 2. Structure of Distinguish Business Logic from ML Model pattern

モデルに関するドキュメントが多数。

TABLE III CLASSIFICATION OF THE IDENTIFIED PATTERNS
共有

Git SDK for Windows

参考

メモ

pacmanをgit bash上で使いたいと思ったのだが、git-sdkはいまいち。

共有

Docker on VMWare

参考

メモ

Hyper-VでなくVMwareでDocker for Windows を使う に習った。

Docker環境

手元の環境は Windows + VMWare だったので、Vagrant + VMWareで環境を構築し、その中にDockerをインストールすることにした。

Get Docker Engine - Community for Ubuntu を参考に、dockerをインストールするようにした。 参考までに、Vagrantfileは以下のような感じである。 ubuntu18系をベースとしつつ、プロビジョニングの際にDockerをインストールする。

ただし、以下のVagrantfileのプロビジョニングでは、DockerサービスにTCPで接続することを許可するようにしている。 これはセキュリティ上問題があるため、注意して取り扱うこと。(あくまで動作確認程度に留めるのがよさそう)

Vagrantfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Vagrant.configure("2") do |config|
config.vm.define "docker-01" do |config|
config.vm.box = "bento/ubuntu-18.04"
config.vm.network :private_network, ip: "172.16.19.220"
config.vm.hostname = "docker-01"
end

config.vm.provision "shell", inline: <<-SHELL
apt-get update
apt-get install -y \
apt-transport-https \
ca-certificates \
curl \
gnupg-agent \
software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
apt-key fingerprint 0EBFCD88
add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"
apt-get update
apt-get install -y docker-ce docker-ce-cli containerd.io
sed -i "s;fd://;tcp://0.0.0.0:2375;g" /lib/systemd/system/docker.service
SHELL

end

WSLなどから以下のようにすることで、VM内のDockerにアクセスできる。

1
$ sudo docker --host tcp://172.16.19.220:2375 run hello-world

環境変数として、DOCKER_HOSTを設定しておいても良いだろう。

共有

Kafka cluster information

参考

メモ

kafka.clusterパッケージ以下には、Kafkaクラスタの情報を格納するためのクラス群が存在している。 具体的には以下の通り。

  • Broker
  • BrokerEndPoint
  • Cluster
  • EndPoint
  • Partition.scala
  • Replica

例としてBroker

例えば Broker クラスについて。

case classである。

usageを確認すると、例えば以下のように KafkaServer#createBrokerInfo メソッド内で用いられている。

kafka/server/KafkaServer.scala:430

1
BrokerInfo(Broker(config.brokerId, updatedEndpoints, config.rack), config.interBrokerProtocolVersion, jmxPort)

その他

わかりやすい例だと、PartitionやReplicaなどが挙げられる。

共有

Kafka Admin Commands

参考

メモ

意外とまとまった説明は Kafka公式ドキュメントConfluentドキュメント にはない。 Getting startedや運用面のドキュメントに一部含まれている。

丁寧なのは、 Clouderaのドキュメント(Kafka Administration Using Command Line Tools) である。

Confluentドキュメント

ConfluentドキュメントのAdminister章 には、ツールとしてのまとまりではなく、 Admin作業単位で説明があり、その中にいくつかツールの説明が含まれている。

実装

kafka.admin パッケージ以下にAminコマンドの実装が含まれている。 またそれらのクラスは、 bin以下に含まれている。

例えば、 kafka-log-dirs コマンドでは、kafka.admin.LogDirsCommand クラスが使われている、など。

共有

KafkaのJavaバージョン

参考

メモ

6.4 Java Version の通り、JDK1.8の最新リリースバージョンを使うように、とされている。 2019/11/03時点の公式ドキュメントでは、LinkedInでは1.8u5を使っているとされているが…。

このあたりの記述が最も最近でいつ編集されたか、というと、 以下の通り2018/5/21あたり。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
commit e70a191d3038e00790aa95fbd1e16e78c32b79a4
Author: Ismael Juma <ismael@juma.me.uk>
Date: Mon May 21 23:17:42 2018 -0700

KAFKA-4423: Drop support for Java 7 (KIP-118) and update deps (#5046)

(snip)

diff --git a/docs/ops.html b/docs/ops.html
index 450a268a2..95b9a9601 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -639,9 +639,7 @@

From a security perspective, we recommend you use the latest released version of JDK 1.8 as older freely available versions have disclosed security vulnerabilities.

- LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector. If you decide to use the G1 collector (the current default) and you are still on JDK 1.7, make sure you are on u51 or newer. LinkedIn tried out u21 in testing, but they had a number of problems with the GC implementation in that version.
-
- LinkedIn's tuning looks like this:
+ LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector. LinkedIn's tuning looks like this:
<pre class="brush: text;">
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M

関連しそうなIssue、メーリングリストのエントリ

あまり活発な議論はない。公式ドキュメントを参照せよ、というコメントのみ。 Jay Kepsによると2013年くらいはLinkedInでは1.6を使っていた、ということらしい。 順当に更新されている。

  • https://issues.apache.org/jira/browse/KAFKA-7328
  • https://sematext.com/opensee/m/Kafka/uyzND19j3Ec5wqz42?subj=Re+kafka+0+9+0+java+version
  • https://sematext.com/opensee/m/Kafka/uyzND138NFX1w26SP1?subj=Re+java+version+for+kafka+clients
共有

OpenMLを軽く確認してみる

参考

メモ

概要

OpenML公式ドキュメント によると、以下の定義。

An open, collaborative, frictionless, automated machine learning environment.

OpenML公式ウェブサイト によると、

  • Dataset
  • Task
    • データセットと機械学習としての達成すべきこと
  • Flow
    • 各種フレームワークに則った処理パイプライン
  • Run
    • あるFlowについてハイパーパラメータを指定し、あるTaskに対して実行したもの

のレポジトリが提供されている。 これらはウェブサイトから探索可能。

Dataset

Datasetが登録されると機械的にアノテーションされたり、分析されたりする。 パット見でデータの品質がわかるようになっている。

Task

データセットに対する目標(と言ってよいのか。つまり識別、回帰など)、学習用、テスト用のデータのスプリット、 どのカラムを目的変数とするか、などが セットになったコンテナである。

機械的に読めるようになっている。

Flow

処理パイプラインはFlowとして登録され、再利用可能になる。 特定のTaskに紐づく処理として定義される。

フレームワークにひもづく。

Run

OpenML APIを用いて実行ごとに自動登録される。

scikit-learnで試す

Pythonガイド を参考に試してみる。 一通り、Pandas、scikit-learn、openml、pylzmaをインストールする。 なお、あらかじめliblzma-devをインストールしておくこと。 詳しくは以下の「lzmaがインポートできない」節を参照。

実行に用いたpipenvの環境ファイル に用いたライブラリが載っている。

Pythonガイド を眺めながら進めようと思ったが、 https://openml.github.io/openml-python/develop/examples/introduction_tutorial.html のリンクが切れていた。 何となく見るとgithub.ioだったので探索してみたら、 github.ioのPythonガイド が見つかった。

こっちを参考にしてみる。

FlowとRunのチュートリアル を元に、チュートリアルを実施。 また特に、 公式Examples あたりを参考にした。 詳しくは、 dobachi openml_sklearn_example 内のnotebookを参照。

トラブルシュート

lzmaがインポートできない

以下の内容のエラーが生じた。

1
2
/home/dobachi/.local/share/virtualenvs/openml_sklearn_example-YW762zpK/lib/python3.7/site-packages/pandas/compat/__init__.py:85: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
warnings.warn(msg)

lzma関連のlibをインストールしておいてから、Python環境を構築すること。 つまりpyenvとpipenvで環境構築する前にliblzma-devをインストールしておく。

1
$ sudo apt install liblzma-dev

参考: lzmaライブラリをインポートできません。

所感

歴史があり、情報が集まっている。 ドキュメントが中途半端なせいで、初見ではとっつきづらいかもしれないが、ライブラリひとつでデータ発掘・共有に始まり、処理パイプラインの機械化まで対応しているのは有益。

共有

pyenv を使う

参考

メモ

インストール手順

WSL / WSL2にインストールするには、 pyenvのGitHub の通り、以下のようにインストールする。

1
2
3
4
$ git clone https://github.com/pyenv/pyenv.git ~/.pyenv
$ echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc
$ echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc
$ echo -e 'if command -v pyenv 1>/dev/null 2>&1; then\n eval "$(pyenv init -)"\nfi' >> ~/.bashrc

トラブルシューティング

Common-build-problems に載っているが、前提となるパッケージのインストールが必要なことに注意。

古い内容のバックアップ

環境構築 - pipenv で Python 環境の構築方法 (2018年11月版) ではpipenvを使う前に、pyenvを使ってPython3最新環境を導入している。

自動インストール に自動インストール手順が記載されている。

共有

pipenvを試す

参考

メモ

上記の参考情報を見て試した。

WSL上でのpipenv

まずOS標準のPython環境を汚さないため、pyenvを導入した。 導入手段は、 pyenvのインストール手順 を参照。

導入した後、pyenvを使ってPython3.7.5を導入する。

1
2
3
$ pyenv install 3.7.5
$ pyenv global 3.7.5
$ pip install pipenv

さて、ここでpipenvを使おうとしたらエラーが生じた。 以下の参考に、 libffi-devを導入してPythonを再インストールしたらうまくいった。

https://stackoverflow.com/questions/27022373/python3-importerror-no-module-named-ctypes-when-using-value-from-module-mul

Pipenvことはじめ に大まかな使い方が載っている。

共有