给老田看看
大嘴猴嘴不大 2024-08-29 12:33:01 阅读 76
这里写自定义目录标题
Spark-sql 常用优化1.spark自动完成的优化2.spark-sql常用优化2.1 调优参数-调整map处理数据大小2.2 shuffle慢, 并行度不足2.3 AQE优化2.4 Hive兼容性配置
Spark-sql 常用优化
1.spark自动完成的优化
spark会自动完成的优化有:列裁剪, 分区裁剪, 谓词下推, CBO优化, 开启mapJoin、广播Join
列裁剪
列裁剪是指在查询过程中,只读取和处理查询所需的列,而忽略不相关的列。通过减少不必要的列读取,列裁剪可以显著降低I/O和内存开销,提高查询性能。
Spark SQL自动进行列裁剪优化。以下是一个示例:
<code>SELECT name, age FROM people;
在这个查询中,Spark SQL只会读取name和age列,而忽略表中的其他列
2. 分区裁剪
分区裁剪是指在查询过程中,只读取满足查询条件的分区,忽略不相关的分区。分区裁剪在处理大量数据时尤为重要,因为它可以显著减少需要扫描的数据量,提高查询性能
如果我们只想查询特定日期的数据:
SELECT event, user_id FROM logs WHERE date = '2024-07-07';
Spark SQL会自动进行分区裁剪,只扫描date为2024-07-07的分区
3. 谓词下推
谓词下推是指将查询中的过滤条件尽可能地推送到数据源执行,而不是在Spark执行计划中进行过滤。这样可以减少需要传输和处理的数据量,从而提高查询性能
如果我们执行以下查询:
SELECT name, age FROM people WHERE age > 30;
在这个查询中,age > 30的过滤条件会被下推到Parquet文件读取过程,这样只有满足条件的记录会被读取和返回
4. CBO优化
CBO是基于统计信息对查询计划进行优化的技术。它通过评估不同执行计划的代价,选择最优的执行计划,从而提高查询性能。CBO需要收集表和列的统计信息,如行数、列的基数、数据分布等.
启用CBO
要启用CBO,需要设置相关的Spark配置,并收集表的统计信息:
-- 启用CBO
SET spark.sql.cbo.enabled = true;
SET spark.sql.statistics.histogram.enabled = true;
-- 收集表的统计信息
ANALYZE TABLE people COMPUTE STATISTICS FOR ALL COLUMNS;
假设有一个表employees,我们进行如下查询:
SELECT * FROM employees WHERE age > 30 AND department = 'HR';
CBO会根据表的统计信息评估不同的执行计划,例如先过滤age > 30还是先过滤department = ‘HR’,然后选择代价最低的执行计划。
5. Mapjoin
Map Join是一种优化技术,通过提前在map阶段进行连接操作,而不是在reduce阶段进行,减少shuffle的数据量,提高连接性能。Map Join通常用于两个数据规模较为相近的表
在使用Spark SQL时,Map Join更多的是通过DataFrame或Dataset API进行手动优化,而不是通过SQL提示
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Map Join Example").getOrCreate()
val df1 = spark.read.format("parquet").load("path/to/table1")
val df2 = spark.read.format("parquet").load("path/to/table2")
// 通过在map阶段进行连接,减少shuffle的数据量
val joinedDF = df1.join(df2, df1("id") === df2("id"), "inner")
.repartition(df1("id")) // 在map阶段进行分区优化
joinedDF.show()
广播join
广播连接是一种优化技术,用于在一个表(通常是小表)和另一个表(大表)进行连接时,将小表广播到每个执行任务节点,以避免大表的shuffle操作
配置参数
set spark.sql.autoBroadcastJoinThreshold = 10m;set spark.sql.broadcastTimeout = 1500; --大表特大(小文件特多), 起很多task时, 建议调大超时, 600sset spark.sql.join.preferSortMergeJoin = true; 不能走广播join, 且该参数为false, 就会走ShuffleHashJoinselect /*+ broadcast(表别名) */
--假设有两个表:small_table 和 large_table
SELECT /*+ BROADCAST(small_table) */ *
FROM large_table
JOIN small_table
ON large_table.id = small_table.id;
2.spark-sql常用优化
2.1 调优参数-调整map处理数据大小
set spark.sql.files.maxPartitionBytes = 512m;--调大maptask的大小(文件可切分), 可少起一些task
set spark.sql.hive.convertInsertingPartitionedTable= fasle; 默认true
set spark.sql.hive.convertMetastoreParquet = false; 默认true
set spark.sql.hive.convertMetastoreOrc = fasle; 默认true
-- 默认32 --超过32个分区路径, 会单独起一个job来读路径, 而非Driver读
set spark.sql.sources.parallelPartitionDiscovery.threshold = 32
--读hdfs文件的最小开销是4M, 一个task最多合并128/4 = 32个小文件
set spark.files.openCostInBytes = 默认4M
读昨天分区, 写今天分区报错; 并发补数多个分区; 同时写多个二级分区第一个参数只影响写, 可以解决spark并发写少数据/报错的问题, 不影响读, 读性能好第二三个参数要看表格式是orc还是parquet, 同时影响读写, 读性能不好, 但是兼容性好, 具体需要按需使用
简单map调大以减少maptask数量; 复杂map调小以增多maptask数量
2.2 shuffle慢, 并行度不足
--Reduce 后partition 数量
set spark.sql.shuffle.partitions = 200;
调大shuffle并行度, 可以将数据打散, 减少磁盘溢写以提高效率
–shuffle参数
–shuffle的并发, 配置原则是减少shuffle阶段数据溢出到磁盘
–RDD的分区 spark.defalut.parallelism
2.3 AQE优化
根据stage结束后的统计信息, 动态调整执行计划, 调整partition数量
set spark.sql.adaptive.enabled = true;
set spark.sql.adaptive.skewJoin.enabled = true;(true) --开启skewJoin
set spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5;(5) --默认5倍, 切斜因子, 倾斜分区数据量大于其他分区5倍, 判断倾斜, 可降低
set spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 400(256m) --倾斜分区必须大于256M
set spark.sql.adaptive.coalescePartitions.enabled=true; --分区合并
set spark.sql.adaptive.coalescePartitions.initialPartitionNum = 200;(none) --reduce个数区间最大值,同时也是shuffle分区数的初始值
set spark.sql.adaptive.coalescePartitions.minPartitionNum = 1;(none) --reduce个数区间最小值, 设置后才会合并分区
set spark.sql.adaptive.advisoryPartitionSizeInBytes = 256m;(默认64m) --建议的reduce分区大小
2.4 Hive兼容性配置
–hive的serde兼容性好, 但性能差, spark的性能好, 兼容性差
set spark.sql.storeAssignmentPolicy=LEGACY; --允许字符类型转换, string和数值型
set spark.sql.legacy.timeParserPolicy = LEGACY; --解决时间解析函数的兼容性问题
动态分区
--动态分区
set spark.sql.hive.convertInsertingPartitionedTable=false;--避免丢数
set hive.exec.dynamic.partition=true; --动态分区开启
set hive.exec.dynamic.partition.mode=nonstrict; --动态分区非严格
set hive.exec.max.dynamic.partitions = 10000; --动态分区最大分区数
set hive.exec.max.dynamic.partitions.pernode=10000; --一个task的生成最大分区数
静态分区
--静态参数(配置文件, 提交时)
spark.dynamicAllocation.enabled true --动态分配
spark.dynamicAllocation.minExecutors 1 --单任务最小分配executor数
spark.dynamicAllocation.maxExecutors 100 --单任务最大分配executor数
spark.executor.cores --一个executor的核数
spark.executor.memory --一个executor的内存大小, 遇到大字段, 大json的string, 大数组等可调大
spark.driver.cores
spark.driver.memory
spark.executor.instances --静态指定executor数
– https://blog.csdn.net/u013332124/article/details/109359773
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。