华为云用户手册

  • 示例 该示例是从Kafka数据源中读取数据,并写入到Elasticsearch结果表中(本次所使用Elasticsearch版本为7.10.2),其具体步骤如下: 参考增强型跨源连接,在DLI上根据Elasticsearch和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置Elasticsearch和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Elasticsearch和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 登录Elasticsearch集群的Kibana,并选择Dev Tools,输入下列语句并执行,以创建值为orders的index: PUT /orders { "settings": { "number_of_shards": 1 }, "mappings": { "properties": { "order_id": { "type": "text" }, "order_channel": { "type": "text" }, "order_time": { "type": "text" }, "pay_amount": { "type": "double" }, "real_pay": { "type": "double" }, "pay_time": { "type": "text" }, "user_id": { "type": "text" }, "user_name": { "type": "text" }, "area_id": { "type": "text" } } } } 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。 如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE elasticsearchSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'ElasticsearchAddress:ElasticsearchPort', 'index' = 'orders' ); insert into elasticsearchSink select * from kafkaSource; 连接Kafka集群,向kafka中插入如下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 在Elasticsearch集群的Kibana中输入下述语句并查看相应结果: GET orders/_search { "took" : 201, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "orders", "_type" : "_doc", "_id" : "fopyx4sBUuT2wThgYGcp", "_score" : 1.0, "_source" : { "order_id" : "202103241606060001", "order_channel" : "appShop", "order_time" : "2021-03-24 16:06:06", "pay_amount" : 200.0, "real_pay" : 180.0, "pay_time" : "2021-03-24 16:10:06", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } }, { "_index" : "orders", "_type" : "_doc", "_id" : "f4pyx4sBUuT2wThgYGcr", "_score" : 1.0, "_source" : { "order_id" : "202103241000000001", "order_channel" : "webShop", "order_time" : "2021-03-24 10:00:00", "pay_amount" : 100.0, "real_pay" : 100.0, "pay_time" : "2021-03-24 10:02:03", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } } ] } }
  • 参数说明 表2 Elasticsearch结果表参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,固定为:elasticsearch-7。表示连接到 Elasticsearch 7.x 及更高版本集群。 hosts 是 无 String Elasticsearch所在集群的主机名,多个以';'间隔。 index 是 无 String 每条记录的 Elasticsearch 索引。可以是静态索引(例如'myIndex')或动态索引(例如'index-{log_ts|yyyy-MM-dd}')。更多详细信息,请参见下面的动态索引。 username 否 无 String Elasticsearch所在集群的账号。该账号参数需和密码“password”参数同时配置。 password 否 无 String Elasticsearch所在集群的密码。该密码参数需和“username”参数同时配置。 document-id.key-delimiter 否 _ String 复合键的分隔符(默认为"_"),例如,指定为"$"将导致文档 ID 为"KEY1$KEY2$KEY3"。 failure-handler 否 fail String 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为: fail:如果请求失败并因此导致作业失败,则抛出异常。 ignore:忽略失败并放弃请求。 retry-rejected:重新添加由于队列容量饱和而失败的请求。 自定义类名称:使用 ActionRequestFailureHandler 的子类进行失败处理。 sink.flush-on-checkpoint 否 true Boolean 在进行 checkpoint 时是否保证刷出缓冲区中的数据。 如果关闭这一选项,在进行checkpoint时 sink 将不再为所有进行 中的请求等待 Elasticsearch 的执行完成确认。因此,在这种情况下 sink 将不对至少一次的请求的一致性提供任何保证。 sink.bulk-flush.max-actions 否 1000 Interger 每个批量请求的最大缓冲操作数。可以设置'0'为禁用它。 sink.bulk-flush.max-size 否 2mb MemorySize 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为'0'来禁用它。 sink.bulk-flush.interval 否 1s Duration flush 缓冲操作的间隔。 可以设置为'0'来禁用它。 注意,'sink.bulk-flush.max-size'和'sink.bulk-flush.max-actions'都设置为'0'的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。 sink.bulk-flush.backoff.strategy 否 DISABLED String 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为: DISABLED:不执行重试,即第一次请求错误后失败。 CONSTANT:等待重试之间的回退延迟。 EXPONENTIAL:先等待回退延迟,然后在重试之间指数递增。 sink.bulk-flush.backoff.max-retries 否 无 Integer 最大回退重试次数。 sink.bulk-flush.backoff.delay 否 无 Duration 每次退避尝试之间的延迟。 对于 CONSTANT 退避策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 退避策略,该值是初始的延迟。 connection.path-prefix 否 无 String 添加到每个REST通信中的前缀字符串,例如, '/v1'。 connection.request-timeout 否 无 Duration 从连接管理器请求连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。 connection.timeout 否 无 Duration 建立请求的超时时间 。 超时时间必须大于或者等于 0 ,如果设置为 0 则是无限超时。 socket.timeout 否 无 Duration 等待数据的 socket 的超时时间 (SO_TIMEOUT)。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。 format 否 json String Elasticsearch连接器支持指定格式。该格式必须生成有效的 json 文档。默认情况下使用内置'json'格式。 请参考Format页面以获取更多详细信息和格式参数。 certificate 否 无 String Elasticsearch集群的证书在OBS中的位置。 仅在开启安全模式,且开启https下需要配置该参数。 请先在CSS管理控制台下载证书后将证书上传至OBS,该参数配置的是OBS地址。 例如:obs://bucket/path/CloudSearchService.cer
  • 功能描述 DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch 引擎的索引中。 Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于日志分析、站内搜索等场景。 云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。 云搜索服务的更多信息,请参见《云搜索服务用户指南》。 更多具体使用可参考开源社区文档:Elasticsearch SQL 连接器。 表1 支持类别 类别 详情 支持表类型 结果表 支持数据格式 JSON
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 with参数中字段只能使用单引号,不能使用双引号。 当前只支持CSS集群7.X及以上版本。 如果未开启安全模式,语法中hosts字段值以http开头。 如果开启安全模式,未开启https,需要配置用户名username、密码password,且语法中hosts字段值以http开头。 如果开启安全模式,开启https,需要配置用户名username、密码password、证书位置certificate。请注意该场景hosts字段值以https开头。 CSS集群安全组入向规则必须开启ICMP。 with参数中字段只能使用单引号,不能使用双引号。 数据类型的使用,请参考Format章节。
  • 语法格式 create table esSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'elasticsearch-7', 'hosts' = '', 'index' = '' );
  • 主键处理 Elasticsearch sink 可以根据是否定义了一个主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键,Elasticsearch sink 将以upsert模式工作,该模式可以消费包含UPDATE/DELETE消息的查询。 如果未定义主键,Elasticsearch sink 将以append模式工作,该模式只能消费包含INSERT消息的查询。 在Elasticsearch连接器中,主键用于计算Elasticsearch 的文档ID,文档ID为最多512字节且不包含空格的字符串。 Elasticsearch连接器通过使用 document-id.key-delimiter 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档ID字符串。 某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES,ROW,ARRAY,MAP 等。 如果未指定主键,Elasticsearch 将自动生成文档ID。
  • 参数说明 表2 参数说明 参数 是否必选 默认值 数据类型 参数说明 connector 是 无 String 指定要使用的连接器,这里是'datagen'。 rows-per-second 否 10000 Long 每秒生成的行数,用以控制数据发出速率。 number-of-rows 否 无 Long 生成数据的总行数。默认条件下,不限制生成数据的总行数。如果有字段生成器类型为序列生成器,则当生成数据的行数达到上限或者序列数字达到结束值时,都不会再生成数据。 fields.#.kind 否 random String 指定 '#' 字段的生成器。 '#' 字段必须是DataGen表中的字段,实际使用时需要将'#'替换为相应字段名。其他各参数的'#'号意义相同,不再重复描述。 参数值可以是 'sequence' 或 'random',具体含义如下: random是默认值,表示无界的随机生成器。您可以通过“fields.#.max”和“fields.#.min”参数指定随机生成数的最大和最小值。当指定的字段类型为char、varchar、string时,可以通过“fields.#.length”参数指定长度。当指定的字段类型为时间戳类型时,可以通过“fields.#.max-past”参数指定相对当前时间向过去偏移的最大值。 sequence表示有界的序列生成器。您可以通过“fields.#.start”和“fields.#.end”指定序列的起始和结束值,当序列数字达到结束值时,就不会再生成数据。 fields.#.min 否 '#'号指定的字段类型的最小值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 表示随机生成器的最小值,'#' 指定的字段仅适用于数字类型。 fields.#.max 否 '#'号指定的字段类型的最大值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 随机生成数的最大值,'#' 指定的字段仅适用于数字类型。 fields.#.max-past 否 0 Duration 当“fields.#.kind”字段为:random时有效。 随机生成器生成相对当前时间向过去偏移的最大值,'#' 指定的字段仅适用于时间戳类型。 fields.#.length 否 100 Integer 当“fields.#.kind”字段为:random时有效。 随机生成器生成字符的长度,'#' 指定的字段仅适用于char、varchar、string。 fields.#.start 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的起始值。 fields.#.end 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的结束值。
  • 注意事项 创建DataGen表时,表字段类型不支持Array,Map和Row复杂类型,可以通过CREATE TABLE语句中的“COMPUTED COLUMN”来进行类似功能构造。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 窗口函数简介 Apache Flink 提供3个内置的窗口表值函数:TUMBLE,HOP 和 CUMULATE。 窗口表值函数的返回值包括原生列和附加的三个指定窗口的列,分别是:“window_start”,“window_end”,“window_time”。 在批计算模式,window_time 是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型(具体哪种类型取决于输入的时间字段类型)的字段。 window_time 字段用于后续基于时间的操作,例如:其他的窗口表值函数,或者interval joins,over aggregations。 它的值总是等于 window_end - 1ms。
  • 累积窗口(CUMULATE) 功能描述 累积窗口在某些场景中非常有用,比如说提前触发的滚动窗口。例如:每日仪表盘从 00:00 开始每分钟绘制累积 UV,10:00 时 UV 就是从 00:00 到 10:00 的UV 总数。累积窗口可以简单且有效地实现它。 CUMULATE 函数指定元素到多个窗口,从初始的窗口开始,直到达到最大的窗口大小的窗口,所有的窗口都包含其区间内的元素,另外,窗口的开始时间是固定的。 你可以将 CUMULATE 函数视为首先应用具有最大窗口大小的 TUMBLE 窗口,然后将每个滚动窗口拆分为具有相同窗口开始但窗口结束步长不同的几个窗口。 所以累积窗口会产生重叠并且没有固定大小。 例如:1小时步长,24小时大小的累计窗口,每天可以获得如下这些窗口:[00:00, 01:00),[00:00, 02:00),[00:00, 03:00), …, [00:00, 24:00) 图3 累积窗口示例图
  • 窗口表值函数(Windowing TVFs) 窗口是处理无限流的核心。窗口把流分割为有限大小的 “桶”,这样就可以在其之上进行计算。 Apache Flink 提供了如下 窗口表值函数(table-valued function, 缩写TVF)把表的数据划分到窗口中: 滚动窗口 滑动窗口 累积窗口 逻辑上,每个元素可以应用于一个或多个窗口,这取决于所使用的窗口表值函数的类型。例如:滑动窗口可以把单个元素分配给多个窗口。 窗口表值函数 是 Flink 定义的多态表函数(Polymorphic Table Function,缩写PTF),PTF 是 SQL 2016 标准中的一种特殊的表函数,它可以把表作为一个参数。 窗口表值函数是分组函数(已废弃)的替代方案。窗口表值函数 更符合 SQL 标准,在支持基于窗口的复杂计算上也更强大。例如:窗口 TopN、窗口 Join。而分组窗口函数只支持窗口聚合。 更多介绍和使用请参考开源社区文档:窗口函数。
  • 参数说明 表1 参数说明 参数 是否必须 默认值 类型 描述 format 是 (none) String 指定要使用的格式,此处应为 'ogg-json'。 ogg-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 debezium-json.timestamp-format.standard 否 'SQL' String 声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601': 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。 ogg-json.map-null-key.mode 否 'FAIL' String 指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL', 'DROP' 和 'LITERAL': Option 'FAIL' 将抛出异常。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ogg-json.map-null-key.literal 定义。 ogg-json.map-null-key.literal 否 'null' String 当 'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
  • 功能描述 Oracle GoldenGate (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等 Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。
  • Union/Union ALL/Intersect/Except 语法格式 1 query UNION [ ALL ] | Intersect | Except query 语法说明 UNION返回多个查询结果的并集。 Intersect返回多个查询结果的交集。 Except返回多个查询结果的差集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • IN 语法格式 1 2 3 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression WHERE column_name IN (value (, value)* ) | query 语法说明 IN操作符允许在where子句中规定多个值。如果表达式在给定的表子查询中存在,则返回 true 。 注意事项 子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。 示例 输出Orders中NewProducts中product的user和amount信息。 1 2 3 4 5 insert into temp SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts );
  • 分区扫描 为了在并行 Source task 实例中加速读取数据,Flink 提供了分区扫描的特性。 如果下述分区扫描参数中的任一项被指定,则下述所有的分区扫描参数必须都被指定。这些参数描述了在多个 task 并行读取数据时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。注意,scan.partition.lower-bound 和 scan.partition.upper-bound 用于决定分区的起始位置和过滤表中的数据。如果是批处理作业,也可以在提交 flink 作业之前获取最大值和最小值。 scan.partition.column:输入用于进行分区的列名。 scan.partition.num:分区数。 scan.partition.lower-bound:第一个分区的最小值。 scan.partition.upper-bound:最后一个分区的最大值。
  • 数据类型映射 表5 数据类型映射 DWS数据类型 Flink SQL数据类型 BOOLEAN BOOLEAN SMALLINT(INT2) SMALLSERIAL(SERIAL2) SMALLINT INTEGER SERIAL INTEGER BIGINT BIGSERIAL BIGINT REAL FLOAT4 FLOAT DOUBLE FLOAT8 DOUBLE CHAR(n) CHAR VARCHAR(n) VARCHAR DATE DATE TIMESTAMP[(p)] [WITHOUT TIME ZONE] TIMESTAMP NUMERIC[(p[,s])] DECIMAL[(p[,s])] DECIMAL
  • 键处理 当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。 在 upsert 模式下,Flink 将根据主键判断插入新行或者更新已存在的行,这种方式可以确保幂等性。为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。
  • 常见问题 Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决? java.io.IOException: unable to open JDBC writer ... Caused by: org.postgresql.util.PSQLException: The connection attempt failed. ... Caused by: java.net.SocketTimeoutException: connect timed out A:应考虑是跨源没有绑定,或者跨源没有绑定成功。 参考增强型跨源连接章节,重新配置跨源。参考DLI跨源连接DWS失败进行问题排查。
  • Lookup Cache 该连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。 默认情况下,lookup cache 是未启用的,你可以设置 lookup.cache.max-rows and lookup.cache.ttl 参数来启用。 lookup cache 的主要目的是用于提高时态表关联该 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。 当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。 当缓存命中最大缓存行 lookup.cache.max-rows 或当行超过最大存活时间 lookup.cache.ttl 时,缓存中最先添加的条目将被标记为过期。 缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl 设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。 默认情况下,flink 会缓存主键的空查询结果,你可以通过将 lookup.cache.caching-missing-key 设置为 false 来切换行为。
  • 示例 该示例是从kafka数据源中读取数据,并将DWS的表作为维表,然后将结果写入到DWS的另一张表中。 在DWS中创建相应的表,表名为area_info作为flink维表,SQL语句参考如下。 create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR); 在DWS中执行以下SQL语句,向dws_order表中插入数据。 insert into public.area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); 在DWS中创建相应的表,表名为order_detail作为flink结果表,SQL语句参考如下。 create table public.order_detail( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR); 参考增强型跨源连接,根据DWS所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置DWS的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据DWS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本。 如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource( order_id string, order_channel string, order_time String, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dwsLookUp ( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName', 'table-name' = 'area_info', 'username' = 'xx', 'password' = 'xx', -- dew存储的密码的key,其值为dws密码 'dew.endpoint' = 'kms.xx.myhuaweicloud.com', --使用的DEW服务所在的endpoint信息 'dew.csms.secretName' = 'xx', --DEW服务通用凭据的凭据名称 'dew.csms.decrypt.fields' = 'password', --其中password字段值需要利用DEW凭证管理,进行解密替换 'dew.csms.version' = 'v1' ); CREATE TABLE dwsSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName', 'table-name' = 'order_detail', 'username' = 'xx', 'password' = 'xx', -- dew存储的密码的key,其值为dws密码 'dew.endpoint' = 'kms.xx.myhuaweicloud.com', --使用的DEW服务所在的endpoint信息 'dew.csms.secretName' = 'xx', --DEW服务通用凭据的凭据名称 'dew.csms.decrypt.fields' = 'password', --其中password字段值需要利用DEW凭证管理,进行解密替换 'dew.csms.version' = 'v1' ); insert into dwsSink select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from kafkaSource as orders left join dwsLookUp for system_time as of orders.proctime as area on orders.area_id = area.area_id; 向Kafka的源表的topic中发送如下数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 登录DWS,执行如下SQL查询结果表中的数据: select * from public.order_detail; 数据结果参考如下: 202103241606060001 appShop 2021-03-24 16:06:06 200.0 180.0 2021-03-24 16:10:06 0001 Alice 330106 a1 b1 c2 d2 e1 202103251202020001 miniAppShop 2021-03-25 12:02:02 60.0 60.0 2021-03-25 12:03:00 0002 Bob 330110 a1 b1 c4 d4 e1
  • 维表参数说明 表4 维表参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String 指定使用什么类型的连接器,这里应该是'jdbc'。 url 是 无 String DWS连接地址。“url”参数中的ip地址请使用DWS的内网地址。 jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 操作的DWS表名。如果该DWS表在某schema下,则具体可以参考如果该DWS表在某schema下的说明。 driver 否 无 String 用于连接到此 URL 的 DWS 驱动类名,如果不设置,将自动从 URL 中推导。 可配置com.huawei.gauss200.jdbc.Driver。 username 否 无 String DWS数据库认证用户名,需要和'password'参数一起配置。 password 否 无 String DWS数据库认证密码,需要和'username'参数一起配置。 connection.max-retry-timeout 否 60s Duration 最大重试超时时间,以秒为单位且不应该小于 1 秒。 scan.partition.column 否 无 String 用于对输入进行分区的列名。 scan.partition.column 否 无 Integer 分区数。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。 scan.fetch-size 否 0 Integer 每次循环读取时应该从数据库中获取的行数。如果指定的值为 '0',则该配置项会被忽略。 scan.auto-commit 否 true Boolean 在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。Postgres可能需要将此设置为 false 以便流化结果。 lookup.cache.max-rows 否 无 Integer lookup cache 的最大行数,如果超过该值,缓存中最先添加的条目将被标记为过期。 默认情况下,lookup cache 是未开启的。请参阅下面的Lookup Cache 部分了解更多详情。 lookup.cache.ttl 否 无 Duration lookup cache 中每一行记录的最大存活时间,如果超过该时间,缓存中最先添加的条目将被标记为过期。 默认情况下,lookup cache 是未开启的。请参阅下面的Lookup Cache部分了解更多详情。 lookup.cache.caching-missing-key 否 true Boolean 标记缓存丢失的键,默认为true lookup.max-retries 否 3 Integer 查询数据库失败的最大重试时间。
  • 结果表参数说明 表3 结果表参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String 指定使用什么类型的连接器,这里应该是'jdbc'。 url 是 无 String DWS连接地址。“url”参数中的ip地址请使用DWS的内网地址。 jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 操作的DWS表名。如果该DWS表在某schema下,则具体可以参考如果该DWS表在某schema下的说明。 driver 否 无 String 用于连接到此 URL 的 DWS 驱动类名,如果不设置,将自动从 URL 中推导。 可配置com.huawei.gauss200.jdbc.Driver。 username 否 无 String DWS数据库认证用户名,需要和'password'参数一起配置。 password 否 无 String DWS数据库认证密码,需要和'username'参数一起配置。 connection.max-retry-timeout 否 60s Duration 最大重试超时时间,以秒为单位且不应该小于 1 秒。 sink.buffer-flush.max-rows 否 100 Integer flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 sink.buffer-flush.interval 否 1s Duration flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 sink.max-retries 否 3 Integer 写入记录到数据库失败后的最大重试次数。 sink.parallelism 否 无 Integer 用于定义sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。
  • 功能描述 DLI将Flink作业从数据仓库服务(DWS)中读取数据。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。 DWS的更多信息,请参见《数据仓库服务管理指南》。 表1 支持类别 类别 详情 支持表类型 源表、维表、结果表
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsTable ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' );
  • 前提条件 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。 如何创建DWS集群,请参考创建集群。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 with参数中字段只能使用单引号,不能使用双引号。
  • 源表参数说明 表2 源表参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String 指定使用什么类型的连接器,这里应该是'jdbc'。 url 是 无 String DWS连接地址。“url”参数中的ip地址请使用DWS的内网地址。 jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 操作的DWS表名。如果该DWS表在某schema下,则具体可以参考如果该DWS表在某schema下的说明。 driver 否 无 String 用于连接到此 URL 的 DWS 驱动类名,如果不设置,将自动从 URL 中推导。 可配置com.huawei.gauss200.jdbc.Driver。 username 否 无 String DWS数据库认证用户名,需要和'password'参数一起配置。 password 否 无 String DWS数据库认证密码,需要和'username'参数一起配置。 connection.max-retry-timeout 否 60s Duration 最大重试超时时间,以秒为单位且不应该小于 1 秒。 scan.partition.column 否 无 String 用于对输入进行分区的列名。 scan.partition.column 否 无 Integer 分区数。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。 scan.fetch-size 否 0 Integer 每次循环读取时应该从数据库中获取的行数。如果指定的值为 '0',则该配置项会被忽略。 scan.auto-commit 否 true Boolean 在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。Postgres可能需要将此设置为 false 以便流化结果。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ,PRIMARY KEY (attr_name, ...) NOT ENFORCED ) with ( 'connector' = 'redis', 'host' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'redis'。 host 是 无 String redis连接地址。 port 否 6379 Integer redis连接端口。 password 否 无 String redis认证密码。 namespace 否 无 String redis key的namespace delimiter 否 : String redis的key和namespace之间的分隔符。 data-type 否 hash String redis的数据类型,有下列选项: hash list set sorted-set string data-type取值约束详见data-type取值约束说明。 schema-syntax 否 fields String redis的schema语义,包含以下值(其具体使用请参考注意事项和常见问题): fields:适用于所有数据类型 fields-scores:适用于sorted set数据类型 array:适用于list、set、sorted set数据类型 array-scores:适用于sorted set数据类型 map:适用于hash、sorted set数据类型 schema-syntax取值约束详见schema-syntax取值约束说明。 deploy-mode 否 standalone String Redis集群的部署模式,支持standalone、master-replica、cluster。默认为standalone。 Redis实例类型不同配置的部署模式不同: 单机、主备、proxy集群实例都选择standalone, cluster实例选择cluster。 retry-count 否 5 Integer 连接redis集群的尝试次数。 connection-timeout-millis 否 10000 Integer 尝试连接redis集群时的最大超时时间。 commands-timeout-millis 否 2000 Integer 等待操作完成响应的最大时间。 rebalancing-timeout-millis 否 15000 Integer redis集群失败时的休眠时间。 scan-keys-count 否 1000 Integer 每次扫描时读取的数量。 default-score 否 0 Double 当data-type设置为“sorted-set”时的默认score。 deserialize-error-policy 否 fail-job Enum 数据解析失败时的处理方式。枚举类型,包含以下值: fail-job:作业失败 skip-row:跳过当前数据 null-field:设置当前数据为null skip-null-values 否 true Boolean 是否跳过null。 ignore-retractions 否 false Boolean 连接器应忽略更新插入/撤回流模式下的收回消息。 key-column 否 无 String Redis 表schema的key source.parallelism 否 无 int 定义源的自定义并行度。默认情况下,如果未定义此选项,使用全局配置来的并行度。
共100000条