hive自定义函数和transform

原标题:MaxCompute重装上阵 第五弹 – SELECT TRANSFOR

UDTF

  • Hive中UDTF编写和使用

hive自定义函数(udf:user-defined function)

例1:
对于以下数据

1367775,10
1363426,10
1371235,10
1371237,10
1371236,10
1376888,10
1382132,10

1367775   beijing    10
1363426   beijing    10
1371235   shanghai   10
1371237   shanghai   10
1361236   beijing    10
1366888   beijing    10
1382132   shenzhen   10
写一个函数得到省份名

1、开发一个java类,继承UDF(聚合函数继承UDAF)并重载evaluate方法

package bigdata.udf

import org.apache.hadoop.hive.ql.exec.UDF;
                                 //继承类
public class ToLowerCase(GetProvince) extends UDF{
//加载一个字典表
    public static HashMap<Integer,String> provinceMap=new HashMap<Integer,String>
    static {
        provinceMap.put("136","beijing");
        provinceMap.put("137","shanghai");
        provinceMap.put("138","shenzhen");
    }


    //必须是public        //重载evaluate方法根据不同的输入判断调用那个函数
    public String evaluate(String field){
        String result = field.toLowerCase();
        return result;
    }
           //返回值           //输入
    public String evaluate(int phonenbr){
        String pnb = String.valueOf(phonenbr);
        return provinceMap.get(pnb.substring(0,3))== null?"huoxin":provinceMap.get(pnb.substring(0,3));
    }   
}

2、打成jar包上传到服务器
3、将jar包添加到hive的classpath
add JAR /home/hadoop/udf.jar;
4、创建临时函数与开发好的java class 关联
create temporary function getprovince as 'bigdata.udf.ToLowerCase';
5、hql中使用

create table t_flow(phonenbr int,flow int)
row format delimited //使用自带的serde:S erDe是Serialize/Deserilize的简称,目的是用于序列化和反序列化。S erDe能为表指定列,且对列指定相应的数据。
fields terminated by ',';
load data local inpath '/home/hadoop/flow.dat' into table t_flow;

select phonenbr,getprovince(phonenbr),flow from t_flow;

例2:

create table t_json(line string)
row format delimited;
load data local inpath '' into table t_json;
select * from t_json limit 10;

class JsonParser
package bigdata.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import parquet.org.codehaus.jackson.map.ObjectMapper;
public class JsonParser extends UDF {  //alt+/ctrl+shift+o导包
//Window - Preferences - Java - Editor - Templates,这里你可以看到所有的eclipse的快捷方式
//alt+/补全   
    public String evaluate(String jsonline){  //输入jsonline返回string
        ObjectMapper objectMapper = new ObjectMapper();
        try{
            MovieRateBean bean = ObjectMapper.readValue(jsonline,MovieRateBean);
            return bean.toString();
        }catch(Exception e){
        }
        return "";
    }   
}

MovieRateBean

package bigdata.udf;
public class MovieRateBean{
    private String movie;
    private String rate;
    private String timeStamp;
    private String uid;

    //alt+shift+s
    public String getMovie(){
        return movie;
    }
    public String setMovie(String movie){
        this.movie = movie;
    }
    public String getRate(){
        return rate;
    }
    public void setRate(String rate){
        this.rate = rate;
    }
    public String getTimeStamp(){
        return timestamp;
    }
    public void setTimeStamp(String timeStamp){
        this.timeStamp = timeStamp;
    }
    public String getUid(){
        return uid;
    }
    public void setUid(String uid){
        this.uid = uid;
    }

    public String toString(){

        return this.movie + "t" + this.rate + "t" +this.timeStamp + "t" + this.uid();
    }
}

javabean:这个类是public的,还要有一个无参数的构造函数。第二,属性是private的,必须通过get
和set 方法进行访问。第三,支持“事件”,例如addXXXXListener(XXXEvent
e),可以处理各种事件,比如鼠标点击,键盘响应等等。第四,提供一个反射机制。第五,可以序列化/反序列化的,这样,我就可以被方便的存储,转移了。

bin/beeline -u jdbc:hive2://localhost:10000 -n hadoop
add JAR /home/hadoop/udf.jar;
create temporary function parsejson as 'bigdata.udf.JsonParser';
select parsejson(line) form t_json limit 10;

但是只有一个字段,如何把它分为四个字段

//insert overwrite table t_rating
create table t_rating as
select split(parsejson(line),'t')[0]as movieid,
split(parsejson(line),'t')[1] as rate,
split(parsejson(line),'t')[2] as timestring,
split(parsejson(line),'t')[3] as uid 
from t_json;

内置json函数
select get_json_object(line,'$.movie') as moive,
get_json_object(line,'$.rate') as rate  from rat_json limit 10;

摘要: 大数据计算服务 MaxCompute
能够提供强大的分析能力,而分布式 NoSQL
数据库表格存储在行级别上的实时更新和可覆盖性写入等特性,相对于
MaxCompute 内置表 append-only 批量操作,提供了一个很好的补充。

摘要:
MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台,
尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。
MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

UDAF

  • Hive
    udaf开发入门和运行过程详解
  • Hive通用型自定义聚合函数(UDAF)

Transform实现

提供了在sql中调用自写脚本(python或shell脚本)的功能,适合hive中没有的功能又不想写udf的情况。
1.加载rating.json文件到hive的一个原始表

create table t_json(line string)
row format delimited;
load data local inpath '' into table t_json;
select * from t_json limit 10;

2.需要解析json数据成四个字段,插入一张新表t_rating
内置json函数

set hive.support.sql11.reserved.keywords=false;##不然识不出timeStamp
hive> create table t_rating as
    > select get_json_object(line,'$.movie') as moive,get_json_object(line,'$.rate') as rate,get_json_object(line,'$.timeStamp') as timeStamp,get_json_object(line,'$.uid') as uid from t_json;

3.使用transform+python的方式转换unixtime为weekday
先编辑一个python脚本文件,然后将文件加入hive的classpath下:

vi weekday_mapper.py
#!/bin/python
import sys
import datetime

for line in sys.stdin:
    line = line.strip()//去空格
    movieid,rate,timestring,uid = line.split('t')
    weekday=datetime.datetime.fromtimestamp(float(timestring)).isoweekday()
    print 't'.join([movieid,rating,str(weekday),userid])  //相当于后面用/t串起来

add file weekday_mapper.py;

create table u_data_new(
    movieid int,
    rating int,
    weekday int,
    userid int)
row format delimited
fields terminated by '/t';

insert overwrite table u_data_new
//create table u_data_new as
select
    transform(movieid,rate,timestring,uid)
    using'python weekday_mapper.py'
    as(movieid,rating,weekday,userid)
from t_rating;

报错:生无可恋
ERROR : Ended Job = job_local1691136619_0009 with errors
Error: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask (state=08S01,code=2)

select distinct(weekday) from u_data_new limit 10;

关系数据库已经存在半个世纪,有非常广泛的使用场景,但是在快速迭代的互联网领域其扩展性和
schema 灵活性被诟病颇多,因此类似 TableStore/BigTable/HBase
等强调扩展性和灵活性的NoSQL数据库逐步流行起来,这些 NoSQL 数据库只提供
API 接口,不提供 SQL 访问,这就导致很多熟悉 SQL
但是不喜欢写代码的用户没法很舒服的使用此类NoSQL数据库。基于此,表格存储开发团队联合
MaxCompute(下文中 ODPS 与 MaxCompute 同义)团队打通了 ODPS-SQL
访问表格存储的路径,这样一个只懂 SQL
的用户也可以愉快的访问表格存储里面的大量数据了。

MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台,
尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。
MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

Hive中的TRANSFORM:使用脚本完成Map/Reduce

转自:
http://www.coder4.com/archives/4052

hive自定义函数和transform。首先来看一下数据:

hive> select * from test;
OK
1       3
2       2
3       1

假设,我们要输出每一列的md5值。在目前的hive中是没有这个udf的。

我们看一下Python的代码:

#!/home/tops/bin/python

import sys
import hashlib

for line in sys.stdin:
    line = line.strip()
    arr = line.split()
    md5_arr = []
    for a in arr:
        md5_arr.append(hashlib.md5(a).hexdigest())
    print "t".join(md5_arr)

在Hive中,使用脚本,首先要将他们加入:

add file /xxxx/test.py

然后,在调用时,使用TRANSFORM语法。

SELECT 
    TRANSFORM (col1, col2) 
    USING './test.py' 
    AS (new1, new2) 
FORM 
    test;

这里,我们使用了AS,指定输出的若干个列,分别对应到哪个列名。如果省略这句,则Hive会将第1个tab前的结果作为key,后面其余作为value。

这里有一个小坑:有时候,我们结合INSERT
OVERWRITE使用上述TRANSFORM,而目标表,其分割副可能不是t。但是请牢记:TRANSFORM的分割符号,传入、传出脚本的,永远是t。不要考虑外面其他的分割符号!

最后,解释一下MAP、REDUCE。

在有的Hive语句中,大家可能会看到SELECT MAP (…) USING ‘xx.py’这样的语法。

然而,在Hive中,MAP、REDUCE只不过是TRANSFORM的别名,Hive不保证一定会在map/reduce中调用脚本。看看官方文档是怎么说的:

Formally, MAP ... and REDUCE ... are syntactic transformations of SELECT TRANSFORM ( ... ). In other words, they serve as comments or notes to the reader of the query. BEWARE: Use of these keywords may be dangerous as (e.g.) typing "REDUCE" does not force a reduce phase to occur and typing "MAP" does not force a new map phase!

所以、混用map
reduce语法关键字,甚至会引起混淆,所以建议大家还是都用TRANSFORM吧。

友情提示:如果脚本不是Python,而是awk、sed等系统内置命令,可以直接使用,而不用add
file。

如果表中有MAP,ARRAY等复杂类型,怎么用TRANSFORM生成?

例如:

CREATE TABLE features
(
    id BIGINT,
    norm_features MAP<STRING, FLOAT> 
);

必威,答案是,要在脚本的输出中,对特殊字段按照HDFS文件中的格式输出即可。

例如,以上面的表结构为例,每行输出应为:

1^Ifeature1^C1.0^Bfeature2^C2.0

其中I是tab键,这是TRANSFORM要求的分割符号。B和^C是Hive存储时MAP类型的KV分割符。

另外,在Hive的TRANSFORM语句的时候,要注意AS中加上类型声明:

SELECT TRANSFORM(stuff)
USING 'script'
AS (thing1 INT, thing2 MAP<STRING, FLOAT>)

本篇文章就以一个小白用户的身份体验如何使用
MaxCompute-SQL 查询表格存储里面的数据,以及如何开发自定义逻辑(User
Defined Function, UDF)来处理用户特定的数据格式。

MaxCompute基于ODPS2.0新一代的SQL引擎,显著提升了SQL语言编译过程的易用性与语言的表达能力。我们在此推出MaxCompute(ODPS2.0)重装上阵系列文章

Hive中的TRANSFORM:自定义Mapper和Reducer完成Map/Reduce

/**
 * Mapper.
 */
public interface Mapper {
  /**
   * Maps a single row into an intermediate rows.
   * 
   * @param record
   *          input record
   * @param output
   *          collect mapped rows.
   * @throws Exception
   *           on error
   */
  void map(String[] record, Output output) throws Exception;
}

可以将一列拆分为多列

使用样例:

public class ExecuteMap {

    private static final String FULL_PATH_CLASS = "com.***.dpop.ods.mr.impl.";

    private static final Map<String, Mapper> mappers = new HashMap<String, Mapper>();

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new Exception("Process class must be given");
        }

        new GenericMR().map(System.in, System.out,
                getMapper(args[0], Arrays.copyOfRange(args, 1, args.length)));
    }

    private static Mapper getMapper(String parserClass, String[] args)
            throws ClassNotFoundException {
        if (mappers.containsKey(parserClass)) {
            return mappers.get(parserClass);
        }

        Class[] classes = new Class[args.length];
        for (int i = 0; i < classes.length; ++i) {
            classes[i] = String.class;
        }
        try {
            Mapper mapper = (Mapper) Class.forName(FULL_PATH_CLASS + parserClass).getConstructor(classes).newInstance(args);
            mappers.put(parserClass, mapper);
            return mapper;
        } catch (ClassNotFoundException e) {
            throw new ClassNotFoundException("Unknown MapperClass:" + parserClass, e);
        } catch (Exception e) {
            throw new  ClassNotFoundException("Error Constructing processor", e);
        }

    }
}

MR_USING=" USING 'java -Xmx512m -Xms512m -cp ods-mr-1.0.jar:hive-contrib-2.3.33.jar com.***.dpop.ods.mr.api.ExecuteMap "

COMMAND="FROM dw_rtb.event_fact_adx_auction "
COMMAND="${COMMAND} INSERT overwrite TABLE dw_rtb.event_fact_mid_adx_auction_ad PARTITION(yymmdd=${CURRENT_DATE}) SELECT transform(search_id, print_time, pthread_id, ad_s) ${MR_USING} EventFactMidAdxAuctionAdMapper' as search_id, print_time, pthread_id, ad_s, ssp_id WHERE $INSERT_PARTITION and original = 'exinternal' "

什么是表格存储 TableStore?

第一弹 – 善用MaxCompute编译器的错误和警告

Hive Python Streaming的原理及写法

http://www.tuicool.com/articles/vmumUjA

分布式NoSQL数据存储服务,无缝支持单表PB级数据及百万级访问并发,弹性资源,按量计费,对数据高频的增、删、改支持的很好,保证单行数据读写的强一致性。

第二弹 – 新的基本数据类型与内建函数

什么是大数据计算服务 MaxCompute?

第三弹 – 复杂类型

一种快速、完全托管的TB/PB级数据仓库解决方案,提供多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题。

第四弹 – CTE,VALUES,SEMIJOIN

下面首先我们将介绍环境准备,这是所有后面的操作的基础。然后会介绍使用
OdpsCmd
访问表格存储。在第三节我们介绍使用 OdpsStudio
访问表格存储。最后介绍如何写 UDF、部署 UDF 以及在查询中使用 UDF。

上次向您介绍了CTE,VALUES,SEMIJOIN,本篇向您介绍MaxCompute对其他脚本语言的支持

环境准备

  • SELECT TRANSFORM。

  • 场景1

  • 我的系统要迁移到MaxCompute平台上,系统中原来有很多功能是使用脚本来完成的,包括python,shell,ruby等脚本。
    要迁移到MaxCompute上,我需要把这些脚本全部都改造成UDF/UDAF/UDTF。改造过程不仅需要耗费时间人力,还需要做一遍又一遍的测试,从而保证改造成的udf和原来的脚本在逻辑上是等价的。我希望能有更简单的迁移方式。
  • 场景2
  • SQL比较擅长的是集合操作,而我需要做的事情要对一条数据做更多的精细的计算,现有的内置函数不能方便的实现我想要的功能,而UDF的框架不够灵活,并且Java/Python我都不太熟悉。相比之下我更擅长写脚本。我就希望能够写一个脚本,数据全都输入到我的脚本里来,我自己来做各种计算,然后把结果输出。而MaxCompute平台就负责帮我把数据做好切分,让我的脚本能够分布式执行,负责数据的输入表和输出表的管理,负责JOIN,UNION等关系操作就好了。

首先,准备好一个 MaxCompute 的工程,工程创建指导文档,准备好AccessId和AccessKey备用,为了区别其他产品的AccessId和AccessKey,后面我们称之为ODPS-AccessId,ODPS-AccessKey。并在RAM中授权
MaxCompute 访问 TableStore 的权限,授权方式请参考MaxCompute访问TableStore数据——授权

上述功能可以使用SELECT TRANSFORM来实现

小提示:由于 MaxCompute 在 2.0
版本的计算框架才能支持直接访问 TableStore
数据,该版本还在灰度上线中,目前还需要 申请MaxCompute
2.0试用
,具体开通使用方法请参见 如何申请试用MaxCompute
2.0。

SELECT TRANSFORM 介绍

然后,准备好一个表格存储的实例以及一张数据表,表格存储实例管理,准备好实例名、EndPoint,为了区别其他产品的AccessId和AccessKey,后面我们称之为TableStore-InstanceName,TableStore-EndPoint。

此文中采用MaxCompute Studio作展示,首先,安装MaxCompute
Studio,导入测试MaxCompute项目,创建工程,建立一个新的MaxCompute脚本文件, 如下

网络连通性

必威 1

MaxCompute 与 TableStore
是两个独立的大数据计算以及大数据存储服务,所以两者之间的网络必须保证连通性。
对于 MaxCompute 公共云服务访问 TableStore 存储,推荐使用 TableStore
私网
地址,例如

提交作业可以看到执行计划(全部展开后的视图):

如果网络不通,可以使用公网地址,TableStore原生支持 VPC
网络控制,也需要将网络类型设置为 “允许任意网络访问”

必威 2

 

Select
transform允许sql用户指定在服务器上执行一句shell命令,将上游数据各字段用tab分隔,每条记录一行,逐行输入shell命令的stdin,并从stdout读取数据作为输出,送到下游。Shell命令的本质是调用Unix的一些utility,因此可以启动其他的脚本解释器。包括python,java,php,awk,ruby等。

使用客户端 ODPS-CMD

该命令兼容Hive的Transform功能,可以参考Hive的文档。一些需要注意的点如下:

1.下载并安装大数据计算服务客户端

  1. Using
    子句指定的是要执行的命令,而非资源列表,这一点和大多数的MaxCompute
    SQL语法不一样,这么做是为了和hive的语法保持兼容。

  2. 输入从stdin传入,输出从stdout传出;

  3. 可以配置分隔符,默认使用 t 分隔列,用换行分隔行;

  4. 可以自定义reader/writer,但用内置的reader/writer会快很多

  5. 使用自定义的资源(脚本文件,数据文件等),可以使用 set
    odps.sql.session.resources=foo.sh,bar.txt;
    来指定。可以指定多个resource文件,用逗号隔开(因此不允许resource名字中包含逗号和分号)。此外我们还提供了resources子句,可以在using
    子句后面指定 resources ‘foo.sh’, ‘bar.txt’
    来指定资源,两种方式是等价的(参考“用odps跑测试”的例子);

2.下载解压,将conf/odps_config.ini
的内容修改为:

6.
资源文件会被下载到执行指定命令的工作目录,可以使用文件接口打开./bar.txt文件。

project_name=上面申请的ODPS工程名

目前odps select transform完全兼容了hive的语法、功能和行为,包括
input/output row format 以及
reader/writer。Hive上的脚本,大部分可以直接拿来运行,部分脚本只需要经过少许改动即可运行。另外我们很多功能都用比hive更高执行效率的语言
(C++) 重构,用以优化性能。

access_id=ODPS-AccessId

应用场景举例

access_key= ODPS-AccessKey

理论上select transform能实现的功能udtf都能实现,但是select
transform比udtf要灵活得多。且select
transform不仅支持java和python,还支持shell,perl等其它脚本和工具。
且编写的过程要简单,特别适合adhoc功能的实现。举几个例子:

end_point=

  1. 无中生有造数据

https_check=true

必威 3

# confirm threshold for query input size(unit:
GB)

或者使用python

data_size_confirm=100.0

必威 4

# this url is for odpscmd update

上面的语句造出一份有50行的数据表,值是从1到50;
测试时候的数据就可以方便造出来了。功能看似简单,但以前是odps的一个痛点,没有方便的办法造数据,就不方便测试以及初学者的学习和探索。当然这也可以通过udtf来实现,但是需要复杂的流程:进入ide->写udtf->打包->add
jar/python->create function->执行->drop function->drop
resource。

update_url=

  1. awk 用户会很喜欢这个功能

3.行bin/odpscmd,输入show
tables,正常执行则表示上面配置正确。

必威 5

 

上面的语句仅仅是把value原样输出,但是熟悉awk的用户,从此过上了写awk脚本不写sql的日子

4.在bin/odpscmd 下输入环境变量,显式开启 ODPS 2.0
的非结构化功能( 仅在 ODPS 2.0 计算框架完全上线为必须),单独执行
xx.sql 文件时也需要将下属设置写在 SQL 文件的开头处。

  1. 用odps跑测试

set odps.task.major.version=2dot0_demo_flighting;

必威 6

set
odps.sql.planner.mode=lot;

或者

set odps.sql.ddl.odps2=true;

必威 7

set
odps.sql.preparse.odps2=lot;

这个例子是为了说明,很多java的utility可以直接拿来运行。java和python虽然有现成的udtf框架,但是用select
transform编写更简单,并且不需要额外依赖,也没有格式要求,甚至可以实现离线脚本拿来直接就用。

set odps.sql.type.system.odps2=true; –是支持表格存储的binary类型

  1. 支持其他脚本语言

5.创建一张 MaxCompute 的数据表关联到 TableStore
的某一张表。

select transform (key, value) using “perl -e ‘while($input =
<STDIN>){print $input;}'” from src;

关联的数据表信息如下:

上面用的是perl。这其实不仅仅是语言支持的扩展,一些简单的功能,awk,
python, perl, shell
都支持直接在命令里面写脚本,不需要写脚本文件,上传资源等过程,开发过程更简单。另外,由于目前我们计算集群上没有php和ruby,所以这两种脚本不支持。

  • 实例名称:vehicle-test
  • 数据表名称:vehicle_track
  • 主键信息:vid(int); gt (int)
  • 访问域名:https://vehicle-test.cn-shanghai.ots-internal.aliyuncs.com
  1. 可以串联着用,使用 distribute by和 sort by对输入数据做预处理

DROP TABLE IF EXISTS
ots_vehicle_track;

必威 8

 

或者用map,reduce的关键字会让逻辑显得清楚一些

CREATE EXTERNAL TABLE IF NOT
EXISTS
ots_vehicle_track

必威 9

(

理论上OpenMR的模型都可以映射到上面的计算过程。注意,使用map,reduce,select
transform这几个语法其实语义是一样的,用哪个关键字,哪种写法,不影响直接过程和结果。

vid bigint,

性能

gt bigint,

性能上,SELECT TRANSFORM 与UDTF
各有千秋。经过多种场景对比测试,数据量较小时,大多数场景下select
transform有优势,而数据量大时UDTF有优势。由于transform的开发更加简便,所以select
transform非常适合做adhoc的数据分析。

longitude double,

UDTF的优势:

latitude double,

  1. UDTF是有类型,而Transform的子进程基于stdin/stdout传输数据,所有数据都当做string处理,因此transform多了一步类型转换;
  2. Transform数据传输依赖于操作系统的管道,而目前管道的buffer仅有4KB,且不能设置,
    transform读/写 空/满 的pipe会导致进程被挂起;
  3. UDTF的常量参数可以不用传输,而Transform没办法利用这个优化。

distance double
,

SELECT TRANSFORM 的优势:

speed double,

  1. 子进程和父进程是两个进程,而UDTF是单线程的,如果计算占比比较高,数据吞吐量比较小,可以利用服务器的多核特性
  2. 数据的传输通过更底层的系统调用来读写,效率比java高
  3. SELECT
    TRANSFORM支持的某些工具,如awk,是natvie代码实现的,和java相比理论上可能会有性能优势。

oil_consumption double

小结

)

MaxCompute基于ODPS2.0的SQL引擎,提供了SELECT
TRANSFORM功能,可以明显简化对脚本代码的引用,与此同时,也提高了性能!我们推荐您尽量使用SELECT
TRANSFORM。

STORED BY ‘com.aliyun.odps.TableStoreStorageHandler’
— (1)

标注

WITH SERDEPROPERTIES ( — (2)

  • 注一,USING
    后面的字符串,在后台是直接起的子进程来调起命令,没有起shell,所以shell的某些语法,如输入输出重定向,管道等是不支持的。如果用户需要可以以
    shell 作为命令,真正的命令作为数据输入,参考“无中生有造数据”的例子;
  • 注二,JAVA 和 PYTHON 的实际路径,可以从JAVA_HOME 和 PYTHON_HOME
    环境变量中得到作业;

‘tablestore.columns.mapping’=’:vid, :gt, longitude, latitude, distance, speed,
oil_consumption’, —
(3)

作者:隐林

‘tablestore.table.name’=’vehicle_track’ —
(4)

本文为云栖社区原创内容,未经允许不得转载。返回搜狐,查看更多

)

责任编辑:

LOCATION ‘tablestore://vehicle-test.cn-shanghai.ots-internal.aliyuncs.com’; — (5)

  • com.aliyun.odps.TableStoreStorageHandler 是 MaxCompute
    内置的处理 TableStore 数据的 StorageHandler, 定义了 MaxCompute 和
    TableStore 的交互,相关逻辑由 MaxCompute 实现。
  • SERDEPROPERITES
    可以理解成提供参数选项的接口,在使用 TableStoreStorageHandler
    时,有两个必须指定的选项,分别是下面介绍的
    tablestore.columns.mapping 和 tablestore.table.name。
    更多的可选选项将在后面其他例子中提及。
  • tablestore.columns.mapping
    选项:必需选项,用来描述对需要 MaxCompute 将访问的 TableStore
    表的列,包括主键和属性列。 这其中以 : 打头的用来表示 TableStore
    主键,例如这个例子中的 :vid:gt。 其他的均为属性列。
    TableStore支持最少1个,最多4个主键,主键类型为 bigint 或
    string,其中第一个主键为分区键。 在指定映射的时候,用户必须提供指定
    TableStore 表的
    所有主键,对于属性列则没有必要全部提供,可以只提供需要通过
    MaxCompute 来访问的属性列。
  • tablestore.table.name:需要访问的 TableStore 表名。
    如果指定的 TableStore 表名错误(不存在),则会报错,MaxCompute
    不会主动去创建 TableStore 表。
  • LOCATION 用来指定访问的 TableStore 的实例信息,包括
    instance 名字,endpoint 等。 
  • 数据格式对应,MaxCompute 与 TableStore
    的数据格式对应如下:

TableStore数据类型

MaxCompute数据类型

string

string

binary

blob

int

bigint

double

double

6.执行ODPS-SQL

// 统计编号 4 以下的车辆在时间戳 1469171387
以前的平均速度和平均油耗

select vid,count(*),avg(speed),avg(oil_consumption) from
ots_vehicle_track where vid
<4 and gt<1469171387  group by
vid;

 

使用 UDF(User Defined Function)处理数据

有时候用户在表格存储里面的数据有独特的结构,希望自己开发逻辑来处理每一行数据,比如解析特定的json字符串,这一块的开发也已经很方便了。

1.按照MaxCompute
Studio文档的说明在IntelliJ里面安装MaxCompute-Java/MaxCompute-Studio插件,一旦插件安装完毕,就可以直接开发。

比如下面就是一个简单的 UDF
定义,只是简单的将两个字符串连接。 MaxCompute
支持更复杂的UDF,包括自定义窗口执行逻辑等,详细请参考MaxCompute Studio-开发
UDF。

 

2.打包之后可以上传到
MaxCompute,其中打包这里有需要注意的地方,File->Project
Structure->Artifacts, 填写好 Name 和 Output Directory 后,要点击
+ 选择输出模块,打包后通过 ODPS Project Explorer
来上传资源、创建函数,然后就可以在SQL中调用。

 

3.打开bin/odpscmd,输入

//
我们选出来1行数据,并将name/name传入UDF,返回两个string的累加

select
cloud_metric_extract_md5(name,
name) as udf_test from test_table
limit 1;

 

写在最后

目前ODPS-SQL访问 TaleStore
还在对执行逻辑进行深度的优化,如果有需求请联系ots_support,我们将针对业务场景来进行优化。

FAQ

常见错误处理:

  1. FAILED: ODPS-0010000:System internal error – fuxi
    job failed, WorkerPackageNotExist:需要设置set
    odps.task.major.version=unstructured_data
  2. FAILED: ODPS-0010000:System internal error –
    std::exception:Message: a timeout was
    reached:一般情况下是OTS的endpoint填写错误,导致ODPS没法访问,可以咨询ots_support旺旺账号
  3. logview invalid
    end_point:在执行过程中,会返回一个logview
    URL地址,如果使用浏览器访问该地址返回错误,可能是配置不对,请检查
    MaxCompute 配置,并咨询 odps_support 旺旺账号

相关文章