Uber's 2019 highlights

参考

メモ

Ludwig

モデル開発と比較を単純にするために作られたソフトウェア。

詳しくは、 Introducing Ludwig, a Code-Free Deep Learning Toolbox を参照。

AresDB

GPUを活用した処理エンジン。

QUIC導入

TCPとHTTP/2の置き換え。

Kotlinの性能調査

Javaとの比較など。かなり細かく調査したようだ(まだ読んでいない)

グラフ処理と機械学習を用いたUber Eatsの改善

グラフ構造に基づいた機械学習により、 レコメンデーションの効果を向上させる取り組みについて。

Uber’s Data Platform in 2019: Transforming Information to Intelligence

Uber’s Data Platform in 2019 Transforming Information to Intelligence

UberでもData Platformという言い方をするようだ。

データの用途は、スクーターの位置情報や店舗の最新メニューをトラックするだけではない。 トレンドを把握することなどにも用いられる。

データの鮮度の品質は大事。

リアルタイムの処理のニーズ・仕組み、ヒストリカルなデータの処理のニーズ・仕組み。両方ある。

補足)それぞれがあることを前提とした最適化を施すことが前提となっているようだ。

データがどこから来たのかをトラックする。 Uberの内部プロダクト uLineageにて実現。

Apache HBaseをグローバルインデックスとして利用。 これにより、高いスケーラビリティ、強い一貫性、水平方向へのスケーラビリティを獲得する。

DBEventsというプロダクトで、CDCを実現する。

トラブルシュートとプロアクティブな防止。

もしリアルタイムにデータ分析できれば、例えば利用者にドライバ替わり当たらず一定時間を過ぎてしまったケースを発見し、 オペレーションチームが助けられるかもしれない。

AresDB、Apache Pinotなど。★要確認

共有

Configure Python of PySpark in Zeppelin

参考

メモ

ウェブフロントエンドのSpark Interpreterの設定において、 以下の2項目を設定した。

  • spark.pyspark.python
  • zeppelin.pyspark.python

上記2項目が同じ値で設定されていないと、実行時エラーを生じた。

参考)Pythonバージョンの確かめ方

ドライバのPythonバージョンの確かめ方

1
2
3
4
5
%spark.pyspark

import sys

print('Zeppelin python: {}'.format(sys.version))

ExecutorのPythonバージョンの確かめ方

1
2
3
4
5
6
7
%spark.pyspark

def print_version(x):
import sys
return sys.version

spark.sparkContext.parallelize(range(1, 3)).map(print_version).collect()
共有

markdown preview of vim

参考

メモ

Vim + Markdown のページに色々と載っていた。 前半はMarkdown編集のための補助ツール、後半はプレビューのツール。

プレビューとしては、結論として iamcco/markdown-preview.nvim を選んだ。

試したツール

  • shime/vim-livedown
    • 外部ツールが必要だがnpmで導入可能なので簡易。
    • 悪くなかったが、動作が不安定だったので一旦保留。
  • iamcco/markdown-preview.nvim
    • 外部ツールが必要だが予めプラグインディレクトリ内にインストールされるような手順になっている
      • ただし手元のWSL環境ではnpmインストールに失敗していたため手動インストールした
    • 今のところ馴染んでいるので利用している
  • previm/previm
    • 外部依存がなく悪くなさそうだった
    • WindowsのWSL環境で動作が不安定だったので一旦保留。

その他試したいもの

  • iwataka/minidown.vim
    • Vim + Markdown のブログ内で紹介されていた、ブログ主の作ったツール
    • ミニマムな機能で必要十分そう。外部依存が殆どないのが大きい

iamcco/markdown-preview.nvim を試す際に気をつけたこと

プラグイン内でnodejsを利用するのだが、そのライブラリインストールにyarnを 使うようになっている。

公式READMEの文言を以下に引用。

1
2
call dein#add('iamcco/markdown-preview.nvim', {'on_ft': ['markdown', 'pandoc.markdown', 'rmd'],
\ 'build': 'cd app & yarn install' })

個人的にはnpmを利用しているのでyarnではなくnpmでインストールするよう修正し実行した。 しかいs上記の通り、一部の環境ではプラグイン内にインストールする予定だったnpmライブラリが インストールされない事象が生じた。

その際は、自身で npm install すればよい。

1
2
$ cd ~/vimfiles/bundles/repos/github.com/iamcco/markdown-preview.nvim/app
$ npm install

なお、 ~/vimfiles/bundles のところは、各自のdeinレポジトリの場所を指定してほしい。

共有

Data Platform for Machine Learning

参考

メモ

Data Platform for Machine Learning を読んでみた。個人的要約メモを以下に記載する。

Abstract

これまでは数学的な試行錯誤(ここでは機械学習も含んでそう表現していると思われる)に適したデータプラットフォーム(データを保持し、サーブする仕組み、と解釈)がなかった。

既存のデータプラットフォームの課題はいくつかある。 いずれにせよ、データ管理の負荷はユーザビリティを下げる。 また、昨今求められるコンプライアンス機能(例:terms of use、privacy measure、auditなど)を すべて利用できるわけではない。

本論文では、Machine Learning Data Platform(MLdp)に求められるものをまとめ、Appleにおける経験とソリューションを紹介し、将来のアクションを紹介する。

Introduction

MLdp = Machine Learningのためのデータ管理システム

Motivation

前提とするワークフロー:

  • data collection
  • annotation
  • exploration
  • feature engineering
  • experimentation
  • evaluation
  • finally deployment

(補足)Appleにおけるワークフローのアブストラクションの考え方が分かるので参考として

強い課題感:サイロ化されたデータストア、サイロ化された知見・アクティビティ ★重要

サイロ化されたストアとアクティビティのイメージ

3種類の動機

  • エンジニアリングチームのサポート
  • MLライフサイクルのサポート
  • MLフレームワーク、データのサポート

ロール3種類 + 1

  • MLエンジニア
  • ソフトウェアエンジニア
  • データサイエンティスト
  • Legal Team

(補足)よく聞くのは、データサイエンティスト、データエンジニアだが、ここでは、 MLエンジニア、ソフトウェアエンジニア、Legal Teamが挙げられているのが特徴的。

MLでは、データはトラックされること、バージョン管理されることが必要 ★重要

MLdpが満たすべき要件 ★重要

  • 以下の2種類のデータに対するコンセプチャルな表現方法
    • ほとんど変化しない大量の永続データ
      • 学習データのセットのこと
      • 再現性担保のため学習データ自体はイミュータブルに扱う
    • 揮発性のデータ
      • アノテーションやラベルのこと
      • これらの情報は、業務次第で値を変える可能性がある
      • 学習データセット本体と比べて総量が小さい
  • 上記2種類の要件を満たすハイブリッドデータストア
    • さらにバージョニング可能な物理レイアウト
    • インクリメンタルな更新、デルタトラッキング、ML学習ワークロードへの最適化
  • 継続的なデータ入力に対するシンプルな機構
  • データ、特徴量エンジニアリングのためのDSL
  • 著名なMLフレームワークへのインターフェース
    • 著名なMLフレームワークでは独自のデータ表現をもっているが、一方でMLdpでは フレームワークをまたいだ共通的なデータ表現が必要、など
    • データ型も多様である
  • データセンタ、エッジの両方への対応。
    • データへの距離を意識しないとならない
  • 説明可能性を担保するためのバージョニング
  • データリネージのトラック、データソースのトラック
  • データ探索と発見
    • MLdpでは省力化のための中央集権的なアプローチを採用
  • コンプライアンスやプライバシーの考慮
MLプラットフォームのエコシステム一覧

3Vの話

関連研究

(ここでは一旦省略)

SYSTEM ARCHITECTURE AND DESIGN

アーキテクチャは、コントロールプレーン、データプレーンによる構成。 ★重要

MLdpのアーキテクチャ概要

各コンポーネントの特徴 ★重要

  • 概念データモデル
    • ローデータ、ローデータから生成されたデータ(アノテーション、特徴量)の両方とも表現可能とする
  • バージョン管理の仕組み
    • イミュータブルなデータのスナップショットを用いて、実験の再現性を担保する
  • データアクセスインターフェース
    • MLフレームワークや他のデータ処理エンジンとシームレスに連係可能とする
  • ハイブリッドなデータストア
    • 高頻度でのデータ入力とあまり変化しない大量のデータの両方に対応可能とする
  • ストレージレイアウト
    • バージョン間のデルタトラッキング
    • 分散学習のためのデータ並列化
    • データ探索と発見を可能とするインデックス
    • デバイスとデータセンタの両方での学習のためのストリームI/O
    • 学習タスクのためのキャッシュ

4種類のデータセット:dataset, annotation, split, and package に分けて考える。 ★重要

  • dataset:
    • データ本体。学習データに用いる、など。量、サイズが大きい。
  • annotatoin:
    • データ本体に付与したラベル、属性、メタデータなど。量、サイズはdatasetと比べて小さい。
  • split:
    • データ本体を分割したもの。
    • (補足)実体はdatasetであり、split自体は仮想的な定義と考えられる。
  • package:
    • dataset / split、annotationを組み合わせた仮想的なまとまり。 再現性担保のため、学習セットをまとめて管理するために利用。

以下補足。

datasetに対し、annotationやsplitは「weak object」。つまり、それ単体では存在しない。 annotationやsplitをdatasetとは別に管理することメリットは「multifold」である。 annotationもsplitもdatasetを変更せずに変更可能である。 annotationは個別のコンプライアンスポリシーを持つことがある。

バージョンの表現 -> <schema>.<revision>.<patch> ★重要 これにより、スキーマ変更あり・なしを明示しながら、バージョン管理を可能とする。

バージョン管理はユーザが明示的に行う。 ★重要

MLプロジェクト間でのデータシェアにおいて以下を実現。 ★重要

  • 他のプロジェクトへの影響を気にせず、そのプロジェクトの都合でデータをシェアし、スキーマを変更できる。
  • 特定のバージョンをピンどめする。再現性確保。
  • データのモデルバージョンの依存関係をトラック可能とする

MLプロジェクトにおいてはデータはインターフェースである。 ★名言

MLdpではオブジェクトは以下の状態を遷移する。 ★重要

  • draft
  • published
  • archived
  • purged

publishされたデータはイミュータブルになる。 ★重要 これは再現性を担保するため。

(補足)このオブジェクトのライフサイクルは、S3等のパブリッククラウドのオブジェクトストレージの機能と相性が良さそう。

MLdpでは、表形式のデータ表現を採用。 ★重要

カラムナ表現により、圧縮効率やIO効率の改善だけではなく、 カラムの追加削除にも対応しやすくなる。

Spark RDD、DataFrame、Apache DataFrame、Pandas、R DataFrameなどと互換性あり。

MLdpではファイルはバイト列として扱う。

またファイルに対して、ストリーム形式でのアクセスも可能とする。 ★重要

データアクセスのインタフェースのうち、ハイレベルAPIはサーバサイドのバッチ処理向き。つまり前処理などに適している。 ローレベルAPIは各種フレームワークとの連係向き。

クライアントでのデータ利用のため、MLdpはキャッシュ機能を持つ。 ★重要 (補足)このあたり、Appleだからこそ必要であり、一般企業は果たして必要だろうか。

SQLライクなインタフェースを提供。(その他のDSLもあるようだ)

現在、Pythonと連係可能

入力となるデータの例
データセット定義のクエリ例
既存モデルを読み込んで推論するクエリ例

MLのデータセットは、複数のファイルをまとめ、MLモデルから扱うことがある。

MLdpをマウントして扱うことができる。 いまのところクライアントサイドのシンプルなキャッシュ機構を有している。

またエッジでの利用を想定し、REST APIも提供。

MLdpをマウントしPATHに対して画像処理するクエリ例

セカンダリインデックスも利用可能。

セカンダリインデックスを使い、求める画像のみを扱うクエリ例

分散学習のためのスプリットを生成することもできる。

MLdpをマウントし、TFの機能を使ってスライスを作成

ストレージレイヤのデザインのポイントは以下の通り。

  • 頻繁に更新されるデータにも対応しながら、学習時の高スループットでの処理にも対応
  • スケーラビリティを持ち、バージョン管理可能であり、バージョン間の差分をトラックできる
  • ダイナミックレンジクエリ、ポイントクエリに対応し、ストリームI/Oアクセスも可能

in-flightのデータ保持にはKVSを使用し、高頻度での並列での書き込みに対応する。 ★重要 スナップショットが取られ、クラウドストレージ上に置かれる。 ★重要

おかれたデータは読み取りに最適化されたものになる。

スナップショットに対する変更は、新しいスナップショットとなって書き込まれる(論文ではcurateされた、と表現されていた)。 つまり、スナップショットはイミュータブルな扱いとなる。

in-flightのデータとcurateされたデータをつなぐサブシステムを論文ではdata-pipeと呼ぶ。

MLdpでは複数のデータストアを取り扱うが、ユニファイドなアクセス手段を提供する。 ★重要

in-flightのデータストアは頻繁に更新されるため、それをそのまま機械学習の入力データに することはあまりない。というのも、再現性を担保するのが難しいからだ。

MLdpではデータをパーティション化して保存するが、パーティションキーはユーザが 直接指定するわけではない。 その代わりソートキーを指定する。 ただし、ソートキーが指定されなかった場合は、システムがハッシュをキーに割り当て パーティションを構成するようにする。

パーティションは複数のブロックで構成される。

データの変更が少ないケースでは、バージョン間で多くのブロックを共有できる。 copy-on-writeで変更のあったブロックが生成される。

パーティションとブロック

(感想)多くの分散ファイルシステムで見られる構成。

このブロックで構成することで、MLdpのクライアントは興味のあるブロックにだけアクセスすればよくなる。 またブロックに対するストリームI/Oも可能なので、MLフレームワークからストリームI/Oでアクセスできる。

インデックスもある。そのためブロック単位でのスキャンも可能だし、インデックスに基づくピンポイントでの検索も可能。

セカンダリインデックスの仕組みもある。 ★重要

セカンダリインデックスの仕組み

セカンダリインデックスを用いて、各ブロック内のレコードを取りに行くことになる。

MLの計算クラスタにはキャッシュの仕組みが組み込まれており、 透過的に利用できるようになっている。 データアクセスし、もしキャッシュヒットしたら、キャッシュからデータを返す。 キャッシュシステムが停止している場合は、通常のデータアクセスにフォールバックする。

キャッシュシステムの目的は、レイテンシの低減とReadのスケールアウト。

キャッシュはイミュータブルデータを対象とし、読み取り専用。この単純化のおかげで、結果一貫性などに伴う異常を排除。

FUTURE WORK

将来の課題:データ探索性、システム改善、エコシステムインテグレーション。

データ分析者は、適切なデータを探す。 human-in-the-loop。

カタログは知っている情報を探し当てるもの。データ探索は知らない情報を明らかにすること。両者は異なる。 データ探索の活動はそれ自体が機械学習タスクであることもある。

この手のデータ探索はドメインスペシフィックではあるが、共通的な?システム要件もある。 例えばコンピュータビジョンなど。

ヘテロジーニアスなデータフォーマットをユニファイドな表現に変換できるようにしたい。

システム的な改善に関する2種類の観点:

  • レイテンシを下げ、スループットを上げる
  • human-in-the-loopを減らす

インテントベースのキャッシュを開発したい。つまり、明示的にキャッシュ化するのではなく、頻繁に利用されるデータをキャッシュするようにする、など。

プリフェッチの改善、ローカルバッファリングも改善ネタ。 しかし学習の際にランダマイズが必要なのが難しいところ。

Spark等との連携も課題。

各フレームワークは独自のデータ保持方式を持っている。 MLdpを利用しやすいようにしたい。できるだけユーザコードを変更せずに利用できるようにしたい。

データ保持方式に大きく分けて2種類:

" MLdpの独自の方式を採用し、都度変換する。 " もうひとつはTFRecordなどのフレームワークネイティブの方式を採用すること

一長一短。

共有

Abstract of Analytics Zoo

参考

メモ

Analytics Zooの公式ドキュメント の内容から、ひとことで書いてある文言を拾うと...以下の通り。

0.2.0時代:

Analytics + AI Platform for Apache Spark and BigDL.

2019/12現在:

A unified analytics + AI platform for distributed TensorFlow, Keras, PyTorch and BigDL on Apache Spark

とのこと。対応範囲が拡大していることがわかる。

動作確認

Python環境構築

今回は簡易的にPythonから触る。

pipenvで3.6.9環境を作り、以下のライブラリをインストールした。

  • numpy
  • scipy
  • pandas
  • scikit-learn
  • matplotlib
  • seaborn
  • wordcloud
  • jupyter

pipenvファイルは以下の通り。 https://github.com/dobachi/analytics-zoo-example/blob/master/Pipfile

なお、 Analytics Zooの公式ドキュメント では、Hadoop YARNでローンチする方法などいくつかの方法が掲載されている。

Exampleを動かしてみようとするが、pipでインストールしたところ jupyter-with-zoo.sh は以下の箇所にあった。

1
$HOME/.local/share/virtualenvs/analytics-zoo-example-hS4gvn-H/lib/python3.6/site-packages/zoo/share/bin/jupyter-with-zoo.sh

そこで環境変数を以下のように設定した。

1
2
$ export ANALYTICS_ZOO_HOME=`pipenv --venv`/lib/python3.6/site-packages/zoo/share
$ export SPARK_HOME=`pipenv --venv`/lib/python3.6/site-packages/pyspark

この状態で、 ${ANALYTICS_ZOO_HOME}/bin/jupyter-with-zoo.sh --master local[*] を実行しようとしたのだが、 jupyter-with-zoo.sh 内で用いられている analytics-zoo-base.sh の中で、

1
2
3
4
if [[ ! -f ${ANALYTICS_ZOO_PY_ZIP} ]]; then
echo "Cannot find ${ANALYTICS_ZOO_PY_ZIP}"
exit 1
fi

の箇所に引っかかりエラーになってしまう。 そもそも環境変数 ANALYTICS_ZOO_PY_ZIP は任意のオプションの値を指定するための環境変数であるため、このハンドリングがおかしいと思われる。 どうやら、pipenvでインストールした0.6.0では、Jupyterとの連係周りがまだ付け焼き刃のようだ。 とりあえず、今回の動作確認では利用しない環境変数であるため、関係個所をコメントアウトして実行することとする。

上記対応の後、以下のようにpipenv経由でJupyter PySparkを起動。

1
$ pipenv run ${ANALYTICS_ZOO_HOME}/bin/jupyter-with-zoo.sh --master local[*]

試しに実行しようとしたが、以下のエラー

1
Py4JError: com.intel.analytics.zoo.pipeline.nnframes.python.PythonNNFrames does not exist in the JVM
共有

Kafka Streams with ML

参考

kaiwaehner kafka-streams-machine-learning-examples

メモ

kaiwaehner/kafka-streams-machine-learning-examples

kaiwaehner kafka-streams-machine-learning-examples には、TensorFlow、Keras、H2O、Python、DL4JをKafka Streamsと併用するサンプルが含まれている。

上記レポジトリのREADMEには、いくつか参考文献が記載されている。

How to Build and Deploy Scalable Machine Learning in Production with Apache Kafka には、

  • デザインの概略みたいなものが載っていた
  • 学習と推論のモデルやりとりをどうつなぐのか?の考察

が記載されていた。

Using Apache Kafka to Drive Cutting-Edge Machine Learning にはモデルの組み込み方の種類、 AutoMLとの組み合わせについて考察(Not 具体例)が掲載されている。

Machine Learning with Python, Jupyter, KSQL and TensorFlow には、以下のような記述がある。 いつもの論文とセット。

Impedance mismatch between data scientists, data engineers and production engineers

これを解決する手段としていくつか例示。

  • ONNX、PMMLなどを利用
  • DL4Jなど開発者視点の含まれるプロダクトの利用
  • AutoML
  • モデルを出力してアプリに埋め込んで利用(TensorFlowだったらJava APIでモデルを利用する、など)
  • パブリッククラウドのマネージドサービス利用

ConfluentのKafka Python KSQL APIを使い、Jupyter上でKafkaからデータロードし分析する例も記載されていた。

kaiwaehner ksql-udf-deep-learning-mqtt-iot には、UDF内でTensorFlowを使う例が記載されている。

kaiwaehner ksql-fork-with-deep-learning-function には、エンドツーエンドで動作を確認してみるためのサンプル実装が載っている。

kaiwaehner tensorflow-serving-java-grpc-kafka-streams には、gRPC経由でKafka StreamsとTensorFlow Servingを連係する例が記載されている。

Convolutional Neural Network (CNN) with TensorFlow for Image Recognition

com.github.megachucky.kafka.streams.machinelearning.Kafka_Streams_TensorFlow_Image_Recognition_Example クラスを確認する。

概要

  • 本クラスは、予め学習されたTensorFlowのモデルを読み出して利用する
  • 上記モデルを利用するときには、TensorFlowのJava APIを利用する
  • 画像はどこかのストレージに保存されている前提となっており、そのPATHがメッセージとしてKafkaに入ってくるシナリオである
  • 画像に対するラベルのProbabilityを計算し、最大のProbabilityを持つラベルを戻り値として返す

詳細メモ

main内では特別なことはしていない。トポロジを組み立てるための getStreamTopology が呼ばれる。

getStreamTopology メソッドを確認する。

当該メソッドでは、最初にモデル本体やモデルの定義が読み込まれる。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:83

1
2
3
4
5
6
7
String modelDir = "src/main/resources/generatedModels/CNN_inception5h";

Path pathGraph = Paths.get(modelDir, "tensorflow_inception_graph.pb");
byte[] graphDef = Files.readAllBytes(pathGraph);

Path pathModel = Paths.get(modelDir, "imagenet_comp_graph_label_strings.txt");
List<String> labels = Files.readAllLines(pathModel, Charset.forName("UTF-8"));

続いてストリームのインスタンスが生成される。 その後、ストリーム処理の内容が定義される。

最初はメッセージ内に含まれる画像のPATHを用いて、実際の画像をバイト列で読み出す。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:104

1
2
3
4
5
6
7
8
9
10
11
12
KStream<String, Object> transformedMessage =
imageInputLines.mapValues(value -> {

String imageClassification = "unknown";
String imageProbability = "unknown";

String imageFile = value;

Path pathImage = Paths.get(imageFile);
byte[] imageBytes;
try {
imageBytes = Files.readAllBytes(pathImage);

つづいていくつかのヘルパメソッドを使って、画像に対するラベル(推論結果)を算出する。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:117

1
2
3
4
5
6
7
8
9
10
11
try (Tensor image = constructAndExecuteGraphToNormalizeImage(imageBytes)) {
float[] labelProbabilities = executeInceptionGraph(graphDef, image);
int bestLabelIdx = maxIndex(labelProbabilities);

imageClassification = labels.get(bestLabelIdx);

imageProbability = Float.toString(labelProbabilities[bestLabelIdx] * 100f);

System.out.println(String.format("BEST MATCH: %s (%.2f%% likely)", imageClassification,
labelProbabilities[bestLabelIdx] * 100f));
}

constructAndExecuteGraphToNormalizeImage メソッドは、グラフを構成し、前処理を実行する。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:171

1
2
3
4
5
6
7
8
9
final Output input = b.constant("input", imageBytes);
final Output output = b
.div(b.sub(
b.resizeBilinear(b.expandDims(b.cast(b.decodeJpeg(input, 3), DataType.FLOAT),
b.constant("make_batch", 0)), b.constant("size", new int[] { H, W })),
b.constant("mean", mean)), b.constant("scale", scale));
try (Session s = new Session(g)) {
return s.runner().fetch(output.op().name()).run().get(0);
}

executeInceptionGraph メソッドは、予め学習済みのモデルと画像を引数にとり、 ラベルごとのProbabilityを算出する。

com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Image_Recognition_Example.java:184

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
try (Graph g = new Graph()) {

// Model loading: Using Graph.importGraphDef() to load a pre-trained Inception
// model.
g.importGraphDef(graphDef);

// Graph execution: Using a Session to execute the graphs and find the best
// label for an image.
try (Session s = new Session(g);
Tensor result = s.runner().feed("input", image).fetch("output").run().get(0)) {
final long[] rshape = result.shape();
if (result.numDimensions() != 2 || rshape[0] != 1) {
throw new RuntimeException(String.format(
"Expected model to produce a [1 N] shaped tensor where N is the number of labels, instead it produced one with shape %s",
Arrays.toString(rshape)));
}
int nlabels = (int) rshape[1];
return result.copyTo(new float[1][nlabels])[0];
}
}

executeInceptionGraph メソッドにより、ある画像に対するラベルごとのProbabilityが得られた後、 最大のProbabilityを持つラベルを算出。 それを戻り値とする。

Iris Prediction using a Neural Network with DeepLearning4J (DL4J)

Iris Prediction using a Neural Network with DeepLearning4J (DL4J) を確認する。

com.github.megachucky.kafka.streams.machinelearning.models.DeepLearning4J_CSV_Model_Inference クラスを確認したが、これはKafka Streamsアプリには見えなかった。 中途半端な状態で止まっている? これをベースに Kafka Streams のアプリを作ってみろ、ということか。

もしくはunitテスト側を見ろ、ということか? -> そのようだ。 Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java

クラスの内容を確認する。

まずテストのセットアップ。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:46

1
2
3
4
5
6
7
8
9
10
11
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new TestEmbeddedKafkaCluster(1);

private static final String inputTopic = "IrisInputTopic";
private static final String outputTopic = "IrisOutputTopic";

// Generated DL4J model
private File locationDL4JModel = new File("src/main/resources/generatedModels/DL4J/DL4J_Iris_Model.zip");

// Prediction Value
private static String irisPrediction = "unknown";

TestEmbeddedKafkaClusterはテスト用のKafkaクラスタを起動するヘルパークラス。 内部的には、Kafka Streamsのテストに用いられている補助機能である org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster クラスを継承して作られている。

機械学習モデルは、予め学習済みのものが含まれているのでそれを読み込んで用いる

テストコードの実態は、 com.github.megachucky.kafka.streams.machinelearning.test.Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest#shouldPredictIrisFlowerType である。 以降、当該メソッド内を確認する。

メソッドの冒頭で、Kafka Streamsの設定を定義している。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:67

1
2
3
4
5
6
7
8
		// Iris input data (the model returns probabilities for input being each of Iris
// Type 1, 2 and 3)
List<String> inputValues = Arrays.asList("5.4,3.9,1.7,0.4", "7.0,3.2,4.7,1.4", "4.6,3.4,1.4,0.3");

(snip)

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

このサンプルは動作確認用のため簡易な設定になっている。 実際のアプリケーション開発時にはきちんと設定必要。

なお、途中でDL4Jのモデルを読み込んでいる。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:86

1
2
// Create DL4J object (see DeepLearning4J_CSV_Model.java)
MultiLayerNetwork model = ModelSerializer.restoreMultiLayerNetwork(locationDL4JModel);

その後、ビルダのインスタンスを生成し、Irisデータを入力とするストリームを定義する。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:97, 104

1
2
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> irisInputLines = builder.stream(inputTopic);

その後はストリームのメッセージに対し、DL4Jのモデルによる推論を実行する。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:108

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
irisInputLines.foreach((key, value) -> {

if (value != null && !value.equals("")) {
System.out.println("#####################");
System.out.println("Iris Input:" + value);

// TODO Easier way to map from String[] to double[] !!!
String[] stringArray = value.split(",");
Double[] doubleArray = Arrays.stream(stringArray).map(Double::valueOf).toArray(Double[]::new);
double[] irisInput = Stream.of(doubleArray).mapToDouble(Double::doubleValue).toArray();

// Inference
INDArray input = Nd4j.create(irisInput);
INDArray result = model.output(input);

System.out.println("Probabilities: " + result.toString());

irisPrediction = result.toString();

}

});

ここでは入力されたテキストデータをDouble型の配列に変換し、さらにND4JのINDArrayに変換する。 それをモデルに入力し、推論を得る。

その後、テキストを整形し、出力用トピックに書き出し。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:132

1
2
3
4
5
6
// Transform message: Add prediction information
KStream<String, Object> transformedMessage = irisInputLines
.mapValues(value -> "Prediction: Iris Probability => " + irisPrediction);

// Send prediction information to Output Topic
transformedMessage.to(outputTopic);

ビルダを渡し、ストリーム処理をスタート。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:140

1
2
3
4
5
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();
System.out.println("Iris Prediction Microservice is running...");
System.out.println("Input to Kafka Topic 'IrisInputTopic'; Output to Kafka Topic 'IrisOutputTopic'");

その後、これはテストコードなので、入力となるデータをアプリケーション内で生成し、入力トピックに書き込む。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_MachineLearning_DL4J_DeepLearning_Iris_IntegrationTest.java:149

1
2
3
4
5
6
7
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig, new MockTime());

ここでは結合テスト用のヘルパーメソッドを利用。

その後、出力トピックから結果を取り出し、確認する。(実装解説は省略)

Python + Keras + TensorFlow + DeepLearning4j

例のごとく、テスト側が実装本体。 Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest クラスを確認する。

実態は shouldPredictValues メソッド。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:64

1
public void shouldPredictValues() throws Exception {

上記メソッド内では、最初にモデルを読み込む。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:69

1
2
3
4
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
System.out.println(simpleMlp.toString());

MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);

上記では、HDF形式で予め保存されたモデルを読み込む。 読み込みの際にはDeeplearning4Jの KerasModelImport#importKerasSequentialModelAndWeights メソッドが用いられる。

続いて、Kafka Streamsのコンフィグを定める。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:81

1
2
3
4
5
6
7
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
"kafka-streams-tensorflow-keras-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

次にKafka Streamsのビルダを定義し、入力トピックを渡して入力ストリームを定義する。

1
2
3
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> inputEvents = builder.stream(inputTopic);

以降、メッセージを入力とし、推論を行う処理の定義が続く。

先程定義したストリームの中は、カンマ区切りのテキストになっている。 これをカンマで区切り、配列に変換する。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:104

1
2
3
4
5
6
inputEvents.foreach((key, value) -> {

// Transform input values (list of Strings) to expected DL4J parameters (two
// Integer values):
String[] valuesAsArray = value.split(",");
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1]));

配列への変換には、Nd4Jを用いる。

配列の定義が完了したら、それを入力としてモデルを用いた推論を行う処理を定義する。

1
2
3
4
5

output = model.output(input);
prediction = output.toString();

});

最後は出力メッセージに変換し、出力トピックへの書き出しを定義する。 その後、独自のユーティリティクラスを使って、ビルダーに基づいてストリームをビルド。 ストリーム処理を開始する。

com/github/megachucky/kafka/streams/machinelearning/test/Kafka_Streams_TensorFlow_Keras_Example_IntegrationTest.java:118

1
2
3
4
5
6
7
8
9
10
11
12
13
// Transform message: Add prediction result
KStream<String, Object> transformedMessage = inputEvents.mapValues(value -> "Prediction => " + prediction);

// Send prediction result to Output Topic
transformedMessage.to(outputTopic);

// Start Kafka Streams Application to process new incoming messages from
// Input Topic
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();
System.out.println("Prediction Microservice is running...");
System.out.println("Input to Kafka Topic " + inputTopic + "; Output to Kafka Topic " + outputTopic);
共有

Behavioral economics

参考

メモ

それは「人は、出された質問が難しいと、それを簡単な質問に置き換えてしまう」ように脳ができているからだ。 これは、心理学と経済学の融合を果たした「行動経済学」という学問分野の知見による。

という記述を皮切りに、行動経済学の初歩を紹介するブログ。

共有

Studying Software Engineering Patterns for Designing Machine Learning Systems

参考

メモ

スライド に大まかな内容が記載されている。

機械学習に関するソフトウェアエンジニアリングやアーキテクチャデザインパターンに関する調査。 良い/悪いソフトウェアエンジニアリングデザインパターンを調査。 「リサーチ質問」(RQ)という形でいくつか確認事項を用意し、 ヒアリングや検索エンジンによる調査を実施。 SLR(Systematic literature review)手法にのっとり、調査結果を検証。

ヒアリングは760超の開発者を対処に実施し、1%の回答者を得た。

調査は、23個の論文、追加の6個の論文、48個のグレードキュメントを調査。 アカデミックではシステムエンジニアリングデザインパターンの文献は少ない。 グレードキュメントでは多数。データマネジメント観点が多い。

パターン整理では以下の軸を用いた。

  • Microsoftのパイプライン(9ステージ)
  • ISO/IEC/IEEE 12207:2008 standard

ドキュメント群から69個のパターンを抽出。MLのアーキテクチャやデザインパターン関係は33個。

興味深かった図

業務ロジックとMLロジックを分離。 メンテナンス性が上がるだろうが、果たしてロジックを現実的に分離可能かどうかはやや疑問。

Fig. 2. Structure of Distinguish Business Logic from ML Model pattern

モデルに関するドキュメントが多数。

TABLE III CLASSIFICATION OF THE IDENTIFIED PATTERNS
共有

Git SDK for Windows

参考

メモ

pacmanをgit bash上で使いたいと思ったのだが、git-sdkはいまいち。

共有

Docker on VMWare

参考

メモ

Hyper-VでなくVMwareでDocker for Windows を使う に習った。

Docker環境

手元の環境は Windows + VMWare だったので、Vagrant + VMWareで環境を構築し、その中にDockerをインストールすることにした。

Get Docker Engine - Community for Ubuntu を参考に、dockerをインストールするようにした。 参考までに、Vagrantfileは以下のような感じである。 ubuntu18系をベースとしつつ、プロビジョニングの際にDockerをインストールする。

ただし、以下のVagrantfileのプロビジョニングでは、DockerサービスにTCPで接続することを許可するようにしている。 これはセキュリティ上問題があるため、注意して取り扱うこと。(あくまで動作確認程度に留めるのがよさそう)

Vagrantfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Vagrant.configure("2") do |config|
config.vm.define "docker-01" do |config|
config.vm.box = "bento/ubuntu-18.04"
config.vm.network :private_network, ip: "172.16.19.220"
config.vm.hostname = "docker-01"
end

config.vm.provision "shell", inline: <<-SHELL
apt-get update
apt-get install -y \
apt-transport-https \
ca-certificates \
curl \
gnupg-agent \
software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
apt-key fingerprint 0EBFCD88
add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"
apt-get update
apt-get install -y docker-ce docker-ce-cli containerd.io
sed -i "s;fd://;tcp://0.0.0.0:2375;g" /lib/systemd/system/docker.service
SHELL

end

WSLなどから以下のようにすることで、VM内のDockerにアクセスできる。

1
$ sudo docker --host tcp://172.16.19.220:2375 run hello-world

環境変数として、DOCKER_HOSTを設定しておいても良いだろう。

共有