要有最朴素的生活和最遥远的梦想,
即使明天天寒地冻,山高水远,路远马亡。
---七堇年
背景:为什么使用二级索引?
-
Hbase中通过非rowkey查询数据查询速度会很慢
-
在Hbase中要想精确查询一条数据所以必须使用rowkey,如果不通过rowkey查询数据,就必须逐行逐列的比较(即全表扫描),效率很低. 实际业务中需要通过多个维度快速查询数据. 例如查询用户的时候可能需要通过用户名,姓名,邮箱,手机号查询,但是把这种多维度的查询字段都放到rowkey中,显然是不可能的(灵活性不高,roekey的长度也是有限制的),因此二级索引的应用场景就应运而生,Phoenix已经提供了对HBase的二级索引支持支持。
-
开启Hbase对二级索引的支持
-
vim vim hbase-2.3.1/conf/hbase-site.xml
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
1.简介
Phoenix是一个HBase框架,可以通过SQL的方式来操作HBase。
Phoenix是构建在HBase上的一个SQL层,是内嵌在HBase中的JDBC驱动,能够让用户使用标准的JDBC来操作HBase。
Phoenix使用JAVA语言进行编写,其查询引擎会将SQL查询语句转换成一个或多个HBase Scanner,且并行执行生成标准的JDBC结果集。
*如果需要对HBase进行复杂的操作,那么应该使用Phoenix,其会将SQL语句转换成HBase相应的API。
*Phoenix只能用在HBase上,其查询性能要远高于Hive。
2.Phoenix与HBase的关系
Phoenix与HBase中的表是独立的,两者之间没有必然的关系。
Phoenix与HBase集成后会创建六张系统表:SYSTEM.CATALOG、SYSTEM.FUNCTION、SYSTEM.LOG、SYSTEM.SEQUENCE、SYSTEM.STATS,其中SYSTEM.CATALOG表用于存放Phoenix创建表时的元数据。
Phoenix创建表时会自动调用HBase客户端创建相应的表,并且在SYSTEM.CATALOG系统表中记录Phoenix创建表时的元数据,其主键的值对应HBase的RowKey,非主键的列对应HBase的Column(列族不指定时为0,且列会进行编码)
如果是通过Phoenix创建的表,那么必须通过Phoenix客户端来对表进行操作,因为通过Phoenix创建的表其非主键的列会进行编码。
3.Phoenix语法
Phoenix的SQL中如果表名、字段名不使用双引号标注那么默认转换成大写。
Phoenix中的字符串使用单引号进行标注。
创建表
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city)
);
*主键的值对应HBase中的RowKey,列族不指定时默认是0,非主键的列对应HBase的列。
删除表
DROP TABLE us_population;
查询数据
SELECT *
FROM us_population
WHERE state = 'NA' AND population > 10000
ORDER BY population DESC;
*在进行查询时,支持ORDER BY、GROUP BY、LIMIT、JOIN等操作,同时Phoenix提供了一系列的函数,其中包括COUNT()、MAX()、MIN()、SUM()等,具体的函数列表可以查看:http://phoenix.apache.org/language/functions.html
*不管条件中的列是否是联合主键中的,Phoenix一样可以支持。
删除数据
DELETE FROM us_population WHERE state = 'NA';
插入或更新数据
UPSERT INTO us_population VALUES('CA','GZ',850000);
UPSERT INTO us_population(state,city) VALUES('CA','GZ');
*如果主键的值重复,那么进行更新操作,否则插入一条新的记录(在进行更新时,没有更新的列保持原值,在进行插入时,没有插入的列为null)
*在使用UPSERT时,主键的列不能为空(包括联合主键)
4.Phoenix映射HBase
只要直接通过HBase客户端创建的表,若想用Phoenix来进行操作,那么必须要进行表的映射,因为SYSTEM.CATALOG表中并没有维护Phoenix创建表的元数据。
创建表来进行表的映射
CREATE TABLE IF NOT EXISTS 表名(
列名 类型 主键,
列簇.列名,
列簇.列名
)
*HBase中的RowKey映射Phoenix的主键,HBase中的Column映射Phoenix的列,且使用列簇名.列名进行映射。
*相当于在SYSTEM.CATALOG表中录入相关的元数据,使Phoenix能够进行操作它。
创建视图来进行表的映射
CREATE VIEW 视图名(
列名 类型 主键,
列簇.列名,
列簇.列名
)
*Phoenix中的视图只能进行查询,不能进行添加、更新、删除操作。
5.Phoenix优化
1.服务端配置优化
*往HBase安装目录下的conf目录下的hbase-site.xml文件中添加配置。
1. index.builder.threads.max
创建索引时,使用的最大线程数。
默认值: 10。
2. index.builder.threads.keepalivetime
创建索引的创建线程池中线程的存活时间,单位:秒。
默认值: 60
3. index.writer.threads.max
写索引表数据的写线程池的最大线程数。
更新索引表可以用的最大线程数,也就是同时可以更新多少张索引表,数量最好和索引表的数量一致。
默认值: 10
4. index.writer.threads.keepalivetime
索引写线程池中,线程的存活时间,单位:秒。
默认值:60
5. hbase.htable.threads.max
每一张索引表可用于写的线程数。
默认值: 2,147,483,647
6. hbase.htable.threads.keepalivetime
索引表线程池中线程的存活时间,单位:秒。
默认值: 60
7. index.tablefactory.cache.size
允许缓存的索引表的数量。
增加此值,可以在写索引表时不用每次都去重复的创建htable,这个值越大,内存消耗越多。
默认值: 10
8. org.apache.phoenix.regionserver.index.handler.count
处理全局索引写请求时,可以使用的线程数。
默认值: 30
2.对表中的数据进行分区
哈希取模
通过在创建表时指定SALE_BUCKETS来实现将表中的数据预分割到多个Region中,有利于提高读取数据的性能。
其原理是将RowKey进行散列,把得到的余数的byte值插入到RowKey的第一个字节中,并通过预定义每个Region的Start Key和End Key,将数据分散存储到不同的Region中。
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city)
)SALT_BUCKETS=16;
*通过SALE_BUCKETS设置哈希函数的除数P(除留余数法)
根据值来进行预分区
在创建表时,可以精确的指定RowKey根据什么值来进行预分区,不同的值存储在独立的Region中,有利于提高读取数据的性能。
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city)
)SPLIT ON('CS','EU','NA');
3.创建表时指定列簇
在HBase中每个列簇对应一个文件,如果要查询的列其列簇下只有它自己,那么将极大的提高读取数据的性能。
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
C1.population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city)
);
*列族只能在非主键列中进行指定。
4.对表进行压缩
在创建表时可以指定表的压缩方式,能极大的提高数据的读写效率。
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city)
)COMPRESSION='GZ';
*可选的压缩方式包括GZip、Snappy、Lzo等。
5.使用二级索引
在HBase中会自动为RowKey添加索引,因此在通过RowKey查询数据时效率会很高,但是如果要根据其他列来进行组合查询,那么查询的性能就很低下,此时可以使用Phoenix提供的二级索引,能够极大的提高查询数据的性能。
创建普通索引
CREATE INDEX 索引名称 ON 表名(列名)
>create table user1(
id varchar(10) primary key,
f.name varchar(100),
f.pass varchar(100),
f.grp varchar(10),
f.type varchar(5),
f.types varchar(2),
f.code varchar(10),
f.uname varchar(20),
f.email varchar(20),
f.factory varchar(10),
f.depart varchar(10),
f.region varchar(10)
)column_encoded_bytes=0;
> create index user_name on user(name);
//查询索引
> !indexes user
user表删除索引
drop index user_name on user;
实战
-
create index user_name on user; //默认可变索引
//创建索引
> create index user_name on user;
//*这样查询是不会走索引的
> select * from user where name='rumenz';
> explain select * from user where name='rumenz';
//FULL SCAN OVER USER SERVER FILTER BY F.NAME =
//查询字段和索引字段保持一致就可以用到索引
> select name from user where name='rumenz';
> explain select name from user where name='rumenz';
//CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN RANGE SCAN OVER USER_NAME SERVER FILTER BY FIRST KEY ONLY
-
create local index user_name on user1(name); //创建本地索引
//*这样查询就会走索引
> select * from user where name='rumenz';
> explain select * from user where name='rumenz';
//使用到了索引
//ROUND ROBIN RANGE SCAN OVER USER1
-
converted index //相当于一个联合索引
> create index user_name1 on user1(name) include(pass);
//只有当name,pass在查询字段出现时,才会用到索引:比如
//select name from user1 where name=''或者 pass='';
//select pass from user1 where name=''或者 pass='';
//select name,pass from user1 where name=''或者 pass='';
-
在select和column_name之间加上/*+ Index(<表名> <index名>)*/,通过这种方式强制使用索引。
> select /*+ index(user1,USER_NAME) */ pass from user1 where name='xxx';
//1.如果pass是索引那么从索引表查询.
//2.如果pass不是索引,那么会进行全表扫描会很慢.
重建索引
> alter index USER_NAME on user1 rebuild;
创建二级索引
CREATE INDEX 索引名称 ON 表名(列名) INCLUDE(列名)
6.Phoenix的搭建
选好软件版本:
apache-phoenix-5.0.0-HBase-2.0-bin.tar.gz
hadoop-2.9.2.tar.gz
hbase-2.0.5-bin.tar.gz
1.安装JDK、Hadoop、Zookeeper、HBase
由于Phoenix是内嵌在HBase的JDBC驱动,且HBase是通过JAVA语言编写的,其基于HDFS,且依赖于Zookeeper进行服务的协调和HA高可用配置,因此需要安装JDK、Hadoop和Zookeeper,并配置好JAVA_HOME环境变量。
由于HDFS一般都以集群的方式运行,因此需要搭建HDFS集群并启动。
*在搭建HDFS集群时,需要相互配置SSH使之互相信任并且开放防火墙相应的端口,或者直接关闭防火墙。
启动Zookeeper集群
由于HBase一般都以集群的方式运行,因此需要搭建HBase集群并启动。
2.安装Phoenix
选择对应HBase版本的Phoenix版本:http://archive.apache.org/dist
将下载后的hbase-server.jar复制到HBase安装目录的lib目录下。
3.重启HBase集群
由于已经将Phoenix的hbase-server.jar复制到HBase的lib目录下,当HBase集群启动时将会加载Phoenix,并加载hbase-site.xml配置文件(存放HBase和Phoenix的配置),此时可以使用Phoenix客户端进行连接,通过标准的JDBC来操作HBase。
*Phoenix第一次启动时,会在HBase中创建六张系统表,用于存放Phoenix的相关信息。
7.Phoenix客户端
使用sqlline.py操作Phoenix
Phoenix提供了sqlline.py脚本用于启动Phoenix客户端并进行连接,启动时只需要指定HBase连接的Zookeeper集群地址即可。
*使用!tables查询通过Phoenix创建的表(即SYSTEM.CATALOG表中的元数据)
使用JAVA操作Phoenix
导入依赖
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>5.0.0-HBase-2.0</version>
</dependency>
使用标准的JDBC来操作HBase
//加载驱动
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
//通过DriverManager获取连接
Connection conn = DriverManager.getConnection("jdbc:phoenix:192.168.1.80,192.168.1.81,192.168.1.82:2181);
//创建Statement实例
Statement statement = conn.prepareStatement(sql);
//执行增、删、改、查等操作
execute(sql)
executeUpdate(sql)
executeQuery(sql)
完整的PhoenixUtils
public class PhoenixUtils {
private static final Logger logger = LoggerFactory.getLogger(PhoenixUtils.class);
private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
private static final String PHOENIX_URL = "jdbc:phoenix:192.168.1.80,192.168.1.81,192.168.1.82:2181";
private static Connection conn = null;
static {
try {
Class.forName(PHOENIX_DRIVER);
conn = DriverManager.getConnection(PHOENIX_URL);
} catch (Exception e) {
logger.info("初始化Phoenix连接时失败", e);
}
}
/**
* 获取Phoenix中的表(系统表除外)
*/
public static List<String> getTables() throws Exception {
List<String> tables = new ArrayList<>();
DatabaseMetaData metaData = conn.getMetaData();
String[] types = {"TABLE"}; //"SYSTEM TABLE"
ResultSet resultSet = metaData.getTables(null, null, null, types);
while (resultSet.next()) {
tables.add(resultSet.getString("TABLE_NAME"));
}
return tables;
}
/**
* 获取表中的所有数据
*/
public static List<Map<String, String>> getList(String tableName) throws Exception {
String sql = "SELECT * FROM " + tableName;
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
List<Map<String, String>> resultList = new ArrayList<>();
while (resultSet.next()) {
Map<String, String> result = new HashMap<>();
for (int i = 1, len = resultSetMetaData.getColumnCount(); i <= len; i++) {
result.put(resultSetMetaData.getColumnName(i), resultSet.getString(i));
}
resultList.add(result);
}
return resultList;
}
}
-
通过Java客户端操作phoenix
package com.suzhigang.hbase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
/**
* 通过Phoenix操作Hbase
*/
public class PhoenixQueryHbase {
Connection connection = null;
PreparedStatement ps = null;
ResultSet rs = null;
public void init() throws Exception {
connection = DriverManager.getConnection("jdbc:phoenix:hd1,hd2,hd3:2181");
}
/**
* 建表并查询
* @throws Exception
*/
public void create() throws Exception {
Statement statement = connection.createStatement();
statement.executeUpdate("create table test(id integer primary key ,animal varchar )");
//新增和更新都是一个操作:upsert
statement.executeUpdate("upsert into test values (1,'dog')");
statement.executeUpdate("upsert into test values (2,'cat')");
connection.commit();
PreparedStatement preparedStatement = connection.prepareStatement("select * from test");
rs = preparedStatement.executeQuery();
while (rs.next()) {
String id = rs.getString("id");
String animal = rs.getString("animal");
String format = String.format("id:%s,animal:%s", id, animal);
System.out.println(format);
}
}
/**
* 查询已有的表
* @throws Exception
*/
public void testQuery() throws Exception {
String sql = "select * from tc";
try {
ps = connection.prepareStatement(sql);
rs = ps.executeQuery();
while (rs.next()) {
String id = rs.getString("ID");
String name = rs.getString("NAME");
String age = rs.getString("AGE");
String sex = rs.getString("SEX");
String format = String.format("id:%s,name:%s,age:%s,sex:%s", id, name, age, sex);
System.out.println(format);
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (rs != null) rs.close();
if (ps != null) ps.close();
if (connection != null) connection.close();
}
}
/**
* 删除数据
* @throws Exception
*/
public void delete() throws Exception {
try {
ps = connection.prepareStatement("delete from test where id=2");
ps.executeUpdate();
connection.commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (rs != null) rs.close();
if (ps != null) ps.close();
if (connection != null) connection.close();
}
}
/**
* 删除表
* @throws Exception
*/
public void dropTable() throws Exception {
try {
ps = connection.prepareStatement("drop table test");
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (rs != null) rs.close();
if (ps != null) ps.close();
if (connection != null) connection.close();
}
}
public void close() throws Exception {
if (rs != null) rs.close();
if (ps != null) ps.close();
if (connection != null) connection.close();
}
}
8.踩坑记录:
-
采坑,HBase的表是区分大小写,但是DBeaver是不区分的。
-
采坑,Phoenix 的timezone(时区)默认是国外了,而国内的是上海时区,这样导致时间类型数据入Phoenix后时间戳对不上。
-
采坑,使用Phoenix 进行select时及其小概率会出现两行一模一样的数据,重复数据,过会儿查就没了
-
采坑,Phoenix 结合HBase 使用起来像是操作mysql,但是HBase强大的动态列功能Phoenix貌似不行,个人觉得它有点儿阉割了HBase,故并不是任何场景都适合使用HBase整合Phoenix,对关系型数据入Hive倒是挺好用的。
-
Phoenix查询客户端超时
Error: Operation timed out. (state=TIM01,code=6000)
java.sql.SQLTimeoutException: Operation timed out.
解决方案:修改apache-phoenix-5.0.0-HBase-2.0-bin/bin/hbase-site.xml,增加以下配置
<property>
<name>phoenix.query.timeoutMs</name>
<value>3600000</value>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>3600000</value>
</property>
获取更多资料详情请:
发表评论