scala> val data = spark.range(0, 5)
data: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val outputUrl = "alluxio://localhost:19998/users/dobachi/numbers"
scala> data.write.format("delta").save(outputUrl)
This produced the following error:
scala> data.write.format("delta").save(outputUrl)
21/01/05 22:47:50 ERROR HDFSLogStore: The error typically occurs when the default LogStore implementation, that is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system. In order to get the transactional ACID guarantees on table updates, you have to use the correct implementation of LogStore that is appropriate for your storage system. See https://docs.delta.io/latest/delta-storage.html for details.
org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.alluxio.impl=null: No AbstractFileSystem configured for scheme: alluxio
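One way to supply the missing mapping is through the Hadoop configuration that Spark uses. A minimal sketch from the spark-shell, assuming the Alluxio client jar is already on the classpath; the class names are the ones given in the Alluxio documentation and should be checked against the client version actually in use:

scala> spark.sparkContext.hadoopConfiguration.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem")
scala> spark.sparkContext.hadoopConfiguration.set("fs.AbstractFileSystem.alluxio.impl", "alluxio.hadoop.AlluxioFileSystem")

The same properties can also be placed in core-site.xml, or passed as --conf spark.hadoop.fs.AbstractFileSystem.alluxio.impl=... when launching the shell.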
21/01/05 02:54:35 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hadoop-pseudo, executor 2): alluxio.exception.status.UnauthenticatedException: Channel authentication failed with code:UNAUTHENTICATED. Channel: GrpcChannelKey{ClientType=FileSystemMasterClient, ClientHostname=hadoop-pseudo.mshome.net, ServerAddress=GrpcServerAddress{HostName=localhost, SocketAddress=localhost:19998}, ChannelId=81f7d97f-8e32-4289-bcab-ea6008d5ffac}, AuthType: SIMPLE, Error: alluxio.exception.status.UnauthenticatedException: Plain authentication failed: Failed to authenticate client user="yarn" connecting to Alluxio server and impersonating as impersonationUser="vagrant" to access Alluxio file system. User "yarn" is not configured to allow any impersonation. Please read the guide to configure impersonation at https://docs.alluxio.io/os/user/2.4/en/operation/Security.html at alluxio.exception.status.AlluxioStatusException.from(AlluxioStatusException.java:141)
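This failure is on the Alluxio side: the user running the executors ("yarn") is not allowed to impersonate the submitting user ("vagrant"). Following the security guide linked in the message, a sketch of the relevant setting in conf/alluxio-site.properties on the Alluxio masters; whether to allow all users or only specific ones is a deployment decision:

# Allow the "yarn" user to impersonate other users; restart the Alluxio masters afterwards.
alluxio.master.security.impersonation.yarn.users=*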
Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition);
if (offset != null) {
  // The offset as it is stored contains our next state, so restore it as-is.
  taskGeneration = ((Long) offset.get(TASK_GENERATION)).intValue();
  count = ((Long) offset.get(CURRENT_ITERATION));
  random.setSeed((Long) offset.get(RANDOM_SEED));
}
final Object generatedObject = generator.generate();
if (!(generatedObject instanceof GenericRecord)) {
  throw new RuntimeException(String.format(
      "Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
      generatedObject.getClass().getName()
  ));
}
final GenericRecord randomAvroMessage = (GenericRecord) generatedObject;
final List<Object> genericRowValues = new ArrayList<>();
for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
  final Object value = randomAvroMessage.get(field.name());
  if (value instanceof Record) {
    final Record record = (Record) value;
    final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value();
    Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue);
    genericRowValues.add(optionValue);
  } else {
    genericRowValues.add(value);
  }
}
// The source offsets will be the values that the next task lifetime will restore from
// Essentially, the "next" state of the connector after this loop completes
Map<String, Object> sourceOffset = new HashMap<>();
// The next lifetime will be a member of the next generation.
sourceOffset.put(TASK_GENERATION, taskGeneration + 1);
// We will have produced this record
sourceOffset.put(CURRENT_ITERATION, count + 1);
// This is the seed that we just re-seeded for our own next iteration.
sourceOffset.put(RANDOM_SEED, seed);
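For context, this offset map only takes effect once it is attached to the records the task returns from poll(). A minimal Scala sketch of that step against the Connect API (the connector itself is Java; the key strings, topic name, and value below are placeholders, not the connector's actual constants):

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord

// Placeholder partition/offset maps; the real task uses its TASK_GENERATION,
// CURRENT_ITERATION and RANDOM_SEED keys.
val sourcePartition = java.util.Collections.singletonMap[String, AnyRef]("task.id", "0")
val sourceOffset = new java.util.HashMap[String, AnyRef]()
sourceOffset.put("task.generation", Long.box(1L))
sourceOffset.put("current.iteration", Long.box(42L))
sourceOffset.put("random.seed", Long.box(12345L))

// Connect persists sourceOffset once this record is returned from poll();
// a later task instance reads it back via context.offsetStorageReader().offset(sourcePartition).
val record = new SourceRecord(
  sourcePartition, sourceOffset,
  "generated-topic", Schema.STRING_SCHEMA, "generated value")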
Checking if the standard Scala applications are installed
  Installed ammonite
  Installed cs
  Installed coursier
  Installed scala
  Installed scalac
  Installed sbt
  Installed sbtn
  Installed scalafmt
$ ./cs setup --help
Command: setup
Usage: cs setup
  --jvm  <string?>
  --jvm-dir  <string?>
  --system-jvm  <bool?>
  --local-only  <bool>
  --update  <bool>
  --jvm-index  <string?>
  --graalvm-home  <string?>
  --graalvm-option  <string*>
  --graalvm-default-version  <string?>
  --install-dir | --dir  <string?>
  --install-platform  <string?>
        Platform for prebuilt binaries (e.g. "x86_64-pc-linux", "x86_64-apple-darwin", "x86_64-pc-win32")
  --install-prefer-prebuilt  <bool>
  --only-prebuilt  <bool>
        Require prebuilt artifacts for native applications, don't try to build native executable ourselves
  --repository | -r  <maven|sonatype:$repo|ivy2local|bintray:$org/$repo|bintray-ivy:$org/$repo|typesafe:ivy-$repo|typesafe:$repo|sbt-plugin:$repo|ivy:$pattern>
        Repository - for multiple repositories, separate with comma and/or add this option multiple times (e.g. -r central,ivy2local -r sonatype:snapshots, or equivalently -r central,ivy2local,sonatype:snapshots)
  --default-repositories  <bool>
  --proguarded  <bool?>
  --channel  <org:name>
        Channel for apps
  --default-channels  <bool>
        Add default channels
  --contrib  <bool>
        Add contrib channel
  --file-channels  <bool>
        Add channels read from the configuration directory
  --cache  <string?>
        Cache directory (defaults to environment variable COURSIER_CACHE, or ~/.cache/coursier/v1 on Linux and ~/Library/Caches/Coursier/v1 on Mac)
  --mode | -m  <offline|update-changing|update|missing|force>
        Download mode (default: missing, that is fetch things missing from cache)
  --ttl | -l  <duration>
        TTL duration (e.g. "24 hours")
  --parallel | -n  <int>
        Maximum number of parallel downloads (default: 6)
  --checksum  <checksum1,checksum2,...>
        Checksum types to check - end with none to allow for no checksum validation if no checksum is available, example: SHA-256,SHA-1,none
  --retry-count  <int>
        Retry limit for Checksum error when fetching a file
  --cache-file-artifacts | --cfa  <bool>
        Flag that specifies if a local artifact should be cached.
  --follow-http-to-https-redirect  <bool>
        Whether to follow http to https redirections
  --credentials  <host(realm) user:pass|host user:pass>
        Credentials to be used when fetching metadata or artifacts. Specify multiple times to pass multiple credentials. Alternatively, use the COURSIER_CREDENTIALS environment variable
  --credential-file  <string*>
        Path to credential files to read credentials from
  --use-env-credentials  <bool>
        Whether to read credentials from COURSIER_CREDENTIALS (env) or coursier.credentials (Java property), along those passed with --credentials and --credential-file
  --quiet | -q  <counter>
        Quiet output
  --verbose | -v  <counter>
        Increase verbosity (specify several times to increase more)
  --progress | -P  <bool>
        Force display of progress bars
  --env  <bool>
  --user-home  <string?>
  --banner  <bool?>
  --yes | -y  <bool?>
  --try-revert  <bool>
  --apps  <string*>
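Combining the flags listed above, a non-interactive run might look like the following. This is a hypothetical invocation; the JVM version and application list are just examples:

$ ./cs setup --yes --jvm 11 --apps sbt,ammonite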
20/11/23 22:10:55 ERROR MicroBatchExecution: Query [id = 13cf0aa0-116c-4c95-aea0-3e6f779e02c8, runId = 6f71a4bb-c067-4f6d-aa17-6bf04eea3520] terminated with error
java.lang.UnsupportedOperationException: Detected a data update in the source table. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.
        at org.apache.spark.sql.delta.sources.DeltaSource.verifyStreamHygieneAndFilterAddFiles(DeltaSource.scala:273)
        at org.apache.spark.sql.delta.sources.DeltaSource.$anonfun$getChanges$1(DeltaSource.scala:117)
scala> import io.delta.tables._
scala> val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/users_for_delete")
scala> deltaTable.delete("name == 'Ben'")
As shown above, updating the table (deleting records) caused the streaming query to terminate with the following error:
20/11/23 22:26:21 ERROR MicroBatchExecution: Query [id = 660b82a9-ca40-4b91-8032-d75807b11c18, runId = 7e46e9a7-1ba2-48b2-b264-53c254cfa6fc] terminated with error
java.lang.UnsupportedOperationException: Detected a data update in the source table. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.
        at org.apache.spark.sql.delta.sources.DeltaSource.verifyStreamHygieneAndFilterAddFiles(DeltaSource.scala:273)
        at org.apache.spark.sql.delta.sources.DeltaSource.$anonfun$getChanges$1(DeltaSource.scala:117)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
(snip)
/** A check on the source table that disallows deletes on the source data. */
private val ignoreChanges = options.ignoreChanges || ignoreFileDeletion
--> whether to ignore record changes (file changes)
/** A check on the source table that disallows commits that only include deletes to the data. */
private val ignoreDeletes = options.ignoreDeletes || ignoreFileDeletion || ignoreChanges
--> whether to ignore record deletions (file deletions)
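So, as the error message also suggests, one way to keep the stream running across such updates is to read the table with ignoreChanges enabled. A minimal sketch, assuming the same table path as above and a fresh checkpoint directory (both paths here are placeholders):

scala> val stream = spark.readStream.format("delta").option("ignoreChanges", "true").load("/tmp/delta/users_for_delete")
scala> stream.writeStream.format("console").option("checkpointLocation", "/tmp/delta/users_for_delete_checkpoint2").start()

Note that with ignoreChanges the rewritten files are re-emitted, so downstream consumers may receive duplicate records for rows that did not actually change; deduplication is left to the consumer.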