impala 语法

ads

一、概述

1.1简介

Impala是由Cloudera公司开发的新型查询系统,能够对存储在HDFS、HBase以及S3上的数据进行快速的交互式SQL查询。另外,impala与Hive使用了统一的存储系统、同样的元数据库、SQL语法(Hive SQL)、ODBC驱动和用户交互接口(Hue),Impala对实时的或者面向批处理的查询提供了一个统一的平台,Impala在性能上比Hive高出3~30倍。

Impala是用于查询大数据的工具的补充,Impala不是取代构建在MapReduce之上的批处理框架,比如Hive。Hive和其他的基于MapReduce的框架适合处理长时间运行的批处理作业,比如涉及到批处理的ETL类型的作业。

注意:Impala于2017年11月15日从Apache的孵化器毕业。

1.2优势

  • 使用的是数据科学家和分析师熟悉的SQL接口
  • 能查询大数据集
  • 是集群环境中的分布式查询,便于扩展和使用廉价商用硬件
  • 能够在不同的分析引擎之前共享数据,比如可以通过pig写数据,使用Hive转换数据,再使用impala查询数据。impala能够读写hive中的表,使用impala对Hive生成的数据进行分析,实现简单的数据交换
  • 单一系统用于大数据处理和分析,因此可以避免成本高昂的建模和ETL.

1.3主要特点

  • 支持Hive查询语言(HiveQL)最常见的SQL-92功能,包括 SELECT, JOIN和聚合函数
  • 支持HDFS, HBase和S3存储, 包括:
    • HDFS 文件格式: delimited text files, Parquet, Avro, SequenceFile,和 RCFile.
    • 压缩: Snappy, GZIP, Deflate, BZIP.
  • 常见的数据访问接口,包括JDBC driver、ODBC driver
  • 支持impala-shell命令行接口
  • Kerberos授权

二、Impala架构

为了避免延迟,impala绕过MapReduce,采用了与商用并行关系数据库类似的分布式查询引擎,可以直接与HDFS和HBase进行交互查询,性能上比Hive要快。

Impala server 是一个分布式的大规模并行处理(MPP)的数据库引擎, 它由运行在集群中特定主机上的不同守护进程组成。其架构图如下图所示:

2.1Impala Daemon

这个进程是运行在集群每个DataNode节点上的守护进程,是impala的核心组件。在每个节点上这个进程的名字称为impalad。主要负责读写数据,接受 impala-shell,Hue, JDBC或者ODBC的查询请求,与集群中的其他节点分布式并行工作,并将本节点的查询结果返回给中心协调者节点(central coordinator)。

我们可以向运行在DataNode上的任何impalad进程提交一个查询,提交查询的这个节点将作为这个查询的“协调者节点”(coordinator)为这个查询提供服务。其他节点的运算结果会被传输到协调者节点,协调者节点将最终的运算结果返回。当使用 mpala-shell命令进行功能性测试的时候,为了方便起见,我们总是会连接到同一个节点上的impalad。但是对于生产环境中的impala集群而言,必须要考虑到各个节点的负载均衡,建议使用JDBC/ODBC接口以轮询(round-robin)的方式提交到不同的impalad进程上。

为了了解其他节点的健康状况和负载,Impalad进程会一直与 statestore保持通信,用以确保哪个节点是健康的并且可以接受任务的。

当impala集群中创建,修改或者删除了对象,或者进行了INSERT/LOAD DATAT操作,catalogd进程会向所有的节点广播消息,以保证每个impalad节点都能够及时地了解整个集群中对象元数据的最新状态。后台进程间的通信最大限度的降低了对 REFRESH/INVALIDATE METADATA命令的依赖。(但是对于和impala1.2版本之前的节点通信,还是需要显示指定)

对impala 2.9或者更高版本,可以控制哪一个节点为查询协调器( query coordinators ),也可以控制哪一个节点为查询协调器(query executors), 能够提高大型集群上高并发工作负载的可扩展性。

2.2Impala Statestore

statestore检查集群中impalad进程节点的健康状况,并不断地将健康状况结果转发给所有的impalad进程节点。statestore进程的名称为statestored。一个impala集群只需要一个statestored进程,如果impala节点由于硬件故障、网络错误、软件问题或者其他的原因导致节点不可用,statestore将确保这条信息及时地传达到所有的impalad进程节点上,当有新的查询请求时 ,impalad进程节点将不会把查询请求放松到不可用的节点上。

由于statestore的目的是在集群故障时对impalad进程节点同步信息,所以对于一个正常运行的impala集群来说,它并不是一个关键进程。如果statestore不可用,impalad进程节点之间仍然可以相互协调正常对外提供分布式查询。在statestore不可用的情况下,impalad进程节点失败,只是会让集群不再那么强健。当statestore恢复正常时,它重新与impalad进程节点建立通信,恢复对集群的监控功能。

对于负载平衡和高可用性都适用于impalad守护进程。statestore和catalog进程对高可用性没有特殊要求,因为即便这些守护进程存在问题,也不会导致数据丢失。如果这些守护进程因中断而变得不可用,则可以停止impala服务,删除impala StateStore和impala Catalog角色,将角色添加到不同的主机上,并重新启动impala服务。

2.3Impala Catalog Service

当impala集群中执行的SQL语句会引起元数据变化时,catalog服务会将这些变化推送到其他的impalad进程节点上。catalog服务对应的进程名称为catalogd,一个impala集群只需要一个catalogd进程 。由于所有的请求都是通过statestore进程发送过来的,所以建议让statestore和catalog运行在同一个节点上。

catalog服务大大地减少了对 REFRESH / INVALIDATE METADATA 语句的元数据同步的需求。在创建和删除表的过程中,catalogd进程负责连接元数据库并进行元数据更新操作,从而确保不必执行REFRESH / INVALIDATE METADATA这样的元数据同步语句。但是,如果通过Hive执行了创建表 、加载数据等操作,则在impala中执行查询之前需要先执行 REFRESH或者INVALIDATE METADATA 命令。

三、Impala查询的执行过程

3.1Impala查询过程图

3.2Impala执行查询的具体过程

第0步,当用户提交查询前,Impala先创建一个负责协调客户端提交的查询的Impalad进程,该进程会向Impala State Store提交注册订阅信息,State Store会创建一个statestored进程,statestored进程通过创建多个线程来处理Impalad的注册订阅信息。

第1步,用户通过CLI客户端提交一个查询到impalad进程,Impalad的Query Planner对SQL语句进行解析,生成解析树;然后,Planner把这个查询的解析树变成若干PlanFragment,发送到Query Coordinator

第2步,Coordinator通过从MySQL元数据库中获取元数据,从HDFS的名称节点中获取数据地址,以得到存储这个查询相关数据的所有数据节点。

第3步,Coordinator初始化相应impalad上的任务执行,即把查询任务分配给所有存储这个查询相关数据的数据节点。

第4步,Query Executor通过流式交换中间输出,并由Query Coordinator汇聚来自各个impalad的结果。

第5步,Coordinator把汇总后的结果返回给CLI客户端。

四、Impala与Hive的比较

4.1Impala与Hive对比图

4.2Hive与Impala的相同点

  • Hive与Impala使用相同的存储数据池,都支持把数据存储于HDFS和HBase中

  • Hive与Impala使用相同的元数据

  • Hive与Impala中对SQL的解释处理比较相似,都是通过词法分析生成执行计划

4.3Hive与Impala的不同点

  • Hive适合于长时间的批处理查询分析,而Impala适合于实时交互式SQL查询

  • Hive依赖于MapReduce计算框架,Impala把执行计划表现为一棵完整的执行计划树,直接分发执行计划到各个Impalad执行查询

  • Hive在执行过程中,如果内存放不下所有数据,则会使用外存,以保证查询能顺序执行完成,而Impala在遇到内存放不下数据时,不会利用外存,所以Impala目前处理查询时会受到一定的限制

4.4总结

  • Impala的目的不在于替换现有的MapReduce工具

  • 把Hive与Impala配合使用效果最佳

  • 可以先使用Hive进行数据转换处理,之后再使用Impala在Hive处理后的结果数据集上进行快速的数据分析

五、常用端口号

以下列举了Impala所使用的端口号,在部署Impala的时候,确保下面列出的端口是开启的。

六、Impala分析函数

分析函数又称为开窗函数,是一种特殊的内置函数。分析函数不会仅限于对每个group by的分组产生一个结果,它操作的是一个窗口(window),输入的行是排序和分组的,可以通过over()语句使用灵活的条件。impala的分析函数是从impala2.0.0开始添加的。分析函数经常被用于金融和科学领域,用来分析趋势、离群点以及大数据集的分桶分析。

6.1.over从句

当调用分析函数时,比如LEAD(),RANK()以及FIRST_VALUE,需要使用OVER从句。当调用聚合函数时使用了OVER从句,比如MAX(),COUNT()或者SUM(),将被视为分析函数。

语法

function(args) OVER([partition_by_clause] [order_by_clause [window_clause]])
partition_by_clause ::= PARTITION BY expr [, expr ...]
order_by_clause ::= ORDER BY expr [ASC | DESC] [NULLS FIRST
window_clause: See Window Clause

PARTITION BY从句

PARTITION BY 从句与GROUP BY从句类似,按照一列或者多列相同的值,将数据分成不同的组,这个逻辑上的组,称之为分区。但是,请注意以下限制,这些限制特别适用于涉及分区表的分析函数调用。当查询涉及到分析函数和分区表时,仅对分析函数调用的partition by子句中命名的列执行分区裁剪。比如,当一个分析函数查询带有WHERE year=2016的分区裁剪条件,只有在PARTITION BY 从句中指定year才可以裁剪掉其他的分区数据,比如 OVER (PARTITION BY year,other_columns other_analytic_clauses)

分析函数作用的范围是一个分组内(分区)的数据,任意MAX(),SUM()或者ROW_NUMBER()等独立应用于每个分区。当省略PARTITION BY子句时,则会将分析操作应用于表中的所有行。

ORDER BY 从句

ORDER BY从句与一个查询中最外面的ORDER BY从句类似。它会对整个数据集或者PARTITION BY指定的分组数据进行排序。可以按一个或多个列进行排序,可以选择升序或降序以及空值是按排序顺序排在第一还是最后。因为ORDER BY从句仅仅对分析函数作用的数据进行排序,所以,如果使输出的结果是全排序的,可以在查询语句的最外层使用ORDER BY从句。当ORDER BY从句省略时,分析函数作用于PARTITION BY指定的分组的所有数据。当指定ORDER BY从句时,分析函数可以作用于分组的所有数据或者部分数据,这取决于WINDOW从句。在OVER从句内部使用的ORDER BY,与查询最外层的ORDER BY是有区别的,比如ORDER BY 1被解释为常量排序值(实际上是无操作),而不是按照第1列进行排序。

Window 从句

window从句只能ORDER BY 从句一起使用.。如果指定了 ORDER BY 从句,而省略了window从句,则默认的窗口(window)是RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,表示从开始到当前行。

HBase 表注意事项

由于 HBase 表在单行查找做了优化,而不是全表扫描。所以不推荐在HBase表上使用分析函数,尽管查询是有效的,但是与查询HDFS上的数据相比,性能较差。

Parquet 表注意事项

在Parquet表上使用分析函数效率较高。

文本表注意事项

分析函数可以方便地与文本存储格式的表一起使用,但当数据量非常大时,优先使用 Parquet格式存储。在impala2.0.0添加的功能。

6.2Window从句

分析函数可以使用一个可选的window从句, 主要作用是让分析函数作用于某些行,而不是分组中的所有行。例如,可以通过指定前面和后面几行的数目,得到移动平均值。对于同一分组中的行,此子句可以产生不同的分析结果。AVG(), COUNT(), FIRST_VALUE(), LAST_VALUE(), 和SUM()函数支持window子句。对于MAX()和 MIN(),window 从句只允许其实行为  UNBOUNDED PRECEDING,即第一行。

语法

ROWS BETWEEN [ { m | UNBOUNDED } PRECEDING | CURRENT ROW] [ AND [CURRENT ROW
| { UNBOUNDED | n } FOLLOWING] ]
RANGE BETWEEN [ {m | UNBOUNDED } PRECEDING | CURRENT ROW] [ AND [CURRENT ROW
| { UNBOUNDED | n } FOLLOWING] ]

目前, 对于RANGE从句,Impala仅支持以下几种组合:

  • RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,为默认值,即当指定了ORDER BY从句,而省略了window从句 ,表示从开始到当前行。

  • RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING,表示从当前行到最后一行

  • RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING,表示所有行

  • n  PRECEDING  m FOLLOWING: 表示窗口的范围是:[(当前行的行数)- n, (当前行的行数)+m] row.

例子:

  • 数据准备
--创建表
create table stock_ticker 
          (stock_symbol string,
          closing_price decimal(8,2),
          closing_date timestamp);

--插入数据
insert into stock_ticker values ("JDR",12.86,"2014-10-02 00:00:00"),
                                ("JDR",12.89,"2014-10-03 00:00:00"),
                                ("JDR",12.94,"2014-10-04 00:00:00"),
                                ("JDR",12.55,"2014-10-05 00:00:00"),
                                ("JDR",14.03,"2014-10-06 00:00:00"),
                                ("JDR",14.75,"2014-10-07 00:00:00"),
                                ("JDR",13.98,"2014-10-08 00:00:00");

如果只对分组的所有数据做统计,则可以不指定ORDER BY从句。比如:

 --SQL1,统计分组数据的行数
select
     stock_symbol,
     closing_date,
     closing_price,
     count(*) over (partition by stock_symbol) cnt                     
from stock_ticker;

--SQL1结果输出
+--------------+---------------------+---------------+-----+
| stock_symbol | closing_date        | closing_price | cnt |
+--------------+---------------------+---------------+-----+
| JDR          | 2014-10-02 00:00:00 | 12.86         | 7   |
| JDR          | 2014-10-03 00:00:00 | 12.89         | 7   |
| JDR          | 2014-10-04 00:00:00 | 12.94         | 7   |
| JDR          | 2014-10-05 00:00:00 | 12.55         | 7   |
| JDR          | 2014-10-06 00:00:00 | 14.03         | 7   |
| JDR          | 2014-10-07 00:00:00 | 14.75         | 7   |
| JDR          | 2014-10-08 00:00:00 | 13.98         | 7   |
+--------------+---------------------+---------------+-----+

--SQL2,对分组的数据按某个字段求和
select
     stock_symbol,
     closing_date,
     closing_price,
     sum(closing_price ) over (partition by stock_symbol) sums                      
from stock_ticker;
--SQL2结果输出
+--------------+---------------------+---------------+-------+
| stock_symbol | closing_date        | closing_price | sums  |
+--------------+---------------------+---------------+-------+
| JDR          | 2014-10-02 00:00:00 | 12.86         | 94.00 |
| JDR          | 2014-10-03 00:00:00 | 12.89         | 94.00 |
| JDR          | 2014-10-04 00:00:00 | 12.94         | 94.00 |
| JDR          | 2014-10-05 00:00:00 | 12.55         | 94.00 |
| JDR          | 2014-10-06 00:00:00 | 14.03         | 94.00 |
| JDR          | 2014-10-07 00:00:00 | 14.75         | 94.00 |
| JDR          | 2014-10-08 00:00:00 | 13.98         | 94.00 |
+--------------+---------------------+---------------+-------+

--SQL3,查找分组内的第一个值
select
     stock_symbol,
     closing_date,
     closing_price,
     first_value(closing_price) over (partition by stock_symbol) firstvalue                     
from stock_ticker;
--SQL3结果输出
+--------------+---------------------+---------------+------------+
| stock_symbol | closing_date        | closing_price | firstvalue |
+--------------+---------------------+---------------+------------+
| JDR          | 2014-10-02 00:00:00 | 12.86         | 12.86      |
| JDR          | 2014-10-03 00:00:00 | 12.89         | 12.86      |
| JDR          | 2014-10-04 00:00:00 | 12.94         | 12.86      |
| JDR          | 2014-10-05 00:00:00 | 12.55         | 12.86      |
| JDR          | 2014-10-06 00:00:00 | 14.03         | 12.86      |
| JDR          | 2014-10-07 00:00:00 | 14.75         | 12.86      |
| JDR          | 2014-10-08 00:00:00 | 13.98         | 12.86      |
+--------------+---------------------+---------------+------------+

--SQL4,查找分组内的最后一个值
select
     stock_symbol,
     closing_date,
     closing_price,
     last_value(closing_price) over (partition by stock_symbol)  as lastvalue                    
from stock_ticker;
SQL4结果输出
+--------------+---------------------+---------------+-----------+
| stock_symbol | closing_date        | closing_price | lastvalue |
+--------------+---------------------+---------------+-----------+
| JDR          | 2014-10-02 00:00:00 | 12.86         | 13.98     |
| JDR          | 2014-10-03 00:00:00 | 12.89         | 13.98     |
| JDR          | 2014-10-04 00:00:00 | 12.94         | 13.98     |
| JDR          | 2014-10-05 00:00:00 | 12.55         | 13.98     |
| JDR          | 2014-10-06 00:00:00 | 14.03         | 13.98     |
| JDR          | 2014-10-07 00:00:00 | 14.75         | 13.98     |
| JDR          | 2014-10-08 00:00:00 | 13.98         | 13.98     |
+--------------+---------------------+---------------+-----------+

--SQL5,查找分组内的某个字段的最大值
select
     stock_symbol,
     closing_date,
     closing_price,
     max(closing_price) over (partition by stock_symbol)  as maxvalue                    
from stock_ticker;

SQL5结果输出
+--------------+---------------------+---------------+----------+
| stock_symbol | closing_date        | closing_price | maxvalue |
+--------------+---------------------+---------------+----------+
| JDR          | 2014-10-02 00:00:00 | 12.86         | 14.75    |
| JDR          | 2014-10-03 00:00:00 | 12.89         | 14.75    |
| JDR          | 2014-10-04 00:00:00 | 12.94         | 14.75    |
| JDR          | 2014-10-05 00:00:00 | 12.55         | 14.75    |
| JDR          | 2014-10-06 00:00:00 | 14.03         | 14.75    |
| JDR          | 2014-10-07 00:00:00 | 14.75         | 14.75    |
| JDR          | 2014-10-08 00:00:00 | 13.98         | 14.75    |
+--------------+---------------------+---------------+----------+

下面的例子是求移动平均值

select 
    stock_symbol, 
    closing_date,
    closing_price,
    avg(closing_price) over (partition by stock_symbol order by closing_date
       rows between 1 preceding and 1 followingas moving_average
from stock_ticker;

-- rows between 1 preceding and 1 following的含义是:
窗口的范围为三行数据,即[当前行 -1 ,当前行 + 1]。
对于第一行而言,不存在前一行数据,所以只统计[第一行,第一行 + 1]的平均值。
对于最后一行而言,不存在后一行数据,所以只统计[最后一行 -1 ,最后一行]的平均值

--结果输出
+--------------+---------------------+---------------+----------------+
| stock_symbol | closing_date        | closing_price | moving_average |
+--------------+---------------------+---------------+----------------+
| JDR          | 2014-10-02 00:00:00 | 12.86         | 12.87          |
| JDR          | 2014-10-03 00:00:00 | 12.89         | 12.89          |
| JDR          | 2014-10-04 00:00:00 | 12.94         | 12.79          |
| JDR          | 2014-10-05 00:00:00 | 12.55         | 13.17          |
| JDR          | 2014-10-06 00:00:00 | 14.03         | 13.77          |
| JDR          | 2014-10-07 00:00:00 | 14.75         | 14.25          |
| JDR          | 2014-10-08 00:00:00 | 13.98         | 14.36          |
+--------------+---------------------+---------------+----------------+

七、Impala性能调优

7.1分区表

7.2Join查询调优

优化连接查询最简单的方式是使用compute stats命令收集所有参与关联表的统计信息,让impala根据每个表的大小、列的非重复值个数等相关信息自动优化查询。

如果参与关联的表的统计信息不可用,使用impala自动的连接顺序效率很低,可以在select关键字后使用straight_join关键字手动指定连接顺序,指定了该关键字之后,impala会使用表在查询中出现的先后顺序作为关联顺序进行处理。

使用straight_join关键字需要手动指定连接表的先后顺序:

  • 指定最大的表为第一张表。

  • 指定最小的一张表作为下一张表。

  • 接着指定剩下的表中最小的表作为下一张表。如果有四张表分别为BIG, MEDIUM, SMALL, 和TINY, 指定的顺序应该为BIG, TINY, SMALL, MEDIUM.

Impala查询优化器根据表的绝对大小和相对大小而选择不同的关联技术:

  • 默认的方式为Broadcast joins,当大表连接小表时,小表的内容会被发送到所有执行查询的节点上

  • 另一种为partitioned join,用于大小差不多的大表关联,使用此方式,可以保证关联操作可以并行执行,每个表的一部分数据被发送到不同的节点上,最后各个节点分别对传送过来的数据并行处理。具体使用哪种方式依赖于compute stats的统计信息。

可以使用特定的查询执行explain语句,来确定表的连接策略,如果通过基准测试发现某种策略优于另外一种策略,那么可以通过Hint的方式手动指定需要的连接方式。

当统计信息不可用时如何处理join

如果只有某些表的统计信息不可用,impala会根据存在统计信息的表重新生成连接顺序,有统计信息的表会被放在连接顺序的最左端,并根据表的基数和规模降序排列,没有统计信息的表会被作为空表对待,总是放在连接顺序的最右边。

使用straight_join覆盖连接顺序

如果关联查询由于统计信息过期或者数据分布等问题导致效率低下,可以通过straight_join关键字改变连接顺序,指定顺序后不会再使用impala自动生成的连接顺序。

案例

[localhost:21000] > create table big stored as parquet as select * from raw_data;
+----------------------------+
| summary                        |
+----------------------------+
| Inserted 1000000000 row(s) |
+----------------------------+
Returned 1 row(s) in 671.56s
[localhost:21000] > desc big;
+-----------+---------+---------+
| name       | type      | comment|
+-----------+---------+---------+
id        | int       |            |
| val       | int       |            |
| zfill     | string   |           |
name      | string   |           |
assertion | boolean |          |
+-----------+---------+---------+
Returned 5 row(s) in 0.01s
[localhost:21000] > create table medium stored as parquet as select * from big limit 200 * floor(1e6);
+---------------------------+
| summary                       |
+---------------------------+
| Inserted 200000000 row(s) |
+---------------------------+
Returned 1 row(s) in 138.31s
[localhost:21000] > create table small stored as parquet as select id,val,name from big where assertion = true limit 1 * floor(1e6);
+-------------------------+
| summary                    |
+-------------------------+
| Inserted 1000000 row(s) |
+-------------------------+
Returned 1 row(s) in 6.32s
实际运行查询之前使用explain查看连接信息,启用执行计划的详细输出,可以看到更多的性能相关的输出信息,红色字体显示。信息提示参与关联的表没有统计信息,impala不能为每个执行阶段估计出结果集的大小,使用Broadcast方式向每个节点发送一个表的完整副本。
[localhost:21000] > set explain_level=verbose;
EXPLAIN_LEVEL set to verbose
[localhost:21000] > explain select count(*) from big join medium where big.id = medium.id;
+----------------------------------------------------------+
Explain String                                           |
+----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=2.10GB VCores=2  |
|                                                          |
| PLAN FRAGMENT 0                                          |
|   PARTITION: UNPARTITIONED                               |
|                                                          |
|   6:AGGREGATE (merge finalize)                           |
|   |  outputSUM(COUNT(*))                               |
|   |  cardinality: 1                                      |
|   |  per-host memory: unavailable                        |
|   |  tuple ids: 2                                        |
|   |                                                      |
|   5:EXCHANGE                                             |
|      cardinality: 1                                      |
|      per-host memory: unavailable                        |
|      tuple ids: 2                                        |
|                                                          |
| PLAN FRAGMENT 1                                          |
|   PARTITION: RANDOM                                      |
|                                                          |
|   STREAM DATA SINK                                       |
|     EXCHANGE ID5                                       |
|     UNPARTITIONED                                        |
|                                                          |
|   3:AGGREGATE                                            |
|   |  outputCOUNT(*)                                    |
|   |  cardinality: 1                                      |
|   |  per-host memory10.00MB                            |
|   |  tuple ids: 2                                        |
|   |                                                      |
|   2:HASH JOIN                                            |
|   |  join op: INNER JOIN (BROADCAST)                     |
|   |  hash predicates:                                    |
|   |    big.id = medium.id                                |
|   |  cardinality: unavailable                            |
|   |  per-host memory2.00GB                             |
|   |  tuple ids: 0 1                                      |
|   |                                                      |
|   |----4:EXCHANGE                                        |
|   |       cardinality: unavailable                       |
|   |       per-host memory0B                            |
|   |       tuple ids: 1                                   |
|   |                                                      |
|   0:SCAN HDFS                                            |
|      table=join_order.big #partitions=1/1 size=23.12GB   |
|      table stats: unavailable                            |
|      column stats: unavailable                           |
|      cardinality: unavailable                            |
|      per-host memory88.00MB                            |
|      tuple ids: 0                                        |
|                                                          |
| PLAN FRAGMENT 2                                          |
|   PARTITION: RANDOM                                      |
|                                                          |
|   STREAM DATA SINK                                       |
|     EXCHANGE ID4                                       |
|     UNPARTITIONED                                        |
|                                                          |
|   1:SCAN HDFS                                            |
|      table=join_order.medium #partitions=1/1 size=4.62GB |
|      table stats: unavailable                            |
|      column stats: unavailable                           |
|      cardinality: unavailable                            |
|      per-host memory88.00MB                            |
|      tuple ids: 1                                        |
+----------------------------------------------------------+
Returned 64 row(s) in 0.04s
为每张表执行compute stats收集统计信息: 
[localhost:21000] > compute stats small;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 3 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 4.26s
[localhost:21000] > compute stats medium;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 5 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 42.11s
[localhost:21000] > compute stats big;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 5 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 165.44s
收集完统计信息之后,impala会根据统计信息选择更有效的连接顺序,具体选择哪种方式仍然是根据表的大小和行数的差别来确定。
[localhost:21000] > explain select count(*) from medium join big where big.id = medium.id;
Query: explain select count(*) from medium join big where big.id = medium.id
+-----------------------------------------------------------+
Explain String                                            |
+-----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=937.23MB VCores=2 |
|                                                           |
| PLAN FRAGMENT 0                                           |
|   PARTITION: UNPARTITIONED                                |
|                                                           |
|   6:AGGREGATE (merge finalize)                            |
|   |  outputSUM(COUNT(*))                                |
|   |  cardinality: 1                                       |
|   |  per-host memory: unavailable                         |
|   |  tuple ids: 2                                         |
|   |                                                       |
|   5:EXCHANGE                                              |
|      cardinality: 1                                       |
|      per-host memory: unavailable                         |
|      tuple ids: 2                                         |
|                                                           |
| PLAN FRAGMENT 1                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID5                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   3:AGGREGATE                                             |
|   |  outputCOUNT(*)                                     |
|   |  cardinality: 1                                       |
|   |  per-host memory10.00MB                             |
|   |  tuple ids: 2                                         |
|   |                                                       |
|   2:HASH JOIN                                             |
|   |  join op: INNER JOIN (BROADCAST)                      |
|   |  hash predicates:                                     |
|   |    big.id = medium.id                                 |
|   |  cardinality: 1443004441                              |
|   |  per-host memory839.23MB                            |
|   |  tuple ids: 1 0                                       |
|   |                                                       |
|   |----4:EXCHANGE                                         |
|   |       cardinality: 200000000                          |
|   |       per-host memory0B                             |
|   |       tuple ids: 0                                    |
|   |                                                       |
|   1:SCAN HDFS                                             |
|      table=join_order.big #partitions=1/1 size=23.12GB    |
|      table stats: 1000000000 rows total                   |
|      column stats: all                                    |
|      cardinality: 1000000000                              |
|      per-host memory88.00MB                             |
|      tuple ids: 1                                         |
|                                                           |
| PLAN FRAGMENT 2                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID4                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   0:SCAN HDFS                                             |
|      table=join_order.medium #partitions=1/1 size=4.62GB  |
|      table stats: 200000000 rows total                    |
|      column stats: all                                    |
|      cardinality: 200000000                               |
|      per-host memory88.00MB                             |
|      tuple ids: 0                                         |
+-----------------------------------------------------------+
Returned 64 row(s) in 0.04s
 
[localhost:21000] > explain select count(*) from small join big where big.id = small.id;
Query: explain select count(*) from small join big where big.id = small.id
+-----------------------------------------------------------+
Explain String                                            |
+-----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=101.15MB VCores=2 |
|                                                           |
| PLAN FRAGMENT 0                                           |
|   PARTITION: UNPARTITIONED                                |
|                                                           |
|   6:AGGREGATE (merge finalize)                            |
|   |  outputSUM(COUNT(*))                                |
|   |  cardinality: 1                                       |
|   |  per-host memory: unavailable                         |
|   |  tuple ids: 2                                         |
|   |                                                       |
|   5:EXCHANGE                                              |
|      cardinality: 1                                       |
|      per-host memory: unavailable                         |
|      tuple ids: 2                                         |
|                                                           |
| PLAN FRAGMENT 1                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID5                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   3:AGGREGATE                                             |
|   |  outputCOUNT(*)                                     |
|   |  cardinality: 1                                       |
|   |  per-host memory10.00MB                             |
|   |  tuple ids: 2                                         |
|   |                                                       |
|   2:HASH JOIN                                             |
|   |  join op: INNER JOIN (BROADCAST)                      |
|   |  hash predicates:                                     |
|   |    big.id = small.id                                  |
|   |  cardinality: 1000000000                              |
|   |  per-host memory3.15MB                              |
|   |  tuple ids: 1 0                                       |
|   |                                                       |
|   |----4:EXCHANGE                                         |
|   |       cardinality: 1000000                            |
|   |       per-host memory0B                             |
|   |       tuple ids: 0                                    |
|   |                                                       |
|   1:SCAN HDFS                                             |
|      table=join_order.big #partitions=1/1 size=23.12GB    |
|      table stats: 1000000000 rows total                   |
|      column stats: all                                    |
|      cardinality: 1000000000                              |
|      per-host memory88.00MB                             |
|      tuple ids: 1                                         |
|                                                           |
| PLAN FRAGMENT 2                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID4                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   0:SCAN HDFS                                             |
|      table=join_order.small #partitions=1/1 size=17.93MB  |
|      table stats: 1000000 rows total                      |
|      column stats: all                                    |
|      cardinality: 1000000                                 |
|      per-host memory32.00MB                             |
|      tuple ids: 0                                         |
+-----------------------------------------------------------+
Returned 64 row(s) in 0.03s
而实际执行查询时发现无论表的连接顺序如何,执行的时间差不多,因为样本数据的ID列和VAL列都包含很多的重复值
 [localhost:21000] > select count(*) from big join small on (big.id = small.id);
Query: select count(*) from big join small on (big.id = small.id)
+----------+
count(*) |
+----------+
1000000  |
+----------+
Returned 1 row(s) in 21.68s
[localhost:21000] > select count(*) from small join big on (big.id = small.id);
Query: select count(*) from small join big on (big.id = small.id)
+----------+
count(*) |
+----------+
1000000  |
+----------+
Returned 1 row(s) in 20.45s
 
[localhost:21000] > select count(*) from big join small on (big.val = small.val);
+------------+
| count(*)   |
+------------+
| 2000948962 |
+------------+
Returned 1 row(s) in 108.85s
[localhost:21000] > select count(*) from small join big on (big.val = small.val);
+------------+
| count(*)   |
+------------+
| 2000948962 |
+------------+
Returned 1 row(s) in 100.76s

7.3表统计和列统计

表统计信息

show table stats parquet_snappy;
compute stats parquet_snappy;
show table stats parquet_snappy;

列统计信息

show column stats parquet_snappy;
compute stats parquet_snappy;
show column stats parquet_snappy;

分区表的表统计信息和列统计信息

show partitions 与show table stats 显示信息一样

show partitions year_month_day;

+-------+-------+-----+-------+--------+---------+--------------+-------------------+---------+...

| year  | month | day | #Rows | #Files | Size    | Bytes Cached | Cache Replication | Format  |...

+-------+-------+-----+-------+--------+---------+--------------+-------------------+---------+...

| 2013  | 12    | 1   | -1    | 1      | 2.51MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 2   | -1    | 1      | 2.53MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 3   | -1    | 1      | 2.52MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 4   | -1    | 1      | 2.51MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 5   | -1    | 1      | 2.52MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| Total |       |     | -1    | 5      | 12.58MB | 0B           |                   |         |...

+-------+-------+-----+-------+--------+---------+--------------+-------------------+---------+...



show table stats year_month_day;

+-------+-------+-----+-------+--------+---------+--------------+-------------------+---------+...

| year  | month | day | #Rows | #Files | Size    | Bytes Cached | Cache Replication | Format  |...

+-------+-------+-----+-------+--------+---------+--------------+-------------------+---------+...

| 2013  | 12    | 1   | -1    | 1      | 2.51MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 2   | -1    | 1      | 2.53MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 3   | -1    | 1      | 2.52MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 4   | -1    | 1      | 2.51MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 5   | -1    | 1      | 2.52MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| Total |       |     | -1    | 5      | 12.58MB | 0B           |                   |         |...

+-------+-------+-----+-------+--------+---------+--------------+-------------------+---------+...



show column stats year_month_day;

+-----------+---------+------------------+--------+----------+----------+

| Column    | Type    | #Distinct Values | #Nulls | Max Size | Avg Size |

+-----------+---------+------------------+--------+----------+----------+

| id        | INT     | -1               | -1     | 4        | 4        |

| val       | INT     | -1               | -1     | 4        | 4        |

| zfill     | STRING  | -1               | -1     | -1       | -1       |

| name      | STRING  | -1               | -1     | -1       | -1       |

| assertion | BOOLEAN | -1               | -1     | 1        | 1        |

| year      | INT     | 1                | 0      | 4        | 4        |

| month     | INT     | 1                | 0      | 4        | 4        |

| day       | INT     | 5                | 0      | 4        | 4        |

+-----------+---------+------------------+--------+----------+----------+



compute stats year_month_day;

+-----------------------------------------+

| summary                                 |

+-----------------------------------------+

| Updated 5 partition(s) and 5 column(s). |

+-----------------------------------------+



show table stats year_month_day;

+-------+-------+-----+--------+--------+---------+--------------+-------------------+---------+...

| year  | month | day | #Rows  | #Files | Size    | Bytes Cached | Cache Replication | Format  |...

+-------+-------+-----+--------+--------+---------+--------------+-------------------+---------+...

| 2013  | 12    | 1   | 93606  | 1      | 2.51MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 2   | 94158  | 1      | 2.53MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 3   | 94122  | 1      | 2.52MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 4   | 93559  | 1      | 2.51MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| 2013  | 12    | 5   | 93845  | 1      | 2.52MB  | NOT CACHED   | NOT CACHED        | PARQUET |...

| Total |       |     | 469290 | 5      | 12.58MB | 0B           |                   |         |...

+-------+-------+-----+--------+--------+---------+--------------+-------------------+---------+...



show column stats year_month_day;

+-----------+---------+------------------+--------+----------+-------------------+

| Column    | Type    | #Distinct Values | #Nulls | Max Size | Avg Size          |

+-----------+---------+------------------+--------+----------+-------------------+

| id        | INT     | 511129           | -1     | 4        | 4                 |

| val       | INT     | 364853           | -1     | 4        | 4                 |

| zfill     | STRING  | 311430           | -1     | 6        | 6                 |

| name      | STRING  | 471975           | -1     | 22       | 13.00160026550293 |

| assertion | BOOLEAN | 2                | -1     | 1        | 1                 |

| year      | INT     | 1                | 0      | 4        | 4                 |

| month     | INT     | 1                | 0      | 4        | 4                 |

| day       | INT     | 5                | 0      | 4        | 4                 |

+-----------+---------+------------------+--------+----------+-------------------+

COMPUTE STATS 与COMPUTE INCREMENTAL STATS

COMPUTE STATS 和 COMPUTE INCREMENTAL STATS,只能使用其中的一个,不可同时使用。COMPUTE STATS收集表级和分区级的行统计与列统计信息,使用时会消耗CPU,对于非常大的表而言,会耗费很长的时间。提高COMPUTE STATS的效率,需要做到下面几点:

  • 限制统计列的数量。从2.12版本开始有此特点。

  • 设置MT_DOP查询选项,使用更多的线程进行统计信息,注意:对大表收集统计信息时,如果设置较高的MT_DOP值会对同时间运行的其他查询产生负面影响。此特点从2.8开始引入。

  • 通过实验推断或者取样特征进一步提高统计信息的效率。

Compute stats需要周期地运行,比如每周,或者当表的内容发生重大改变的时候。取样的特点是通过处理表的一部分数据,使得compute stats更有效率,推断特点的目的是通过估计新的或者修改的分区的行统计来减少需要重新compute stats的频率。取样和推断的特点默认是关闭的,可以全局开启也可以针对某个表开启,设置--enable_stats_extrapolation参数全局开启,同过针对某个表设置impala.enable.stats.extrapolation=true属性进行开启,表级别的设置会覆盖全局设置。

对于2.1.0或者更高版本,可以使用COMPUTE INCREMENTAL STATS 和DROP INCREMENTAL STATS命令,指的是增量统计,针对分区表。如果对分区表使用此命令,默认情况下impala只处理没有增量统计的分区,即仅处理新加入的分区。

对于一个有大量分区和许多列的表,每个分区的每个列大约400byte的元数据增加内存负载,当必须要缓存到catalogd主机和充当coordinator 的impalad主机时,如果所有表的元数据超过2G,那么服务会宕机。COMPUTE INCREMENTAL STATS比COMPUTE  STATS耗时。

使用alter table手动设置表和列的统计信息

--创建表
create table analysis_data stored as parquet as select * from raw_data;
Inserted 1000000000 rows in 181.98s
--收集统计信息
compute stats analysis_data;
--插入数据
insert into analysis_data select * from smaller_table_we_forgot_before;
Inserted 1000000 rows in 15.32s
-- 共 1001000000行. 设置统计信息.
alter table analysis_data set tblproperties('numRows'='1001000000''STATS_GENERATED_VIA_STATS_TASK'='true');

7.4使用profile信息

7.5最佳实践

选择合适的文件格式

通常对于大数据集而言(每个分区或者表的大小为几个G或者更大),推荐使用Parquet文件格式。因为它按列存储,单词IO可以请求更多的数据,另外它支持更好的压缩算法对二进制文件进行压缩。

对于小表而言(每个分区或者表的大小小于几个G或者更小),不同的存储格式之间没有明显的性能差别。在小数据量时,可以通过减少并行执行的机会(使用压缩文件格式),来减少的I / O。在规划生产部署或执行基准测试时,始终使用实际数据量来获得性能和扩展性的真实情况。

避免数据处理过程中产生过多小文件

对于外部的存储数据,通常的格式为文本格式或者Avro格式,这样可以按照行来构建数据文件。一旦数据要加载到Impala的表中,可以将其转换为更高效的Parquet格式,并使用单个INSERT ... SELECT语句拆分为多个数据文件。

使用insert…select在表与表之间拷贝数据。避免对海量数据或者影响性能的关键表使用insert…values插入数据,因为每条这样的insert语句都会产生单个的小文件。

如果在数据处理过程中产生了上千个小文件,需要使用insert…select来讲数据复制到另外一张表,在复制的过程中也解决了小文件过多的问题。

选择合适的分区粒度

分区是一种基于一个或多个列的值物理划分数据的技术,例如按年,月,日,地区,城市等。当查询指定了具体的分区列或者分区列范围时,Impala可以避免读取不相关的数据,从而可能大大节省磁盘I / O。

在确定分区列时,要选择合适的分区粒度。例如,是按照年、月、日进行分区,还是仅按照年、月进行分区。选择分区的策略是,要保证每个分区的数据至少为256 MB,这样可以更好地利用HDFS的IO批处理性能和Impala的分布式查询。

过度分区还可能导致查询计划花费的时间超过必要的时间,因为Impala会修剪不必要的分区。理想情况下,将表中的分区数保持在3万以下。

在准备每个分区目录的数据文件时,应当使用几个大文件而不是许多小文件。如果存在的数据本来就是以许多小文件的形式存在的,并且无法控制文件的格式,那么,建议使用INSERT ... SELECT语法将数据从一个表或分区复制到另一个表或分区,这样操作会合并文件从而减少文件的数量。

如果一个包含上千个分区的parquet表,每个分区的数据都小于1G,就需要采用更大的分区粒度,只有分区的粒度使文件的大小合适,才能充分利用HDFS的IO批处理性能和Impala的分布式查询。

对分区键列使用最小的适当整数类型

通常,人们更喜欢使用字符串作为分区键列,因为这些值都变成了HDFS目录名。但是,建议使用数值类型的值作为分区键,比如年、月、日,因为这样可以减少内存的使用。通常情况下,月和日的数据类型为TINYINT,年的类型为SMALLINT。使用该EXTRACT()函数从TIMESTAMP值中提取单个日期和时间字段,并使用CAST()函数将返回值转换成适当的整数类型。

选择合适的Parquet块大小

默认情况下,通过INSERT ... SELECT语句创建的Parquet文件的块大小为256 MB。(此默认值在Impala 2.0中已更改。以前限制为1 GB,但Impala对压缩做出了保守估计,导致文件小于1 GB。)

收集统计信息

使用compute stats收集连接查询中海量数据表或者影响性能的关键表的统计信息。

最大限度地减少将结果传回客户端的开销

可以考虑使用如下技术:

  • 聚合。如果需要计算满足条件的记录行数,求匹配到的行中某列的和、最大值、最小值等,不要将整个结果集发送到客户端由客户端应用来处理这些数据,而是可以调用像 COUNT()、SUM()、MAX()等聚集函数来处理,如果将整个未聚集过的数据集发送到客户端,单单将数据传送到客户端这一个动作就需要消耗很大的网络开销。

  • 过滤。使用不同的谓词条件尽可能的缩小结果集的大小,而不是把整个结果集发送到应用端,由应用来处理过滤逻辑。使用WHERE查询子句中的所有适用测试来消除不相关的行,而不是生成大的结果集并使用应用程序逻辑对其进行过滤。

  • LIMIT子句。如果您只需要查看结果集中的很少的样本数据,或查询使用ORDER BY之后产生的最大值或者最小值,可以使用LIMIT子句来最大限度的减少结果集的大小。

  • 避免对结果集进行美化输出:当通过impala-shell查询数据时,可以指定-B和--output_delimiter选项输出原始的结果集,而不需要impala对输出的格式进行美化,或者直接将结果集重定向到文件中。上述的情况也可以考虑使用 INSERT ... SELECT将结果直接写入HDFS上。

检查查询计划

在实际运行一个查询之前,使用explain查看执行计划是否以高效合理的方式运行

查看查询的性能特征

在运行一个查询之后,使用profile命令查看IO,内存消耗,网络带宽占用,CPU使用率等信息是否在期望的范围之内。

使用适当的操作系统设置

有关影响Impala性能的操作系统设置,请参阅Apache Hadoop发行版的文档。比如,将vm.swappiness设置为非零值可以提高整体性能。



进大数据交流群


最后编辑于:2024/1/16 拔丝英语网

admin-avatar

英语作文代写、国外视频下载

高质量学习资料分享

admin@buzzrecipe.com