dansbecker/handling-missing-values

参考

メモ

Kaggleのhandling-missing-values の内容から気になった箇所をメモ。

欠落のあるカラムの排除

最もシンプルな方法として、欠落のあるカラムを排除する方法が挙げられていた。

1
2
3
4
cols_with_missing = [col for col in X_train.columns 
if X_train[col].isnull().any()]
reduced_X_train = X_train.drop(cols_with_missing, axis=1)
reduced_X_test = X_test.drop(cols_with_missing, axis=1)

X_train[col].isnull().any()のところで、カラム内の値のいずれかがNULL値かどうかを確認している。

欠落を埋める

欠落を埋める。

1
2
3
4
5
from sklearn.impute import SimpleImputer

my_imputer = SimpleImputer()
imputed_X_train = my_imputer.fit_transform(X_train)
imputed_X_test = my_imputer.transform(X_test)

なお、SimpleImputer#fit_transformメソッドの戻り値は、numpy.ndarrayである。

また、scikit-learnのimputeの説明を見る限り、カテゴリ値にも対応しているようだ。

欠落を埋めつつ、欠落していたことを真理値として保持

1
2
3
4
5
6
7
8
imputed_X_train_plus = X_train.copy()
imputed_X_test_plus = X_test.copy()

cols_with_missing = (col for col in X_train.columns
if X_train[col].isnull().any())
for col in cols_with_missing:
imputed_X_train_plus[col + '_was_missing'] = imputed_X_train_plus[col].isnull()
imputed_X_test_plus[col + '_was_missing'] = imputed_X_test_plus[col].isnull()
共有

Kaggle入門

参考

メモ

エントリポイント

Kaggleの環境やお作法?になれる目的で、Faster Data Science Educationを見始めた。

共有

Word2Vec with Python

共有

Init git log of an existing repository

参考

メモ

ある文書群をクラスタリングする際、TF-IDFの代わりにWord2Vecを使っていることにした。

共有

Singularization with Python

参考

メモ

condaコマンドではインストールできなかったので、pipコマンドでインストールした。 使い方は、公式ウェブサイトの通り。

なお、関係ない単語にもそのまま適用してみたが動くは動くようだ。

共有

Stopword with Python

参考

メモ

今回はConda環境だったので、 conda コマンドでインストールした。

1
$ conda install nltk

ストップワードをダウンロードする。

1
2
import nltk
nltk.download("stopwords")

ここでは、仮に対象となる単語のリストall_wordsがあったとする。 そのとき、以下のようにリストからストップワードを取り除くと良い。

1
2
symbol = ["'", '"', ':', ';', '.', ',', '-', '!', '?', "'s"]
words_wo_stopwords = [w.lower() for w in all_words if not w in stop_words + symbol]

ストップワードの中には記号が含まれていないので、ここでは、symbolを定義して一緒に取り除いた。 次に頻度の高い単語を30件抽出してみる。

1
clean_frequency = nltk.FreqDist(words_wo_stopwords)

これを可視化する。

1
2
plt.figure(figsize=(10, 7))
clean_frequency.plot(30,cumulative=True)
共有

Hydrosphere.io

Hydrosphere.ioについて

参考

メモ

Hydroshpereブログの最も古い記事 が投稿されたのは、2016/6/14である。 上記ブログで強調されていたのは、Big DataプロジェクトのDevOps対応。 DockerやAnsibleといった道具を使いながら。

Hydrosphere Serving

参考情報

開発状況

2019/03/21現在でも、わりと活発に開発されているようだ。 https://github.com/Hydrospheredata/hydro-serving/graphs/commit-activity

開発元は、hydrosphere.io。パロアルトにある企業らしい。

hydrosphere.ioのプロダクト

ここで取り上げているServingを含む、以下のプロダクトがある。

  • Serving: モデルのサーブ、アドホック分析への対応
  • Sonar: モデルやパイプラインの品質管理
  • Mist: Sparkの計算リソースをREST API経由で提供する仕組み(マルチテナンシーの実現)

概要

GitHubのREADMEに特徴が書かれていが、その中でも個人的にポイントと思ったのは以下の通り。

  • Envoyプロキシを用いてサービスメッシュ型のサービングを実現
  • 複数の機械学習モデルに対応し、パイプライン化も可能
  • UIがある

Hydro Servingの公式ウェブサイト に掲載されていた動画を見る限り、 コマンドライン経由でモデルを登録することもでき、モデルを登録したあとは、 処理フロー(ただし、シリアルなフローに見える)を定義可能。 例えば、機械学習モデルの推論器に渡す前処理・後処理も組み込めるようだ。

セットアップ方法

Docker Composeを使う方法とk8sを使う方法があるようだ。

利用方法

モデルを学習させ、出力する。(例では、h5形式で出力していた) モデルや必要なライブラリを示したテキストなどを、ペイロードとして登楼する。

必要なライブラリの指定は以下のようにする。

requirements.txt

1
2
3
Keras==2.2.0
tensorflow==1.8.0
numpy==1.13.3

また、それらの規約事項は、contract(定義ファイル)として保存する。 フォルダ内の構成は以下のようになる。

公式ドキュメントから引用

1
2
3
4
5
6
7
linear_regression
├── model.h5
├── model.py
├── requirements.txt
├── serving.yaml
└── src
└── func_main.py

上記のようなファイル群を作り、 hs uploadコマンドを使ってアップロードする。 アップロードしたあとは、ウェブフロントエンドから処理フローを定義する。

クエリはREST APIで以下のように投げる。

公式ウェブサイトから引用

1
2
$ curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
"x": [[1, 1],[1, 1]]}' 'http://localhost/gateway/applications/linear_regression/infer'

また上記のREST APIの他にも、gRPCを用いて推論結果を取得することも可能。 また、データ構造としてはTensorProtoを使える。

なお、TensorFlowモデルのサーブについては、 公式ウェブサイトの Hydro ServingでTensorFlowモデルをサーブ がわかりやすい。 また、単純にPython関数を渡すこともできる。(必ずしもTensorFlowにロックインではない)

コンセプト

  • モデル
    • Hydro Servingに渡されたモデルはバージョン管理される。
    • フレーワークは多数対応
      • ただしフレームワークによって、出力されるメタデータの情報量に差がある。
  • アプリケーション
    • 単一で動かす方法とパイプラインを構成する方法がある
  • ランタイム
    • 予め実行環境を整えたDockerイメージが提供されている
    • Python、TensorFlow、Spark

動作確認

Hydro Servingの公式ドキュメント に従って動作確認する。

1
2
3
4
5
$ mkdir HydroServing
$ cd HydroServing/
$ git clone https://github.com/Hydrospheredata/hydro-serving
$ hydro-serving
$ sudo docker-compose up -d

起動したコンテナを確認する。

1
2
3
4
5
6
7
8
$ sudo docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------
gateway /hydro-serving/app/start.sh Up 0.0.0.0:29090->9090/tcp, 0.0.0.0:29091->9091/tcp
manager /hydro-serving/app/start.sh Up 0.0.0.0:19091->9091/tcp
managerui /bin/sh -c envsubst '${MAN ... Up 80/tcp, 9091/tcp
postgres docker-entrypoint.sh postgres Up
sidecar /hydro-serving/start.sh Up 0.0.0.0:80->8080/tcp, 0.0.0.0:8082->8082/tcp

その他CLIを導入する。

1
2
3
4
$ conda create -n hydro-serving python=3.6 python
$ source activate hydro-serving
$ conda install keras scikit-learn
$ pip install hs

エラーが生じた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
$ hs cluster add --name local --server http://localhost
--- Logging error ---
Traceback (most recent call last):
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/logging/__init__.py", line 992, in emit
msg = self.format(record)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/logging/__init__.py", line 838, in format
return fmt.format(record)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/logging/__init__.py", line 575, in format
record.message = record.getMessage()
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/logging/__init__.py", line 338, in getMessage
msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
File "/home/centos/.conda/envs/hydro-serving/bin/hs", line 11, in <module>
sys.exit(hs_cli())
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 722, in __call__
return self.main(*args, **kwargs)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 697, in main
rv = self.invoke(ctx)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 1063, in invoke
Command.invoke(self, ctx)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 895, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/core.py", line 535, in invoke
return callback(*args, **kwargs)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/click/decorators.py", line 17, in new_func
return f(get_current_context(), *args, **kwargs)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/hydroserving/cli/hs.py", line 18, in hs_cli
ctx.obj.services = ContextServices.with_config_path(HOME_PATH_EXPANDED)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/hydroserving/models/context_object.py", line 44, in with_config_path
config_service = ConfigService(path)
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/hydroserving/services/config.py", line 17, in __init__
logging.error("{} is not an existing directory", home_path)
Message: '{} is not an existing directory'
Arguments: ('/home/centos/.hs-home',)
WARNING:root:Using local as current cluster
Cluster 'local' @ http://localhost added successfully

エラーが出ているのに登録されたように見える。 念の為、クラスタ情報を確認する。

1
2
$ hs cluster
Current cluster: {'cluster': {'server': 'http://localhost'}, 'name': 'local'}

いったんこのまま進める。 まずはアプリを作成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ mkdir -p ~/Sources/linear_regression
$ cd ~/Sources/linear_regression
$ cat << EOF > model.py
from keras.models import Sequential
from keras.layers import Dense
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# initialize data
n_samples = 1000
X, y = make_regression(n_samples=n_samples, n_features=2, noise=0.5, random_state=112)

scallar_x, scallar_y = MinMaxScaler(), MinMaxScaler()
scallar_x.fit(X)
scallar_y.fit(y.reshape(n_samples, 1))
X = scallar_x.transform(X)
y = scallar_y.transform(y.reshape(n_samples, 1))

# create a model
model = Sequential()
model.add(Dense(4, input_dim=2, activation='relu'))
model.add(Dense(4, activation='relu'))
model.add(Dense(1, activation='linear'))

model.compile(loss='mse', optimizer='adam')
model.fit(X, y, epochs=100)

# save model
model.save('model.h5')
EOF

学習し、モデルを出力。

1
$ python model.py

サーブ対象となる関数のアプリを作成する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
$ mkdir src
$ cd src
$ cat << EOF > func_main.py
import numpy as np
import hydro_serving_grpc as hs
from keras.models import load_model

# 0. Load model once
model = load_model('/model/files/model.h5')

def infer(x):
# 1. Retrieve tensor's content and put it to numpy array
data = np.array(x.double_val)
data = data.reshape([dim.size for dim in x.tensor_shape.dim])

# 2. Make a prediction
result = model.predict(data)

# 3. Pack the answer
y_shape = hs.TensorShapeProto(dim=[hs.TensorShapeProto.Dim(size=-1)])
y_tensor = hs.TensorProto(
dtype=hs.DT_DOUBLE,
double_val=result.flatten(),
tensor_shape=y_shape)

# 4. Return the result
return hs.PredictResponse(outputs={"y": y_tensor})
EOF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ cd ..
$ cat << EOF > serving.yaml
kind: Model
name: linear_regression
model-type: python:3.6
payload:
- "src/"
- "requirements.txt"
- "model.h5"

contract:
infer: # Signature function
inputs:
x: # Input field
shape: [-1, 2]
type: double
profile: numerical
outputs:
y: # Output field
shape: [-1]
type: double
profile: numerical
EOF

つづいて必要ライブラリを定義する。

1
2
3
4
5
$ cat << EOF > requirements.txt
Keras==2.2.0
tensorflow==1.8.0
numpy==1.13.3
EOF

最終的に以下のような構成になった。

1
2
3
4
5
6
7
8
9
10
$ tree
.
├── model.h5
├── model.py
├── requirements.txt
├── serving.yaml
└── src
└── func_main.py

1 directory, 5 files

つづいてモデルなどをアップロード。

1
$ hs upload

ここで以下のようなエラーが生じた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Using 'local' cluster
['/home/centos/Sources/linear_regression/src', '/home/centos/Sources/linear_regression/requirements.txt', '/home/centos/Sources/linear_regression/model.h5']
Packing the model [####################################] 100%
Assembling the model [####################################] 100%
Uploading to http://localhost
Uploading model assembly [####################################] 100%
Traceback (most recent call last):
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/hydroserving/httpclient/remote_connection.py", line 82, in postprocess_response
response.raise_for_status()
File "/home/centos/.conda/envs/hydro-serving/lib/python3.6/site-packages/requests/models.py", line 940, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http://localhost/api/v1/model/upload

During handling of the above exception, another exception occurred:

(snip)

軽く調べると ISSUE255 が 関係していそうである。

ここでは動作確認のため、pip install hs==2.0.0rc2としてインストールして試した。

1
2
$ pip uninstall hs
$ pip install hs==2.0.0rc2

また、 serving.yaml に以下のエントリを追加した。

1
runtime: "hydrosphere/serving-runtime-python:3.6-latest"

再び hs upload したところ完了のように見えた。

ウェブUIを確認したところ以下の通り。

ウェブUIに登録されたモデルが表示される

それではテストを実行してみる。

1
$ curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{   "x": [     [       1,       1     ]   ] }' 'http://10.0.0.209/gateway/application/linear_regression'

以下のようなエラーが生じた。

1
{"error":"InternalUncaught","information":"UNKNOWN: Exception calling application: No module named 'numpy'"}

ランタイム上にnumpyがインストールされなかったのだろうか・・・

WIP

Hydro sonar

参考

メモ

公式のSonarのGUI画面 を見ると、モデルの性能を観測するための仕組みに見える。 モデル選択、メンテナンス、リアイアメントのために用いられる。 例えば、入力データ変化を起因したモデル精度の劣化など。

Hydro Mist

参考

メモ

公式のMistのアーキテクチャイメージ を見ると、 Sparkのマルチテナンシーを実現するものに見える。

特徴として挙げられていたものの中で興味深いのは以下。

  • Spark Function as a Service
  • ユーザのAPIをSparkのコンフィグレーションから分離する
  • HTTP、Kafka、MQTTによるやりとり

Hydro Mistの公式クイックスタートを見ると、Scala、Java、Pythonのサンプルアプリが掲載されている。 Mistはライブラリとしてインポートし、フレームワークに則ってアプリを作成すると、 アプリ内で定義された関数を実行するジョブをローンチできるようになる。

また実際にローンチするときには、REST API等で起動することになるが、 そのときに引数を渡すこともできる。

動作確認

Hydro Mistの公式クイックスタート に従い動作を確認する。 Dockerで実行するか、バイナリで実行するかすれば良い。

Dockerでの起動

Dockerの場合は以下の通り。

1
2
3
$ sudo docker run -p 2004:2004 \
-v /var/run/docker.sock:/var/run/docker.sock \
hydrosphere/mist:1.1.1-2.3.0

バイナリを配備しての起動

バイナリをダウンロードして実行する場合は以下の通り。 まずSparkをダウンロードする。(なお、JDKは予めインストールされていることを前提とする)

1
2
3
4
5
$ mkdir ~/Spark
$ cd ~/Spark
$ wget http://ftp.riken.jp/net/apache/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
$ tar xvzf spark-2.3.3-bin-hadoop2.7.tgz
$ ln -s spark-2.3.3-bin-hadoop2.7 default

これで、~/Spark/default以下にSparkが配備された。 つづいて、Hydro Mistのバイナリをダウンロードし、配備する。

1
2
3
4
5
6
$ mkdir ~/HydroMist
$ cd ~/HydroMist
$ wget http://repo.hydrosphere.io/hydrosphere/static/mist-1.1.1.tar.gz
$ tar xvfz mist-1.1.1.tar.gz
$ ln -s mist-1.1.1 default
$ cd default

以上でHydro Mistが配備された。 それではHydro Mistのマスタを起動する。

1
$ SPARK_HOME=${HOME}/Spark/default ./bin/mist-master start --debug true

以上で、バイナリを配備したHydro Mistが起動する。

Mistの動作確認

以降、サンプルアプリを実行している。

まずmistのCLIを導入する。

1
2
3
$ conda create -n mist python=3.6 python
$ source activate mist
$ pip install mist-cli

続いて、サンプルプロジェクトをcloneする。

1
2
3
$ mkdir -p ~/Sources
$ cd ~/Sources
$ git clone https://github.com/Hydrospheredata/hello_mist.git

試しにScala版を動かす。 (なお、SBTがインストールされていることを前提とする)

1
2
3
$ cd hello_mist/scala
$ sbt package
$ mist-cli apply -f conf -u ''

上記コマンドの結果、以下のような出力が得られる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Process 5 file entries
updating Artifact hello-mist-scala_0.0.1.jar
Success: Artifact hello-mist-scala_0.0.1.jar
updating Context emr_ctx
Success: Context emr_ctx
updating Context emr_autoscale_ctx
Success: Context emr_autoscale_ctx
updating Context standalone
Success: Context standalone
updating Function hello-mist-scala
Success: Function hello-mist-scala


Get context info
--------------------------------------------------------------------------------
curl -H 'Content-Type: application/json' -X POST http://localhost:2004/v2/api/contexts/emr_ctx


Get context info
--------------------------------------------------------------------------------
curl -H 'Content-Type: application/json' -X POST http://localhost:2004/v2/api/contexts/emr_autoscale_ctx


Get context info
--------------------------------------------------------------------------------
curl -H 'Content-Type: application/json' -X POST http://localhost:2004/v2/api/contexts/standalone


Get info of function resource
--------------------------------------------------------------------------------
curl -H 'Content-Type: application/json' -X GET http://localhost:2004/v2/api/functions/hello-mist-scala

Start job via mist-cli
--------------------------------------------------------------------------------
mist-cli --host localhost --port 2004 start job hello-mist-scala '{"samples": 7}'

Start job via curl
--------------------------------------------------------------------------------
curl --data '{"samples": 7}' -H 'Content-Type: application/json' -X POST http://localhost:2004/v2/api/functions/hello-mist-scala/jobs?force=true

試しにstandaloneのコンテキストを確認してみる。 (なお、上記メッセージではPOSTを使うよう書かれているが、GETでないとエラーになった)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ curl -H 'Content-Type: application/json' -X GET http://localhost:2004/v2/api/contexts/standalone | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 252 100 252 0 0 26537 0 --:--:-- --:--:-- --:--:-- 28000
{
"name": "standalone",
"maxJobs": 1,
"maxConnFailures": 1,
"workerMode": "exclusive",
"precreated": false,
"sparkConf": {
"spark.submit.deployMode": "cluster",
"spark.master": "spark://holy-excalibur:7077"
},
"runOptions": "",
"downtime": "1200s",
"streamingDuration": "1s"
}

なお、サンプルアプリは以下の通り。 Piの値を簡易的に計算するものである。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import mist.api._
import mist.api.dsl._
import mist.api.encoding.defaults._
import org.apache.spark.SparkContext

object HelloMist extends MistFn with Logging {

override def handle = {
withArgs(
arg[Int]("samples", 10000)
)
.withMistExtras
.onSparkContext((n: Int, extras: MistExtras, sc: SparkContext) => {
import extras._

logger.info(s"Hello Mist started with samples: $n")

val count = sc.parallelize(1 to n).filter(_ => {
val x = math.random
val y = math.random
x * x + y * y < 1
}).count()

val pi = (4.0 * count) / n
pi
}).asHandle
}
}

結果として、以下のようにジョブが登録される。

ウェブUIに登録されたアプリ

登録されたジョブを実行する。

ジョブを走らせる

また上記で表示されていたcurl ...をコマンドで実行することでも、ジョブを走らせることができる。 ジョブの一覧は、ウェブUIから以下のように確認できる。

走らせたジョブ一覧

ジョブ一覧からジョブを選ぶと、そのメタデータ、渡されたパラメータ、結果などを確認できる。 さらにジョブ実行時のログもウェブUIから確認できる。

ジョブ実行時のログ

Sparkのドライバログと思われるものも含まれている。

アーキテクチャと動作

Hydro Mistがどうやってジョブを起動するか によると以下の通り。

Mistで実行する関数は、mist-workerにラップされており、当該ワーカがSparkContextを保持しているようだ。 またワーカを通じてジョブを実行する。 (参考:Hydro Mistのジョブのステート

ユーザがジョブを実行させようとしたとき、Mistのマスタはそのリクエストをキューに入れ、ワーカが空くのを待つ。 ワーカのモードには2種類がある。

  • exclusive
    • ジョブごとにワーカを立ち上げる
  • shared
    • ひとつのジョブが完了してもワーカを立ち上げたままにし再利用する。

これにより、ジョブの並列度、ジョブの耐障害性の面で有利になる。

また、 Hydro Mistのジョブローンチの流れ からジョブ登録から実行までの大まかな流れがわかる。

コンテキストの管理

Sparkのコンテキスト管理やワーカのモード設定は、 mist-cliを通じてできるようだ。 Hydro Mistのコンテキスト管理 参照。

mist-cli

Hydro MIstのmist-cliのGitHub 参照。 mist-cli は、コンフィグファイル(やコンフィグファイルが配備されたディレクトリ)を指定しながら実行する。 ディレクトリにコンフィグを置く場合は、ファイル名の先頭に2桁の数字を入れることで、 プライオリティを指定することができる。

コンフィグには、Artifact、Context、Functionの設定を記載する。 参考:Hydro Mistのコンフィグの例

Hydro Mistの公式クイックスタート を実行したあとの状態で確認してみる。

1
2
3
4
5
6
7
8
9
10
11
12
$ mist-cli list contexts
ID WORKER MODE
default exclusiveemr_ctx exclusiveemr_autoscale_ctx exclusivestandalone exclusive

$ mist-cli list functions
FUNCTION DEFAULT CONTEXT PATH CLASS NAME
hello-mist-scala default hello-mist-scala_0.0.1.jar HelloMist$

$ mist-cli status
Mist version: 1.1.1
Spark version: 2.4.0
Java version: 1.8.0_201-b09

いくつかのコンテキストと、関数が登録されていることがわかる。 なお、GitHub上のバイナリを用いたので使用するSparkのバージョンが2.4.0になっていた。

Scala API

Hydro MistのScala API とサンプルアプリ(mist\examples\examples\src\main\scala\PiExample.scala)を確認してみる。

エントリポイント

MistFnがエントリポイントのようだ。

PiExample.scala:6

1
2
3
4
5
6
object PiExample extends MistFn {

override def handle: Handle = {
val samples = arg[Int]("samples").validated(_ > 0, "Samples should be positive")

(snip)

引数定義

また関数の引数設定はmist.api.ArgsInstances#arg[A]などで行う。 下記にサンプルに記載の例を示す。

PiExample.scala:9

1
val samples = arg[Int]("samples").validated(_ > 0, "Samples should be positive")

[Int]により引数として渡される値の型を指定する。argメソッドの戻り値はNamedUserArgクラスだが、 当該クラスはUserArg[A]トレートを拡張している。 UserArg#validatedメソッドを用いることで、値の検証を行う。

複数の引数を渡すには、withArgsを使うか、combine&を使う。

例:

1
2
3
val three = withArgs(arg[Int]("n"), arg[String]("str"), arg[Boolean]("flag"))

val three = arg[Int]("n") & arg[String]("str") & arg[Boolean]("flag")

またドキュメントには、case classと各種Extractorを用いることで、JSON形式の入力データを 扱う方法が説明されていた。

Sparkのコンテキスト

引数を定義したのちは、Mistのコンテキスト管理のAPIを利用し、 SparkSessionなどを取得する。 onSparkContextonSparkSessionなどを利用可能。

mist/api/MistFnSyntax.scala:91

1
2
3
4
5
def onSparkSession[F, Cmb, Out](f: F)(
implicit
cmb: ArgCombiner.Aux[A, SparkSession, Cmb],
fnT: FnForTuple.Aux[Cmb, F, Out]
): RawHandle[Out] = args.combine(sparkSessionArg).apply(f)

サンプルでは以下のような使い方を示している。

PiExample.scala:10

1
2
3
4
    withArgs(samples).onSparkContext((n: Int, sc: SparkContext) => {
val count = sc.parallelize(1 to n).filter(_ => {

(snip)

もしSparkSessionを使うならば以下の通りか。

1
2
3
4
5
6
7
8
import org.apache.spark.sql.SparkSession

(snip)

withArgs(samples).onSparkSession((n: Int, spark: SparkSession) => {
val count = spark.sparkContext.parallelize(1 to n).filter(_ => {

(snip)

なお、onStreamingContextというのもあり、Spark Streamingも実行可能のようだ。

結果のハンドリング

上記onSparkContextメソッドなどの戻り値の型はRawHandle[Out]である。

mist/api/MistFnSyntax.scala:58

1
2
3
4
5
def onSparkContext[F, Cmb, Out](f: F)(
implicit
cmb: ArgCombiner.Aux[A, SparkContext, Cmb],
fnT: FnForTuple.Aux[Cmb, F, Out]
): RawHandle[Out] = args.combine(sparkContextArg).apply(f)

最終的に結果をJSON形式で返すために、asHandleメソッドを利用する。

PiExample.scala:10

1
2
3
4
5
withArgs(samples).onSparkContext((n: Int, sc: SparkContext) => {

(snip)

}).asHandle

asHandleメソッドは以下の通り。

mist/api/MistFnSyntax.scala:48

1
2
3
implicit class AsHandleOps[A](raw: RawHandle[A]) {
def asHandle(implicit enc: JsEncoder[A]): Handle = raw.toHandle(enc)
}

余談:implicitクラスの使用

mist.api以下で複数のクラスがimplicit定義されて利用されており、一見して解析しづらい印象を覚えた。 例えばonSparkContextonSparkSessionがimplicitクラスContextsOpsクラスに定義されている。

考察:フレームワークの良し悪し

Sparkの単純なプロキシではなく、フレームワーク(ライブラリ)化されていることで、 ユーザはSparkのコンテキストの管理から開放される、という利点がある。

一方で、Hydro Mistのフレームワークに従って実装する必要があり、多少なり

  • Sparkに加えて、Hydro Mistの学習コストがある
  • Mistのフレームワークでは実装しづらいケースが存在するかもしれない
  • トラブルシュートの際に、Mistの実装まで含めて確認が必要になるかもしれない

という懸念点・欠点が挙げられる。

HTTP API

Hydro MistのHTTP API に一覧が載っている。

例えば関数一覧を取得する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ curl -X GET 'http://10.0.0.209:2004/v2/api/functions' | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 208 100 208 0 0 13256 0 --:--:-- --:--:-- --:--:-- 13866
[
{
"name": "hello-mist-scala",
"execute": {
"samples": {
"type": "MOption",
"args": [
{
"type": "MInt"
}
]
}
},
"path": "hello-mist-scala_0.0.1.jar",
"tags": [],
"className": "HelloMist$",
"defaultContext": "default",
"lang": "scala"
}
]

続いて、当該関数についてジョブ一覧を取得する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ curl -X GET 'http://10.0.0.209:2004/v2/api/functions/hello-mist-scala/jobs' | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1682 100 1682 0 0 29484 0 --:--:-- --:--:-- --:--:-- 30035
[
{
"source": "Http",
"startTime": 1553358512167,
"createTime": 1553358507406,
"context": "default",
"params": {
"filePath": "hello-mist-scala_0.0.1.jar",
"className": "HelloMist$",
"arguments": {
"samples": 9
},
"action": "execute"
},
"endTime": 1553358513162,
"jobResult": 2.2222222222222223,
"status": "finished",
"function": "hello-mist-scala",
"jobId": "b044d08f-9554-4be9-8e22-1d687e58c52e",
"workerId": "default_1cb3b66d-99c3-400b-ac3a-f11d72ab8124_4"
},

(snip)

上記のように、ジョブのメタデータと結果が取得される。 ジョブの情報であれば、直接jobs APIを用いても取得可能。

1
$ curl -X GET 'http://10.0.0.209:2004/v2/api/jobs' | jq

またジョブのログを出力できる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ curl -X GET 'http://10.0.0.209:2004/v2/api/jobs/a87197c8-1692-48bc-b151-978ea89b058a/logs' | head -n 20
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0INFO 2019-03-23T16:22:44.178 [a87197c8-1692-48bc-b151-978ea89b058a] Waiting worker connectionINFO 2019-03-23T16:22:44.183 [a87197c8-1692-48bc-b151-978ea89b058a] InitializedEvent(externalId=None)
INFO 2019-03-23T16:22:44.183 [a87197c8-1692-48bc-b151-978ea89b058a] QueuedEventINFO 2019-03-23T16:22:47.991 [a87197c8-1692-48bc-b151-978ea89b058a] WorkerAssigned(workerId=default_1cb3b66d-99c3-400b-ac3a-f11d72ab8124_2)
INFO 2019-03-23T16:22:48.027 [a87197c8-1692-48bc-b151-978ea89b058a] JobFileDownloadingEvent
INFO 2019-03-23T16:22:48.885 [a87197c8-1692-48bc-b151-978ea89b058a] StartedEvent
INFO 2019-03-23T16:22:48.882 [a87197c8-1692-48bc-b151-978ea89b058a] Added JAR /home/centos/HydroMist/default/worker-default_1cb3b66d-99c3-400b-ac3a-f11d72ab8124_2/hello-mist-scala_0.0.1.jar at spark://dev:46260/jars/hello-mist-scala_0.0.1.jar with timestamp 1553358168882
INFO 2019-03-23T16:22:48.965 [a87197c8-1692-48bc-b151-978ea89b058a] Hello Mist started with samples: 8
INFO 2019-03-23T16:22:49.204 [a87197c8-1692-48bc-b151-978ea89b058a] Starting job: count at HelloMist.scala:22
INFO 2019-03-23T16:22:49.218 [a87197c8-1692-48bc-b151-978ea89b058a] Got job 0 (count at HelloMist.scala:22) with 16 output partitions
INFO 2019-03-23T16:22:49.219 [a87197c8-1692-48bc-b151-978ea89b058a] Final stage: ResultStage 0 (count at HelloMist.scala:22)
INFO 2019-03-23T16:22:49.22 [a87197c8-1692-48bc-b151-978ea89b058a] Parents of final stage: List()
INFO 2019-03-23T16:22:49.221 [a87197c8-1692-48bc-b151-978ea89b058a] Missing parents: List()
INFO 2019-03-23T16:22:49.229 [a87197c8-1692-48bc-b151-978ea89b058a] Submitting ResultStage 0 (MapPartitionsRDD[1] at filter at HelloMist.scala:18), which has
no missing parents
INFO 2019-03-23T16:22:49.438 [a87197c8-1692-48bc-b151-978ea89b058a] Block broadcast_0 stored as values in memory (estimated size 1808.0 B, free 366.3 MB)
INFO 2019-03-23T16:22:49.471 [a87197c8-1692-48bc-b151-978ea89b058a] Block broadcast_0_piece0 stored as bytes in memory (estimated size 1232.0 B, free 366.3 MB)
INFO 2019-03-23T16:22:49.473 [a87197c8-1692-48bc-b151-978ea89b058a] Added broadcast_0_piece0 in memory on dev:39976 (size: 1232.0 B, free: 366.3 MB)
INFO 2019-03-23T16:22:49.476 [a87197c8-1692-48bc-b151-978ea89b058a] Created broadcast 0 from broadcast at DAGScheduler.scala:1039
10INFO 2019-03-23T16:22:49.494 [a87197c8-1692-48bc-b151-978ea89b058a] Submitting 16 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at filter at HelloMist.scala:18) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
0INFO 2019-03-23T16:22:49.495 [a87197c8-1692-48bc-b151-978ea89b058a] Adding task set 0.0 with 16 tasks

(snip)

Reactie API

Hydro MistのReactive API を見ると、MQTTやKafkaと連携して動くAPIがあるようだが、 まだドキュメントが成熟していない。 デフォルトでは無効になっている。

EMRとの連係

Hydro MistのEMR連係 を眺めるとhello_mistプロジェクト等でEMRとの連係の仕方を示してくれているようだが、まだ情報が足りない。

共有

Corne Chocolate

参考

メモ

ビルドについて

基本的には、 Corne Chocolateのビルドガイド の通りで問題なかった。

キーマップ

もともと使用していたLet's SplitやHelixとは、デフォルトのキー配置が異なる。 イメージとしては、

  • Raiseレイヤに記号系が集まっている
  • Lowerレイヤに数字やファンクションキーが集まっている
  • 矢印キーがない?
  • Ctrl、ESC、Tabあたりは好みでカスタマイズ必要そう。
  • 親指エンターも好みが分かれそう

という感じ。 個人的な感想として、記号をRaiseレイヤに集めるのは良いと思った。 一方、矢印キー、Ctrlなど、エンターあたりを中心にいじった。

dobachiのキーマップ を参照されたし。

OLED表示

押されたキー1個までならまだしも、入力のログがしばらく表示されるのは気になったので、 keymap.cを以下のように修正して表示しないようにした。

1
2
3
4
5
6
7
8
9
10
11
12
13
diff --git a/keyboards/crkbd/keymaps/dobachi/keymap.c b/keyboards/crkbd/keymaps/dobachi/keymap.c
index f80eff9..44a9137 100644
--- a/keyboards/crkbd/keymaps/dobachi/keymap.c
+++ b/keyboards/crkbd/keymaps/dobachi/keymap.c
@@ -158,7 +158,7 @@ void matrix_render_user(struct CharacterMatrix *matrix) {
// If you want to change the display of OLED, you need to change here
matrix_write_ln(matrix, read_layer_state());
matrix_write_ln(matrix, read_keylog());
- matrix_write_ln(matrix, read_keylogs());
+ //matrix_write_ln(matrix, read_keylogs());
//matrix_write_ln(matrix, read_mode_icon(keymap_config.swap_lalt_lgui));
//matrix_write_ln(matrix, read_host_led_state());
//matrix_write_ln(matrix, read_timelog());
共有

Kafka Summit SF 2019

参考

メモ

1
2
3
Call for Papers opens:  February 19, 2019

Call for Papers closes: April 15, 2019
共有

Kafkaのグレースフルシャットダウン

参考

メモ

公式ドキュメント

Graceful Shutdownに関する公式ドキュメントの記述によると、グレースフルシャットダウンの強みは以下の通り。

  • ログリカバリ処理をスキップする
  • シャットダウンするブローカから予めリーダを移動する

実装確認

以下のあたりから、グレースフルシャットダウンの実装。

kafka/server/KafkaServer.scala:422

1
2
3
4
5
  private def controlledShutdown() {

def node(broker: Broker): Node = broker.node(config.interBrokerListenerName)

(snip)

コントローラを取得して接続し、コントローラに対してグレースフルシャットダウンの依頼を投げる。 なお、本メソッドはKafkaServer#shutdownメソッド内で呼び出される。

なお、当該メソッドは(KafkaServerStartableクラスでラップされているが)シャットダウンフックで適用される。

kafka/Kafka.scala:70

1
2
3
4
// attach shutdown handler to catch terminating signals as well as normal termination
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})

共有