华为云用户手册

  • 通过查看监控信息定位Back Pressure点 Flink提供了很多的监控指标,根据这些指标可以分析任务过程中的性能状况及瓶颈。 【示例】配置采样的样本数和时间间隔: # 有效的反压结果被废弃并重新进行采样的时间,单位ms web.backpressure.refresh-interval: 60000 # 用于确定反压采样的样本数 web.backpressure.num-samples: 100 # 用于确定反压采样的间隔时间,单位ms web.backpressure.delay-between-samples: 50 可以在Job的Overview选项卡后面查看BackPressure,如下图表示采样进行中,默认情况下,大约需要5秒完成采样。 图1 采样进行中 如下图显示“OK”表示没有反压,“HIGH”表示对应SubTask被反压。 图2 无反压状态 图3 反压状态
  • 使用EXACTLY ONCE流处理语义保证端到端的一致性 流处理语义有三种:EXACTLY ONCE、AT LEAST ONCE、AT MOST ONCE。 AT MOST ONCE:无法保证数据处理的完整性,但性能相比最好。 AT LEAST ONCE:可以保证数据处理的完整性,但无法保证数据处理的准确性,性能适中。 EXACTLY ONCE:可以保证数据处理的准确性,但性能最差。 首先需要确认能否保证EXACTLY_ONCE(严格一次),因为端到端EXACTLY ONCE语义需要输入数据源的可回放(例如Kafka可回放数据),输出数据源的事务性(例如MySQL可原子性写入数据)。在无法满足这些条件的情况下,可以视情况将其降级为AT LEAST ONCE或者AT MOST ONCE。 在无法满足输入源的可回放时,只能保证AT MOST ONCE。 在无法满足输出目的的原子性写入时,只能保证AT LEAST ONCE。 【示例】API方式设置Exactly once语义: env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) 【示例】资源文件方式设置Exactly once语义: # checkpoint的语义 execution.checkpointing.mode: EXACTLY_ONCE
  • 高可用性下考虑提高Checkpoint保存数 Checkpoint保存数默认是1,也就是只保存最新的Checkpoint的状态文件,当进行状态恢复时,如果最新的Checkpoint文件不可用(比如HDFS文件所有副本都损坏或者其他原因),那么状态恢复就会失败。如果设置Checkpoint保存数为2,即使最新的Checkpoint恢复失败,那么Flink会回滚到之前那一次Checkpoint的状态文件进行恢复。所以可以增加Checkpoint保存数。 【示例】配置Checkpoint文件保存数为2: state.checkpoints.num-retained: 2
  • 生产环境使用增量Rocksdb作为State Backend Flink提供了三种状态后端:MemoryStateBackend,FsStateBackend,和RocksDBStateBackend。 MemoryStateBackend是将state存储在JobManager的Java堆上,每个状态的大小不能超过akka帧的大小,且总量不能超过JobManager的堆内存大小。所以只适合于本地开发调试,或状态大小有限的一些小状态的场景。 FsStateBackend是文件系统状态后端,正常情况下将state存储在TaskManager堆内存中,当Checkpoint时将state存储在文件系统上,而JobManager内存中存储极少的元数据(高可用场景下存储在ZooKeeper)。因为文件系统的存储空间足够,适合于大状态,长窗口,或大键值状态的有状态处理任务,也适合于高可用方案。 RocksDBStateBackend是内嵌数据库后端,正常情况下state存储在RocksDB数据库中,该数据库数据放在本地磁盘上,在Checkpoint时将state存储在配置的文件系统上而JobManager内存中存储极少的元数据(高可用场景下存储在ZooKeeper),同时是唯一一个可以增量Checkpoint的状态后端,除了适合于FsStateBackend的场景,还适用于超大状态的场景。 表1 Flink状态后端 类别 MemoryStateBackend FsStateBackend RocksDBStateBackend 方式 Checkpoint数据直接返回给Master节点,不落盘 数据写入文件,将文件路径传给Master 数据写入文件,将文件路径传给Master 存储 堆内存 堆内存 Rocksdb(本地磁盘) 性能 相比最好(一般不用) 性能好 性能不好 缺点 数据量小、易丢失 容易OOM风险 需要读写、序列化、IO等耗时 是否支持增量 不支持 不支持 支持 【示例】配置RockDBStateBackend(flink-conf.yaml): state.backend: rocksdb state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
  • 对有更新操作的数据流进行聚合计算时要注意数据准确性问题 在针对更新数据进行聚合需要选择合适的解决方案,否则聚合结果会是错误的。 例如: Create table t1( id int, partid int, value int ); select partid,sum(value) from t1 group by partid; 第一批数据:[1,1,10],[2,1,11],[3,2,8] 聚合结果:[1,21],[2,8] 第二批数据:[2,1,12] //对ID=2的记录进行更新。 错误结果:[1,33],[2,8] //若是无法识别是对ID=2的数据进行了更新。 聚合结果:[1,22],[2,8] //识别为更新操作可以得到正确结果。 对于如何识别是更新数据有三种方式: 通过状态后端解决 通过状态后端存储所有原始数据,新来的数据根据状态来判断是否是更新操作,进而通过Flink聚合回撤机制实现聚合结果数据的更新。 优点:可以解决聚合准确性问题,而且对用户友好,对数据没有要求。 缺点:大数据量情况下状态后端存储的数据比较多。 通过CDC格式数据解决 CDC格式数据是指更新操作记录中会同时包含更新前数据和更新后数据。通过更新前的内容来回撤掉之前的聚合结果,通过更新后的数据更新最新的计算结果。 优点:不需要有大的状态后端存储,整体计算资源压力要小于基于状态后端的方案。 缺点:需要依赖于数据格式,常见的方式通过CDC采集工具,将数据采集到Kafka,然后Flink读Kafka数据进行计算。 通过changelog数据解决 changelog与CDC格式的数据类似,只不过存储的方式不同,CDC格式数据会将更新前和更新后的数据在一行记录,而changelog数据会将更新数据拆分成两行,一行是对更新前数据的删除操作,一行是更新后的数据插入操作记录。Flink在计算的时候会将基于更新数据的聚合结果删除,在将基于更新后数据的计算结果插入。changelog可以基于Hudi表实现,基于CDC格式的数据可以转为changelog数据存储到Hudi的MOR表的log文件中,也可以基于状态后端生成Hudi的changelog数据。 优点:可以基于湖存储实现更新数据聚合一致性保证。 缺点: Hudi的MOR表中仅在log文件中存在changelog数据,如果Flink作业计算延迟导致上游数据积压,而Hudi又清理了log文件,就会导致changelog丢失。针对这种情况需要保留版本数多一点,且给Flink作业合理的资源配置避免数据积压周期超过了清理周期。 基于状态后端生成changelog也是依赖于状态后端的,状态后端通常是会配置TTL时间的,不会永久保留。这种场景下更新操作是任意更新,没有一定时间周期限制。例如更新近一个月的数据,TTL设置大于一个月即可;若更新全部数据,就需要设置TTL为永久,不适用于大表。 目前changelog的MOR表,仅支持Flink引擎进行compaction处理,不支持Spark引擎。
  • Flink流式写Hudi表参数规范 Flink流式写Hudi表参数规范如下表所示。 表1 Flink流式写Hudi表参数规范 参数名称 是否必填 参数描述 建议值 Connector 必填 读取表类型。 hudi Path 必填 表存储的路径。 根据实际填写 hoodie.datasource.write.recordkey.field 必填 表的主键。 根据实际填写 write.precombine.field 必填 数据合并字段。 根据实际填写 write.tasks 选填 写Hudi表task并行度,默认值为4。 4 index.bootstrap.enabled 选填 Flink采用的是内存索引,需要将数据的主键缓存到内存中,保证目标表的数据唯一,因此需要配置该值,否则会导致数据重复。默认值为FALSE。Bueckt索引时不配置该参数。 TRUE write.index_bootstrap.tasks 选填 index.bootstrap.enabled开启后有效,增加任务数提升启动速度。 4 index.state.ttl 选填 索引数据保存时长,默认值为0,表示永久不失效,可根据业务调整。 0 compaction.delta_commits 选填 MOR表Compaction计划触发条件。 200 compaction.async.enabled 必填 是否开启在线压缩。将compaction操作转移到sparksql运行,提升写性能。 FALSE hive_sync.enable 选填 是否向Hive同步表信息。 True hive_sync.metastore.uris 选填 Hivemeta uri信息。 根据实际填写 hive_sync.jdbc_url 选填 Hive jdbc链接。 根据实际填写 hive_sync.table 选填 Hive的表名。 根据实际填写 hive_sync.db 选填 Hive的数据库名,默认为default。 根据实际填写 hive_sync.support_timestamp 选填 是否支持时间戳。 True changelog.enabled 选填 是否写入changelog消息。默认值为false,CDC场景填写为true。 false
  • 多流Join场景事实流表个数不超过三个 当Join表过多时,状态后端压力太大会导致端到端时延增加。 【示例】实时Join维表数3个: CREATE TABLE table1(id int, param1 string) with(...); CREATE TABLE table2(id int, param2 string) with(...); CREATE TABLE table3(id int, param3 string) with(...); CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */); select o.*, t1.param1, t2.param2, t3.param3 from orders AS o JOIN table1 AS t1 ON o.order_id = t1.id JOIN table2 AS t2 ON o.order_id = t2.id JOIN table3 AS t3 ON o.order_id = t3.id;
  • 关联嵌套层级不超过三层 嵌套层级越多,回撤流的的数据量越大。 【示例】关联嵌套3层: SELECT * FROM table1 WHERE column1 IN ( SELECT column1 FROM table2 WHERE column2 IN ( SELECT column2 FROM table3 WHERE column3 = 'value' ) )
  • 维表lookup join场景维度表个数不超过五个 Hudi维度表都在TM heap中,当维表过多时heap中保存的维表数据过多,TM会不断GC,导致作业性能下降。 【示例】lookup join维表数5个: CREATE TABLE table1(id int, param1 string) with(...); CREATE TABLE table2(id int, param2 string) with(...); CREATE TABLE table3(id int, param3 string) with(...); CREATE TABLE table4(id int, param4 string) with(...); CREATE TABLE table5(id int, param5 string) with(...); CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */); select o.*, t1.param1, t2.param2, t3.param3, t4.param4, t5.param5 from orders AS o JOIN table1 FOR SYSTEM_TIME AS OF o.proc_time AS t1 ON o.order_id = t1.id JOIN table2 FOR SYSTEM_TIME AS OF o.proc_time AS t2 ON o.order_id = t2.id JOIN table3 FOR SYSTEM_TIME AS OF o.proc_time AS t3 ON o.order_id = t3.id JOIN table4 FOR SYSTEM_TIME AS OF o.proc_time AS t4 ON o.order_id = t4.id JOIN table5 FOR SYSTEM_TIME AS OF o.proc_time AS t5 ON o.order_id = t5.id;
  • 实时任务接入 实时作业一般由Flink Sql或Sparkstreaming来完成,流式实时任务通常配置同步生成compaction计划,异步执行计划。 Flink SQL作业中sink端Hudi表相关配置如下: create table denza_hudi_sink ( $HUDI_SINK_SQL_REPLACEABLE$ ) PARTITIONED BY ( years, months, days ) with ( 'connector' = 'hudi', //指定写入的是Hudi表 'path' = 'obs://XXXXXXXXXXXXXXXXXX/', //指定Hudi表的存储路径 'table.type' = 'MERGE_ON_READ', //Hudi表类型 'hoodie.datasource.write.recordkey.field' = 'id', //主键 'write.precombine.field' = 'vin', //合并字段 'write.tasks' = '10', //flink写入并行度 'hoodie.datasource.write.keygenerator.type' = 'COMPLEX', //指定KeyGenerator,与Spark创建的Hudi表类型一致 'hoodie.datasource.write.hive_style_partitioning' = 'true', //使用hive支持的分区格式 'read.streaming.enabled' = 'true', //开启流读 'read.streaming.check-interval' = '60', //checkpoint间隔,单位为秒 'index.type'='BUCKET', //指定Hudi表索引类型为BUCKET 'hoodie.bucket.index.num.buckets'='10', //指定bucket桶数 'compaction.delta_commits' = '3', //compaction生成的commit间隔 'compaction.async.enabled' = 'false', //compaction异步执行关闭 'compaction.schedule.enabled' = 'true', //compaction同步生成计划 'clean.async.enabled' = 'false', //异步clean关闭 'hoodie.archive.automatic' = 'false', //自动archive关闭 'hoodie.clean.automatic' = 'false', //自动clean关闭 'hive_sync.enable' = 'true', //自动同步hive表 'hive_sync.mode' = 'jdbc', //同步hive表方式为jdbc 'hive_sync.jdbc_url' = '', //同步hive表的jdbc url 'hive_sync.db' = 'hudi_cars_byd', //同步hive表的database 'hive_sync.table' = 'byd_hudi_denza_1s_mor', //同步hive表的tablename 'hive_sync.metastore.uris' = 'thrift://XXXXX:9083 ', //同步hive表的metastore uri 'hive_sync.support_timestamp' = 'true', //同步hive表支持timestamp格式 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor' //同步hive表的extractor类 ); Spark streaming写入Hudi表常用的参数如下(参数意义与上面flink类似,不再做注释): hoodie.table.name= hoodie.index.type=BUCKET hoodie.bucket.index.num.buckets=3 hoodie.datasource.write.precombine.field= hoodie.datasource.write.recordkey.field= hoodie.datasource.write.partitionpath.field= hoodie.datasource.write.table.type= MERGE_ON_READ hoodie.datasource.write.hive_style_partitioning=true hoodie.compact.inline=true hoodie.schedule.compact.only.inline=true hoodie.run.compact.only.inline=false hoodie.clean.automatic=false hoodie.clean.async=false hoodie.archive.async=false hoodie.archive.automatic=false hoodie.compact.inline.max.delta.commits=50 hoodie.datasource.hive_sync.enable=true hoodie.datasource.hive_sync.partition_fields= hoodie.datasource.hive_sync.database= hoodie.datasource.hive_sync.table= hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor 父主题: Bucket调优示例
  • 离线Compaction配置 对于MOR表的实时业务,通常设置在写入中同步生成compaction计划,因此需要额外通过DataArts或者脚本调度SparkSQL去执行已经产生的compaction计划。 执行参数 set hoodie.compact.inline = true; //打开compaction操作 set hoodie.run.compact.only.inline = true; //compaction只执行已生成的计划,不产生新计划 set hoodie.cleaner.commits.retained = 120; // 清理保留120个commit set hoodie.keep.max.commits = 140; // 归档最大保留140个commit set hoodie.keep.min.commits = 121; // 归档最小保留121个commit set hoodie.clean.async = false; // 打开异步清理 set hoodie.clean.automatic = false; // 关闭自动清理,防止compaction操作出发clean run compaction on $tablename; // 执行compaction计划 run clean on $tablename; // 执行clean操作清理冗余版本 run archivelog on $tablename; // 执行archivelog合并清理元数据文件 关于清理、归档参数的值不宜设置过大,会影响Hudi表的性能,通常建议: hoodie.cleaner.commits.retained = compaction所需要的commit数的2倍 hoodie.keep.min.commits = hoodie.cleaner.commits.retained + 1 hoodie.keep.max.commits = hoodie.keep.min.commits + 20 执行compaction后再执行clean和archive,由于clean和archivelog对资源要求较小,为避免资源浪费,使用DataArts调度的话可以compaction作为一个任务,clean、archive作为一个任务分别配置不同的资源执行来节省资源使用。 执行资源 Compaction调度的间隔应小于Compaction计划生成的间隔,例如1小时左右生成一个Compaction计划的话,执行Compaction计划的调度任务应该至少半小时调度一次。 Compaction作业配置的资源,vcore数至少要大于等于单个分区的桶数,vcore数与内存的比例应为1:4即1个vcore配4G内存。 父主题: Bucket调优示例
  • 合理设置并行度 任务运行的速度和并行度相关,一般来说提升并行度能有效提升读取的速度,但是过大的并行度可能导致部分节点资源的浪费,过小的并行度可能导致部分节点运行缓慢。对于SQL当前不能手动指定每个Task的并行度,指定的是所有Task统一的并行度。 推荐Source的并行度由上游组件推断设置,对于流系统,与上游的分区数相同(例如Kafka的Topic分区数);对于批系统,与上游的切片数相同(例如HDFS的block数量)。 Flink作业中有Source、Sink、中间计算算子的并行度可以调整。通过分析作业流图,如果发现是中间计算Busy就需要通过调整整个作业并行度来调整这类算子的并行度,常见的如join算子。
  • Flink流式读Hudi表规则 Flink流式读Hudi表参数规范如下所示。 表1 Flink流式读Hudi表参数规范 参数名称 是否必填 参数描述 示例 Connector 必填 读取表类型。 hudi Path 必填 表存储的路径。 根据实际情况填写 table.type 必填 Hudi表类型,默认值为COPY_ON_WRITE。 MERGE_ON_READ hoodie.datasource.write.recordkey.field 必填 表的主键。 根据实际填写 write.precombine.field 必填 数据合并字段。 根据实际填写 read.tasks 选填 读Hudi表task并行度,默认值为4。 4 read.streaming.enabled 必填 true:开启流式增量模式。 false:批量读。 根据实际填写,流读场景下为true read.streaming.start-commit 选填 指定 ‘yyyyMMddHHmmss’ 格式的起始commit(闭区间),默认从最新commit。 - hoodie.datasource.write.keygenerator.type 选填 上游表主键生成类型。 COMPLEX read.streaming.check-interval 选填 流读检测上游新提交的周期,默认值为1分钟。 5(流量大建议使用默认值) read.end-commit 选填 Stream增量消费,通过参数read.streaming.start-commit指定起始消费位置; Batch增量消费,通过参数read.streaming.start-commit指定起始消费位置,通过参数read.end-commit指定结束消费位置(闭区间),即包含起始、结束的commit。默认到最新commit。 - changelog.enabled 选填 是否写入changelog消息。默认值为false,CDC场景填写为true。 false 父主题: Flink流式读Hudi表规范
  • 拆分distinct聚合优化聚合中数据倾斜 通过两阶段聚合能消除常规的数据倾斜,但是处理distinct聚合时性能并不好。因为即使启动了两阶段聚合,distinct key也不能combine消除重复值,累加器中仍然包含所有的原始记录。 可以将不同的聚合(例如 COUNT(DISTINCT col))分为两个级别: 第一次聚合由group key和额外的bucket key进行shuffle。bucket key是使用HASH_CODE(distinct_key) % BUCKET_NUM计算的,BUCKET_NUM默认为1024,可以通过table.optimizer.distinct-agg.split.bucket-num选项进行配置。 第二次聚合是由原始group key进行shuffle,并使用SUM聚合来自不同buckets的COUNT DISTINCT值。由于相同的distinct key将仅在同一bucket中计算,因此转换是等效的。bucket key充当附加group key的角色,以分担group key中热点的负担。bucket key使Job具有可伸缩性来解决不同聚合中的数据倾斜/热点。 【示例】 资源文件配置: table.optimizer.distinct-agg.split.enabled: true table.optimizer.distinct-agg.split.bucket-num: 1024 查询今天有多少唯一用户登录: SELECT day, COUNT(DISTINCT user_id) FROM T GROUP BY day 自动改写查询: SELECT day, SUM(cnt) FROM( SELECT day, COUNT(DISTINCT user_id) as cnt FROM T GROUP BY day, MOD(HASH_CODE(user_id), 1024) ) GROUP BY day
  • 慎用正则表达式函数REGEXP 正则表达式是非常耗时的操作,对比加减乘除通常有百倍的性能开销,而且正则表达式在某些极端情况下可能会进入无限循环,导致作业阻塞。推荐首先使用LIKE。正则函数包括: REGEXP REGEXP_EXTRACT REGEXP_REPLACE 【示例】 使用正则表达式: SELECT * FROM table WHERE username NOT REGEXP "test|ceshi|tester' 使用like模糊查询: SELECT * FROM table WHERE username NOT LIKE '%test%' AND username NOT LIKE '%ceshi%' AND username NOT LIKE '%tester%'
  • UDF嵌套不可过长 多个UDF嵌套时表达式长度很长,Flink优化生成的代码超过64KB导致编译错误。建议UDF嵌套不超过6个。 【示例】UDF嵌套: SELECT SUM(get_order_total(order_id)) FROM orders WHERE customer_id = ( SELECT customer_id FROM customers WHERE customer_name = get_customer_name('John Doe') )
  • 聚合函数中case when语法改写成filter语法 在聚合函数中,FILTER是更符合SQL标准用于过滤的语法,并且能获得更多的性能提升。FILTER是用于聚合函数的修饰符,用于限制聚合中使用的值。 【示例】在某些场景下需要从不同维度来统计UV,如Android中的UV,iPhone中的UV,Web中的UV和总UV,这时可能会使用如下CASE WHEN语法。 修改前: SELECT day, COUNT(DISTINCT user_id) AS total_uv, COUNT(DISTINCT CASE WHEN flag IN (android', "iphone'") THEN user_id ELSE NULL END) AS app_uv, COUNT(DISTINCT CASE WHEN flag IN(wap', 'other') THEN user_id ELSE NULL END) AS web_uv FROM T GROUP BY day 修改后: SELECT day, COUNT(DISTINCT user_id) AS total_uv, COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv, COUNT(DISTINCT user_id) FILTER(WHERE flag IN ('wap', 'other'))AS web_uv FROM T GROUP BY day Flink SQL优化器可以识别相同的distinct key上的不同过滤器参数。例如示例中三个COUNT DISTINCT都在user_id列上。Flink可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小,在某些工作负载下可以获得显著的性能提升。
  • 配置多个ClickHouseBalancer实例IP 配置多个ClickHouseBalancer实例IP可以避免ClickHouseBalancer实例单点故障。相关配置(with属性)如下: 'url' = 'jdbc:clickhouse://ClickHouseBalancer实例IP1:ClickHouseBalancer端口,ClickHouseBalancer实例IP2:ClickHouseBalancer端口/default',
  • Sink表配置合适的攒批参数 攒批写参数: Flink会将数据先放入内存,到达触发条件时再flush到数据库表中。 相关配置如下: sink.buffer-flush.max-rows:攒批写ClickHouse的行数,默认100。 sink.buffer-flush.interval:攒批写入的间隔时间,默认1s。 两个条件只要有一个满足,就会触发一次sink,即到达触发条件时再flush到数据库表中。 示例1:60秒sink一次 'sink.buffer-flush.max-rows' = '0', 'sink.buffer-flush.interval' = '60s' 示例2:100条sink一次 'sink.buffer-flush.max-rows' = '100', 'sink.buffer-flush.interval' = '0s' 示例3:数据不sink 'sink.buffer-flush.max-rows' = '0', 'sink.buffer-flush.interval' = '0s'
  • Kafka作为source表时必须指定“properties.group.id”配置项 【示例】以“testGroup”为用户组读取主题为“test_sink”的Kafka消息: CREATE TABLE KafkaSource( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'scan.startup.mode' = 'latest-offset', 'properties.group.id' = 'testGroup', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); SELECT * FROM KafkaSource;
  • Kafka作为sink表时必须指定“topic”配置项 【示例】向Kafka的“test_sink”主题插入一条消息: CREATE TABLE KafkaSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); INSERT INTO KafkaSink (`user_id`, `user_name`, `age`)VALUES ('1', 'John Smith', 35);
  • Hudi表初始化 初始化导入存量数据通常有Spark作业来完成,由于初始化数据量通常较大,因此推荐使用API方式给充足资源来完成。 对于批量初始化后需要接Flink或Spark流作业实时写入的场景,一般建议通过对上有消息进行过滤,从一个指定的时间范围开始消费来控制数据的重复接入量(例如Spark初始化完成后,Flink消费Kafka时过滤掉2小时之前的数据),如果无法对kafka消息进行过滤,则可以考虑先实时接入生成offset,再truncate table ,再历史导入,再开启实时。 初始化操作流程应遵循下面的步骤: 如果批量初始化前表里已经存在数据且没有truncate table,则会导致批量数据写成非常大的log文件,对后续compaction形成很大压力需要更多资源才能完成 Hudi表在Hive元数据中,应该会存在1张内部表(手动创建),2张外部表(写入数据后自动创建)。 2张外部表,表名_ro(用户只读合并后的parquet文件,即读优化视图表),_rt(读实时写入的最新版本数据,即实时视图表)。 父主题: Bucket调优示例
  • 为保证数据准确性将同key数据写入Kafka的同一个分区 Flink写Kafka使用fixed策略,并在写入之前根据key进行Hash。 【示例】 CREATE TABLE kafka ( f_sequence INT, f_sequence1 INT, f_sequence2 INT, f_sequence3 INT ) WITH ( 'connector' = 'kafka', 'topic' = 'yxtest123', 'properties.bootstrap.servers' = '192.168.0.104:9092', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'sink.partitioner'='fixed' ); insert into kafka select /*+ DISTRIBUTEBY('f_sequence','f_sequence1') */ * from datagen;
  • Kafka作为source表时应设置限流 本章节适用于MRS 3.3.0及以后版本。 防止上限超过流量峰值,导致作业异常带来不稳定因素。因此建议设置限流,限流上限应该为业务上线压测的峰值。 【示例】 #如下参数作用在每个并行度 'scan.records-per-second.limit' = '1000' #真实的限流流量如下 min( parallelism * scan.records-per-second.limit,partitions num * scan.records-per-second.limit)
  • 响应消息 响应参数说明请参见下表: 表2 响应参数表 参数 是否必选 类型 最大字符长度 说明 resultCode M String 6 调用结果码。 具体请参见调用结果码说明。 resultMsg O String 255 调用结果描述。 encryptType O String 3 敏感信息加密算法 1:AES256_CBC_PKCS5Padding(默认值) 2:AES128_CBC_PKCS5Padding 说明: 敏感信息加密算法是AES256_CBC_PKCS5Padding时返回值为1; 敏感信息加密算法是AES128_CBC_PKCS5Padding时返回值为2; info O InstanceInfo[] / 实例详情 InstanceInfo数据结构定义如下: 参数 是否必选 类型及范围 最大字符长度 参数说明 instanceId M String 64 实例id appInfo O AppInfo N/A 应用实例信息。 客户购买商品后,商家需要返回登录服务地址(网站地址)或免登地址供客户后续操作。 说明: SaaS商品必须向客户提供应用使用信息,包括使用地址、账号、密码等。 如可实现通过短信、邮件等其他方式发送使用信息,则接口中允许不响应;否则,必须在接口中返回应用实例信息。 如使用信息不仅包含使用地址及账号密码,可通过如下memo参数灵活返回其他使用信息或使用说明等。 appInfo数据结构定义请参见下表。 usageInfo O UsageInfo[] N/A 应用实例关联的用量信息,按需和按需套餐包实例需要返回,对应按需套餐包,需要分别返回套餐包关联的所有费用项的用量信息。 AppInfo数据结构定义如下: 参数 是否必选 类型及范围 最大字符长度 参数说明 frontEndUrl M String 512 前台地址。 客户购买商品后,可以访问的网站地址。 adminUrl O String 512 管理地址。 客户购买商品后,可以访问的管理后台地址。 userName O String 128 加密后的管理员账号。 客户购买商品后,访问商家管理后台的账号(一般为邮箱和手机号)。该值由16位iv加密向量和base编码后的用户名密文组成。 iv+base64(AES_CBC(accessKey,userName)) 需要使用Key值对账号做加密处理,加密算法以encryptType参数为准。代码示例请参见ISV Server对资源开通后的用户名和密码加密。 password O String 128 加密后的管理员初始密码。 客户购买商品后,访问商家管理后台的密码(一般由商家生成)。该值由16位iv加密向量和base编码后的密码密文组成。 iv+base64(AES_CBC(accessKey,pwd)) 需要使用Key值对密码做加密处理,加密算法以encryptType参数为准。代码示例请参见ISV Server对资源开通后的用户名和密码加密。 memo O String 1024 备注。 说明: 如果备注包含中文内容,请将中文转换成unicode编码,例如:“中文”可以转换成“\u4e2d\u6587”。 UsageInfo数据结构定义如下: 参数 是否必选 类型及范围 最大字符长度 参数说明 relatedInstanceId O String 64 关联的按需实例ID,当查询按需套餐包实例的用量数据时,还需要返回此用量对应的按需实例id,譬如,当前套餐包包含短信100条和彩信50条,则在查询此套餐包的用量扣减时需要返回两个UsageInfo信息,分别对应短信和彩信的用量信息,relatedInstanceId分别对应短信和彩信按需实例ID usageValue M Double(12,4) 20 使用量具体值,最多支持4位有效小数,对于按需实例,应该是一个总体的累积值,对于按需套餐包实例,应该是套餐包的已用用量信息 statisticalTime M String 20 使用量统计时间,取UTC时间。 格式:yyyyMMddHHmmssSSS dashboardUrl O String 512 用量详细查看看板地址。 客户购买按需或按需套餐包商品后,可以在这个平台查看具体的用量信息。 响应消息示例: { "resultCode" : "000000", "resultMsg" : "success.", "encryptType" : "1", "info" : [{ "instanceId" : "ebc28eb6-4606-4098-b4bd-c201c99a0654", "appInfo" : { "frontEndUrl" : "https://www.***.com", "adminUrl" : "https://www.*****.com/admin", "userName" : "*****", "password" : "*****", "memo" : "hvave a test, 测试!" }, "usageInfo" : [{ "relatedInstanceId" : "ebc28eb6-4606-4098-b4bd-c201c99a0654", "usageValue" : "0.12", "statisticalTime" : "20221101025113409", "dashboardUrl" : "https://www.baidu.com/dashboard" } ] }, { "instanceId" : "fe28e27e-1157-4105-8592-24cc9488db10", "appInfo" : { "frontEndUrl" : "https://www.***.com", "adminUrl" : "https://www.***.com/admin", "userName" : "*****", "password" : "*****", "memo" : "hvave a test, 测试!" }, "usageInfo" : [{ "relatedInstanceId" : "fe28e27e-1157-4105-8592-24cc9488db10", "usageValue" : "2042", "statisticalTime" : "20221101025113409", "dashboardUrl" : "https://www.baidu.com/dashboard" } ] }, { "instanceId" : "92df74e4-163e-4e0b-a206-d9800d33881b", "appInfo" : { "frontEndUrl" : "https://www.baidu.com", "adminUrl" : "https://www.baidu.com/admin", "userName" : "huawei", "password" : "huawei123456", "memo" : "hvave a test, 测试!" }, "usageInfo" : [{ "relatedInstanceId" : "ebc28eb6-4606-4098-b4bd-c201c99a0654", "usageValue" : "3309", "statisticalTime" : "20221101025113409", "dashboardUrl" : "https://www.baidu.com/dashboard" }, { "relatedInstanceId" : "fe28e27e-1157-4105-8592-24cc9488db10", "usageValue" : "3309", "statisticalTime" : "20221101025113409", "dashboardUrl" : "https://www.baidu.com/dashboard" } ] } ] }
  • 请求方法:GET 请求参数说明请参见下表: 表1 请求参数表 参数 是否必选 类型 最大字符长度 说明 authToken M String 50 安全校验令牌。 取值请参见authToken取值说明。 activity M String 20 接口请求标识,用于区分接口请求场景。 查询实例场景取值:queryInstance timeStamp M String 20 请求发起时的时间戳,取UTC时间。 格式:yyyyMMddHHmmssSSS instanceId M String 64 实例ID,支持批量,多个实例批量查询时用逗号分隔,单次最多支持100个实例查询。 testFlag O String 2 是否为调试请求。 1:调试请求 0:非调试业务 默认取值为“0”。 请求示例: https://example.isv.com?activity=queryInstance&instanceId=ebc28eb6-4606-4098-b4bd-c201c99a0654%2Cfe28e27e-1157-4105-8592-24cc9488db10%2C92df74e4-163e-4e0b-a206-d9800d33881b&testFlag=1&timeStamp=20230327065233980&authToken=Eh%2F3Ud%2BR1j3d%2FwOui5CAcvRipM8IuribvgkXfJAsTfE%3D
  • 响应消息 响应参数说明请参见下表: 表2 响应参数表 参数 是否必选 类型 最大字符长度 说明 resultCode M String 6 调用结果码。 具体请参见调用结果码说明。 resultMsg O String 255 调用结果描述。 encryptType O String 3 敏感信息加密算法 1:AES256_CBC_PKCS5Padding(默认值) 2:AES128_CBC_PKCS5Padding 说明: 敏感信息加密算法是AES256_CBC_PKCS5Padding时返回值为1; 敏感信息加密算法是AES128_CBC_PKCS5Padding时返回值为2; info O InstanceInfo[] / 实例详情 InstanceInfo数据结构定义如下: 参数 是否必选 类型及范围 最大字符长度 参数说明 instanceId M String 64 实例id appInfo O AppInfo N/A 应用实例信息。 客户购买商品后,商家需要返回登录服务地址(网站地址)或免登地址供客户后续操作。 说明: SaaS商品必须向客户提供应用使用信息,包括使用地址、账号、密码等。 如可实现通过短信、邮件等其他方式发送使用信息,则接口中允许不响应;否则,必须在接口中返回应用实例信息。 如使用信息不仅包含使用地址及账号密码,可通过如下memo参数灵活返回其他使用信息或使用说明等。 appInfo数据结构定义请参见下表。 usageInfo O UsageInfo[] N/A 应用实例关联的用量信息,按需和按需套餐包实例需要返回,对应按需套餐包,需要分别返回套餐包关联的所有费用项的用量信息 AppInfo数据结构定义如下: 参数 是否必选 类型及范围 最大字符长度 参数说明 frontEndUrl M String 512 前台地址。 客户购买商品后,可以访问的网站地址。 adminUrl O String 512 管理地址。 客户购买商品后,可以访问的管理后台地址。 userName O String 128 加密后的管理员账号。 客户购买商品后,访问商家管理后台的账号(一般为邮箱和手机号)。该值由16位iv加密向量和base编码后的用户名密文组成。 iv+base64(AES_CBC(accessKey,userName)) 需要使用Key值对账号做加密处理,加密算法以encryptType参数为准。代码示例请参见ISV Server对资源开通后的用户名和密码加密。 password O String 128 加密后的管理员初始密码。 客户购买商品后,访问商家管理后台的密码(一般由商家生成)。该值由16位iv加密向量和base编码后的密码密文组成。 iv+base64(AES_CBC(accessKey,pwd)) 需要使用Key值对密码做加密处理,加密算法以encryptType参数为准。代码示例请参见 ISV Server对资源开通后的用户名和密码加密。 memo O String 1024 备注。 说明: 如果备注包含中文内容,请将中文转换成unicode编码,例如:“中文”可以转换成“\u4e2d\u6587”。 UsageInfo数据结构定义如下: 参数 是否必选 类型及范围 最大字符长度 参数说明 relatedInstanceId O String 64 关联的按需实例ID,当查询按需套餐包实例的用量数据时,还需要返回此用量对应的按需实例id,譬如,当前套餐包包含短信100条和彩信50条,则在查询此套餐包的用量扣减时需要返回两个UsageInfo信息,分别对应短信和彩信的用量信息,relatedInstanceId分别对应短信和彩信按需实例ID usageValue M Double(12,4) 20 使用量具体值,最多支持4位有效小数,对于按需实例,应该是一个总体的累积值,对于按需套餐包实例,应该是套餐包的已用用量信息 statisticalTime M String 20 使用量统计时间,取UTC时间。 格式:yyyyMMddHHmmssSSS dashboardUrl O String 512 用量详细查看看板地址。 客户购买按需或按需套餐包商品后,可以在这个平台查看具体的用量信息。 响应消息示例: { "resultCode" : "000000", "resultMsg" : "success.", "encryptType" : "1", "info" : [{ "instanceId" : "ebc28eb6-4606-4098-b4bd-c201c99a0654", "appInfo" : { "frontEndUrl" : "https://www.***.com", "adminUrl" : "https://www.***.com/admin", "userName" : "******", "password" : "********", "memo" : "hvave a test, 测试!" }, "usageInfo" : [{ "relatedInstanceId" : "ebc28eb6-4606-4098-b4bd-c201c99a0654", "usageValue" : "0.12", "statisticalTime" : "20221101025113409", "dashboardUrl" : "https://www.baidu.com/dashboard" } ] }, { "instanceId" : "fe28e27e-1157-4105-8592-24cc9488db10", "appInfo" : { "frontEndUrl" : "https://www.****.com", "adminUrl" : "https://www.*****.com/admin", "userName" : "******", "password" : "***********", "memo" : "hvave a test, 测试!" }, "usageInfo" : [{ "relatedInstanceId" : "fe28e27e-1157-4105-8592-24cc9488db10", "usageValue" : "2042", "statisticalTime" : "20221101025113409", "dashboardUrl" : "https://www.baidu.com/dashboard" } ] }, { "instanceId" : "92df74e4-163e-4e0b-a206-d9800d33881b", "appInfo" : { "frontEndUrl" : "https://www.*****.com", "adminUrl" : "https://www.*****.com/admin", "userName" : "*****", "password" : "*******", "memo" : "hvave a test, 测试!" }, "usageInfo" : [{ "relatedInstanceId" : "ebc28eb6-4606-4098-b4bd-c201c99a0654", "usageValue" : "3309", "statisticalTime" : "20221101025113409", "dashboardUrl" : "https://www.baidu.com/dashboard" }, { "relatedInstanceId" : "fe28e27e-1157-4105-8592-24cc9488db10", "usageValue" : "3309", "statisticalTime" : "20221101025113409", "dashboardUrl" : "https://www.baidu.com/dashboard" } ] } ] }
  • 请求方法:GET 请求参数说明请参见下表: 表1 请求参数表 参数 是否必选 类型 最大字符长度 说明 authToken M String 50 安全校验令牌。 取值请参见authToken取值说明。 activity M String 20 接口请求标识,用于区分接口请求场景。 查询实例场景取值:queryInstance timeStamp M String 20 请求发起时的时间戳,取UTC时间。 格式:yyyyMMddHHmmssSSS instanceId M String 64 实例ID,支持批量,多个实例批量查询时用逗号分隔,单次最多支持100个实例查询。 testFlag O String 2 是否为调试请求。 1:调试请求 0:非调试业务 默认取值为“0”。 请求示例: https://example.isv.com?activity=queryInstance&instanceId=ebc28eb6-4606-4098-b4bd-c201c99a0654%2Cfe28e27e-1157-4105-8592-24cc9488db10%2C92df74e4-163e-4e0b-a206-d9800d33881b&testFlag=1&timeStamp=20230327065233980&authToken=Eh%2F3Ud%2BR1j3d%2FwOui5CAcvRipM8IuribvgkXfJAsTfE%3D
  • 前提条件 发布按需计量的联营SaaS商品时,需要在该产品的生产接口服务器上开发生产系统接口,具体操作方式可参考《SaaS类商品接入指南V2.0》。 接口版本 计费类型 需要开发并调试的接口类型 V 1.0 按需付费 新购-按需 释放 资源状态变更 查询实例 租户同步 应用同步 授权用户 组织部门信息同步(增量) 组织部门信息同步(全量) 按需使用量推送(新) 按需套餐包 新购-按需 释放 资源状态变更 查询实例 租户同步 应用同步 授权用户 组织部门信息同步(增量) 组织部门信息同步(全量) 按需使用量推送(新) V2.0 按需付费、按需套餐包 创建实例 查询实例信息 更新实例 释放实例 企业同步 应用同步 用户授权同步 部门增量同步 部门全量同步 按需使用量推送(新)
  • 操作场景 SFS容量型文件系统除了支持多VPC访问,还支持跨账号跨VPC访问。 只要将其他账号使用的VPC的VPC ID添加到SFS容量型文件系统的权限列表下,且云服务器IP地址或地址段被添加至授权地址中,则实际上不同账号间的云服务器也能共享访问同一个文件系统。 更多关于VPC的信息请参见虚拟私有云 VPC。 SFS Turbo文件系统基于VPC的对等连接功能,实现跨账号访问。更多关于VPC对等连接功能信息和实现方法请参见VPC对等连接。 本章节介绍SFS容量型文件系统如何实现跨账号跨VPC访问。SFS容量型文件系统目前仅北京四支持跨账号访问功能。
共100000条