华为云用户手册

  • 示例 create table productdb.productSalesTable(id int,price int,productName string,city string) stored as carbondata; CREATE INDEX productNameIndexTable on table productdb.productSalesTable (productName,city) as 'carbondata' ; 上述示例将创建名为“productdb.productNameIndexTable”的二级表并加载所提供列的索引信息。
  • 问题 Hive同步数据时报错: Caused by: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. The following columns have types incompatible with the existing columns in their respective positions : __col1,__col2
  • 回答 原因: Hudi表数据含有Decimal类型数据。 初始入库BULK_INSET方式会使用Spark内部parquet文件的写入类进行写入,Spark对不同精度的Decimal类型处理是不同的。 UPSERT操作时,Hudi使用Avro兼容的parquet文件写入类进行写入,这个和Spark的写入方式是不兼容的。 解决方案: 执行BULK_INSERT时指定设置“hoodie.datasource.write.row.writer.enable = false”,使hoodie采用Avro兼容的parquet文件写入类进行写入。
  • 操作场景 Broadcast(广播)可以把数据集合分发到每一个节点上,Spark任务在执行过程中要使用这个数据集合时,就会在本地查找Broadcast过来的数据集合。如果不使用Broadcast,每次任务需要数据集合时,都会把数据序列化到任务里面,不但耗时,还使任务变得很大。 每个任务分片在执行中都需要同一份数据集合时,就可以把公共数据集Broadcast到每个节点,让每个节点在本地都保存一份。 大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。
  • 注意事项 在执行此命令之前,应将旧表的表结构定义schema和数据复制到新数据库位置。 对于旧版本仓库,源集群和目的集群的时区应该相同。 新的数据库和旧数据库的名字应该相同。 执行命令前,旧表的表结构定义schema和数据应该复制到新的数据库位置。 如果表是聚合表,则应将所有聚合表复制到新的数据库位置。 如果旧集群使用HIVE元数据库来存储表结构,则刷新将不起作用,因为文件系统中不存在表结构定义schema文件。
  • 操作步骤 一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接受器为Streaming中的Kafka数据源接收器,处理器为Streaming。 对Streaming调优,就必须使该三个部件的性能都更优化。 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点: 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。 详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html。 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API: KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。 ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。 DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。 从实现上来看,DirectKafka的性能更优,实际测试上来看,DirectKafka也确实比其他两个API性能好了不少。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html。 处理器调优 Spark Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Spark Streaming之中,例如: 数据序列化 配置内存 设置并行度 使用External Shuffle Service提升性能 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Spark Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Spark Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。 日志归档调优 参数“spark.eventLog.group.size”用来设置一个应用的JobHistory日志按照指定job个数分组,每个分组会单独创建一个文件记录日志,从而避免应用长期运行时形成单个过大日志造成JobHistory无法读取的问题,设置为“0”时表示不分组。 大部分Spark Streaming任务属于小型job,而且产生速度较快,会导致频繁的分组,产生大量日志小文件消耗磁盘I/O。建议增大此值,例如改为“1000”或更大值。
  • 回答 在这种场景下,CarbonData会给每个节点分配一个INSERT INTO或LOAD DATA任务。如果Executor不是不同的节点分配的,CarbonData将会启动较少的task。 解决措施: 您可以适当增大Executor内存和Executor核数,以便YARN可以在每个节点上启动一个Executor。具体的配置方法如下: 配置Executor核数。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 示例 alter table h0 add columns(ext0 string); alter table h0 add columns(new_col int not null comment 'add new column' after col1); alter table complex_table add columns(col_struct.col_name string comment 'add new column to a struct col' after col_from_col_struct);
  • 配置描述 参数入口: 在应用提交时通过“--conf”设置这些参数,或者在客户端的“spark-defaults.conf”配置文件中调整如下参数。 表1 参数说明 参数 说明 默认值 spark.executor.memoryOverhead 用于指定每个executor的堆外内存大小(MB),增大该参数值,可以防止物理内存超限。该值是通过max(384,executor-memory*0.1)计算所得,最小值为384。 1024
  • 使用Payload Spark建表时指定Payload create table hudi_test(id int, comb int, price string, name string, par string) using hudi options( primaryKey = "id", preCombineField = "comb", payloadClass="org.apache.hudi.common.model.OverwriteWithLatestAvroPayload") partitioned by (par); Datasource方式写入时指定Payload data.write.format("hudi"). option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL). option("hoodie.datasource.write.precombine.field", "comb"). option("hoodie.datasource.write.recordkey.field", "id"). option("hoodie.datasource.write.partitionpath.field", "par"). option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.DefaultHoodieRecordPayload"). option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator"). option("hoodie.datasource.write.operation", "upsert"). option("hoodie.datasource.hive_sync.enable", "true"). option("hoodie.datasource.hive_sync.partition_fields", "par"). option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor"). option("hoodie.datasource.hive_sync.table", "hudi_test"). option("hoodie.datasource.hive_sync.use_jdbc", "false"). option("hoodie.upsert.shuffle.parallelism", 4). option("hoodie.datasource.write.hive_style_partitioning", "true"). option("hoodie.table.name", "hudi_test").mode(Append).save(s"/tmp/hudi_test")
  • 常用Payload DefaultHoodieRecordPayload Hudi中默认使用DefaultHoodieRecordPayload,该Payload通过比较增量数据与存量数据的preCombineField字段值的大小来决定同主键的存量数据是否能被同主键的增量数据更新。在同主键的增量数据的preCombineField字段值绝对大于同主键的存量数据的preCombineField字段值时,同主键的增量数据将会被更新。 OverwriteWithLatestAvroPayload 该Payload保证同主键的增量数据永远都会更新至同主键的存量数据中。 PartialUpdateAvroPayload 该Payload继承了OverwriteNonDefaultsWithLatestAvroPayload,它可以保证在任何场景下增量数据中的null值不会覆盖存量数据。
  • Payload介绍 Payload是Hudi实现数据增量更新和删除的关键,它可以帮助Hudi在数据湖中高效的管理数据变更。Hudi Payload的格式是基于Apache Avro的,它使用了Avro的schema来定义数据的结构和类型。Payload可以被序列化和反序列化,以便在Hudi中进行数据的读取和写入。总之,Hudi Payload是Hudi的一个重要组成部分,它提供了一种可靠的、高效的、可扩展的方式来管理大规模数据湖中的数据变更。
  • 配置描述 要启动小文件优化,在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 描述 默认值 spark.sql.files.maxPartitionBytes 在读取文件时,将单个分区打包的最大字节数。 单位:byte。 134217728(即128M) spark.files.openCostInBytes 打开文件的预估成本, 按照同一时间能够扫描的字节数来测量。当一个分区写入多个文件时使用。高估更好,这样小文件分区将比大文件分区更先被调度。 4M
  • 配置场景 Spark SQL的表中,经常会存在很多小文件(大小远小于HDFS块大小),每个小文件默认对应Spark中的一个Partition,也就是一个Task。在很多小文件场景下,Spark会起很多Task。当SQL逻辑中存在Shuffle操作时,会大大增加hash分桶数,严重影响性能。 在小文件场景下,您可以通过如下配置手动指定每个Task的数据量(Split Size),确保不会产生过多的Task,提高性能。 当SQL逻辑中不包含Shuffle操作时,设置此配置项,不会有明显的性能提升。
  • Clustering架构 Hudi通过其写入客户端API提供了不同的操作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和入湖速度之间进行权衡,Hudi提供了一个hoodie.parquet.small.file.limit配置来设置最小文件大小。用户可以将该配置设置为“0”,以强制新数据写入新的文件组,或设置为更高的值以确保新数据被“填充”到现有小的文件组中,直到达到指定大小为止,但其会增加摄取延迟。 为能够支持快速摄取的同时不影响查询性能,引入了Clustering服务来重写数据以优化Hudi数据湖文件的布局。 Clustering服务可以异步或同步运行,Clustering会添加了一种新的REPLACE操作类型,该操作类型将在Hudi元数据时间轴中标记Clustering操作。 Clustering服务基于Hudi的MVCC设计,允许继续插入新数据,而Clustering操作在后台运行以重新格式化数据布局,从而确保并发读写者之间的快照隔离。 总体而言Clustering分为两个部分: 调度Clustering:使用可插拔的Clustering策略创建Clustering计划。 识别符合Clustering条件的文件:根据所选的Clustering策略,调度逻辑将识别符合Clustering条件的文件。 根据特定条件对符合Clustering条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数。分组是计划中定义的"策略"的一部分。此外还有一个选项可以限制组大小,以改善并行性并避免混排大量数据。 将Clustering计划以avro元数据格式保存到时间线。 执行Clustering:使用执行策略处理计划以创建新文件并替换旧文件。 读取Clustering计划,并获得ClusteringGroups,其标记了需要进行Clustering的文件组。 对于每个组使用strategyParams实例化适当的策略类(例如:sortColumns),然后应用该策略重写数据。 创建一个REPLACE提交,并更新HoodieReplaceCommitMetadata中的元数据。
  • 配置场景 SparkSQL在进行shuffle操作时默认的分块数为200。在数据量特别大的场景下,使用默认的分块数就会造成单个数据块过大。如果一个任务产生的单个shuffle数据块大于2G,该数据块在被fetch的时候还会报类似错误: Adjusted frame length exceeds 2147483647: 2717729270 - discarded 例如,SparkSQL运行TPCDS 500G的测试时,使用默认配置出现错误。所以当数据量较大时需要适当的调整该参数。
  • 使用约束 新增列在设置默认值前,如果数据已经进行了重写,则查询历史数据不支持返回列的默认值,返回NULL。数据入库、更新、执行Compaction、Clustering都会导致部分或全部数据重写。 列的默认值设置要与列的类型一致,如不一致会进行类型强转,导致默认值精度丢失或者默认值为NULL。 历史数据的默认值与列第一次设置的默认值一致,多次修改列的默认值不会影响历史数据的查询结果。 设置默认值后rollback不能回滚默认值配置。 Spark SQL暂不支持查看列默认值信息,可以通过Hive beeline执行show create table命令查看。
  • 示例 SQL语法具体参考Hudi SQL语法参考章节。 示例: 建表指定列默认值 create table if not exists h3( id bigint, name string, price double default 12.34 ) using hudi options ( primaryKey = 'id', type = 'mor', preCombineField = 'name' ); 添加列指定列默认值 alter table h3 add columns(col1 string default 'col1_value'); alter table h3 add columns(col2 string default 'col2_value', col3 int default 1); 修改列默认值 alter table h3 alter column price set default 14.56; 插入数据使用列默认值 insert into h3(id, name) values(1, 'aaa'); insert into h3(id, name, price) select 2, 'bbb', 12.5;
  • 操作步骤 用于CarbonData查询的配置介绍,详情请参见表1和表2。 表1 Shuffle过程中,启动Task的个数 参数 spark.sql.shuffle.partitions 所属配置文件 spark-defaults.conf 适用于 数据查询 场景描述 Spark shuffle时启动的Task个数。 如何调优 一般建议将该参数值设置为执行器核数的1到2倍。例如,在聚合场景中,将task个数从200减少到32,有些查询的性能可提升2倍。 表2 设置用于CarbonData查询的Executor个数、CPU核数以及内存大小 参数 spark.executor.cores spark.executor.instances spark.executor.memory 所属配置文件 spark-defaults.conf 适用于 数据查询 场景描述 设置用于CarbonData查询的Executor个数、CPU核数以及内存大小。 如何调优 在银行方案中,为每个执行器提供4个CPU内核和15GB内存,可以获得良好的性能。这2个值并不意味着越多越好,在资源有限的情况下,需要正确配置。例如,在银行方案中,每个节点有足够的32个CPU核,而只有64GB的内存,这个内存是不够的。例如,当每个执行器有4个内核和12GB内存,有时在查询期间发生垃圾收集(GC),会导致查询时间从3秒增加到超过15秒。在这种情况下需要增加内存或减少CPU内核。 用于CarbonData数据加载的配置参数,详情请参见表3、表4和表5。 表3 设置数据加载使用的CPU core数量 参数 carbon.number.of.cores.while.loading 所属配置文件 carbon.properties 适用于 数据加载 场景描述 数据加载过程中,设置处理数据使用的CPU core数量。 如何调优 如果有更多的CPU个数,那么可以增加CPU值来提高性能。例如,将该参数值从2增加到4,那么CSV文件读取性能可以增加大约1倍。 表4 是否使用YARN本地目录进行多磁盘数据加载 参数 carbon.use.local.dir 所属配置文件 carbon.properties 适用于 数据加载 场景描述 是否使用YARN本地目录进行多磁盘数据加载。 如何调优 如果将该参数值设置为“true”,CarbonData将使用YARN本地目录进行多表加载磁盘负载平衡,以提高数据加载性能。 表5 加载时是否使用多路径 参数 carbon.use.multiple.temp.dir 所属配置文件 carbon.properties 适用于 数据加载 场景描述 是否使用多个临时目录存储sort临时文件。 如何调优 设置为true,则数据加载时使用多个临时目录存储sort临时文件。此配置能提高数据加载性能并避免磁盘单点故障。 用于CarbonData数据加载和数据查询的配置参数,详情请参见表6。 表6 设置数据加载和查询使用的CPU core数量 参数 carbon.compaction.level.threshold 所属配置文件 carbon.properties 适用于 数据加载和查询 场景描述 对于minor压缩,在阶段1中要合并的segment数量和在阶段2中要合并的已压缩的segment数量。 如何调优 每次CarbonData加载创建一个segment,如果每次加载的数据量较小,将在一段时间内生成许多小文件,影响查询性能。配置该参数将小的segment合并为一个大的segment,然后对数据进行排序,可提高查询性能。 压缩的策略根据实际的数据大小和可用资源决定。如某银行1天加载一次数据,且加载数据选择在晚上无查询时进行,有足够的资源,压缩策略可选择为6、5。 表7 使用索引缓存服务器时是否开启数据预加载 参数 carbon.indexserver.enable.prepriming 所属配置文件 carbon.properties 适用于 数据加载 场景描述 使用索引缓存服务器过程中开启数据预加载可以提升首次查询的性能。 如何调优 用户可以将该参数设置为true来开启预加载。默认情况,该参数为false。
  • 示例 create table productdb.productSalesTable(id int,price int,productName string,city string) stored as carbondata; CREATE INDEX productNameIndexTable on table productdb.productSalesTable (productName,city) as 'carbondata' ; SHOW INDEXES ON productdb.productSalesTable;
  • 日志级别 DBService中提供了如表2所示的日志级别。日志级别优先级从高到低分别是ERROR、WARN、INFO、DEBUG。程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。 表2 日志级别 级别 描述 ERROR ERROR表示当前时间处理存在错误信息。 WARN WARN表示当前事件处理存在异常信息。 INFO INFO表示记录系统及各事件正常运行状态信息。 DEBUG DEBUG表示记录系统及系统的调试信息。
  • 示例 假设表包含4个列,分别命名为a1,b1,c1和d1。 删除单个列: ALTER TABLE carbon DROP COLUMNS (b1); ALTER TABLE test_db.carbon DROP COLUMNS (b1); 删除多个列: ALTER TABLE carbon DROP COLUMNS (b1,c1); ALTER TABLE test_db.carbon DROP COLUMNS (b1,c1);
  • 操作步骤 配置Driver内存。 Driver负责任务的调度,和Executor、AM之间的消息通信。当任务数变多,任务平行度增大时,Driver内存都需要相应增大。 您可以根据实际任务数量的多少,为Driver设置一个合适的内存。 将“spark-defaults.conf”中的“spark.driver.memory”配置项设置为合适大小。 在使用spark-submit命令时,添加“--driver-memory MEM”参数设置内存。 配置Executor个数。 每个Executor每个核同时能跑一个task,所以增加了Executor的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加Executor的个数,以提高运行效率。 将“spark-defaults.conf”中的“spark.executor.instance”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_INSTANCES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--num-executors NUM”参数设置Executor个数。 配置Executor核数。 每个Executor多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用Executor的内存,所以要在内存和核数之间做好平衡。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 Executor的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加;当一个任务较小运行较快时,就可以增大并发度减少内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 示例 在执行spark wordcount计算中。1.6T数据,250个executor。 在默认参数下执行失败,出现Futures timed out和OOM错误。 因为数据量大,task数多,而wordcount每个task都比较小,完成速度快。当task数多时driver端相应的一些对象就变大了,而且每个task完成时executor和driver都要通信,这就会导致由于内存不足,进程之间通信断连等问题。 当把Driver的内存设置到4g时,应用成功跑完。 使用JDBCServer执行TPC-DS测试套,默认参数配置下也报了很多错误:Executor Lost等。而当配置Driver内存为30g,executor核数为2,executor个数为125,executor内存为6g时,所有任务才执行成功。
  • 操作场景 Spark on Yarn模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。 因而Driver和Executor的参数配置对Spark应用的执行有着很大的影响意义。用户可通过如下操作对Spark集群性能做优化。
  • 操作场景 本章节根据超过50个测试用例总结得出建议,帮助用户创建拥有更高查询性能的CarbonData表。 表1 CarbonData表中的列 Column name Data type Cardinality Attribution msname String 3千万 dimension BEGIN_TIME bigint 1万 dimension host String 1百万 dimension dime_1 String 1千 dimension dime_2 String 500 dimension dime_3 String 800 dimension counter_1 numeric(20,0) NA measure ... ... NA measure counter_100 numeric(20,0) NA measure
  • 示例 部分字段更新 create table h0(id int, comb int, name string, price int) using hudi options(primaryKey = 'id', preCombineField = 'comb'); create table s0(id int, comb int, name string, price int) using hudi options(primaryKey = 'id', preCombineField = 'comb'); insert into h0 values(1, 1, 1, 1); insert into s0 values(1, 1, 1, 1); insert into s0 values(2, 2, 2, 2); //写法1 merge into h0 using s0 on h0.id = s0.id when matched then update set h0.id = s0.id, h0.comb = s0.comb, price = s0.price * 2; //写法2 merge into h0 using s0 on h0.id = s0.id when matched then update set id = s0.id, name = h0.name, comb = s0.comb + h0.comb, price = s0.price + h0.price; 缺省字段更新和插入 create table h0(id int, comb int, name string, price int, flag boolean) using hudi options(primaryKey = 'id', preCombineField = 'comb'); create table s0(id int, comb int, name string, price int, flag boolean) using hudi options(primaryKey = 'id', preCombineField = 'comb'); insert into h0 values(1, 1, 1, 1, false); insert into s0 values(1, 2, 1, 1, true); insert into s0 values(2, 2, 2, 2, false); merge into h0 as target using ( select id, comb, name, price, flag from s0 ) source on target.id = source.id when matched then update set * when not matched then insert *; 多条件更新和删除 create table h0(id int, comb int, name string, price int, flag boolean) using hudi options(primaryKey = 'id', preCombineField = 'comb'); create table s0(id int, comb int, name string, price int, flag boolean) using hudi options(primaryKey = 'id', preCombineField = 'comb'); insert into h0 values(1, 1, 1, 1, false); insert into h0 values(2, 2, 1, 1, false); insert into s0 values(1, 1, 1, 1, true); insert into s0 values(2, 2, 2, 2, false); insert into s0 values(3, 3, 3, 3, false); merge into h0 using ( select id, comb, name, price, flag from s0 ) source on h0.id = source.id when matched and flag = false then update set id = source.id, comb = h0.comb + source.comb, price = source.price * 2 when matched and flag = true then delete when not matched then insert *;
  • 参数描述 表1 UPDATE参数 参数 描述 tableIdentifier 在其中执行MergeInto操作的Hudi表的名称。 target_alias 目标表的别名。 sub_query 子查询。 source_alias 源表或源表达式的别名。 merge_condition 将源表或表达式和目标表关联起来的条件 condition 过滤条件,可选。 matched_action 当满足条件时进行Delete或Update操作 not_matched_action 当不满足条件时进行Insert操作
  • 操作场景 Spark是内存计算框架,计算过程中内存不够对Spark的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存中RDD的大小来判断内存是否变成性能瓶颈,并根据情况优化。 监控节点进程的GC情况(在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" ),如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。
  • 操作步骤 优化GC,调整老年代和新生代的大小和比例。在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:-XX:NewRatio。如," -XX:NewRatio=2",则新生代占整个堆空间的1/3,老年代占2/3。 开发Spark应用程序时,优化RDD的数据结构。 使用原始类型数组替代集合类,如可使用fastutil库。 避免嵌套结构。 Key尽量不要使用String。 开发Spark应用程序时,建议序列化RDD。 RDD做cache时默认是不序列化数据的,可以通过设置存储级别来序列化RDD减小内存。例如: testRDD.persist(StorageLevel.MEMORY_ONLY_SER)
共100000条