给老田看看

大嘴猴嘴不大 2024-08-29 12:33:01 阅读 76

这里写自定义目录标题

Spark-sql 常用优化1.spark自动完成的优化2.spark-sql常用优化2.1 调优参数-调整map处理数据大小2.2 shuffle慢, 并行度不足2.3 AQE优化2.4 Hive兼容性配置

Spark-sql 常用优化

1.spark自动完成的优化

spark会自动完成的优化有:列裁剪, 分区裁剪, 谓词下推, CBO优化, 开启mapJoin、广播Join

列裁剪

列裁剪是指在查询过程中,只读取和处理查询所需的列,而忽略不相关的列。通过减少不必要的列读取,列裁剪可以显著降低I/O和内存开销,提高查询性能。

Spark SQL自动进行列裁剪优化。以下是一个示例:

<code>SELECT name, age FROM people;

在这个查询中,Spark SQL只会读取name和age列,而忽略表中的其他列

2. 分区裁剪

分区裁剪是指在查询过程中,只读取满足查询条件的分区,忽略不相关的分区。分区裁剪在处理大量数据时尤为重要,因为它可以显著减少需要扫描的数据量,提高查询性能

如果我们只想查询特定日期的数据:

SELECT event, user_id FROM logs WHERE date = '2024-07-07';

Spark SQL会自动进行分区裁剪,只扫描date为2024-07-07的分区

3. 谓词下推

谓词下推是指将查询中的过滤条件尽可能地推送到数据源执行,而不是在Spark执行计划中进行过滤。这样可以减少需要传输和处理的数据量,从而提高查询性能

如果我们执行以下查询:

SELECT name, age FROM people WHERE age > 30;

在这个查询中,age > 30的过滤条件会被下推到Parquet文件读取过程,这样只有满足条件的记录会被读取和返回

4. CBO优化

CBO是基于统计信息对查询计划进行优化的技术。它通过评估不同执行计划的代价,选择最优的执行计划,从而提高查询性能。CBO需要收集表和列的统计信息,如行数、列的基数、数据分布等.

启用CBO

要启用CBO,需要设置相关的Spark配置,并收集表的统计信息:

-- 启用CBO

SET spark.sql.cbo.enabled = true;

SET spark.sql.statistics.histogram.enabled = true;

-- 收集表的统计信息

ANALYZE TABLE people COMPUTE STATISTICS FOR ALL COLUMNS;

假设有一个表employees,我们进行如下查询:

SELECT * FROM employees WHERE age > 30 AND department = 'HR';

CBO会根据表的统计信息评估不同的执行计划,例如先过滤age > 30还是先过滤department = ‘HR’,然后选择代价最低的执行计划。

5. Mapjoin

Map Join是一种优化技术,通过提前在map阶段进行连接操作,而不是在reduce阶段进行,减少shuffle的数据量,提高连接性能。Map Join通常用于两个数据规模较为相近的表

在使用Spark SQL时,Map Join更多的是通过DataFrame或Dataset API进行手动优化,而不是通过SQL提示

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Map Join Example").getOrCreate()

val df1 = spark.read.format("parquet").load("path/to/table1")

val df2 = spark.read.format("parquet").load("path/to/table2")

// 通过在map阶段进行连接,减少shuffle的数据量

val joinedDF = df1.join(df2, df1("id") === df2("id"), "inner")

.repartition(df1("id")) // 在map阶段进行分区优化

joinedDF.show()

广播join

广播连接是一种优化技术,用于在一个表(通常是小表)和另一个表(大表)进行连接时,将小表广播到每个执行任务节点,以避免大表的shuffle操作

配置参数

set spark.sql.autoBroadcastJoinThreshold = 10m;set spark.sql.broadcastTimeout = 1500; --大表特大(小文件特多), 起很多task时, 建议调大超时, 600sset spark.sql.join.preferSortMergeJoin = true; 不能走广播join, 且该参数为false, 就会走ShuffleHashJoinselect /*+ broadcast(表别名) */

--假设有两个表:small_table 和 large_table

SELECT /*+ BROADCAST(small_table) */ *

FROM large_table

JOIN small_table

ON large_table.id = small_table.id;

2.spark-sql常用优化

2.1 调优参数-调整map处理数据大小

set spark.sql.files.maxPartitionBytes = 512m;--调大maptask的大小(文件可切分), 可少起一些task

set spark.sql.hive.convertInsertingPartitionedTable= fasle; 默认true

set spark.sql.hive.convertMetastoreParquet = false; 默认true

set spark.sql.hive.convertMetastoreOrc = fasle; 默认true

-- 默认32 --超过32个分区路径, 会单独起一个job来读路径, 而非Driver读

set spark.sql.sources.parallelPartitionDiscovery.threshold = 32

--读hdfs文件的最小开销是4M, 一个task最多合并128/4 = 32个小文件

set spark.files.openCostInBytes = 默认4M

读昨天分区, 写今天分区报错; 并发补数多个分区; 同时写多个二级分区第一个参数只影响写, 可以解决spark并发写少数据/报错的问题, 不影响读, 读性能好第二三个参数要看表格式是orc还是parquet, 同时影响读写, 读性能不好, 但是兼容性好, 具体需要按需使用

简单map调大以减少maptask数量; 复杂map调小以增多maptask数量

2.2 shuffle慢, 并行度不足

--Reduce 后partition 数量

set spark.sql.shuffle.partitions = 200;

调大shuffle并行度, 可以将数据打散, 减少磁盘溢写以提高效率

–shuffle参数

–shuffle的并发, 配置原则是减少shuffle阶段数据溢出到磁盘

–RDD的分区 spark.defalut.parallelism

2.3 AQE优化

根据stage结束后的统计信息, 动态调整执行计划, 调整partition数量

set spark.sql.adaptive.enabled = true;

set spark.sql.adaptive.skewJoin.enabled = true;(true) --开启skewJoin

set spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5;(5) --默认5倍, 切斜因子, 倾斜分区数据量大于其他分区5倍, 判断倾斜, 可降低

set spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 400(256m) --倾斜分区必须大于256M

set spark.sql.adaptive.coalescePartitions.enabled=true; --分区合并

set spark.sql.adaptive.coalescePartitions.initialPartitionNum = 200;(none) --reduce个数区间最大值,同时也是shuffle分区数的初始值

set spark.sql.adaptive.coalescePartitions.minPartitionNum = 1;(none) --reduce个数区间最小值, 设置后才会合并分区

set spark.sql.adaptive.advisoryPartitionSizeInBytes = 256m;(默认64m) --建议的reduce分区大小

2.4 Hive兼容性配置

–hive的serde兼容性好, 但性能差, spark的性能好, 兼容性差

set spark.sql.storeAssignmentPolicy=LEGACY; --允许字符类型转换, string和数值型

set spark.sql.legacy.timeParserPolicy = LEGACY; --解决时间解析函数的兼容性问题

动态分区

--动态分区

set spark.sql.hive.convertInsertingPartitionedTable=false;--避免丢数

set hive.exec.dynamic.partition=true; --动态分区开启

set hive.exec.dynamic.partition.mode=nonstrict; --动态分区非严格

set hive.exec.max.dynamic.partitions = 10000; --动态分区最大分区数

set hive.exec.max.dynamic.partitions.pernode=10000; --一个task的生成最大分区数

静态分区

--静态参数(配置文件, 提交时)

spark.dynamicAllocation.enabled true --动态分配

spark.dynamicAllocation.minExecutors 1 --单任务最小分配executor数

spark.dynamicAllocation.maxExecutors 100 --单任务最大分配executor数

spark.executor.cores --一个executor的核数

spark.executor.memory --一个executor的内存大小, 遇到大字段, 大json的string, 大数组等可调大

spark.driver.cores

spark.driver.memory

spark.executor.instances --静态指定executor数

– https://blog.csdn.net/u013332124/article/details/109359773



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。