华为云用户手册

  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 参数说明 connector 是 无 String 指定要使用的连接器,这里是'datagen'。 rows-per-second 否 10000 Long 每秒生成的行数,用以控制数据发出速率。 fields.#.kind 否 random String 指定 '#' 字段的生成器。 '#' 字段必须是DataGen表中的字段,实际使用时需要将'#'替换为相应字段名。其他各参数的'#'号意义相同,不再重复描述。 参数值可以是 'sequence' 或 'random',具体含义如下: random是默认的生成器,您可以通过“fields.#.max”和“fields.#.min”参数指定随机生成的最大和最小值。 当指定的字段类型为char、varchar、string时,可以同时通过“fields.#.length”字段指定长度。random是无界的生成器。 sequence生成器,您可以通过“fields.#.start”和“fields.#.end”指定序列的起始和结束值。sequence是有界的生成器,当序列数字达到结束值,读取结束。 fields.#.min 否 '#'号指定的字段类型的最小值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 表示随机生成器的最小值,'#' 指定的字段仅适用于数字类型。 fields.#.max 否 '#'号指定的字段类型的最大值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 随机生成器的最大值,'#' 指定的字段仅适用于数字类型。 fields.#.length 否 100 Integer 当“fields.#.kind”字段为:random时有效。 随机生成器生成字符的长度,#' 指定的字段仅适用于char、varchar、string。 fields.#.start 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的起始值。 fields.#.end 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的结束值。
  • 创建表相关语法 表1 创建表相关语法 语法分类 功能描述 创建源表 DataGen源表 DWS源表 Hbase源表 JDBC源表 Kafka源表 MySQL CDC源表 Postgres CDC源表 Redis源表 Upsert Kafka源表 创建结果表 BlackHole结果表 ClickHouse结果表 DWS结果表 Elasticsearch结果表 Hbase结果表 JDBC结果表 Kafka结果表 Print结果表 Redis结果表 Upsert Kafka结果表 创建维表 DWS维表 Hbase维表 JDBC维表 Redis维表 Format Avro Canal Confluent Avro CSV Debezium JSON Maxwell Raw
  • 功能描述 BlackHole Connector允许接收所有输入记录,常用于高性能测试和UDF 输出,其不是实质性Sink。Blackhole结果表是系统内置的Connector。 例如,如果您在注册其他类型的Connector结果表时报错,但您不确定是系统问题还是结果表WITH参数错误,您可以将WITH参数修改为'connector' = 'blackhole'后,单击运行。如果不再报错,则证明系统没有问题,您需要排查确认修改WITH参数是否正确。
  • 示例 通过DataGen源表产生数据,BlackHole结果表接收传来的数据。 create table datagenSource ( user_id string, user_name string, user_age int) with ( 'connector' = 'datagen', 'rows-per-second'='1');create table blackholeSink ( user_id string, user_name string, user_age int) with ( 'connector' = 'blackhole');insert into blackholeSink select * from datagenSource;
  • 数据类型映射 目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。 除了下面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null),其中 something 是从 Flink 类型转换的 Avro 类型。 您可以参考 Avro 规范 获取更多有关 Avro 类型的信息。 表2 数据类型映射 Flink SQL类型 Avro类型 Avro逻辑类型 CHAR / VARCHAR / STRING string - BOOLEAN boolean - BINARY / VARBINARY bytes - DECIMAL fixed decimal TINYINT int - SMALLINT int - INT int - BIGINT long - FLOAT float - DOUBLE double - DATE int date TIME int time-millis TIMESTAMP long timestamp-millis ARRAY array - MAP(key 必须是 string/char/varchar 类型) map - MULTISET(元素必须是 string/char/varchar 类型) map - ROW record -
  • 功能描述 Canal是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf序列化消息(Canal 默认使用 protobuf)。 Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等。 Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,此处应为 'canal-json'. canal-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 canal-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值是:'SQL'和'ISO-8601'。 选项 'SQL' 将解析 "yyyy-MM-dd HH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30 12:13:14.123',并以相同格式输出时间戳。 选项 'ISO-8601' 将解析 "yyyy-MM-ddTHH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30T12:13:14.123',并以相同的格式输出时间戳。 canal-json.map-null-key.mode 否 'FALL' String 指定处理 Map 中 key 值为空的方法. 当前支持的值有'FAIL', 'DROP'和 'LITERAL'。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'canal-json.map-null-key.literal' 定义。 canal-json.map-null-key.literal 否 'null' String 当 'canal-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 canal-json.database.include 否 (none) String 仅读取指定数据库的 changelog 记录(通过对比 Canal 记录中的 "database" 元数据字段)。 canal-json.table.include 否 (none) String 仅读取指定表的 changelog 记录(通过对比 Canal 记录中的 "table" 元数据字段)。
  • 功能描述 Avro Schema Registry (avro-confluent) 格式能让你读取被 io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化的记录。 当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。 当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.schema-registry.subject 参数来制定。
  • 参数说明 表1 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,这里应该是 'csv'。 csv.field-delimiter 否 , String 字段分隔符 (默认','),必须为单字符。你可以使用反斜杠字符指定一些特殊字符,例如 '\t' 代表制表符。 你也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如 'csv.field-delimiter' = '\u0001' 代表 0x01 字符。 csv.disable-quote-character 否 false Boolean 是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 'csv.quote-character' 不能设置。 csv.quote-character 否 ‘’ String 用于围住字段值的引号字符 (默认"). csv.allow-comments 否 false Boolean 是否允许忽略注释行(默认不允许),注释行以 '#' 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。 csv.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 csv.array-element-delimiter 否 ; String 分隔数组和行元素的字符串(默认';'). csv.escape-character 否 (none) String 转义字符(默认关闭). csv.null-literal 否 (none) String 是否将 "null" 字符串转化为 null 值。
  • 功能描述 Raw format 允许读写原始(基于字节)值作为单个列。 注意: 这种格式将 null 值编码成 byte[] 类型的 null。这样在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。 Raw format 连接器是内置的。
  • 参数说明 表1 参数 是否必选 默认值 类型 描述 format 是 (none) String 指定要使用的格式, 这里应该是 'raw'。 raw.charset 否 UTF-8 String 指定字符集来编码文本字符串。 raw.endianness 否 big-endian String 指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。 更多细节可查阅字节序。
  • 参数说明 表1 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定使用格式,这是应该使用'maxwell-json'。 maxwell-json.ignore-parse-errors 否 false Boolean 跳过解析错误而不是失败的字段和行。出现错误时,字段设置为空。 maxwell-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值为“SQL”和“ISO-8601”:选项“SQL”将以“yyyy-MM-dd HH:mm:ss.s{precision}”格式解析输入时间戳,例如“2020-12-30 12” :13:14.123' 并以相同格式输出时间戳。选项'ISO-8601'将以“yyyy-MM-ddTHH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30T12: 13:14.123' 并以相同格式输出时间戳。 maxwell-json.map-null-key.mode 否 'FAIL' String 在序列化地图数据的空键时指定处理模式。当前支持的值为“FAIL”、“DROP”和“LITERAL”:选项“FAIL”将在遇到带有空键的地图时抛出异常。选项“DROP”将删除地图数据的空键条目。选项“LITERAL”将替换空带字符串文字的键。字符串文字由 maxwell-json.map-null-key.literal 选项定义。 maxwell-json.map-null-key.literal 否 'null' String 当 'maxwell-json.map-null-key.mode' 为 LITERAL 时,指定字符串文字以替换空键。
  • 功能描述 Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON消息。 Flink 支持将 Debezium JSON解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等。
  • 参数说明 表1 参数 是否必选 默认值 是否必选 描述 format 是 (none) String 指定要使用的格式,此处应为 'debezium-json'。 debezium-json.schema-include 否 false Boolean 设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。 debezium-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 debezium-json.timestamp-format.standard 否 'SQL' String 声明输入和输出的时间戳格式。当前支持的格式为'SQL'和'ISO-8601'。 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。 debezium-json.map-null-key.mode 否 'FAIL' String 指定处理 Map 中 key 值为空的方法。 当前支持的值有FAIL、DROP和LITERAL。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'debezium-json.map-null-key.literal' 定义。 debezium-json.map-null-key.literal 否 'null' String 当 'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
  • 示例 根据order_id对数据进行去重,其中proctime为事件时间属性列 SELECT order_id, user, product, number FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num FROM Orders) WHERE row_num = 1;
  • 语法说明 ROW_NUMBER(): 从第一行开始,依次为每一行分配一个唯一且连续的号码。 PARTITION BY col1[, col2...]: 指定分区的列,例如去重的键。 ORDER BY time_attr [asc|desc]: 指定排序的列。所制定的列必须为时间属性。目前仅支持proctime。升序( ASC )排列指只保留第一行,而降序排列( DESC )则指保留最后一行。 WHERE rownum = 1: Flink 需要 rownum = 1 以确定该查询是否为去重查询。
  • 参数说明 表1 参数 是否必选 默认值 类型 说明 format 是 (none) String 声明使用的格式,这里应为'json'。 json.fail-on-missing-field 否 false Boolean 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,不抛出错误失败)。 json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 json.timestamp-format.standard 否 'SQL' String 声明输入和输出的TIMESTAMP和TIMESTAMP WITH LOCAL TIME ZONE 的格式。 当前支持的格式为'SQL'和'ISO-8601': 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123", 以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP WITH LOCAL TIME ZONE, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出。 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" , 以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP WITH LOCAL TIME ZONE, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出。 json.map-null-key.mode 否 'FALL' String 指定处理 Map 中 key 值为空的方法。当前支持的值有:'FAIL','DROP'和'LITERAL'。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'json.map-null-key.literal' 定义。 json.map-null-key.literal 否 'null' String 当 'json.map-null-key.mode' 是LITERAL的时候,指定字符串常量替换 Map 中的空 key 值。
  • 函数说明 表1 值构建函数说明 值构建函数 函数说明 ROW(value1, [, value2]*) (value1, [, value2]*) 根据一系列值创建ROW ARRAY ‘[’ value1 [, value2 ]* ‘]’ 根据一系列值创建数组 MAP ‘[’ key1, value1 [, key2, value2]* ‘]’ 根据一系列值创建MAP 其键值对为(key1, value1),(key2, value2)
  • 函数说明 表1 属性访问函数说明 值接入函数 函数说明 tableName.compositeType.field 选择单个字段,通过名称访问Apache Flink复合类型(如Tuple,POJO等)的字段并返回其值。 tableName.compositeType.* 选择所有字段,将Apache Flink复合类型(如Tuple,POJO等)和其所有直接子类型转换为简单表示,其中每个子类型都是单独的字段。
  • 函数说明 表1 集合函数说明 集合函数 函数说明 CARDINALITY(array) 返回数组中元素个数 array ‘[’ integer ‘]’ 返回数组索引为integer的元素。索引从1开始 ELEMENT(array) 返回数组中的唯一元素。 若数组为空,则返回null 若数组中元素个数大于1,则抛出异常 CARDINALITY(map) 返回map中键值对的条数 map ‘[’ key ‘]’ 返回map中key所对应的值
  • 函数说明 表1 Hash函数说明 Hash函数 函数说明 MD5(string) 返回以32个十六进制数所表示的字符串的MD5哈希值 若字符串是null,则返回null SHA1(string) 返回以40个十六进制所表示的字符串的SHA-1哈希值 若字符串是null,则返回null SHA224(string) 返回以56个十六进制数所表示的字符串的SHA-224哈希值 若字符串是null,则返回null SHA256(string) 返回以64个十六进制数所表示的字符串的SHA-256哈希值 若字符串是null,则返回null SHA384(string) 返回以96个十六进制数所表示的字符串的SHA-384哈希值 若字符串是null,则返回null SHA512(string) 返回以128个十六进制数所表示的字符串的SHA-512哈希值 若字符串是null,则返回null SHA2(string, hashLength) 返回使用SHA-2哈希函数族(SHA-224, SHA-256, SHA-384, or SHA-512)得到的哈希值 第一个参数string表示被哈希的字符串,第二个参数hashLength表示哈希值的长度(224、256、384、512) 若任意参数为null,则返回null
  • 聚合函数 聚合函数是从一组输入值计算一个结果。例如使用COUNT函数计算SQL查询语句返回的记录行数。聚合函数如表1所示。 表1 聚合函数表 函数 返回值类型 描述 COUNT([ ALL ] expression | DISTINCT expression1 [, expression2]*) BIGINT 返回表达式不为NULL的输入行数。对每个值的一个唯一实例使用DISTINCT。 COUNT(*) COUNT(1) BIGINT 返回元组个数 AVG([ ALL | DISTINCT ] expression) DOUBLE 返回所有值的平均值。 对每个值的一个唯一实例使用DISTINCT。 SUM([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值的数值之和 对每个值的一个唯一实例使用DISTINCT MAX([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值的最大值 MIN([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值的最小值 STDDEV_POP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的总体标准偏差 STDDEV_SAMP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的样本标准偏差 VAR_POP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的总体方差 VAR_SAMP([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的样本方差 COLLECT([ ALL | DISTINCT ] expression) MULTISET 返回所有输入值的MULTISET VARIANCE([ ALL | DISTINCT ] expression) DOUBLE 返回所有输入值之间的数字字段的样本方差 FIRST_VALUE(expression) 数据实际类型 返回有序数据中的第一个数据 LAST_VALUE(expression) 数据实际类型 返回有序数据中的最后一个数据 父主题: 内置函数
  • 操作场景 类型推导包含了验证输入值、派生参数和返回值数据类型。从逻辑角度看,Planner需要知道数据类型、精度和小数位数;从 JVM 角度来看,Planner 在调用自定义函数时需要知道如何将内部数据结构表示为JVM对象。 Flink 自定义函数实现了自动的类型推导提取,通过反射从函数的类及其求值方法中派生数据类型。然而以反射方式提取数据类型并不总是成功的,比如UDTF中常见的Row类型。 由于 Flink 1.11 起引入了新的自定义函数注册接口,使用了新的自定义函数类型推断机制,因此原先1.10 重载 getResultType 声明返回字段类型的方式将不再可用。继续使用会抛出如下异常: Caused by: org.apache.flink.table.api.ValidationException: Cannot extract a data type from a pure 'org.apache.flink.types.Row' class. Please use annotations to define field names and field types. 目前 Flink 1.12 可以通过使用DataTypeHint 和FunctionHint 注解相关参数、类或方法来支持提取过程。
  • 使用示例 UDTF支持CROSS JOIN和LEFT JOIN,在使用UDTF时需要带上 LATERAL 和TABLE 两个关键字。 CROSS JOIN:对于左表的每一行数据,假设UDTF不产生输出,则这一行不进行输出。 LEFT JOIN:对于左表的每一行数据,假设UDTF不产生输出,这一行仍会输出,UDTF相关字段用null填充。 CREATE FUNCTION udtf_test AS 'com.huaweicompany.udf.TableFunction';-- CROSS JOININSERT INTO sink_stream select subValue, length FROM source_stream, LATERALTABLE(udtf_test(attr, ',')) as T(subValue, length);-- LEFT JOININSERT INTO sink_stream select subValue, length FROM source_stream LEFT JOINLATERALTABLE(udtf_test(attr, ',')) as T(subValue, length) ON TRUE;
  • 算术运算符 算术运算符包括双目运算符与单目运算符,这些运算符都将返回数字类型。Flink SQL所支持的算术运算符如表3所示。 表3 算术运算符 运算符 返回类型 描述 + numeric 所有数字类型 返回数字。 - numeric 所有数字类型 返回负数。 A + B 所有数字类型 A和B相加。结果数据类型与操作数据类型相关,例如一个整数类型数据加上一个浮点类型数据,结果数值为浮点类型数据。 A - B 所有数字类型 A和B相减。结果数据类型与操作数据类型相关。 A * B 所有数字类型 A和B相乘。结果数据类型与操作数据类型相关。 A / B 所有数字类型 A和B相除。结果是一个double(双精度)类型的数值。 POWER(A, B) 所有数字类型 返回A数的B次方乘幂。 ABS(numeric) 所有数字类型 返回数值的绝对值。 MOD(A, B) 所有数字类型 返回A除以B的余数(模数)。 返回值只有在A为负数时才为负数。 SQRT(A) 所有数字类型 返回A的平方根。 LN(A) 所有数字类型 返回A的自然对数(基数e)。 LOG10(A) 所有数字类型 返回A的基数10对数。 LOG2(A) 所有数字类型 返回A的基数2对数。 LOG(B) LOG(A, B) 所有数字类型 当只有一个参数,返回B的自然对数(基数e)。 当有两个参数,返回B以A为基数的对数。 B必须大于0,且A必须大于1。 EXP(A) 所有数字类型 返回e的a次方。 CEIL(A) CEILING(A) 所有数字类型 将参数向上舍入为最接近的整数。例如ceil(21.2),返回22。 FLOOR(A) 所有数字类型 对给定数据进行向下舍入最接近的整数。例如floor(21.2),返回21。 SIN(A) 所有数字类型 计算给定A的正弦值。 COS(A) 所有数字类型 计算给定A的余弦值。 TAN(A) 所有数字类型 计算给定A的正切值。 COT(A) 所有数字类型 计算给定A的余切值。 ASIN(A) 所有数字类型 计算给定A的反正弦值。 ACOS(A) 所有数字类型 计算给定A的反余弦值。 ATAN(A) 所有数字类型 计算给定A的反正切值。 ATAN2(A, B) 所有数字类型 计算给定坐标(A, B)的反正切值。 COSH(A) 所有数字类型 计算给定A的双曲余弦值。返回类型为DOUBLE。 DEGREES(A) 所有数字类型 返回弧度所对应的角度。 RADIANS(A) 所有数字类型 返回角度所对应的弧度。 SIGN(A) 所有数字类型 返回a所对应的正负号,a为正返回1,a为负,返回-1,否则返回0。 ROUND(A, d) 所有数字类型 返回小数部分,d位之后数字的四舍五入,d为int型。例如round(21.263,2),返回21.26。 PI 所有数字类型 返回pi的值。 E() 所有数字类型 返回e的值。 RAND() 所有数字类型 返回一个0.0和1.0之间的随机double类型的数(包含0.0,不包含1.0)。 RAND(A) 所有数字类型 根据初始化种子A,返回一个0.0和1.0之间的随机double类型的数(包含0.0,不包含1.0)。若初始化种子相同,则返回的随机数相同。 RAND_INTEGER(A) 所有数字类型 返回一个0和A之间的随机整数(包含0,不包含A)。 RAND_INTEGER(A, B) 所有数字类型 根据初始化种子A,返回一个0和B之间的随机整数值(包含0,不包含B) UUID() 所有数字类型 返回一个UUID字符串。 BIN(A) 所有数字类型 返回一个整数A的二进制字符串。如为null则返回null。 HEX(A) HEX(B) 所有数字类型 返回一个整数A或者字符串B的十六进制字符串。若A或B为null,则返回null。 TRUNCATE(A, d) 所有数字类型 返回保留小数点后d为小数的数字。若A或d为null,则返回null。 例如:truncate(42.345, 2) = 42.340 truncate(42.345) = 42.000 PI() 所有数字类型 返回pi的值 注意事项 字符串类型不能参与算术运算。
  • 操作场景 如果您的自定义函数需要在多个作业中使用,但对于不同作业某些参数值不同,直接在UDF中修改较为复杂。您可以在Flink OpenSource SQL编辑页面,自定义配置中配置参数pipeline.global-job-parameters,在UDF代码中获取该参数并使用。如需修改参数值,直接在FlinkOpenSource SQL编辑页面,自定义配置中修改该参数值,即可达到快速修改UDF参数值的目的。
  • 操作步骤 自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。自定义函数的参数传递操作步骤如下: 在Flink OpenSource SQL编辑页面右侧自定义配置中添加参数pipeline.global-job-parameters,格式如下: pipeline.global-job-parameters=k1:v1,"k2:v1,v2",k3:"str:ing","k4:str""ing" 该配置定义了如表1的map。 表1 pipeline.global-job-parameters示例 key value k1 v1 k2 v1,v2 k3 str:ing k4 str""ing FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此需要将UDF用到的所有配置项全部写入到pipeline.global-job-parameters中。 key和value之间通过冒号(:)分隔,所有key-value用逗号(,)连接。 如果key或value中含有逗号(,),则需要用双引号(")将key:value整个包围起来。参考k2。 如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。参考k3。 如果key或value中含有双引号("),则需要通过连写两个双引号("")进行转义,也需要用双引号(")将key:value整个包围起来。参考k4。 在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容,代码示例如下: context.getJobParameter("url","jdbc:mysql://xx.xx.xx.xx:3306/table");context.getJobParameter("driver","com.mysql.jdbc.Driver");context.getJobParameter("user","user");context.getJobParameter("password","password");
  • 语法说明 COMPUTED COLUMN 计算列是一个使用 “column_name AS computed_column_expression” 语法生成的虚拟列。它由使用同一表中其他列的非查询表达式生成,并且不会在表中进行物理存储。例如,一个计算列可以使用 cost AS price * quantity 进行定义,这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。 在 Flink 中计算列一般用于为 CREATE TABLE 语句定义 时间属性。 处理时间属性 可以简单地通过使用了系统函数 PROCTIME() 的 proc AS PROCTIME() 语句进行定义。 另一方面,由于事件时间列可能需要从现有的字段中获得,因此计算列可用于获得事件时间列。例如,原始字段的类型不是 TIMESTAMP(3) 或嵌套在 JSON 字符串中。 注意: 定义在一个数据源表( source table )上的计算列会在从数据源读取数据后被计算,它们可以在 SELECT 查询语句中使用。 计算列不可以作为 INSERT 语句的目标,在 INSERT 语句中,SELECT 语句的 schema 需要与目标表不带有计算列的 schema 一致。
  • 语法说明 string_split(target, separator) 表1 string_split参数说明 参数 数据类型 说明 target STRING 待处理的目标字符串。 说明: 如果target为NULL,则返回一个空行。 如果target包含两个或多个连续出现的分隔符时,则返回长度为零的空子字符串。 如果target未包含指定分隔符,则返回目标字符串。 separator VARCHAR 指定的分隔符,当前仅支持单字符分割。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector的类型,只能为opentsdb。 connector.region 是 OpenTSDB服务所在的区域。 connector.tsdb-metrics 是 数据点的metric,支持参数化。 其个数为要为1或者和“connector.tsdb-values”个数相同。 多个metric请使用“;”分隔。 connector.tsdb-timestamps 是 数据点的timestamp,仅支持指定动态列。 数据类型支持int、bigint、string,仅支持数据形式。 其个数需要为1或者和“connector.tsdb-values”的个数相同。 多个timestamp请使用“;”分隔。 connector.tsdb-values 是 数据点的value,支持指定动态列或者常数值。 多个values请使用“;”分隔。 connector.tsdb-tags 是 数据点的tags,每个tags里面至少一个标签值,最多8个标签值,多个标签使用“,”分隔,支持参数化。 其个数需要为1或者和“connector.tsdb-values”的个数相同。 多个tags请使用“;”分隔。 connector.batch-insert-data-num 否 表示一次性批量写入的数据量,即数据条数,值必须为正整数,默认值为8。 connector.tsdb-link-address 是 待插入数据所属集群的OpenTSDB连接地址。
共100000条