华为云用户手册

  • Dynamic Allocation 动态资源调度是On Yarn模式特有的特性,并且必须开启Yarn External Shuffle才能使用这个功能。在使用Spark作为一个常驻的服务时候,动态资源调度将大大的提高资源的利用率。例如JDBCServer服务,大多数时间该进程并不接受JDBC请求,因此将这段空闲时间的资源释放出来,将极大的节约集群的资源。 表5 参数说明 参数 描述 默认值 spark.dynamicAllocation.enabled 是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。注意目前仅在YARN模式下有效。 启用动态资源调度必须将spark.shuffle.service.enabled设置为true。以下配置也与此相关:spark.dynamicAllocation.minExecutors、spark.dynamicAllocation.maxExecutors和spark.dynamicAllocation.initialExecutors。 JDBCServer2x: true SparkResource2x: false spark.dynamicAllocation.minExecutors 最小Executor个数。 0 spark.dynamicAllocation.initialExecutors 初始Executor个数。 spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors 最大executor个数。 2048 spark.dynamicAllocation.schedulerBacklogTimeout 调度第一次超时时间。单位为秒。 1s spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 调度第二次及之后超时时间。 1s spark.dynamicAllocation.executorIdleTimeout 普通Executor空闲超时时间。单位为秒。 60 spark.dynamicAllocation.cachedExecutorIdleTimeout 含有cached blocks的Executor空闲超时时间。 JDBCServer2x:2147483647s IndexServer2x:2147483647s SparkResource2x:120
  • 配置Stage失败重试次数 Spark任务在遇到FetchFailedException时会触发Stage重试。为了防止Stage无限重试,对Stage重试次数进行限制。重试次数可以根据实际需要进行调整。 在Spark客户端的“spark-defaults.conf”文件中配置如下参数。 表1 参数说明 参数 说明 默认值 spark.stage.maxConsecutiveAttempts Stage失败重试最大次数。 4
  • 问题 Spark Streaming应用创建1个输入流,但该输入流无输出逻辑。应用从checkpoint恢复启动失败,报错如下: 17/04/24 10:13:57 ERROR Utils: Exception encountered java.lang.NullPointerException at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:125) at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123) at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195) at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:515) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195) at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:510) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:191) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:186) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:142) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142) at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1230) at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:143) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:566) at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:612) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:611) at com.spark.test.kafka08LifoTwoInkfk$.main(kafka08LifoTwoInkfk.scala:21) at com.spark.test.kafka08LifoTwoInkfk.main(kafka08LifoTwoInkfk.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:772) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  • 回答 Streaming Context启动时,若应用设置了checkpoint,则需要对应用中的DStream checkpoint对象进行序列化,序列化时会用到dstream.context。 dstream.context是Streaming Context启动时从output Streams反向查找所依赖的DStream,逐个设置context。若Spark Streaming应用创建1个输入流,但该输入流无输出逻辑时,则不会给它设置context。所以在序列化时报“NullPointerException”。 解决办法:应用中如果有无输出逻辑的输入流,则在代码中删除该输入流,或添加该输入流的相关输出逻辑。
  • 操作步骤 根据业务情况,准备好客户端,登录安装客户端的节点。 请根据客户端所在位置,参考安装客户端章节,登录安装客户端的节点。 执行以下命令,切换到客户端安装目录。 cd /opt/hadoopclient 执行以下命令配置环境变量。 source bigdata_env 执行以下命令,进行用户认证。(普通模式跳过此步骤) kinit 组件业务用户 执行命令进行客户端操作。 例如执行以下命令: cql storm 同一个storm客户端不能同时连接安全和非安全的ZooKeeper。
  • 相关概念 SparkSQL的语句在SparkSQL中进行处理,权限要求如表1所示。 表1 使用SparkSQL表、列或数据库场景权限一览 操作场景 用户需要的权限 CREATE TABLE “创建”,RWX+ownership(for create external table - the location) 说明: 按照指定文件路径创建datasource表时,需要path后面文件的RWX+ownership权限。 DROP TABLE “Ownership”(of table) DROP TABLE PROPERTIES “Ownership” DESCRIBE TABLE “查询” SHOW PARTITIONS “查询” ALTER TABLE LOCATION “Ownership”,RWX+ownership (for new location) ALTER PARTITION LOCATION “Ownership”,RWX+ownership (for new partition location) ALTER TABLE ADD PARTITION “插入”,RWX+ownership (for partition location) ALTER TABLE DROP PARTITION “删除” ALTER TABLE(all of them except the ones above) “Update”,“Ownership” TRUNCATE TABLE “Ownership” CREATE VIEW “查询”,“Grant Of Select”,“创建” ALTER VIEW PROPERTIES “Ownership” ALTER VIEW RENAME “Ownership” ALTER VIEW ADD PARTS “Ownership” ALTER VIEW AS “Ownership” ALTER VIEW DROPPARTS “Ownership” ANALYZE TABLE “查询”,“插入” SHOW COLUMNS “查询” SHOW TABLE PROPERTIES “查询” CREATE TABLE AS SELECT “查询”,“创建” SELECT “查询” 说明: 与表一样,对视图进行SELECT操作的时候需要有该视图的“查询”权限。 INSERT “插入”,“删除 (for overwrite)” LOAD “插入”,“删除”,RWX+ownership(input location) SHOW CREATE TABLE “查询”,“Grant Of Select” CREATE FUNCTION “管理” DROP FUNCTION “管理” DESC FUNCTION - SHOW FUNCTIONS - MSCK (metastore check) “Ownership” ALTER DATABASE “管理” CREATE DATABASE - SHOW DATABASES - EXPLAIN “查询” DROP DATABASE “Ownership” DESC DATABASE - CACHE TABLE “查询” UNCACHE TABLE “查询” CLEAR CACHE TABLE “管理” REFRESH TABLE “查询” ADD FILE “管理” ADD JAR “管理” HEALTHCHECK -
  • 操作步骤 SparkSQL表授权、列授权、数据库授权与Hive的操作相同,详情请参见权限管理。 在权限管理中,为了方便用户使用,授予数据库下表的任意权限将自动关联该数据库目录的HDFS权限。为了避免产生性能问题,取消表的任意权限,系统不会自动取消数据库目录的HDFS权限,但对应的用户只能登录数据库和查看表名。 若为角色添加或删除数据库的查询权限,数据库中的表也将自动添加或删除查询权限。此机制为Hive实现,SparkSQL与Hive保持一致。 Spark不支持struct数据类型中列名称含有特殊字符(除字母、数字、下划线外的其他字符)。如果struct类型中列名称含有特殊字符,在FusionInsight Manager的“编辑角色”页面进行授权时,该列将无法正确显示。
  • 不规则空间集合的聚合查询 查询语句及Filter UDF 根据polygon过滤数据 IN_POLYGON(pointList) UDF输入参数: 参数 类型 说明 pointList String 将多个点输入为一个字符串,每个点以longitude latitude表示。经纬度间用空格分隔,每对经纬度用逗号分隔,字符串首尾经纬度一致。 UDF输出参数: 参数 类型 说明 inOrNot Boolean 判断数据是否在指定的polygon_list之内。 使用示例: select longitude, latitude from geosot where IN_POLYGON('116.321011 40.123503, 116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503'); 根据polygon列表过滤数据。 IN_POLYGON_LIST(polygonList, opType) UDF输入参数: 参数 类型 说明 polygonList String 将多个polygon输入为一个字符串,每个polygon以POLYGON ((longitude1 latitude1, longitude2 latitude2, …))表示。注意“POLYGON”后有空格,经纬度间用空格分隔,每对经纬度用逗号分隔,一个polygon的首尾经纬度一致。IN_POLYGON_LIST必须输入2个以上polygon。 一个polygon示例: POLYGON ((116.137676 40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)) opType String 对多个polygon进行并交差操作。 目前支持的操作类型: OR:A U B U C (假设输入了三个POLYGON,A、B、C) AND:A ∩ B ∩ C UDF输出参数: 参数 类型 说明 inOrNot Boolean 判断数据是否在指定的polygon_list之内。 使用示例: select longitude, latitude from geosot where IN_POLYGON_LIST('POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540, 120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 30.321464, 120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', 'OR'); 根据polyline列表过滤数据。 IN_POLYLINE_LIST(polylineList, bufferInMeter) UDF输入参数: 参数 类型 说明 polylineList String 将多个polyline输入为一个字符串,每个polyline以LINESTRING (longitude1 latitude1, longitude2 latitude2, …)表示。注意“LINESTRING”后有空格,经纬度间用空格分隔,每组经纬度用逗号分隔。 对多个polyline区域内的数据会输出并集结果。 一个polyline示例: LINESTRING (116.137676 40.163503, 116.137676 39.935276, 116.260993 39.935276) bufferInMeter Float polyline的buffer距离,单位为米。末端使用直角创建缓冲区。 UDF输出参数: 参数 类型 说明 inOrNot Boolean 判断数据是否在指定的polyline_list之内。 使用示例: select longitude, latitude from geosot where IN_POLYLINE_LIST('LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464, 120.190359 30.315388)', 65); 根据GeoId区间列表过滤数据。 IN_POLYGON_RANGE_LIST(polygonRangeList, opType) UDF输入参数: 参数 类型 说明 polygonRangeList String 将多个rangeList输入为一个字符串,每个rangeList以RANGELIST (startGeoId1 endGeoId1, startGeoId2 endGeoId2, …)表示。注意“RANGELIST”后有空格,首尾GeoId间用空格分隔,每组GeoId range用逗号分隔。 一个rangeList示例: RANGELIST (855279368848 855279368850, 855280799610 855280799612, 855282156300 855282157400) opType String 对多个rangeList进行并交差操作。 目前支持的操作类型: OR:A U B U C (假设输入了三个RANGELIST,A、B、C) AND:A ∩ B ∩ C UDF输出参数: 参数 类型 说明 inOrNot Boolean 判断数据是否在指定的polyRange_list之内。 使用示例: select mygeosot, longitude, latitude from geosot where IN_POLYGON_RANGE_LIST('RANGELIST (526549722865860608 526549722865860618, 532555655580483584 532555655580483594)', 'OR'); polygon连接查询 IN_POLYGON_JOIN(GEO_HASH_INDEX_COLUMN, POLYGON_COLUMN) 两张表做join查询,一张表为空间数据表(有经纬度列和GeoHashIndex列),另一张表为维度表,保存polygon数据。 查询使用IN_POLYGON_JOIN UDF,参数GEO_HASH_INDEX_COLUMN和polygon表的POLYGON_COLUMN。Polygon_column列是一系列的点(经纬度列)。Polygon表的每一行的第一个点和最后一个点必须是相同的。Polygon表的每一行的所有点连接起来形成一个封闭的几何对象。 UDF输入参数: 参数 类型 说明 GEO_HASH_INDEX_COLUMN Long 空间数据表的GeoHashIndex列。 POLYGON_COLUMN String Polygon表的polygon列,数据为polygon的字符串表示。例如,一个polygon是POLYGON ((longitude1 latitude1, longitude2 latitude2, …)) 使用示例: CREATE TABLE polygonTable( polygon string, poiType string, poiId String) STORED AS carbondata; insert into polygonTable select 'POLYGON ((120.176433 30.327431,120.171283 30.322245, 120.181411 30.314540,120.190509 30.321653,120.185188 30.329358,120.176433 30.327431))','abc','1'; insert into polygonTable select 'POLYGON ((120.191603 30.328946,120.184179 30.327465, 120.181819 30.321464,120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))','abc','2'; select t1.longitude,t1.latitude from geosot t1 inner join (select polygon,poiId from polygonTable where poitype='abc') t2 on in_polygon_join(t1.mygeosot,t2.polygon) group by t1.longitude,t1.latitude; range_list连接查询 IN_POLYGON_JOIN_RANGE_LIST(GEO_HASH_INDEX_COLUMN, POLYGON_COLUMN) 同IN_POLYGON_JOIN,使用IN_POLYGON_JOIN_RANGE_LIST UDF关联空间数据表和polygon维度表,关联基于Polygon_RangeList。直接使用range list可以避免polygon到range list的转换。 UDF输入参数: 参数 类型 说明 GEO_HASH_INDEX_COLUMN Long 空间数据表的GeoHashIndex列。 POLYGON_COLUMN String Polygon表的rangelist列,数据为rangeList的字符串。例如,一个rangelist是RANGELIST (startGeoId1 endGeoId1, startGeoId2 endGeoId2, …) 使用示例: CREATE TABLE polygonTable( polygon string, poiType string, poiId String) STORED AS carbondata; insert into polygonTable select 'RANGELIST (526546455897309184 526546455897309284, 526549831217315840 526549831217315850, 532555655580483534 532555655580483584)','xyz','2'; select t1.* from geosot t1 inner join (select polygon,poiId from polygonTable where poitype='xyz') t2 on in_polygon_join_range_list(t1.mygeosot,t2.polygon); 空间索引工具类UDF GeoID转栅格行列号。 GeoIdToGridXy(geoId) UDF输入参数: 参数 类型 说明 geoId Long 根据GeoId计算栅格行列号。 UDF输出参数: 参数 类型 说明 gridArray Array[Int] 返回该geoid所包含的栅格行列号,以数组的方式返回,第一位为行,第二位为列。 使用示例: select longitude, latitude, mygeohash, GeoIdToGridXy(mygeohash) as GridXY from geoTable; 经纬度转GeoID。 LatLngToGeoId(latitude, longitude oriLatitude, gridSize) UDF输入参数: 参数 类型 说明 longitude Long 经度,注:转换后的整数类型。 latitude Long 纬度,注:转换后的整数类型。 oriLatitude Double 原点纬度,计算GeoId需要参数。 gridSize Int 栅格大小,计算GeoId需要参数。 UDF输出参数: 参数 类型 说明 geoId Long 通过编码获得一个表示经纬度的数。 使用示例: select longitude, latitude, mygeohash, LatLngToGeoId(latitude, longitude, 39.832277, 50) as geoId from geoTable; GeoID转经纬度。 GeoIdToLatLng(geoId, oriLatitude, gridSize) UDF输入参数: 参数 类型 说明 geoId Long 根据GeoId计算经纬度。 oriLatitude Double 原点纬度,计算经纬度需要参数。 gridSize Int 栅格大小,计算经纬度需要参数。 由于GeoId由栅格坐标生成,坐标为栅格中心点,则计算出的经纬度是栅格中心点经纬度,与生成该GeoId的经纬度可能有[0度~半个栅格度数]的误差。 UDF输出参数: 参数 类型 说明 latitudeAndLongitude Array[Double] 返回该geoid所表示的栅格的中心点的经纬度坐标,以数组的方式返回,第一位为latitude,第二位为longitude。 使用示例: select longitude, latitude, mygeohash, GeoIdToLatLng(mygeohash, 39.832277, 50) as LatitudeAndLongitude from geoTable; 计算金字塔模型向上汇聚一层的GeoID。 ToUpperLayerGeoId(geoId) UDF输入参数: 参数 类型 说明 geoId Long 根据输入GeoId计算金字塔模型上一层GeoId。 UDF输出参数: 参数 类型 说明 geoId Long 金字塔模型上一层GeoId。 使用示例: select longitude, latitude, mygeohash, ToUpperLayerGeoId(mygeohash) as upperLayerGeoId from geoTable; 输入polygon获得GeoID范围列表。 ToRangeList(polygon, oriLatitude, gridSize) UDF输入参数: 参数 类型 说明 polygon String 输入polygon字符串,用一组经纬度表示。 经纬度间用空格分隔,每对经纬度间用逗号分隔,首尾经纬度一致。 oriLatitude Double 原点纬度,计算GeoId需要参数。 gridSize Int 栅格大小,计算GeoId需要参数。 UDF输出参数: 参数 类型 说明 geoIdList Buffer[Array[Long]] 将polygon转换为一串geoid的范围列表。 使用示例: select ToRangeList('116.321011 40.123503, 116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503', 39.832277, 50) as rangeList from geoTable; 计算金字塔模型向上汇聚一层的longitude。 ToUpperLongitude (longitude, gridSize, oriLat) UDF输入参数: 参数 类型 说明 longitude Long 输入longitude,用一个长整型表示。 gridSize Int 栅格大小,计算longitude需要参数。 oriLatitude Double 原点纬度,计算longitude需要参数。 UDF输出参数: 参数 类型 说明 longitude Long 返回上一层的longitude。 使用示例: select ToUpperLongitude (-23575161504L, 50, 39.832277) as upperLongitude from geoTable; 计算金字塔模型向上汇聚一层的Latitude。 ToUpperLatitude(Latitude, gridSize, oriLat) UDF输入参数: 参数 类型 说明 latitude Long 输入latitude,用一个长整型表示。 gridSize Int 栅格大小,计算latitude需要参数。 oriLatitude Double 原点纬度,计算latitude需要参数。 UDF输出参数: 参数 类型 说明 Latitude Long 返回上一层的latitude。 使用示例: select ToUpperLatitude (-23575161504L, 50, 39.832277) as upperLatitude from geoTable; 经纬度转GeoSOT LatLngToGridCode(latitude, longitude, level) UDF输入参数: 参数 类型 说明 latitude Double 输入latitude。 longitude Double 输入longitude。 level Int 输入level,值区间[0-32]。 UDF输出参数: 参数 类型 说明 geoId Long 通过GeoSOT编码获得一个表示经纬度的数。 使用示例: select LatLngToGridCode(39.930753, 116.302895, 21) as geoId;
  • 空间索引介绍 空间数据包括多维点、线、矩形、立方体、多边形和其他几何对象。空间数据对象占据空间的某一区域,称为空间范围,通过其位置和边界描述。空间数据可以是点数据,也可以是区域数据。 点数据:一个点具有一个空间范围,仅通过其位置描述。它不占用空间,没有相关的边界。点数据由二维空间中的点的集合组成。点可以存储为一对经纬度。 区域数据:一个区域有空间范围,有位置和边界。位置可以看作是一个定点在区域内的位置,例如它的质心。在二维中,边界可以可视化为一条线(有限区域,闭环)。区域数据包含一系列区域。 目前仅限于支持点数据,存储点数据。 经纬度可以编码为唯一的GeoID。Geohash是Gustavo Niemeyer发明的公共域地理编码系统,它将地理位置编码为一串由字母和数字组成的短字符串。它是一种分层的空间数据结构,把空间细分为网格形状的桶,是被称为Z阶曲线和通常称为空间填充曲线的许多应用之一。 点在多维中的Z值是简单地通过交织其坐标值的二进制表示来计算的,如下图所示。使用Geohash创建GeoID时,数据按照GeoID排序,而不是按照经纬度排序,数据按照空间就近性排序存储。
  • 快速示例 create table IF NOT EXISTS carbonTable ( COLUMN1 BIGINT, LONGITUDE BIGINT, LATITUDE BIGINT, COLUMN2 BIGINT, COLUMN3 BIGINT ) STORED AS carbondata TBLPROPERTIES ('SPATIAL_INDEX.mygeohash.type'='geohash','SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude','SPATIAL_INDEX.mygeohash.originLatitude'='39.850713','SPATIAL_INDEX.mygeohash.gridSize'='50','SPATIAL_INDEX.mygeohash.minLongitude'='115.828503','SPATIAL_INDEX.mygeohash.maxLongitude'='720.000000','SPATIAL_INDEX.mygeohash.minLatitude'='39.850713','SPATIAL_INDEX.mygeohash.maxLatitude'='720.000000','SPATIAL_INDEX'='mygeohash','SPATIAL_INDEX.mygeohash.conversionRatio'='1000000','SORT_COLUMNS'='column1,column2,column3,latitude,longitude');
  • 建表 GeoHash编码: create table IF NOT EXISTS carbonTable ( ... `LONGITUDE` BIGINT, `LATITUDE` BIGINT, ... ) STORED AS carbondata TBLPROPERTIES ('SPATIAL_INDEX.mygeohash.type'='geohash','SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude','SPATIAL_INDEX.mygeohash.originLatitude'='xx.xxxxxx','SPATIAL_INDEX.mygeohash.gridSize'='xx','SPATIAL_INDEX.mygeohash.minLongitude'='xxx.xxxxxx','SPATIAL_INDEX.mygeohash.maxLongitude'='xxx.xxxxxx','SPATIAL_INDEX.mygeohash.minLatitude'='xx.xxxxxx','SPATIAL_INDEX.mygeohash.maxLatitude'='xxx.xxxxxx','SPATIAL_INDEX'='mygeohash','SPATIAL_INDEX.mygeohash.conversionRatio'='1000000','SORT_COLUMNS'='column1,column2,column3,latitude,longitude'); SPATIAL_INDEX:自定义索引处理器。此处理程序允许用户从表结构列集合中创建新的列。新创建的列名与处理程序名相同。处理程序的type和sourcecolumns属性是必需的属性。目前,type属性只支持“geohash”。Carbon提供一个简单的默认实现类。用户可以通过扩展默认实现类来挂载geohash的自定义实现类。该默认处理程序还需提供以下的表属性: SPATIAL_INDEX.xxx.originLatitude:Double类型,坐标原点纬度 SPATIAL_INDEX.xxx.gridSize:Int类型,栅格长度(米) SPATIAL_INDEX.xxx.minLongitude:Double类型,最小经度 SPATIAL_INDEX.xxx.maxLongitude:Double类型,最大经度 SPATIAL_INDEX.xxx.minLatitude:Double类型,最小纬度 SPATIAL_INDEX.xxx.maxLatitude:Double类型,最大纬度 SPATIAL_INDEX.xxx.conversionRatio:Int类型,将经纬度小数值转换为整型值 用户可以按照上述格式为处理程序添加自己的表属性,并在自定义实现类中访问它们。originLatitude,gridSize及conversionRatio是必选参数,其余属性在Carbon中都是可选的。可以使用“SPATIAL_INDEX.xxx.class”属性指定它们的实现类。 默认实现类可以为每一行的sourcecolumns生成handler列值,并且支持基于sourcecolumns的过滤条件查询。生成的handler列对用户不可见。除SORT_COLUMNS表属性外,任何DDL命令和属性都不允许包含handler列。 生成的handler列默认被视为排序列。如果SORT_COLUMNS不包含任何sourcecolumns,则将handler列追加到现有的SORT_COLUMNS最后。如果在SORT_COLUMNS中已经指定了该handler列,则它在SORT_COLUMNS的顺序将保持不变。 如果SORT_COLUMNS包含任意的sourcecolumns,但是没有包含handler列,则handler列将自动插入到SORT_COLUMNS中的sourcecolumns之前。 如果SORT_COLUMNS需要包含任意的sourcecolumns,那么需要保证handler列出现在sourcecolumns之前,这样handler列才能在排序中生效。
  • 导入数据 GeoHash默认实现类扩展自定义索引抽象类。如果没有配置handler属性为自定义的实现类,则使用默认的实现类。用户可以通过扩展默认实现类来挂载geohash的自定义实现类。自定义索引抽象类方法包括: Init方法,用来提取、验证和存储handler属性。在失败时抛出异常,并显示错误信息。 Generate方法,用来生成索引。它为每行数据生成一个索引数据。 Query方法,用来对给定输入生成索引值范围列表。 导入命令同普通Carbon表: LOAD DATA inpath '/tmp/geosotdata.csv' INTO TABLE geosot OPTIONS ('DELIMITER'= ','); LOAD DATA inpath '/tmp/geosotdata2.csv' INTO TABLE geosot OPTIONS ('DELIMITER'= ','); geosotdata.csv和geosotdata2.csv表请参考准备数据。
  • 准备数据 准备数据文件1:geosotdata.csv timevalue,longitude,latitude 1575428400000,116.285807,40.084087 1575428400000,116.372142,40.129503 1575428400000,116.187332,39.979316 1575428400000,116.337069,39.951887 1575428400000,116.359102,40.154684 1575428400000,116.736367,39.970323 1575428400000,116.720179,40.009893 1575428400000,116.346961,40.13355 1575428400000,116.302895,39.930753 1575428400000,116.288955,39.999101 1575428400000,116.17609,40.129953 1575428400000,116.725575,39.981115 1575428400000,116.266922,40.179415 1575428400000,116.353706,40.156483 1575428400000,116.362699,39.942444 1575428400000,116.325378,39.963129 准备数据文件2:geosotdata2.csv timevalue,longitude,latitude 1575428400000,120.17708,30.326882 1575428400000,120.180685,30.326327 1575428400000,120.184976,30.327105 1575428400000,120.189311,30.327549 1575428400000,120.19446,30.329698 1575428400000,120.186965,30.329133 1575428400000,120.177481,30.328911 1575428400000,120.169713,30.325614 1575428400000,120.164563,30.322243 1575428400000,120.171558,30.319613 1575428400000,120.176365,30.320687 1575428400000,120.179669,30.323688 1575428400000,120.181001,30.320761 1575428400000,120.187094,30.32354 1575428400000,120.193574,30.323651 1575428400000,120.186192,30.320132 1575428400000,120.190055,30.317464 1575428400000,120.195376,30.318094 1575428400000,120.160786,30.317094 1575428400000,120.168211,30.318057 1575428400000,120.173618,30.316612 1575428400000,120.181001,30.317316 1575428400000,120.185162,30.315908 1575428400000,120.192415,30.315871 1575428400000,120.161902,30.325614 1575428400000,120.164306,30.328096 1575428400000,120.197093,30.325985 1575428400000,120.19602,30.321651 1575428400000,120.198638,30.32354 1575428400000,120.165421,30.314834
  • 场景介绍 为了快速对用户数据创建索引,HBase提供了可通过MapReduce功能创建索引的TableIndexer工具,该工具可实现添加,构建和删除索引。具体使用场景如下: 在用户的表中预先存在大量数据的情况下,可能希望在某个列上添加索引。但是,使用addIndicesWithData()API添加索引会生成与相关用户数据对应的索引数据,这将花费大量时间。另一方面,使用addIndices()创建的索引不会构建与用户数据对应的索引数据。因此,为了为这样的用户数据建立索引数据,用户可以使用TableIndexer工具来完成索引的构建。 如果索引数据与用户数据不一致,该工具可用于重新构建索引数据。 如果用户暂时禁用索引并且在此期间,向禁用的索引列执行新的put操作,然后直接将索引从禁用状态启用可能会导致索引数据与用户数据不一致。因此,用户必须注意在再次使用之前重新构建所有索引数据。 对于大量现有的索引数据,用户可以使用TableIndexer工具将索引数据从用户表中完全删除。 对于未建立索引的用户表,该工具允许用户同时添加和构建索引。
  • 公用参数介绍 表2 公用参数介绍 分类 参数 说明 连接数据库 --connect 连接关系型数据库的url --connection-manager 指定连接管理类 --driver jdbc 连接驱动包 --help 帮助信息 --password 连接数据库密码 --username 连接数据库的用户名 --verbose 在控制台打印详细信息 import参数 --fields-terminated-by 设定字段分隔符,和Hive表或hdfs文件保持一致 --lines-terminated-by 设定行分隔符,和hive表或hdfs文件保持一致 --mysql-delimiters MySQL默认分隔符设置 export参数 --input-fields-terminated-by 字段分隔符 --input-lines-terminated-by 行分隔符 hive 参数 --hive-delims-replacement 用自定义的字符替换数据中的\r\n等字符 --hive-drop-import-delims 在导入数据到hive时,去掉\r\n等字符 --map-column-hive 生成hive表时可以更改字段的数据类型 --hive-partition-key 创建分区 --hive-partition-value 导入数据库指定分区 --hive-home 指定hive安装目录 --hive-import 表示操作是从关系型数据库导入到hive中 --hive-overwrite 覆盖hive已有数据 --create-hive-table 创建Hive表,默认false,如果目标表不存在,则会创建目标表 --hive-table 指定hive表 --table 关系型数据库表名 --columns 指定需要导入的关系型数据表字段 --query 指定查询语句,将查询结果导入 hcatalog参数 --hcatalog-database 指定hive库,使用hcatalog方式导入hive库 --hcatalog-table 指定hive表,使用hcatalog方式导入hive表 其他参数 -m或--num-mappers 后跟数字,表示sqoop任务的分片数 --split-by 按照某一字段进行分片,配合-m --target-dir 指定hdfs临时目录 --null-string string 类型为null时替换字符串 --null-non-string 非string类型为null时替换字符串 --check-column 增量判断的字段 --incremental append或lastmodified 增量导入参数 append:追加,比如对大于last-value指定的值之后的记录进行追加导入。 lastmodified:最后的修改时间,追加last-value指定的日期之后的记录。 --last-value 指定一个值,用于标记增量导入 --input-null-string 替换null字符串,如果没有指定,则字符串null将被使用。 --input-null-non-string 替换非String的null字符串,如果没有指定,则字符串null将被使用。
  • Sqoop常用命令介绍 表1 Sqoop常用命令介绍 命令 说明 import 数据导入到集群 export 集群数据导出 codegen 获取数据库中某张表数据生成Java并打包jar create-hive-table 创建Hive表 eval 执行sql并查看结果 import-all-tables 导入某个数据库下的所有表到HDFS中 job 生成一个sqoop任务 list-databases 列举数据库名 list-tables 列举表名 merge 将HDFS不同目录下的数据合在一起并存放到指定目录 metastore 启动元数据库,记录sqoop job的元数据 help 打印帮助信息 version 打印版本信息
  • 示例 CREATE TABLE IF NOT EXISTS productdb.productSalesTable ( productNumber Int, productName String, storeCity String, storeProvince String, productCategory String, productBatch String, saleQuantity Int, revenue Int) STORED AS carbondata TBLPROPERTIES ( 'table_blocksize'='128', 'SORT_COLUMNS'='productBatch, productName')
  • 参数描述 表1 CREATE TABLE参数描述 参数 描述 db_name Database名称,由字母、数字和下划线(_)组成。 col_name data_type 以逗号分隔的带数据类型的列表。列名由字母、数字和下划线(_)组成。 说明: 在CarbonData表创建过程中,不允许使用tupleId,PositionId和PositionReference为列命名,因为具有这些名称的列由二级索引命令在内部使用。 table_name Database中的表名,由字母、数字和下划线(_)组成。 STORED AS 参数carbondata,定义和创建CarbonData table。 TBLPROPERTIES CarbonData table属性列表。
  • 使用场景 通过指定列创建表 CREATE TABLE命令与Hive DDL相同。CarbonData的额外配置将作为表格属性给出。 CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type , ...)] STORED AS carbondata [TBLPROPERTIES (property_name=property_value, ...)];
  • 注意事项 以下是表格属性的使用。 Block大小 单个表的数据文件block大小可以通过TBLPROPERTIES进行定义,系统会选择数据文件实际大小和设置的blocksize大小中的较大值,作为该数据文件在HDFS上存储的实际blocksize大小。单位为MB,默认值为1024MB,范围为1MB~2048MB。若设置值不在[1, 2048]之间,系统将会报错。 一旦block大小达到配置值,写入程序将启动新的CarbonData数据的block。数据以页面大小(32000个记录)的倍数写入,因此边界在字节级别上不严格。 如果新页面跨越配置block的边界,则不会将其写入当前block,而是写入新的block。 TBLPROPERTIES('table_blocksize'='128') 当在CarbonData表中配置了较小的blocksize,而加载的数据生成的数据文件比较大时,在HDFS上显示的blocksize会与设置值不同。这是因为,对于每一个本地block文件的首次写入,即使待写入数据的大小大于blocksize的配置值,也直接将待写入数据写入此block。所以,HDFS上blocksize的实际值为待写入数据大小与blocksize配置值中的较大值。 当CarbonData表中的数据文件block.num小于任务并行度(parellelism)时,CarbonData数据文件的block会被切为新的block,使得blocks.num大于parellelism,这样所有core均可被使用。这种优化称为block distribution。 SORT_SCOPE:指定表创建时的排序范围。如下为四种排序范围。 GLOBAL_SORT:它提高了查询性能,特别是点查询。TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') LOCAL_SORT:数据会本地排序(任务级别排序)。 LOCAL_SORT与分区表的DDL操作存在冲突,不能同时使用,且对分区表性能提升不明显,不建议在分区表上启用该特性。 NO_SORT:默认排序。它将以不排序的方式加载数据,这将显着提升加载性能。 SORT_COLUMNS 此表属性指定排序列的顺序。 TBLPROPERTIES('SORT_COLUMNS'='column1, column3') 如果未指定此属性,则默认情况下,没有列会被排序。 如果指定了此属性,但具有空参数,则表将被加载而不进行排序。例如,('SORT_COLUMNS'='')。 SORT_COLUMNS将接受string,date,timestamp,short,int,long,byte和boolean数据类型。
  • 基于索引查询数据 在具有索引的用户表中,可以使用Filter来查询数据。对于创建单索引和组合索引的用户表,使用过滤器查询的结果与没有使用索引的表相同,但数据查询性能高于没有使用索引的表。 索引的使用规则如下: 对于为一个或多个列创建单个索引的情况: 当将此列用于AND或OR查询筛选时,使用索引可以提高查询性能。 例如,Filter_Condition(IndexCol1)AND / OR Filter_Condition(IndexCol2)。 当在查询中使用“索引列和非索引列”进行过滤时,此索引可以提高查询性能。 例如,Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(NonIndexCol1)。 当在查询中使用“索引列或非索引列”进行筛选时,但不使用索引,查询性能不会提高。 例如,Filter_Condition(IndexCol1)AND / OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1)。 对于为多个列创建组合索引的情况: 当用于查询的列是组合索引的全部或部分列并且与组合索引具有相同的顺序时,使用索引会提高查询性能。 例如,为C1,C2和C3创建组合索引。 该索引在以下情况下生效: Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2) FILTER_CONDITION(IndexCol1) 该索引在下列情况下不生效: Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol3) FILTER_CONDITION(IndexCol2) FILTER_CONDITION(IndexCol3) 当在查询中使用“索引列和非索引列”进行过滤时,使用索引可提高查询性能。 例如: Filter_Condition(IndexCol1)AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(NonIndexCol1) 当在查询中使用“索引列或非索引列”进行筛选时,但不使用索引,查询性能不会提高。 例如: Filter_Condition(IndexCol1)OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2))OR(Filter_Condition(NonIndexCol1)) 当多个列用于查询时,只能为组合索引中的最后一列指定值范围,而其他列只能设置为指定值。 例如,为C1,C2和C3创建组合索引。在范围查询中,只能为C3设置数值范围,过滤条件为“C1 = XXX,C2 = XXX,C3 = 数值范围”。
  • 相关接口 使用HIndex的API都在类org.apache.hadoop.hbase.hindex.client.HIndexAdmin中,相关接口介绍如下: 操作 接口 描述 注意事项 添加索引 addIndices() 将索引添加到没有数据的表中。调用此接口会将用户指定的索引添加到表中,但会跳过生成索引数据。因此,在此操作之后,索引不能用于scan/filter操作。它的使用场景为用户想要在具有大量预先存在用户数据的表上批量添加索引,其具体操作为使用诸如TableIndexer工具之类的外部工具来构建索引数据。 索引一旦添加则不能修改。若要修改,则需先删除旧的索引然后重新创建。 用户应注意不要在具有不同索引名称的相同列上创建两个索引。如果这样做,将会导致存储和处理的浪费。 索引不能添加到系统表中。 向索引列put数据时不支持append和increment操作。 如果客户端出现任何故障,除非发生DoNotRetryIOException,否则用户应该重试。 索引列族根据可用性按顺序从以下条件中选择: 默认索引列族“d”或者如果设置了属性“hindex.default.family.name”的值,则以该值为准。 符号#,@,$或% #0,@ 0,$ 0,%0,#1,@ 1 ...上至#255,@ 255,$ 255,%255 throw Exception 可以通过HIndex TableIndexer工具添加索引而无需建立索引数据。 addIndicesWithData() 将索引添加到有数据的表中。此方法将用户指定的索引添加到表中,并会对已经存在的用户数据创建对应的索引数据,也可先调用该方法生成索引再在存入用户数据的同时生成索引数据。在此操作之后,这些索引立即可用于scan/filter操作。 删除索引 dropIndices() 仅删除索引。该API从表中删除用户指定的索引,但跳过相应的索引数据。在此操作之后,索引不能用于scan/filter操作。集群在major compaction期间会自动删除旧的索引数据。 此API使用场景为表中包含大量索引数据且dropIndicesWithData()不可行。另外,用户也可以通过TableIndexer工具删除索引以及索引数据。 在索引的状态为ACTIVE,INACTIVE和DROPPING时,允许禁用索引的操作。 对于使用dropIndices()删除索引的操作,用户必须确保在将索引添加到具有相同索引名的表之前,相应的索引数据已被删除(即major compaction已完成)。 用户删除相应的索引会删除: 一个带有索引的列族。 组合索引所有列族中的任一个列族。 索引可以通过HIndex TableIndexer工具与索引数据一起删除。 dropIndicesWithData() 删除索引数据。此API删除用户指定的索引,并删除用户表中与这些索引对应的所有索引数据。在此操作之后,删除的索引完全从表中删除,不再可用于scan/filter操作。 启用/禁用索引 disableIndices() 该API禁用所有用户指定的索引,使其不再可用于scan/filter操作。 在索引的状态为ACTIVE,INACTIVE和BUILDING时允许启用索引的操作。 在索引的状态为ACTIVE和INACTIVE时允许禁用索引操作。 在禁用索引之前,用户必须确保索引数据与用户数据一致。如果在索引处于禁用状态期间没有在表中添加新的数据,索引数据与用户数据将保持一致。 启用索引时,可以通过使用TableIndexer工具构建索引来保证数据一致性。 enableIndices() 该API启用所有用户指定的索引,使其可用于scan/filter操作。 查看已创建的索引 listIndices() 该API可用于列出给定表中的所有索引。 无
  • 快速配置常用参数 其他参数在安装集群时已进行了适配,以下参数需要根据使用场景进行调整。以下参数除特别指出外,一般在Spark2x客户端的“spark-defaults.conf”文件中配置。 表1 快速配置常用参数 配置项 说明 默认值 spark.sql.parquet.compression.codec 对于非分区parquet表,设置其存储文件的压缩格式。 在JDBCServer服务端的“spark-defaults.conf”配置文件中进行设置。 snappy spark.dynamicAllocation.enabled 是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。目前仅在YARN模式下有效。 JDBCServer默认值为true,client默认值为false。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512m,2g)。 4G spark.sql.autoBroadcastJoinThreshold 当进行join操作时,配置广播的最大值。 当SQL语句中涉及的表中相应字段的大小小于该值时,进行广播。 配置为-1时,将不进行广播。 10485760 spark.yarn.queue JDBCServer服务所在的Yarn队列。 在JDBCServer服务端的“spark-defaults.conf”配置文件中进行设置。 default spark.driver.memory 大集群下推荐配置32~64g驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512m, 2g)。 4G spark.yarn.security.credentials.hbase.enabled 是否打开获取HBase token的功能。如果需要Spark-on-HBase功能,并且配置了安全集群,参数值设置为“true”。否则设置为“false”。 false spark.serializer 用于串行化将通过网络发送或需要缓存的对象的类以序列化形式展现。 Java序列化的默认值适用于任何Serializable Java对象,但运行速度相当慢,所以建议使用org.apache.spark.serializer.KryoSerializer并配置Kryo序列化。可以是org.apache.spark.serializer.Serializer的任何子类。 org.apache.spark.serializer.JavaSerializer spark.executor.cores 每个执行者使用的内核个数。 在独立模式和Mesos粗粒度模式下设置此参数。当有足够多的内核时,允许应用程序在同样的worker上执行多个执行程序;否则,在每个worker上,每个应用程序只能运行一个执行程序。 1 spark.shuffle.service.enabled NodeManager中一个长期运行的辅助服务,用于提升Shuffle计算性能。 fasle spark.sql.adaptive.enabled 是否开启自适应执行框架。 false spark.executor.memoryOverhead 每个执行器要分配的堆内存量(单位为兆字节)。 这是占用虚拟机开销的内存,类似于内部字符串,其他内置开销等等。会随着执行器大小(通常为6-10%)而增长。 1GB spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false
  • 问题 Spark应用执行过程中,当driver连接RM失败时,会报下面的错误,且较长时间不退出。 16/04/23 15:31:44 INFO RetryInvocationHandler: Exception while invoking getApplicationReport of class ApplicationClientProtocolPBClientImpl over 37 after 1 fail over attempts. Trying to fail over after sleeping for 44160ms. java.net.ConnectException: Call From vm1/192.168.39.30 to vm1:8032 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
  • 回答 在Spark中有个定期线程,通过连接RM监测AM的状态。由于连接RM超时,就会报上面的错误,且一直重试。RM中对重试次数有限制,默认是30次,每次间隔默认为30秒左右,每次重试时都会报上面的错误。超过次数后,driver才会退出。 RM中关于重试相关的配置项如表1所示。 表1 参数说明 参数 描述 默认值 yarn.resourcemanager.connect.max-wait.ms 连接RM的等待时间最大值。 900000 yarn.resourcemanager.connect.retry-interval.ms 重试连接RM的时间频率。 30000 重试次数=yarn.resourcemanager.connect.max-wait.ms/yarn.resourcemanager.connect.retry-interval.ms,即重试次数=连接RM的等待时间最大值/重试连接RM的时间频率。 在Spark客户端机器中,通过修改“conf/yarn-site.xml”文件,添加并配置“yarn.resourcemanager.connect.max-wait.ms”和“yarn.resourcemanager.connect.retry-interval.ms”,这样可以更改重试次数,Spark应用可以提早退出。
  • 回答 在单个父目录中创建大量的znode后,当客户端尝试在单个请求中获取所有子节点时,服务端将无法返回,因为结果将超出可存储在znode上的数据的最大长度。 为了避免这个问题,应该根据客户端应用的实际情况将“jute.maxbuffer”参数配置为一个更高的值。 “jute.maxbuffer”只能设置为Java系统属性,且没有zookeeper前缀。如果要将“jute.maxbuffer”的值设为X,在ZooKeeper客户端或服务端启动时传入以下系统属性:-Djute.maxbuffer=X。 例如,将参数值设置为4MB:-Djute.maxbuffer=0x400000。 表1 配置参数 参数 描述 默认值 jute.maxbuffer 指定可以存储在znode中的数据的最大长度。单位是Byte。默认值为0xfffff,即低于1MB。 说明: 如果更改此选项,则必须在所有服务器和客户端上设置该系统属性,否则将出现问题。 0xfffff
  • 操作步骤 使用Ranger管理员用户rangeradmin登录Ranger管理页面,具体操作可参考登录Ranger管理界面。 在首页中单击“KAFKA”区域的组件插件名称如“Kafka”。 单击“Add New Policy”,添加Kafka权限控制策略。 根据业务需求配置相关参数。 表1 Kafka权限参数 参数名称 描述 Policy Type Access。 Policy Conditions IP过滤策略,可自定义,配置当前策略适用的主机节点,可填写一个或多个IP或IP段,并且IP填写支持“*”通配符,例如:192.168.1.10,192.168.1.20或者192.168.1.*。 Policy Name 策略名称,可自定义,不能与本服务内其他策略名称重复。 Policy Label 为当前策略指定一个标签,您可以根据这些标签搜索报告和筛选策略。 topic 配置当前策略适用的topic名,可以填写多个值。这里支持通配符,例如:test、test*、*。 “Include”策略适用于当前输入的对象,“Exclude”表示策略适用于除去当前输入内容之外的其他对象。 Description 策略描述信息。 Audit Logging 是否审计此策略。 Allow Conditions 策略允许条件,配置本策略内允许的权限及例外,例外条件优先级高于正常条件。 在“Select Role”、“Select Group”、“Select User”列选择已创建好的需要授予权限的Role、用户组或用户。 单击“Add Conditions”,添加策略适用的IP地址范围,单击“Add Permissions”,添加对应权限。 Publish:生产权限。 Consume:消费权限。 Describe:查询权限。 Create: 创建主题权限。 Delete: 删除主题权限。 Describe Configs:查询配置权限。 Alter:修改topic的partition数量的权限。 Alter Configs:修改配置权限。 Select/Deselect All:全选/取消全选。 如需添加多条权限控制规则,可单击按钮添加。 如需当前条件中的用户或用户组管理本条策略,可勾选“Delegate Admin”,这些用户将成为受委托的管理员。被委托的管理员可以更新、删除本策略,它还可以基于原始策略创建子策略。 Deny Conditions 策略拒绝条件,配置本策略内拒绝的权限及例外,配置方法与“Allow Conditions”类型,拒绝条件的优先级高于“Allow Conditions”中配置的允许条件。 例如为用户“testuser”添加“test”主题的生产权限,配置如下: 图1 Kafka权限参数 表2 设置权限 任务场景 角色授权操作 设置Kafka管理员权限 在首页中单击“KAFKA”区域的组件插件名称,例如“Kafka”。 选择“Policy Name”为“all - topic”的策略,单击按钮编辑策略。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Select/Deselect All”。 设置用户对Topic的创建权限 在“topic”配置Topic名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Create”。 说明: 目前Kafka内核支持"--zookeeper"和"--bootstrap-server"两种方式创建Topic,社区将会在后续的版本中删掉对"--zookeeper"的支持,所以建议用户使用"--bootstrap-server"的方式创建Topic。 注意:目前Kafka只支持"--bootstrap-server"方式创建Topic行为的鉴权,不支持对"--zookeeper"方式的鉴权 设置用户对Topic的删除权限 在“topic”配置Topic名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Delete”。 说明: 目前Kafka内核支持"--zookeeper"和"--bootstrap-server"两种方式删除Topic,社区将会在后续的版本中删掉对"--zookeeper"的支持,所以建议用户使用"--bootstrap-server"的方式删除Topic。 注意:目前Kafka只支持对"--bootstrap-server"方式删除Topic行为的鉴权,不支持对"--zookeeper"方式的鉴权 设置用户对Topic的查询权限 在“topic”配置Topic名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Describe”和“Describe Configs”。 说明: 目前Kafka内核支持"--zookeeper"和"--bootstrap-server"两种方式查询Topic,社区将会在后续的版本中删掉对"--zookeeper"的支持,所以建议用户使用"--bootstrap-server"的方式查询Topic。 注意:目前Kafka只支持对"--bootstrap-server"方式查询Topic行为的鉴权,不支持对"--zookeeper"方式的鉴权 设置用户对Topic的生产权限 在“topic”配置Topic名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Publish”。 设置用户对Topic的消费权限 在“topic”配置Topic名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Consume”。 说明: 因为消费Topic时,涉及到Offset的管理操作,必须同时开启ConsumerGroup的“Consume”权限,详见“设置用户对ConsumerGroup Offsets 的提交权限” 设置用户对Topic的扩容权限(增加分区) 在“topic”配置Topic名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Alter”。 设置用户对Topic的配置修改权限 当前Kafka内核暂不支持基于“--bootstrap-server”的Topic参数修改行为,故当前Ranger不支持对此行为的鉴权操作。 设置用户对Cluster的所有管理权限 在“cluster”右侧输入并选择集群名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Kafka Admin”。 设置用户对Cluster的创建权限 在首页中单击“KAFKA”区域的组件插件名称,例如“Kafka”。 选择“Policy Name”为“all - cluster”的策略,单击按钮编辑策略。 在“cluster”右侧输入并选择集群名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Create”。 说明: 对于Cluster的Create操作鉴权主要涉及以下两个场景: 集群开启了“auto.create.topics.enable”参数后,客户端向服务的还未创建的Topic发送数据的场景,此时会判断用户是否有集群的Create权限 对于用户创建大量Topic的场景,如果授予用户Cluster Create权限,那么该用户可以在集群内部创建任意Topic 设置用户对Cluster的配置修改权限 在“cluster”右侧输入并选择集群名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Alter Configs”。 说明: 此处的配置修改权限,指的是Broker、Broker Logger的配置权限。 当授予用户配置修改权限后,即使不授予配置查询权限也可查询配置详情(配置修改权限高于且包含配置查询权限)。 设置用户对Cluster的配置查询权限 在“cluster”右侧输入并选择集群名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Describe”和 “Describe Configs”。 说明: 此处查询指的是查询集群内的Broker、Broker Logger信息。该查询不涉及Topic。 设置用户对Cluster的Idempotent Write权限 在“cluster”右侧输入并选择集群名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Idempotent Write”。 说明: 此权限会对用户客户端的Idempotent Produce行为进行鉴权。 设置用户对Cluster的分区迁移权限管理 在“cluster”右侧输入并选择集群名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Alter”。 说明: Cluster的Alter权限可以对以下三种场景进行权限控制: Partition Reassign场景下,迁移副本的存储目录。 集群里各分区内部leader选举。 Acl管理(添加或删除)。 其中1和2都是集群内部Controller与Broker间、Broker与Broker间的操作,创建集群时,默认授予内置kafka用户此权限,普通用户授予此权限没有意义。 3涉及Acl的管理,Acl设计的就是用于鉴权,由于目前kafka鉴权已全部托管给Ranger,所以这个场景也基本不涉及(配置后亦不生效)。 设置用户对Cluster的Cluster Action权限 在“cluster”右侧输入并选择集群名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Cluster Action”。 说明: 此权限主要对集群内部副本主从同步、节点间通信进行控制,在集群创建时已经授权给内置kakfa用户,普通用户授予此权限没有意义。 设置用户对TransactionalId的权限 在首页中单击“KAFKA”区域的组件插件名称,例如“Kafka”。 选择“Policy Name”为“all - transactionalid”的策略,单击按钮编辑策略。 在“transactionalid”配置事务ID。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Publish”和 "Describe"。 说明: “Publish”权限主要对用户开启了事务特性的客户端请求进行鉴权,例如事务开启、结束、提交offset、事务性数据生产等行为。 “Describe”权限主要对于开启事务特性的客户端与Coordinator的请求进行鉴权。 建议在开启事务特性的场景下,给用户同时授予“Publish”和“Describe”权限。 设置用户对DelegationToken的权限 在首页中单击“KAFKA”区域的组件插件名称,例如“Kafka”。 选择“Policy Name”为“all - delegationtoken”的策略,单击按钮编辑策略。 在“delegationtoken”配置delegationtoken。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“ Describe”。 说明: 当前Ranger对DelegationToken的鉴权控制仅限于对查询的权限控制,不支持对DelegationToken的create、renew、expire操作的权限控制。 设置用户对ConsumerGroup Offsets 的查询权限 在首页中单击“KAFKA”区域的组件插件名称,例如“Kafka”。 选择“Policy Name”为“all - consumergroup”的策略,单击按钮编辑策略。 在“consumergroup”配置需要管理的consumergroup。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Describe”。 设置用户对ConsumerGroup Offsets 的提交权限 在首页中单击“KAFKA”区域的组件插件名称,例如“Kafka”。 选择“Policy Name”为“all - consumergroup”的策略,单击按钮编辑策略。 在“consumergroup”配置需要管理的consumergroup。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Consume”。 说明: 当给用户授予了ConsumerGroup的“Consume”权限后,用户会同时被授予“Describe”权限。 设置用户对ConsumerGroup Offsets 的删除权限 在首页中单击“KAFKA”区域的组件插件名称,例如“Kafka”。 选择“Policy Name”为“all - consumergroup”的策略,单击按钮编辑策略。 在“consumergroup”配置需要管理的consumergroup。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Delete”。 说明: 当给用户授予了ConsumerGroup的“Delete”权限后,用户会同时被授予“Describe”权限。 (可选)添加策略有效期。在页面右上角单击“Add Validity period”,设置“Start Time”和“End Time”,选择“Time Zone”。单击“Save”保存。如需添加多条策略有效期,可单击按钮添加。如需删除策略有效期,可单击按钮删除。 单击“Add”,在策略列表可查看策略的基本信息。等待策略生效后,验证相关权限是否正常。 如需禁用某条策略,可单击按钮编辑策略,设置策略开关为“Disabled”。 如果不再使用策略,可单击按钮删除策略。
  • 解决办法 排查启动的MapReduce任务是否对应的HDFS文件个数很多,如果很多,减少文件数量,提前先合并小文件或者尝试使用combineInputFormat来减少任务读取的文件数量。 增大hadoop命令执行时的内存,该内存在客户端中设置,修改“客户端安装目录/HDFS/component_env”文件中“CLIENT_GC_OPTS”的“-Xmx”参数,将该参数的默认值改大,比如改为512m。然后执行source component_env命令,使修改的参数生效。
  • 原因分析 HBase客户端中默认日志打印设置为“INFO,console”,所以在使用期间会有INFO日志输出到控制台,影响HBase shell窗口的显示。 HBase客户端命令繁多,例如:hbase shell、hbase hbck、hbase org.apache.hadoop.hbase.mapreduce.RowCounter等,且后续还会增加。部分命令的输出为INFO打印,如果直接把INFO关闭会导致部分命令输出结果丢失。例如:RowCounter输出结果为INFO类型:
  • 处理步骤 使用root用户登录安装HBase客户端的节点。 在“HBase客户端安装目录/HBase/component_env”文件中添加如下信息: export HBASE_ROOT_LOGGER=INFO,RFA 把日志输出到日志文件中,后期如果使用hbase org.apache.hadoop.hbase.mapreduce.RowCounter等命令,执行结果请在日志文件“HBase客户端安装目录/HBase/hbase/logs/hbase.log”中查看。 切换到HBase客户端安装目录,执行以下命令使配置生效。 cd HBase客户端安装目录 source HBase/component_env
共100000条