华为云用户手册

  • 单表并发控制配置 表6 单表并发控制参数配置 参数 描述 默认值 hoodie.write.lock.provider 指定lock provider,不建议使用默认值,使用org.apache.hudi.hive.HiveMetastoreBasedLockProvider org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider hoodie.write.lock.hivemetastore.database Hive的database 无 hoodie.write.lock.hivemetastore.table Hive的table name 无 hoodie.write.lock.client.num_retries 重试次数 10 hoodie.write.lock.client.wait_time_ms_between_retry 重试间隔 10000 hoodie.write.lock.conflict.resolution.strategy lock provider类,必须是ConflictResolutionStrategy的子类 org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy hoodie.write.lock.zookeeper.base_path 存放ZNodes的路径,同一张表的并发写入需配置一致 无 hoodie.write.lock.zookeeper.lock_key ZNode的名称,建议与Hudi表名相同 无 hoodie.write.lock.zookeeper.connection_timeout_ms zk连接超时时间 15000 hoodie.write.lock.zookeeper.port zk端口号 无 hoodie.write.lock.zookeeper.url zk的url 无 hoodie.write.lock.zookeeper.session_timeout_ms zk的session过期时间 60000
  • Clustering配置 本章节内容仅使用于MRS 3.2.0及之后版本。 Clustering中有两个策略分别是hoodie.clustering.plan.strategy.class和hoodie.clustering.execution.strategy.class。一般情况下指定plan.strategy为SparkRecentDaysClusteringPlanStrategy或者SparkSizeBasedClusteringPlanStrategy时,execution.strategy不需要指定。但当plan.strategy为SparkSingleFileSortPlanStrategy时,需要指定execution.strategy为SparkSingleFileSortExecutionStrategy。 表7 Clustering参数配置 参数 描述 默认值 hoodie.clustering.inline 是否同步执行clustering false hoodie.clustering.inline.max.commits 触发clustering的commit数 4 hoodie.clustering.async.enabled 是否启用异步执行clustering 说明: 此参数仅适用于MRS 3.3.0-LTS及之后版本。 false hoodie.clustering.async.max.commits 异步执行时触发clustering的commit数 说明: 此参数仅适用于MRS 3.3.0-LTS及之后版本。 4 hoodie.clustering.plan.strategy.target.file.max.bytes 指定clustering后每个文件大小最大值 1024 * 1024 * 1024 byte hoodie.clustering.plan.strategy.small.file.limit 小于该大小的文件会被clustering 300 * 1024 * 1024 byte hoodie.clustering.plan.strategy.sort.columns clustering用以排序的列 无 hoodie.layout.optimize.strategy Clustering执行策略,可选linear、z-order、hilbert 三种排序方式 linear hoodie.layout.optimize.enable 使用z-order、hilbert时需要开启 false hoodie.clustering.plan.strategy.class 筛选FileGroup进行clustering的策略类,默认筛选小于hoodie.clustering.plan.strategy.small.file.limit阈值的文件 org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy hoodie.clustering.execution.strategy.class 执行clustering的策略类(RunClusteringStrategy的子类),用以定义群集计划的执行方式。 默认类们按指定的列对计划中的文件组进行排序,同时满足配置的目标文件大小 org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy hoodie.clustering.plan.strategy.max.num.groups 设置执行clustering时最多选择多少个FileGroup,该值越大并发度越大 30 hoodie.clustering.plan.strategy.max.bytes.per.group 设置执行clustering时每个FileGroup最多有多少数据参与clustering 2 * 1024 * 1024 * 1024 byte
  • 存储配置 表4 存储参数配置 参数 描述 默认值 hoodie.parquet.max.file.size Hudi写阶段生成的parquet文件的目标大小。对于DFS,这需要与基础文件系统块大小保持一致,以实现最佳性能。 120 * 1024 * 1024 byte hoodie.parquet.block.size parquet页面大小,页面是parquet文件中的读取单位,在一个块内,页面被分别压缩。 120 * 1024 * 1024 byte hoodie.parquet.compression.ratio 当Hudi尝试调整新parquet文件的大小时,预期对parquet数据进行压缩的比例。 如果bulk_insert生成的文件小于预期大小,请增加此值。 0.1 hoodie.parquet.compression.codec parquet压缩编解码方式名称,默认值为gzip。可能的选项是[gzip | snappy | uncompressed | lzo] snappy hoodie.logfile.max.size LogFile的最大值。这是在将日志文件移到下一个版本之前允许的最大值。 1GB hoodie.logfile.data.block.max.size LogFile数据块的最大值。这是允许将单个数据块附加到日志文件的最大值。 这有助于确保附加到日志文件的数据被分解为可调整大小的块,以防止发生OOM错误。此大小应大于JVM内存。 256MB hoodie.logfile.to.parquet.compression.ratio 随着记录从日志文件移动到parquet,预期会进行额外压缩的比例。 用于merge_on_read存储,以将插入内容发送到日志文件中并控制压缩parquet文件的大小。 0.35
  • compaction&cleaning配置 表5 compaction&cleaning参数配置 参数 描述 默认值 hoodie.clean.automatic 是否执行自动clean。 true hoodie.cleaner.policy 要使用的清理政策。Hudi将删除旧版本的parquet文件以回收空间。 任何引用此版本文件的查询和计算都将失败。建议确保数据保留的时间超过最大查询执行时间。 KEEP_LATEST_COMMITS hoodie.cleaner.commits.retained 保留的提交数。因此,数据将保留为num_of_commits * time_between_commits(计划的),这也直接转化为逐步提取此数据集的数量。 10 hoodie.keep.max.commits 触发归档操作的commit数阈值。 30 hoodie.keep.min.commits 归档操作保留的commit数。 20 hoodie.commits.archival.batch 这控制着批量读取并一起归档的提交即时的数量。 10 hoodie.parquet.small.file.limit 该值应小于maxFileSize,如果将其设置为0,会关闭此功能。由于批处理中分区中插入记录的数量众多,总会出现小文件。Hudi提供了一个选项,可以通过将对该分区中的插入作为对现有小文件的更新来解决小文件的问题。此处的大小是被视为“小文件大小”的最小文件大小。 104857600 byte hoodie.copyonwrite.insert.split.size 插入写入并行度。为单个分区的总共插入次数。写出100MB的文件,至少1KB大小的记录,意味着每个文件有100K记录。默认值是超额配置为500K。 为了改善插入延迟,请对其进行调整以匹配单个文件中的记录数。将此值设置为较小的值将导致文件变小(尤其是当compactionSmallFileSize为0时)。 500000 hoodie.copyonwrite.insert.auto.split Hudi是否应该基于最后24个提交的元数据动态计算insertSplitSize,默认关闭。 true hoodie.copyonwrite.record.size.estimate 平均记录大小。如果指定,Hudi将使用它,并且不会基于最后24个提交的元数据动态地计算。 没有默认值设置。这对于计算插入并行度以及将插入打包到小文件中至关重要。 1024 hoodie.compact.inline 当设置为true时,紧接在插入或插入更新或批量插入的提交或增量提交操作之后由摄取本身触发压缩。 true hoodie.compact.inline.max.delta.commits 触发内联压缩之前要保留的最大增量提交数。 5 hoodie.compaction.lazy.block.read 当CompactedLogScanner合并所有日志文件时,此配置有助于选择是否应延迟读取日志块。选择true以使用I/O密集型延迟块读取(低内存使用),或者为false来使用内存密集型立即块读取(高内存使用)。 true hoodie.compaction.reverse.log.read HoodieLogFormatReader会从pos=0到pos=file_length向前读取日志文件。如果此配置设置为true,则Reader会从pos=file_length到pos=0反向读取日志文件。 false hoodie.cleaner.parallelism 如果清理变慢,请增加此值。 200 hoodie.compaction.strategy 用来决定在每次压缩运行期间选择要压缩的文件组的压缩策略。默认情况下,Hudi选择具有累积最多未合并数据的日志文件。 org.apache.hudi.table.action.compact.strategy. LogFileSizeBasedCompactionStrategy hoodie.compaction.target.io LogFileSizeBasedCompactionStrategy的压缩运行期间要花费的MB量。当压缩以内联模式运行时,此值有助于限制摄取延迟。 500 * 1024 MB hoodie.compaction.daybased.target.partitions 由org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy使用,表示在压缩运行期间要压缩的最新分区数。 10 hoodie.compaction.payload.class 这需要与插入/插入更新过程中使用的类相同。就像写入一样,压缩也使用记录有效负载类将日志中的记录彼此合并,再次与基本文件合并,并生成压缩后要写入的最终记录。 org.apache.hudi.common.model.Defaulthoodierecordpayload hoodie.schedule.compact.only.inline 在写入操作时,是否只生成压缩计划。在hoodie.compact.inline=true时有效。 false hoodie.run.compact.only.inline 通过Sql执行run compaction命令时,是否只执行压缩操作,压缩计划不存在时直接退出。 false
  • index相关配置 表3 index相关参数配置 参数 描述 默认值 hoodie.index.class 用户自定义索引的全路径名,索引类必须为HoodieIndex的子类,当指定该配置时,其会优先于hoodie.index.type配置。 "" hoodie.index.type 使用的索引类型,默认为布隆过滤器。可能的选项是[BLOOM | HBASE | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE] 。 布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中。 BLOOM hoodie.index.bloom.num_entries 存储在布隆过滤器中的条目数。 假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。 默认值(60000)大约是此近似值的一半。 注意: 将此值设置得太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置得非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。 60000 hoodie.index.bloom.fpp 根据条目数允许的错误率。 用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置得很低(默认值:0.000000001),在磁盘空间上进行权衡以降低误报率。 0.000000001 hoodie.bloom.index.parallelism 索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,根据输入的工作负载特征自动计算的。 0 hoodie.bloom.index.prune.by.ranges 为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。 true hoodie.bloom.index.use.caching 为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快索引查找。 true hoodie.bloom.index.use.treebased.filter 为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度。 true hoodie.bloom.index.bucketized.checking 为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差。 true hoodie.bloom.index.keys.per.bucket 仅在启用bloomIndexBucketizedChecking并且索引类型为bloom的情况下适用。 此配置控制“存储桶”的大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。 较高的值将分摊将布隆过滤器读取到内存的固定成本。 10000000 hoodie.bloom.index.update.partition.path 仅在索引类型为GLOBAL_BLOOM时适用。 为true时,当对一个已有记录执行包含分区路径的更新操作时,将会导致把新记录插入到新分区,而把原有记录从旧分区里删除。为false时,只对旧分区的原有记录进行更新。 true hoodie.index.hbase.zkquorum 仅在索引类型为HBASE时适用,必填选项。要连接的HBase ZK Quorum URL。 无 hoodie.index.hbase.zkport 仅在索引类型为HBASE时适用,必填选项。要连接的HBase ZK Quorum端口。 无 hoodie.index.hbase.zknode.path 仅在索引类型为HBASE时适用,必填选项。这是根znode,它将包含HBase创建及使用的所有znode。 无 hoodie.index.hbase.table 仅在索引类型为HBASE时适用,必填选项。HBase表名称,用作索引。Hudi将row_key和[partition_path, fileID, commitTime]映射存储在表中。 无
  • 写入操作配置 表1 写入操作重要配置项 参数 描述 默认值 hoodie.datasource.write.table.name 指定写入的hudi表名。 无 hoodie.datasource.write.operation 写hudi表指定的操作类型,当前支持upsert、delete、insert、bulk_insert等方式。 upsert:更新插入混合操作 delete:删除操作 insert:插入操作 bulk_insert: 用于初始建表导入数据, 注意初始建表禁止使用upsert、insert方式 insert_overwrite:对静态分区执行insert overwrite insert_overwrite_table:动态分区执行insert overwrite,该操作并不会立刻删除全表做overwrite,会逻辑上重写hudi表的元数据,无用数据后续由hudi的clean机制清理。效率比bulk_insert + overwrite 高 upsert hoodie.datasource.write.table.type 指定hudi表类型,一旦这个表类型被指定,后续禁止修改该参数,可选值MERGE_ON_READ。 COPY_ON_WRITE hoodie.datasource.write.precombine.field 该值用于在写之前对具有相同的key的行进行合并去重。 指定为具体的表字段 hoodie.datasource.write.payload.class 在更新过程中,该类用于提供方法将要更新的记录和更新的记录做合并,该实现可插拔,如要实现自己的合并逻辑,可自行编写。 org.apache.hudi.common.model.DefaultHoodieRecordPayload hoodie.datasource.write.recordkey.field 用于指定hudi的主键,hudi表要求有唯一主键。 指定为具体的表字段 hoodie.datasource.write.partitionpath.field 用于指定分区键,该值配合hoodie.datasource.write.keygenerator.class使用可以满足不同的分区场景。 无 hoodie.datasource.write.hive_style_partitioning 用于指定分区方式是否和hive保持一致,建议该值设置为true。 true hoodie.datasource.write.keygenerator.class 配合hoodie.datasource.write.partitionpath.field,hoodie.datasource.write.recordkey.field产生主键和分区方式。 说明: 写入设置KeyGenerator与表保存的参数值不一致时将提示需要保持一致。 org.apache.hudi.keygen.ComplexKeyGenerator
  • 同步Hive表配置 表2 同步Hive表参数配置 参数 描述 默认值 hoodie.datasource.hive_sync.enable 是否同步hudi表信息到hive metastore。 注意: 建议该值设置为true,统一使用hive管理hudi表。 false hoodie.datasource.hive_sync.database 要同步给hive的数据库名。 default hoodie.datasource.hive_sync.table 要同步给hive的表名,建议这个值和hoodie.datasource.write.table.name保证一致。 unknown hoodie.datasource.hive_sync.username 同步hive时,指定的用户名。 hive hoodie.datasource.hive_sync.password 同步hive时,指定的密码。 hive hoodie.datasource.hive_sync.jdbcurl 连接hive jdbc指定的连接。 "" hoodie.datasource.hive_sync.use_jdbc 是否使用hive jdbc方式连接hive同步hudi表信息。建议该值设置为false,设置为false后 jdbc连接相关配置无效。 true hoodie.datasource.hive_sync.partition_fields 用于决定hive分区列。 "" hoodie.datasource.hive_sync.partition_extractor_class 用于提取hudi分区列值,将其转换成hive分区列。 org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor hoodie.datasource.hive_sync.support_timestamp 当hudi表存在timestamp类型字段时,需指定此参数为true,以实现同步timestamp类型到hive元数据中。该值默认为false,默认将timestamp类型同步为bigInt,默认情况可能导致使用sql查询包含timestamp类型字段的hudi表出现错误。 true
  • CarbonData表操作并发语法说明 DDL和DML中的操作,执行前,需要获取对应的锁,各操作需要获取锁的情况见表1 操作获取锁一览表,√表示需要获取该锁,一个操作仅在获取到所有需要获取的锁后,才能继续执行。 任意两个操作是否可以并发执行,可以通过如下方法确定:表1两行代表两个操作,这两行没有任意一列都标记√,即不存在某一列两行全为√。 表1 操作获取锁一览表 操作 METADATA_LOCK COMPACTION_LOCK DROP_TABLE_LOCK DELETE_SEGMENT_LOCK CLEAN_FILES_LOCK ALTER_PARTITION_LOCK UPDATE_LOCK STREAMING_LOCK CONCURRENT_LOAD_LOCK SEGMENT_LOCK CREATE TABLE - - - - - - - - - - CREATE TABLE As SELECT - - - - - - - - - - DROP TABLE √ - √ - - - - √ - - ALTER TABLE COMPACTION - √ - - - - √ - - - TABLE RENAME - - - - - - - - - - ADD COLUMNS √ √ - - - - - - - - DROP COLUMNS √ √ - - - - - - - - CHANGE DATA TYPE √ √ - - - - - - - - REFRESH TABLE - - - - - - - - - - REGISTER INDEX TABLE √ - - - - - - - - - REFRESH INDEX - √ - - - - - - - - LOAD DATA/INSERT INTO - - - - - - - - √ √ UPDATE CARBON TABLE √ √ - - - - √ - - - DELETE RECORDS from CARBON TABLE √ √ - - - - √ - - - DELETE SEGMENT by ID - - - √ √ - - - - - DELETE SEGMENT by DATE - - - √ √ - - - - - SHOW SEGMENTS - - - - - - - - - - CREATE SECONDARY INDEX √ √ - √ - - - - - - SHOW SECONDARY INDEXES - - - - - - - - - - DROP SECONDARY INDEX √ - √ - - - - - - - CLEAN FILES - - - - - - - - - - SET/RESET - - - - - - - - - - Add Hive Partition - - - - - - - - - - Drop Hive Partition √ √ √ √ √ √ - - - - Drop Partition √ √ √ √ √ √ - - - - Alter table set √ √ - - - - - - - - 父主题: CarbonData语法参考
  • 回答 当执行器中此次数据查询和加载所需要的堆外内存不足时,便会发生此异常。 在这种情况下,请增大“carbon.unsafe.working.memory.in.mb”和“spark.yarn.executor.memoryOverhead”的值。 详细信息请参考如何在CarbonData中配置非安全内存? 该内存被数据查询和加载共享。所以如果加载和查询需要同时进行,建议将“carbon.unsafe.working.memory.in.mb”和“spark.yarn.executor.memoryOverhead”的值配置为2048 MB以上。 可以使用以下公式进行估算: 数据加载所需内存: (“carbon.number.of.cores.while.loading”的值[默认值 = 6]) x 并行加载数据的表格 x (“offheap.sort.chunk.size.inmb”的值[默认值 = 64 MB] + “carbon.blockletgroup.size.in.mb”的值[默认值 = 64 MB] + 当前的压缩率[64 MB/3.5]) = ~900 MB 每表格 数据查询所需内存: (SPARK_EXECUTOR_INSTANCES. [默认值 = 2]) x ( carbon.blockletgroup.size.in.mb [默认值 = 64 MB] +“carbon.blockletgroup.size.in.mb”解压内容[默认值 = 64 MB * 3.5]) x (每个执行器核数[默认值 = 1]) = ~ 600 MB
  • Hudi Cleaning操作说明 Cleaning用于清理不再需要的版本数据。 Hudi使用Cleaner后台作业,不断清除不需要的旧得版本的数据。通过配置hoodie.cleaner.policy和hoodie.cleaner.commits.retained可以使用不同的清理策略和保存的commit数量。 执行cleaning有两种方式: 同步clean由参数hoodie.clean.automatic控制,默认自动开启。 关闭同步clean: datasource写入时可以通过.option("hoodie.clean.automatic", "false")来关闭自动clean。 spark-sql写入时可以通过set hoodie.clean.automatic=false;来关闭自动clean。 异步clean可以使用spark-sql来执行,详情可以参考章节CLEAN。 更多clean相关参数请参考compaction&cleaning配置章节。 父主题: 数据管理维护
  • 问题 使用Spark SQL删除MOR表后重新建表写入数据不能实时同步ro、rt表,报错如下: WARN HiveSyncTool: Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set java.lang.IllegalArgumentException: Failed to get schema for table hudi_table2_ro does not exist at org.apache.hudi.hive.HoodieHiveClient.getTableSchema(HoodieHiveClient.java:183) at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:286) at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:213)
  • 配置描述 在Spark Driver端的“spark-defaults.conf”配置文件中添加如下表格中的参数。 表1 参数说明 参数 描述 默认值 spark.sql.adaptive.enabled 自适应执行特性的总开关。 注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。集群中DPP特性是默认开启的,因此开启AQE特性的同时,需要将DPP特性关闭。 false spark.sql.optimizer.dynamicPartitionPruning.enabled 动态分区裁剪功能的开关。 true spark.sql.adaptive.skewJoin.enabled 当此配置为true且spark.sql.adaptive.enabled设置为true时,启用运行时自动处理join运算中的数据倾斜功能。 true spark.sql.adaptive.skewJoin.skewedPartitionFactor 此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区 5 spark.sql.adaptive.skewjoin.skewedPartitionThresholdInBytes 分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes. 256MB spark.sql.adaptive.shuffle.targetPostShuffleInputSize 每个task处理的shuffle数据的最小数据量。单位:Byte。 67108864
  • 配置场景 在Spark SQL多表Join的场景下,会存在关联键严重倾斜的情况,导致Hash分桶后,部分桶中的数据远高于其它分桶。最终导致部分Task过重,跑得很慢;其它Task过轻,跑得很快。一方面,数据量大Task运行慢,使得计算性能低;另一方面,数据量少的Task在运行完成后,导致很多CPU空闲,造成CPU资源浪费。 通过如下配置项可开启自动进行数据倾斜处理功能,通过将Hash分桶后数据量很大的、且超过数据倾斜阈值的分桶拆散,变成多个task处理一个桶的数据机制,提高CPU资源利用率,提高系统性能。 未产生倾斜的数据,将采用原有方式进行分桶并运行。 使用约束: 只支持两表Join的场景。 不支持FULL OUTER JOIN的数据倾斜处理。 示例:执行下面SQL语句,a表倾斜或b表倾斜都无法触发该优化。 select aid FROM a FULL OUTER JOIN b ON aid=bid; 不支持LEFT OUTER JOIN的右表倾斜处理。 示例:执行下面SQL语句,b表倾斜无法触发该优化。 select aid FROM a LEFT OUTER JOIN b ON aid=bid; 不支持RIGHT OUTER JOIN的左表倾斜处理。 示例:执行下面SQL语句,a表倾斜无法触发该优化。 select aid FROM a RIGHT OUTER JOIN b ON aid=bid;
  • 回答 在Spark配置中,“spark.yarn.executor.memoryOverhead”参数的值应大于CarbonData配置参数“sort.inmemory.size.inmb” 与“Netty offheapmemory required”参数值的总和,或者“carbon.unsafe.working.memory.in.mb” 、 “carbon.sort.inememory.storage.size.in.mb” 与 “Netty offheapmemory required”参数值的总和。否则,如果堆外(off heap)访问超出配置的executor内存,则YARN可能会停止executor。 “Netty offheapmemory required”说明:当“spark.shuffle.io.preferDirectBufs”设为true时,Spark中netty 传输服务从"spark.yarn.executor.memoryOverhead"中拿掉部分堆内存[~ 384 MB or 0.1 x 执行器内存]。 详细信息请参考常见配置Spark Executor堆内存参数。
  • 注意事项 分区并发写控制基于单表并发写控制的基础上实现,因此使用约束条件与单表并控制写基本相同。 当前分区并发只支持Spark方式写入,Flink不支持该特性。 为避免过大并发量占用ZooKeeper过多资源,对Hudi在ZooKeeper上增加了Quota配额限制,可以通过服务端修改Spark组件中参数zk.quota.number来调整Hudi的Quota配额,默认为500000,最小为5,且不可通过此参数来控制并行任务数,仅用来控制对ZooKeeper的访问压力。
  • 使用分区并发机制 通过设置参数:hoodie.support.partition.lock=true来启动分区并发写。 示例: spark datasource方式开启分区并发写: upsert_data.write.format("hudi"). option("hoodie.datasource.write.table.type", "COPY_ON_WRITE"). option("hoodie.datasource.write.precombine.field", "col2"). option("hoodie.datasource.write.recordkey.field", "primary_key"). option("hoodie.datasource.write.partitionpath.field", "col0"). option("hoodie.upsert.shuffle.parallelism", 4). option("hoodie.datasource.write.hive_style_partitioning", "true"). option("hoodie.support.partition.lock", "true"). option("hoodie.table.name", "tb_test_cow"). mode("Append").save(s"/tmp/huditest/tb_test_cow") spark-sql开启分区并发写: set hoodie.support.partition.lock=true; insert into hudi_table1 select 1,1,1;
  • 回答 建议在使用Hudi时,schema应该以向后兼容的方式演进。此错误通常发生在使用向后不兼容的演进方式删除某些列如“col1”后,更新parquet文件中以旧的schema写入的列“col1”,在这种情况下,parquet尝试在传入记录中查找所有当前字段,当发现“col1”不存在时,发生上述异常。 解决这个问题的办法是使用所有schema演进版本来创建uber schema,并使用该schema作为target schema。用户可以从hive metastore中获取schema并将其与当前schema合并。
  • 示例 create database productdb; use productdb; CREATE TABLE productSalesTable(a int,b string,c string) stored as carbondata; create index productNameIndexTable on table productSalesTable(c) as 'carbondata'; insert into table productSalesTable select 1,'a','aaa'; create database productdb2; 使用hdfs命令将productdb数据库下的productSalesTable和productNameIndexTable复制到productdb2。 refresh table productdb2.productSalesTable ; refresh table productdb2.productNameIndexTable ; explain select * from productdb2.productSalesTable where c = 'aaa'; //可以发现该查询命令没有使用索引表 REGISTER INDEX TABLE productNameIndexTable ON productdb2.productSalesTable; explain select * from productdb2.productSalesTable where c = 'aaa'; //可以发现该查询命令使用了索引表
  • 问题 线程“main”报错 org.apache.kafka.common.KafkaException,构造kafka消费者失败,报错: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
  • 回答 当试图从启用SSL的kafka数据源采集数据时,而安装程序无法读取jars.conf文件及其属性时,可能会发生这种情况。 要解决此问题,需要将所需的属性作为通过Spark提交的命令的一部分传递。如:--files jaas.conf,failed_tables.json --conf 'spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf' --conf 'spark.executor .extraJavaOptions=-Djava.security.auth.login.config=jaas.conf'
  • 使用coalesce调整分片的数量 coalesce可以调整分片的数量。coalesce函数有两个参数: coalesce(numPartitions: Int, shuffle: Boolean = false) 当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。 遇到下列场景,可选择使用coalesce算子: 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。 当输入切片个数太大,导致程序无法正常运行时使用。 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。
  • 推荐资源配置 mor表: 由于其本质上是写增量文件,调优可以直接根据hudi的数据大小(dataSize)进行调整。 dataSize如果只有几个G,推荐跑单节点运行spark,或者yarn模式但是只分配一个container。 入湖程序的并行度p设置:建议 p = (dataSize)/128M,程序分配core的数量保持和p一致即可。内存设置建议内存大小和core的比例大于1.5:1 即一个core配1.5G内存, 堆外内存设置建议内存大小和core的比例大于0.5:1。 cow表: cow表的原理是重写原始数据,因此这种表的调优,要兼顾dataSize和最后重写的文件数量。总体来说core数量越大越好(和最后重写多少个文件数直接相关),并行度p和内存大小和mor设置类似。
  • 注意事项 以下为分别使用SET和RESET命令进行动态设置或清除操作的属性: 表2 属性描述 属性 描述 hoodie.insert.shuffle.parallelism insert方式写入数据时的spark shuffle并行度。 hoodie.upsert.shuffle.parallelism upsert方式写入数据时的spark shuffle并行度。 hoodie.delete.shuffle.parallelism delete方式删除数据时的spark shuffle并行度。 hoodie.sql.insert.mode 指定Insert模式,取值为strict、non-strict及upsert。 hoodie.sql.bulk.insert.enable 指定是否开启bulk insert写入。 spark.sql.hive.convertMetastoreParquet sparksql把parquet表转化为datasource表进行读取。当hudi的provider为hive的情况下,使用sparksql或sparkbeeline进行读取,需要将该参数设置为false。
  • 命令格式 Add或Update参数值: SET parameter_name=parameter_value 此命令用于添加或更新“parameter_name”的值。 Display参数值: SET parameter_name 此命令用于显示指定的“parameter_name”的值。 Display会话参数: SET 此命令显示所有支持的会话参数。 Display会话参数以及使用细节: SET -v 此命令显示所有支持的会话参数及其使用细节。 Reset参数值: RESET 此命令清除所有会话参数。
  • Hudi Compaction操作说明 Compaction用于合并mor表Base和Log文件。 对于Merge-On-Read表,数据使用列式Parquet文件和行式Avro文件存储,更新被记录到增量文件,然后进行同步/异步compaction生成新版本的列式文件。Merge-On-Read表可减少数据摄入延迟,因而进行不阻塞摄入的异步Compaction很有意义。 异步Compaction会进行如下两个步骤: 调度Compaction:由入湖作业完成,在这一步,Hudi扫描分区并选出待进行compaction的FileSlice,最后CompactionPlan会写入Hudi的Timeline。 执行Compaction:一个单独的进程/线程将读取CompactionPlan并对FileSlice执行Compaction操作。 使用Compaction的方式分为同步和异步两种: 同步方式由参数hoodie.compact.inline控制,默认为true,自动生成compaction调度计划并执行compaction: 关闭同步compaction datasource写入时可以通过 .option("hoodie.compact.inline", "false") 来关闭自动compaction。 spark-sql写入时可以通过set hoodie.compact.inline=false;来关闭自动compaction。 仅同步生成compaction调度而不执行compaction ·datasource写入时可以通过以下option参数来实现: option("hoodie.compact.inline", "true"). option("hoodie.schedule.compact.only.inline", "true"). option("hoodie.run.compact.only.inline", "false"). ·spark-sql写入时可以通过set 以下参数来实现: set hoodie.compact.inline=true; set hoodie.schedule.compact.only.inline=true; set hoodie.run.compact.only.inline=false; 异步方式由spark-sql来实现。 如果需要在异步compaction时只执行已经产生的compaction调度计划而不创建新的调度计划,则需要通过set命令设置以下参数: set hoodie.compact.inline=true; set hoodie.schedule.compact.only.inline=false; set hoodie.run.compact.only.inline=true; 更多compaction参数请参考compaction&cleaning配置章节。 为了保证入湖的最高效率,推荐使用同步产生compaction调度计划,异步执行compaction调度计划的方式。 父主题: 数据管理维护
  • 加载数据到CarbonData Table 创建CarbonData table之后,可以从CSV文件加载数据到所创建的表中。 用所要求的参数运行以下命令从CSV文件加载数据。该表的列名需要与CSV文件的列名匹配。 LOAD DATA inpath 'hdfs://hacluster/data/test.csv' into table x1 options('DELIMITER'=',', 'QUOTECHAR'='"','FILEHEADER'='imei, deviceinformationid,mac, productdate,updatetime, gamepointid,contractnumber'); 其中,“test.csv”为准备CSV文件的CSV文件,“x1”为示例的表名。 CSV样例内容如下: 13418592122,1001,MAC地址,2017-10-23 15:32:30,2017-10-24 15:32:30,62.50,74.56 13418592123,1002,MAC地址,2017-10-23 16:32:30,2017-10-24 16:32:30,17.80,76.28 13418592124,1003,MAC地址,2017-10-23 17:32:30,2017-10-24 17:32:30,20.40,92.94 13418592125,1004,MAC地址,2017-10-23 18:32:30,2017-10-24 18:32:30,73.84,8.58 13418592126,1005,MAC地址,2017-10-23 19:32:30,2017-10-24 19:32:30,80.50,88.02 13418592127,1006,MAC地址,2017-10-23 20:32:30,2017-10-24 20:32:30,65.77,71.24 13418592128,1007,MAC地址,2017-10-23 21:32:30,2017-10-24 21:32:30,75.21,76.04 13418592129,1008,MAC地址,2017-10-23 22:32:30,2017-10-24 22:32:30,63.30,94.40 13418592130,1009,MAC地址,2017-10-23 23:32:30,2017-10-24 23:32:30,95.51,50.17 13418592131,1010,MAC地址,2017-10-24 00:32:30,2017-10-25 00:32:30,39.62,99.13 命令执行结果如下: +------------+ |Segment ID | +------------+ |0 | +------------+ No rows selected (3.039 seconds)
  • 在CarbonData中查询数据 创建CarbonData table并加载数据之后,可以执行所需的数据查询操作。以下为一些查询操作举例。 获取记录数 为了获取在CarbonData table中的记录数,可以运行以下命令。 select count(*) from x1; 使用Groupby查询 为了获取不重复的deviceinformationid记录数,可以运行以下命令。 select deviceinformationid,count (distinct deviceinformationid) from x1 group by deviceinformationid; 用Filter查询 为了获取特定deviceinformationid的记录,可以运行以下命令。 select * from x1 where deviceinformationid='1010'; 在执行数据查询操作后,如果查询结果中某一列的结果含有中文字等非英文字符,会导致查询结果中的列不能对齐,这是由于不同语言的字符在显示时所占的字宽不尽相同。
  • 在Spark-shell上使用CarbonData 用户如果需要在Spark-shell上使用CarbonData,需通过如下方式创建CarbonData Table,加载数据到CarbonData Table和在CarbonData中查询数据的操作。 spark.sql("CREATE TABLE x2(imei string, deviceInformationId int, mac string, productdate timestamp, updatetime timestamp, gamePointId double, contractNumber double) STORED AS carbondata") spark.sql("LOAD DATA inpath 'hdfs://hacluster/data/x1_without_header.csv' into table x2 options('DELIMITER'=',', 'QUOTECHAR'='\"','FILEHEADER'='imei, deviceinformationid,mac, productdate,updatetime, gamepointid,contractnumber')") spark.sql("SELECT * FROM x2").show()
  • 创建CarbonData Table 在Spark Beeline被连接到JDBCServer之后,需要创建一个CarbonData table用于加载数据和执行查询操作。下面是创建一个简单的表的命令。 create table x1 (imei string, deviceInformationId int, mac string, productdate timestamp, updatetime timestamp, gamePointId double, contractNumber double) STORED AS carbondata TBLPROPERTIES ('SORT_COLUMNS'='imei,mac'); 命令执行结果如下: +---------+ | Result | +---------+ +---------+ No rows selected (1.093 seconds)
  • 参数描述 表1 CREATE SECONDARY INDEX参数 参数 描述 index_name 索引表的名称。表名称应由字母数字字符和下划线(_)特殊字符组成。 db_name 数据库的名称。数据库名称应由字母数字字符和下划线(_)特殊字符组成。 table_name 数据库中的表名称。表名称应由字母数字字符和下划线(_)特殊字符组成。 col_name 表中的列名称。支持多列。列名称应由字母数字字符和下划线(_)特殊字符组成。 table_blocksize 数据文件的block大小。更多详细信息,请参考•Block大小。
共100000条