一、UDF
1、解释
UDF即User Define Function(用户自定义函数),很多数据库都支持UDF,在数据库内置函数无法满足用户的需求时便可以通过UDF来扩展数据库的查询功能,以满足特定需要。
2、使用方法
自定义一个java类并继承UDF,然后定义若干个evaluate方法(实现不同功能),示例如下
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public final class MyLower extends UDF {
public Text evaluate(final Text s) {
if (s == null) {
return null; }
return new Text(s.toString().toLowerCase());
}
}
该示例实现将传入的Text类型大写转换成小写的功能(实际Hive大小写转换有相应的内置函数,这做个样例)。
然后打成jar包并添加到Hive类路径中去
hive> add jar my_udf.jar;
Added my_udf.jar to class path
查看是否添加成功:
hive> list jars;
my_udf.jar
注册自定义函数:
create temporary function my_lower as ‘com.example.hive.udf.MyLower’;
使用示例:
hive> select my_lower(name), age from staff limit 1;
二、UDAF(本文对应版本Hive 1.2.1)
1、解释
UDAF即User-defined Aggregation Function(用户定义聚合函数),作用于多行记录上,返回一个结果值,多用于相同组内统计分析。
2、使用方法
- 首先定义一个类并继承AbstractGenericUDAFResolver方法,然后重写其getEvaluator方法,这就搭好了一个实现Hive UDAF的一个架子。
我们看一个系统内置的UDAF的例子,先看是怎么注册的
system.registerGenericUDAF("avg", new GenericUDAFAverage());
- 然后,定义一个类并继承GenericUDAFEvaluator,上边重写的getEvaluator返回的就是跟输入类型相对应的一个GenericUDAFEvaluator子类。GenericUDAFEvaluator这个类是对Mode(状态,和MapReduce各阶段相对应)及其对应阶段的聚合动作的一种封装。
3、具体分析
我们要自定义UDAF,并使其在MR框架下运行,就必须符合MR的逻辑。Map阶段、如果有Combiner会在Map端先进行本地聚合(merge)、reduce阶段merge并产生结果。也有任务在Map端直接完成而不需要reduce阶段。
然后看自定义时怎么跟这些MR阶段对应上?
首先Hive中有一个Mode分别定义了几个阶段需要做的操作,如下
public static enum Mode {
// 表示从读输入数据迭代到出部分聚合结果的阶段,每个map产生一个结果,所以这
// 里会调用iterate和terminatePartial
PARTIAL1,
// 表示从部分聚合结果到出部分聚合结果的阶段,相当于在Map端先做一次本地聚合,所以这里会
// 调用merge和terminatePartial
PARTIAL2,
// 表示从部分聚合结果到出完全聚合结果的阶段,相当于拉取各Map的结果并进行合并
// 所以这里会调用merge和terminate
FINAL,
// 表示从读输入数据迭代到出完全聚合结果的阶段,也就是说只有Map阶段,所以这里
// 会调用iterate和terminate
COMPLETE
};
有了以上聚合阶段及其调用方法的概念之后,我们继续往下看具体定义UDAF时应该怎么做?
继承GenericUDAFEvaluator具体要实现的几个方法及保存聚合结果的对象
1、ObjectInspector init(Mode m, ObjectInspector[] parameters)
这个方法在子类中必须实现,init用于初始化各个阶段的输入、中间部分聚合结果、及返回值他们的类型。
2、void iterate(AggregationBuffer agg, Object[] parameters)
这个方法对原始输入数据进行迭代,对map中同属一分组的row迭代便产生一个聚合结果(部分结果)。
3、Object terminatePartial(AggregationBuffer agg)
这个函数表示部分结果完成,其字面意思比较清楚。想一下对应在MapReduce的哪些阶段会部分完成?可以这么说,只要不是最终聚合,其它地方发生的聚合都生成部分结果,也就会调用terminatePartial来设置部分结果。所以这里是发生在map完成迭代以及map端进行本地合并时。
4、void merge(AggregationBuffer agg, Object partial)
对部分已聚合的结果进行合并,对应于MR中的Map端本地聚合以及reduce拉取节点数据进行合并时。
5、Object terminate(AggregationBuffer agg)
获取最终结果,发生在reduce端合并完或者只有Map时map进行完。
最终的聚合数据存在于一个AggregationBuffer中,结果计算也是根据AggregationBuffer中的聚合值进行相应操作得到。
以上这些函数是必须实现的,当然,需要一个对象来保存结果,这个对象就是AggregationBuffer。
求平均值的保存结果对象是这样定义的
private static class AverageAggregationBuffer<TYPE> implements AggregationBuffer {
private long count;
private TYPE sum;