华为云用户手册

  • 规则 Hudi表必须执行Clean。 对于Hudi的MOR、COW表,都需要开启Clean。 Hudi表在写入数据时会自动判断是否需要执行Clean,因为Clean的开关默认打开(hoodie.clean.automatic默认为true)。 Clean操作并不是每次写数据时都会触发,至少需要满足两个条件: Hudi表中需要有旧版本的文件。对于COW表来说,只要保证数据被更新过就一定存在旧版本的文件。对于MOR表来说,要保证数据被更新过并且做过Compaction才能有旧版本的文件。 Hudi表满足hoodie.cleaner.commits.retained设置的阈值。如果是Flink写hudi,则至少提交的checkpoint要超过这个阈值;如果是批写Hudi,则批写次数要超过这个阈值。
  • 建议 MOR表下游采用批量读模式,采用clean的版本数为compaction版本数+1。 MOR表一定要保证Compaction Plan能够被成功执行,Compaction Plan只是记录了Hudi表中哪些Log文件要和哪些Parquet文件合并,所以最重要的地方在于保证Compaction Plan在被执行的时候它需要合并的文件都存在。而Hudi表中只有Clean操作可以清理文件,所以建议Clean的触发阈值(hoodie.cleaner.commits.retained的值)至少要大于Compaction的触发阈值(对于Flink任务来说就是compaction.delta_commits的值)。 MOR表下游采用流式计算,历史版本保留小时级。 如果MOR表的下游是流式计算,例如Flink流读,可以按照业务需要保留小时级的历史版本,这样的话近几个小时之内的增量数据可以通过log文件读出,如果保留时长过短,下游flink作业在重启或者异常中断阻塞的情况下,上游增量数据已经Clean掉了,flink需要从parquet文件读增量数据,性能会有下降;如果保留时间过长,会导致log里面的历史数据冗余存储。 具体可以按照下面的计算公式来保留2个小时的历史版本数据: 版本数设置为3600*2/版本interval时间,版本interval时间来自于flink作业的checkpoint周期,或者上游批量写入的周期。 COW表如果业务没有历史版本数据保留的特殊要求,保留版本数设置为1。 COW表的每个版本都是表的全量数据,保留几个版本就会冗余多少个版本。因此如果业务无历史数据回溯的需求,保留版本数设置为1,也就是保留当前最新版本 clean作业每天至少执行一次,可以2~4小时执行一次。 Hudi的MOR表和COW表都需要保证每天至少1次Clean,MOR表的Clean可以参考2.2.1.6小节和Compaction放在一起异步去执行。COW的Clean可以在写数据时自动判断是否执行。
  • 建议 事实表采用日期分区表,维度表采用非分区或者大颗粒度的日期分区 是否采用分区表要根据表的总数据量、增量和使用方式来决定。从表的使用属性看事实表和维度表具有的特点: 事实表:数据总量大,增量大,数据读取多以日期做切分,读取一定时间段的数据。 维度表:总量相对小,增量小,多以更新操作为主,数据读取会是全表读取,或者按照对应业务ID过滤。 基于以上考虑,维度表采用天分区会导致文件数过多,而且是全表读取,会导致所需要的文件读取Task过多,采用大颗粒度的日期分区,例如年分区,可以有效降低分区个数和文件数量;对于增量不是很大的维度表,也可以采用非分区表。如果维度表的总数据量很大或者增量也很大,可以考虑采用某个业务ID进行分区,在大部分数据处理逻辑中针对大维度表,会有一定的业务条件进行过滤来提升处理性能,这类表要结合一定的业务场景来进行优化,无法从单纯的日期分区进行优化。事实表读取方式都会按照时间段切分,近一年、近一个月或者近一天,读取的文件数相对稳定可控,所以事实表优先考虑日期分区表。 分区采用日期字段,分区表粒度,要基于数据更新范围确定,不要过大也不要过小。 分区粒度可以采用年、月、日,分区粒度的目标是减少同时写入的文件桶数,尤其是在有数据量更新,且更新数据有一定时间范围规律的,比如:近一个月的数据更新占比最大,可以按照月份创建分区;近一天内的数据更新占比大,可以按照天进行分区。 采用Bucket索引,写入是通过主键Hash打散的,数据会均匀的写入到分区下每个桶。因为各个分区的数据量是会有波动的,分区下桶的个数设计一般会按照最大分区数据量计算,这样会出现越细粒度的分区,桶的个数会冗余越多。例如: 采用天级分区,平均的日增数据量是3GB,最多一天的日志是8GB,这个会采用Bucket桶数= 8GB/2GB = 4 来创建表;每天的更新数据占比较高,且主要分散到近一个月。这样会导致结果是,每天的数据会写入到全月的Bucket桶中,那就是4*30 = 120个桶。如果采用月分区,分区桶的个数= 3GB * 30 /2GB = 45个桶 ,这样写入的数据桶数减少到了45个桶。在有限的计算资源下,写入的桶数越少,性能越高。
  • 规则 Hudi表必须执行Archive。 对于Hudi的MOR类型和COW类型的表,都需要开启Archive。 Hudi表在写入数据时会自动判断是否需要执行Archive,因为Archive的开关默认打开(hoodie.archive.automatic默认为true)。 Archive操作并不是每次写数据时都会触发,至少需要满足以下两个条件: Hudi表满足hoodie.keep.max.commits设置的阈值。如果是Flink写hudi至少提交的checkpoint要超过这个阈值;如果是Spark写hudi,写Hudi的次数要超过这个阈值。 Hudi表做过Clean,如果没有做过Clean就不会执行Archive(MRS 3.3.1-LTS及以后版本,忽略此项条件)。
  • 使用DataArts创建Hudi表 DataArts支持通过Spark JDBC方式和Spark API方式操作Hudi表: Spark JDBC方式使用公用资源,不用单独起Spark作业,但是不能指定执行SQL所需要的资源以及配置参数,因此建议用来做建表操作或小数据量的查询操作。 Spark API方式执行的SQL独立起Spark作业,有一定的耗时,但是可以通过配置运行程序参数来指定作业所需要的资源等参数,建议批量导入等 作业使用API方式来指定资源运行,防止占用jdbc资源长时间阻塞其他任务。 DataArts使用Spark API方式操作Hudi表,必须要添加参数--conf spark.support.hudi=true,并且通过执行调度来运行作业。
  • 确认建表SQL DataArts支持通过Spark JDBC方式和Spark API方式操作Hudi表: Spark JDBC方式使用公用资源,不用单独起Spark作业,但是不能指定执行SQL所需要的资源以及配置参数,因此建议用来做建表操作或小数据量的查询操作。 Spark API方式执行的SQL独立起Spark作业,有一定的耗时,但是可以通过配置运行程序参数来指定作业所需要的资源等参数,建议批量导入等 作业使用API方式来指定资源运行,防止占用jdbc资源长时间阻塞其他任务。 DataArts使用Spark API方式操作Hudi表,必须要添加参数--conf spark.support.hudi=true,并且通过执行调度来运行作业。
  • 判断使用分区表还是非分区表 根据表的使用场景一般将表分为事实表和维度表: 事实表通常整表数据规模较大,以新增数据为主,更新数据占比小,且更新数据大多落在近一段时间范围内(年或月或天),下游读取该表进行ETL计算时通常会使用时间范围进行裁剪(例如最近一天、一月、一年),这种表通常可以通过数据的创建时间来做分区已保证最佳读写性能。 维度表数据量一般整表数据规模较小,以更新数据为主,新增较少,表数据量比较稳定,且读取时通常需要全量读取做join之类的ETL计算,因此通常使用非分区表性能更好。 分区表的分区键不允许更新,否则会产生重复数据。 例外场景:超大维度表和超小事实表 特殊情况如存在持续大量新增数据的维度表(表数据量在200G以上或日增长量超过60M)或数据量非常小的事实表(表数据量小于10G且未来三至五年增长后也不会超过10G)需要针对具体场景来进行例外处理: 持续大量新增数据的维度表 方法一:预留桶数,如使用非分区表则需通过预估较长一段时间内的数据增量来预先增加桶数,缺点是随着数据的增长,文件依然会持续膨胀; 方法二:大粒度分区(推荐),如果使用分区表则需要根据数据增长情况来计算,例如使用年分区,这种方式相对麻烦些但是多年后表无需重新导入。 方法三:数据老化,按照业务逻辑分析大的维度表是否可以通过数据老化清理无效的维度数据从而降低数据规模。 数据量非常小的事实表 这种可以在预估很长一段时间的数据增长量的前提下使用非分区表预留稍宽裕一些的桶数来提升读写性能。
  • 确认表内桶数 Hudi表的桶数设置,关系到表的性能,需要格外引起注意。 以下几点,是设置桶数的关键信息,需要建表前确认。 非分区表 单表数据总条数 = select count(1) from tablename(入湖时需提供); 单条数据大小 = 平均 1KB(华为建议通过select * from tablename limit 100将查询结果粘贴在notepad++中得出100条数据的大小再除以100得到单条平均大小) 单表数据量大小(G) = 单表数据总条数*单表数据大小/1024/1024 非分区表桶数 = MAX(单表数据量大小(G)/2G*2,再向上取整,4) 分区表 最近一个月最大数据量分区数据总条数 = 入湖前咨询产品线 单条数据大小 = 平均 1KB(华为建议通过select * from tablename limit 100将查询结果粘贴在notepad++中得出100条数据的大小再除以100得到单条平均大小) 单分区数据量大小(G) = 最近一个月最大数据量分区数据总条数*单表数据大小/1024/1024 分区表桶数 = MAX(单分区数据量大小(G)/2G,再后向上取整,1) 需要使用的是表的总数据大小,而不是压缩以后的文件大小 桶的设置以偶数最佳,非分区表最小桶数请设置4个,分区表最小桶数请设置1个。
  • 规则 写作业未停止情况下,禁止手动执行run schedule命令生成compaction计划。 错误示例: run schedule on dsrTable 如果还有别的任务在写这张表,执行该操作会导致数据丢失。 执行run compaction命令时,禁止将hoodie.run.compact.only.inline设置成false,该值需要设置成true。 错误示例: set hoodie.run.compact.only.inline=false; run compaction on dsrTable; 如果还有别的任务在写这张表,执行上述操作会导致数据丢失。 正确示例:异步Compaction set hoodie.compact.inline = true; set hoodie.run.compact.only.inline=true; run compaction on dsrTable;
  • 规则 Hudi表必须设置合理的主键。 Hudi表提供了数据更新和幂等写入能力,该能力要求Hudi表必须设置主键,主键设置不合理会导致数据重复。主键可以为单一主键也可以为复合主键,两种主键类型均要求主键不能有null值和空值,可以参考以下示例设置主键: SparkSQL: // 通过primaryKey指定主键,如果是复合主键需要用逗号分隔 create table hudi_table ( id1 int, id2 int, name string, price double ) using hudi options ( primaryKey = 'id1,id2', preCombineField = 'price' ); SparkDatasource: // 通过hoodie.datasource.write.recordkey.field指定主键 df.write.format("hudi"). option("hoodie.datasource.write.table.type", COPY_ON_WRITE). option("hoodie.datasource.write.precombine.field", "price"). option("hoodie.datasource.write.recordkey.field", "id1,id2"). FlinkSQL: // 通过hoodie.datasource.write.recordkey.field指定主键 create table hudi_table( id1 int, id2 int, name string, price double ) partitioned by (name) with ( 'connector' = 'hudi', 'hoodie.datasource.write.recordkey.field' = 'id1,id2', 'write.precombine.field' = 'price') Hudi表必须配置precombine字段。 在数据同步过程中不可避免会出现数据重复写入、数据乱序问题,例如:异常数据恢复、写入程序异常重启等场景。通过设置合理precombine字段值可以保证数据的准确性,老数据不会覆盖新数据,也就是幂等写入能力。该字段可用选择的类型包括:业务表中更新时间戳、数据库的提交时间戳等。precombine字段不能有null值和空值,可以参考以下示例设置precombine字段: SparkSQL: //通过preCombineField指定precombine字段 create table hudi_table ( id1 int, id2 int, name string, price double ) using hudi options ( primaryKey = 'id1,id2', preCombineField = 'price' ); SparkDatasource: //通过hoodie.datasource.write.precombine.field指定precombine字段 df.write.format("hudi"). option("hoodie.datasource.write.table.type", COPY_ON_WRITE). option("hoodie.datasource.write.precombine.field", "price"). option("hoodie.datasource.write.recordkey.field", "id1,id2"). Flink: //通过write.precombine.field指定precombine字段 create table hudi_table( id1 int, id2 int, name string, price double ) partitioned by (name) with ( 'connector' = 'hudi', 'hoodie.datasource.write.recordkey.field' = 'id1,id2', 'write.precombine.field' = 'price') 流式计算采用MOR表。 流式计算为低时延的实时计算,需要高性能的流式读写能力,在Hudi表中存在的MOR和COW两种模型中,MOR表的流式读写性能相对较好,因此在流式计算场景下采用MOR表模型。关于MOR表在读写性能的对比关系如下: 对比维度 MOR表 COW表 流式写 高 低 流式读 高 低 批量写 高 低 批量读 低 高 实时入湖,表模型采用MOR表。 实时入湖一般的性能要求都在分钟内或者分钟级,结合Hudi两种表模型的对比,因此在实时入湖场景中需要选择MOR表模型。 Hudi表名以及列名采用小写字母。 多引擎读写同一张Hudi表时,为了规避引擎之间大小写的支持不同,统一采用小写字母。
  • 建议 Spark批处理场景,对写入时延要求不高的场景,采用COW表。 COW表模型中,写入数据存在写放大问题,因此写入速度较慢;但COW具有非常好的读取性能力。而且批量计算对写入时延不是很敏感,因此可以采用COW表。 Hudi表的写任务要开启Hive元数据同步功能。 SparkSQL天然与Hive集成,无需考虑元数据问题。该条建议针对的是通过Spark Datasource API或者Flin写Hudi表的场景,通过这两种方式写Hudi时需要增加向Hive同步元数据的配置项;该配置的目的是将Hudi表的元数据统一托管到Hive元数据服务中,为后续的跨引擎操作数据以及数据管理提供便利。
  • 规则 有数据持续写入的表,24小时内至少执行一次compaction。 对于MOR表,不管是流式写入还是批量写入,需要保证每天至少完成1次Compaction操作。如果长时间不做compaction,Hudi表的log将会越来越大,这必将会出现以下问题: Hudi表读取很慢,且需要很大的资源。 这是由于读MOR表涉及到log合并,大log合并需要消耗大量的资源并且速度很慢。 长时间进行一次Compaction需要耗费很多资源才能完成,且容易出现OOM。 阻塞Clean,如果没有Compaction操作来产生新版本的Parquet文件,那旧版本的文件就不能被Clean清理,增加存储压力。 CPU与内存比例为1:4~1:8。 Compaction作业是将存量的parquet文件内的数据与新增的log中的数据进行合并,需要消耗较高的内存资源,按照之前的表设计规范以及实际流量的波动结合考虑,建议Compaction作业CPU与内存的比例按照1:4~1:8配置,保证Compaction作业稳定运行。当Compaction出现OOM问题,可以通过调大内存占比解决。 【建议】通过增加并发数提升Compaction性能。
  • 实时任务接入 实时作业一般由Flink Sql或Sparkstreaming来完成,流式实时任务通常配置同步生成compaction计划,异步执行计划。 Flink SQL作业中sink端Hudi表相关配置如下: create table denza_hudi_sink ( $HUDI_SINK_SQL_REPLACEABLE$ ) PARTITIONED BY ( years, months, days ) with ( 'connector' = 'hudi', //指定写入的是Hudi表 'path' = 'obs://XXXXXXXXXXXXXXXXXX/', //指定Hudi表的存储路径 'table.type' = 'MERGE_ON_READ', //Hudi表类型 'hoodie.datasource.write.recordkey.field' = 'id', //主键 'write.precombine.field' = 'vin', //合并字段 'write.tasks' = '10', //flink写入并行度 'hoodie.datasource.write.keygenerator.type' = 'COMPLEX', //指定KeyGenerator,与Spark创建的Hudi表类型一致 'hoodie.datasource.write.hive_style_partitioning' = 'true', //使用hive支持的分区格式 'read.streaming.enabled' = 'true', //开启流读 'read.streaming.check-interval' = '60', //checkpoint间隔,单位为秒 'index.type'='BUCKET', //指定Hudi表索引类型为BUCKET 'hoodie.bucket.index.num.buckets'='10', //指定bucket桶数 'compaction.delta_commits' = '3', //compaction生成的commit间隔 'compaction.async.enabled' = 'false', //compaction异步执行关闭 'compaction.schedule.enabled' = 'true', //compaction同步生成计划 'clean.async.enabled' = 'false', //异步clean关闭 'hoodie.archive.automatic' = 'false', //自动archive关闭 'hoodie.clean.automatic' = 'false', //自动clean关闭 'hive_sync.enable' = 'true', //自动同步hive表 'hive_sync.mode' = 'jdbc', //同步hive表方式为jdbc 'hive_sync.jdbc_url' = '', //同步hive表的jdbc url 'hive_sync.db' = 'hudi_cars_byd', //同步hive表的database 'hive_sync.table' = 'byd_hudi_denza_1s_mor', //同步hive表的tablename 'hive_sync.metastore.uris' = 'thrift://XXXXX:9083 ', //同步hive表的metastore uri 'hive_sync.support_timestamp' = 'true', //同步hive表支持timestamp格式 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor' //同步hive表的extractor类 ); Spark streaming写入Hudi表常用的参数如下(参数意义与上面flink类似,不再做注释): hoodie.table.name= hoodie.index.type=BUCKET hoodie.bucket.index.num.buckets=3 hoodie.datasource.write.precombine.field= hoodie.datasource.write.recordkey.field= hoodie.datasource.write.partitionpath.field= hoodie.datasource.write.table.type= MERGE_ON_READ hoodie.datasource.write.hive_style_partitioning=true hoodie.compact.inline=true hoodie.schedule.compact.only.inline=true hoodie.run.compact.only.inline=false hoodie.clean.automatic=false hoodie.clean.async=false hoodie.archive.async=false hoodie.archive.automatic=false hoodie.compact.inline.max.delta.commits=50 hoodie.datasource.hive_sync.enable=true hoodie.datasource.hive_sync.partition_fields= hoodie.datasource.hive_sync.database= hoodie.datasource.hive_sync.table= hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor 父主题: Bucket调优示例
  • Spark表数据维护规范 禁止通过Alter命令修改表关键属性信息:type/primaryKey/preCombineField/hoodie.index.type 错误示例,执行如下语句修改表关键属性: alter table dsrTable set tblproperties('type'='xx'); alter table dsrTable set tblproperties('primaryKey'='xx'); alter table dsrTable set tblproperties('preCombineField'='xx'); alter table dsrTable set tblproperties('hoodie.index.type'='xx'); Hive/Presto等引擎可以直接修改表属性,但是这种修改会导致整个Hudi表出现数据重复,甚至数据损坏;因此禁止修改上述属性。 父主题: 开发规范
  • 规则 建表必须指定primaryKey和preCombineField。 Hudi表提供了数据更新的能力和幂等写入的能力,该能力要求数据记录必须设置主键用来识别重复数据和更新操作。不指定主键会导致表丢失数据更新能力,不指定preCombineField会导致主键重复。 参数名称 参数描述 输入值 说明 primaryKey hudi主键 按需 必须指定,可以是复合主键但是必须全局唯一。 preCombineField 预合并键,相同主键的多条数据按该字段进行合并 按需 必须指定,相同主键的数据会按该字段合并,不能指定多个字段。 禁止建表时将hoodie.datasource.hive_sync.enable指定为false。 指定为false将导致新写入的分区无法同步到Hive Metastore中。由于缺失新写入的分区信息,查询引擎读取该时会丢数。 禁止指定Hudi的索引类型为INMEMORY类型。 该索引仅是为了测试使用。生产环境上使用该索引将导致数据重复。
  • 建表示例 create table data_partition(id int, comb int, col0 int,yy int, mm int, dd int) using hudi --指定hudi 数据源 partitioned by(yy,mm,dd) --指定分区, 支持多级分区 location '/opt/log/data_partition' --指定路径,如果不指定建表在hive warehouse里 options( type='mor', --表类型 mor 或者 cow primaryKey='id', --主键,可以是复合主键但是必须全局唯一 preCombineField='comb' --预合并字段,相同主键的数据会按该字段合并,当前不能指定多个字段 )
  • 规则 禁止修改表索引类型。 Hudi表的索引会决定数据存储方式,随意修改索引类型会导致表中已有的存量数据与新增数据之间出现数据重复和数据准确性问题。常见的索引类型如下: 布隆索引:Spark引擎独有索引,采用bloomfiter机制,将布隆索引内容写入到Parquet文件的footer中。 Bucket索引:在写入数据过程中,通过主键进行Hash计算,将数据进行分桶写入;该索引写入速度最快,但是需要合理配置分桶数目;Flink、Spark均支持该索引写入。 状态索引:Flink引擎独有索引,是将行记录的存储位置记录到状态后端的一种索引形式,在作业冷启动过程中会遍历所有数据存储文件生成索引信息。 用Flink状态索引,Flink写入后,不支持Spark继续写入。 Flink在写Hudi的MOR表只会生成log文件,后续通过compaction操作,将log文件转为parquet文件。Spark在更新Hudi表时严重依赖parquet文件是否存在,如果当前Hudi表写的是log文件,采用Spark写入就会导致重复数据的产生。在批量初始化阶段 ,先采用Spark批量写入Hudi表,在用Flink基于Flink状态索引写入不会有问题,原因是Flink冷启动的时候会遍历所有的数据文件生成状态索引。 实时入湖场景中,Spark引擎采用Bucket索引,Flink引擎可以用Bucket索引或者状态索引。 实时入湖都是需要分钟内或者分钟级的高性能入湖,索引的选择会影响到写Hudi表的性能。在性能方面各个索引的区别如下: Bucket索引 优点:写入过程中对主键进行hash分桶写入,性能比较高,不受表的数据量限制。Flink和Spark引擎都支持,Flink和Spark引擎可以实现交叉混写同一张表。 缺点:Bucket个数不能动态调整,数据量波动和整表数据量持续上涨会导致单个Bucket数据量过大出现大数据文件。需要结合分区表来进行平衡改善。 Flink状态索引 优点:主键的索引信息存在状态后端,数据更新只需要点查状态后端即可,速度较快;同时生成的数据文件大小稳定,不会产生小文件、超大文件问题。 缺点:该索引为Flink特有索引。在表的总数据行数达到数亿级别,需要优化状态后端参数来保持写入的性能。使用该索引无法支持Flink和Spark交叉混写。 对于数据总量持续上涨的表,采用Bucket索引时,须使用时间分区,分区键采用数据创建时间。 参照Flink状态索引的特点,Hudi表超过一定数据量后,Flink作业状态后端压力很大,需要优化状态后端参数才能维持性能;同时由于Flink冷启动的时候需要遍历全表数据,大数据量也会导致Flink作业启动缓慢。因此基于简化使用的角度,针对大数据量的表,可以通过采用Bucket索引来避免状态后端的复杂调优。 如果Bucket索引+分区表的模式无法平衡Bueckt桶过大的问题,还是可以继续采用Flink状态索引,按照规范去优化对应的配置参数即可。
  • 建议 基于Flink的流式写入的表,在数据量超过2亿条记录,采用Bucket索引,2亿以内可以采用Flink状态索引。 参照Flink状态索引的特点,Hudi表超过一定数据量后,Flink作业状态后端压力很大,需要优化状态后端参数才能维持性能;同时由于Flink冷启动的时候需要遍历全表数据,大数据量也会导致Flink作业启动缓慢。因此基于简化使用的角度,针对大数据量的表,可以通过采用Bucket索引来避免状态后端的复杂调优。 如果Bucket索引+分区表的模式无法平衡Bueckt桶过大的问题,还是可以继续采用Flink状态索引,按照规范去优化对应的配置参数即可。 基于Bucket索引的表,按照单个Bucket 2GB数据量进行设计。 为了规避单个Bucket过大,建议单个Bucket的数据量不要超过2GB(该2GB是指数据内容大小,不是指数据行数也不是parquet的数据文件大小),目的是将对应的桶的Parquet文件大小控制在256MB范围内(平衡读写内存消耗和HDFS存储有效利用),因此可以看出2GB的这个限制只是一个经验值,因为不同的业务数据经过列存压缩后大小是不一样的。 为什么建议是2GB? 2GB的数据存储成列存Parquet文件后,大概的数据文件大小是150MB ~ 256MB左右。不同业务数据会有出入。而HDFS单个数据块一般会是128MB,这样可以有效的利用存储空间。 数据读写占用的内存空间都是原始数据大小(包括空值也是会占用内存的),2GB在大数据计算过程中,处于单task读写可接受范围之内。 如果是单个Bucket的数据量超过了该值范围,可能会有什么影响? 读写任务可能会出现OOM的问题,解决方法就是提升单个task的内存占比。 读写性能下降,因为单个task的处理的数据量变大,导致处理耗时变大。
  • Spark加工Hudi表时其他参数优化 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。 配置项 集群默认值 调整后 --conf spark.sql.enableToString true false --conf spark.speculation false false
  • 初始化Hudi表时,可以使用BulkInsert方式快速写入数据 示例: set hoodie.combine.before.insert=true; // 入库前去重,如果数据没有重复 该参数无需设置 set hoodie.datasource.write.operation = bulk_insert; // 指定写入方式为bulk insert方式。 set hoodie.bulkinsert.shuffle.parallelism = 4; // 指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数 insert into dsrTable select * from srcTabble
  • 优化Spark Shuffle参数提升Hudi写入效率 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。 配置项 集群默认值 调整后 --conf spark.shuffle.readHostLocalDisk false true --conf spark.io.encryption.enabled true false --conf spark.shuffle.service.enabled false true
  • 调整Spark调度参数优化OBS场景下Spark调度时延 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率 配置项 集群默认值 调整后 --conf spark.locality.wait 3s 0s --conf spark.locality.wait.process 3s 0s --conf spark.locality.wait.node 3s 0s --conf spark.locality.wait.rack 3s 0s
  • 优化shuffle并行度,提升Spark加工效率 所谓的shuffle并发度如下图所示: 集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。 场景 配置项 集群默认值 调整后 Jar作业 spark.default.parallelism 200 按实际作业可用资源2倍设置 SQL作业 spark.sql.shuffle.partitions 200 按实际作业可用资源2倍设置 hudi入库作业 hoodie.upsert.shuffle.parallelism 200 非bucket表使用,按实际作业可用资源2倍设置 动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。
  • 离线Compaction配置 对于MOR表的实时业务,通常设置在写入中同步生成compaction计划,因此需要额外通过DataArts或者脚本调度SparkSQL去执行已经产生的compaction计划。 执行参数 set hoodie.compact.inline = true; //打开compaction操作 set hoodie.run.compact.only.inline = true; //compaction只执行已生成的计划,不产生新计划 set hoodie.cleaner.commits.retained = 120; // 清理保留120个commit set hoodie.keep.max.commits = 140; // 归档最大保留140个commit set hoodie.keep.min.commits = 121; // 归档最小保留121个commit set hoodie.clean.async = false; // 打开异步清理 set hoodie.clean.automatic = false; // 关闭自动清理,防止compaction操作出发clean run compaction on $tablename; // 执行compaction计划 run clean on $tablename; // 执行clean操作清理冗余版本 run archivelog on $tablename; // 执行archivelog合并清理元数据文件 关于清理、归档参数的值不宜设置过大,会影响Hudi表的性能,通常建议: hoodie.cleaner.commits.retained = compaction所需要的commit数的2倍 hoodie.keep.min.commits = hoodie.cleaner.commits.retained + 1 hoodie.keep.max.commits = hoodie.keep.min.commits + 20 执行compaction后再执行clean和archive,由于clean和archivelog对资源要求较小,为避免资源浪费,使用DataArts调度的话可以compaction作为一个任务,clean、archive作为一个任务分别配置不同的资源执行来节省资源使用。 执行资源 Compaction调度的间隔应小于Compaction计划生成的间隔,例如1小时左右生成一个Compaction计划的话,执行Compaction计划的调度任务应该至少半小时调度一次。 Compaction作业配置的资源,vcore数至少要大于等于单个分区的桶数,vcore数与内存的比例应为1:4即1个vcore配4G内存。 父主题: Bucket调优示例
  • 基于VirtualBox使用ISO制作镜像的操作流程 本文指导用户基于VirtualBox使用ISO文件制作镜像,操作流程如下图所示: 图1 操作流程 安装VirtualBox:用户首先需要准备一台宿主机,建议使用Windows 64位操作系统,然后在该宿主机上安装VirtualBox。安装前的准备工作及详细的安装流程参见安装VirtualBox。 创建虚拟机:在VirtualBox上创建一台空虚拟机,作为镜像的原始框架。具体操作参见创建空虚拟机。 安装操作系统:通过挂载ISO文件的方式为空虚拟机安装操作系统,您希望最终的镜像是什么系统,就要在这一步准备什么系统的ISO文件。具体操作参见安装Windows操作系统。 安装软件和插件:为保证最终制作的镜像可以成功发放弹性云服务器,并且弹性云服务器运行正常,那么在制作时必须在虚拟机中安装所依赖的软件和插件,包括virtio、Cloudbase-Init、一键式重置密码插件等。具体操作参见配置虚拟机。 获取镜像文件:在VirtualBox上导出vhd格式的镜像文件,具体操作参见导出镜像文件。 注册私有镜像:将导出的vhd镜像文件上传至OBS桶,并注册为私有镜像。这样,您在创建弹性云服务器时,就可以使用该私有镜像了。具体操作参见上传镜像文件并注册镜像。
  • 资源和成本规划 表1 资源和成本规划 资源 资源说明 成本说明 VirtualBox工具 VirtualBox是一款开源免费跨平台的虚拟机软件。 VirtualBox官方下载地址:https://www.virtualbox.org/wiki/Downloads。 免费 virtio驱动 使用弹性云服务器或者外部镜像文件创建私有镜像时,必须确保操作系统中已安装virtio驱动,使新发放的云服务器支持KVM虚拟化,同时也可以提升云服务器的网络性能。 获取方式: https://fedorapeople.org/groups/virt/virtio-win/direct-downloads/archive-virtio/ 免费 ISO镜像文件 用于为新创建的空虚拟机安装操作系统,需要用户自行提供。 文件名称:Windows_server_2008_r2.iso - Cloudbase-Init工具(可选) 为了保证使用生成的镜像创建的新云服务器可以自定义配置(例如修改云服务器密码),建议您安装Cloudbase-Init工具。不安装Cloudbase-Init工具,将无法对云服务器进行自定义配置,只能使用镜像原有密码登录云服务器。 获取方式:http://www.cloudbase.it/cloud-init-for-windows-instances/。 免费 一键式重置密码插件(可选) 为了保证使用生成的镜像创建的新云服务器可以实现一键式重置密码功能,建议您安装密码重置插件CloudResetPwdAgent,可以应用一键式重置密码功能,给云服务器设置新密码。 获取方式:https://cn-south-1-cloud-reset-pwd.obs.cn-south-1.myhuaweicloud.com/windows/reset_pwd_agent/CloudResetPwdAgent.zip 免费
  • 步骤五:创建新的云服务器 账号B使用共享镜像创建新的云服务器,并验证Web网站是否可用。 在共享镜像“copy_cn-north-4_migrate_test”所在行,单击操作列的“申请服务器”。 进入弹性云服务器购买向导页面。 按需选择计费模式、可用区、规格、网络等参数,镜像保持默认值,按界面提示完成云服务器创建。 图10 选择镜像 返回云服务器列表,等待几分钟,云服务器创建成功。 图11 查看云服务器 尝试访问云服务器的Web网站,验证是否可用。 浏览器中输入http://云服务器弹性公网IP/index.html,假设为http://124.70.xxx.xxx/index.html。经验证可以正常访问,表示云服务器迁移成功,任务结束。 图12 验证Web网站
  • 方案介绍 跨账号跨区域迁移云服务器的方案为:账号A将区域A的云服务器做成私有镜像,将此私有镜像复制到同账号的区域B,再共享给账号B;账号B接受账号A的共享镜像后,使用该镜像创建新的云服务器。 例如,账号A在“华北-北京四”区域的云服务器上搭建了Web网站,想要将云服务器迁移到账号B的“华东-上海一”,操作流程如下: 图1 操作流程 步骤一:创建私有镜像 步骤二:跨区域复制镜像 步骤三:共享镜像 步骤四:接受共享镜像 步骤五:创建新的云服务器
  • 背景 服务器迁移通常有三种手段:全新部署业务、主机迁移服务、镜像迁移,如表1所示。对于华为云上云服务器的跨账号跨区域迁移,建议采用镜像迁移方式。 表1 迁移方式对比 迁移方式 说明 特点 限制条件 全新部署业务 新买华为云ECS,业务重新部署;文件、软件等重新上传;文件目录重新创建、重新赋权等。 不需要迁移(数据盘的数据需要单独迁移)。 需要重新部署业务、重新配置服务等,耗费人力、物力和时间成本。 主机迁移服务 主机迁移服务支持P2V/V2V(物理机/虚拟机迁移),可以帮您把x86物理服务器,或者私有云、公有云平台上的虚拟机迁移到华为云。 界面化操作,简单易用,只需在源端服务器安装和配置Agent、在服务端创建迁移任务,其余事情都由主机迁移服务处理。 在迁移过程中无需中断业务,支持断点续传。 待迁移服务器必须能访问公网。 镜像迁移 结合使用私有镜像的导入、跨区域复制、共享等功能,实现服务器从线下IDC、其他云厂商迁移至华为云,以及在华为云各区域各账号之间迁移。 支持vhd、vmdk、qcow2、raw、vhdx、qcow、vdi、qed、zvhd和zvhd2格式的镜像文件。 兼容SUSE、Oracle Linux、Red Hat、Ubuntu、openSUSE、CentOS、Debian、Fedora、EulerOS等多种操作系统类型。 可制作成系统盘镜像、数据盘镜像和整机镜像,可在云平台重复利用,可用于批量部署。 占用一定的本地存储空间,对镜像文件大小有限制(不能超过1TB)。
  • 修订记录 版本日期 变更说明 2024-05-13 第十四次正式发布。 修改 方案概述、安装virtio驱动,更新virtio驱动描述。 2023-08-08 第十三次正式发布。 修改 安装一键式重置密码插件(可选),删除cloudResetPwdUpdateAgent相关内容。 安装一键式重置密码插件(可选),删除cloudResetPwdUpdateAgent相关内容。 2023-07-18 第十二次正式发布。 修改 安装一键式重置密码插件(可选),插件下载链接改成https协议。 镜像服务最佳实践汇总,更新使用Packer创建私有镜像最佳实践说明。 使用Packer创建私有镜像,内容优化,openstack相关内容更新为huaweicloud。 2023-02-09 第十一次正式发布。 修改 安装Linux操作系统,补充安装Ubuntu操作系统步骤。 2022-09-26 第十次正式发布。 修改 基于VirtualBox使用ISO创建Windows镜像,优化结构。 基于VirtualBox使用ISO创建Linux镜像,优化结构。 2022-08-26 第九次正式发布。 修改 通过qemu-img工具转换镜像格式,优化结构。 通过qemu-img-hw工具转换镜像格式,优化结构。 2022-05-25 第八次正式发布。 修改 安装Cloud-Init工具,更新Cloud-Inti工具下载地址。 2021-07-31 第七次正式发布。 新增 跨账号迁移业务数据(迁移系统盘+数据盘) 2020-11-26 第六次正式发布。 新增 跨账号迁移业务数据(只迁移数据盘) 2020-06-04 第五次正式发布。 新增 通过qemu-img-hw工具转换镜像格式 跨账号跨区域迁移云服务器 2019-12-30 第四次正式发布。 新增利用ISO为镜像配置本地源章节。 2019-11-30 第三次正式发布。 Windows操作系统云服务器磁盘空间清理,优化操作步骤。 2019-07-30 第二次正式发布。 方案概述、方案概述,补充创建镜像流程图的说明。 使用Packer创建私有镜像,优化操作步骤。 2019-04-03 第一次正式发布。
共100000条