Automating ML model training and deployments via metadata-driven data, infrastructure, feature engineering, and model management

参考

メモ

以下の内容が記載されている。

ワークフロー。 よくあるワークフローなので特筆なし。

無数のストリームイベント。 数億のレコードアップデート。 数千万の顧客アカウント。

メタデータ管理、特徴量ストア、モデルサービング、パイプラインオートメーション。

共有

Questioning the Lambda Architecture

参考

メモ

Questioning the Lambda Architecture にてJay Krepsが初めて言及したとされているようだ。 過去に読んだが忘れたので改めて、いかにメモを記載する。

まとめ

主張としては、ストリーム処理はKafkaの登場により、十分に使用に耐えうるものになったため、 バッチ処理と両方使うのではなく、ストリーム処理1本で勝負できるのでは?ということだった。

気になった文言を引用

ラムダアーキテクチャについて:

The Lambda Architecture is an approach to building stream processing applications on top of MapReduce and Storm or similar systems.

Lambdaアーキテクチャのイメージ

バッチ処理とストリーム処理で2回ロジックを書く。:

You implement your transformation logic twice, once in the batch system and once in the stream processing system.

レコメンデーションシステムを例にとる:

A good example would be a news recommendation system that needs to crawl various news sources, process and normalize all the input, and then index, rank, and store it for serving.

データを取り込み、イミュターブルなものとして扱うことはありだと思う:

I’ve written some of my thoughts about capturing and transforming immutable data streams

I have found that many people who attempt to build real-time data processing systems don’t put much thought into this problem and end-up with a system that simply cannot evolve quickly because it has no convenient way to handle reprocessing.

リアルタイム処理が本質的に近似であり、バッチ処理よりも弱く、損失しがち、という意見があるよね、と。:

One is that real-time processing is inherently approximate, less powerful, and more lossy than batch processing.

ラムダアーキテクチャの利点にCAP定理との比較が持ち出されることを引き合いに出し、ラムダアーキテクチャがCAP定例を克服するようなものではない旨を説明。:

Long story short, although there are definitely latency/availability trade-offs in stream processing, this is an architecture for asynchronous processing, so the results being computed are not kept immediately consistent with the incoming data. The CAP theorem, sadly, remains intact.

結局、StormとHadoopの両方で同じ結果を生み出すアプリケーションを 実装するのがしんどいという話。:

Programming in distributed frameworks like Storm and Hadoop is complex.

ひとつの解法は抽象化。:

Summingbird

とはいえ、2重運用はしんどい。デバッグなど。:

the operational burden of running and debugging two systems is going to be very high.

結局のところ、両方を同時に使わないでくれ、という結論:

These days, my advice is to use a batch processing framework like MapReduce if you aren’t latency sensitive, and use a stream processing framework if you are, but not to try to do both at the same time unless you absolutely must.

ストリーム処理はヒストリカルデータの高スループットでの処理に向かない、という話もあるが…:

When I’ve discussed this with people, they sometimes tell me that stream processing feels inappropriate for high-throughput processing of historical data.

バッチ処理もストリーム処理も抽象化の仕方は、DAGをベースにしたものであり、 その点では共通である、と。:

But there is no reason this should be true. The fundamental abstraction in stream processing is data flow DAGs, which are exactly the same underlying abstraction in a traditional data warehouse (a la Volcano) as well as being the fundamental abstraction in the MapReduce successor Tez.

ということでKafka。:

Use Kafka

提案アーキテクチャ

Kafkaに入れた後は、HDFS等に簡単に保存できる。:

Kafka has good integration with Hadoop, so mirroring any Kafka topic into HDFS is easy.

このあと少し、Kafkaの説明が続く。

この時点では、Event Sourcing、CQRSについての言及あり。

Indeed, a lot of people are familiar with similar patterns that go by the name Event Sourcing or CQRS.

LinkedInにて、JayはSamzaを利用。

I know this approach works well using Samza as the stream processing system because we do it at LinkedIn.

提案手法の難点として、一時的に2倍の出力ストレージサイズが必要になる。

However, my proposal requires temporarily having 2x the storage space in the output database and requires a database that supports high-volume writes for the re-load.

単純さを大事にする。:

So, in cases where simplicity is important, consider this approach as an alternative to the Lambda Architecture.

所感

当時よりも、最近のワークロードは複雑なものも含めて期待されるようになっており、 ますます「バッチ処理とストリーム処理で同じ処理を実装する」というのがしんどくなっている印象。

共有

ML Ops: Machine Learning as an Engineering Discipline

参考

メモ

読んでみた感想をまとめる。

感想

結論としては、まとめ表がよくまとまっているのでそれでよい気がする。 この表をベースに、アクティビティから必要なものを追加するか? -> まとめ表

演繹的ではなく、帰納的な手法であるため、もとになったコードとデータの両方が重要。

データサイエンティスト、MLエンジニア、DevOpsエンジニア、データエンジニア。 MLエンジニア。あえてエンジニアと称しているのは、ソフトウェア開発のスキルを有していることを期待するから。

フェアネスの考慮、というか機械学習の倫理考慮をどうやって機械的に実現するのか、というのはかねてより気になっていた。 フェアネスの考慮などを達成するためには、テストデータセットの作り方、メトリクスの作り方に工夫するとよい。つまり、男女でそれぞれ個別にもテストするなど。 まだ一面ではあるが、参考になった。

気になる文言の抜粋

以下の文言が印象に残った。 ほかのレポートでも言われていることに見える。

Deeplearning.ai reports that “only 22 percent of companies using machine learning have successfully deployed a model”.

このレポートがどれか気になった。 Deeplearning.aiのレポート か。

確かに以下のように記載されている。

Although AI budgets are on the rise, only 22 percent of companies using machine learning have successfully deployed a model, the study found.

データが大切論:

ML is not just code, it’s code plus data

training data, which will affect the behavior of the model in production

ML = Code + Dataの図

It never stops changing, and you can’t control how it will change.

確かに、データはコントロールできない。

Data Engineering

ML Opsの概念図

チーム構成について言及あり。:

But the most likely scenario right now is that a successful team would include a Data Scientist or ML Engineer, a DevOps Engineer and a Data Engineer.

データサイエンティストのほかに、明確にMLエンジニア、DevOpsエンジニア、データエンジニアを入れている。 基盤エンジニアはデータエンジニアに含まれるのだろうか。

Even if an organization includes all necessary skills, it won’t be successful if they don’t work closely together.

ノートブックに殴り書かれたコードは不十分の話:

getting a model to work great in a messy notebook is not enough.

ML Engineers

データパイプラインの話:

data pipeline

殴り書きのコードではなく、適切なパイプラインはメリットいくつかあるよね、と。:

Switching to proper data pipelines provides many advantages in code reuse, run time visibility, management and scalability.

トレーニングとサービングの両方でパイプラインがあるけど、 入力データや変換内容は、場合によっては微妙に異なる可能性がある、と。:

Most models will need 2 versions of the pipeline: one for training and one for serving.

ML Pipelineは特定のデータから独立しているためCICDと連携可能

For example, the training pipeline usually runs over batch files that contain all features, while the serving pipeline often runs online and receives only part of the features in the requests, retrieving the rest from a database.

いくつかTensorFlow関係のツールが紹介されている。確認したほうがよさそう。:

TensorFlow Pipeline

TensorFlow Transform

バージョン管理について:

In ML, we also need to track model versions, along with the data used to train it, and some meta-information like training hyperparameters.

Models and metadata can be tracked in a standard version control system like Git, but data is often too large and mutable for that to be efficient and practical.

コードのラインサイクルと、モデルのライフサイクルは異なる:

It’s also important to avoid tying the model lifecycle to the code lifecycle, since model training often happens on a different schedule.

It’s also necessary to version data and tie each trained model to the exact versions of code, data and hyperparameters that were used.

Having comprehensive automated tests can give great confidence to a team, accelerating the pace of production deployments dramatically.

モデルの検証は本質的に、統計に基づくものになる。というのも、そもそもモデルの出力が確率的だし、入力データも変動する。:

model validation tests need to be necessarily statistical in nature

Just as good unit tests must test several cases, model validation needs to be done individually for relevant segments of the data, known as slices.

Data validation is analogous to unit testing in the code domain.

ML pipelines should also validate higher level statistical properties of the input.

TensorFlow Data Validation

Therefore, in addition to monitoring standard metrics like latency, traffic, errors and saturation, we also need to monitor model prediction performance.

An obvious challenge with monitoring model performance is that we usually don’t have a verified label to compare our model’s predictions to, since the model works on new data.

まとめ表
共有

jvm profiler for Spark at Uber

参考

メモ

概要

Sparkのエグゼキュータ等のJVMプロファイリングを行うためのライブラリ。

JVM起動時に、agentとしてアタッチするようにする。

executorプロセスのUUIDを付与しながらメトリクスを取得できるようだ。 Kafkaに流すことも可能。

公式GitHub のREADMEによると、オフヒープの使用量なども計測できており、チューニングに役立ちそう。

20200107時点では、2か月前ほどに更新されており、まだ生きているプロジェクトのようだ。

動作確認

公式GitHub のREADMEに記載されていた手順で、パッケージをビルドし、ローカルモードのSparkで利用してみた。

ビルド方法:

1
$ mvn clean package

パッケージビルド結果は、 target/jvm-profiler-1.0.0.jar に保存される。

これをjavaagentとして用いる。 渡すオプションは、 --conf spark.executor.driverJavaOptions=-javaagent:${JVMPROFILER_HOME}/target/jvm-profiler-1.0.0.jar である。 環境変数 ${JVMPROFILER_HOME} は先ほどビルドしたレポジトリのPATHとする。

また、今回は com.uber.profiling.reporters.FileOutputReporter を用いて、ファイル出力を試みることとする。

結果的に、Sparkの起動コマンドは、以下のような感じになる。:

1
$ ${SPARK_HOME}/bin/spark-shell --conf spark.driver.extraJavaOptions=-javaagent:/home/ubuntu/Sources/jvm-profiler/target/jvm-profiler-1.0.0.jar=reporter=com.uber.profiling.reporters.FileOutputReporter,outputDir=/tmp/jvm-profile

ここで

  • 環境変数 ${SPARK_HOME} はSparkを配備したPATHである
  • ディレクトリ /tmp/jvm-profile は予め作成しておく

とする。

生成されるレコードは、以下のようなJSONである。CpuAndMemory.jsonの例は以下の通り。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
{
"heapMemoryMax": 954728448,
"role": "driver",
"nonHeapMemoryTotalUsed": 156167536,
"bufferPools": [
{
"totalCapacity": 20572,
"name": "direct",
"count": 10,
"memoryUsed": 20575
},
{
"totalCapacity": 0,
"name": "mapped",
"count": 0,
"memoryUsed": 0
}
],
"heapMemoryTotalUsed": 400493400,
"vmRSS": 812081152,
"epochMillis": 1578408135107,
"nonHeapMemoryCommitted": 157548544,
"heapMemoryCommitted": 744488960,
"memoryPools": [
{
"peakUsageMax": 251658240,
"usageMax": 251658240,
"peakUsageUsed": 37649152,
"name": "Code Cache",
"peakUsageCommitted": 38010880,
"usageUsed": 37649152,
"type": "Non-heap memory",
"usageCommitted": 38010880
},
{
"peakUsageMax": -1,
"usageMax": -1,
"peakUsageUsed": 104054944,
"name": "Metaspace",
"peakUsageCommitted": 104857600,
"usageUsed": 104054944,
"type": "Non-heap memory",
"usageCommitted": 104857600
},
{
"peakUsageMax": 1073741824,
"usageMax": 1073741824,
"peakUsageUsed": 14463440,
"name": "Compressed Class Space",
"peakUsageCommitted": 14680064,
"usageUsed": 14463440,
"type": "Non-heap memory",
"usageCommitted": 14680064
},
{
"peakUsageMax": 336592896,
"usageMax": 243269632,
"peakUsageUsed": 247788352,
"name": "PS Eden Space",
"peakUsageCommitted": 250085376,
"usageUsed": 218352416,
"type": "Heap memory",
"usageCommitted": 239075328
},
{
"peakUsageMax": 58195968,
"usageMax": 55050240,
"peakUsageUsed": 43791112,
"name": "PS Survivor Space",
"peakUsageCommitted": 58195968,
"usageUsed": 43791112,
"type": "Heap memory",
"usageCommitted": 55050240
},
{
"peakUsageMax": 716177408,
"usageMax": 716177408,
"peakUsageUsed": 138349872,
"name": "PS Old Gen",
"peakUsageCommitted": 450363392,
"usageUsed": 138349872,
"type": "Heap memory",
"usageCommitted": 450363392
}
],
"processCpuLoad": 0.02584087025382403,
"systemCpuLoad": 0.026174300837744344,
"processCpuTime": 49500000000,
"vmHWM": 812081152,
"appId": "local-1578407721611",
"vmPeak": 4925947904,
"name": "24974@ubuec2",
"host": "ubuec2",
"processUuid": "38d5c63f-d70d-4e4d-9d54-a2381b9c37a7",
"nonHeapMemoryMax": -1,
"vmSize": 4925947904,
"gc": [
{
"collectionTime": 277,
"name": "PS Scavenge",
"collectionCount": 16
},
{
"collectionTime": 797,
"name": "PS MarkSweep",
"collectionCount": 4
}
]
}
  • nonヒープのメモリ使用量についても情報あり
  • ヒープについては、RSSに関する情報もある
  • ヒープ内の領域に関する情報もあり、GCに関する情報もある
共有

The evolution of metadata: LinkedIn’s story

参考

メモ

LinkedInが提唱する Generalized Metadata Architecture (GMA) を基盤としたメタデータ管理システム。

コンセプトは、 スライド に記載されているが、ざっとアーキテクチャのイメージをつかむには、 アーキテクチャ がよい。

GMA

メタデータは自動収集。

これは標準化されたメタデータモデルとアクセスレイヤによるものである。

また標準モデルが、モデルファーストのアプローチを促進する。

Metadata Serving

Metadata Serving に記載あり。

RESTサービスは、LinkedInが開発していると思われるREST.liが用いられており、 DAOもその中の「Pegasus」という仕組みを利用している。

Key-Value DAO、Search DAO、Query DAOが定義されている。

上記GMAの通り、この辺りのDAOによるアクセスレイヤの標準化が見て取れる。

Metadata Ingestion Architecture

そもそも、メタデータに対する変更は MAE(Metadata Audit Event) としてキャプチャされる。

それがKafka Streamsのジョブで刈り取られ処理される。なお、シーケンシャルに処理されるための工夫もあるようだ。

共有

Pinot: Enabling Real-time Analytics Applications

参考

メモ

オンライン分析のアーキテクチャ

  • Join on the fly
    • ベーステーブルからクエリ実行のタイミングでデータ生成
  • Pre Join + Pre Aggregate
    • 分析で必要となるテーブルをストリーム処理等で事前作成
    • 事前処理はマスタデータのテーブルとの結合、など
  • Pre Join + Pre Aggregate + Pre Cube
    • さらに分析で求められる結果データを予め作成、インデックス化
    • 例えばAggregationしておく、など

レイテンシと柔軟性のトレードオフの関係:

Latency vs. Flexibility
「Who View」のアーキテクチャ

ユースケース

  • LinkedInのメンバーが見る分析レポート
    • QPSが高い(数千/秒 級)
    • レイテンシは数十ms~sub秒
  • インタラクティブダッシュボード
    • 様々なアグリゲーション
  • 異常検知

LinkedIn以外の企業では、

  • Uber
  • slack
  • MS Teams
  • Weibo
  • factual

あたりが利用しているようだ。 Uberは、自身の技術ブログでも触れていた。

ワークフロー

ワークフロー概要

Pinotは、原則としてPre Aggregation、Pre Cubeを前提とする仕組みなので、 スキーマの定義が非常に重要。

分散処理とIngestion

またバッチとストリーム両方に対応しているので、 それぞれデータ入力(Ingestion)を定義する。

データはセグメントに分けられ、サーバに分配される。

Segment Assignment

分散処理とクエリルーティング

セグメントに基づき、Brokerによりルーティングされる。

リアルタイムサーバとオフラインサーバ

ストリームから取り込んだデータとバッチで取り込んだデータは、 共通のスキーマを用いることになる。

したがって、統一的にクエリできる?

アーキテクチャの特徴

インデックス(Cube)の特徴

Scan、Inverted Index、Sorted Index、Star-Tree Indexを併用可能。

データ処理上の工夫

比較対象として、Druidがよく挙げられているが、Sorted Index、Star-Tree Indexがポイント。

カラムナデータフォーマットを用いるのは前提。 それに加え、Dictionary Encodeing、Bit Compressionを使ってデータ構造に基づいた圧縮を採用。

Inverted Index

転置インデックスを作っておく、という定番手法。

Sorted Index

予め、Dimensionに基づいてソートしておくことで、 フィルタする際にスキャン量を減らしながら、かつデータアクセスを効率化。

Sroted Indexの特徴

Star-Tree Index

すべてをCube化するとデータ保持のスペースが大量に必要になる。 そこで部分的にCube化する。 ★要確認

Star-Tree Indexの特徴

性能特性

Druidとの簡単な比較

Druidと簡易的に比較した結果が載っている。

まずは、レイテンシの小ささが求められるインタラクティブな分析における性能特徴:

Druidとの簡易比較

ミリ秒単位での分析を取り扱うことに関してDruidと共通だが、 各種インデックスのおかげか、Druidよりもパーセンタイルベースの比較で レイテンシが小さいとされている。

つづいて、あらかじめ定義されたクエリを大量にさばくユースケース:

Druidとの簡易比較2

レイテンシを小さく保ったまま、高いQPSを実現していることを 示すグラフが載っている。 この辺りは、工夫として載っていた各種インデックスを予め定義していることが強く効きそうだ。

続いて異常検知ユースケースの例:

Druidとの簡易比較3

データがSkewしていることが強調されているが、その意図はもう少し読み解く必要がありそう。 ★要確認

Star-Tree Indexについて

Star-Tree Index Powering Fast Aggregations on Pinot に記載あり。

共有

Configure Python of PySpark in Zeppelin

参考

メモ

ウェブフロントエンドのSpark Interpreterの設定において、 以下の2項目を設定した。

  • spark.pyspark.python
  • zeppelin.pyspark.python

上記2項目が同じ値で設定されていないと、実行時エラーを生じた。

参考)Pythonバージョンの確かめ方

ドライバのPythonバージョンの確かめ方

1
2
3
4
5
%spark.pyspark

import sys

print('Zeppelin python: {}'.format(sys.version))

ExecutorのPythonバージョンの確かめ方

1
2
3
4
5
6
7
%spark.pyspark

def print_version(x):
import sys
return sys.version

spark.sparkContext.parallelize(range(1, 3)).map(print_version).collect()
共有

markdown preview of vim

参考

メモ

Vim + Markdown のページに色々と載っていた。 前半はMarkdown編集のための補助ツール、後半はプレビューのツール。

プレビューとしては、結論として iamcco/markdown-preview.nvim を選んだ。

試したツール

  • shime/vim-livedown
    • 外部ツールが必要だがnpmで導入可能なので簡易。
    • 悪くなかったが、動作が不安定だったので一旦保留。
  • iamcco/markdown-preview.nvim
    • 外部ツールが必要だが予めプラグインディレクトリ内にインストールされるような手順になっている
      • ただし手元のWSL環境ではnpmインストールに失敗していたため手動インストールした
    • 今のところ馴染んでいるので利用している
  • previm/previm
    • 外部依存がなく悪くなさそうだった
    • WindowsのWSL環境で動作が不安定だったので一旦保留。

その他試したいもの

  • iwataka/minidown.vim
    • Vim + Markdown のブログ内で紹介されていた、ブログ主の作ったツール
    • ミニマムな機能で必要十分そう。外部依存が殆どないのが大きい

iamcco/markdown-preview.nvim を試す際に気をつけたこと

プラグイン内でnodejsを利用するのだが、そのライブラリインストールにyarnを 使うようになっている。

公式READMEの文言を以下に引用。

1
2
call dein#add('iamcco/markdown-preview.nvim', {'on_ft': ['markdown', 'pandoc.markdown', 'rmd'],
\ 'build': 'cd app & yarn install' })

個人的にはnpmを利用しているのでyarnではなくnpmでインストールするよう修正し実行した。 しかいs上記の通り、一部の環境ではプラグイン内にインストールする予定だったnpmライブラリが インストールされない事象が生じた。

その際は、自身で npm install すればよい。

1
2
$ cd ~/vimfiles/bundles/repos/github.com/iamcco/markdown-preview.nvim/app
$ npm install

なお、 ~/vimfiles/bundles のところは、各自のdeinレポジトリの場所を指定してほしい。

共有

Data Platform for Machine Learning

参考

メモ

Data Platform for Machine Learning を読んでみた。個人的要約メモを以下に記載する。

Abstract

これまでは数学的な試行錯誤(ここでは機械学習も含んでそう表現していると思われる)に適したデータプラットフォーム(データを保持し、サーブする仕組み、と解釈)がなかった。

既存のデータプラットフォームの課題はいくつかある。 いずれにせよ、データ管理の負荷はユーザビリティを下げる。 また、昨今求められるコンプライアンス機能(例:terms of use、privacy measure、auditなど)を すべて利用できるわけではない。

本論文では、Machine Learning Data Platform(MLdp)に求められるものをまとめ、Appleにおける経験とソリューションを紹介し、将来のアクションを紹介する。

Introduction

MLdp = Machine Learningのためのデータ管理システム

Motivation

前提とするワークフロー:

  • data collection
  • annotation
  • exploration
  • feature engineering
  • experimentation
  • evaluation
  • finally deployment

(補足)Appleにおけるワークフローのアブストラクションの考え方が分かるので参考として

強い課題感:サイロ化されたデータストア、サイロ化された知見・アクティビティ ★重要

サイロ化されたストアとアクティビティのイメージ

3種類の動機

  • エンジニアリングチームのサポート
  • MLライフサイクルのサポート
  • MLフレームワーク、データのサポート

ロール3種類 + 1

  • MLエンジニア
  • ソフトウェアエンジニア
  • データサイエンティスト
  • Legal Team

(補足)よく聞くのは、データサイエンティスト、データエンジニアだが、ここでは、 MLエンジニア、ソフトウェアエンジニア、Legal Teamが挙げられているのが特徴的。

MLでは、データはトラックされること、バージョン管理されることが必要 ★重要

MLdpが満たすべき要件 ★重要

  • 以下の2種類のデータに対するコンセプチャルな表現方法
    • ほとんど変化しない大量の永続データ
      • 学習データのセットのこと
      • 再現性担保のため学習データ自体はイミュータブルに扱う
    • 揮発性のデータ
      • アノテーションやラベルのこと
      • これらの情報は、業務次第で値を変える可能性がある
      • 学習データセット本体と比べて総量が小さい
  • 上記2種類の要件を満たすハイブリッドデータストア
    • さらにバージョニング可能な物理レイアウト
    • インクリメンタルな更新、デルタトラッキング、ML学習ワークロードへの最適化
  • 継続的なデータ入力に対するシンプルな機構
  • データ、特徴量エンジニアリングのためのDSL
  • 著名なMLフレームワークへのインターフェース
    • 著名なMLフレームワークでは独自のデータ表現をもっているが、一方でMLdpでは フレームワークをまたいだ共通的なデータ表現が必要、など
    • データ型も多様である
  • データセンタ、エッジの両方への対応。
    • データへの距離を意識しないとならない
  • 説明可能性を担保するためのバージョニング
  • データリネージのトラック、データソースのトラック
  • データ探索と発見
    • MLdpでは省力化のための中央集権的なアプローチを採用
  • コンプライアンスやプライバシーの考慮
MLプラットフォームのエコシステム一覧

3Vの話

関連研究

(ここでは一旦省略)

SYSTEM ARCHITECTURE AND DESIGN

アーキテクチャは、コントロールプレーン、データプレーンによる構成。 ★重要

MLdpのアーキテクチャ概要

各コンポーネントの特徴 ★重要

  • 概念データモデル
    • ローデータ、ローデータから生成されたデータ(アノテーション、特徴量)の両方とも表現可能とする
  • バージョン管理の仕組み
    • イミュータブルなデータのスナップショットを用いて、実験の再現性を担保する
  • データアクセスインターフェース
    • MLフレームワークや他のデータ処理エンジンとシームレスに連係可能とする
  • ハイブリッドなデータストア
    • 高頻度でのデータ入力とあまり変化しない大量のデータの両方に対応可能とする
  • ストレージレイアウト
    • バージョン間のデルタトラッキング
    • 分散学習のためのデータ並列化
    • データ探索と発見を可能とするインデックス
    • デバイスとデータセンタの両方での学習のためのストリームI/O
    • 学習タスクのためのキャッシュ

4種類のデータセット:dataset, annotation, split, and package に分けて考える。 ★重要

  • dataset:
    • データ本体。学習データに用いる、など。量、サイズが大きい。
  • annotatoin:
    • データ本体に付与したラベル、属性、メタデータなど。量、サイズはdatasetと比べて小さい。
  • split:
    • データ本体を分割したもの。
    • (補足)実体はdatasetであり、split自体は仮想的な定義と考えられる。
  • package:
    • dataset / split、annotationを組み合わせた仮想的なまとまり。 再現性担保のため、学習セットをまとめて管理するために利用。

以下補足。

datasetに対し、annotationやsplitは「weak object」。つまり、それ単体では存在しない。 annotationやsplitをdatasetとは別に管理することメリットは「multifold」である。 annotationもsplitもdatasetを変更せずに変更可能である。 annotationは個別のコンプライアンスポリシーを持つことがある。

バージョンの表現 -> <schema>.<revision>.<patch> ★重要 これにより、スキーマ変更あり・なしを明示しながら、バージョン管理を可能とする。

バージョン管理はユーザが明示的に行う。 ★重要

MLプロジェクト間でのデータシェアにおいて以下を実現。 ★重要

  • 他のプロジェクトへの影響を気にせず、そのプロジェクトの都合でデータをシェアし、スキーマを変更できる。
  • 特定のバージョンをピンどめする。再現性確保。
  • データのモデルバージョンの依存関係をトラック可能とする

MLプロジェクトにおいてはデータはインターフェースである。 ★名言

MLdpではオブジェクトは以下の状態を遷移する。 ★重要

  • draft
  • published
  • archived
  • purged

publishされたデータはイミュータブルになる。 ★重要 これは再現性を担保するため。

(補足)このオブジェクトのライフサイクルは、S3等のパブリッククラウドのオブジェクトストレージの機能と相性が良さそう。

MLdpでは、表形式のデータ表現を採用。 ★重要

カラムナ表現により、圧縮効率やIO効率の改善だけではなく、 カラムの追加削除にも対応しやすくなる。

Spark RDD、DataFrame、Apache DataFrame、Pandas、R DataFrameなどと互換性あり。

MLdpではファイルはバイト列として扱う。

またファイルに対して、ストリーム形式でのアクセスも可能とする。 ★重要

データアクセスのインタフェースのうち、ハイレベルAPIはサーバサイドのバッチ処理向き。つまり前処理などに適している。 ローレベルAPIは各種フレームワークとの連係向き。

クライアントでのデータ利用のため、MLdpはキャッシュ機能を持つ。 ★重要 (補足)このあたり、Appleだからこそ必要であり、一般企業は果たして必要だろうか。

SQLライクなインタフェースを提供。(その他のDSLもあるようだ)

現在、Pythonと連係可能

入力となるデータの例
データセット定義のクエリ例
既存モデルを読み込んで推論するクエリ例

MLのデータセットは、複数のファイルをまとめ、MLモデルから扱うことがある。

MLdpをマウントして扱うことができる。 いまのところクライアントサイドのシンプルなキャッシュ機構を有している。

またエッジでの利用を想定し、REST APIも提供。

MLdpをマウントしPATHに対して画像処理するクエリ例

セカンダリインデックスも利用可能。

セカンダリインデックスを使い、求める画像のみを扱うクエリ例

分散学習のためのスプリットを生成することもできる。

MLdpをマウントし、TFの機能を使ってスライスを作成

ストレージレイヤのデザインのポイントは以下の通り。

  • 頻繁に更新されるデータにも対応しながら、学習時の高スループットでの処理にも対応
  • スケーラビリティを持ち、バージョン管理可能であり、バージョン間の差分をトラックできる
  • ダイナミックレンジクエリ、ポイントクエリに対応し、ストリームI/Oアクセスも可能

in-flightのデータ保持にはKVSを使用し、高頻度での並列での書き込みに対応する。 ★重要 スナップショットが取られ、クラウドストレージ上に置かれる。 ★重要

おかれたデータは読み取りに最適化されたものになる。

スナップショットに対する変更は、新しいスナップショットとなって書き込まれる(論文ではcurateされた、と表現されていた)。 つまり、スナップショットはイミュータブルな扱いとなる。

in-flightのデータとcurateされたデータをつなぐサブシステムを論文ではdata-pipeと呼ぶ。

MLdpでは複数のデータストアを取り扱うが、ユニファイドなアクセス手段を提供する。 ★重要

in-flightのデータストアは頻繁に更新されるため、それをそのまま機械学習の入力データに することはあまりない。というのも、再現性を担保するのが難しいからだ。

MLdpではデータをパーティション化して保存するが、パーティションキーはユーザが 直接指定するわけではない。 その代わりソートキーを指定する。 ただし、ソートキーが指定されなかった場合は、システムがハッシュをキーに割り当て パーティションを構成するようにする。

パーティションは複数のブロックで構成される。

データの変更が少ないケースでは、バージョン間で多くのブロックを共有できる。 copy-on-writeで変更のあったブロックが生成される。

パーティションとブロック

(感想)多くの分散ファイルシステムで見られる構成。

このブロックで構成することで、MLdpのクライアントは興味のあるブロックにだけアクセスすればよくなる。 またブロックに対するストリームI/Oも可能なので、MLフレームワークからストリームI/Oでアクセスできる。

インデックスもある。そのためブロック単位でのスキャンも可能だし、インデックスに基づくピンポイントでの検索も可能。

セカンダリインデックスの仕組みもある。 ★重要

セカンダリインデックスの仕組み

セカンダリインデックスを用いて、各ブロック内のレコードを取りに行くことになる。

MLの計算クラスタにはキャッシュの仕組みが組み込まれており、 透過的に利用できるようになっている。 データアクセスし、もしキャッシュヒットしたら、キャッシュからデータを返す。 キャッシュシステムが停止している場合は、通常のデータアクセスにフォールバックする。

キャッシュシステムの目的は、レイテンシの低減とReadのスケールアウト。

キャッシュはイミュータブルデータを対象とし、読み取り専用。この単純化のおかげで、結果一貫性などに伴う異常を排除。

FUTURE WORK

将来の課題:データ探索性、システム改善、エコシステムインテグレーション。

データ分析者は、適切なデータを探す。 human-in-the-loop。

カタログは知っている情報を探し当てるもの。データ探索は知らない情報を明らかにすること。両者は異なる。 データ探索の活動はそれ自体が機械学習タスクであることもある。

この手のデータ探索はドメインスペシフィックではあるが、共通的な?システム要件もある。 例えばコンピュータビジョンなど。

ヘテロジーニアスなデータフォーマットをユニファイドな表現に変換できるようにしたい。

システム的な改善に関する2種類の観点:

  • レイテンシを下げ、スループットを上げる
  • human-in-the-loopを減らす

インテントベースのキャッシュを開発したい。つまり、明示的にキャッシュ化するのではなく、頻繁に利用されるデータをキャッシュするようにする、など。

プリフェッチの改善、ローカルバッファリングも改善ネタ。 しかし学習の際にランダマイズが必要なのが難しいところ。

Spark等との連携も課題。

各フレームワークは独自のデータ保持方式を持っている。 MLdpを利用しやすいようにしたい。できるだけユーザコードを変更せずに利用できるようにしたい。

データ保持方式に大きく分けて2種類:

" MLdpの独自の方式を採用し、都度変換する。 " もうひとつはTFRecordなどのフレームワークネイティブの方式を採用すること

一長一短。

共有

Abstract of Analytics Zoo

参考

メモ

Analytics Zooの公式ドキュメント の内容から、ひとことで書いてある文言を拾うと...以下の通り。

0.2.0時代:

Analytics + AI Platform for Apache Spark and BigDL.

2019/12現在:

A unified analytics + AI platform for distributed TensorFlow, Keras, PyTorch and BigDL on Apache Spark

とのこと。対応範囲が拡大していることがわかる。

動作確認

Python環境構築

今回は簡易的にPythonから触る。

pipenvで3.6.9環境を作り、以下のライブラリをインストールした。

  • numpy
  • scipy
  • pandas
  • scikit-learn
  • matplotlib
  • seaborn
  • wordcloud
  • jupyter

pipenvファイルは以下の通り。 https://github.com/dobachi/analytics-zoo-example/blob/master/Pipfile

なお、 Analytics Zooの公式ドキュメント では、Hadoop YARNでローンチする方法などいくつかの方法が掲載されている。

Exampleを動かしてみようとするが、pipでインストールしたところ jupyter-with-zoo.sh は以下の箇所にあった。

1
$HOME/.local/share/virtualenvs/analytics-zoo-example-hS4gvn-H/lib/python3.6/site-packages/zoo/share/bin/jupyter-with-zoo.sh

そこで環境変数を以下のように設定した。

1
2
$ export ANALYTICS_ZOO_HOME=`pipenv --venv`/lib/python3.6/site-packages/zoo/share
$ export SPARK_HOME=`pipenv --venv`/lib/python3.6/site-packages/pyspark

この状態で、 ${ANALYTICS_ZOO_HOME}/bin/jupyter-with-zoo.sh --master local[*] を実行しようとしたのだが、 jupyter-with-zoo.sh 内で用いられている analytics-zoo-base.sh の中で、

1
2
3
4
if [[ ! -f ${ANALYTICS_ZOO_PY_ZIP} ]]; then
echo "Cannot find ${ANALYTICS_ZOO_PY_ZIP}"
exit 1
fi

の箇所に引っかかりエラーになってしまう。 そもそも環境変数 ANALYTICS_ZOO_PY_ZIP は任意のオプションの値を指定するための環境変数であるため、このハンドリングがおかしいと思われる。 どうやら、pipenvでインストールした0.6.0では、Jupyterとの連係周りがまだ付け焼き刃のようだ。 とりあえず、今回の動作確認では利用しない環境変数であるため、関係個所をコメントアウトして実行することとする。

上記対応の後、以下のようにpipenv経由でJupyter PySparkを起動。

1
$ pipenv run ${ANALYTICS_ZOO_HOME}/bin/jupyter-with-zoo.sh --master local[*]

試しに実行しようとしたが、以下のエラー

1
Py4JError: com.intel.analytics.zoo.pipeline.nnframes.python.PythonNNFrames does not exist in the JVM
共有