华为云用户手册

  • 功能描述 FileSystem sink用于将数据输出到分布式文件系统HDFS或者对象存储服务OBS等文件系统。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。 考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的Part文件。完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。即桶中将包含一个小时间隔内接收到的记录。 桶目录中的数据被拆分成多个Part文件。对于相应的接收数据的桶的Sink的每个Subtask,每个桶将至少包含一个Part文件。将根据配置的滚动策略来创建其他Part文件。对于Row Formats默认的策略是根据Part文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。对于Bulk Formats在每次创建Checkpoint时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。 在STREAMING模式下使用FileSink需要开启Checkpoint功能。Part文件只在Checkpoint成功时生成。如果没有开启Checkpoint功能,文件将永远停留在in-progress或者pending的状态,并且下游系统将不能安全读取该文件数据。 sink end算子的接受记录数为checkpoint的个数,非实际的发送数据,实际发送数据量请参考streaming-writer或StreamingFileWriter算子的记录数。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 固定位filesystem。 path 是 无 String OBS路径。 format 是 无 String 文件格式。 支持csv、parquet格式。 sink.rolling-policy.file-size 否 128MB MemorySize 单个part文件最大大小,超过该数值会滚动产生新文件。 说明: RollingPolicy 定义了何时关闭给定的In-progress Part文件,并将其转换为Pending状态,然后在转换为Finished状态。 Finished状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在STREAMING模式下,滚动策略结合Checkpoint间隔(到下一个Checkpoint成功时,文件的Pending状态才转换为Finished状态)共同控制Part文件对下游readers是否可见以及这些文件的大小和数量。 sink.rolling-policy.rollover-interval 否 30 min Duration 单个Part文件处于打开状态的最长时间,超过该时间会滚动产生新文件(默认值30分钟,以避免产生大量小文件)。检查频率是通过sink.rolling-policy.check-interval参数控制的。 说明: 该参数数字与单位之间必须要有空格。 支持的时间单位包括: d,h,min,s,ms等。 对于bulk格式的文件(parquet、orc、avro),checkpoint的时间间隔也会控制单个part文件打开的最长时间。 sink.rolling-policy.check-interval 否 1 min Duration 基于时间的滚动策略的检查间隔。 该属性控制了基于sink.rolling-policy.rollover-interval属性检查文件是否该被滚动的检查频率。 auto-compaction 否 false Boolean 在流式 sink 中是否开启自动合并功能。数据首先会被写入临时文件。当checkpoint完成后,该checkpoint产生的临时文件会被合并。 compaction.file-size 否 `sink.rolling-policy.file-size`的大小 MemorySize 合并目标文件大小,默认值为滚动文件大小。 说明: 只有在同个checkpoint内的文件会被合并,因此最终文件的数量至少等于checkpoint的数量。 如果合并时间较长,可能会引起反压,延长checkpoint所需时间。 开启该功能后,checkpoint时会产生最终文件,并打开新的文件接收下个checkpoint产生的数据。
  • IP地理函数 当前仅支持IPV4的IP地址。 表6 IP地理函数表 函数 返回值 说明 IP_TO_COUNTRY STRING 获取IP地址所在的国家名称。 IP_TO_PROVINCE STRING 获取IP地址所在的省份。 用法说明: IP_TO_PROVINCE(STRING ip):返回IP地址所在的省份。 IP_TO_PROVINCE(STRING ip, STRING lang):以指定语言返回IP地址所在的省份。 说明: 当IP无法被解析到省份时,返回该IP所属的国家。当IP无法被解析时,返回“未知”。 函数返回的省份名称均为简称。 中文参考如下链接:http://www.gov.cn/guoqing/2005-09/13/content_5043917.htm IP_TO_CITY STRING 获取IP地址所在的城市名称。 说明: 当IP无法被解析到城市时,返回该IP所属的省份或者国家。当IP无法被解析时,返回“未知”。 IP_TO_CITY_GEO STRING 获取IP地址所在城市的经纬度,格式为“纬度,经度”。 用法说明: IP_TO_CITY_GEO(STRING ip):返回IP所在城市的经纬度。
  • 函数说明 基本地理空间几何元素介绍说明如表1所示。 表1 基本地理空间几何元素表 地理空间几何元素(统称geometry) 说明 举例 ST_POINT(latitude, longitude) 地理点,包含经度和维度两个信息。 ST_POINT(1.12012, 1.23401) ST_LINE(array[point1...pointN]) 地理线,由多个地理点(ST_POINT)按顺序连接成的折线或直线。 ST_LINE(ARRAY[ST_POINT(1.12, 2.23), ST_POINT(1.13, 2.44), ST_POINT(1.13, 2.44)]) ST_POLYGON(array[point1...point1]) 地理多边形,由首尾相同的多个地理点(ST_POINT)按顺序连线围成的封闭多边形区域。 ST_POLYGON(ARRAY[ST_POINT(1.0, 1.0), ST_POINT(2.0, 1.0), ST_POINT(2.0, 2.0), ST_POINT(1.0, 1.0)]) ST_CIRCLE(point, radius) 地理圆形,由圆心地理点(ST_POINT)和半径构成的地理圆形区域。 ST_CIRCLE(ST_POINT(1.0, 1.0), 1.234) 用户可以以基本地理空间几何元素为基础,构造复杂的地理空间几何元素,具体的变换方法见表2。 表2 基于基本地理空间几何元素构造复杂几何元素的变换表 变换方法 说明 举例 ST_BUFFER(geometry, distance) 创建一个环绕包含给定地理空间几何元素的多边形,并以给定距离作为环绕距离,通常使用该函数构造一定宽度的公路范围用于偏航检测。 ST_BUFFER(ST_LINE(ARRAY[ST_POINT(1.12, 2.23), ST_POINT(1.13, 2.44), ST_POINT(1.13, 2.44)]),1.0) ST_INTERSECTION(geometry, geometry) 创建一个多边形,其范围为给定的两个地理空间几何元素的交叠区域。 ST_INTERSECTION(ST_CIRCLE(ST_POINT(1.0, 1.0), 2.0), ST_CIRCLE(ST_POINT(3.0, 1.0), 1.234)) ST_ENVELOPE(geometry) 创建一个包含给定的地理空间几何元素的最小矩形。 ST_ENVELOPE(ST_CIRCLE(ST_POINT(1.0, 1.0), 2.0)) DLI提供丰富的对地理空间几何元素的操作和位置判断函数,具体的SQL标量函数介绍说明见表3。 表3 SQL标量函数表 函数 返回值 说明 ST_DISTANCE(point_1, point_2) DOUBLE 计算两个地理点之间的欧几里得距离。 示例如下: Select ST_DISTANCE(ST_POINT(x1, y1), ST_POINT(x2, y2)) FROM input ST_GEODESIC_DISTANCE(point_1, point_2) DOUBLE 计算两个地理点之间的测地距离,即两个地理点之间地表最短路径距离。 示例如下: Select ST_GEODESIC_DISTANCE(ST_POINT(x1, y1), ST_POINT(x2, y2)) FROM input ST_PERIMETER(polygon) DOUBLE 计算多边形的周长。 示例如下: Select ST_PERIMETER(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]) FROM input ST_AREA(polygon) DOUBLE 计算多边形区域的面积。 示例如下: Select ST_AREA(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]) FROM input ST_OVERLAPS(polygon_1, polygon_2) BOOLEAN 判断一个多边形是否与另一个多边形有重叠区域。 示例如下: SELECT ST_OVERLAPS(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_INTERSECT(line1, line2) BOOLEAN 检查两条线段是否相互交叉,而非线条所在的直线是否交叉。 示例如下: SELECT ST_INTERSECT(ST_LINE(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12)]), ST_LINE(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23)])) FROM input ST_WITHIN(point, polygon) BOOLEAN 一个点是否包含在几何体(多边形或圆形)内。 示例如下: SELECT ST_WITHIN(ST_POINT(x11, y11), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_CONTAINS(polygon_1, polygon_2) BOOLEAN 判断第一个几何体是否包含第二个几何体。 示例如下: SELECT ST_CONTAINS(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_COVERS(polygon_1, polygon_2) BOOLEAN 第一个几何体是否覆盖第二个几何体。与ST_CONTAINS相似,但在边界重叠情况下ST_COVER判断为TRUE,ST_CONTAINS判断为FALSE。 示例如下: SELECT ST_COVERS(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON([ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_DISJOINT(polygon_1, polygon_2) BOOLEAN 判断一个多边形是否与另一个多边形不相交(不重叠)。 示例如下: SELECT ST_DISJOINT(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input 地理函数的基准坐标系标准为全球通用的GPS坐标系标准WGS84,GPS坐标不能直接在百度地图(BD09标准)或者google地图(GCJ02标准)上使用,会有偏移现象,为了在不同地理坐标系之间切换,DLI提供了坐标系转换的一系列函数,并且还提供地理距离与米之间的转换函数。详见表4。 表4 地理坐标系转换函数与距离单位转换函数表 函数 返回值 说明 WGS84_TO_BD09(geometry) 对应的百度地图坐标系地理空间几何元素 将GPS坐标系下的地理空间几何元素转换成百度地图坐标系下对应的地理空间几何元素。示例如下: WGS84_TO_BD09(ST_CIRCLE(ST_POINT(x, y), r)) WGS84_TO_CJ02(geometry) 对应的Google地图坐标系地理空间几何元素 将GPS坐标系下的地理空间几何元素转换成Google地图坐标系下对应的地理空间几何元素。示例如下: WGS84_TO_CJ02(ST_CIRCLE(ST_POINT(x, y), r)) BD09_TO_WGS84(geometry) 对应的GPS坐标系地理空间几何元素 将百度地图坐标系下的地理空间几何元素转换成GPS坐标系下对应的地理空间几何元素。示例如下: BD09_TO_WGS84(ST_CIRCLE(ST_POINT(x, y), r)) BD09_TO_CJ02(geometry) 对应的Google地图坐标系地理空间几何元素 将百度地图坐标系下的地理空间几何元素转换成Google地图坐标系下对应的地理空间几何元素。示例如下: BD09_TO_CJ02(ST_CIRCLE(ST_POINT(x, y), r)) CJ02_TO_WGS84(geometry) 对应的GPS坐标系地理空间几何元素 将Google地图坐标系下的地理空间几何元素转换成GPS坐标系下对应的地理空间几何元素。示例如下: CJ02_TO_WGS84(ST_CIRCLE(ST_POINT(x, y), r)) CJ02_TO_BD09(geometry) 对应的百度地图坐标系地理空间几何元素 将Google地图坐标系下的地理空间几何元素转换成百度地图坐标系下对应的地理空间几何元素。示例如下: CJ02_TO_BD09(ST_CIRCLE(ST_POINT(x, y), r)) DEGREE_TO_METER(distance) DOUBLE 将地理函数的距离数值转换成以“米”为单位的数值。示例如下(以米为单位计算地理三边形周长): DEGREE_TO_METER(ST_PERIMETER(ST_POLYGON(ARRAY[ST_POINT(x1,y1), ST_POINT(x2,y2), ST_POINT(x3,y3), ST_POINT(x1,y1)]))) METER_TO_DEGREE(numerical_value) DOUBLE 将以“米”为单位的数值转换成地理函数可计算的距离单位数值。示例如下(画出以指定地理点为圆心,半径1公里的圆): ST_CIRCLE(ST_POINT(x,y), METER_TO_DEGREE(1000)) DLI还提供了基于窗口的SQL地理聚合函数用于SQL逻辑涉及窗口和聚合的场景。详见表5的介绍说明。 表5 时间相关SQL地理聚合函数表 函数 说明 举例 AGG_DISTANCE(point) 距离聚合函数,用于计算窗口内所有相邻地理点的距离总和。 SELECT AGG_DISTANCE(ST_POINT(x,y)) FROM input GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY) AVG_SPEED(point) 平均速度聚合函数,用于计算窗口内所有地理点组成的移动轨迹的平均速度,单位为“米/秒”。 SELECT AVG_SPEED(ST_POINT(x,y)) FROM input GROUP BY TUMBLE(proctime, INTERVAL '1' DAY)
  • 示例 偏航检测样例: 1 2 3 4 INSERT INTO yaw_warning SELECT "The car is yawing" FROM driver_behavior WHERE NOT ST_WITHIN(ST_POINT(cast(Longitude as DOUBLE), cast(Latitude as DOUBLE)), ST_BUFFER(ST_LINE(ARRAY[ST_POINT(34.585555,105.725221),ST_POINT(34.586729,105.735974),ST_POINT(34.586492,105.740538),ST_POINT(34.586388,105.741651),ST_POINT(34.586135,105.748712),ST_POINT(34.588691,105.74997)]),0.001));
  • 语法格式 1 2 3 4 5 6 7 create table printSink ( attr_name attr_type (',' attr_name attr_type) * (',' PRIMARY KEY (attr_name,...) NOT ENFORCED) ) with ( 'connector' = 'print', 'print-identifier' = '', 'standard-error' = '' );
  • 示例 示例1:禁止表test_lifecycle的生命周期功能。 1 alter table test_lifecycle SET TBLPROPERTIES("dli.table.lifecycle.status"='disable'); 示例2:禁止表test_lifecycle中时间为20230520分区的生命周期功能。 1 alter table test_lifecycle partition (dt='20230520') LIFECYCLE 'disable'; 当设置禁止分区表的生命周期功能后,该表的所有分区的生命周期功能都会被禁止。
  • 参数说明 表1 禁止或恢复生命周期参数说明 参数名称 是否必选 参数说明 table_name 是 待禁止或恢复生命周期的表的名称。 pt_spec 否 待禁止或恢复生命周期的表的分区信息。格式为partition_col1=col1_value1, partition_col2=col2_value1...。对于有多级分区的表,必须指明全部的分区值。 enable 否 恢复表或指定分区的生命周期功能 表及其分区重新参与生命周期回收,默认使用当前表及分区上的生命周期配置。 开启表生命周期前可以修改表及分区的生命周期配置,防止开启表生命周期后因使用之前的配置导致数据被误回收。 disable 否 禁止表或指定分区的生命周期功能。 禁止表本身及其所有分区被生命周期回收,优先级高于恢复表分区生命周期。即当使用禁止表或指定分区的生命周期功能时,设置待禁止或恢复生命周期的表的分区信息是无效的。 禁止表的生命周期功能后,表的生命周期配置及其分区的enable和disable标记会被保留。 禁止表的生命周期功能后,仍然可以修改表及分区表的生命周期配置。
  • 示例 右外连接和左外连接相似,但是会将右边表(这里的course_info)中的所有记录返回,没有匹配值的左表记录将返回NULL。 1 2 SELECT student_info.name, course_info.courseName FROM student_info RIGHT OUTER JOIN course_info ON (student_info.courseId = course_info.courseId);
  • 语法格式 开启多版本功能 ALTER TABLE [db_name.]table_name SET TBLPROPERTIES ("dli.multi.version.enable"="true"); 关闭多版本功能 1 2 ALTER TABLE [db_name.]table_name UNSET TBLPROPERTIES ("dli.multi.version.enable"); 开启多版本功能后,在执行insert overwrite或者truncate操作时会自动在OBS存储路径下存储多版本数据。关闭多版本功能后,需要通过如下命令把多版本数据目录回收。 RESTORE TABLE [db_name.]table_name TO initial layout;
  • 示例 修改表test_table,开启多版本功能。 1 2 ALTER TABLE test_table SET TBLPROPERTIES ("dli.multi.version.enable"="true"); 修改表test_table,关闭多版本功能。 1 2 ALTER TABLE test_table UNSET TBLPROPERTIES ("dli.multi.version.enable"); 回退多版本路径。 RESTORE TABLE test_table TO initial layout;
  • 功能描述 DLI提供多版本功能,用于数据的备份与恢复。开启多版本功能后,在进行删除或修改表数据时(insert overwrite或者truncate操作),系统会自动备份历史数据并保留一定时间,后续您可以对保留周期内的数据进行快速恢复,避免因误操作丢失数据。其他多版本SQL语法请参考多版本备份恢复数据。 DLI数据多版本功能当前仅支持通过Hive语法创建的OBS表,具体建表语法可以参考使用Hive语法创建OBS表。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“Kafka”表示数据源。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 kafka_group_id 否 group id。 kafka_topic 是 读取的Kafka的topic。目前只支持读取单个topic。 encode 是 数据编码格式,可选为“csv”、“json”、“blob”和“user_defined”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 当编码格式为"blob"时,表示不对接收的数据进行解析,流属性仅能有一个且为Array[TINYINT]类型。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。 说明: 请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。 json_config 否 当encode为json时,用户可以通过该参数指定json字段和流属性字段的映射关系。 格式:"field1=json_field1;field2=json_field2" 格式说明:field1、field2为创建的表字段名称。json_field1、json_field2为kafka输入数据json串的key字段名称。 具体使用方法可以参考示例说明。 field_delimiter 否 当encode为csv时,用于指定csv字段分隔符,默认为逗号。 quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。 当引用符号为单引号时,则设置quote = "'"。 说明: 目前仅适用于CSV格式。 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 start_time 否 kafka数据读取起始时间。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。start_time要不大于当前时间,若大于当前时间,则不会有数据读取出。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 示例 从Kafka名称为test的topic中读取数据。 1 2 3 4 5 6 7 8 9 10 11 CREATE SOURCE STREAM kafka_source ( name STRING, age int ) WITH ( type = "kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1", kafka_topic = "test", encode = "json" ); 从Kafka读取对象为test的topic,使用json_config将json数据和表字段对应。 数据编码格式为json且不含嵌套,例如: {"attr1": "lilei", "attr2": 18} 建表语句参考如下: 1 2 3 4 5 6 7 8 9 CREATE SOURCE STREAM kafka_source (name STRING, age int) WITH ( type = "kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1", kafka_topic = "test", encode = "json", json_config = "name=attr1;age=attr2" );
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《数据湖探索用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "kafka", kafka_bootstrap_servers = "", kafka_group_id = "", kafka_topic = "", encode = "json" );
  • 参数说明 表1 参数说明 参数 是否必选 默认参数 数据类型 说明 connector 是 无 String 固定为:print。 print-identifier 否 无 String 配置一个标识符作为输出数据的前缀。 standard-error 否 false Boolean 该值只能为true或false,默认为false。 若为true,则表示输出数据到taskmanager的error文件中。 若为false,则表示输出数据到taskmanager的out中。
  • 语法格式 1 2 3 4 5 6 7 8 9 create table printSink ( attr_name attr_type (',' attr_name attr_type) * (',' PRIMARY KEY (attr_name,...) NOT ENFORCED) ) with ( 'connector' = 'print', 'print-identifier' = '', 'standard-error' = '' );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector.type' = 'kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'connector.properties.group.id' = '', 'connector.startup-mode' = '', 'format.type' = '' );
  • 示例 从Kafka中读取编码格式为csv,对象为kafkaSource的表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'csv' ); 从Kafka中读取编码格式为不含嵌套的json数据,对象为kafkaSource的表。 例如不含嵌套的json数据格式为: {"car_id": 312, "car_owner": "wang", "car_brand": "tang"} {"car_id": 313, "car_owner": "li", "car_brand": "lin"} {"car_id": 314, "car_owner": "zhao", "car_brand": "han"} 则创建表语句为: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( car_id STRING, car_owner STRING, car_brand STRING ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' ); 从Kafka中读取编码格式包含嵌套的json数据,对象为kafkaSource的表。 例如包含嵌套的json数据格式为: { "id":"1", "type":"online", "data":{ "patient_id":1234, "name":"bob1234", "age":"Bob", "gmt_create":"Bob", "gmt_modify":"Bob" } } 则创建表语句为: CREATE table kafkaSource( id STRING, type STRING, data ROW( patient_id STRING, name STRING, age STRING, gmt_create STRING, gmt_modify STRING) ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于kafka,需配置为'kafka'。 connector.version 是 Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。 format.type 是 数据反序列化格式,支持:'csv', 'json'及'avro'等。 format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.topic 是 kafka topic名。该参数和“connector.topic-pattern”两个参数只能使用其中一个。 connector.topic-pattern 否 匹配读取kafka topic名称的正则表达式。该参数和“connector.topic”两个参数只能使用其中一个。 例如: 'topic.*' '(topic-c|topic-d)' '(topic-a|topic-b|topic-\\d*)' '(topic-a|topic-b|topic-[0-9]*)' connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔。 connector.properties.group.id 否 消费组名称 connector.startup-mode 否 consumer启动模式,支持:'earliest-offset', 'latest-offset', 'group-offsets', 'specific-offsets'及'timestamp'。默认值为'group-offsets'。 connector.specific-offsets 否 指定消费offset,'startup-mode'为'specific-offsets'时需配置,格式为: 'partition:0,offset:42;partition:1,offset:300'。 connector.startup-timestamp-millis 否 指定起始消费时间戳,'startup-mode'为'timestamp'时需配置。 connector.properties.* 否 配置kafka任意原生属性。
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dis", region = "", channel = "", partition_key = "", encode= "", field_delimiter= "" );
  • 示例 CSV编码格式:数据输出到DIS通道,使用csv编码,并且以逗号为分隔符,多个分区用car_owner做为key进行分发。数据输出示例:"ZJA710XC", "lilei", "BMW", 700000。 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "xxx", channel = "dlioutput", encode = "csv", field_delimiter = "," ); JSON编码格式:数据输出到DIS通道,使用json编码,多个分区用car_owner,car_brand 做为key进行分发,“enableOutputNull”为“true”表示输出空字段(值为null),若为“false”表示不输出空字段。数据示例:"car_id ":"ZJA710XC", "car_owner ":"lilei", "car_brand ":"BMW", "car_price ":700000。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", channel = "dlioutput", region = "xxx", partition_key = "car_owner,car_brand", encode = "json", enable_output_null = "false" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dis表示输出到数据接入服务。 region 是 数据所在的DIS所在区域。 ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 channel 是 DIS通道。 partition_key 否 数据输出分组主键,多个主键用逗号分隔。当该参数没有配置的时候则随机派发。 encode 是 数据编码格式,可选为“csv”、“json”和“user_defined”。 说明: 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需使用“enable_output_null”来配置是否输出空字段,具体见示例。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 field_delimiter 是 属性分隔符。 当编码格式为csv时,需要设置属性分隔符,用户可以自定义,如:“,”。 当编码格式为json时,则不需要设置属性之间的分隔符。 json_config 否 当编码格式为json时,用户可以通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2”。 enable_output_null 否 当编码格式为json时,需使用该参数来配置是否输出空字段。 当该参数为“true”表示输出空字段(值为null),若为“false”表示不输出空字段。默认值为“true”。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。
  • 功能描述 DLI将Flink作业的输出数据写入数据接入服务(DIS)中。适用于将数据过滤后导入DIS通道,进行后续处理的场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 示例 将流disSink的数据输出到DIS中。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table disSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table disSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'dis', 'connector.region' = '', 'connector.channel' = '', 'format.type' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,“dis”表示数据源为数据接入服务,必须为dis。 connector.region 是 数据所在的DIS区域。 connector.ak 否 访问密钥ID(Access Key ID),需与sk同时设置 connector.sk 否 Secret Access Key,需与ak同时设置 connector.channel 是 数据所在的DIS通道名称。 format.type 是 数据编码格式,可选为“csv”、“json” format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.partition-key 否 数据输出分组主键,多个主键用逗号分隔。当该参数没有配置的时候则随机派发。
  • 功能描述 DLI将Flink作业的输出数据写入数据接入服务(DIS)中。适用于将数据过滤后导入DIS通道,进行后续处理的场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 注意事项 privilege必须是可授权限中的一种。且如果赋权对象在resource或上一级resource上已经有对应权限时,则会赋权失败。Privilege支持的权限类型可参见数据权限列表。 resource可以是queue、database、table、view、column,格式分别为: queue的格式为:queues.queue_name queue支持的Privilege权限类型可以参考下表: 操作 说明 DROP_QUEUE 删除队列 SUBMIT_JOB 提交作业 CANCEL_JOB 终止作业 RESTART 重启队列 SCALE_QUEUE 扩缩容队列 GRANT_PRIVILEGE 队列的赋权 REVOKE_PRIVILEGE 队列权限的回收 SHOW_PRIVILEGES 查看其他用户具备的队列权限 database的格式为:databases.db_name database支持的Privilege权限类型可参见数据权限列表。 table的格式为:databases.db_name.tables.table_name table支持的Privilege权限类型可参见数据权限列表。 view的格式为:databases.db_name.tables.view_name view支持的Privilege权限类型和table一样,具体可以参考数据权限列表中table的权限列表描述。 column的格式为:databases.db_name.tables.table_name.columns.column_name column支持的Privilege权限类型仅为:SELECT
共100000条