参考
メモ
公式ドキュメント
載っている特徴は、以下の通り。
Upsert support with fast, pluggable indexing.
Atomically publish data with rollback support.
Snapshot isolation between writer & queries.
Savepoints for data recovery.
Manages file sizes, layout using statistics.
Async compaction of row & columnar data.
Timeline metadata to track lineage.
クイックスタートから確認(version
0.5.2前提)
Quick
Start Guide を参考に進める。
公式ドキュメントではSpark2.4.4を利用しているが、ここでは2.4.5を利用する。
1 2 3 4 $ export SPARK_HOME=/opt/spark/default $ ${SPARK_HOME} /bin/spark-shell \ --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
必要なライブラリをインポート
1 2 3 4 5 6 7 8 9 10 scala> import org.apache.hudi.QuickstartUtils ._ scala> import scala.collection.JavaConversions ._ scala> import org.apache.spark.sql.SaveMode ._ scala> import org.apache.hudi.DataSourceReadOptions ._ scala> import org.apache.hudi.DataSourceWriteOptions ._ scala> import org.apache.hudi.config.HoodieWriteConfig ._ scala> scala> val tableName = "hudi_trips_cow" scala> val basePath = "file:///tmp/hudi_trips_cow" scala> val dataGen = new DataGenerator
ダミーデータには
org.apache.hudi.QuickstartUtils.DataGenerator
クラスを利用する。 以下の例では、
org.apache.hudi.QuickstartUtils.DataGenerator#generateInserts
メソッドを利用しデータを生成するが、 どういうレコードが生成されるかは、
org.apache.hudi.QuickstartUtils.DataGenerator#generateRandomValue
メソッドあたりを見るとわかる。
ダミーデータを生成し、Spark DataFrameに変換。
1 2 scala> val inserts = convertToStringList(dataGen.generateInserts(10 )) scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2 ))
中身は以下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 scala> df.show +-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+ | begin_lat| begin_lon| driver| end_lat| end_lon| fare| partitionpath| rider| ts| uuid| +-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+ | 0.4726905879569653 |0.46157858450465483 |driver-213 | 0.754803407008858 | 0.9671159942018241 |34.158284716382845 |americas/brazil/s...|rider-213 |0.0 |28432 dec-53 eb-402. ..| | 0.6100070562136587 | 0.8779402295427752 |driver-213 | 0.3407870505929602 | 0.5030798142293655 | 43.4923811219014 |americas/brazil/s...|rider-213 |0.0 |1 bd3905e-a6c4-404. ..| | 0.5731835407930634 | 0.4923479652912024 |driver-213 |0.08988581780930216 |0.42520899698713666 | 64.27696295884016 |americas/united_s...|rider-213 |0.0 |c9cc8f4b-acee-413. ..| |0.21624150367601136 |0.14285051259466197 |driver-213 | 0.5890949624813784 | 0.0966823831927115 | 93.56018115236618 |americas/united_s...|rider-213 |0.0 |4 be1c199-86 dc-489. ..| | 0.40613510977307 | 0.5644092139040959 |driver-213 | 0.798706304941517 |0.02698359227182834 |17.851135255091155 | asia/india/chennai|rider-213 |0.0 |83 f4d3df-46 c1-48 a...| | 0.8742041526408587 | 0.7528268153249502 |driver-213 | 0.9197827128888302 | 0.362464770874404 |19.179139106643607 |americas/united_s...|rider-213 |0.0 |cb8b392d-c9d0-445. ..| | 0.1856488085068272 | 0.9694586417848392 |driver-213 |0.38186367037201974 |0.25252652214479043 | 33.92216483948643 |americas/united_s...|rider-213 |0.0 |66 aaf87d-4786 -4 d0...| | 0.0750588760043035 |0.03844104444445928 |driver-213 |0.04376353354538354 | 0.6346040067610669 | 66.62084366450246 |americas/brazil/s...|rider-213 |0.0 |c5a335f5-c57f-4 f5...| | 0.651058505660742 | 0.8192868687714224 |driver-213 |0.20714896002914462 |0.06224031095826987 | 41.06290929046368 | asia/india/chennai|rider-213 |0.0 |53026 eda-28 c4-4 d8...| |0.11488393157088261 | 0.6273212202489661 |driver-213 | 0.7454678537511295 | 0.3954939864908973 | 27.79478688582596 |americas/united_s...|rider-213 |0.0 |cd42df54-5215 -402. ..| +-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
1 2 3 4 5 6 7 8 scala> df.write.format("hudi" ). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY , "ts" ). option(RECORDKEY_FIELD_OPT_KEY , "uuid" ). option(PARTITIONPATH_FIELD_OPT_KEY , "partitionpath" ). option(TABLE_NAME , tableName). mode(Overwrite ). save(basePath)
なお、生成されたファイルは以下の通り。
PARTITIONPATH_FIELD_OPT_KEY
で指定したカラムをパーティションキーとして用いていることがわかる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 $ ls -R /tmp/hudi_trips_cow/ /tmp/hudi_trips_cow/: americas asia /tmp/hudi_trips_cow/americas: brazil united_states /tmp/hudi_trips_cow/americas/brazil: sao_paulo /tmp/hudi_trips_cow/americas/brazil/sao_paulo: ae28c85a-38f0-487f-a42d-3a0babc9d321-0_0-21-25_20200329002247.parquet /tmp/hudi_trips_cow/americas/united_states: san_francisco /tmp/hudi_trips_cow/americas/united_states/san_francisco: 849db286-1cbe-4a1f-b544-9939893e99f8-0_1-21-26_20200329002247.parquet /tmp/hudi_trips_cow/asia: india /tmp/hudi_trips_cow/asia/india: chennai /tmp/hudi_trips_cow/asia/india/chennai: 2ebfbab0-4f8f-42db-b79e-1c0cbcc3cf39-0_2-21-27_20200329002247.parquet
保存したデータを読み出してみる。
1 2 3 4 5 scala> val tripsSnapshotDF = spark. read. format("hudi" ). load(basePath + "/*/*/*/*" ) scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot" )
中身は以下の通り。
元データに対し、Hudiのカラムが追加されていることがわかる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 scala> tripsSnapshotDF.show +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+ |_hoodie_commit_time|_hoodie_commit_seqno| _hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| begin_lat| begin_lon| driver| end_lat| end_lon| fare| partitionpath| rider| ts| uuid| +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+ | 20200329002247 | 20200329002247 _1_1|7695 c291-8530 -473. ..| americas/united_s...|849 db286-1 cbe-4 a1...|0.21624150367601136 |0.14285051259466197 |driver-213 | 0.5890949624813784 | 0.0966823831927115 | 93.56018115236618 |americas/united_s...|rider-213 |0.0 |7695 c291-8530 -473. ..| | 20200329002247 | 20200329002247 _1_3|2 f06fcd2-8296 -423. ..| americas/united_s...|849 db286-1 cbe-4 a1...| 0.5731835407930634 | 0.4923479652912024 |driver-213 |0.08988581780930216 |0.42520899698713666 | 64.27696295884016 |americas/united_s...|rider-213 |0.0 |2 f06fcd2-8296 -423. ..| | 20200329002247 | 20200329002247 _1_5|6 ebc4028-9 aae-420. ..| americas/united_s...|849 db286-1 cbe-4 a1...| 0.8742041526408587 | 0.7528268153249502 |driver-213 | 0.9197827128888302 | 0.362464770874404 |19.179139106643607 |americas/united_s...|rider-213 |0.0 |6 ebc4028-9 aae-420. ..| | 20200329002247 | 20200329002247 _1_6|8 bf60390-ad41-4 b0...| americas/united_s...|849 db286-1 cbe-4 a1...|0.11488393157088261 | 0.6273212202489661 |driver-213 | 0.7454678537511295 | 0.3954939864908973 | 27.79478688582596 |americas/united_s...|rider-213 |0.0 |8 bf60390-ad41-4 b0...| | 20200329002247 | 20200329002247 _1_7|762e8 cb2-8806 -47 d...| americas/united_s...|849 db286-1 cbe-4 a1...| 0.1856488085068272 | 0.9694586417848392 |driver-213 |0.38186367037201974 |0.25252652214479043 | 33.92216483948643 |americas/united_s...|rider-213 |0.0 |762e8 cb2-8806 -47 d...| | 20200329002247 | 20200329002247 _0_8|28622337 -d76b-442. ..| americas/brazil/s...|ae28c85a-38 f0-487. ..| 0.6100070562136587 | 0.8779402295427752 |driver-213 | 0.3407870505929602 | 0.5030798142293655 | 43.4923811219014 |americas/brazil/s...|rider-213 |0.0 |28622337 -d76b-442. ..| | 20200329002247 | 20200329002247 _0_9|33 aec15d-356 f-475. ..| americas/brazil/s...|ae28c85a-38 f0-487. ..| 0.0750588760043035 |0.03844104444445928 |driver-213 |0.04376353354538354 | 0.6346040067610669 | 66.62084366450246 |americas/brazil/s...|rider-213 |0.0 |33 aec15d-356 f-475. ..| | 20200329002247 | 20200329002247 _0_10|2 d71c9a3-26 a3-40 b...| americas/brazil/s...|ae28c85a-38 f0-487. ..| 0.4726905879569653 |0.46157858450465483 |driver-213 | 0.754803407008858 | 0.9671159942018241 |34.158284716382845 |americas/brazil/s...|rider-213 |0.0 |2 d71c9a3-26 a3-40 b...| | 20200329002247 | 20200329002247 _2_2|a997a8f0-4 ab6-4 d5...| asia/india/chennai|2 ebfbab0-4 f8f-42 d...| 0.40613510977307 | 0.5644092139040959 |driver-213 | 0.798706304941517 |0.02698359227182834 |17.851135255091155 | asia/india/chennai|rider-213 |0.0 |a997a8f0-4 ab6-4 d5...| | 20200329002247 | 20200329002247 _2_4|271 de424-a0f8-427. ..| asia/india/chennai|2 ebfbab0-4 f8f-42 d...| 0.651058505660742 | 0.8192868687714224 |driver-213 |0.20714896002914462 |0.06224031095826987 | 41.06290929046368 | asia/india/chennai|rider-213 |0.0 |271 de424-a0f8-427. ..| +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
上記の通り、SparkのData Source機能を利用している。
中では、org.apache.hudi.DefaultSource#createRelation
メソッドが用いられる。
つづいて、更新を試す。
1 2 3 4 5 6 7 8 9 10 scala> val updates = convertToStringList(dataGen.generateUpdates(10 )) scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2 )) scala> df.write.format("hudi" ). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY , "ts" ). option(RECORDKEY_FIELD_OPT_KEY , "uuid" ). option(PARTITIONPATH_FIELD_OPT_KEY , "partitionpath" ). option(TABLE_NAME , tableName). mode(Append ). save(basePath)
もう一度、DataFrameとして読み出すと、レコードが追加されていることを確かめられる。(ここでは省略)
この後の、 incremental
クエリタイプの実験のため、上記の更新を幾度か実行しておく。
つづいて、 incremental
クエリタイプで読み出す。
一度読み出し、最初のコミット時刻を取り出す。
1 2 3 4 5 6 7 8 scala> spark. read. format("hudi" ). load(basePath + "/*/*/*/*" ). createOrReplaceTempView("hudi_trips_snapshot" ) scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime" ).map(k => k.getString(0 )).take(50 ) scala> val beginTime = commits(commits.length - 2 )
今回は、初回書き込みに加えて2回更新したので、 commits
は以下の通り。
1 2 scala> commits res12: Array [String ] = Array (20200330002239 , 20200330002354 , 20200330003142 )
また、今回「読み込みの最初」とするコミットは、以下の通り。
つまり、2回目の更新時。
1 2 scala> beginTime res13: String = 20200330002354
では、 incremental
クエリタイプで読み出す。
1 2 3 4 5 6 7 scala> val tripsIncrementalDF = spark.read.format("hudi" ). option(QUERY_TYPE_OPT_KEY , QUERY_TYPE_INCREMENTAL_OPT_VAL ). option(BEGIN_INSTANTTIME_OPT_KEY , beginTime). load(basePath) scala> tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental" ) scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0" ).show()
結果は以下のようなイメージ。
1 2 3 4 5 6 7 8 9 10 11 scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0" ).show() +-------------------+------------------+--------------------+-------------------+---+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+--------------------+-------------------+---+ | 20200330003142 | 87.68271062363665 | 0.9273857651526887 | 0.1620033132033215 |0.0 | | 20200330003142 | 40.44073446276323 |9.842943407509797E-4 |0.47631824594751015 |0.0 | | 20200330003142 | 45.39370966816483 | 0.65888271115305 | 0.8535610661589833 |0.0 | | 20200330003142 |47.332186591003044 | 0.8006023508896579 | 0.9025851737325563 |0.0 | | 20200330003142 | 93.34457064050349 | 0.6331319396951335 | 0.5375953886834237 |0.0 | | 20200330003142 |31.065524210209226 | 0.7608842984578864 | 0.9514417909802292 |0.0 | +-------------------+------------------+--------------------+-------------------+---+
なお、ここでbeginTimeを1遡ることにすると…。
1 scala> val beginTime = commits(commits.length - 3 )
以下のように、2回目のコミットも含まれるようになる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0" ).show() +-------------------+------------------+--------------------+-------------------+---+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+--------------------+-------------------+---+ | 20200330003142 | 87.68271062363665 | 0.9273857651526887 | 0.1620033132033215 |0.0 | | 20200330003142 | 40.44073446276323 |9.842943407509797E-4 |0.47631824594751015 |0.0 | | 20200330003142 | 45.39370966816483 | 0.65888271115305 | 0.8535610661589833 |0.0 | | 20200330003142 |47.332186591003044 | 0.8006023508896579 | 0.9025851737325563 |0.0 | | 20200330002354 | 39.09858962414072 | 0.08151154133724581 |0.21729959707372848 |0.0 | | 20200330003142 | 93.34457064050349 | 0.6331319396951335 | 0.5375953886834237 |0.0 | | 20200330002354 | 80.87869643345753 | 0.0748253615757305 | 0.9787639413761751 |0.0 | | 20200330003142 |31.065524210209226 | 0.7608842984578864 | 0.9514417909802292 |0.0 | | 20200330002354 |21.602186045036387 | 0.772134626462835 | 0.3291184473506418 |0.0 | | 20200330002354 | 43.41497201940956 | 0.6226833057042072 | 0.5501675314928346 |0.0 | | 20200330002354 | 35.71294622426758 | 0.6696123015022845 | 0.7318572150654761 |0.0 | | 20200330002354 | 67.30906296028802 | 0.16768228612130764 |0.29666655980198253 |0.0 | +-------------------+------------------+--------------------+-------------------+---+
org.apache.hudi.DefaultSource#createRelation(書き込み)
クイックスタートで、例えば更新などする際の動作を確認する。
1 2 3 4 5 6 7 8 scala> df.write.format("hudi" ). | options(getQuickstartWriteConfigs). | option(PRECOMBINE_FIELD_OPT_KEY , "ts" ). | option(RECORDKEY_FIELD_OPT_KEY , "uuid" ). | option(PARTITIONPATH_FIELD_OPT_KEY , "partitionpath" ). | option(TABLE_NAME , tableName). | mode(Append ). | save(basePath)
のような例を実行する際、内部的には
org.apache.hudi.DefaultSource#createRelation
が呼ばれる。
org/apache/hudi/DefaultSource.scala:85
1 2 3 4 5 6 7 8 9 override def createRelation (sqlContext: SQLContext , mode: SaveMode , optParams: Map [String , String ], df: DataFrame ): BaseRelation = { val parameters = HoodieSparkSqlWriter .parametersWithWriteDefaults(optParams) HoodieSparkSqlWriter .write(sqlContext, mode, parameters, df) createRelation(sqlContext, parameters, df.schema) }
上記メソッド内では、
org.apache.hudi.HoodieSparkSqlWriter$#write
メソッドが呼ばれており、 これが書き込みの実態である。 なお、その下の
org.apache.hudi.DefaultSource#createRelation
は、読み込み時に呼ばれるものと同一。
ここでは org.apache.hudi.HoodieSparkSqlWriter#write
メソッドを確認する。
当該メソッドの冒頭では、オペレーションの判定などいくつか前処理が行われた後、
以下の箇所から実際に書き出す処理が定義されている。
org/apache/hudi/HoodieSparkSqlWriter.scala:85
1 2 3 4 5 6 7 8 9 10 11 12 val (writeStatuses, writeClient: HoodieWriteClient [HoodieRecordPayload [Nothing ]]) = if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL )) { val structName = s"${tblName.get} _record" val nameSpace = s"hoodie.${tblName.get} " sparkContext.getConf.registerKryoClasses( Array (classOf[org.apache.avro.generic.GenericData ], classOf[org.apache.avro.Schema ])) val schema = AvroConversionUtils .convertStructTypeToAvroSchema(df.schema, structName, nameSpace) sparkContext.getConf.registerAvroSchemas(schema) (snip)
まず delete
オペレーションかどうかで処理が別れるが、上記の例では upsert
オペレーションなので一旦そのまま読み進める。
ネームスペース(データベースやテーブル?)を取得した後、SparkのStructTypeで保持されたスキーマ情報を、AvroのSchemaに変換する。
変換されたスキーマをSparkで登録する。
つづいて、DataFrameをRDDに変換する。
org/apache/hudi/HoodieSparkSqlWriter.scala:97
1 2 3 4 5 6 7 8 9 val keyGenerator = DataSourceUtils .createKeyGenerator(toProperties(parameters))val genericRecords: RDD [GenericRecord ] = AvroConversionUtils .createRdd(df, structName, nameSpace)val hoodieAllIncomingRecords = genericRecords.map(gr => { val orderingVal = DataSourceUtils .getNestedFieldValAsString( gr, parameters(PRECOMBINE_FIELD_OPT_KEY ), false ).asInstanceOf[Comparable [_]] DataSourceUtils .createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY )) }).toJavaRDD()
RDDに一度変換した後、mapメソッドで加工する。
まず、 genericRecords
の内容は以下のようなものが含まれる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 result = {GenericRecord[1]@27822} 0 = {GenericData$Record@27827} "{"begin_lat": 0.09632451474505643, "begin_lon": 0.8989273848550128, "driver": "driver-164", "end_lat": 0.6431885917325862, "end_lon": 0.6664889106258252, "fare": 86.865568091804, "partitionpath": "americas/brazil/sao_paulo", "rider": "rider-164", "ts": 0.0, "uuid": "5d49cfb5-0db4-4172-bff4-e581eb1f9783"}" schema = {Schema$RecordSchema@27835} "{"type":"record","name":"hudi_trips_cow_record","namespace":"hoodie.hudi_trips_cow","fields":[{"name":"begin_lat","type":["double","null"]},{"name":"begin_lon","type":["double","null"]},{"name":"driver","type":["string","null"]},{"name":"end_lat","type":["double","null"]},{"name":"end_lon","type":["double","null"]},{"name":"fare","type":["double","null"]},{"name":"partitionpath","type":["string","null"]},{"name":"rider","type":["string","null"]},{"name":"ts","type":["double","null"]},{"name":"uuid","type":["string","null"]}]}" values = {Object[10]@27836} 0 = {Double@27838} 0.09632451474505643 1 = {Double@27839} 0.8989273848550128 2 = {Utf8@27840} "driver-164" 3 = {Double@27841} 0.6431885917325862 4 = {Double@27842} 0.6664889106258252 5 = {Double@27843} 86.865568091804 6 = {Utf8@27844} "americas/brazil/sao_paulo" 7 = {Utf8@27845} "rider-164" 8 = {Double@27846} 0.0 9 = {Utf8@27847} "5d49cfb5-0db4-4172-bff4-e581eb1f9783"
上記の通り、これは入ロクレコードそのものである。
その後、mapメソッドを使ってHudiで利用するキーを含む、Hudiのレコード形式に変換する。
変換された hoodieAllIncomingRecords
は以下のような内容になる。
1 2 3 4 5 6 7 8 9 10 11 result = {Wrappers$SeqWrapper@27881} size = 1 0 = {HoodieRecord@27883} "HoodieRecord{key=HoodieKey { recordKey=5d49cfb5-0db4-4172-bff4-e581eb1f9783 partitionPath=americas/brazil/sao_paulo}, currentLocation='null', newLocation='null'}" key = {HoodieKey@27892} "HoodieKey { recordKey=5d49cfb5-0db4-4172-bff4-e581eb1f9783 partitionPath=americas/brazil/sao_paulo}" recordKey = "5d49cfb5-0db4-4172-bff4-e581eb1f9783" partitionPath = "americas/brazil/sao_paulo" data = {OverwriteWithLatestAvroPayload@27893} recordBytes = {byte[142]@27895} orderingVal = "0.0" currentLocation = null newLocation = null sealed = false
上記の例の通り、ペイロードは
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
で保持される。
その後、いくつかモードの確認が行われた後、もしテーブルがなければ
org.apache.hudi.common.table.HoodieTableMetaClient#initTableType
を用いて テーブルを初期化する。
その後、重複レコードを必要に応じて落とす。
org/apache/hudi/HoodieSparkSqlWriter.scala:132
1 2 3 4 5 6 7 8 9 val hoodieRecords = if (parameters(INSERT_DROP_DUPS_OPT_KEY ).toBoolean) { DataSourceUtils .dropDuplicates( jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters), client.getTimelineServer) } else { hoodieAllIncomingRecords }
レコードが空かどうかを改めて確認しつつ、 最後に書き込み実施。
org.apache.hudi.DataSourceUtils#doWriteOperation
が実態である。
org/apache/hudi/HoodieSparkSqlWriter.scala:147
1 val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
org.apache.hudi.DataSourceUtils#doWriteOperation
メソッドは以下の通り。
org/apache/hudi/DataSourceUtils.java:162
1 2 3 4 5 6 7 8 9 10 11 public static JavaRDD<WriteStatus> doWriteOperation (HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords, String commitTime, String operation) { if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) { return client.bulkInsert(hoodieRecords, commitTime); } else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) { return client.insert(hoodieRecords, commitTime); } else { return client.upsert(hoodieRecords, commitTime); } }
今回の例だと、 upseart
オペレーションなので
org.apache.hudi.client.HoodieWriteClient#upsert
メソッドが呼ばれる。 このメソッドは以下のとおりだが、ポイントは、
org.apache.hudi.client.HoodieWriteClient#upsertRecordsInternal
メソッドである。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public JavaRDD<WriteStatus> upsert (JavaRDD<HoodieRecord<T>> records, final String commitTime) { HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT); try { JavaRDD<HoodieRecord<T>> dedupedRecords = combineOnCondition(config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism()); Timer.Context indexTimer = metrics.getIndexCtx(); JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table); metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); return upsertRecordsInternal(taggedRecords, commitTime, table, true ); } catch (Throwable e) { if (e instanceof HoodieUpsertException) { throw (HoodieUpsertException) e; } throw new HoodieUpsertException("Failed to upsert for commit time " + commitTime, e); } }
org.apache.hudi.client.HoodieWriteClient#upsertRecordsInternal
メソッド内のポイントは、 以下の箇所。
org.apache.spark.api.java.AbstractJavaRDDLike#mapPartitionsWithIndex
メソッドで、upsertやinsertの処理を定義している。
org/apache/hudi/client/HoodieWriteClient.java:470
1 2 3 4 5 6 7 8 9 JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> { if (isUpsert) { return hoodieTable.handleUpsertPartition(commitTime, partition, recordItr, partitioner); } else { return hoodieTable.handleInsertPartition(commitTime, partition, recordItr, partitioner); } }, true ).flatMap(List::iterator); return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime);
ここでは、
org.apache.hudi.table.HoodieCopyOnWriteTable#handleUpsertPartition
メソッドを確認してみる。
org/apache/hudi/table/HoodieCopyOnWriteTable.java:253
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Iterator<List<WriteStatus>> handleUpsertPartition(String commitTime, Integer partition, Iterator recordItr, Partitioner partitioner) { UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; BucketInfo binfo = upsertPartitioner.getBucketInfo(partition); BucketType btype = binfo.bucketType; try { if (btype.equals(BucketType.INSERT)) { return handleInsert(commitTime, binfo.fileIdPrefix, recordItr); } else if (btype.equals(BucketType.UPDATE)) { return handleUpdate(commitTime, binfo.fileIdPrefix, recordItr); } else { throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition); } } catch (Throwable t) { String msg = "Error upserting bucketType " + btype + " for partition :" + partition; LOG.error(msg, t); throw new HoodieUpsertException(msg, t); } }
真ん中あたりに、INSERTかUPDATEかで条件分岐しているが、ここでは例としてINSERT側を確認する。
org.apache.hudi.table.HoodieCopyOnWriteTable#handleInsert
メソッドがポイントとなる。
なお、当該メッソッドには同期的な実装と、非同期的な実装があるよう。
ここでは上記呼び出しに基づき、非同期的な実装の方を確認する。
org/apache/hudi/table/HoodieCopyOnWriteTable.java:233
1 2 3 4 5 6 7 8 9 public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception { if (!recordItr.hasNext()) { LOG.info("Empty partition" ); return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator(); } return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this , idPfx); }
戻り値が、
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable
クラスのインスタンスになっていることがわかる。 このイテレータは、
org.apache.hudi.client.utils.LazyIterableIterator
アブストラクトクラスを継承している。
org.apache.hudi.client.utils.LazyIterableIterator
では、nextメソッドが
org/apache/hudi/client/utils/LazyIterableIterator.java:116
1 2 3 4 5 6 7 8 @Override public O next () { try { return computeNext(); } catch (Exception ex) { throw new RuntimeException(ex); } }
のように定義されており、実態が
org.apache.hudi.client.utils.LazyIterableIterator#computeNext
であることがわかる。 当該メソッドは、
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable#CopyOnWriteLazyInsertIterable
クラスではオーバーライドされており、 以下のように定義されている。
org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java:93
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override protected List<WriteStatus> computeNext () { BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null ; try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); bufferedIteratorExecutor = new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema)); final List<WriteStatus> result = bufferedIteratorExecutor.execute(); assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); return result; } catch (Exception e) { throw new HoodieException(e); } finally { if (null != bufferedIteratorExecutor) { bufferedIteratorExecutor.shutdownNow(); } } }
どうやら内部でFutureパターンを利用し、非同期化して書き込みを行っているようだ。(これが筋よしなのかどうかは要議論。update、つまりマージも同様になっている。)
処理内容を知る上でポイントとなるのは、
1 2 bufferedIteratorExecutor = new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema));
の箇所。
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable#getInsertHandler
あたり。 中で用いられている、
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.CopyOnWriteInsertHandler
クラスがポイントとなる。
このクラスは、書き込みデータのキュー(要確認)からレコードを受け取って、処理していると考えられる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected void consumeOneRecord (HoodieInsertValueGenResult<HoodieRecord> payload) { final HoodieRecord insertPayload = payload.record; if (handle == null ) { handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), getNextFileId(idPrefix)); } if (handle.canWrite(payload.record)) { handle.write(insertPayload, payload.insertValue, payload.exception); } else { statuses.add(handle.close()); handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), getNextFileId(idPrefix)); handle.write(insertPayload, payload.insertValue, payload.exception); } }
下の方にある org.apache.hudi.io.HoodieCreateHandle
クラスを用いているあたりがポイント。 そのwriteメソッドは以下の通り。
org.apache.hudi.io.storage.HoodieStorageWriter#writeAvroWithMetadata
を用いて書き出しているように見える。 (実際には
org.apache.hudi.io.storage.HoodieParquetWriter
)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void write (HoodieRecord record, Option<IndexedRecord> avroRecord) { Option recordMetadata = record.getData().getMetadata(); try { if (avroRecord.isPresent()) { IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get()); storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record); record.unseal(); record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId())); record.seal(); recordsWritten++; insertRecordsWritten++; } else { recordsDeleted++; } writeStatus.markSuccess(record, recordMetadata); record.deflate(); } catch (Throwable t) { writeStatus.markFailure(record, t, recordMetadata); LOG.error("Error writing record " + record, t); } }
org.apache.hudi.io.storage.HoodieParquetWriter#writeAvroWithMetadata
メソッドは以下の通りである。 つまり、
org.apache.parquet.hadoop.ParquetWriter#write
を用いてParquet内に、Avroレコードを書き出していることがわかる。
1 2 3 4 5 6 7 8 9 10 @Override public void writeAvroWithMetadata (R avroRecord, HoodieRecord record) throws IOException { String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement()); HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), file.getName()); HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, commitTime, seqId); super .write(avroRecord); writeSupport.add(record.getRecordKey()); }
今回のクイックスタートの例では、 avroRecord
には以下のような内容が含まれていた。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 result = {GenericData$Record@18566} "{"_hoodie_commit_time": "20200331002133", "_hoodie_commit_seqno": "20200331002133_0_44", "_hoodie_record_key": "7b887fb5-2837-4cac-b075-a8a8450f453d", "_hoodie_partition_path": "asia/india/chennai", "_hoodie_file_name": "317a54b0-70b8-4bdc-bfde-12ba4fde982b-0_0-207-301_20200331002133.parquet", "begin_lat": 0.4789745387904072, "begin_lon": 0.14781856144057215, "driver": "driver-022", "end_lat": 0.10509642405359532, "end_lon": 0.07682825311613706, "fare": 30.429177017810616, "partitionpath": "asia/india/chennai", "rider": "rider-022", "ts": 0.0, "uuid": "7b887fb5-2837-4cac-b075-a8a8450f453d"}" schema = {Schema$RecordSchema@18582} "{"type":"record","name":"hudi_trips_cow_record","namespace":"hoodie.hudi_trips_cow","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"begin_lat","type":["double","null"]},{"name":"begin_lon","type":["double","null"]},{"name":"driver","type":["string","null"]},{"name":"end_lat","type":["double","null"]},{"name":"end_lon","type":["double","null"]},{"name":"fare","type":["double","null"]},{"name":"partitionpath","type":["string","null"]},{"name":"rider","type":["string","null"]},{"name":"ts","type":["double","null"]},{"name":"uuid","type":["string","null"]}]}" values = {Object[15]@18583} 0 = "20200331002133" 1 = "20200331002133_0_44" 2 = "7b887fb5-2837-4cac-b075-a8a8450f453d" 3 = "asia/india/chennai" 4 = "317a54b0-70b8-4bdc-bfde-12ba4fde982b-0_0-207-301_20200331002133.parquet" 5 = {Double@18596} 0.4789745387904072 6 = {Double@18597} 0.14781856144057215 7 = {Utf8@18598} "driver-022" 8 = {Double@18599} 0.10509642405359532 9 = {Double@18600} 0.07682825311613706 10 = {Double@18601} 30.429177017810616 11 = {Utf8@18602} "asia/india/chennai" 12 = {Utf8@18603} "rider-022" 13 = {Double@18604} 0.0 14 = {Utf8@18605} "7b887fb5-2837-4cac-b075-a8a8450f453d"
org.apache.hudi.DefaultSource#createRelation(読み込み)
当該メソッドのポイントを確認する。
hoodie.datasource.query.type
の種類によって返すRelationが変わる。
org/apache/hudi/DefaultSource.scala:60
1 2 3 4 5 6 7 8 9 10 11 if (parameters(QUERY_TYPE_OPT_KEY ).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL )) {(snip) } else if (parameters(QUERY_TYPE_OPT_KEY ).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL )) { (snip) } else { throw new HoodieException ("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY )) }
上記の通り、 snapshot
、もしくは
incremental
クエリタイプである。 なお、以下の通り、
MERGE_ON_READ
テーブルに対する snapshot
クエリタイプは利用できない。 もし使いたければ、SparkのData
Source機能ではなく、Hiveテーブルとして読み込むこと。
org/apache/hudi/DefaultSource.scala:69
1 2 log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " + "Please query the Hive table registered using Spark SQL." )
まずクエリタイプが snapshot
である場合は、
以下の通り、Parquetとして読み込みが定義され、Relationが返される。
org/apache/hudi/DefaultSource.scala:72
1 2 3 4 5 6 DataSource .apply( sparkSession = sqlContext.sparkSession, userSpecifiedSchema = Option (schema), className = "parquet" , options = parameters) .resolveRelation()
例えば、クイックスタートの例
1 2 3 4 5 scala> val tripsSnapshotDF = spark. read. format("hudi" ). load(basePath + "/*/*/*/*" ) scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot" )
では、こちらのタイプ。
ParquetベースのRelation(実際には、HadoopFsRelation)が返される。
上記の例では、当該RelationのrootPathsに、以下のような値が含まれる。
1 2 3 4 5 6 7 rootPaths = {$colon$colon@14885} "::" size = 6 0 = {Path@15421} "file:/tmp/hudi_trips_cow/americas/brazil/sao_paulo/ae28c85a-38f0-487f-a42d-3a0babc9d321-0_0-21-25_20200329002247.parquet" 1 = {Path@15422} "file:/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.hoodie_partition_metadata" 2 = {Path@15423} "file:/tmp/hudi_trips_cow/americas/united_states/san_francisco/849db286-1cbe-4a1f-b544-9939893e99f8-0_1-21-26_20200329002247.parquet" 3 = {Path@15424} "file:/tmp/hudi_trips_cow/americas/united_states/san_francisco/.hoodie_partition_metadata" 4 = {Path@15425} "file:/tmp/hudi_trips_cow/asia/india/chennai/2ebfbab0-4f8f-42db-b79e-1c0cbcc3cf39-0_2-21-27_20200329002247.parquet" 5 = {Path@15426} "file:/tmp/hudi_trips_cow/asia/india/chennai/.hoodie_partition_metadata"
次にクエリタイプが incremental
である場合は、
以下の通り、
org.apache.hudi.IncrementalRelation#IncrementalRelation
が返される。
org/apache/hudi/DefaultSource.scala:79
1 new IncrementalRelation (sqlContext, path.get, optParams, schema)
クイックスタートの例
1 2 3 4 5 6 7 scala> val tripsIncrementalDF = spark.read.format("hudi" ). option(QUERY_TYPE_OPT_KEY , QUERY_TYPE_INCREMENTAL_OPT_VAL ). option(BEGIN_INSTANTTIME_OPT_KEY , beginTime). load(basePath) scala> tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental" ) scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0" ).show()
では、
org.apache.hudi.IncrementalRelation#IncrementalRelation
が戻り値として返される。
IncrementalRelation
コンストラクタ
Parquetをファイルを単純に読めば良いのと比べて、格納された最新データを返すようにしないとならないので
それなりに複雑なRelationとなっている。
以下、簡単にコンストラクタのポイントを確認する。
最初にメタデータを取得するクライアント。
コミット、セーブポイント、コンパクションなどの情報を得られるようになる。
org/apache/hudi/IncrementalRelation.scala:51
1 val metaClient = new HoodieTableMetaClient (sqlContext.sparkContext.hadoopConfiguration, basePath, true )
クイックスタートの例では、 metaPath
は、
file:/tmp/hudi_trips_cow/.hoodie
だった。
続いてテーブル情報のインスタンスを取得する。
テーブル情報からタイムラインを取り出す。
org/apache/hudi/IncrementalRelation.scala:57
1 2 3 4 5 6 7 8 9 10 private val hoodieTable = HoodieTable .getHoodieTable(metaClient, HoodieWriteConfig .newBuilder().withPath(basePath).build(), sqlContext.sparkContext) val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()if (commitTimeline.empty()) { throw new HoodieException ("No instants to incrementally pull" ) } if (!optParams.contains(DataSourceReadOptions .BEGIN_INSTANTTIME_OPT_KEY )) { throw new HoodieException (s"Specify the begin instant time to pull from using " + s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY} " ) }
クイックスタートの例で実際に生成されたタイムラインは以下の通り。
1 2 3 4 instants = {ArrayList@25586} size = 3 0 = {HoodieInstant@25589} "[20200330002239__commit__COMPLETED]" 1 = {HoodieInstant@25590} "[20200330002354__commit__COMPLETED]" 2 = {HoodieInstant@25591} "[20200330003142__commit__COMPLETED]"
オプションとして与えられた「はじめ」と「おわり」から、
対象となるタイムラインを構成する。
タイムライン上、最も新しいインスタンスを取得し、
Parquetファイルからスキーマを読んでいる。
org/apache/hudi/IncrementalRelation.scala:68
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 val lastInstant = commitTimeline.lastInstant().get()val commitsToReturn = commitTimeline.findInstantsInRange( optParams(DataSourceReadOptions .BEGIN_INSTANTTIME_OPT_KEY ), optParams.getOrElse(DataSourceReadOptions .END_INSTANTTIME_OPT_KEY , lastInstant.getTimestamp)) .getInstants.iterator().toList val latestSchema = { val instant = commitsToReturn.lastOption.getOrElse(lastInstant) val latestMeta = HoodieCommitMetadata .fromBytes(commitTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata ]) val metaFilePath = latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next() AvroConversionUtils .convertAvroSchemaToStructType(ParquetUtils .readAvroSchema( sqlContext.sparkContext.hadoopConfiguration, new Path (metaFilePath))) }
クイックスタートの例では、 commitsToReturn
は以下の通り。
1 2 3 4 5 result = {$colon$colon@25626} "::" size = 1 0 = {HoodieInstant@25591} "[20200330003142__commit__COMPLETED]" state = {HoodieInstant$State@25602} "COMPLETED" action = "commit" timestamp = "20200330003142"
また、少々気になるのは、
1 2 AvroConversionUtils .convertAvroSchemaToStructType(ParquetUtils .readAvroSchema( sqlContext.sparkContext.hadoopConfiguration, new Path (metaFilePath)))
という箇所で、もともとParquet形式のデータからAvro形式のスキーマを取り出し、それをさらにSparkのStructTypeに変換しているところ。
実際にParquetのfooterから取り出したスキーマ情報を、AvroのSchemaに変換しているのは以下の箇所。
org/apache/hudi/common/util/ParquetUtils.java:140
1 2 3 public static Schema readAvroSchema(Configuration configuration, Path parquetFilePath) { return new AvroSchemaConverter ().convert(readSchema(configuration, parquetFilePath)); }
Parquet自身にAvroへの変換器
org.apache.parquet.avro.AvroSchemaConverter
が備わっているので便利?
SparkのData
Source機能でDataFrame化してからスキーマを取り出すと、一度読み込みが生じていしまうから非効率?
という理由が想像されるが、やや回りくどいような印象を持った。
★要確認
本編に戻る。続いてフィルタを定義。
org/apache/hudi/IncrementalRelation.scala:86
1 2 3 4 5 6 7 8 9 10 val filters = { if (optParams.contains(DataSourceReadOptions .PUSH_DOWN_INCR_FILTERS_OPT_KEY )) { val filterStr = optParams.getOrElse( DataSourceReadOptions .PUSH_DOWN_INCR_FILTERS_OPT_KEY , DataSourceReadOptions .DEFAULT_PUSH_DOWN_FILTERS_OPT_VAL ) filterStr.split("," ).filter(!_.isEmpty) } else { Array [String ]() } }
ここまでがコンストラクタ。
buildScan
実際にSparkのData
Sourceで読み込むときに用いられる読み込みの手段が定義されている。
以下にポイントを述べる。
org/apache/hudi/IncrementalRelation.scala:99
1 2 3 override def buildScan (): RDD [Row ] = {(snip)
ファイルIDとフルPATHのマップを作る。
1 2 3 4 5 6 val fileIdToFullPath = mutable.HashMap [String , String ]()for (commit <- commitsToReturn) { val metadata: HoodieCommitMetadata = HoodieCommitMetadata .fromBytes(commitTimeline.getInstantDetails(commit) .get, classOf[HoodieCommitMetadata ]) fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap }
上記マップに対し、必要に応じてフィルタを適用する。
org/apache/hudi/IncrementalRelation.scala:106
1 2 3 4 5 6 7 8 9 val pathGlobPattern = optParams.getOrElse( DataSourceReadOptions .INCR_PATH_GLOB_OPT_KEY , DataSourceReadOptions .DEFAULT_INCR_PATH_GLOB_OPT_VAL ) val filteredFullPath = if (!pathGlobPattern.equals(DataSourceReadOptions .DEFAULT_INCR_PATH_GLOB_OPT_VAL )) { val globMatcher = new GlobPattern ("*" + pathGlobPattern) fileIdToFullPath.filter(p => globMatcher.matches(p._2)) } else { fileIdToFullPath }
コンストラクタで定義されたフィルタを適用しながら、
対象となるParquetファイルを読み込み、RDDを生成する。
org/apache/hudi/IncrementalRelation.scala:117
1 2 3 4 5 6 7 8 9 10 11 12 13 14 sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class" ) val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path" ))if (filteredFullPath.isEmpty) { sqlContext.sparkContext.emptyRDD[Row ] } else { log.info("Additional Filters to be applied to incremental source are :" + filters) filters.foldLeft(sqlContext.read.options(sOpts) .schema(latestSchema) .parquet(filteredFullPath.values.toList: _*) .filter(String .format("%s >= '%s'" , HoodieRecord .COMMIT_TIME_METADATA_FIELD , commitsToReturn.head.getTimestamp)) .filter(String .format("%s <= '%s'" , HoodieRecord .COMMIT_TIME_METADATA_FIELD , commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f)) .toDF().rdd }
Hudiへの書き込み
Writing Hudi
Tables をベースに調べる。
オペレーション種類
書き込みのオペレーション種類は、upsert、insert、bulk_insert。
クイックスタートにはbulk_insertはなかった。
DeltaStreamer
ユーティリティとして付属するDeltaStreamerを用いると、
Kafka等からデータを取り込める。
Avro等のスキーマのデータを読み取れる。
動作確認
パッケージ化
公式ドキュメントのData
Streamer の手順に基づくと、
ビルドされたユーティリティを使うことになるので、
予めパッケージ化しておく。
1 2 3 4 5 6 $ mkdir -p ~/Sources $ cd ~/Sources $ git clone https://github.com/apache/incubator-hudi.git incubator-hudi-052 $ cd incubator-hudi-052 $ git checkout -b release-0.5.2-incubating refs/tags/release-0.5.2-incubating $ mvn clean package -DskipTests -DskipITs
実行
公式ドキュメントのData
Streamer に基づくと、Confluentメンバが作成した ( apurvam
streams-prototyping )サンプルデータ作成用のAvroスキーマと Confluent
PlatformのKSQLのユーティリティを 使ってサンプルデータを作成する。
ついては。予めConfluent Platformをインストールしておくこと。
まずはスキーマをダウンロードする。
1 $ curl https://raw.githubusercontent.com/apurvam/streams-prototyping/master/src/main/resources/impressions.avro > /tmp/impressions.avro
テストデータを生成する。
1 $ ksql-datagen schema=/tmp/impressions.avro format=avro topic=impressions key=impressionid
別の端末を開き、ユーティリティを起動する。
1 2 3 4 5 6 7 8 9 10 11 $ export SPARK_HOME=/opt/spark/default $ cd ~/Sources/incubator-hudi-052 $ ${SPARK_HOME} /bin/spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \ --props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ --source-ordering-field impresssiontime \ --target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \ --target-table uber.impressions \ --table-type COPY_ON_WRITE \ --op BULK_INSERT
なお、 公式ドキュメントのData
Streamer から2箇所修正した。(JarファイルPATH、
--table-type
オプション追加。
Kafkaから読み込んで書き出したデータ(
/tmp/hudi-deltastreamer-op
)を確認してみる。
1 2 3 $ ${SPARK_HOME} /bin/spark-shell \ --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
シェルが起動したら、以下の通り読み込んで見る。 なお、ここでは
userid
がパーティションキーとなっているので、ロード時にそれを指定した。
1 2 3 4 5 scala> val basePath = "file:///tmp/hudi-deltastreamer-op" scala> val impressionDF = spark. read. format("hudi" ). load(basePath + "/*/*" )
内容は以下の通り。
1 2 3 4 5 6 7 8 9 10 11 12 scala> impressionDF.show +-------------------+--------------------+------------------+----------------------+--------------------+---------------+--------------+-------+-----+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|impresssiontime| impressionid| userid| adid| +-------------------+--------------------+------------------+----------------------+--------------------+---------------+--------------+-------+-----+ | 20200406002420 |20200406002420 _1_...| impression_106| user_83|fb381e12-f9ec-4 fb...| 1586096500438 |impression_106|user_83|ad_57| | 20200406002420 |20200406002420 _1_...| impression_107| user_83|fb381e12-f9ec-4 fb...| 1586096464324 |impression_107|user_83|ad_11| | 20200406002420 |20200406002420 _1_...| impression_111| user_83|fb381e12-f9ec-4 fb...| 1586096366450 |impression_111|user_83|ad_14| | 20200406002420 |20200406002420 _1_...| impression_111| user_83|fb381e12-f9ec-4 fb...| 1586099019181 |impression_111|user_83|ad_38| | 20200406002420 |20200406002420 _1_...| impression_116| user_83|fb381e12-f9ec-4 fb...| 1586099146437 |impression_116|user_83|ad_48| | 20200406002420 |20200406002420 _1_...| impression_121| user_83|fb381e12-f9ec-4 fb...| 1586098316334 |impression_121|user_83|ad_26| (snip)
実装確認
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
クラスの実装を確認する。
まずmainは以下の通り。
org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:298
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) throws Exception { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, null , args); if (cfg.help || args.length == 0 ) { cmd.usage(); System.exit(1 ); } Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs); try { new HoodieDeltaStreamer(cfg, jssc).sync(); } finally { jssc.stop(); } }
上記の通り、
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer#sync
メソッドがエントリポイント。
org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:116
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void sync () throws Exception { if (cfg.continuousMode) { deltaSyncService.start(this ::onDeltaSyncShutdown); deltaSyncService.waitForShutdown(); LOG.info("Delta Sync shutting down" ); } else { LOG.info("Delta Streamer running only single round" ); try { deltaSyncService.getDeltaSync().syncOnce(); } catch (Exception ex) { LOG.error("Got error running delta sync once. Shutting down" , ex); throw ex; } finally { deltaSyncService.close(); LOG.info("Shut down delta streamer" ); } } }
上記の通り、 continous
モードかどうかで動作が変わる。
ここでは一旦、ワンショットの場合を確認する。
上記の通り、
org.apache.hudi.utilities.deltastreamer.DeltaSync#syncOnce
メソッドがエントリポイント。
当該メソッドは以下のようにシンプルな内容。
org/apache/hudi/utilities/deltastreamer/DeltaSync.java:218
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public Option<String> syncOnce () throws Exception { Option<String> scheduledCompaction = Option.empty(); HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider)); Timer.Context overallTimerContext = metrics.getOverallTimerContext(); refreshTimeline(); Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitTimelineOpt); if (null != srcRecordsWithCkpt) { if (null == schemaProvider) { this .schemaProvider = srcRecordsWithCkpt.getKey(); setupWriteClient(); } scheduledCompaction = writeToSink(srcRecordsWithCkpt.getRight().getRight(), srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext); } jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist); return scheduledCompaction; }
最初にメトリクスの準備、データソースから読み出してRDD化する定義(
org.apache.hudi.utilities.deltastreamer.DeltaSync#readFromSource
メソッド) その後、
org.apache.hudi.utilities.deltastreamer.DeltaSync#writeToSink
メソッドにより、定義されたRDDの内容を実際に書き込む。
ここでは上記メソッドを確認する。
まず与えられたRDDから重複排除する。
org/apache/hudi/utilities/deltastreamer/DeltaSync.java:352
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private Option<String> writeToSink (JavaRDD<HoodieRecord> records, String checkpointStr, HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception { Option<String> scheduledCompactionInstant = Option.empty(); if (cfg.filterDupes) { cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation; records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig()); } boolean isEmpty = records.isEmpty(); (snip)
その後実際の書き込みになるが、そのとき採用したオペレーション種類によって動作が異なる。
org/apache/hudi/utilities/deltastreamer/DeltaSync.java:369
1 2 3 4 5 6 7 8 9 if (cfg.operation == Operation.INSERT) { writeStatusRDD = writeClient.insert(records, instantTime); } else if (cfg.operation == Operation.UPSERT) { writeStatusRDD = writeClient.upsert(records, instantTime); } else if (cfg.operation == Operation.BULK_INSERT) { writeStatusRDD = writeClient.bulkInsert(records, instantTime); } else { throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation); }
bulkInsert
ここではためしに
org.apache.hudi.client.HoodieWriteClient#bulkInsert
メソッドを確認してみる。
当該メソッドでは、最初にCOPY_ON_WRITEかMERGE_ON_READかに応じて、それぞれの種類のテーブル情報を取得する。
その後、
org.apache.hudi.client.HoodieWriteClient#bulkInsertInternal
メソッドを使ってデータを書き込む。
なお、その間で重複排除されているが、上記の通り、もともと重複排除しているはずなので、要確認。(重複排除のロジックが異なるのかどうか、など)
パット見た感じ、
org.apache.hudi.DataSourceUtils#dropDuplicates
メソッドはロケーション情報(インデックス?)がない場合をドロップする。
org.apache.hudi.client.HoodieWriteClient#combineOnCondition
メソッドはキーに基づきreduce処理する。 という違いがあるようだ。
org/apache/hudi/client/HoodieWriteClient.java:300
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public JavaRDD<WriteStatus> bulkInsert (JavaRDD<HoodieRecord<T>> records, final String instantTime, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT); setOperationType(WriteOperationType.BULK_INSERT); try { JavaRDD<HoodieRecord<T>> dedupedRecords = combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism()); return bulkInsertInternal(dedupedRecords, instantTime, table, bulkInsertPartitioner); } catch (Throwable e) { if (e instanceof HoodieInsertException) { throw e; } throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e); } }
上記の通り、
org.apache.hudi.client.HoodieWriteClient#bulkInsertInternal
メソッドが中で用いられている。
当該メソッドでは、再パーティションないしソートが行われた後、書き込みが実行される。
org/apache/hudi/client/HoodieWriteClient.java:412
1 2 3 JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true ) .flatMap(List::iterator);
ポイントは、org.apache.hudi.execution.BulkInsertMapFunction
クラスである。 このクラスが関数として渡されている。
org.apache.hudi.execution.BulkInsertMapFunction#call
メソッドは以下の通り。
org/apache/hudi/execution/BulkInsertMapFunction.java:52
1 2 3 4 public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) { return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable, fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier()); }
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable
クラスについては、別の節で書いたとおり。
insert
org.apache.hudi.client.HoodieWriteClient#insert
メソッド。
大まかな構造は、 bulkInsert
と同様。 ポイントは、
org.apache.hudi.client.HoodieWriteClient#upsertRecordsInternal
メソッド。
org/apache/hudi/client/HoodieWriteClient.java:229
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public JavaRDD<WriteStatus> insert (JavaRDD<HoodieRecord<T>> records, final String instantTime) { HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT); setOperationType(WriteOperationType.INSERT); try { JavaRDD<HoodieRecord<T>> dedupedRecords = combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism()); return upsertRecordsInternal(dedupedRecords, instantTime, table, false ); } catch (Throwable e) { if (e instanceof HoodieInsertException) { throw e; } throw new HoodieInsertException("Failed to insert for commit time " + instantTime, e); } }
上記の通り、挿入対象のデータを表すRDDを引数に取り、データを書き込む。
これは、upsertのときと同じメソッドである。第4引数でinsertかupsertかを分ける。
当該メソッドは以下の通り。 bulkInsert
と同様にリパーティションなどを経て、
org.apache.hudi.table.HoodieTable#handleUpsertPartition
が呼び出される。
org/apache/hudi/client/HoodieWriteClient.java:457
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private JavaRDD<WriteStatus> upsertRecordsInternal (JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, HoodieTable<T> hoodieTable, final boolean isUpsert) {(snip) JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> { if (isUpsert) { return hoodieTable.handleUpsertPartition(instantTime, partition, recordItr, partitioner); } else { return hoodieTable.handleInsertPartition(instantTime, partition, recordItr, partitioner); } }, true ).flatMap(List::iterator); (snip)
org.apache.hudi.table.HoodieTable#handleUpsertPartition
と org.apache.hudi.table.HoodieTable#handleInsertPartition
が用いられている。 今回は、insertなので後者。
なお、
org.apache.hudi.table.HoodieCopyOnWriteTable#handleInsertPartition
は以下の通り、実態としては
org.apache.hudi.table.HoodieCopyOnWriteTable#handleUpsertPartition
である。
1 2 3 4 public Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) { return handleUpsertPartition(instantTime, partition, recordItr, partitioner); }
当該メソッドは以下の通り。
insertやupsertでは、RDDひとつを1バケットと表現している。
バケットの情報から、insertやupdateの情報を取得して用いる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) { UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; BucketInfo binfo = upsertPartitioner.getBucketInfo(partition); BucketType btype = binfo.bucketType; try { if (btype.equals(BucketType.INSERT)) { return handleInsert(instantTime, binfo.fileIdPrefix, recordItr); } else if (btype.equals(BucketType.UPDATE)) { return handleUpdate(instantTime, binfo.partitionPath, binfo.fileIdPrefix, recordItr); } else { throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition); } } catch (Throwable t) { String msg = "Error upserting bucketType " + btype + " for partition :" + partition; LOG.error(msg, t); throw new HoodieUpsertException(msg, t); } }
例えば、insertの場合は、
org.apache.hudi.table.HoodieCopyOnWriteTable#handleInsert
が呼び出される。 当該メソッドでは、戻り値として
org.apache.hudi.execution.CopyOnWriteLazyInsertIterable#CopyOnWriteLazyInsertIterable
が返される。
org/apache/hudi/table/HoodieCopyOnWriteTable.java:186
1 2 3 4 5 6 7 8 9 public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception { if (!recordItr.hasNext()) { LOG.info("Empty partition" ); return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator(); } return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this , idPfx, sparkTaskContextSupplier); }
このメソッドについては上記ですでに説明したとおり。
HoodieCopyOnWriteTable
と HoodieMergeOnReadTable
テーブルの種類によって、書き込みの実装上どういう違いがあるかを確認する。
例えば、handleInsert
メソッドを確認する。なお、当該メソッドには同期的、非同期的な処理方式がそれぞれ実装されている。
HoodieCopyOnWriteTableの場合は以下の通り。
org/apache/hudi/table/HoodieCopyOnWriteTable.java:186
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception { if (!recordItr.hasNext()) { LOG.info("Empty partition" ); return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator(); } return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this , idPfx, sparkTaskContextSupplier); } public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) { HoodieCreateHandle createHandle = new HoodieCreateHandle(config, instantTime, this , partitionPath, fileId, recordItr, sparkTaskContextSupplier); createHandle.write(); return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator(); }
上が非同期的な方式、下が同期的な方式と見られる。
なお、実装上は同期的な処理方式は今は使われていないようにもみえるが、要確認。
HoodieMergeOnReadTableの場合は、非同期的な処理だけoverrideされている。
org/apache/hudi/table/HoodieMergeOnReadTable.java:120
1 2 3 4 5 6 7 8 9 public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception { if (index.canIndexLogFiles()) { return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this , idPfx, sparkTaskContextSupplier); } else { return super .handleInsert(instantTime, idPfx, recordItr); } }