华为云用户手册

  • 步骤:3:检查网络 检查源Redis、目标Redis、迁移任务资源所在VPC是否在同一个VPC内。 如果是,则执行步骤4:创建在线迁移任务;如果不是,执行2。 检查源Redis的VPC、目标Redis的VPC、迁移任务资源所在VPC的网络是否打通,确保迁移任务的虚拟机资源能访问源Redis和目标Redis。 如果已打通,则执行步骤4:创建在线迁移任务;如果没打通,则执行3。 执行相应操作,打通网络。 当源Redis和目标Redis都属于DCS同一region,请参考VPC对等连接说明,查看和创建对等连接,打通网络。 当源Redis和目标Redis属于DCS不同Region,请参考云连接,查看和创建云连接,打通网络。 当源Redis和目标Redis属于不同的云厂商,仅支持云专线打通网络,请参考云专线。
  • 步骤4:创建在线迁移任务 登录分布式缓存服务控制台。 单击左侧菜单栏的“数据迁移”。页面显示迁移任务列表页面。 单击右上角的“创建在线迁移任务”。 设置迁移任务名称和描述。 配置在线迁移任务虚拟机资源的VPC、子网和安全组。 创建在线迁移任务时,需要选择迁移虚拟机资源的VPC和安全组,并确保迁移资源能访问源Redis和目标Redis实例。 创建的在线迁移任务会占用一个租户侧IP,即控制台上迁移任务对应的“迁移IP”。如果源端Redis或目标端Redis配置了白名单,需确保配置了迁移IP或关闭白名单限制。 迁移任务所选安全组的“出方向规则”需放通源端Redis和目标端Redis的IP和端口(安全组默认情况下为全部放通,则无需单独放通),以便迁移任务的虚拟机资源能访问源Redis和目标Redis。
  • 步骤5:配置在线迁移任务 创建完在线迁移任务之后,在“在线迁移”的列表,单击“配置”,配置在线迁移的源Redis、目标Redis等信息。 选择迁移方法。 支持“全量迁移”和“全量迁移+增量迁移”两种,“全量迁移”和“全量迁移+增量迁移”的功能及限制如表1所示。 表1 在线迁移方法说明 迁移类型 描述 全量迁移 该模式为Redis的一次性迁移,适用于可中断业务的迁移场景。全量迁移过程中,如果源Redis有数据更新,这部分更新数据不会被迁移到目标Redis。 全量迁移+增量迁移 该模式为Redis的持续性迁移,适用于对业务中断敏感的迁移场景。增量迁移阶段通过解析日志等技术, 持续保持源Redis和目标端Redis的数据一致。 增量迁移,迁移任务会在迁移开始后,一直保持迁移中状态,不会自动停止。需要您在合适时间,在“操作”列单击“停止”,手动停止迁移。停止后,源端数据不会造成丢失,只是目标端不再写入数据。增量迁移在传输链路网络稳定情况下是秒级时延,具体的时延情况依赖于网络链路的传输质量。 图1 选择迁移方法 配置“源Redis”和“目标Redis”。 Redis类型支持“云服务Redis”和“自建Redis”,需要根据迁移场景选择数据来源。 云服务Redis:当源端或目标Redis为DCS Redis,且与迁移任务处于相同VPC时,可以选择“云服务Redis”类型,并指定需要迁移的DCS Redis实例。 自建Redis:DCS Redis、其他云厂商Redis、自行搭建的Redis,都可以选择“自建Redis”类型,并输入Redis的连接地址。 当源Redis和目标Redis属于华为云的不同Region,则打通网路后,目标Redis实例无论是自建Redis或华为云Redis实例,在“目标Redis类型”区域,只能选中自建Redis,输入实例相关信息。 如果是密码访问模式实例,在输入连接实例密码后,单击密码右侧的“测试连接”,检查实例密码是否正确、网络是否连通。如果是免密访问的实例,请直接单击“测试连接”。 在“源DB”或“目标DB”中,您可以选择是否需要指定具体迁移的DB。例如源端输入5,目标端输入6时,表示迁移源Redis DB5中的数据到目标Redis的DB6;当源端不指定DB,目标端指定DB时,表示默认迁移源端的全部数据,到目标端指定的DB,当目标端不指定DB时,表示默认迁移到与源端对应的DB。 当源端为多DB,目标端为单DB的DCS实例时(单DB的实例只有DB0),需要源端的所有数据都在DB0,或者指定仅迁移源端某一DB中的数据并将目标端DB指定为0,否则会迁移失败。 DCS Redis的DB数请参见Redis实例是否支持多DB方式?。 单击“下一步”。 确认迁移信息,然后单击“提交”,开始创建迁移任务。 可返回迁移任务列表中,观察对应的迁移任务的状态,迁移成功后,任务状态显示“成功”。 如果是增量迁移,会一直保持迁移中状态,需要手动停止迁移。 如需停止迁移中的任务,勾选迁移任务左侧的方框,单击实例上方信息栏的“停止”,即可停止迁移。 数据迁移后,源端与目标端重复的Key会被覆盖。 如果出现迁移失败,可以单击迁移任务名称,进入迁移任务详情页面,查看“迁移日志”。
  • 迁移后验证 迁移完成后,请使用Redis-cli连接源Redis和目标Redis,确认数据的完整性。 连接源Redis和目标Redis。 输入info keyspace,查看keys参数和expires参数的值。 对比源Redis和目标Redis的keys参数分别减去expires参数的差值。如果差值一致,则表示数据完整,迁移正常。 注意:如果是全量迁移,迁移过程中源Redis更新的数据不会迁移到目标实例。
  • 场景描述 在满足源Redis和目标Redis的网络相通、源Redis已放通SYNC和PSYNC命令这两个前提下,使用在线迁移的方式,将源Redis中的数据全量迁移或增量迁移到目标Redis中。 如果源Redis禁用了SYNC和PSYNC命令,请务必放通后再执行在线迁移,否则迁移失败,选择DCS Redis实例进行在线迁移时,会自动放开SYNC命令。 在线迁移不支持公网方式直接迁移。 进行在线迁移时,建议将源端实例的参数repl-timeout配置为300秒,client-output-buffer-limit配置为实例最大内存的20%。 源端仅支持Redis 3.0及3.0以上的Redis版本。
  • 前提条件 在迁移之前,请先阅读迁移方案概览,选择正确的迁移方案,了解当前DCS支持的在线迁移能力,选择适当的目标实例。 如果是单机/主备等多DB的源端实例迁移到Proxy集群实例,Proxy集群默认不开启多DB,仅有一个DB0,请先确保源端实例DB0以外的DB是否有数据,如果有,请先参考开启多DB操作开启Proxy集群多DB设置。 如果是单机/主备等多DB的源端实例迁移到Cluster集群实例,Cluster集群不支持多DB,仅有一个DB0,请先确保源端实例DB0以外的DB是否有数据,如果有,请将数据转存到DB0,否则会出现迁移失败,将数据转存到DB0的操作请参考使用Rump在线迁移。
  • 示例 添加(Add)或更新(Update): SET enable.unsafe.sort=true 显示(Display)属性值: SET enable.unsafe.sort 显示段ID列表,段状态和其他所需详细信息的示例,然后指定要读取的段列表: SHOW SEGMENTS FOR TABLE carbontable1; SET carbon.input.segments.db.carbontable1 = 1, 3, 9; 多线程模式查询指定段示例如下: CarbonSession.threadSet ("carbon.input.segments.default.carbon_table_MulTI_THread", "1,3"); 在多线程环境中使用CarbonSession.threadSet查询段示例如下(以Scala代码为例): def main(args: Array[String]) { Future { CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1") spark.sql("select count(empno) from carbon_table_MulTI_THread").show() } } 重置(Reset): RESET
  • 命令格式 Add或Update参数值: SET parameter_name=parameter_value 此命令用于添加或更新“parameter_name”的值。 Display参数值: SET parameter_name 此命令用于显示指定的“parameter_name”的值。 Display会话参数: SET 此命令显示所有支持的会话参数。 Display会话参数以及使用细节: SET -v 此命令显示所有支持的会话参数及其使用细节。 Reset参数值: RESET 此命令清除所有会话参数。
  • 压缩调优 CarbonData结合少数轻量级压缩算法和重量级压缩算法来压缩数据。虽然这些算法可处理任何类型的数据,但如果数据经过排序,相似值在一起出现时,就会获得更好的压缩率。 CarbonData数据加载过程中,数据基于Table中的列顺序进行排序,从而确保相似值在一起出现,以获得更好的压缩率。 由于CarbonData按照Table中定义的列顺序将数据进行排序,因此列顺序对于压缩效率起重要作用。如果低cardinality维度位于左边,那么排序后的数据分区范围较小,压缩效率较高。如果高cardinality维度位于左边,那么排序后的数据分区范围较大,压缩效率较低。
  • CarbonData查询流程 当CarbonData首次收到对某个表(例如表A)的查询任务时,系统会加载表A的索引数据到内存中,执行查询流程。当CarbonData再次收到对表A的查询任务时,系统则不需要再加载其索引数据。 在CarbonData中执行查询时,查询任务会被分成几个扫描任务。即,基于CarbonData数据存储的HDFS block对扫描任务进行分割。扫描任务由集群中的执行器执行。扫描任务可以并行、部分并行,或顺序处理,具体采用的方式取决于执行器的数量以及配置的执行器核数。 查询任务的某些部分可在独立的任务级上处理,例如select和filter。查询任务的某些部分可在独立的任务级上进行部分处理,例如group-by、count、distinct count等。 某些操作无法在任务级上处理,例如Having Clause(分组后的过滤),sort等。这些无法在任务级上处理,或只能在任务级上部分处理的操作需要在集群内跨执行器来传输数据(部分结果)。这个传送操作被称为shuffle。 任务数量越多,需要shuffle的数据就越多,会对查询性能产生不利影响。 由于任务数量取决于HDFS block的数量,而HDFS block的数量取决于每个block的大小,因此合理选择HDFS block的大小很重要,需要在提高并行性,进行shuffle操作的数据量和聚合表的大小之间达到平衡。
  • 查询性能调优 CarbonData可以通过调整各种参数来提高查询性能。大部分参数聚焦于增加并行性处理和更好地使用系统资源。 Spark Executor数量:Executor是Spark并行性的基础实体。通过增加Executor数量,集群中的并行数量也会增加。关于如何配置Executor数量,请参考Spark资料。 Executor核:每个Executor内,并行任务数受Executor核的配置控制。通过增加Executor核数,可增加并行任务数,从而提高性能。 HDFS block容量:CarbonData通过给不同的处理器分配不同的block来分配查询任务。所以一个HDFS block是一个分区单元。另外,CarbonData在Spark驱动器中,支持全局block级索引,这有助于减少需要被扫描的查询block的数量。设置较大的block容量,可提高I/O效率,但是会降低全局索引效率;设置较小的block容量,意味着更多的block数量,会降低I/O效率,但是会提高全局索引效率,同时,对于索引查询会要求更多的内存。 扫描线程数量:扫描仪(Scanner)线程控制每个任务中并行处理的数据块的数量。通过增加扫描仪线程数,可增加并行处理的数据块的数量,从而提高性能。可使用“carbon.properties”文件中的“carbon.number.of.cores”属性来配置扫描仪线程数。例如,“carbon.number.of.cores = 4”。 B-Tree缓存:为了获得更好的查询特性,可以通过B-tree LRU(least recently used,最近最少使用)缓存来优化缓存内存。在driver中,B-Tree LRU缓存配置将有助于通过释放未被访问或未使用的表segments来释放缓存。类似地,在executor中,B-Tree LRU缓存配置将有助于释放未被访问或未使用的表blocks。具体可参考表2中的参数“carbon.max.driver.lru.cache.size”和“carbon.max.executor.lru.cache.size”的详细描述。
  • 配置扫描仪线程 扫描仪线程属性决定了每个分割的数据被划分的可并行处理的数据块的数量。如果数量过多,会产生很多小数据块,性能会受到影响。如果数量过少,并行性不佳,性能也会受到影响。因此,决定扫描仪线程数时,需要考虑一个分割内的平均数据大小,选择一个使数据块不会很小的值。经验法则是将单个块大小(MB)除以250得到的值作为扫描仪线程数。 增加并行性还需考虑的重要一点是集群中实际可用的CPU核数,确保并行计算数不超过实际CPU核数的75%至80%。 CPU核数约等于: 并行任务数x扫描仪线程数。其中并行任务数为分割数和执行器数x执行器核数两者之间的较小值。
  • 数据加载性能调优 数据加载性能调优与查询性能调优差异很大。跟查询性能一样,数据加载性能也取决于可达到的并行性。在数据加载情况下,工作线程的数量决定并行的单元。因此,更多的执行器就意味着更多的执行器核数,每个执行器都可以提高数据加载性能。 同时,为了得到更好的性能,可在HDFS中配置如下参数。 表1 HDFS配置 参数 建议值 dfs.datanode.drop.cache.behind.reads false dfs.datanode.drop.cache.behind.writes false dfs.datanode.sync.behind.writes true
  • 操作步骤 要启动小文件优化,在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 表1 参数介绍 参数 描述 默认值 spark.sql.files.maxPartitionBytes 在读取文件时,将单个分区打包的最大字节数。 单位:byte。 134217728(即128M) spark.files.openCostInBytes 打开文件的预估成本, 按照同一时间能够扫描的字节数来测量。当一个分区写入多个文件时使用。高估更好,这样小文件分区将比大文件分区更先被调度。 4M
  • 操作场景 Spark SQL表中,经常会存在很多小文件(大小远小于HDFS的块大小),每个小文件默认对应Spark中的一个Partition,即一个Task。在有很多小文件时,Spark会启动很多Task,此时当SQL逻辑中存在Shuffle操作时,会大大增加hash分桶数,严重影响系统性能。 针对小文件很多的场景,DataSource在创建RDD时,先将Table中的split生成PartitionedFile,再将这些PartitionedFile进行合并。即将多个PartitionedFile组成一个partition,从而减少partition数量,避免在Shuffle操作时生成过多的hash分桶,如图1所示。 图1 小文件合并
  • 空间索引介绍 空间数据包括多维点、线、矩形、立方体、多边形和其他几何对象。空间数据对象占据空间的某一区域,称为空间范围,通过其位置和边界描述。空间数据可以是点数据,也可以是区域数据。 点数据:一个点具有一个空间范围,仅通过其位置描述。它不占用空间,没有相关的边界。点数据由二维空间中的点的集合组成。点可以存储为一对经纬度。 区域数据:一个区域有空间范围,有位置和边界。位置可以看作是一个定点在区域内的位置,例如它的质心。在二维中,边界可以可视化为一条线(有限区域,闭环)。区域数据包含一系列区域。 目前仅限于支持点数据,存储点数据。 经纬度可以编码为唯一的GeoID。Geohash是Gustavo Niemeyer发明的公共域地理编码系统,它将地理位置编码为一串由字母和数字组成的短字符串。它是一种分层的空间数据结构,把空间细分为网格形状的桶,是被称为Z阶曲线和通常称为空间填充曲线的许多应用之一。 点在多维中的Z值是简单地通过交织其坐标值的二进制表示来计算的,如下图所示。使用Geohash创建GeoID时,数据按照GeoID排序,而不是按照经纬度排序,数据按照空间就近性排序存储。
  • 建表 GeoHash编码: create table IF NOT EXISTS carbonTable ( ... `LONGITUDE` BIGINT, `LATITUDE` BIGINT, ... ) STORED AS carbondata TBLPROPERTIES ('SPATIAL_INDEX.mygeohash.type'='geohash','SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude','SPATIAL_INDEX.mygeohash.originLatitude'='xx.xxxxxx','SPATIAL_INDEX.mygeohash.gridSize'='xx','SPATIAL_INDEX.mygeohash.minLongitude'='xxx.xxxxxx','SPATIAL_INDEX.mygeohash.maxLongitude'='xxx.xxxxxx','SPATIAL_INDEX.mygeohash.minLatitude'='xx.xxxxxx','SPATIAL_INDEX.mygeohash.maxLatitude'='xxx.xxxxxx','SPATIAL_INDEX'='mygeohash','SPATIAL_INDEX.mygeohash.conversionRatio'='1000000','SORT_COLUMNS'='column1,column2,column3,latitude,longitude'); SPATIAL_INDEX:自定义索引处理器。此处理程序允许用户从表结构列集合中创建新的列。新创建的列名与处理程序名相同。处理程序的type和sourcecolumns属性是必需的属性。目前,type属性只支持“geohash”。Carbon提供一个简单的默认实现类。用户可以通过扩展默认实现类来挂载geohash的自定义实现类。该默认处理程序还需提供以下的表属性: SPATIAL_INDEX.xxx.originLatitude:Double类型,坐标原点纬度 SPATIAL_INDEX.xxx.gridSize:Int类型,栅格长度(米) SPATIAL_INDEX.xxx.minLongitude:Double类型,最小经度 SPATIAL_INDEX.xxx.maxLongitude:Double类型,最大经度 SPATIAL_INDEX.xxx.minLatitude:Double类型,最小纬度 SPATIAL_INDEX.xxx.maxLatitude:Double类型,最大纬度 SPATIAL_INDEX.xxx.conversionRatio:Int类型,将经纬度小数值转换为整型值 用户可以按照上述格式为处理程序添加自己的表属性,并在自定义实现类中访问它们。originLatitude,gridSize及conversionRatio是必选参数,其余属性在Carbon中都是可选的。可以使用“SPATIAL_INDEX.xxx.class”属性指定它们的实现类。 默认实现类可以为每一行的sourcecolumns生成handler列值,并且支持基于sourcecolumns的过滤条件查询。生成的handler列对用户不可见。除SORT_COLUMNS表属性外,任何DDL命令和属性都不允许包含handler列。 生成的handler列默认被视为排序列。如果SORT_COLUMNS不包含任何sourcecolumns,则将handler列追加到现有的SORT_COLUMNS最后。如果在SORT_COLUMNS中已经指定了该handler列,则它在SORT_COLUMNS的顺序将保持不变。 如果SORT_COLUMNS包含任意的sourcecolumns,但是没有包含handler列,则handler列将自动插入到SORT_COLUMNS中的sourcecolumns之前。 如果SORT_COLUMNS需要包含任意的sourcecolumns,那么需要保证handler列出现在sourcecolumns之前,这样handler列才能在排序中生效。
  • 准备数据 准备数据文件1:geosotdata.csv timevalue,longitude,latitude 1575428400000,116.285807,40.084087 1575428400000,116.372142,40.129503 1575428400000,116.187332,39.979316 1575428400000,116.337069,39.951887 1575428400000,116.359102,40.154684 1575428400000,116.736367,39.970323 1575428400000,116.720179,40.009893 1575428400000,116.346961,40.13355 1575428400000,116.302895,39.930753 1575428400000,116.288955,39.999101 1575428400000,116.17609,40.129953 1575428400000,116.725575,39.981115 1575428400000,116.266922,40.179415 1575428400000,116.353706,40.156483 1575428400000,116.362699,39.942444 1575428400000,116.325378,39.963129 准备数据文件2:geosotdata2.csv timevalue,longitude,latitude 1575428400000,120.17708,30.326882 1575428400000,120.180685,30.326327 1575428400000,120.184976,30.327105 1575428400000,120.189311,30.327549 1575428400000,120.19446,30.329698 1575428400000,120.186965,30.329133 1575428400000,120.177481,30.328911 1575428400000,120.169713,30.325614 1575428400000,120.164563,30.322243 1575428400000,120.171558,30.319613 1575428400000,120.176365,30.320687 1575428400000,120.179669,30.323688 1575428400000,120.181001,30.320761 1575428400000,120.187094,30.32354 1575428400000,120.193574,30.323651 1575428400000,120.186192,30.320132 1575428400000,120.190055,30.317464 1575428400000,120.195376,30.318094 1575428400000,120.160786,30.317094 1575428400000,120.168211,30.318057 1575428400000,120.173618,30.316612 1575428400000,120.181001,30.317316 1575428400000,120.185162,30.315908 1575428400000,120.192415,30.315871 1575428400000,120.161902,30.325614 1575428400000,120.164306,30.328096 1575428400000,120.197093,30.325985 1575428400000,120.19602,30.321651 1575428400000,120.198638,30.32354 1575428400000,120.165421,30.314834
  • 不规则空间集合的聚合查询 查询语句及Filter UDF 根据polygon过滤数据 IN_POLYGON(pointList) UDF输入参数: 参数 类型 说明 pointList String 将多个点输入为一个字符串,每个点以longitude latitude表示。经纬度间用空格分隔,每对经纬度用逗号分隔,字符串首尾经纬度一致。 UDF输出参数: 参数 类型 说明 inOrNot Boolean 判断数据是否在指定的polygon_list之内。 使用示例: select longitude, latitude from geosot where IN_POLYGON('116.321011 40.123503, 116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503'); 根据polygon列表过滤数据。 IN_POLYGON_LIST(polygonList, opType) UDF输入参数: 参数 类型 说明 polygonList String 将多个polygon输入为一个字符串,每个polygon以POLYGON ((longitude1 latitude1, longitude2 latitude2, …))表示。注意“POLYGON”后有空格,经纬度间用空格分隔,每对经纬度用逗号分隔,一个polygon的首尾经纬度一致。IN_POLYGON_LIST必须输入2个以上polygon。 一个polygon示例: POLYGON ((116.137676 40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)) opType String 对多个polygon进行并交差操作。 目前支持的操作类型: OR:A U B U C (假设输入了三个POLYGON,A、B、C) AND:A ∩ B ∩ C UDF输出参数: 参数 类型 说明 inOrNot Boolean 判断数据是否在指定的polygon_list之内。 使用示例: select longitude, latitude from geosot where IN_POLYGON_LIST('POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540, 120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 30.321464, 120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', 'OR'); 根据polyline列表过滤数据。 IN_POLYLINE_LIST(polylineList, bufferInMeter) UDF输入参数: 参数 类型 说明 polylineList String 将多个polyline输入为一个字符串,每个polyline以LINESTRING (longitude1 latitude1, longitude2 latitude2, …)表示。注意“LINESTRING”后有空格,经纬度间用空格分隔,每组经纬度用逗号分隔。 对多个polyline区域内的数据会输出并集结果。 一个polyline示例: LINESTRING (116.137676 40.163503, 116.137676 39.935276, 116.260993 39.935276) bufferInMeter Float polyline的buffer距离,单位为米。末端使用直角创建缓冲区。 UDF输出参数: 参数 类型 说明 inOrNot Boolean 判断数据是否在指定的polyline_list之内。 使用示例: select longitude, latitude from geosot where IN_POLYLINE_LIST('LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464, 120.190359 30.315388)', 65); 根据GeoId区间列表过滤数据。 IN_POLYGON_RANGE_LIST(polygonRangeList, opType) UDF输入参数: 参数 类型 说明 polygonRangeList String 将多个rangeList输入为一个字符串,每个rangeList以RANGELIST (startGeoId1 endGeoId1, startGeoId2 endGeoId2, …)表示。注意“RANGELIST”后有空格,首尾GeoId间用空格分隔,每组GeoId range用逗号分隔。 一个rangeList示例: RANGELIST (855279368848 855279368850, 855280799610 855280799612, 855282156300 855282157400) opType String 对多个rangeList进行并交差操作。 目前支持的操作类型: OR:A U B U C (假设输入了三个RANGELIST,A、B、C) AND:A ∩ B ∩ C UDF输出参数: 参数 类型 说明 inOrNot Boolean 判断数据是否在指定的polyRange_list之内。 使用示例: select mygeosot, longitude, latitude from geosot where IN_POLYGON_RANGE_LIST('RANGELIST (526549722865860608 526549722865860618, 532555655580483584 532555655580483594)', 'OR'); polygon连接查询 IN_POLYGON_JOIN(GEO_HASH_INDEX_COLUMN, POLYGON_COLUMN) 两张表做join查询,一张表为空间数据表(有经纬度列和GeoHashIndex列),另一张表为维度表,保存polygon数据。 查询使用IN_POLYGON_JOIN UDF,参数GEO_HASH_INDEX_COLUMN和polygon表的POLYGON_COLUMN。Polygon_column列是一系列的点(经纬度列)。Polygon表的每一行的第一个点和最后一个点必须是相同的。Polygon表的每一行的所有点连接起来形成一个封闭的几何对象。 UDF输入参数: 参数 类型 说明 GEO_HASH_INDEX_COLUMN Long 空间数据表的GeoHashIndex列。 POLYGON_COLUMN String Polygon表的polygon列,数据为polygon的字符串表示。例如,一个polygon是POLYGON ((longitude1 latitude1, longitude2 latitude2, …)) 使用示例: CREATE TABLE polygonTable( polygon string, poiType string, poiId String) STORED AS carbondata; insert into polygonTable select 'POLYGON ((120.176433 30.327431,120.171283 30.322245, 120.181411 30.314540,120.190509 30.321653,120.185188 30.329358,120.176433 30.327431))','abc','1'; insert into polygonTable select 'POLYGON ((120.191603 30.328946,120.184179 30.327465, 120.181819 30.321464,120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))','abc','2'; select t1.longitude,t1.latitude from geosot t1 inner join (select polygon,poiId from polygonTable where poitype='abc') t2 on in_polygon_join(t1.mygeosot,t2.polygon) group by t1.longitude,t1.latitude; range_list连接查询 IN_POLYGON_JOIN_RANGE_LIST(GEO_HASH_INDEX_COLUMN, POLYGON_COLUMN) 同IN_POLYGON_JOIN,使用IN_POLYGON_JOIN_RANGE_LIST UDF关联空间数据表和polygon维度表,关联基于Polygon_RangeList。直接使用range list可以避免polygon到range list的转换。 UDF输入参数: 参数 类型 说明 GEO_HASH_INDEX_COLUMN Long 空间数据表的GeoHashIndex列。 POLYGON_COLUMN String Polygon表的rangelist列,数据为rangeList的字符串。例如,一个rangelist是RANGELIST (startGeoId1 endGeoId1, startGeoId2 endGeoId2, …) 使用示例: CREATE TABLE polygonTable( polygon string, poiType string, poiId String) STORED AS carbondata; insert into polygonTable select 'RANGELIST (526546455897309184 526546455897309284, 526549831217315840 526549831217315850, 532555655580483534 532555655580483584)','xyz','2'; select t1.* from geosot t1 inner join (select polygon,poiId from polygonTable where poitype='xyz') t2 on in_polygon_join_range_list(t1.mygeosot,t2.polygon); 空间索引工具类UDF GeoID转栅格行列号。 GeoIdToGridXy(geoId) UDF输入参数: 参数 类型 说明 geoId Long 根据GeoId计算栅格行列号。 UDF输出参数: 参数 类型 说明 gridArray Array[Int] 返回该geoid所包含的栅格行列号,以数组的方式返回,第一位为行,第二位为列。 使用示例: select longitude, latitude, mygeohash, GeoIdToGridXy(mygeohash) as GridXY from geoTable; 经纬度转GeoID。 LatLngToGeoId(latitude, longitude oriLatitude, gridSize) UDF输入参数: 参数 类型 说明 longitude Long 经度,注:转换后的整数类型。 latitude Long 纬度,注:转换后的整数类型。 oriLatitude Double 原点纬度,计算GeoId需要参数。 gridSize Int 栅格大小,计算GeoId需要参数。 UDF输出参数: 参数 类型 说明 geoId Long 通过编码获得一个表示经纬度的数。 使用示例: select longitude, latitude, mygeohash, LatLngToGeoId(latitude, longitude, 39.832277, 50) as geoId from geoTable; GeoID转经纬度。 GeoIdToLatLng(geoId, oriLatitude, gridSize) UDF输入参数: 参数 类型 说明 geoId Long 根据GeoId计算经纬度。 oriLatitude Double 原点纬度,计算经纬度需要参数。 gridSize Int 栅格大小,计算经纬度需要参数。 由于GeoId由栅格坐标生成,坐标为栅格中心点,则计算出的经纬度是栅格中心点经纬度,与生成该GeoId的经纬度可能有[0度~半个栅格度数]的误差。 UDF输出参数: 参数 类型 说明 latitudeAndLongitude Array[Double] 返回该geoid所表示的栅格的中心点的经纬度坐标,以数组的方式返回,第一位为latitude,第二位为longitude。 使用示例: select longitude, latitude, mygeohash, GeoIdToLatLng(mygeohash, 39.832277, 50) as LatitudeAndLongitude from geoTable; 计算金字塔模型向上汇聚一层的GeoID。 ToUpperLayerGeoId(geoId) UDF输入参数: 参数 类型 说明 geoId Long 根据输入GeoId计算金字塔模型上一层GeoId。 UDF输出参数: 参数 类型 说明 geoId Long 金字塔模型上一层GeoId。 使用示例: select longitude, latitude, mygeohash, ToUpperLayerGeoId(mygeohash) as upperLayerGeoId from geoTable; 输入polygon获得GeoID范围列表。 ToRangeList(polygon, oriLatitude, gridSize) UDF输入参数: 参数 类型 说明 polygon String 输入polygon字符串,用一组经纬度表示。 经纬度间用空格分隔,每对经纬度间用逗号分隔,首尾经纬度一致。 oriLatitude Double 原点纬度,计算GeoId需要参数。 gridSize Int 栅格大小,计算GeoId需要参数。 UDF输出参数: 参数 类型 说明 geoIdList Buffer[Array[Long]] 将polygon转换为一串geoid的范围列表。 使用示例: select ToRangeList('116.321011 40.123503, 116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503', 39.832277, 50) as rangeList from geoTable; 计算金字塔模型向上汇聚一层的longitude。 ToUpperLongitude (longitude, gridSize, oriLat) UDF输入参数: 参数 类型 说明 longitude Long 输入longitude,用一个长整型表示。 gridSize Int 栅格大小,计算longitude需要参数。 oriLatitude Double 原点纬度,计算longitude需要参数。 UDF输出参数: 参数 类型 说明 longitude Long 返回上一层的longitude。 使用示例: select ToUpperLongitude (-23575161504L, 50, 39.832277) as upperLongitude from geoTable; 计算金字塔模型向上汇聚一层的Latitude。 ToUpperLatitude(Latitude, gridSize, oriLat) UDF输入参数: 参数 类型 说明 latitude Long 输入latitude,用一个长整型表示。 gridSize Int 栅格大小,计算latitude需要参数。 oriLatitude Double 原点纬度,计算latitude需要参数。 UDF输出参数: 参数 类型 说明 Latitude Long 返回上一层的latitude。 使用示例: select ToUpperLatitude (-23575161504L, 50, 39.832277) as upperLatitude from geoTable; 经纬度转GeoSOT LatLngToGridCode(latitude, longitude, level) UDF输入参数: 参数 类型 说明 latitude Double 输入latitude。 longitude Double 输入longitude。 level Int 输入level,值区间[0-32]。 UDF输出参数: 参数 类型 说明 geoId Long 通过GeoSOT编码获得一个表示经纬度的数。 使用示例: select LatLngToGridCode(39.930753, 116.302895, 21) as geoId;
  • 导入数据 GeoHash默认实现类扩展自定义索引抽象类。如果没有配置handler属性为自定义的实现类,则使用默认的实现类。用户可以通过扩展默认实现类来挂载geohash的自定义实现类。自定义索引抽象类方法包括: Init方法,用来提取、验证和存储handler属性。在失败时发生异常,并显示错误信息。 Generate方法,用来生成索引。它为每行数据生成一个索引数据。 Query方法,用来对给定输入生成索引值范围列表。 导入命令同普通Carbon表: LOAD DATA inpath '/tmp/geosotdata.csv' INTO TABLE geosot OPTIONS ('DELIMITER'= ','); LOAD DATA inpath '/tmp/geosotdata2.csv' INTO TABLE geosot OPTIONS ('DELIMITER'= ','); geosotdata.csv和geosotdata2.csv表请参考准备数据。
  • 快速示例 create table IF NOT EXISTS carbonTable ( COLUMN1 BIGINT, LONGITUDE BIGINT, LATITUDE BIGINT, COLUMN2 BIGINT, COLUMN3 BIGINT ) STORED AS carbondata TBLPROPERTIES ('SPATIAL_INDEX.mygeohash.type'='geohash','SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude','SPATIAL_INDEX.mygeohash.originLatitude'='39.850713','SPATIAL_INDEX.mygeohash.gridSize'='50','SPATIAL_INDEX.mygeohash.minLongitude'='115.828503','SPATIAL_INDEX.mygeohash.maxLongitude'='720.000000','SPATIAL_INDEX.mygeohash.minLatitude'='39.850713','SPATIAL_INDEX.mygeohash.maxLatitude'='720.000000','SPATIAL_INDEX'='mygeohash','SPATIAL_INDEX.mygeohash.conversionRatio'='1000000','SORT_COLUMNS'='column1,column2,column3,latitude,longitude');
  • 二级索引表规格 表2 二级索引表规格 实体 测试值 二级索引表数量 10 二级索引表中的组合列的列数 5 二级索引表中的列名长度(单位:字符) 120 二级索引表名长度(单位:字符) 120 表中所有二级索引表的表名+列名的累积长度*(单位:字符) 3800** * Hive允许的上限值或可用资源的上限值。 ** 二级索引表使用hive注册,并以json格式的值存储在HiveSERDEPROPERTIES中。由hive支持的SERDEPROPERTIES的最大字符数为4000个字符,无法更改。
  • CarbonData主要规格 表1 CarbonData主要规格 实体 测试值 测试环境 表数 10000 3个节点,每个executor 4个CPU核,20GB。Driver内存5GB,3个Executor。 总列数:107 String:75 Int:13 BigInt:7 Timestamp:6 Double:6 表的列数 2000 3个节点,每个executor4个CPU核,20GB。Driver内存5GB,3个Executor。 原始CSV文件大小的最大值 200GB 17个cluster节点,每个executor 150GB,25个CPU核。Driver内存10 GB,17个Executor。 每个文件夹的CSV文件数 100个文件夹,每个文件夹10个文件,每个文件大小50MB。 3个节点,每个executor4个CPU核,20GB。Driver内存5GB,3个Executor。 加载文件夹数 10000 3个节点,每个executor4个CPU核,20GB。Driver内存5GB,3个Executor。
  • 操作场景 Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致触发GC(Garbage Collection)而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。
  • 示例 添加carbon配置参数 carbon.clean.file.force.allowed = true create table carbon01(a int,b string,c string) stored as carbondata; insert into table carbon01 select 1,'a','aa'; insert into table carbon01 select 2,'b','bb'; delete from table carbon01 where segment.id in (0); show segments for table carbon01; CLEAN FILES FOR TABLE carbon01 options('force'='true'); show segments for table carbon01; 上述命令将从物理上删除所有DELETE SEGMENT命令删除的segment和合并后的旧的segment。
  • 操作步骤 并行度可以通过如下三种方式来设置,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。 在会产生shuffle的操作函数内设置并行度参数,优先级最高。 testRDD.groupByKey(24) 在代码中配置“spark.default.parallelism”设置并行度,优先级次之。 val conf = new SparkConf() conf.set("spark.default.parallelism", 24) 在“$SPARK_HOME/conf/spark-defaults.conf”文件中配置“spark.default.parallelism”的值,优先级最低。 spark.default.parallelism 24
  • 基础操作 使用root用户登录集群客户端节点,执行如下命令: cd {客户端安装目录} source bigdata_env source Hudi/component_env kinit 创建的用户 执行hudi-cli.sh进入Hudi客户端, cd {客户端安装目录}/Hudi/hudi/bin/ ./hudi-cli.sh 即可执行各种Hudi命令,执行示例(仅部分命令,全部命令请参考Hudi官网:https://hudi.apache.org/docs/quick-start-guide/): 查看帮助: help //查看hudi-cli的所有命令 help 'command' //查看某一个命令的帮助及参数列表。 连接表: connect --path '/tmp/huditest/test_table' 查看表信息: desc 查看compaction计划: compactions show all 查看clean计划: cleans show 执行clean: cleans run 查看commit信息: commits show 查看commit写入的分区: commit showpartitions --commit 20210127153356 20210127153356表示commit的时间戳,下同。 查看指定commit写入的文件: commit showfiles --commit 20210127153356 比较两个表的commit信息差异: commits compare --path /tmp/hudimor/mytest100 rollback指定提交(rollback每次只允许rollback最后一次commit): commit rollback --commit 20210127164905 compaction调度: compaction schedule --hoodieConfigs 'hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy,hoodie.compaction.target.io=1,hoodie.compact.inline.max.delta.commits=1' 执行compaction compaction run --parallelism 100 --sparkMemory 1g --retry 1 --compactionInstant 20210602101315 --hoodieConfigs 'hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy,hoodie.compaction.target.io=1,hoodie.compact.inline.max.delta.commits=1' --propsFilePath hdfs://hacluster/tmp/default/tb_test_mor/.hoodie/hoodie.properties --schemaFilePath /tmp/default/tb_test_mor/.hoodie/compact_tb_base.json 创建savepoint savepoint create --commit 20210318155750 回滚指定的savepoint savepoint rollback --savepoint 20210318155750 如果commit写入导致元数据冲突异常,执行commit rollback、savepoint rollback能回退数据,但不能回退Hive元数据,只能删除Hive表然后手动进行同步刷新。 commit rollback只能回退当前最新的一个commit,savepoint rollback只能回退到最新的一个savepoint。二者均不能随意指定进行回退。
  • 示例 ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING); ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING) TBLPROPERTIES('COLUMNPROPERTIES.b1.shared_column'='sharedFolder.b1'); ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING) TBLPROPERTIES('DEFAULT.VALUE.a1'='10');
  • 命令语法 ALTER TABLE [db_name.]table_name ADD COLUMNS (col_name data_type,...) TBLPROPERTIES(''COLUMNPROPERTIES.columnName.shared_column'='sharedFolder.sharedColumnName,...', 'DEFAULT.VALUE.COLUMN_NAME'='default_value');
  • 参数描述 表1 ADD COLUMNS参数描述 参数 描述 db_name 数据库名。如果未指定,则选择当前数据库。 table_name 表名。 col_name data_type 带数据类型且用逗号分隔的列的名称。列名称包含字母,数字和下划线(_)。 说明: 创建CarbonData表时,不要将列名命名为tupleId,PositionId和PositionReference,因为将在UPDATE,DELETE和二级索引命令内部使用这些名称。
共100000条