目录
上一篇,我们分享了hive表数据常用的优化策略,本篇再从hive的job执行层面来聊聊可以优化的常用的一些手段。
在正式分享job优化之前,有必要先了解下hive的一条sql执行时经历的事情,即explain执行计划,在学习mysql的时候,DBA或者开发人员经常通过explain关键字来分析一条慢sql的执行计划,从而指导sql优化。
HiveQL,是一种类SQL语言,从编程语言规范来说是一种声明式语言,用户会根据查询需求提交声明式的HQL查询,而Hive会根据底层计算引擎将其转化成Mapreduce/Tez/Spark的job;
hive explain 补充说明:
EXPLAIN [FORMATTED|EXTENDED|DEPENDENCY|AUTHORIZATION|] query
参数说明:
每个查询计划由以下几个部分组成:
The Abstract Syntax Tree for the query
抽象语法树(AST):Hive使用Antlr解析生成器,可以自动地将HQL生成为抽象语法树
The dependencies between the different stages of the plan
Stage依赖关系:会列出运行查询划分的stage阶段以及之间的依赖关系
The description of each of the stages
Stage内容:包含了每个stage非常重要的信息,比如运行时的operator和sort orders等具体的信息
使用explain关键分析如下查询sql
explain extended select * from tb_login;
执行结果如下,重点关注下图中,TableScan后面的信息
执行下面的sql,根据主键字段查询
explain extended select count(*) from tb_emp01 where empno=7369;
使用explain分析执行计划,此时将会看到更复杂的执行语法树;
关于执行计划的结果分析中的各个参数作用,可以结合官网或者其他资料进行深入的学习,这里就不过多展开了;
通过上面的执行计划分析,其实可以知道,hive在大多数情况下,其得到的查询结果底层会走MR的job,所以可以理解为,MR与hive结合相关的属性调优也是整个job优化中非常重要的内容。
在使用Hive过程中,有一些数据量不大的表也会转换为MapReduce处理,提交到集群时,需要申请资源,等待资源分配,启动JVM进程,再运行Task,一系列的过程比较繁琐,本身数据量并不大,提交到YARN运行返回会导致性能较差的问题。
Hive为了解决这个问题,延用了MapReduce中的设计,提供本地计算模式,允许程序不提交给YARN,直接在本地运行,以便于提高小数据量程序的性能。
如果要使用本地模式,只需要开启下面的参数即可,也就是说,当开启这个参数之后,hive在提交一条查询sql之后,会被MR自动识别并解析到,从而走本地模式;
-- 开启本地模式
set hive.exec.mode.local.auto = true;
注意:该参数默认为false,要使用的话需要手动开启,如下即显示了hive要是走本地模式的几个条件,即条件都满足的情况下才会走本地模式;
设置参数
set hive.exec.mode.local.auto = true;
执行查询
select count(*) from tb_emp01;
可以看到,在这种情况下,140多万的数据只需要不到2秒的时间就统计出了结果 ,同时在执行日志中,也看到job的执行中使用了local,即走了本地模式;
Hadoop默认会为每个Task启动一个JVM来运行,而在JVM启动时内存开销大,Job数据量大的情况,如果单个Task数据量比较小,也会申请JVM,这就导致了资源紧张及浪费的情况;
JVM重用可以使得JVM实例在同一个job中重新使用N次,当一个Task运行结束以后,JVM不会进行释放,而是继续供下一个Task运行,直到运行了N个Task以后,就会释放;
N的值可以在Hadoop的mapred-site.xml文件中进行配置,通常在10-20之间;
参数设置如下
-- Hadoop3之前的配置,在mapred-site.xml中添加以下参数
-- Hadoop3中已不再支持该选项
mapreduce.job.jvm.numtasks=10
Hive在实现HQL计算运行时,会解析为多个Stage,有时候Stage彼此之间有依赖关系,只能挨个执行,但是在一些别的场景下,很多的Stage之间是没有依赖关系的;
例如Union语句,Join语句等等,这些Stage没有依赖关系,但是Hive依旧默认挨个执行每个Stage,这样会导致性能非常差,我们可以通过修改参数,开启并行执行,当多个Stage之间没有依赖关系时,允许多个Stage并行执行,提高性能。
并行执行关键参数设置
-- 开启Stage并行化,默认为false
SET hive.exec.parallel=true;
-- 指定并行化线程数,默认为8
SET hive.exec.parallel.thread.number=16;
join在mysql的日常编写sql过程中可以说是非常常见了,也是数据分析处理过程中必不可少的操作,Hive同样支持Join的语法;
Hive Join的底层是通过MapReduce来实现的,Hive实现Join时,为了提高MapReduce的性能,提供了多种Join方案来实现;
例如:适合小表Join大表的Map Join,大表Join大表的Reduce Join,以及大表Join的优化方案Bucket Join等。
即没有reduce阶段的task的场景,适合于小表join大表或者小表Join小表,如下图执行流程所示,在这种情况下,减少了reduce阶段task带来的执行时的资源开销;
将小的那份数据给每个MapTask的内存都放一份完整的数据,大的数据每个部分都可以与小数据的完整数据进行join 底层不需要经过shuffle,需要占用内存空间存放小的数据文件
尽量使用Map Join来实现Join过程,Hive中默认自动开启了Map Join:hive.auto.convert.join=true,Hive中小表的大小限制,在不同的版本中主要设置参数如下:
-- 2.0版本之前的控制属性
hive.mapjoin.smalltable.filesize=25M
-- 2.0版本开始由以下参数控制
hive.auto.convert.join.noconditionaltask.size=512000000
如果map端的join处理不了的情况下,比如两个join表的数据量都比较大的时候,就要考虑使用Reduce Join;
适合于大表Join大表
如下图所示,将两张表的数据在shuffle阶段利用shuffle的分组来将数据按照关联字段进行合并 必须经过shuffle,利用Shuffle过程中的分组来实现关联;
Hive会自动判断是否满足Map Join,如果不满足Map Join,则自动执行Reduce Join
顾名思义,即利用分桶表进行join,从之前的学习中我们了解到,使用分桶表,在进行join操作的时候,可以减少笛卡尔乘积的值,从而达到提升性能的目的;
适合于大表Join大表
将两张表按照相同的规则将数据划分 根据对应的规则的数据进行join 减少了比较次数,提高了性能,如下图所示;
基于某个字段,语法:clustered by colName,参数设置
set hive.optimize.bucketmapjoin = true;
使用要求:分桶字段 = Join字段 ,桶的个数相等或者成倍数
使用Sort Merge Bucket Join(SMB),基于有序的数据Join,语法:
clustered by colName sorted by (colName)
具体参数设置
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
使用要求
分桶字段 = Join字段 = 排序字段 ,桶的个数相等或者成倍数
当一个程序中如果有一些操作彼此之间有关联性,是可以在一个MapReduce中实现的,但是Hive会不智能的选择,Hive会使用两个MapReduce来完成这两个操作。
例如:当我们执行 select …… from table group by id order by id desc。该SQL语句转换为MapReduce时,我们可以有两种方案来实现:
方案一
方案二
在这种场景下,Hive会默认选择用第一种方案来实现,这样会导致性能相对较差,可以在Hive中开启关联优化,对有关联关系的操作进行解析时,可以尽量放在同一个MapReduce中实现,关键配置参数如下:
set hive.optimize.correlation=true;
Hive默认的优化器在解析一些聚合统计类的处理时,底层解析的方案有时候不是最佳的方案,例如:当前有一张表【共1000条数据】,id构建了索引,id =100值有900条,我们的需求是:
查询所有id = 100的数据,SQL语句为:select * from table where id = 100;
对于这个需求,可以考虑下面的方案进行实现
方案一
由于id这一列构建了索引,索引默认的优化器引擎RBO,会选择先从索引中查询id = 100的值所在的位置,再根据索引记录位置去读取对应的数据,但是这并不是最佳的执行方案。
方案二
有id=100的值有900条,占了总数据的90%,这时候是没有必要检索索引以后再检索数据的,可以直接检索数据返回,这样的效率会更高,更节省资源,这种方式就是CBO优化器引擎会选择的方案。
rule basic optimise
基于规则的优化器,根据设定好的规则来对程序进行优化
cost basic optimise
基于代价的优化器,根据不同场景所需要付出的代价来合适选择优化的方案,对数据的分布的信息【数值出现的次数,条数,分布】来综合判断用哪种处理的方案是最佳方案
Hive中支持RBO与CBO这两种引擎,默认使用的是RBO优化器引擎,很明显CBO引擎更加智能,所以在使用Hive时,我们可以配置底层的优化器引擎为CBO引擎,配置参数如下:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true; set hive.stats.fetch.column.stats=true;
思考:CBO引擎是基于代价的优化引擎,那么CBO如何知道每种方案的计算代价的呢?接下来就要说说Analyze分析器了。
用于提前运行一个MapReduce程序将表或者分区的信息构建一些元数据【表的信息、分区信息、列的信息】,搭配CBO引擎一起使用。
构建分区信息元数据
ANALYZE TABLE tablename
[PARTITION(partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS [noscan];
构建列的元数据
ANALYZE TABLE tablename
[PARTITION(partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS FOR COLUMNS ( columns name1, columns name2...) [noscan];
查看元数据
DESC FORMATTED [tablename] [columnname];
比如我们按照下面的操作步骤分析 tb_login_part 这张表
--分析优化器
use tb_part;
-- 构建表中分区数据的元数据信息
ANALYZE TABLE tb_login_part PARTITION(logindate) COMPUTE STATISTICS;
-- 构建表中列的数据的元数据信息
ANALYZE TABLE tb_login_part COMPUTE STATISTICS FOR COLUMNS userid;
-- 查看构建的列的元数据
desc formatted tb_login_part userid;
可以看到如下信息
什么是谓词
用来描述或判定客体性质、特征或者客体之间关系的词项。比如"3 大于 2"中"大于"是一个谓词。
谓词下推Predicate Pushdown(PPD)基本思想:将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据。简单点说就是在不影响最终结果的情况下,尽量将过滤条件提前执行。
Hive中谓词下推后,过滤条件会下推到map端,提前执行过滤,减少map到reduce的传输数据,提升整体性能。
开启下面的参数来使用(默认开启)
hive.optimize.ppd=true;
比如有下面的2个执行sql,哪一个更好呢?推荐形式1的写法,先过滤再join;
select a.id,a.value1,b.value2 from table1 a
join (select b.* from table2 b where b.ds>='20181201' and b.ds<'20190101') c
on (a.id=c.id);
select a.id,a.value1,b.value2 from table1 a
join table2 b on a.id=b.id
where b.ds>='20181201' and b.ds<'20190101';
通过下面这些日常经常可能涉及到的sql,可以检验自己在编写sql的时候有没有满足谓词下推的规则;
下面整理了一些关联sql中关于谓词下推的优化参考规则;
分布式计算中最常见的,最容易遇到的问题就是数据倾斜,数据倾斜的现象是:
当提交运行一个程序时,这个程序的大多数的Task都已经运行结束了,只有某一个Task一直在运行,迟迟不能结束,导致整体的进度卡在99%或者100%,这时候就可以判定程序出现了数据倾斜的问题。
而造成数据倾斜的原因大多是:数据分配的不均匀,比如下面这张图,太多相同的单词被分配到某一个任务上去处理,导致这个任务所在的节点压力非常大,这个任务的执行将会非常慢,这就是发生了数据倾斜;
当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的,根据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。
根本原因是因为分区规则导致的,所以可以通过以下几种方案来解决group by导致的数据倾斜的问题。
开启Map端聚合,通过减少shuffle数据量和Reducer阶段的执行时间,避免每个Task数据差异过大导致数据倾斜;
hive.map.aggr=true;
实现随机分区,比如下面这种写法,distribute by用于指定底层按照哪个字段作为Key实现分区、分组等 通过rank函数随机值实现随机分区,避免数据倾斜;
select * from table distribute by rand();
数据倾斜时自动负载均衡,通过开启如下参数
hive.groupby.skewindata=true;
开启该参数以后,当前程序会自动通过两个MapReduce来运行
Join操作时,如果两张表比较大,无法实现Map Join,只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题,面对Join产生的数据倾斜,核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现;
但往往很多的Join场景不满足Map Join的需求,那么可以以下几种方案来解决Join产生的数据倾斜问题:
提前过滤,将大数据变成小数据,实现Map Join,如下sql
select a.id,a.value1,b.value2 from table1 a
join (select b.* from table2 b where b.ds>='20181201' and b.ds<'20190101') c
on (a.id=c.id);
使用Bucket Join
如果使用方案一,过滤后的数据依旧是一张大表,那么最后的Join依旧是一个Reduce Join,这种场景下,可以将两张表的数据构建为桶表,实现Bucket Map Join,避免数据倾斜;
使用Skew Join,Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程
这种Join的原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现 ,其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免了Reduce Join中产生数据倾斜的问题 最终将Map Join的结果和Reduce Join的结果进行Union合并;
Skew Joi实现原理示意图
Skew Join使用需要开启下面的参数配置
-- 开启运行过程中skewjoin
set hive.optimize.skewjoin=true;
-- 如果这个key的出现的次数超过这个范围
set hive.skewjoin.key=100000;
-- 在编译时判断是否会产生数据倾斜
set hive.optimize.skewjoin.compiletime=true;
-- 不合并,提升性能
set hive.optimize.union.remove=true;
-- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并
set mapreduce.input.fileinputformat.input.dir.recursive=true;
更多【hive-【大数据Hive】hive 优化策略之job任务优化】相关视频教程:www.yxfzedu.com