华为云用户手册

  • 功能描述 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。 作为 source,upsert-kafka 连接器生产changelog流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 语法格式 指定Key 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE [IF NOT EXISTS] TABLE_NAME( FIELDNAME1 FIELDTYPE1, FIELDNAME2 FIELDTYPE2) USING REDIS OPTIONS ( 'host'='xx', 'port'='xx', 'passwdauth' = 'xxx', 'encryption' = 'true', 'table'='namespace_in_redis:key_in_redis', 'key.column'= 'FIELDNAME1' ); 通配key 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE [IF NOT EXISTS] TABLE_NAME( FIELDNAME1 FIELDTYPE1, FIELDNAME2 FIELDTYPE2) USING REDIS OPTIONS ( 'host'='xx', 'port'='xx', 'passwdauth' = 'xxx', 'encryption' = 'true', 'keys.pattern'='key*:*', 'key.column'= 'FIELDNAME1' );
  • 示例 指定table 1 2 3 4 5 6 7 create table test_redis(name string, age int) using redis options( 'host' = '192.168.4.199', 'port' = '6379', 'passwdauth' = 'xxx', 'encryption' = 'true', 'table' = 'person' ); 通配table名 1 2 3 4 5 6 7 8 create table test_redis_keys_patten(id string, name string, age int) using redis options( 'host' = '192.168.4.199', 'port' = '6379', 'passwdauth' = 'xxx', 'encryption' = 'true', 'keys.pattern' = 'p*:*', 'key.column' = 'id' );
  • 常见问题 Q:MySQL CDC源表不支持定义Watermark,怎么进行窗口聚合? A:可以采用非窗口聚合的方式,即将时间字段转换成窗口值,然后根据窗口值进行GROUP BY聚合。 例如:基于上述示例,统计每分钟的订单数,脚本如下(其中order_time为string类型,表示订单的时间)。 insert into printSink select DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm'), count(*) from mysqlCdcSource group by DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm');
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'mysql-cdc'。 hostname 是 无 String MySQL数据库的IP地址或者Hostname。 username 是 无 String MySQL数据库的用户名。 password 是 无 String MySQL数据库的密码。 database-name 是 无 String 访问的数据库名称。 数据库名称支持正则表达式以读取多个数据库的数据,例如flink(.)*表示以flink开头的数据库名。 table-name 是 无 String 访问的表名。 表名支持正则表达式以读取多个表的数据,例如cdc_order(.)*表示以cdc_order开头的表名。 port 否 3306 Integer MySQL数据库的端口号。 server-id 否 5400~6000随机值 String 数据库客户端的一个数字ID,该ID必须是MySQL集群中全局唯一的。建议针对同一个数据库的每个作业都设置一个不同的ID。 默认会随机生成一个5400~6400的值。 scan.startup.mode 否 initial String 消费数据时的启动模式。 initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。 latest-offset:在第一次启动时,不会扫描历史全量数据,直接从Binlog的末尾(最新的Binlog处)开始读取,即只读取该Connector启动以后的最新变更。 server-time-zone 否 无 String 数据库在使用的会话时区。 例如:Asia/Shanghai。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置置账号和密码。
  • 语法格式 create table mySqlCdcSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'mysqlHostname', 'username' = 'mysqlUsername', 'password' = 'mysqlPassword', 'database-name' = 'mysqlDatabaseName', 'table-name' = 'mysqlTableName' );
  • 前提条件 MySQL CDC要求MySQL版本为5.7或8.0.x。 该场景作业需要DLI与MySQL建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。 MySQL已开启了Binlog,并且binlog_row_image设置为FULL。 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 同步数据库数据的客户端,都会有一个唯一ID,即Server ID。同一个数据库下,建议每个MySQL CDC作业配置不同的Server ID。 主要原因如下: MySQL SERVER会根据该ID来维护网络连接以及Binlog位点。因此如果有大量相同的Server ID的客户端一起连接MySQL SERVER,可能导致MySQL SERVER的CPU陡增,影响线上业务稳定性。 此外,多个作业共享相同的Server ID,会导致Binlog位点错乱,多读或少读数据,因此建议每个CDC作业都配置不同的Server ID。 MySQL CDC源表暂不支持定义Watermark。如果您需要进行窗口聚合,请参考常见问题描述。 若连接DWS、MySQL等支持upsert的sink源,需要在sink表的创建语句中定义主键,请参考示例中printSink建表语句。
  • 示例一:将amount值转换成整型 将amount值转换成整型。 insert into temp select cast(amount as INT) from source_stream; 表1 类型转换函数示例 示例 说明 示例 cast(v1 as string) 将v1转换为字符串类型,v1可以是数值类型,TIMESTAMP/DATE/TIME。 表T1: | content (INT) | | ------------- | | 5 | 语句: SELECT cast(content as varchar) FROM T1; 结果: "5" cast (v1 as int) 将v1转换为int, v1可以是数值类型或字符类。 表T1: | content (STRING) | | ------------- | | "5" | 语句: SELECT cast(content as int) FROM T1; 结果: 5 cast(v1 as timestamp) 将v1转换为timestamp类型,v1可以是字符串或DATE/TIME。 表T1: | content (STRING) | | ------------- | | "2018-01-01 00:00:01" | 语句: SELECT cast(content as timestamp) FROM T1; 结果: 1514736001000 cast(v1 as date) 将v1转换为date类型, v1可以是字符串或者TIMESTAMP。 表T1: | content (TIMESTAMP) | | ------------- | | 1514736001000 | 语句: SELECT cast(content as date) FROM T1; 结果: "2018-01-01" Flink作业不支持使用CAST将“BIGINT”转换为“TIMESTAMP”,可以使用to_timestamp进行转换。
  • 数据类型映射 HBase以字节数组存储所有数据,在读和写过程中要序列化和反序列化数据。 Flink的HBase连接器利用HBase(Hadoop) 的工具类org.apache.hadoop.hbase.util.Bytes进行字节数组和Flink数据类型转换。 Flink的HBase连接器将所有数据类型(除字符串外)null值编码成空字节。对于字符串类型,null值的字面值由null-string-literal选项值决定。 表2 数据类型映射表 Flink数据类型 HBase转换 CHAR/VARCHAR/STRING byte[] toBytes(String s) String toString(byte[] b) BOOLEAN byte[] toBytes(boolean b) boolean toBoolean(byte[] b) BINARY/VARBINARY 返回 byte[]。 DECIMAL byte[] toBytes(BigDecimal v) BigDecimal toBigDecimal(byte[] b) TINYINT new byte[] { val } bytes[0] // returns first and only byte from bytes SMALLINT byte[] toBytes(short val) short toShort(byte[] bytes) INT byte[] toBytes(int val) int toInt(byte[] bytes) BIGINT byte[] toBytes(long val) long toLong(byte[] bytes) FLOAT byte[] toBytes(float val) float toFloat(byte[] bytes) DOUBLE byte[] toBytes(double val) double toDouble(byte[] bytes) DATE 从 1970-01-01 00:00:00 UTC 开始的天数,int 值。 TIME 从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。 TIMESTAMP 从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。 ARRAY 不支持 MAP/MULTISET 不支持 ROW 不支持
  • 常见问题 Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 6 A:如果HBase表中的数据是以其他方式导入的话,那么其存储是以String格式存储的,所以使用其他的数据格式将会报该错误。需要将Flink创建HBase源表中非string类型的字段的字段类型重新改为String即可。
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 若使用MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机IP信息。 详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 语法格式 create table hbaseSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'hbase-2.2', 'table-name' = '', 'zookeeper.quorum' = '' );
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 创建HBase源表的列簇必须定义为ROW类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。 用户只需在表结构中声明查询中使用的的列簇和列限定符。除了ROW类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为HBase的rowkey,一张表中只能声明一个rowkey。rowkey字段的名字可以是任意的,如果是保留关键字,需要用反引号进行转义。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 CREATE TABLE [IF NOT EXISTS] TABLE_NAME( FIELDNAME1 FIELDTYPE1, FIELDNAME2 FIELDTYPE2) USING CSS OPTIONS ( 'es.nodes'='xx', 'resource'='type_path_in_CSS', 'pushdown'='true', 'strict'='false', 'batch.size.entries'= '1000', 'batch.size.bytes'= '1mb', 'es.nodes.wan.only' = 'true', 'es.mapping.id' = 'FIELDNAME');
  • 示例 1 2 3 4 5 6 7 8 CREATE TABLE IF NOT EXISTS dli_to_css (doc_id String, name string, age int) USING CSS OPTIONS ( es.nodes 'to-css-1174404703-LzwpJEyx.datasource.com:9200', resource '/dli_index/dli_type', pushdown 'false', strict 'true', es.nodes.wan.only 'true', es.mapping.id 'doc_id');
  • 示例 从Kafka源表获取Kafka source topic数据,通过Upsert Kafka结果表将Kafka source topic数据写入到Kafka sink topic中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE UPSERTKAFKASINK ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'key.format' = 'json', 'value.format' = 'json' ); insert into UPSERTKAFKASINK select * from orders; 连接Kafka集群,kafka中source topic发送如下测试数据: {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 连接Kafka集群,获取kafka sink topic的数据,结果参考如下: {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
  • 功能描述 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。DLI将Flink作业的输出数据以upsert的模式输出到Kafka中。 Upsert Kafka 连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。 upsert-kafka连接器作为 sink,可以消费changelog 流。它会将INSERT/UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入(表示对应 key 的消息被删除)。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
  • 前提条件 确保已创建Kafka集群。 该场景作业需要运行在DLI的独享队列上,因此要与Kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 数据类型的使用,请参考Format章节。 Upsert Kafka始终以upsert方式工作,并且需要在 DDL 中定义主键。 默认情况下,如果启用checkpoint,Upsert Kafka sink会保证至少一次将数据插入Kafka topic。这意味着,Flink可以将具有相同key的重复记录写入Kafka topic。因此,upsert-kafka 连接器可以实现幂等写入。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'upsert-kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'key.format' = '', 'value.format' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 默认参数 数据类型 说明 connector 是 (none) String connector类型,对于upsert kafka,需配置为'upsert-kafka'。 topic 是 (none) String Kafka topic名。 properties.bootstrap.servers 是 (none) String Kafka brokers地址,以逗号分隔。 key.format 是 (none) String 用于对Kafka消息中key部分序列化和反序列化的格式。key字段由PRIMARY KEY语法指定。支持的格式如下: csv json avro 请参考Format页面以获取更多详细信息和格式参数。 key.fields-prefix 否 (none) String 为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。 默认情况下,前缀为空。如果定义了自定义前缀,则表架构 和'key.fields'都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求'value.fields-include' 必须设置为'EXCEPT_KEY'。 value.format 是 (none) String 用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式: csv json avro 请参考Format页面以获取更多详细信息和格式参数。 value.fields-include 否 ‘ALL’ String 控制哪些字段应该出现在value中。可取值: ALL:消息的value 部分将包含schema 的所有字段,包括定义中键的字段。 EXCEPT_KEY:记录的value 部分包含schema 的所有内容,定义为主键的字段除外。 sink.parallelism 否 (none) Interger 定义upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。 properties.* 否 (none) String 该选项可以传递任意的 Kafka 参数。 选项的后缀名必须匹配定义在 kafka参数文档中的参数名。 Flink会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。 例如:你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。 但是'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为Flink会重写这些参数的值。 ssl_auth_name 否 无 String DLI侧创建的Kafka_SSL类型的跨源认证名称。Kafka配置SSL时使用该配置。 注意:若仅使用SSL类型,则需要同时配置'properties.security.protocol '= 'SSL'; 若使用SASL_SSL类型,则需要同时配置'properties.security.protocol' = 'SASL_SSL'、'properties.sasl.mechanism' = 'GSSAPI或者PLAIN'、'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";' krb_auth_name 否 无 String DLI侧创建的Kerberos类型的跨源认证名称。Kafka配置SASL认证时使用该配置。 注意:如果使用SASL_PLAINTEXT类型,且使用Kerberos认证,则需要同时配置'properties.sasl.mechanism' = 'GSSAPI'和'properties.security.protocol' = 'SASL_PLAINTEXT'
  • 语法格式 1 2 3 4 5 6 7 CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name1 col_type1 [COMMENT col_comment1], ...)] USING file_format [OPTIONS (key1=val1, key2=val2, ...)] [PARTITIONED BY (col_name1, col_name2, ...)] [COMMENT table_comment] [AS select_statement];
  • 参数说明 表1 参数描述 参数 是否必选 描述 db_name 否 Database名称。 由字母、数字和下划线(_)组成。不能是纯数字,且不能以数字和下划线开头。 table_name 是 Database中的表名。 由字母、数字和下划线(_)组成。不能是纯数字,且不能以数字和下划线开头。匹配规则为:^(?!_)(?![0-9]+$)[A-Za-z0-9_$]*$。 特殊字符需要使用单引号('')包围起来。 表名对大小写不敏感,即不区分大小写。 col_name 是 以逗号分隔的带数据类型的列名。 列名由字母、数字和下划线(_)组成。不能是纯数字,且至少包含一个字母。 列名为大小写不敏感,即不区分大小写。 col_type 是 列字段的数据类型。数据类型为原生类型。 请参考原生数据类型。 col_comment 否 列字段描述。仅支持字符串常量。 file_format 是 DLI表数据存储格式,支持:parquet和orc格式。 table_comment 否 表描述。仅支持字符串常量。 select_statement 否 用于CTAS命令,将源表的select查询结果或某条数据插入到新创建的DLI表中。 表2 OPTIONS参数描述 参数 是否必选 描述 默认值 multiLevelDirEnable 否 是否迭代查询子目录中的数据。当配置为true时,查询该表时会迭代读取该表路径中所有文件,包含子目录中的文件。 false compression 否 指定压缩格式。一般为parquet格式时指定该参数,推荐使用'zstd'压缩格式。 -
  • 示例5:创建DLI分区表,自定义表的OPTIONS参数 示例说明:创建DLI表时支持自定义属性名与属性值,OPTIONS参数说明可参考表2。 本例创建名为table3并以col_2为分区依据的DLI分区表。在OPTIONS中配置pmultiLevelDirEnable和compression。 multiLevelDirEnable:本例设置为true,表示查询该表时会迭代读取表路径中的所有文件和子目录文件,若不需要此项配置可以设置为false或不设置(默认为false); compression:当创建的OBS表需要压缩时,可以使用compression关键字来配置压缩格式,本例中就使用了zstd压缩格式。 1 2 3 4 5 6 7 8 9 10 CREATE TABLE IF NOT EXISTs table3 ( col_1 STRING, col_2 int ) USING parquet PARTITIONED BY (col_2) OPTIONS ( multiLeveldirenable = true, compression = 'zstd' );
  • 注意事项 CTAS建表语句不能指定表的属性。 若没有指定分隔符,则默认为逗号(,)。 关于分区表的使用说明: 创建分区表时,PARTITIONED BY中指定分区列必须是表中的列,且必须在Column列表中指定类型。分区列只支持string, boolean, tinyint, smallint, short, int, bigint, long, decimal, float, double, date, timestamp类型。 创建分区表时,分区字段必须是表字段的最后一个字段或几个字段,且多分区字段的顺序也必须对应。否则将出错。 单表分区数最多允许7000个。 CTAS建表语句不支持创建分区表。
  • 示例2:创建DLI分区表 示例说明:创建一个名为student的分区表,该分区表使用院系编号(facultyNo)和班级编号(classNo)进行分区,该student表会同时按照不同的院系编号(facultyNo)和不同的班级编号(classNo)分区。 在实际的使用过程中,您可以选择合适的分区字段并将其添加到PARTITIONED BY关键字后。 1 2 3 4 5 6 7 CREATE TABLE IF NOT EXISTS student ( Name STRING, facultyNo INT, classNo INT ) USING orc PARTITIONED BY (facultyNo, classNo);
  • 批作业SQL语法概览 本章节介绍了目前DLI所提供的Spark SQL语法列表。参数说明,示例等详细信息请参考具体的语法说明。 表1 批作业SQL语法 语法分类 操作链接 数据库相关语法 创建数据库 删除数据库 查看指定数据库 查看所有数据库 创建OBS表相关语法 使用DataSource语法创建OBS表 使用Hive语法创建OBS表 创建DLI表相关语法 使用DataSource语法创建DLI表 使用Hive语法创建DLI表 删除表相关语法 删除表 查看表相关语法 查看所有表 查看建表语句 查看表属性 查看指定表所有列 查看指定表所有分区 查看表统计信息 修改表相关语法 添加列 分区表相关语法 添加分区(只支持OBS表) 重命名分区 删除分区 修改表分区位置(只支持OBS表) 更新表分区信息(只支持OBS表) 导入数据相关语法 导入数据 插入数据相关语法 插入数据 清空数据相关语法 清空数据 导出查询结果相关语法 导出查询结果 跨源连接HBase表相关语法 创建表关联HBase 插入数据至HBase表 查询HBase表 跨源连接OpenTSDB表相关语法 创建表关联OpenTSDB 插入数据至OpenTSDB 查询OpenTSDB表 跨源连接DWS表相关语法 创建表关联DWS 插入数据至DWS表 查询DWS表 跨源连接RDS表相关语法 创建表关联RDS 插入数据至RDS表 查询RDS表 跨源连接CSS表相关语法 创建表关联CSS 插入数据至CSS表 查询CSS表 跨源连接DCS表相关语法 创建表关联DCS 插入数据至DCS表 查询DCS表 跨源连接DDS表相关语法 创建表关联DDS 插入数据至DDS表 查询DDS表 视图相关语法 创建视图 删除视图 查看计划相关语法 查看计划 数据权限相关语法 创建角色 删除角色 绑定角色 解绑角色 显示角色 分配权限 回收权限 显示已授权限 显示所有角色和用户的绑定关系 自定义函数相关语法 创建函数 删除函数 显示函数详情 显示所有函数 数据多版本相关语法 创建OBS表时开启数据多版本 修改表时开启或关闭数据多版本 设置多版本备份数据保留周期 查看多版本备份数据 恢复多版本备份数据 配置多版本过期数据回收站 清理多版本数据 父主题: Spark SQL语法参考(即将下线)
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'postgres-cdc'。 hostname 是 无 String Postgres数据库的IP地址或者Hostname。 username 是 无 String Postgres数据库用户名。 password 是 无 String Postgres数据库服务的密码。 database-name 是 无 String 数据库名称。 schema-name 是 无 String Postgres Schema名称。 Schema名称支持正则表达式以读取多个Schema的数据,例如test(.)*表示以test开头的所有schema。 table-name 是 无 String Postgres表名。 表名支持正则表达式去读取多个表的数据,例如cdc_order(.)*表示以cdc_order开头的所有表。 port 否 5432 Integer Postgres数据库服务的端口号。 decoding.plugin.name 否 decoderbufs String 根据Postgres服务上安装的插件确定。支持的插件列表如下: decoderbufs(默认值) wal2json wal2json_rds wal2json_streaming wal2json_rds_streaming pgoutput debezium.* 否 无 String 更细粒度控制Debezium客户端的行为。例如'debezium.snapshot.mode' = 'never',详情请参见配置属性。 建议每个表都设置debezium.slot.name参数,以避免出现 “PSQLException: ERROR: replication slot "debezium" is active for PID 974”报错。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置账号和密码。
共100000条