华为云用户手册

  • 查看消费组 进入KafkaUI,请参考访问KafkaUI。 单击“Consumers”,进入消费组详情页面,可以查看当前集群内的所有ConsumerGroups,并可以查看各个ConsumerGroups Coordinator所在节点IP,在页面右上角,用户可以输入ConsumerGroup来搜索指定的ConsumerGroup信息。 在Consumer Summary一栏,可查看当前集群已存在的消费组,单击消费组名称,可查看该消费组所消费过的Topic,消费过的Topic有两种状态:“pending”和“running”,分别表示“曾经消费过但现在未消费”和“现在正在消费”,在弹框右上角,可以输入Topic名来进行过滤。 单击Topic名称,进入Consumer Offsets页面,可查看Topic消费详情。
  • 创建Topic 进入KafkaUI,请参考访问KafkaUI。 单击“Create Topic”进入创建Topic页面。在弹出的页面中参考表1填写信息,单击“Create”,完成Topic创建。 表1 创建Topic信息 参数名称 参数描述 备注 Topic Topic的名称,只能包含英文字母、数字、中划线和下划线,且不能多于249个字符。 例如:kafka_ui Partitions Topic的分区数量,取值范围大于等于1,默认为3。 - Replication Factor Topic的副本因子,取值范围为1~N,N为当前集群Broker个数,默认为2。 - 用户可根据业务需要单击“Advanced Options”配置topic相关高级参数,通常保持默认即可。 安全模式集群下,执行Create Topic操作的用户需属于“kafkaadmin”用户组,否则将会由于鉴权失败导致无法创建。 非安全模式集群下,执行Create Topic操作不作鉴权,即任意用户都可执行Create Topic操作。
  • 查看Broker 进入KafkaUI,请参考访问KafkaUI。 单击“Brokers”,进入Broker详情页面。 在“Broker Summary”一栏可查看Broker的“Broker ID”、“Host”、“Rack”、“Disk(Used|Total)”和“Memory(Used|Total)”。 在“Brokers Metrics”处可查看Broker节点数据流量的jmx指标,包括在不同时段的时间窗口内,Broker节点平均每秒流入消息条数, 每秒流入消息字节数, 每秒流出消息字节数,每秒失败的请求数,每秒总的请求数和每秒生产的请求数。
  • 操作场景 通过KafkaUI查看Topic详情、修改Topic Configs、增加Topic分区个数、删除Topic,并可实时查看不同时段的生产数据条数。 安全模式下,KafkaUI对查看Topic详情操作不作鉴权处理,即任何用户都可以查询Topic信息;对于修改Topic Configs、增加Topic分区个数、删除Topic场景,需保证KafkaUI登录用户属于“kafkaadmin”用户组或者单独给用户授予对应操作权限,否则将会鉴权失败。 非安全模式下,KafkaUI对所有操作不作鉴权处理。 本章节内容仅适用于MRS 3.1.2及之后版本。
  • 查看Topic详情 进入KafkaUI,请参考访问KafkaUI。 单击“Topics”,进入Topic管理页面。 在“Topic List”栏可查看当前集群已创建的Topic的名称、状态、分区数量、创建时间和副本个数等信息。 单击Topic名称可进入Topic详情页面。在该页面可查看Topic与分区的详细信息。 在“Producer Message”栏可根据业务需求选择“Day”、“Week”、“Month”不同时段查看此Topic生产数据条数。
  • GeoMesa命令行简介 该章节内容仅适用于MRS 3.1.0及之后版本。 本节介绍常用的GeoMesa命令。更多的GeoMesa命令,请参见https://www.geomesa.org/documentation/user/accumulo/commandline.html。 安装hbase客户端后,加载环境变量后,可使用geomesa-hbase命令行。 查看classpath 执行“classpath”命令,将会返回当前命令行工具的所有classpath信息。 bin/geomesa-hbase classpath 创建表 执行“create-schema”命令创建表,创建表时至少要指定目录名称与表名称,以及表规格。 bin/geomesa-hbase create-schema -c geomesa -f test -s Who:String,What:java.lang.Long,When:Date,*Where:Point:srid=4326,Why:String 获取表描述信息 执行“describe-schema”命令获取表描述信息,执行该命令时必须要指定目录名称与表名称。 bin/geomesa-hbase describe-schema -c geomesa -f test 批量导入数据 执行“ingest”命令批量导入数据,导入时需要指定目录名称,表名称,表规格,以及相应的数据转换器等。 数据(车牌号,车辆颜色,经度,纬度,时间):data.csv,并将数据表放在data文件夹中。 AAA,red,113.918417,22.505892,2017-04-09 18:03:46BBB,white,113.960719,22.556511,2017-04-24 07:38:47CCC,blue,114.088333,22.637222,2017-04-23 15:07:54DDD,yellow,114.195456,22.596103,2017-04-21 21:27:06EEE,black,113.897614,22.551331,2017-04-09 09:34:48 表结构定义:myschema.sft,并将myschema.sft放在geomesa命令行工具的conf文件夹中。 geomesa.sfts.cars = { attributes = [ { name = "carid", type = "String", index = true } { name = "color", type = "String", index = false } { name = "time", type = "Date", index = false } { name = "geom", type = "Point", index = true,srid = 4326,default = true } ]} 转换器定义:myconvertor.convert,并将myconvertor.convert放在geomesa命令行工具的conf文件夹中。 geomesa.converters.cars= { type = "delimited-text", format = "CSV", id-field = "$fid", fields = [ { name = "fid", transform = "concat($1,$5)" } { name = "carid", transform = "$1::string" } { name = "color", transform = "$2::string" } { name = "lon", transform = "$3::double" } { name = "lat", transform = "$4::double" } { name = "geom", transform = "point($lon,$lat)" } { name = "time", transform = "date('YYYY-MM-dd HH:mm:ss',$5)" } ]} 执行命令导入数据: bin/geomesa-hbase ingest -c geomesa -C conf/myconvertor.convert -s conf/myschema.sft data/data.csv 数据导入其他参数具体说明请参见:https://www.geomesa.org/documentation/user/accumulo/examples.html#ingesting-data 解释查询 执行“explain”命令获取指定查询语句执行计划的解释说明,解释语句时必须指定目录名称和表名称,以及给定查询语句。 bin/geomesa-hbase explain -c geomesa -f cars -q "carid = 'BBB'" 统计分析 执行“stats-analyze”命令对数据表进行统计分析,同时还可以进一步执行“stats-bounds”,“stats-count”,“stats-histogram”,“stats-top-k”命令对数据表做更详细的统计。 bin/geomesa-hbase stats-analyze -c geomesa -f cars bin/geomesa-hbase stats-bounds -c geomesa -f cars bin/geomesa-hbase stats-count -c geomesa -f cars bin/geomesa-hbase stats-histogram -c geomesa -f cars bin/geomesa-hbase stats-top-k -c geomesa -f cars 导出feature 执行“export”命令导出feature,导出时必须指定目录名称和表名称,同时还可以根据指定的查询语句进行导出。 bin/geomesa-hbase export -c geomesa -f cars -q "carid = 'BBB'" 删除feature 执行“delete-features”命令删除feature,删除时必须指定目录名称和表名称,同时还可以根据指定的查询语句进行删除。 bin/geomesa-hbase delete-features -c geomesa -f cars -q "carid = 'BBB'" 获取目录中的全部表的名称 执行“get-type-names”命令获取指定目录中的表名称。 bin/geomesa-hbase get-type-names -c geomesa 删除表 执行“remove-schema”命令删除表,删除表时至少要指定表所在的目录与表名称。 bin/geomesa-hbase remove-schema -c geomesa -f test bin/geomesa-hbase remove-schema -c geomesa -f cars 删除目录 执行“delete-catalog”命令删除指定的目录。 bin/geomesa-hbase delete-catalog -c geomesa 父主题: 使用HBase
  • 分区元数据冷热存储介绍 为了减轻元数据库压力,将长时间未使用过的指定范围的分区相关元数据移动到备份表,这一过程称为分区数据冻结,移动的分区数据称为冷分区,未冻结的分区称为热分区,存在冷分区的表称为冻结表。将被冻结的数据重新移回原元数据表,这一过程称为分区数据解冻。 一个分区从热分区变成冷分区,仅仅是在元数据中进行标识,其HDFS业务侧分区路径、数据文件内容并未发生变化。 本特性仅适用于MRS 3.1.2及之后版本。
  • 查询冻结表的冻结分区 查询冷冻分区: show frozen partitions 表名; 默认元数据库冻结分区类型只支持int、string、varchar、date、timestamp类型。 外置元数据库只支持Postgres数据库,且冻结分区类型只支持int、string、varchar、timestamp类型。 对冻结后的表进行Msck元数据修复时,需要先解冻数据。如果对冻结表进行过备份后恢复操作,则可以直接执行Msck元数据修复操作,且解冻只能通过msck repair命令进行操作。 对冻结后的分区进行rename时,需要先解冻数据,否则会提示分区不存在。 删除存在冻结数据的表时,被冻结的数据会同步删除。 删除存在冻结数据的分区时,被冻结的分区信息不会被删除,HDFS业务数据也不会被删除。 select查询数据时,会自动添加排查冷分区数据的过滤条件,查询结果将不包含冷分区的数据。 show partitions table查询表下的分区数据时,查询结果将不包含冷分区,可通过show frozen partitions table进行冷冻分区查询。
  • Distributed表引擎 Distributed表引擎本身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据到集群中的各个节点,分布式表需要和其他本地数据表一起协同工作。分布式表会将接收到的读写任务分发到各个本地表,而实际上数据的存储在各个节点的本地表中。 图1 Distributed原理图 Distributed表引擎的创建模板: ENGINE = Distributed(cluster_name, database_name, table_name, [sharding_key]) Distributed表参数解析如下: cluster_name:集群名称,在对分布式表执行读写的过程中,使用集群的配置信息查找对应的ClickHouse实例节点。 database_name:数据库名称。 table_name:数据库下对应的本地表名称,用于将分布式表映射到本地表上。 sharding_key:分片键(可选参数),分布式表会按照这个规则,将数据分发到各个本地表中。 Distributed表引擎使用示例: --先创建一个表名为test的ReplicatedMergeTree本地表CREATE TABLE default.test ON CLUSTER default_cluster_1( `EventDate` DateTime, `id` UInt64)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/test', '{replica}')PARTITION BY toYYYYMM(EventDate)ORDER BY id--基于本地表test创建表名为test_all的Distributed表CREATE TABLE default.test_all ON CLUSTER default_cluster_1( `EventDate` DateTime, `id` UInt64)ENGINE = Distributed(default_cluster_1, default, test, rand()) 分布式表创建规则: 创建Distributed表时需加上on cluster cluster_name,这样建表语句在某一个ClickHouse实例上执行一次即可分发到集群中所有实例上执行。 分布式表通常以本地表加“_all”命名。它与本地表形成一对多的映射关系,之后可以通过分布式表代理操作多张本地表。 分布式表的表结构尽量和本地表的结构一致。如果不一致,在建表时不会报错,但在查询或者插入时可能会抛出异常。
  • 背景介绍 表引擎在ClickHouse中的作用十分关键,不同的表引擎决定了: 数据存储和读取的位置 支持哪些查询方式 能否并发式访问数据 能不能使用索引 是否可以执行多线程请求 数据复制使用的参数 其中MergeTree和Distributed是ClickHouse表引擎中最重要,也是最常使用的两个引擎,本文将重点进行介绍。 其他表引擎详细可以参考官网链接:https://clickhouse.tech/docs/en/engines/table-engines。
  • Replicated*MergeTree引擎 ClickHouse中的所有MergeTree家族引擎前面加上Replicated就成了支持副本的合并树引擎。 Replicated系列引擎借助ZooKeeper实现数据的同步,创建Replicated复制表时通过注册到ZooKeeper上的信息实现同一个分片的所有副本数据进行同步。 Replicated表引擎的创建模板: ENGINE = Replicated*MergeTree('ZooKeeper存储路径','副本名称', ...) Replicated表引擎需指定两个参数: ZooKeeper存储路径:ZooKeeper中该表相关数据的存储路径,建议规范化,如:/clickhouse/tables/{shard}/数据库名/表名。 副本名称,一般用{replica}即可。 Replicated表引擎使用示例可以参考:ClickHouse表创建。
  • Hive支持ZSTD压缩格式 ZSTD(全称为Zstandard)是一种开源的无损数据压缩算法,其压缩性能和压缩比均优于当前Hadoop支持的其他压缩格式,本特性使得Hive支持ZSTD压缩格式的表。Hive支持基于ZSTD压缩的存储格式有常见的ORC,RCFile,TextFile,JsonFile,Parquet,Squence,CSV。 本特性仅适用于MRS 3.1.2及之后版本。 ZSTD压缩格式的建表方式如下: ORC存储格式建表时可指定TBLPROPERTIES("orc.compress"="zstd"): create table tab_1(...) stored as orc TBLPROPERTIES("orc.compress"="zstd"); Parquet存储格式建表可指定TBLPROPERTIES("parquet.compression"="zstd"): create table tab_2(...) stored as parquet TBLPROPERTIES("parquet.compression"="zstd"); 其他格式或通用格式建表可执行设置参数指定compress,codec为“org.apache.hadoop.io.compress.ZStandardCode”: set hive.exec.compress.output=true; set mapreduce.map.output.compress=true; set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.ZStandardCodec; set mapreduce.output.fileoutputformat.compress=true; set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.ZStandardCodec; set hive.exec.compress.intermediate=true; create table tab_3(...) stored as textfile; ZSTD压缩格式的表和其他普通压缩表的SQL操作没有区别,可支持正常的增删查及聚合类SQL操作。 父主题: 使用Hive
  • Hive配置类问题 Hive SQL执行报错:java.lang.OutOfMemoryError: Java heap space. 解决方案: 对于MapReduce任务,增大下列参数: set mapreduce.map.memory.mb=8192; set mapreduce.map.java.opts=-Xmx6554M; set mapreduce.reduce.memory.mb=8192; set mapreduce.reduce.java.opts=-Xmx6554M; 对于Tez任务,增大下列参数: set hive.tez.container.size=8192; Hive SQL对列名as为新列名后,使用原列名编译报错:Invalid table alias or column reference 'xxx'. 解决方案:set hive.cbo.enable=true; Hive SQL子查询编译报错:Unsupported SubQuery Expression 'xxx': Only SubQuery expressions that are top level conjuncts are allowed. 解决方案:set hive.cbo.enable=true; Hive SQL子查询编译报错:CalciteSubquerySemanticException [Error 10249]: Unsupported SubQuery Expression Currently SubQuery expressions are only allowed as Where and Having Clause predicates. 解决方案:set hive.cbo.enable=true; Hive SQL编译报错:Error running query: java.lang.AssertionError: Cannot add expression of different type to set. 解决方案:set hive.cbo.enable=false; Hive SQL执行报错:java.lang.NullPointerException at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFComputeStats$GenericUDAFNumericStatsEvaluator.init. 解决方案:set hive.map.aggr=false; Hive SQL设置hive.auto.convert.join = true(默认开启)和hive.optimize.skewjoin=true执行报错:ClassCastException org.apache.hadoop.hive.ql.plan.ConditionalWork cannot be cast to org.apache.hadoop.hive.ql.plan.MapredWork. 解决方案:set hive.optimize.skewjoin=false; Hive SQL设置hive.auto.convert.join=true(默认开启)、hive.optimize.skewjoin=true和hive.exec.parallel=true执行报错:java.io.FileNotFoundException: File does not exist:xxx/reduce.xml. 解决方案: 方法一:切换执行引擎为Tez,详情请参考切换Hive执行引擎为Tez。 方法二:set hive.exec.parallel=false; 方法三:set hive.auto.convert.join=false; Hive on Tez执行Bucket表Join报错:NullPointerException at org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.mergeJoinComputeKeys 解决方案:set tez.am.container.reuse.enabled=false; 父主题: Hive常见问题
  • 操作场景 HBase可以通过对HFile中的data block编码,减少keyvalue中key的重复部分,从而减少空间的使用。目前对data block的编码方式有:NONE、PREFIX、DIFF、FAST_DIFF和ROW_INDEX_V1,其中NONE表示不使用编码。另外,HBase还支持使用压缩算法对HFile文件进行压缩,默认支持的压缩算法有:NONE、GZ、SNAPPY和ZSTD,其中NONE表示HFile不压缩。 这两种方式都是作用在HBase的列簇上,可以同时使用,也可以单独使用。
  • Compaction Compaction用于合并mor表Base和Log文件。 对于Merge-On-Read表,数据使用列式Parquet文件和行式Avro文件存储,更新被记录到增量文件,然后进行同步/异步compaction生成新版本的列式文件。Merge-On-Read表可减少数据摄入延迟,因而进行不阻塞摄入的异步Compaction很有意义。 异步Compaction会进行如下两个步骤: 调度Compaction:由入湖作业完成,在这一步,Hudi扫描分区并选出待进行compaction的FileSlice,最后CompactionPlan会写入Hudi的Timeline。 执行Compaction:一个单独的进程/线程将读取CompactionPlan并对FileSlice执行Compaction操作。 使用Compaction的方式分为同步和异步两种: 同步方式由参数hoodie.compact.inline控制,默认为true,自动生成compaction调度计划并执行compaction: 关闭同步compaction datasource写入时可以通过 .option("hoodie.compact.inline", "false") 来关闭自动compaction。 spark-sql写入时可以通过set hoodie.compact.inline=false;来关闭自动compaction。 仅同步生成compaction调度而不执行compaction ·datasource写入时可以通过以下option参数来实现: option("hoodie.compact.inline", "true"). option("hoodie.schedule.compact.only.inline", "true"). option("hoodie.run.compact.only.inline", "false"). ·spark-sql写入时可以通过set 以下参数来实现: set hoodie.compact.inline=true; set hoodie.schedule.compact.only.inline=true; set hoodie.run.compact.only.inline=false; 异步方式由spark-sql来实现。 如果需要在异步compaction时只执行已经产生的compaction调度计划而不创建新的调度计划,则需要通过set命令设置以下参数: set hoodie.compact.inline=true; set hoodie.schedule.compact.only.inline=false; set hoodie.run.compact.only.inline=true; 更多compaction参数请参考compaction&cleaning配置章节。 为了保证入湖的最高效率,推荐使用同步产生compaction调度计划,异步执行compaction调度计划的方式。 父主题: 数据管理维护
  • HoodieDeltaStreamer流式写入 Hudi自带HoodieDeltaStreamer工具支持流式写入,也可以使用SparkStreaming以微批的方式写入。HoodieDeltaStreamer提供以下功能: 支持Kafka,DFS多种数据源接入 。 支持管理检查点、回滚和恢复,保证exactly once语义。 支持自定义转换操作。 示例: 准备配置文件kafka-source.properties #hudi配置hoodie.datasource.write.recordkey.field=idhoodie.datasource.write.partitionpath.field=agehoodie.upsert.shuffle.parallelism=100#hive confighoodie.datasource.hive_sync.table=hudimor_deltastreamer_partitionhoodie.datasource.hive_sync.partition_fields=agehoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractorhoodie.datasource.hive_sync.use_jdbc=falsehoodie.datasource.hive_sync.support_timestamp=true# Kafka Source topichoodie.deltastreamer.source.kafka.topic=hudimor_deltastreamer_partition#checkpointhoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/hudimor_deltastreamer_partition# Kafka props# The kafka cluster we want to ingest frombootstrap.servers= xx.xx.xx.xx:xxauto.offset.reset=earliest#auto.offset.reset=latestgroup.id=hoodie-delta-streameroffset.rang.limit=10000 指定HoodieDeltaStreamer执行参数(具体参数配置,请查看官网https://hudi.apache.org/ )执行如下命令: spark-submit --master yarn --jars /opt/hudi-java-examples-1.0.jar // 指定spark运行时需要的hudi jars路径 --driver-memory 1g --executor-memory 1g --executor-cores 1 --num-executors 2 --conf spark.kryoserializer.buffer.max=128m --driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/hudi-examples-0.6.1-SNAPSHOT.jar:/opt/hudi-examples-0.6.1-SNAPSHOT-tests.jar // 指定spark driver需要的hudi jars路径 --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer spark-internal --props file:///opt/kafka-source.properties // 指定配置文件,注意:使用yarn-cluster模式提交任务时,请指定配置文件路径为HDFS路径。 --target-base-path /tmp/huditest/hudimor1_deltastreamer_partition // 指定hudi表路径 --table-type MERGE_ON_READ // 指定要写入的hudi表类型 --target-table hudimor_deltastreamer_partition // 指定hudi表名 --source-ordering-field name // 指定hudi表预合并列 --source-class org.apache.hudi.utilities.sources.JsonKafkaSource // 指定消费的数据源 为JsonKafkaSource, 该参数根据不同数据源指定不同的source类 --schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample // 指定hudi表所需要的schema --transformer-class com.huawei.bigdata.hudi.examples.TransformerExample // 指定如何处理数据源拉取来的数据,可根据自身业务需求做定制 --enable-hive-sync // 开启hive同步,同步hudi表到hive --continuous // 指定流处理模式为连续模式
  • compaction&cleaning配置 参数 描述 默认值 hoodie.clean.automatic 是否执行自动clean。 true hoodie.cleaner.policy 要使用的清理政策。Hudi将删除旧版本的parquet文件以回收空间。 任何引用此版本文件的查询和计算都将失败。需要确保数据保留的时间超过最大查询执行时间。 KEEP_LATEST_COMMITS hoodie.cleaner.commits.retained 保留的提交数。因此,数据将保留为num_of_commits * time_between_commits(计划的),这也直接转化为逐步提取此数据集的数量。 10 hoodie.keep.max.commits 触发归档操作的commit数阈值 30 hoodie.keep.min.commits 归档操作保留的commit数 20 hoodie.commits.archival.batch 这控制着批量读取并一起归档的提交即时的数量。 10 hoodie.parquet.small.file.limit 该值应小于maxFileSize,如果将其设置为0,会关闭此功能。由于批处理中分区中插入记录的数量众多,总会出现小文件。Hudi提供了一个选项,可以通过将对该分区中的插入作为对现有小文件的更新来解决小文件的问题。此处的大小是被视为“小文件大小”的最小文件大小。 104857600 byte hoodie.copyonwrite.insert.split.size 插入写入并行度。为单个分区的总共插入次数。写出100MB的文件,至少1KB大小的记录,意味着每个文件有100K记录。默认值是超额配置为500K。 为了改善插入延迟,请对其进行调整以匹配单个文件中的记录数。将此值设置为较小的值将导致文件变小(尤其是当compactionSmallFileSize为0时)。 500000 hoodie.copyonwrite.insert.auto.split Hudi是否应该基于最后24个提交的元数据动态计算insertSplitSize,默认关闭。 true hoodie.copyonwrite.record.size.estimate 平均记录大小。如果指定,Hudi将使用它,并且不会基于最后24个提交的元数据动态地计算。 没有默认值设置。这对于计算插入并行度以及将插入打包到小文件中至关重要。 1024 hoodie.compact.inline 当设置为true时,紧接在插入或插入更新或批量插入的提交或增量提交操作之后由摄取本身触发压缩。 true hoodie.compact.inline.max.delta.commits 触发内联压缩之前要保留的最大增量提交数。 5 hoodie.compaction.lazy.block.read 当CompactedLogScanner合并所有日志文件时,此配置有助于选择是否应延迟读取日志块。选择true以使用I/O密集型延迟块读取(低内存使用),或者为false来使用内存密集型立即块读取(高内存使用)。 true hoodie.compaction.reverse.log.read HoodieLogFormatReader会从pos=0到pos=file_length向前读取日志文件。如果此配置设置为true,则Reader会从pos=file_length到pos=0反向读取日志文件。 false hoodie.cleaner.parallelism 如果清理变慢,请增加此值。 200 hoodie.compaction.strategy 用来决定在每次压缩运行期间选择要压缩的文件组的压缩策略。默认情况下,Hudi选择具有累积最多未合并数据的日志文件。 org.apache.hudi.table.action.compact.strategy. LogFileSizeBasedCompactionStrategy hoodie.compaction.target.io LogFileSizeBasedCompactionStrategy的压缩运行期间要花费的MB量。当压缩以内联模式运行时,此值有助于限制摄取延迟。 500 * 1024 MB hoodie.compaction.daybased.target.partitions 由org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy使用,表示在压缩运行期间要压缩的最新分区数。 10 hoodie.compaction.payload.class 这需要与插入/插入更新过程中使用的类相同。就像写入一样,压缩也使用记录有效负载类将日志中的记录彼此合并,再次与基本文件合并,并生成压缩后要写入的最终记录。 org.apache.hudi.common.model.Defaulthoodierecordpayload hoodie.schedule.compact.only.inline 在写入操作时,是否只生成压缩计划。在hoodie.compact.inline=true时有效。 false hoodie.run.compact.only.inline 通过Sql执行run compaction命令时,是否只执行压缩操作,压缩计划不存在时直接退出。 false 父主题: 配置参考
  • index相关配置 参数 描述 默认值 hoodie.index.class 用户自定义索引的全路径名,索引类必须为HoodieIndex的子类,当指定该配置时,其会优先于hoodie.index.type配置。 "" hoodie.index.type 使用的索引类型,默认为布隆过滤器。可能的选项是[BLOOM | HBASE | GLOBAL_BLOOM | SIMPLE | GLOBAL_SIMPLE] 。 布隆过滤器消除了对外部系统的依赖,并存储在Parquet数据文件的页脚中。 BLOOM hoodie.index.bloom.num_entries 存储在布隆过滤器中的条目数。 假设maxParquetFileSize为128MB,averageRecordSize为1024B,因此,一个文件中的记录总数约为130K。 默认值(60000)大约是此近似值的一半。 注意: 将此值设置得太低,将产生很多误报,并且索引查找将必须扫描比其所需的更多的文件;如果将其设置得非常高,将线性增加每个数据文件的大小(每50000个条目大约4KB)。 60000 hoodie.index.bloom.fpp 根据条目数允许的错误率。 用于计算应为布隆过滤器分配多少位以及哈希函数的数量。通常将此值设置得很低(默认值:0.000000001),在磁盘空间上进行权衡以降低误报率。 0.000000001 hoodie.bloom.index.parallelism 索引查找的并行度,其中涉及Spark Shuffle。 默认情况下,根据输入的工作负载特征自动计算的。 0 hoodie.bloom.index.prune.by.ranges 为true时,从文件框定信息,可以加快索引查找的速度。 如果键具有单调递增的前缀,例如时间戳,则特别有用。 true hoodie.bloom.index.use.caching 为true时,将通过减少用于计算并行度或受影响分区的IO来缓存输入的RDD以加快索引查找。 true hoodie.bloom.index.use.treebased.filter 为true时,启用基于间隔树的文件过滤优化。与暴力模式相比,此模式可根据键范围加快文件过滤速度。 true hoodie.bloom.index.bucketized.checking 为true时,启用了桶式布隆过滤。这减少了在基于排序的布隆索引查找中看到的偏差。 true hoodie.bloom.index.keys.per.bucket 仅在启用bloomIndexBucketizedChecking并且索引类型为bloom的情况下适用。 此配置控制“存储桶”的大小,该大小可跟踪对单个文件进行的记录键检查的次数,并且是分配给执行布隆过滤器查找的每个分区的工作单位。 较高的值将分摊将布隆过滤器读取到内存的固定成本。 10000000 hoodie.bloom.index.update.partition.path 仅在索引类型为GLOBAL_BLOOM时适用。 为true时,当对一个已有记录执行包含分区路径的更新操作时,将会导致把新记录插入到新分区,而把原有记录从旧分区里删除。为false时,只对旧分区的原有记录进行更新。 true hoodie.index.hbase.zkquorum 仅在索引类型为HBASE时适用,必填选项。要连接的HBase ZK Quorum URL。 无 hoodie.index.hbase.zkport 仅在索引类型为HBASE时适用,必填选项。要连接的HBase ZK Quorum端口。 无 hoodie.index.hbase.zknode.path 仅在索引类型为HBASE时适用,必填选项。这是根znode,它将包含HBase创建及使用的所有znode。 无 hoodie.index.hbase.table 仅在索引类型为HBASE时适用,必填选项。HBase表名称,用作索引。Hudi将row_key和[partition_path, fileID, commitTime]映射存储在表中。 无 父主题: 配置参考
  • 存储配置 参数 描述 默认值 hoodie.parquet.max.file.size Hudi写阶段生成的parquet文件的目标大小。对于DFS,这需要与基础文件系统块大小保持一致,以实现最佳性能。 120 * 1024 * 1024 byte hoodie.parquet.block.size parquet页面大小,页面是parquet文件中的读取单位,在一个块内,页面被分别压缩。 120 * 1024 * 1024 byte hoodie.parquet.compression.ratio 当Hudi尝试调整新parquet文件的大小时,预期对parquet数据进行压缩的比例。 如果bulk_insert生成的文件小于预期大小,请增加此值。 0.1 hoodie.parquet.compression.codec parquet压缩编解码方式名称,默认值为gzip。可能的选项是[gzip | snappy | uncompressed | lzo] snappy hoodie.logfile.max.size LogFile的最大值。这是在将日志文件移到下一个版本之前允许的最大值。 1GB hoodie.logfile.data.block.max.size LogFile数据块的最大值。这是允许将单个数据块附加到日志文件的最大值。 这有助于确保附加到日志文件的数据被分解为可调整大小的块,以防止发生OOM错误。此大小应大于JVM内存。 256MB hoodie.logfile.to.parquet.compression.ratio 随着记录从日志文件移动到parquet,预期会进行额外压缩的比例。 用于merge_on_read存储,以将插入内容发送到日志文件中并控制压缩parquet文件的大小。 0.35 父主题: 配置参考
  • 同步hive表配置 参数 描述 默认值 hoodie.datasource.hive_sync.enable 是否同步hudi表信息到hive metastore。 注意: 建议该值设置为true,统一使用hive管理hudi表。 false hoodie.datasource.hive_sync.database 要同步给hive的数据库名。 default hoodie.datasource.hive_sync.table 要同步给hive的表名,建议这个值和hoodie.datasource.write.table.name保证一致。 unknown hoodie.datasource.hive_sync.username 同步hive时,指定的用户名。 hive hoodie.datasource.hive_sync.password 同步hive时,指定的密码。 hive hoodie.datasource.hive_sync.jdbcurl 连接hive jdbc指定的连接。 "" hoodie.datasource.hive_sync.use_jdbc 是否使用hive jdbc方式连接hive同步hudi表信息。建议该值设置为false,设置为false后 jdbc连接相关配置无效。 true hoodie.datasource.hive_sync.partition_fields 用于决定hive分区列。 "" hoodie.datasource.hive_sync.partition_extractor_class 用于提取hudi分区列值,将其转换成hive分区列。 org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor hoodie.datasource.hive_sync.support_timestamp 当hudi表存在timestamp类型字段时,需指定此参数为true,以实现同步timestamp类型到hive元数据中。该值默认为false,默认将timestamp类型同步为bigInt,默认情况可能导致使用sql查询包含timestamp类型字段的hudi表出现错误。 true 父主题: 配置参考
  • Cleaning Cleaning用于清理不再需要的版本数据。 Hudi使用Cleaner后台作业,不断清除不需要的旧得版本的数据。通过配置hoodie.cleaner.policy和hoodie.cleaner.commits.retained可以使用不同的清理策略和保存的commit数量。 执行cleaning有两种方式: 同步clean由参数hoodie.clean.automatic控制,默认自动开启。 关闭同步clean: datasource写入时可以通过.option("hoodie.clean.automatic", "false")来关闭自动clean。 spark-sql写入时可以通过set hoodie.clean.automatic=false;来关闭自动clean。 异步clean可以使用spark-sql来执行。 更多clean相关参数请参考compaction&cleaning配置章节。 父主题: 数据管理维护
  • Clustering架构 Hudi通过其写入客户端API提供了不同的操作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和入湖速度之间进行权衡,Hudi提供了一个hoodie.parquet.small.file.limit配置来设置最小文件大小。用户可以将该配置设置为“0”,以强制新数据写入新的文件组,或设置为更高的值以确保新数据被“填充”到现有小的文件组中,直到达到指定大小为止,但其会增加摄取延迟。 为能够支持快速摄取的同时不影响查询性能,引入了Clustering服务来重写数据以优化Hudi数据湖文件的布局。 Clustering服务可以异步或同步运行,Clustering会添加了一种新的REPLACE操作类型,该操作类型将在Hudi元数据时间轴中标记Clustering操作。 Clustering服务基于Hudi的MVCC设计,允许继续插入新数据,而Clustering操作在后台运行以重新格式化数据布局,从而确保并发读写者之间的快照隔离。 总体而言Clustering分为两个部分: 调度Clustering:使用可插拔的Clustering策略创建Clustering计划。 识别符合Clustering条件的文件:根据所选的Clustering策略,调度逻辑将识别符合Clustering条件的文件。 根据特定条件对符合Clustering条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数。分组是计划中定义的"策略"的一部分。此外还有一个选项可以限制组大小,以改善并行性并避免混排大量数据。 将Clustering计划以avro元数据格式保存到时间线。 执行Clustering:使用执行策略处理计划以创建新文件并替换旧文件。 读取Clustering计划,并获得ClusteringGroups,其标记了需要进行Clustering的文件组。 对于每个组使用strategyParams实例化适当的策略类(例如:sortColumns),然后应用该策略重写数据。 创建一个REPLACE提交,并更新HoodieReplaceCommitMetadata中的元数据。
  • 将Hudi表数据同步到Hive 通过执行run_hive_sync_tool.sh可以将Hudi表数据同步到Hive中。 例如:需要将HDFS上目录为hdfs://hacluster/tmp/huditest/hudimor1_deltastreamer_partition的Hudi表同步为Hive表,表名为table hive_sync_test3,使用unite、country和state为分区键,命令示例如下: run_hive_sync_tool.sh --partitioned-by unite,country,state --base-path hdfs://hacluster/tmp/huditest/hudimor1_deltastreamer_partition --table hive_sync_test3 --partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor --support-timestamp 表1 参数说明 命令 描述 必填 默认值 --database Hive database名称 N default --table Hive表名 Y - --base-file-format 文件格式 (PARQUET或HFILE) N PARQUET --user Hive用户名 N - --pass Hive密码 N - --jdbc-url Hive jdbc connect url N - --base-path 待同步的Hudi表存储路径 Y - --partitioned-by 分区键- N - --partition-value-extractor 分区类,需实现PartitionValueExtractor ,可以从HDFS路径中提取分区值 N SlashEncodedDayPartitionValueExtractor --assume-date-partitioning 以 yyyy/mm/dd进行分区从而支持向后兼容。 N false --use-pre-apache-input-format 使用com.uber.hoodie包下的InputFormat替换 org.apache.hudi包下的。除了从com.uber.hoodie迁移项目至org.apache.hudi外请勿使用。 N false --use-jdbc 使用Hive jdbc连接 N true --auto-create-database 自动创建Hive database N true --skip-ro-suffix 注册时跳过读取_ro后缀的读优化视图 N false --use-file-listing-from-metadata 从Hudi的元数据中获取文件列表 N false --verify-metadata-file-listing 根据文件系统验证Hudi元数据中的文件列表 N false --help、-h 查看帮助 N false --support-timestamp 将原始类型中'INT64'的TIMESTAMP_MICROS转换为Hive的timestamp N false --decode-partition 如果分区在写入过程中已编码,则解码分区值 N false --batch-sync-num 指定每批次同步hive的分区数 N 1000 Hive Sync时会判断表不存在时建外表并添加分区,表存在时对比表的schema是否存在差异,存在则替换,对比分区是否有新增,有则添加分区。 因此使用hive sync时有以下约束: 写入数据Schema只允许增加字段,不允许修改、删除字段。 分区目录只能新增,不会删除。 Overwrite覆写Hudi表不支持同步覆盖Hive表。 Hudi同步Hive表时,不支持使用timestamp类型作为分区列。 父主题: 写操作指导
  • 问题 Hive同步数据时报错: Caused by: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. The following columns have types incompatible with the existing columns in their respective positions :__col1,__col2
  • 回答 不可以,会抛HoodieKeyException异常。 Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "name" cannot be null or empty.at org.apache.hudi.keygen.SimpleKeyGenerator.getKey(SimpleKeyGenerator.java:58)at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:104)at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:100)
  • 问题 线程“main”报错 org.apache.kafka.common.KafkaException,构造kafka消费者失败,报错: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
  • 回答 当试图从启用SSL的kafka数据源采集数据时,而安装程序无法读取jars.conf文件及其属性时,可能会发生这种情况。 要解决此问题,需要将所需的属性作为通过Spark提交的命令的一部分传递。如:--files jaas.conf,failed_tables.json --conf 'spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf' --conf 'spark.executor .extraJavaOptions=-Djava.security.auth.login.config=jaas.conf'
  • 回答 建议在使用Hudi时,schema应该以向后兼容的方式演进。此错误通常发生在使用向后不兼容的演进方式删除某些列如“col1”后,更新parquet文件中以旧的schema写入的列“col1”,在这种情况下,parquet尝试在传入记录中查找所有当前字段,当发现“col1”不存在时,抛出上述异常。 解决这个问题的办法是使用所有schema演进版本来创建uber schema,并使用该schema作为target schema。用户可以从hive metastore中获取schema并将其与当前schema合并。
  • 操作并发 DDL和DML中的操作,执行前,需要获取对应的锁,各操作需要获取锁的情况见表1 操作获取锁一览表,√表示需要获取该锁,一个操作仅在获取到所有需要获取的锁后,才能继续执行。 任意两个操作是否可以并发执行,可以通过如下方法确定:表1两行代表两个操作,这两行没有任意一列都标记√,即不存在某一列两行全为√。 表1 操作获取锁一览表 操作 METADATA_LOCK COMPACTION_LOCK DROP_TABLE_LOCK DELETE_SEGMENT_LOCK CLEAN_FILES_LOCK ALTER_PARTITION_LOCK UPDATE_LOCK STREAMING_LOCK CONCURRENT_LOAD_LOCK SEGMENT_LOCK CREATE TABLE - - - - - - - - - - CREATE TABLE As SELECT - - - - - - - - - - DROP TABLE √ - √ - - - - √ - - ALTER TABLE COMPACTION - √ - - - - √ - - - TABLE RENAME - - - - - - - - - - ADD COLUMNS √ √ - - - - - - - - DROP COLUMNS √ √ - - - - - - - - CHANGE DATA TYPE √ √ - - - - - - - - REFRESH TABLE - - - - - - - - - - REGISTER INDEX TABLE √ - - - - - - - - - REFRESH INDEX - √ - - - - - - - - LOAD DATA/INSERT INTO - - - - - - - - √ √ UPDATE CARBON TABLE √ √ - - - - √ - - - DELETE RECORDS from CARBON TABLE √ √ - - - - √ - - - DELETE SEGMENT by ID - - - √ √ - - - - - DELETE SEGMENT by DATE - - - √ √ - - - - - SHOW SEGMENTS - - - - - - - - - - CREATE SECONDARY INDEX √ √ - √ - - - - - - SHOW SECONDARY INDEXES - - - - - - - - - - DROP SECONDARY INDEX √ - √ - - - - - - - CLEAN FILES - - - - - - - - - - SET/RESET - - - - - - - - - - Add Hive Partition - - - - - - - - - - Drop Hive Partition √ √ √ √ √ √ - - - - Drop Partition √ √ √ √ √ √ - - - - Alter table set √ √ - - - - - - - - 父主题: CarbonData语法参考
  • 基础操作 使用root用户登录集群客户端节点,执行如下命令: cd {客户端安装目录} source bigdata_env source Hudi/component_env kinit 创建的用户 执行hudi-cli.sh进入Hudi客户端, cd {客户端安装目录}/Hudi/hudi/bin/ ./hudi-cli.sh 即可执行各种Hudi命令,执行示例(仅部分命令,全部命令请参考Hudi官网:https://hudi.apache.org/docs/quick-start-guide/): 查看帮助: help //查看hudi-cli的所有命令 help 'command' //查看某一个命令的帮助及参数列表。 连接表: connect --path '/tmp/huditest/test_table' 查看表信息: desc 查看compaction计划: compactions show all 查看clean计划: cleans show 执行clean: cleans run 查看commit信息: commits show 查看commit写入的分区: commit showpartitions --commit 20210127153356 20210127153356表示commit的时间戳,下同。 查看指定commit写入的文件: commit showfiles --commit 20210127153356 比较两个表的commit信息差异: commits compare --path /tmp/hudimor/mytest100 rollback指定提交(rollback每次只允许rollback最后一次commit): commit rollback --commit 20210127164905 compaction调度: compaction schedule --hoodieConfigs 'hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy,hoodie.compaction.target.io=1,hoodie.compact.inline.max.delta.commits=1' 执行compaction compaction run --parallelism 100 --sparkMemory 1g --retry 1 --compactionInstant 20210602101315 --hoodieConfigs 'hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy,hoodie.compaction.target.io=1,hoodie.compact.inline.max.delta.commits=1' --propsFilePath hdfs://hacluster/tmp/default/tb_test_mor/.hoodie/hoodie.properties --schemaFilePath /tmp/default/tb_test_mor/.hoodie/compact_tb_base.json 创建savepoint savepoint create --commit 20210318155750 回滚指定的savepoint savepoint rollback --savepoint 20210318155750 若commit写入导致元数据冲突异常,执行commit rollback、savepoint rollback能回退数据,但不能回退Hive元数据,只能删除Hive表然后手动进行同步刷新。 commit rollback只能回退当前最新的一个commit,savepoint rollback只能回退到最新的一个savepoint。二者均不能随意指定进行回退。
共100000条