Hive中的UDF和UDAF及其用法

本文介绍了Hive中的UDF和UDAF,详细讲解了它们的用途和实现方法。UDF是用户自定义函数,用于扩展数据库查询功能,通过创建Java类并继承UDF,实现特定功能。而UDAF是用户定义聚合函数,用于多行记录的统计分析。UDAF的实现涉及到多个阶段,包括初始化、迭代、部分结果收集、结果合并等。文章还提供了一个求平均值的UDAF实例作为演示。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

一、UDF

1、解释

UDF即User Define Function(用户自定义函数),很多数据库都支持UDF,在数据库内置函数无法满足用户的需求时便可以通过UDF来扩展数据库的查询功能,以满足特定需要。

2、使用方法

自定义一个java类并继承UDF,然后定义若干个evaluate方法(实现不同功能),示例如下

package com.example.hive.udf;
 
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
 
public final class MyLower extends UDF {
   
  public Text evaluate(final Text s) {
   
    if (s == null) {
    return null; }
    return new Text(s.toString().toLowerCase());
  }
}

该示例实现将传入的Text类型大写转换成小写的功能(实际Hive大小写转换有相应的内置函数,这做个样例)。

然后打成jar包并添加到Hive类路径中去
hive> add jar my_udf.jar;
Added my_udf.jar to class path

查看是否添加成功:
hive> list jars;
my_udf.jar

注册自定义函数:
create temporary function my_lower as ‘com.example.hive.udf.MyLower’;

使用示例:

hive> select my_lower(name), age from staff limit 1;

二、UDAF(本文对应版本Hive 1.2.1)

1、解释

UDAF即User-defined Aggregation Function(用户定义聚合函数),作用于多行记录上,返回一个结果值,多用于相同组内统计分析。

2、使用方法

  • 首先定义一个类并继承AbstractGenericUDAFResolver方法,然后重写其getEvaluator方法,这就搭好了一个实现Hive UDAF的一个架子。

我们看一个系统内置的UDAF的例子,先看是怎么注册的

system.registerGenericUDAF("avg", new GenericUDAFAverage());
  • 然后,定义一个类并继承GenericUDAFEvaluator,上边重写的getEvaluator返回的就是跟输入类型相对应的一个GenericUDAFEvaluator子类。GenericUDAFEvaluator这个类是对Mode(状态,和MapReduce各阶段相对应)及其对应阶段的聚合动作的一种封装。

3、具体分析

我们要自定义UDAF,并使其在MR框架下运行,就必须符合MR的逻辑。Map阶段、如果有Combiner会在Map端先进行本地聚合(merge)、reduce阶段merge并产生结果。也有任务在Map端直接完成而不需要reduce阶段。

然后看自定义时怎么跟这些MR阶段对应上?

首先Hive中有一个Mode分别定义了几个阶段需要做的操作,如下

public static enum Mode {
   
  // 表示从读输入数据迭代到出部分聚合结果的阶段,每个map产生一个结果,所以这
  // 里会调用iterate和terminatePartial
  PARTIAL1,
  // 表示从部分聚合结果到出部分聚合结果的阶段,相当于在Map端先做一次本地聚合,所以这里会
  // 调用merge和terminatePartial
  PARTIAL2, 
  // 表示从部分聚合结果到出完全聚合结果的阶段,相当于拉取各Map的结果并进行合并
  // 所以这里会调用merge和terminate
  FINAL,   
  // 表示从读输入数据迭代到出完全聚合结果的阶段,也就是说只有Map阶段,所以这里
  // 会调用iterate和terminate
  COMPLETE  
};

有了以上聚合阶段及其调用方法的概念之后,我们继续往下看具体定义UDAF时应该怎么做?

继承GenericUDAFEvaluator具体要实现的几个方法及保存聚合结果的对象

1、ObjectInspector init(Mode m, ObjectInspector[] parameters)

这个方法在子类中必须实现,init用于初始化各个阶段的输入、中间部分聚合结果、及返回值他们的类型。

2void iterate(AggregationBuffer agg, Object[] parameters)

这个方法对原始输入数据进行迭代,对map中同属一分组的row迭代便产生一个聚合结果(部分结果)。

3、Object terminatePartial(AggregationBuffer agg)

这个函数表示部分结果完成,其字面意思比较清楚。想一下对应在MapReduce的哪些阶段会部分完成?可以这么说,只要不是最终聚合,其它地方发生的聚合都生成部分结果,也就会调用terminatePartial来设置部分结果。所以这里是发生在map完成迭代以及map端进行本地合并时。

4void merge(AggregationBuffer agg, Object partial)

对部分已聚合的结果进行合并,对应于MR中的Map端本地聚合以及reduce拉取节点数据进行合并时。

5、Object terminate(AggregationBuffer agg)

获取最终结果,发生在reduce端合并完或者只有Map时map进行完。

最终的聚合数据存在于一个AggregationBuffer中,结果计算也是根据AggregationBuffer中的聚合值进行相应操作得到。

以上这些函数是必须实现的,当然,需要一个对象来保存结果,这个对象就是AggregationBuffer。

求平均值的保存结果对象是这样定义的

private static class AverageAggregationBuffer<TYPE> implements AggregationBuffer {
   
    private long count;
    private TYPE sum;
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值