华为云用户手册

  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 为每个 Reader 设置不同的 Server id 每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 Server id。 MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。 因此,如果不同的作业共享相同的 Server id, 则可能导致从错误的 binlog 位置读取数据。 因此,建议通过为每个 Reader 设置不同的 Server id SQL Hints, 假设 Source 并行度为 4, 我们可以使用 SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ; 来为 4 个 Source readers 中的每一个分配唯一的 Server id。 设置 MySQL 会话超时时间 当为大型数据库创建初始一致快照时,你建立的连接可能会在读取表时碰到超时问题。你可以通过在 MySQL 侧配置 interactive_timeout 和 wait_timeout 来缓解此类问题。 interactive_timeout: 服务器在关闭交互连接之前等待活动的秒数。 更多信息请参考 MySQL documentations. wait_timeout: 服务器在关闭非交互连接之前等待活动的秒数。 更多信息请参考 MySQL documentations. 使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。 在使用无主键表时,需要注意以下两种情况: 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 度。 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定: 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。 MySQL CDC源表暂不支持定义Watermark。如果您需要进行窗口聚合,请参考常见问题描述。 若连接DWS、MySQL等支持upsert的sink源,需要在sink表的创建语句中定义主键,请参考示例中printSink建表语句。
  • 参数说明 表2 源表参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'mysql-cdc'。 hostname 是 无 String MySQL 数据库服务器的 IP 地址或主机名。 username 是 无 String 连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。 password 是 无 String 连接 MySQL 数据库服务器时使用的密码。 database-name 是 无 String 要监视的 MySQL 服务器的数据库名称。 数据库名称还支持正则表达式,以监视多个与正则表达式匹配的表。 前缀匹配:^(test).* 匹配前缀为test的数据库名,例如test1、test2等。 后缀匹配:.*[p$] 匹配后缀为p的数据库名,例如cdcp、edcp等。 特定匹配:txc 匹配具体的数据库名。 table-name 是 无 String 需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。 注意:MySQL CDC 连接器在正则匹配表名时,会把用户填写的 database-name, table-name 通过字符串 `\\.` 连接成一个全路径的正则表达式,然后使用该正则表达式和 MySQL 数据库中表的全限定名进行正则匹配。 前缀匹配:^(test).* 匹配前缀为test的表名,例如test1、test2等。 后缀匹配:.*[p$] 匹配后缀为p的表名,例如cdcp、edcp等。 特定匹配:txc 匹配具体的表名。 port 否 3306 Integer MySQL 数据库服务器的整数端口号。 server-id 否 无 String 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408'。 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。 scan.incremental.snapshot.enabled 否 true Boolean 增量快照是一种读取表快照的新机制,与旧的快照机制相比, 增量快照有许多优点,包括: 在快照读取期间,Source 支持并发读取 在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint 在快照读取之前,Source 不需要数据库锁权限。 如果希望 Source 并行运行,则每个并行 Readers 都应该具有唯一的 Server id,所以 Server id 必须是类似 `5400-6400` 的范围,并且该范围必须大于并行度。 scan.incremental.snapshot.chunk.size 否 8096 Integer 表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。 scan.snapshot.fetch.size 否 1024 Integer 读取表快照时每次读取数据的最大条数。 scan.startup.mode 否 initial String MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。 earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取。 latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。 specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。 timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。 scan.startup.specific-offset.file 否 无 String 在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。 scan.startup.specific-offset.pos 否 无 Long 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。 scan.startup.specific-offset.gtid-set 否 无 String 在 "specific-offset" 启动模式下,启动位点的 GTID 集合。 scan.startup.specific-offset.skip-events 否 无 Long 在指定的启动位点后需要跳过的事件数量。 scan.startup.specific-offset.skip-rows 否 无 Long 在指定的启动位点后需要跳过的数据行数量。 server-time-zone 否 无 String 数据库服务器中的会话时区, 例如: "Asia/Shanghai". 它控制 MYSQL 中的时间戳类型如何转换为字符串。 更多请参考 这里. 如果没有设置,则使用ZoneId.systemDefault()来确定服务器时区。 debezium.min.row. count.to.stream.result 否 1000 Integer 在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。 此参数确定 MySQL 连接是否将表的所有结果拉入内存(速度很快,但需要大量内存), 或者结果是否需要流式传输(传输速度可能较慢,但适用于非常大的表)。 该值指定了在连接器对结果进行流式处理之前,表必须包含的最小行数,默认值为1000。 将此参数设置为`0`以跳过所有表大小检查,并始终在快照期间对所有结果进行流式处理。 connect.timeout 否 30s Duration 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 connect.max-retries 否 3 Integer 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 connection.pool.size 否 20 Integer 连接池大小。 jdbc.properties.* 否 无 String 传递自定义 JDBC URL 属性的选项。 用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'. heartbeat.interval 否 30s Duration 用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。 debezium.* 否 无 String 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。 例如: 'debezium.snapshot.mode' = 'never'. 查看更多关于 Debezium 的 MySQL 连接器属性 scan.incremental.close-idle-reader.enabled 否 false Boolean 是否在快照结束后关闭空闲的 Reader。 此特性需要'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。
  • 支持特性 增量快照读取 增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括: 在快照读取期间,Source 支持并发读取, 在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint, 在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运行,则每个并行 reader 都应该具有唯一的 server id,因此server id的范围必须类似于 5400-6400, 且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 首先通过表的主键将表划分成多个块(chunk), 然后 MySQL CDC Source 将多个块分配给多个 reader 以并行读取表的数据。 无锁算法 MySQL CDC source 使用 增量快照算法, 避免了数据库锁的使用,因此不需要 “RELOAD” 权限。 并发读取 增量快照读取提供了并行读取快照数据的能力。 全量阶段支持 checkpoint 增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。
  • 前提条件 MySQL CDC要求MySQL版本为5.6,5.7或8.0.x。 with参数中字段只能使用单引号,不能使用双引号。 该场景作业需要DLI与MySQL建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 MySQL已开启了Binlog,并且binlog_row_image设置为FULL。 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。 GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
  • GROUPING SETS Grouping Sets 可以通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 group by 子句一样为每个组进行聚合。 GROUPING SETS 的每个子列表可以是:空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果。 对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替。 SELECT supplier_id, rating, COUNT(*) AS total FROM (VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4)) AS Products(supplier_id, product_id, rating) GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())
  • ROLLUP ROLLUP 是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。 SELECT supplier_id, rating, COUNT(*) FROM (VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4)) AS Products(supplier_id, product_id, rating) GROUP BY ROLLUP (supplier_id, rating)
  • CUBE CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。 例如:下面两个查询是等效的。 SELECT supplier_id, rating, product_id, COUNT(*) FROM (VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4)) AS Products(supplier_id, product_id, rating) GROUP BY CUBE (supplier_id, rating, product_id) SELECT supplier_id, rating, product_id, COUNT(*) FROM (VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4)) AS Products(supplier_id, product_id, rating) GROUP BY GROUPING SET ( ( supplier_id, product_id, rating ), ( supplier_id, product_id ), ( supplier_id, rating ), ( supplier_id ), ( product_id, rating ), ( product_id ), ( rating ), ( ) )
  • 语法支持类型 CHAR, VARCHAR, STRING, BOOLEAN, BINARY, VARBINARY, BYTES, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, INTERVAL, ARRAY, MULTISET, MAP, ROW, RAW 父主题: SQL语法约束与定义
  • 示例 从obs表作为数据源读取数据,输出到print connector。 CREATE TABLE obs_source( name string, num INT, `file.path` STRING NOT NULL METADATA ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://demo/sink_parquent_obs', 'format' = 'parquet', 'source.monitor-interval'='1 h' ); CREATE TABLE print ( name string, num INT, path STRING ) WITH ( 'connector' = 'print' ); insert into print select * from obs_source;
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE sink_table ( name string, num INT, p_day string, p_hour string ) partitioned by (p_day, p_hour) WITH ( 'connector' = 'filesystem', 'path' = 'obs://*** ', 'format' = 'parquet', 'source.monitor-interval'='' );
  • Format概述 Flink 提供了一套与表连接器(table connector)一起使用的表格式(table format)。表格式是一种存储格式,定义了如何把二进制数据映射到表的列上。 Flink 支持以下格式: 表1 Flink支持格式 Formats 支持的Connectors CSV Kafka, Upsert Kafka, FileSystem JSON Kafka, Upsert Kafka, FileSystem, Elasticsearch Avro Kafka, Upsert Kafka, FileSystem Confluent Avro Kafka, Upsert Kafka Debezium Kafka, FileSystem Canal Kafka, FileSystem Maxwell Kafka, FileSystem Ogg Kafka, FileSystem Orc FileSystem Parquet FileSystem Raw Kafka, Upsert Kafka, FileSystem 父主题: Format
  • 操作步骤 自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。自定义函数的参数传递操作步骤如下: 在Flink OpenSource SQL编辑页面右侧自定义配置中添加参数pipeline.global-job-parameters,格式如下: pipeline.global-job-parameters=k1:v1,"k2:v1,v2",k3:"str:ing","k4:str""ing" 该配置定义了如表1的map。 表1 pipeline.global-job-parameters示例 key value k1 v1 k2 v1,v2 k3 str:ing k4 str""ing FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此需要将UDF用到的所有配置项全部写入到pipeline.global-job-parameters中。 key和value之间通过冒号(:)分隔,所有key-value用逗号(,)连接。 如果key或value中含有逗号(,),则需要用双引号(")将key:value整个包围起来。参考k2。 如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。参考k3。 如果key或value中含有双引号("),则需要通过连写两个双引号("")进行转义,也需要用双引号(")将key:value整个包围起来。参考k4。 在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容,代码示例如下: context.getJobParameter("url","jdbc:mysql://xx.xx.xx.xx:3306/table"); context.getJobParameter("driver","com.mysql.jdbc.Driver"); context.getJobParameter("user","user"); context.getJobParameter("password","password");
  • 操作场景 如果您的自定义函数需要在多个作业中使用,但对于不同作业某些参数值不同,直接在UDF中修改较为复杂。您可以在Flink OpenSource SQL编辑页面,自定义配置中配置参数pipeline.global-job-parameters,在UDF代码中获取该参数并使用。如需修改参数值,直接在FlinkOpenSource SQL编辑页面,自定义配置中修改该参数值,即可达到快速修改UDF参数值的目的。
  • Hash函数 表1 Hash函数 Hash函数 函数说明 MD5(string) 以 32 个十六进制数字的字符串形式返回 string 的 MD5 哈希值;如果字符串为 NULL,则返回 NULL。 SHA1(string) 以 40 个十六进制数字的字符串形式返回 string 的 SHA-1 哈希值;如果字符串为 NULL,则返回 NULL。 SHA224(string) 以 56 个十六进制数字的字符串形式返回 string 的 SHA-224 哈希值;如果字符串为 NULL,则返回 NULL。 SHA256(string) 以 64 个十六进制数字的字符串形式返回 string 的 SHA-256 哈希值;如果字符串为 NULL,则返回 NULL。 SHA384(string) 以 96 个十六进制数字的字符串形式返回 string 的 SHA-384 哈希值;如果字符串为 NULL,则返回 NULL。 SHA512(string) 以 128 个十六进制数字的字符串形式返回 string 的 SHA-512 哈希值;如果字符串为 NULL,则返回 NULL。 SHA2(string, hashLength) 使用 SHA-2 系列散列函数(SHA-224,SHA-256,SHA-384 或 SHA-512)返回散列值。第一个参数字符串是要散列的字符串, 第二个参数 hashLength 是结果的位长(224,256,384 或 512)。如果 string 或 hashLength 为 NULL,则返回 NULL。 父主题: 内置函数
  • 保留关键字 一些字符串的组合已经被预留为关键字以备未来使用。 如果使用以下字符串作为字段名,请在使用时使用反引号将该字段名包起来,例如 `value`, `count` 。 A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMNS, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MODULES, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, RAW, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRING, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE 父主题: SQL语法约束与定义
  • 函数说明 表1 值构建函数说明 值构建函数 函数说明 -- implicit constructor with parenthesis (value1 [, value2]*) 返回从值列表 (value1, value2, …) 创建的行。隐式行构造函数支持任意表达式作为字段,但至少需要两个字段。 显式行构造函数可以处理任意数量的字段,但目前还不能很好地支持所有类型的字段表达式。 ARRAY ‘[’ value1 [, value2 ]* ‘]’ 返回从值列表 (value1, value2, …) 创建的数组。 MAP ‘[’ value1, value2 [, value3, value4 ]* ‘]’ 返回从键值对列表 ((value1, value2), (value3, value4), …) 创建的 map。
  • 功能描述 HiveCatalog有两个用途:作为原生Flink元数据的持久化存储,以及作为读写现有Hive元数据的接口。 Flink 的Hive 文档提供了有关设置 HiveCatalog以及访问现有 Hive 元数据的详细信息。详情参考:Apache Flink Hive Catalog HiveCatalog可以用来处理两种类型的表:Hive兼容表和通用表。 Hive兼容表是以Hive兼容的方式存储的,他们的元数据和实际的数据都在分层存储中。因此,通过flink创建的与hive兼容的表,可以通过hive查询。 Hive通用表是特定于Flink的。当使用HiveCatalog创建通用表时,只是使用HMS来持久化元数据。虽然这些表对Hive来说是可见的,但Hive不太可能理解元数据。因此,在Hive中使用这样的表会导致未定义的行为。 建议切换到Hive方言来创建Hive兼容表。如果你想用默认的方言创建Hive兼容表,确保在你的表属性中设置'connector'='hive',否则在HiveCatalog中一个表默认被认为是通用的。如果你使用Hive方言,就不需要connector属性了。了解Hive方言。
  • 注意事项 警告Hive Metastore以小写形式存储所有元数据对象名称。 如果使用相同名称的目录已经存在,那么将会抛出一个异常。 Hudi表需要使用hudi catalog。并不适用于hive catalog。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 参数说明 表1 参数说明 参数 必选 默认值 类型 描述 type 是 无 String Catalog的类型。 创建HiveCatalog时,该参数必须设置为'hive'。 hive-conf-dir 是 无 String 指向包含 hive-site.xml目录的URI。 该值固定为'hive-conf-dir' = '/opt/flink/conf' default-database 否 default String 当一个catalog被设为当前catalog时,所使用的默认当前database。
  • 示例 在Flink OpenSource SQL作业中,创建名为myhive的catalog,并使用它用于管理元数据。 CREATE CATALOG myhive WITH ( 'type' = 'hive' ,'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; create table dataGenSource( user_id string, amount int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', --每秒生成一条数据 'fields.user_id.kind' = 'random', --为字段user_id指定random生成器 'fields.user_id.length' = '3' --限制user_id长度为3 ); create table printSink( user_id string, amount int ) with ( 'connector' = 'print' ); insert into printSink select * from dataGenSource; 查看default数据库中,是否含有dataGenSource、printSink 表。 Hive Metastore 以小写形式存储所有元数据对象名称。 图1 查看default数据库 使用名为myhive的catalog中的元数据,新建Flink OpenSource SQL作业。 CREATE CATALOG myhive WITH ( 'type' = 'hive' ,'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; insert into printSink select * from dataGenSource;
  • 简介 Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。 数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的UDF。 元数据也可以是持久化的,例如Hive Metastore中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从Table API和SQL查询语句中来访问。详情参考Apache Flink Catalogs
  • 值获取函数 表1 值获取函数 SQL函数 描述 tableName.compositeType.field 按名称从 Flink 复合类型(例如,Tuple,POJO)返回字段的值。 tableName.compositeType.* 返回 Flink 复合类型(例如,Tuple,POJO)的平面表示,将其每个直接子类型转换为单独的字段。在大多数情况下,平面表示 的字段与原始字段的命名类似,但使用 $ 分隔符(例如 mypojo$mytuple$f0)。 父主题: 内置函数
  • 功能描述 BlackHole Connector允许接收所有输入记录,常用于高性能测试和UDF输出,其不是实质性Sink。Blackhole结果表是系统内置的Connector。 例如,如果您在注册其他类型的Connector结果表时报错,但您不确定是系统问题还是结果表WITH参数错误,您可以将WITH参数修改为'connector' = 'blackhole'后,单击运行。如果不再报错,则证明系统没有问题,您需要排查确认修改WITH参数是否正确。 表1 支持类别 类别 详情 支持表类型 结果表
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 示例 通过DataGen源表产生数据,BlackHole结果表接收传来的数据。 create table datagenSource ( user_id string, user_name string, user_age int ) with ( 'connector' = 'datagen', 'rows-per-second'='1' ); create table blackholeSink ( user_id string, user_name string, user_age int ) with ( 'connector' = 'blackhole' ); insert into blackholeSink select * from datagenSource;
  • 注意事项 Hive方言只能用于操作Hive对象,并要求当前Catalog是一个HiveCatalog 。 Hive方言只支持db.table这种两级的标识符,不支持带有Catalog名字的标识符。更多信息请参考Apache Flink Hive Read & Write。 虽然所有Hive版本支持相同的语法,但是一些特定的功能对Hive版本有依赖,请参考Hive 版本。 例如,更新数据库位置 只在 Hive-2.4.0 或更高版本支持。 执行DML和DQL时应该使用HiveModule 。 从Flink 1.15版本开始,在使用Hive方言抛出以下异常时,请尝试用opt目录下的 flink-table-planner_2.12 jar包来替换lib目录下的flink-table-planner-loader jar包。具体原因请参考 FLINK-25128。
  • 表达式GROUP BY 功能描述 按表达式对流进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 groupItem:可以是单字段,多字段,也可以是字符串函数等调用,不能是聚合函数。 注意事项 无 示例 先利用substring函数取字段name的子字符串,并按照该子字符串进行分组,返回每个子字符串及对应的记录数。 1 2 insert into temp SELECT substring(name,6),count(name) FROM student GROUP BY substring(name,6);
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 GroupBy在流处理表中会产生更新结果 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
  • Grouping sets, Rollup, Cube 功能描述 GROUPING SETS 的 GROUP BY 子句可以生成一个等效于由多个简单 GROUP BY 子句的 UNION ALL 生成的结果集,并且其效率比 GROUP BY 要高。 ROLLUP与CUBE按一定的规则产生多种分组,然后按各种分组统计数据。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 Rollup生成的结果集显示了所选列中值的某一层次结构的聚合。 语法格式 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY groupingItem] 语法说明 groupingItem:是Grouping sets(columnName [, columnName]*)、Rollup(columnName [, columnName]*)、Cube(columnName [, columnName]*) 注意事项 无 示例 分别产生基于user和product的结果 INSERT INTO temp SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product));
  • Hbase HBase连接器支持读取和写入HBase集群。本文档介绍如何使用HBase连接器基于HBase进行SQL查询。 HBase连接器在upsert模式下运行,可以使用 DDL 中定义的主键与外部系统交换更新操作消息。但是主键只能基于HBase的rowkey字段定义。如果没有声明主键,HBase连接器默认取rowkey作为主键。详情可参考HBase SQL 连接器 Hbase源表 Hbase结果表 Hbase维表 父主题: Connector列表
共100000条