ODPS重点参数语法-使用小记
在日常工作中,记录一些用到阿里云odps的常用的参数语法,记录一下,方便查找和定位。
常用命令
-- 查询项目空间
list projects;
-- 进入指定项目空间
use <project_name>;
-- 查看空间垃圾
show recyclebin;
-- 清空回收站中所有项目
purge all;
-- 回收某张表的命令
purge table tblname;
-- 列出当前项目空间下所有的表
show tables;
-- 列出当前项目空间下表名与'dim'匹配上的表,支持正则表达式
show tables like 'dim*';
-- 列出一张表的所有分区
show partitions <table_name>;
-- 显示Set设置的参数
show flags;
-- 预估出一条SQL的计量信息,包含输入数据的大小、UDF个数以及SQL复杂等级
cost sql <SQL Sentence>;
-- 查看project空间的使用情况
-- 查看表信息
desc <table_name>;
-- 查看表信息以及Extended信息
desc extended <table_name>;
-- 查看分区信息
desc table_name partition(pt_spec);
-- 查看当前project资源信息
desc resource $resource_name;
-- 查看集群信息
desc project $project_name -extended;
-- 导出项目内所有表的DDL语句
export <project name> local_file_path;
-- 生成创建表的SQL DDL语句
show create table <table_name>;
-- 修改表的owner
alter table table_name changeowner to new_owner;
-- 重命名表
alter table table_name rename to new_table_name;
-- 修改表的注释
alter table table_name set comment 'tbl comment';
-- 修改表的生命周期属性
alter table table_name set lifecycle <days>;
-- 禁止/恢复生命周期
alter table table_name partition[partition_spec] enable|disable lifecycle;
-- 查看表的分区信息
desc table_name partition(ds='20151010');
-- 修改表的修改时间
alter table table_name touch;
-- 清空非分区表里的数据,不支持分区表
truncate table table_name;
-- 对于分区表,可以用
alter table table_name drop partition(partition_spec);
-- 强制删除表数据(分区数据)
drop table tblname purge;
alter table tblname drop partition(part_spec) purge;
-- 添加列,目前MaxCompute单表的列数上限为1200。
alter table table_name add columns (col_name1 type1, col_name2 type2...);
-- 修改列名
alter table table_name change column old_col_name rename to new_col_name;
-- 修改列和分区注释,comment内容最长1024字节,列的数据类型和位置不能修改
alter table table_name change column col_name comment 'comment';
-- 合并小文件
alter table tablename [partition] merge smallfiles;
-- MAPJOIN HINT
-- left outer join的左表必须是大表。
-- right outer join的右表必须是大表。
-- inner join的左表或右表均可以作为大表。
-- full outer join不能使用mapjoin。
-- mapjoin支持小表为子查询。
-- 使用mapjoin时需要引用小表或子查询时,需要引用别名。
-- 在mapjoin中,可以使用不等值连接或者使用or连接多个条件。
-- 如果使用mapjoin,则所有小表占用的内存总和不得超过512MB。
-- 该限制值可以用参数 odps.sql.mapjoin.memory.max 调整,最大为2048M。
-- 该大小指的是数据解压后的大小,而通过desc命令查询相关表显示的是数据压缩后的大小,需要在计算时按照压缩格式乘以压缩比。
select /* + mapjoin(a) */ a.shop_name, b.customer_id, b.total_price from shop a join sale_detail b on a.shop_name = b.shop_name;
-- 全量创建并复制非分区表
clone table src_copy to src_clone;
-- 复制表部分分区
clone table srcpart_copy partition(ds="2008-04-09", hr='11') to srcpart_clone if exists overwrite;
-- 完整复制分区表并跳过已存在的分区
clone table srcpart_copy to srcpart_clone if exists ignore;
实例操作
-- 返回由当前用户创建的实例信息
show instances [from startdate to enddate] [number];
show p [from startdate to enddate] [number];
show instances [-all];
show p [-all];
show p -p <project name>;
-- 查看某实例的状态
status <instance_id>;
-- 停止某实例,将其状态设置为Canceled
kill <instance_id>;
-- 返回当前项目中当前账号所提交的正在执行的作业信息
top instance;
-- 返回当前项目下所有正在执行的作业,默认最大返回50条
top instance -all;
top instance -limit 50;
-- 根据具体的实例ID获得作业信息
desc instance <instance_id>;
-- 根据具体的实例ID获得任务运行日志信息,包含Logview链接。再通过查看Logview获得任务的详细日志
wait instance_id;
参数说明如下:
-
• startdate To enddate:返回指定时间段内的实例,即从起始时间startdate到结束时 间enddate的实例信息。需满足如下格式:yyyy-mm-dd,精度到天。可选参数,若不指定,返回用户三天内提交的实例。
-
• number:指定返回实例的数量。依照时间排序,返回距离当前时间最近的number个实例信息。若不指定number,返回满足要求的所有实例信息。
-
• -all:返回当前项目下所有执行过的实例,默认最大返回50条。需要注意,执行该命令的用户需要有Project的List权限。如需返回更多条记录,请使用-limit number参数,例如show p -all -limit 100表示返回当前项目下100条执行过的实例记录。
-
• project name:项目名称,用户使用的账号必须已经是项目成员。
使用odpscmd导入数据
drop table if exists import__test;
create table import__test (
col1 string
, col2 string
, col3 string
)
partitioned by (dt string)
;
tunnel upload -acp true -cf true -c "utf-8" -fd "," -h true -rd "rn" test.csv {project_name}.import__test/dt=20220616;
使用odpscmd导出数据
tunnel download -t 2 -cf ture -c "utf-8" -fd "&$" -rd "n" {project_name}.import__test/dt=20220616 test.csv;
tunnel download -cf ture -h true instance://{project_name}/{instanceid} test.csv;
拆分文件成多个,并一一进行压缩
split --suffix-length=6
--numeric-suffixes=1
--additional-suffix=.csv
--lines=2
--verbose
test.csv split/test_
cd split
find . -name "*.csv" | awk -F '[./]' '{print $3}' | xargs -P 2 -I {} zip -r ../{}.zip {}.csv
find . -name "*.csv" | xargs wc -l
设置参数
-- session级别
set odps.sql.type.system.odps2=true;
set odps.sql.timezone=<timezoneid>;
-- project级别
setProject odps.sql.type.system.odps2=false;
setProject odps.sql.timezone=<timezoneid>;
-- pyodps
o.execute_sql('set odps.sql.type.system.odps2=true;query_sql', hints={"odps.sql.submit.mode" : "script"})
-- 设置阅读模式
set odps.sql.select.output.format=HumanReadable/json;
项目的数据类型版本
--查看项目数据类型版本。
setproject;
--开启/关闭MaxCompute2.0数据类型版本。
setproject odps.sql.type.system.odps2=true/false;
--开启/关闭decimal2.0数据类型。
setproject odps.sql.decimal.odps2=true/false;
--开启/关闭hive兼容模式数据类型版本。
setproject odps.sql.hive.compatible=true/false;
MaxCompute参数调优
set odps.instance.priority=x; -- 1-9 数字越小,优先级越高
-- group by中的整型常量会被当做select的列序号处理
set hive.groupby.position.alias=true;
-- order by中的整型常量会被当做select的列序号处理
set hive.orderby.position.alias=true;
# Map设置
-- 设置处理Map Task每个Instance的CPU数目,默认为100,在[50,800]之间调整
set odps.sql.mapper.cpu=100;
-- 设定Map Task每个Instance的Memory大小,单位M,默认1024M,在[256,12288]之间调整
set odps.sql.mapper.memory=1024;
-- 设定控制文件被合并的最大阈值,单位M,默认64M,在[0,Integer.MAX_VALUE]之间调整
set odps.sql.mapper.merge.limit.size=64;
-- 设定一个Map的最大数据输入量,可以通过设置这个变量达到对Map端输入的控制,单位M,默认256M,在[1,Integer.MAX_VALUE]之间调整
set odps.sql.mapper.split.size=256;
# Join设置
-- 设定Join Task的Instance数量,默认为-1,在[0,2000]之间调整。不走HBO优化时,ODPS能够自动设定的最大值为1111,手动设定的最大值为2000,走HBO时可以超过2000
set odps.sql.joiner.instances=-1;
-- 设定Join Task每个Instance的CPU数目,默认为100,在[50,800]之间调整
set odps.sql.joiner.cpu=100;
-- 设定Join Task每个Instance的Memory大小,单位为M,默认为1024M,在[256,12288]之间调整
set odps.sql.joiner.memory=1024;
# Reduce设置
-- 设定Reduce Task的Instance数量,默认为-1,在[0,2000]之间调整。不走HBO优化时,ODPS能够自动设定的最大值为1111,手动设定的最大值为2000,走HBO优化时可以超过2000
set odps.sql.reducer.instances=30;
-- 设定Reduce Task每个Instance的Memory大小,单位M,默认1024M,在[256,12288]之间调整
set odps.sql.reducer.memory=4096;
-- 设定处理Reduce Task每个Instance的Cpu数目,默认为100,在[50,800]之间调整
set odps.sql.reducer.cpu=200;
-- 数据倾斜
# 1. group by数据倾斜
set odps.sql.groupby.skewindata=true;
# 2. 不使用动态分区
set odps.sql.reshuffle.dynamicpt=false;
-- 设置join优化具体信息
-- 开启join优化,必须设置odps.sql.skewinfo才有效
set odps.sql.skewjoin=true;
set odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")]
-- 输出结果为
explain select a.key c1, a.value c2, b.key c3, b.value c4 from src a join src_skewjoin1 b on a.key = b.key;
# 小文件合并参数
-- 设置是否跨路径合并,对于表下面有多个分区的情况,合并过程会将多个分区生成独立的Merge Action进行合并,所以对于odps.merge.cross.paths设置为true,并不会改变路径个数,只是分别去合并每个路径下的小文件
set odps.merge.cross.paths=true;
-- 设置合并文件的小文件大小阀值,文件大小超过该阀值,则不进行合并,单位为M,可以不设,不设时,则使用全局变量odps_g_merge_filesize_threshold,该值默认为32M,设置时必须大于32M
set odps.merge.smallfile.filesize.threshold=128;
-- 设置合并输出文件量的大小,输出文件大于该阀值,则创建新的输出文件,单位为M,可以不设,不设时,则使用全局变odps_g_max_merged_filesize_threshold,该值默认为256M,设置时必须大于256M
set odps.merge.maxmerged.filesize.threshold=512;
-- 设置合并Fuxi Job的单个Instance允许合并的小文件个数,控制合并并行的Fuxi Instance数,可以不设,不设时,则使用全局变量odps_g_merge_files_per_instance,该值默认为100,在一个Merge任务中,需要的Fuxi Instance个数至少为该目录下面的总文件个数除以该限制
set odps.merge.max.filenumber.per.instance=10000;
-- 设置合并最大的小文件个数,小文件数量超过该限制,则超过限制部分的文件忽略,不进行合并,可以不设,不设时,则使用全局变量odps_g_max_merge_files,该值默认为10000
--值默认为50000个;当分区数大于50000时需要调整,最大可到1000000万,大于1000000的提交多次merge
set odps.merge.max.filenumber.per.job=50000;
alter table tb_name partition(dt='20210501') merge smallfiles;
# Mapjoin设置
-- 设置Mapjoin时小表的最大内存,默认512,单位M,[128,2048]之间调整
set odps.sql.mapjoin.memory.max=512;
# 动态分区设置
-- False:会减少小文件的产生,如果动态分区值很少,关闭数据就不会倾斜了。True:不会数据倾斜
set odps.sql.reshuffle.dynamicpt=true/false;
发表评论