Writing UDFs in Java for Hive 3.1.2: Encoding, Decoding, Encryption and Decryption, Running on a USDP Big-Data Cluster
Background
After the cluster was upgraded from CDH 5.16 to CDP 7.1, the Alibaba Cloud DataPhin platform I use was upgraded as well, and the old UDFs no longer work well on it. Some of these UDFs exist mainly so that the superficial SQL Boys cannot see the real values of certain confidential Hive columns, preventing leaks of sensitive information. So I wrote UDFs that implement encoding, decoding, encryption and decryption, and verified that they work in Apache Hive on a USDP cluster. USDP's stability actually seems a bit better than Aliyun's platform...
How it works
UDF
Hive's exec package contains a UDF class. Extend it, implement the logic in Java, place the compiled Jar somewhere Hive can load it, then add and register it, and it can be used just like a built-in function.
A minimal UDF example
import org.apache.hadoop.hive.ql.exec.UDF;
import java.nio.charset.StandardCharsets;
/**
* @program: HiveUDF
* @description: re-encode a string with Base64
* @author: zhiyong
* @create: 2022-08-04 22:48
**/
public class base64code1 extends UDF {
public String evaluate(String input){
return java.util.Base64.getEncoder().encodeToString(input.getBytes(StandardCharsets.UTF_8));
}
}
That is all it takes for the simplest possible UDF.
But this class is clearly deprecated in Hive 3.1.2. Going by the comments, one is supposed to follow the pattern of the classes that themselves extend this UDF class.
Classes that extend the UDF class:
Opening one of them at random:
package org.apache.hadoop.hive.ql.udf;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
/**
* UDFAscii.
*
*/
@Description(name = "ascii",
value = "_FUNC_(str) - returns the numeric value of the first character"
+ " of str",
extended = "Returns 0 if str is empty or NULL if str is NULL\n"
+ "Example:\n"
+ " > SELECT _FUNC_('222') FROM src LIMIT 1;"
+ " 50\n"
+ " > SELECT _FUNC_(2) FROM src LIMIT 1;\n" + " 50")
public class UDFAscii extends UDF {
private final IntWritable result = new IntWritable();
public IntWritable evaluate(Text s) {
if (s == null) {
return null;
}
if (s.getLength() > 0) {
result.set(s.getBytes()[0]);
} else {
result.set(0);
}
return result;
}
}
There is nothing special here: the built-in classes that extend UDF simply come with an evaluate method already implemented, so extending them again only means overriding that method. The same overridden evaluate shows up in many of the classes that extend UDF, so this method is clearly the important one.
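As a sketch of that pattern (the class name below is hypothetical, modeled on UDFAscii above), a custom UDF only needs its own evaluate; reusing a single Writable result object, as the built-ins do, avoids allocating a new object per row:
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Hypothetical example modeled on the built-in UDFs: reverse a string.
public class UDFReverseDemo extends UDF {
  private final Text result = new Text(); // reused across rows, like UDFAscii does

  public Text evaluate(Text s) {
    if (s == null) {
      return null; // NULL in, NULL out
    }
    result.set(new StringBuilder(s.toString()).reverse().toString());
    return result;
  }
}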
UDF source code
First, take a look at the deprecated class itself:
package org.apache.hadoop.hive.ql.exec;
import org.apache.hadoop.hive.ql.udf.UDFType;
/**
* A User-defined function (UDF) for use with Hive.
* <p>
* New UDF classes need to inherit from this UDF class (or from {@link
* org.apache.hadoop.hive.ql.udf.generic.GenericUDF GenericUDF} which provides more flexibility at
* the cost of more complexity).
* <p>
* Requirements for all classes extending this UDF are:
* <ul>
* <li>Implement one or more methods named {@code evaluate} which will be called by Hive (the exact
* way in which Hive resolves the method to call can be configured by setting a custom {@link
* UDFMethodResolver}). The following are some examples:
* <ul>
* <li>{@code public int evaluate();}</li>
* <li>{@code public int evaluate(int a);}</li>
* <li>{@code public double evaluate(int a, double b);}</li>
* <li>{@code public String evaluate(String a, int b, Text c);}</li>
* <li>{@code public Text evaluate(String a);}</li>
* <li>{@code public String evaluate(List<Integer> a);} (Note that Hive Arrays are represented as
* {@link java.util.List Lists} in Hive.
* So an {@code ARRAY<int>} column would be passed in as a {@code List<Integer>}.)</li>
* </ul>
* </li>
* <li>{@code evaluate} should never be a void method. However it can return {@code null} if
* needed.
* <li>Return types as well as method arguments can be either Java primitives or the corresponding
* {@link org.apache.hadoop.io.Writable Writable} class.</li>
* </ul>
* One instance of this class will be instantiated per JVM and it will not be called concurrently.
*
* @see Description
* @see UDFType
*
* @deprecated use {@link org.apache.hadoop.hive.ql.udf.generic.GenericUDF}
*/
@Deprecated
@UDFType(deterministic = true)
public class UDF {
/**
* The resolver to use for method resolution.
*/
private UDFMethodResolver rslv;
/**
* The constructor.
*/
public UDF() {
rslv = new DefaultUDFMethodResolver(this.getClass());
}
/**
* The constructor with user-provided {@link UDFMethodResolver}.
*/
protected UDF(UDFMethodResolver rslv) {
this.rslv = rslv;
}
/**
* Sets the resolver.
*
* @param rslv The method resolver to use for method resolution.
*/
public void setResolver(UDFMethodResolver rslv) {
this.rslv = rslv;
}
/**
* Get the method resolver.
*/
public UDFMethodResolver getResolver() {
return rslv;
}
/**
* This can be overridden to include JARs required by this UDF.
*
* @see org.apache.hadoop.hive.ql.udf.generic.GenericUDF#getRequiredJars()
* GenericUDF.getRequiredJars()
*
* @return an array of paths to files to include, {@code null} by default.
*/
public String[] getRequiredJars() {
return null;
}
/**
* This can be overridden to include files required by this UDF.
*
* @see org.apache.hadoop.hive.ql.udf.generic.GenericUDF#getRequiredFiles()
* GenericUDF.getRequiredFiles()
*
* @return an array of paths to files to include, {@code null} by default.
*/
public String[] getRequiredFiles() {
return null;
}
}
A frequently used class shows up here:
package org.apache.hadoop.hive.ql.exec;
import java.lang.reflect.Method;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
* The UDF Method resolver interface. A user can plugin a resolver to their UDF
* by implementing the functions in this interface. Note that the resolver is
* stored in the UDF class as an instance variable. We did not use a static
* variable because many resolvers maintain the class of the enclosing UDF as
* state and are called from a base class e.g. UDFBaseCompare. This makes it
* very easy to write UDFs that want to do resolution similar to the comparison
* operators. Such UDFs just need to extend UDFBaseCompare and do not have to
* care about the UDFMethodResolver interface. Same is true for UDFs that want
* to do resolution similar to that done by the numeric operators. Such UDFs
* simply have to extend UDFBaseNumericOp class. For the default resolution the
* UDF implementation simply needs to extend the UDF class.
*/
@Deprecated
public interface UDFMethodResolver {
/**
* Gets the evaluate method for the UDF given the parameter types.
*
* @param argClasses
* The list of the argument types that need to matched with the
* evaluate function signature.
*/
Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException;
}
This is clearly an interface, so let's look at the concrete implementations:
There are mainly these three:
Under normal circumstances the one actually used should be:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec;
import java.lang.reflect.Method;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
* The default UDF Method resolver. This resolver is used for resolving the UDF
* method that is to be used for evaluation given the list of the argument
* types. The getEvalMethod goes through all the evaluate methods and returns
* the one that matches the argument signature or is the closest match. Closest
* match is defined as the one that requires the least number of arguments to be
* converted. In case more than one matches are found, the method throws an
* ambiguous method exception.
*/
public class DefaultUDFMethodResolver implements UDFMethodResolver {
/**
* The class of the UDF.
*/
private final Class<? extends UDF> udfClass;
/**
* Constructor. This constructor sets the resolver to be used for comparison
* operators. See {@link UDFMethodResolver}
*/
public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
this.udfClass = udfClass;
}
/**
* Gets the evaluate method for the UDF given the parameter types.
*
* @param argClasses
* The list of the argument types that need to matched with the
* evaluate function signature.
*/
@Override
public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
return FunctionRegistry.getMethodInternal(udfClass, "evaluate", false,
argClasses);
}
}
The relevant method in this utility class:
package org.apache.hadoop.hive.ql.exec;
public final class FunctionRegistry {
/**
* This method is shared between UDFRegistry and UDAFRegistry. methodName will
* be "evaluate" for UDFRegistry, and "aggregate"/"evaluate"/"evaluatePartial"
* for UDAFRegistry.
* @throws UDFArgumentException
*/
public static <T> Method getMethodInternal(Class<? extends T> udfClass,
String methodName, boolean exact, List<TypeInfo> argumentClasses)
throws UDFArgumentException {
List<Method> mlist = new ArrayList<Method>();
for (Method m : udfClass.getMethods()) {
if (m.getName().equals(methodName)) {
mlist.add(m);
}
}
return getMethodInternal(udfClass, mlist, exact, argumentClasses);
}
}
So under the hood, the static method getMethodInternal of the FunctionRegistry utility class in the org.apache.hadoop.hive.ql.exec package reflectively collects every method named evaluate on the class that extends org.apache.hadoop.hive.ql.exec.UDF. That means evaluate can actually be overloaded, although UDF stability has never been great and I prefer not to rely on it. It also explains why the method must be called evaluate once you extend UDF.
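As a minimal sketch (the class name is hypothetical, assuming the deprecated UDF API is on the classpath), this is how that resolution behaves with overloaded evaluate methods: DefaultUDFMethodResolver picks whichever signature best matches the argument types at the call site.
import org.apache.hadoop.hive.ql.exec.UDF;

// Hypothetical sketch: the default resolver scans every public method named
// "evaluate" and binds the one whose signature best matches the call site.
public class MaskDemo extends UDF {
  // chosen when the function is called with a string column
  public String evaluate(String s) {
    return s == null ? null : s.replaceAll(".", "*");
  }

  // chosen when the function is called with an int column
  public String evaluate(Integer i) {
    return i == null ? null : String.valueOf(i).replaceAll(".", "*");
  }
}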
As for the udfClass.getMethods() method:
package java.lang;
public final class Class<T> implements java.io.Serializable,
GenericDeclaration,
Type,
AnnotatedElement {
/**
* Returns an array containing {@code Method} objects reflecting all the
* public methods of the class or interface represented by this {@code
* Class} object, including those declared by the class or interface and
* those inherited from superclasses and superinterfaces.
*
* <p> If this {@code Class} object represents a type that has multiple
* public methods with the same name and parameter types, but different
* return types, then the returned array has a {@code Method} object for
* each such method.
*
* <p> If this {@code Class} object represents a type with a class
* initialization method {@code <clinit>}, then the returned array does
* <em>not</em> have a corresponding {@code Method} object.
*
* <p> If this {@code Class} object represents an array type, then the
* returned array has a {@code Method} object for each of the public
* methods inherited by the array type from {@code Object}. It does not
* contain a {@code Method} object for {@code clone()}.
*
* <p> If this {@code Class} object represents an interface then the
* returned array does not contain any implicitly declared methods from
* {@code Object}. Therefore, if no methods are explicitly declared in
* this interface or any of its superinterfaces then the returned array
* has length 0. (Note that a {@code Class} object which represents a class
* always has public methods, inherited from {@code Object}.)
*
* <p> If this {@code Class} object represents a primitive type or void,
* then the returned array has length 0.
*
* <p> Static methods declared in superinterfaces of the class or interface
* represented by this {@code Class} object are not considered members of
* the class or interface.
*
* <p> The elements in the returned array are not sorted and are not in any
* particular order.
*
* @return the array of {@code Method} objects representing the
* public methods of this class
* @throws SecurityException
* If a security manager, <i>s</i>, is present and
* the caller's class loader is not the same as or an
* ancestor of the class loader for the current class and
* invocation of {@link SecurityManager#checkPackageAccess
* s.checkPackageAccess()} denies access to the package
* of this class.
*
* @jls 8.2 Class Members
* @jls 8.4 Method Declarations
* @since JDK1.1
*/
@CallerSensitive
public Method[] getMethods() throws SecurityException {
checkMemberAccess(Member.PUBLIC, Reflection.getCallerClass(), true);
return copyMethods(privateGetPublicMethods());
}
}
This reflection method has existed since the JDK 1.1 days. Note that reflection can also throw an exception:
package java.lang;
/**
* Thrown by the security manager to indicate a security violation.
*
* @author unascribed
* @see java.lang.SecurityManager
* @since JDK1.0
*/
public class SecurityException extends RuntimeException {
}
So the lower layers can throw a runtime exception (SecurityException extends RuntimeException).
A first look at GenericUDF
Since extending UDF directly is the deprecated approach, the Javadoc points to GenericUDF instead; it is a more complex class to work with, but it is the one that is now the mainstream choice.
package com.zhiyong.hiveUDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
/**
* @program: HiveUDF
* @description: test UDF
* @author: zhiyong
* @create: 2022-08-05 00:10
**/
public class base64code2 extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
return null;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
return null;
}
@Override
public String getDisplayString(String[] children) {
return null;
}
}
So extending GenericUDF means overriding three methods and importing four classes. Judging by the names, initialize does the setup, evaluate holds the actual algorithm, and getDisplayString supplies the display text (what shows up in explain plans and error messages).
The ObjectInspector interface in the org.apache.hadoop.hive.serde2.objectinspector package clearly deserves a look.
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.apache.hadoop.hive.serde2.objectinspector;
import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
import org.apache.hadoop.hive.common.classification.InterfaceStability.Stable;
@Public
@Stable
public interface ObjectInspector extends Cloneable {
String getTypeName();
ObjectInspector.Category getCategory();
public static enum Category {
PRIMITIVE,
LIST,
MAP,
STRUCT,
UNION;
private Category() {
}
}
}
This is clearly an interface, and it also defines a Category enum inside it.
In fact the package contains many more classes:
Classes with similar names:
For example, ObjectInspectorConverters holds the type-conversion methods, while ObjectInspectorFactory and ObjectInspectorUtils are utility classes with plenty of static methods.
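As a rough sketch of how the converter utility is typically used (the class and method names below are only for illustration, not part of the UDFs in this post), a converter built during initialization can normalize whatever object Hive passes in into a plain Java String:
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Illustrative sketch: convert from whatever inspector Hive hands us to a Java String.
public class ConverterSketch {
  private ObjectInspectorConverters.Converter toStringConverter;

  void setup(ObjectInspector[] arguments) {
    // arguments[0] describes the actual input type; the target type is a plain Java String
    toStringConverter = ObjectInspectorConverters.getConverter(
        arguments[0], PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  }

  String read(Object rawArgument) {
    // convert() reshapes the value according to the target inspector
    return (String) toStringConverter.convert(rawArgument);
  }
}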
GenericUDF source code
package org.apache.hadoop.hive.ql.udf.generic;
/**
* A Generic User-defined function (GenericUDF) for the use with Hive.
*
* New GenericUDF classes need to inherit from this GenericUDF class.
*
* The GenericUDF are superior to normal UDFs in the following ways: 1. It can
* accept arguments of complex types, and return complex types. 2. It can accept
* variable length of arguments. 3. It can accept an infinite number of function
* signature - for example, it's easy to write a GenericUDF that accepts
* array<int>, array<array<int>> and so on (arbitrary levels of nesting). 4. It
* can do short-circuit evaluations using DeferedObject.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@UDFType(deterministic = true)
public abstract class GenericUDF implements Closeable {
}
This is clearly an abstract class; it is public, but it cannot be used directly.
The abstract class has many subclasses:
As for why I zeroed in on these two classes: I actually traced them from the stack trace in Hive's error log...
The difference between the two identically named classes:
They belong to different Hive versions. CDH 5.16 is about to be retired, and the old 2.3.7 version is no longer worth reading, so only the newer one matters.
package org.apache.hadoop.hive.ql.udf.generic;
/**
* GenericUDFBridge encapsulates UDF to provide the same interface as
* GenericUDF.
*
* Note that GenericUDFBridge implements Serializable because the name of the
* UDF class needs to be serialized with the plan.
*
*/
public class GenericUDFBridge extends GenericUDF implements Serializable {
private static final long serialVersionUID = 4994861742809511113L;
/**
* The name of the UDF.
*/
private String udfName;
/**
* Whether the UDF is an operator or not. This controls how the display string
* is generated.
*/
private boolean isOperator;
/**
* The underlying UDF class Name.
*/
private String udfClassName;
/**
* The underlying method of the UDF class.
*/
private transient Method udfMethod;
/**
* Helper to convert the parameters before passing to udfMethod.
*/
private transient ConversionHelper conversionHelper;
/**
* The actual udf object.
*/
private transient UDF udf;
/**
* The non-deferred real arguments for method invocation.
*/
private transient Object[] realArguments;
private transient UdfWhitelistChecker udfChecker;
/**
* Create a new GenericUDFBridge object.
*
* @param udfName
* The name of the corresponding udf.
* @param isOperator true for operators
* @param udfClassName java class name of UDF
*/
public GenericUDFBridge(String udfName, boolean isOperator,
String udfClassName) {
this.udfName = udfName;
this.isOperator = isOperator;
this.udfClassName = udfClassName;
}
// For Java serialization only
public GenericUDFBridge() {
}
public void setUdfName(String udfName) {
this.udfName = udfName;
}
@Override
public String getUdfName() {
return udfName;
}
public String getUdfClassName() {
return udfClassName;
}
public void setUdfClassName(String udfClassName) {
this.udfClassName = udfClassName;
}
public boolean isOperator() {
return isOperator;
}
public void setOperator(boolean isOperator) {
this.isOperator = isOperator;
}
public Class<? extends UDF> getUdfClass() {
try {
return getUdfClassInternal();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
/** Gets the UDF class and checks it against the whitelist, if any. */
private Class<? extends UDF> getUdfClassInternal()
throws ClassNotFoundException {
@SuppressWarnings("unchecked")
Class<? extends UDF> clazz = (Class<? extends UDF>) Class.forName(
udfClassName, true, Utilities.getSessionSpecifiedClassLoader());
if (udfChecker != null && !udfChecker.isUdfAllowed(clazz)) {
throw new SecurityException("UDF " + clazz.getCanonicalName() + " is not allowed");
}
return clazz;
}
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
try {
udf = (UDF)getUdfClassInternal().newInstance();
} catch (Exception e) {
throw new UDFArgumentException(
"Unable to instantiate UDF implementation class " + udfClassName + ": " + e);
}
// Resolve for the method based on argument types
ArrayList<TypeInfo> argumentTypeInfos = new ArrayList<TypeInfo>(
arguments.length);
for (ObjectInspector argument : arguments) {
argumentTypeInfos.add(TypeInfoUtils
.getTypeInfoFromObjectInspector(argument));
}
udfMethod = udf.getResolver().getEvalMethod(argumentTypeInfos);
udfMethod.setAccessible(true);
// Create parameter converters
conversionHelper = new ConversionHelper(udfMethod, arguments);
// Create the non-deferred realArgument
realArguments = new Object[arguments.length];
// Get the return ObjectInspector.
ObjectInspector returnOI = ObjectInspectorFactory
.getReflectionObjectInspector(udfMethod.getGenericReturnType(),
ObjectInspectorOptions.JAVA);
return returnOI;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
assert (arguments.length == realArguments.length);
// Calculate all the arguments
for (int i = 0; i < realArguments.length; i++) {
realArguments[i] = arguments[i].get();
}
// Call the function
Object result = FunctionRegistry.invoke(udfMethod, udf, conversionHelper
.convertIfNecessary(realArguments));
// For non-generic UDF, type info isn't available. This poses a problem for Hive Decimal.
// If the returned value is HiveDecimal, we assume maximum precision/scale.
if (result != null && result instanceof HiveDecimalWritable) {
result = HiveDecimalWritable.enforcePrecisionScale
((HiveDecimalWritable) result,
HiveDecimal.SYSTEM_DEFAULT_PRECISION,
HiveDecimal.SYSTEM_DEFAULT_SCALE);
}
return result;
}
@Override
public String getDisplayString(String[] children) {
if (isOperator) {
if (children.length == 1) {
// Prefix operator
return "(" + udfName + " " + children[0] + ")";
} else {
// Infix operator
assert children.length == 2;
return "(" + children[0] + " " + udfName + " " + children[1] + ")";
}
} else {
StringBuilder sb = new StringBuilder();
sb.append(udfName);
sb.append("(");
for (int i = 0; i < children.length; i++) {
sb.append(children[i]);
if (i + 1 < children.length) {
sb.append(", ");
}
}
sb.append(")");
return sb.toString();
}
}
@Override
public String[] getRequiredJars() {
return udf.getRequiredJars();
}
@Override
public String[] getRequiredFiles() {
return udf.getRequiredFiles();
}
public void setUdfChecker(UdfWhitelistChecker udfChecker) {
this.udfChecker = udfChecker;
}
public interface UdfWhitelistChecker {
boolean isUdfAllowed(Class<?> clazz);
}
}
From the comments it is clear that GenericUDFBridge exists to wrap an old-style UDF so that it presents the same interface as GenericUDF. So when writing a UDF, there is really not much difference between extending the old UDF class directly, extending one of its existing subclasses, or extending GenericUDF: underneath, GenericUDFBridge turns the old-style UDF into a GenericUDF, the AST is parsed, and the job is handed down to MapReduce, Tez, Spark or whichever engine actually executes it.
The ClassNotFoundException thrown from getUdfClass matches exactly the stack trace I had seen. Both initialize and getUdfClass call getUdfClassInternal, which executes:
Class<? extends UDF> clazz = (Class<? extends UDF>) Class.forName(
udfClassName, true, Utilities.getSessionSpecifiedClassLoader());
Class.forName is given the UDF class name, true (meaning the class should be initialized when loaded), and the class loader returned by Utilities.getSessionSpecifiedClassLoader(); reflection then loads the UDF class that is ultimately needed.
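A plain-Java sketch of that three-argument Class.forName call (the class name is just the one from this post, and the context class loader stands in for Utilities.getSessionSpecifiedClassLoader() purely for illustration):
// Sketch of the reflective load that GenericUDFBridge performs.
public class ForNameSketch {
  public static void main(String[] args) throws Exception {
    String udfClassName = "com.zhiyong.hiveUDF.Base64code1"; // class name carried in the plan
    // stand-in for Utilities.getSessionSpecifiedClassLoader()
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    // "true" means run the static initialization of the class at load time
    Class<?> clazz = Class.forName(udfClassName, true, loader);
    Object udfInstance = clazz.newInstance(); // same call that initialize() makes on the UDF
    System.out.println("Loaded " + udfInstance.getClass().getName());
  }
}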
A minimal GenericUDF example
With the basic skeleton in place, the three methods to override are initialize (setup), evaluate (the algorithm itself) and getDisplayString (the display text).
Since this approach is the more modern one, it is naturally more complex to use and more capable. The type of the incoming parameters:
package org.apache.hadoop.hive.serde2.objectinspector;
@Public
@Stable
public interface ObjectInspector extends Cloneable {
String getTypeName();
ObjectInspector.Category getCategory();
public static enum Category {
PRIMITIVE,
LIST,
MAP,
STRUCT,
UNION;
private Category() {
}
}
}
Clearly an interface, so the implementations have to be examined:
For encoding/decoding and encryption/decryption, taking a string in and handing a string back is enough:
In fact:
The implementations of the PrimitiveObjectInspector interface correspond to Java's primitive types, while interfaces such as ListObjectInspector, MapObjectInspector and StructObjectInspector correspond to complex Java types like arrays, lists, maps and structs. With those, a Hive UDF can accept complex types as parameters instead of being limited to simple types such as String or int. The more modern approach has its advantages.
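As a hedged sketch (the class name is hypothetical and this is not the exact code used later in this post), an initialize() that validates a single primitive string argument and declares a Java String return type could look roughly like this:
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical sketch of argument validation inside initialize().
public class ValidatedStringUdfSketch extends GenericUDF {
  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    if (arguments.length != 1) {
      throw new UDFArgumentLengthException("exactly one argument is expected");
    }
    if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0, "a primitive (string) argument is expected");
    }
    // declare that evaluate() will return a plain Java String
    return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object raw = arguments[0].get();
    return raw == null ? null : raw.toString();
  }

  @Override
  public String getDisplayString(String[] children) {
    return "validated_string_udf_sketch(" + String.join(", ", children) + ")";
  }
}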
Encoding and decoding
The point of encoding and decoding is just to make the confidential values less obvious at a glance, so Base64 is enough.
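For reference, the whole round trip only needs java.util.Base64 from the JDK; no extra dependency is involved:
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Plain-JDK round trip: encode a string, then decode it back.
public class Base64RoundTrip {
  public static void main(String[] args) {
    String original = "confidential value";
    String encoded = Base64.getEncoder().encodeToString(original.getBytes(StandardCharsets.UTF_8));
    String decoded = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    System.out.println(encoded + " -> " + decoded); // decoded equals original
  }
}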
Encryption and decryption
To avoid the weakness of the encryption and decryption keys being identical, the key itself can be transformed, so that the key used for encryption and the key handed out for decryption are not the same string.
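A plain-Java sketch of the scheme implemented by the UDFs below (the class and method names here are only illustrative): the ciphertext is base64(reverse(plaintext + realKey)), and the key given to consumers is reverse(base64(realKey)), so the two sides never share the same literal key. The UDF later strips the key with replaceAll; the sketch strips it with substring, which serves the same purpose.
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative sketch of the asymmetric-looking key handling used by the UDFs below.
public class KeyingSketch {
  // ciphertext = base64( reverse( plaintext + realKey ) )
  static String encrypt(String plaintext, String realKey) {
    String reversed = new StringBuilder(plaintext + realKey).reverse().toString();
    return Base64.getEncoder().encodeToString(reversed.getBytes(StandardCharsets.UTF_8));
  }

  // the key handed to consumers is reverse( base64( realKey ) ), not realKey itself
  static String consumerKey(String realKey) {
    String encodedKey = Base64.getEncoder().encodeToString(realKey.getBytes(StandardCharsets.UTF_8));
    return new StringBuilder(encodedKey).reverse().toString();
  }

  // decryption first recovers realKey from the consumer key, then undoes the steps above
  static String decrypt(String ciphertext, String consumerKey) {
    String realKey = new String(Base64.getDecoder()
        .decode(new StringBuilder(consumerKey).reverse().toString()), StandardCharsets.UTF_8);
    String reversed = new String(Base64.getDecoder().decode(ciphertext), StandardCharsets.UTF_8);
    String withKey = new StringBuilder(reversed).reverse().toString(); // plaintext + realKey
    return withKey.substring(0, withKey.length() - realKey.length());  // strip the key suffix
  }

  public static void main(String[] args) {
    String realKey = "my-secret-key";
    String ciphertext = encrypt("hello", realKey);
    System.out.println(decrypt(ciphertext, consumerKey(realKey))); // prints "hello"
  }
}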
Environment
Cluster environment
A USDP 2.0 cluster with its bundled Hive 3.1.2, on JDK 1.8.
Build environment
IDEA 2021, Maven 3.3.9, JDK 1.8.
GAV
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<hive-exec.version>3.1.2</hive-exec.version>
<lombok-version>1.18.8</lombok-version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive-exec.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok-version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
Build
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
Java implementation
Extending UDF
UDFs that extend the UDF class directly are actually very easy to write.
Base64 encoding
import org.apache.hadoop.hive.ql.exec.UDF;
import java.nio.charset.StandardCharsets;
/**
* @program: HiveUDF
* @description: re-encode a string with Base64
* @author: zhiyong
* @create: 2022-08-04 22:48
**/
public class Base64code1 extends UDF {
public String evaluate(String input){
return java.util.Base64.getEncoder().encodeToString(input.getBytes(StandardCharsets.UTF_8));
}
}
Base64 decoding
package com.zhiyong.hiveUDF;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.nio.charset.StandardCharsets;
/**
* @program: HiveUDF
* @description: Base64-decoding UDF
* @author: zhiyong
* @create: 2022-08-05 00:49
**/
public class Base64decode1 extends UDF {
public String evaluate(String input){
return new String(java.util.Base64.getDecoder().decode(input));
}
}
Encryption
package com.zhiyong.hiveUDF;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.nio.charset.StandardCharsets;
/**
* @program: HiveUDF
* @description: encryption UDF
* @author: zhiyong
* @create: 2022-08-05 00:03
**/
public class Encryption1 extends UDF {
public String evaluate(String input,String key){
StringBuffer strb = new StringBuffer();
//encryption: value + key → reverse → Base64 encode
return java.util.Base64.getEncoder().encodeToString(strb.append(input).append(key).reverse()
.toString().getBytes(StandardCharsets.UTF_8));
}
}
Decryption
package com.zhiyong.hiveUDF;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.nio.charset.StandardCharsets;
/**
* @program: HiveUDF
* @description: decryption UDF
* @author: zhiyong
* @create: 2022-08-05 00:15
**/
public class Decrypt1 extends UDF {
public String evaluate(String input,String key){
StringBuffer strb = new StringBuffer();
key=new String(java.util.Base64.getDecoder().decode(strb.append(key).reverse().toString()));//recover the real key
//System.out.println("real key:" + key);
strb.delete(0,strb.length());//clear the buffer
input=new String(java.util.Base64.getDecoder().decode(input));//Base64-decode first
input=strb.append(input).reverse().toString();//reverse
//System.out.println("after reverse:" + input);
input=input.replaceAll(key,"");//strip the key (replaceAll treats it as a regex)
//System.out.println("after decryption:" + input);
return input;
}
}
Testing with a main method
package com.zhiyong.hiveUDF;
import java.nio.charset.StandardCharsets;
/**
* @program: HiveUDF
* @description: a quick test of the UDFs
* @author: zhiyong
* @create: 2022-08-04 23:22
**/
public class TestDemo1 {
public static void main(String[] args) {
System.out.println("start!");
//test Base64 encoding
Base64code1 base64code1 = new Base64code1();
String base64code = base64code1.evaluate("战斗暴龙兽");
System.out.println("输入内容:战斗暴龙兽\nbase64编码:" + base64code);//5oiY5paX5pq06b6Z5YW9
//test Base64 decoding
Base64decode1 base64decode1 = new Base64decode1();
String base64decode = base64decode1.evaluate(base64code);
System.out.println("base64解码:" + base64decode);
//test encryption
Encryption1 encryption1 = new Encryption1();
String encryption = encryption1.evaluate("战斗暴龙兽", "八神太一");
System.out.println("加密密钥:八神太一\n加密后:" + encryption);//5LiA5aSq56We5YWr5YW96b6Z5pq05paX5oiY
//test the key transformation
StringBuffer strb = new StringBuffer();
String key=strb.append(java.util.Base64.getEncoder()
.encodeToString("八神太一".getBytes(StandardCharsets.UTF_8)))
.reverse().toString();
System.out.println("发给SQL Boy的密钥:" + key);//AiL5qSa5eW65rWY5
//test decryption
Decrypt1 decrypt1 = new Decrypt1();
String decrypt = decrypt1.evaluate(encryption, key);
System.out.println("解密后:" + decrypt);
System.out.println("***************************************");
System.out.println("end!");
}
}
After running:
start!
输入内容:战斗暴龙兽
base64编码:5oiY5paX5pq06b6Z5YW9
base64解码:战斗暴龙兽
加密密钥:八神太一
加密后:5LiA5aSq56We5YWr5YW96b6Z5pq05paX5oiY
发给SQL Boy的密钥:AiL5qSa5eW65rWY5
解密后:战斗暴龙兽
***************************************
end!
Process finished with exit code 0
The main-method tests show the requirements are met.
Extending GenericUDF
To support Parquet, the most commonly used storage format, I ended up returning Parquet's string inspector as the output type.
Base64 encoding
package com.zhiyong.hiveUDF;
import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.nio.charset.StandardCharsets;
/**
* @program: HiveUDF
* @description: test UDF
* @author: zhiyong
* @create: 2022-08-05 00:10
**/
public class Base64code2 extends GenericUDF {
String output;
String input;
//initialization
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) {
output = "";//预设空string
return ParquetPrimitiveInspectorFactory.parquetStringInspector;
}
//the actual algorithm
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
if (1 == arguments.length) {
//do nothing if the argument count is wrong
input = arguments[0].get().toString();//read the argument
output=java.util.Base64.getEncoder().encodeToString(input.getBytes(StandardCharsets.UTF_8));
}
return output;
}
//help text
@Override
public String getDisplayString(String[] children) {
return "这是udf编码函数,入参string,输出base64编码后的string";
}
}
Base64 decoding
package com.zhiyong.hiveUDF;
import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
/**
* @program: HiveUDF
* @description: decoding UDF
* @author: zhiyong
* @create: 2022-08-05 08:13
**/
public class Base64decode2 extends GenericUDF {
String output;
String input;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) {
output="";//预设空string
return ParquetPrimitiveInspectorFactory.parquetStringInspector;
}
@Override
public String evaluate(DeferredObject[] arguments) throws HiveException {
if(1==arguments.length) {
//do nothing if the argument count is wrong
input = (String) arguments[0].get();//read the argument
output=new String(java.util.Base64.getDecoder().decode(input));
}
return output;
}
@Override
public String getDisplayString(String[] children) {
return "这是udf编码函数,入参string,输出base64解码后的string";
}
}
Encryption
package com.zhiyong.hiveUDF;
import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.nio.charset.StandardCharsets;
/**
* @program: HiveUDF
* @description: encryption UDF
* @author: zhiyong
* @create: 2022-08-05 08:14
**/
public class Encryption2 extends GenericUDF {
String output;
String input;
String key;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) {
output="";//预设空string
return ParquetPrimitiveInspectorFactory.parquetStringInspector;
}
@Override
public String evaluate(DeferredObject[] arguments) throws HiveException {
if(2==arguments.length){
StringBuffer strb = new StringBuffer();
input = (String) arguments[0].get();//read the first argument
key = (String) arguments[1].get();//read the second argument
//encryption: value + key → reverse → Base64 encode
output= java.util.Base64.getEncoder().encodeToString(strb.append(input).append(key).reverse()
.toString().getBytes(StandardCharsets.UTF_8));
}
return output;
}
@Override
public String getDisplayString(String[] children) {
return "这是udf加密函数,入参string1=明文,入参string2=密钥,输出加密后的string";
}
}
Decryption
package com.zhiyong.hiveUDF;
import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
/**
* @program: HiveUDF
* @description: decryption UDF
* @author: zhiyong
* @create: 2022-08-05 08:21
**/
public class Decrypt2 extends GenericUDF {
String output;
String input;
String key;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) {
output="";//预设空string
return ParquetPrimitiveInspectorFactory.parquetStringInspector;
}
@Override
public String evaluate(DeferredObject[] arguments) throws HiveException {
if(2==arguments.length){
StringBuffer strb = new StringBuffer();
input = arguments[0].get().toString();//read the first argument
key = arguments[1].get().toString();//read the second argument
key=new String(java.util.Base64.getDecoder().decode(strb.append(key).reverse().toString()));//recover the real key
//System.out.println("real key:" + key);
strb.delete(0,strb.length());//clear the buffer
input=new String(java.util.Base64.getDecoder().decode(input));//Base64-decode first
input=strb.append(input).reverse().toString();//reverse
//System.out.println("after reverse:" + input);
output=input.replaceAll(key,"");//strip the key (replaceAll treats it as a regex)
//System.out.println("after decryption:" + output);
}
return output;
}
@Override
public String getDisplayString(String[] children) {
return "这是udf解密函数,入参string1=明文,入参string2=密钥,输出解密后的string";
}
}
Testing with a main method
package com.zhiyong.hiveUDF;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import java.nio.charset.StandardCharsets;
/**
* @program: HiveUDF
* @description: a quick test of the UDFs
* @author: zhiyong
* @create: 2022-08-05 09:22
**/
public class TestDemo2 {
public static void main(String[] args) throws HiveException {
System.out.println("start!");
//test Base64 encoding
Base64code2 base64code2 = new Base64code2();
String base64code = base64code2.evaluate(new GenericUDF.DeferredJavaObject[]{
new GenericUDF.DeferredJavaObject("战斗暴龙兽")}).toString();
System.out.println("输入内容:战斗暴龙兽\nbase64编码:" + base64code);//5oiY5paX5pq06b6Z5YW9
//test Base64 decoding
Base64decode2 base64decode2 = new Base64decode2();
String base64decode = base64decode2.evaluate(new GenericUDF.DeferredJavaObject[]{new GenericUDF.DeferredJavaObject(base64code)});
System.out.println("base64解码:" + base64decode);
//test encryption
Encryption2 encryption2 = new Encryption2();
String encryption = encryption2.evaluate(new GenericUDF.DeferredJavaObject[]{
new GenericUDF.DeferredJavaObject("战斗暴龙兽"),
new GenericUDF.DeferredJavaObject("八神太一")
});
System.out.println("加密密钥:八神太一\n加密后:" + encryption);//5LiA5aSq56We5YWr5YW96b6Z5pq05paX5oiY
//test the key transformation
StringBuffer strb = new StringBuffer();
String key=strb.append(java.util.Base64.getEncoder()
.encodeToString("八神太一".getBytes(StandardCharsets.UTF_8)))
.reverse().toString();
System.out.println("发给SQL Boy的密钥:" + key);//AiL5qSa5eW65rWY5
//test decryption
Decrypt2 decrypt2 = new Decrypt2();
String decrypt = decrypt2.evaluate(new GenericUDF.DeferredJavaObject[]{
new GenericUDF.DeferredJavaObject(encryption),
new GenericUDF.DeferredJavaObject(key)
});
System.out.println("解密后:" + decrypt);
System.out.println("***************************************");
System.out.println("end!");
}
}
After running:
start!
输入内容:战斗暴龙兽
base64编码:5oiY5paX5pq06b6Z5YW9
base64解码:战斗暴龙兽
加密密钥:八神太一
加密后:5LiA5aSq56We5YWr5YW96b6Z5pq05paX5oiY
发给SQL Boy的密钥:AiL5qSa5eW65rWY5
解密后:战斗暴龙兽
***************************************
end!
Process finished with exit code 0
As you can see, both the UDF and the GenericUDF variants can be tested successfully from a main method in IDEA. In the vast majority of cases, though, UDFs for Hive SQL, Spark SQL or even Flink SQL have to run on a cluster, so the code must be packaged into a Jar and shipped there.
Running on the cluster
Checking the metadata
[root@zhiyong1 ~]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 2
Server version: 5.7.30 MySQL Community Server (GPL)
Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| db_hive_metastore |
| db_hue |
| db_ranger |
| db_udp |
| dolphinscheduler |
| mysql |
| performance_schema |
| sougoulogs |
| sys |
+--------------------+
10 rows in set (0.01 sec)
mysql> use db_hive_metastore;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+-------------------------------+
| Tables_in_db_hive_metastore |
+-------------------------------+
| AUX_TABLE |
| BUCKETING_COLS |
| CDS |
| COLUMNS_V2 |
| COMPACTION_QUEUE |
| COMPLETED_COMPACTIONS |
| COMPLETED_TXN_COMPONENTS |
| CTLGS |
| DATABASE_PARAMS |
| DBS |
| DB_PRIVS |
| DELEGATION_TOKENS |
| FUNCS |
| FUNC_RU |
| GLOBAL_PRIVS |
| HIVE_LOCKS |
| IDXS |
| INDEX_PARAMS |
| I_SCHEMA |
| KEY_CONSTRAINTS |
| MASTER_KEYS |
| MATERIALIZATION_REBUILD_LOCKS |
| METASTORE_DB_PROPERTIES |
| MIN_HISTORY_LEVEL |
| MV_CREATION_METADATA |
| MV_TABLES_USED |
| NEXT_COMPACTION_QUEUE_ID |
| NEXT_LOCK_ID |
| NEXT_TXN_ID |
| NEXT_WRITE_ID |
| NOTIFICATION_LOG |
| NOTIFICATION_SEQUENCE |
| NUCLEUS_TABLES |
| PARTITIONS |
| PARTITION_EVENTS |
| PARTITION_KEYS |
| PARTITION_KEY_VALS |
| PARTITION_PARAMS |
| PART_COL_PRIVS |
| PART_COL_STATS |
| PART_PRIVS |
| REPL_TXN_MAP |
| ROLES |
| ROLE_MAP |
| RUNTIME_STATS |
| SCHEMA_VERSION |
| SDS |
| SD_PARAMS |
| SEQUENCE_TABLE |
| SERDES |
| SERDE_PARAMS |
| SKEWED_COL_NAMES |
| SKEWED_COL_VALUE_LOC_MAP |
| SKEWED_STRING_LIST |
| SKEWED_STRING_LIST_VALUES |
| SKEWED_VALUES |
| SORT_COLS |
| TABLE_PARAMS |
| TAB_COL_STATS |
| TBLS |
| TBL_COL_PRIVS |
| TBL_PRIVS |
| TXNS |
| TXN_COMPONENTS |
| TXN_TO_WRITE_ID |
| TYPES |
| TYPE_FIELDS |
| VERSION |
| WM_MAPPING |
| WM_POOL |
| WM_POOL_TO_TRIGGER |
| WM_RESOURCEPLAN |
| WM_TRIGGER |
| WRITE_SET |
+-------------------------------+
74 rows in set (0.01 sec)
mysql> select * from FUNCS;
Empty set (0.01 sec)
mysql>
As you can see, the cluster is a blank slate at this point, without a single UDF registered.
Building the Jar
First mvn clean:
Then mvn package:
Then locate the built Jar under the output path:
The Jar now exists at that path.
Starting the USDP cluster
Last login: Fri Aug 5 12:19:02 2022 from 192.168.88.1
[root@zhiyong1 ~]# cd /opt/usdp-srv/usdp/
[root@zhiyong1 usdp]# bin/start-udp-server.sh
BASE_PATH: /opt/usdp-srv/usdp/bin
JMX_PATH: /opt/usdp-srv/usdp/jmx_exporter
REPAIR_PATH: /opt/usdp-srv/usdp/repair
UDP_PATH: /opt/usdp-srv/usdp
REPAIR_BIN_PATH: /opt/usdp-srv/usdp/repair/bin
REPAIR_SBIN_PATH: /opt/usdp-srv/usdp/repair/sbin
PACKAGES_PATH: /opt/usdp-srv/usdp/repair/packages
nmap-6.40-19.el7.x86_64
nmap exists
UDP Server is running with: 10995
Done.
[root@zhiyong1 usdp]#
Log in via the web UI:
http://192.168.88.100/login
admin, Zhiyong1
The cluster has just started and resources are plentiful. Only Zookeeper, HDFS, Yarn, Hive and Tez need to be started.
After starting them in order:
Cluster resources are sufficient.
The host machine also has plenty of memory, so there is no risk of blowing it up.
Uploading the Jar
The Hive client is installed on all three worker nodes, so any one of them will do:
Upload with MobaXterm:
Last login: Wed Apr 13 23:30:26 2022 from zhiyong1
[root@zhiyong2 ~]# pwd
/root
[root@zhiyong2 ~]# ls -ltr
总用量 28
-rw-------. 1 root root 1639 3月 1 05:40 anaconda-ks.cfg
drwxr-xr-x. 2 root root 60 3月 1 23:11 logs
-rw-r--r-- 1 root root 14444 3月 11 22:36 test1.txt
-rw-r--r-- 1 root root 58 3月 14 22:19 wordtest1.txt
drwxr-xr-x 2 root root 34 4月 1 20:04 jars
-rw-r--r-- 1 root root 79 4月 13 00:40 taildir_position.json
[root@zhiyong2 ~]# cd jars/
[root@zhiyong2 jars]# ls -ltr
总用量 133364
-rw-r--r-- 1 root root 136561467 4月 1 20:04 flinkStudy-1.0.0.jar
[root@zhiyong2 jars]# ls -ltr
总用量 133380
-rw-r--r-- 1 root root 136561467 4月 1 20:04 flinkStudy-1.0.0.jar
-rw-r--r-- 1 root root 13083 8月 5 12:35 hiveDemo-1.0.0.jar
[root@zhiyong2 jars]#
Upload complete.
Loading the Jar
[root@zhiyong2 jars]# hive
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/usdp-srv/srv/udp/2.0.0.0/hive/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/usdp-srv/srv/udp/2.0.0.0/yarn/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 7c87c2aa-0d18-41aa-93ff-170b7363d068
Logging initialized using configuration in file:/opt/usdp-srv/srv/udp/2.0.0.0/hive/conf/hive-log4j2.properties Async: true
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Hive Session ID = a4f3739c-1693-4524-97b2-40601a0e4154
hive (default)> add jar /root/jars/hiveDemo-1.0.0.jar;
Added [/root/jars/hiveDemo-1.0.0.jar] to class path
Added resources: [/root/jars/hiveDemo-1.0.0.jar]
hive (default)>
The Jar is now loaded.
Registering the UDFs
After loading, the functions still have to be registered before they can be used.
create temporary function <udf_name> as '<package>.<ClassName>'; -- register a temporary function
desc function extended <udf_name>; -- show information about a UDF
drop temporary function if exists <udf_name>; -- drop a UDF
Without temporary, the function is of course permanent.
create function Base64code1 as 'com.zhiyong.hiveUDF.Base64code1';
create function Base64code2 as 'com.zhiyong.hiveUDF.Base64code2';
create function Base64decode1 as 'com.zhiyong.hiveUDF.Base64decode1';
create function Base64decode2 as 'com.zhiyong.hiveUDF.Base64decode2';
create function Encryption1 as 'com.zhiyong.hiveUDF.Encryption1';
create function Encryption2 as 'com.zhiyong.hiveUDF.Encryption2';
create function Decrypt1 as 'com.zhiyong.hiveUDF.Decrypt1';
create function Decrypt2 as 'com.zhiyong.hiveUDF.Decrypt2';
desc function extended Base64code1;
desc function extended Base64code2;
--frequently used commands
drop function if exists default.Base64code2;
create function default.Base64code2 as 'com.zhiyong.hiveUDF.Base64code2';
add jar /root/jars/hiveDemo-1.0.0.jar; --the Jar has to be re-added every time Hive restarts
select num,default.Base64code2(name) from digital_monster;
select num,default.Base64code2('aaa') from digital_monster;
After executing:
hive (default)> create function Base64code1 as 'com.zhiyong.hiveUDF.Base64code1';
2022-08-05 12:48:37 INFO impl.TimelineClientImpl: Timeline service address: zhiyong3:8188
OK
Time taken: 1.517 seconds
hive (default)> create function Base64code2 as 'com.zhiyong.hiveUDF.Base64code2';
OK
Time taken: 0.065 seconds
hive (default)> create function Base64decode1 as 'com.zhiyong.hiveUDF.Base64decode1';
OK
Time taken: 0.059 seconds
hive (default)> create function Base64decode2 as 'com.zhiyong.hiveUDF.Base64decode2';
OK
Time taken: 0.077 seconds
hive (default)> create function Encryption1 as 'com.zhiyong.hiveUDF.Encryption1';
OK
Time taken: 0.125 seconds
hive (default)> create function Encryption2 as 'com.zhiyong.hiveUDF.Encryption2';
OK
Time taken: 0.084 seconds
hive (default)> create function Decrypt1 as 'com.zhiyong.hiveUDF.Decrypt1';
OK
Time taken: 0.072 seconds
hive (default)> create function Decrypt2 as 'com.zhiyong.hiveUDF.Decrypt2';
OK
Time taken: 0.063 seconds
hive (default)> desc function extended Base64code1;
OK
tab_name
2022-08-05 12:49:15 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
2022-08-05 12:49:15 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 52decc77982b58949890770d22720a91adce0c3f]
There is no documentation for function 'Base64code1'
Function class:com.zhiyong.hiveUDF.Base64code1
Function type:PERSISTENT
Time taken: 0.215 seconds, Fetched: 3 row(s)
hive (default)> desc function extended Base64code2;
OK
tab_name
There is no documentation for function 'Base64code2'
Function class:com.zhiyong.hiveUDF.Base64code2
Function type:PERSISTENT
Time taken: 0.041 seconds, Fetched: 3 row(s)
hive (default)>
The functions are now registered.
Invoking the UDFs
Switch databases first:
hive (default)> show databases;
OK
database_name
db_lzy
default
Time taken: 0.023 seconds, Fetched: 2 row(s)
hive (default)> use db_lzy;
OK
Time taken: 0.025 seconds
hive (db_lzy)> show tables;
OK
tab_name
demo1
Time taken: 0.053 seconds, Fetched: 1 row(s)
For convenience, create a table first:
create table if not exists digital_monster(
num int comment'序号',
name string comment'名称'
)
comment '数码宝贝表'
stored as parquet
;
Then insert some data:
insert into digital_monster values(1,'亚古兽'),(2,'暴龙兽'),(3,'机械暴龙兽'),(4,'丧尸暴龙兽'),(5,'战斗暴龙兽'),(6,'大地暴龙兽'),(7,'闪光暴龙兽');
The execution engine has to be switched to Tez before inserting, otherwise the insert fails with return code 1:
hive (db_lzy)> set hive.execution.engine;
hive.execution.engine=mr
hive (db_lzy)> set hive.execution.engine=tez;
hive (db_lzy)> insert into digital_monster values(1,'亚古兽'),(2,'暴龙兽'),(3,'机械暴龙兽'),(4,'丧尸暴龙兽'),(5,'战斗暴龙兽'),(6,'大地暴龙兽'),(7,'闪光暴龙兽');
Query ID = root_20220805125911_cacc50f7-86a5-415d-8518-18ab7d4f1e56
Total jobs = 1
Launching Job 1 out of 1
2022-08-05 12:59:12 INFO client.AHSProxy: Connecting to Application History server at zhiyong3/192.168.88.102:10201
2022-08-05 12:59:12 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
Status: Running (Executing on YARN cluster with App id application_1659673721568_0003)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 23.73 s
----------------------------------------------------------------------------------------------
Status: DAG finished successfully in 23.73 seconds
Query Execution Summary
----------------------------------------------------------------------------------------------
OPERATION DURATION
----------------------------------------------------------------------------------------------
Compile Query 0.67s
Prepare Plan 14.64s
Get Query Coordinator (AM) 0.03s
Submit Plan 0.62s
Start DAG 0.12s
Run DAG 23.73s
----------------------------------------------------------------------------------------------
Task Execution Summary
----------------------------------------------------------------------------------------------
VERTICES DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS
----------------------------------------------------------------------------------------------
Map 1 9973.00 13,420 2,192 3 1
Reducer 2 1668.00 1,370 0 1 0
----------------------------------------------------------------------------------------------
Loading data to table db_lzy.digital_monster
OK
col1 col2
Time taken: 42.947 seconds
hive (db_lzy)>
Now the UDFs can be tested:
select * from digital_monster;
select num,name from digital_monster;
select num,Base64code1(name) from digital_monster;
select num,Base64code2(name) from digital_monster;
Result:
hive (db_lzy)> select * from digital_monster;
OK
digital_monster.num digital_monster.name
1 亚古兽
2 暴龙兽
3 机械暴龙兽
4 丧尸暴龙兽
5 战斗暴龙兽
6 大地暴龙兽
7 闪光暴龙兽
Time taken: 0.457 seconds, Fetched: 7 row(s)
hive (db_lzy)> select num,Base64code1(name) from digital_monster;
FAILED: SemanticException [Error 10011]: Invalid function Base64code1
hive (db_lzy)> reload function;
OK
Time taken: 0.071 seconds
hive (db_lzy)> select num,Base64code1(name) from digital_monster;
FAILED: SemanticException [Error 10011]: Invalid function Base64code1
hive (db_lzy)> show functions;
OK
tab_name
!
!=
$sum0
%
&
*
+
-
/
<
<=
<=>
<>
=
==
>
>=
^
abs
acos
add_months
aes_decrypt
aes_encrypt
and
array
array_contains
ascii
asin
assert_true
assert_true_oom
atan
avg
base64
between
bin
bloom_filter
bround
cardinality_violation
case
cbrt
ceil
ceiling
char_length
character_length
chr
coalesce
collect_list
collect_set
compute_stats
concat
concat_ws
context_ngrams
conv
corr
cos
count
covar_pop
covar_samp
crc32
create_union
cume_dist
current_authorizer
current_database
current_date
current_groups
current_timestamp
current_user
date_add
date_format
date_sub
datediff
day
dayofmonth
dayofweek
decode
default.base64code1
default.base64code2
default.base64decode1
default.base64decode2
default.decrypt1
default.decrypt2
default.encryption1
default.encryption2
degrees
dense_rank
div
e
elt
encode
enforce_constraint
exp
explode
extract_union
factorial
field
find_in_set
first_value
floor
floor_day
floor_hour
floor_minute
floor_month
floor_quarter
floor_second
floor_week
floor_year
format_number
from_unixtime
from_utc_timestamp
get_json_object
get_splits
greatest
grouping
hash
hex
histogram_numeric
hour
if
in
in_bloom_filter
in_file
index
initcap
inline
instr
internal_interval
isfalse
isnotfalse
isnotnull
isnottrue
isnull
istrue
java_method
json_tuple
lag
last_day
last_value
lcase
lead
least
length
levenshtein
like
likeall
likeany
ln
locate
log
log10
log2
logged_in_user
lower
lpad
ltrim
map
map_keys
map_values
mask
mask_first_n
mask_hash
mask_last_n
mask_show_first_n
mask_show_last_n
matchpath
max
md5
min
minute
mod
month
months_between
murmur_hash
named_struct
negative
next_day
ngrams
noop
noopstreaming
noopwithmap
noopwithmapstreaming
not
ntile
nullif
nvl
octet_length
or
parse_url
parse_url_tuple
percent_rank
percentile
percentile_approx
pi
pmod
posexplode
positive
pow
power
printf
quarter
radians
rand
rank
reflect
reflect2
regexp
regexp_extract
regexp_replace
regr_avgx
regr_avgy
regr_count
regr_intercept
regr_r2
regr_slope
regr_sxx
regr_sxy
regr_syy
repeat
replace
replicate_rows
restrict_information_schema
reverse
rlike
round
row_number
rpad
rtrim
second
sentences
sha
sha1
sha2
shiftleft
shiftright
shiftrightunsigned
sign
sin
size
sort_array
sort_array_by
soundex
space
split
sq_count_check
sqrt
stack
std
stddev
stddev_pop
stddev_samp
str_to_map
struct
substr
substring
substring_index
sum
tan
to_date
to_epoch_milli
to_unix_timestamp
to_utc_timestamp
translate
trim
trunc
ucase
udftoboolean
udftobyte
udftodouble
udftofloat
udftointeger
udftolong
udftoshort
udftostring
unbase64
unhex
unix_timestamp
upper
uuid
var_pop
var_samp
variance
version
weekofyear
when
width_bucket
windowingtablefunction
xpath
xpath_boolean
xpath_double
xpath_float
xpath_int
xpath_long
xpath_number
xpath_short
xpath_string
year
|
~
Time taken: 0.016 seconds, Fetched: 297 row(s)
hive (db_lzy)> select num,default.Base64code1(name) from digital_monster;
OK
num _c1
1 5Lqa5Y+k5YW9
2 5pq06b6Z5YW9
3 5py65qKw5pq06b6Z5YW9
4 5Lin5bC45pq06b6Z5YW9
5 5oiY5paX5pq06b6Z5YW9
6 5aSn5Zyw5pq06b6Z5YW9
7 6Zeq5YWJ5pq06b6Z5YW9
Time taken: 0.278 seconds, Fetched: 7 row(s)
hive (db_lzy)> select num,default.Base64code2(name) from digital_monster;
OK
num _c1
1 5Lqa5Y+k5YW9
2 5pq06b6Z5YW9
3 5py65qKw5pq06b6Z5YW9
4 5Lin5bC45pq06b6Z5YW9
5 5oiY5paX5pq06b6Z5YW9
6 5aSn5Zyw5pq06b6Z5YW9
7 6Zeq5YWJ5pq06b6Z5YW9
Time taken: 0.277 seconds, Fetched: 7 row(s)
There is a sizeable pitfall here: the UDFs have to be invoked with the database name they were registered under (hence the default. prefix)...
select
num as `数码宝贝序号`,
name as `数码宝贝名称`,
default.Base64code1(name) as `base64编码`,
default.Base64Decode2(default.Base64code1(name)) as `base64解码`,
default.Encryption1(name,'八神太一') as `加密后`,
default.Decrypt2(default.Encryption1(name,'八神太一'),'AiL5qSa5eW65rWY5') as `解密后`
from db_lzy.digital_monster
;
In Beeline it is best to keep the whole statement on one line to avoid errors:
select num,name,default.Base64code1(name),default.Base64Decode2(default.Base64code1(name)),default.Encryption1(name,'八神太一'),default.Decrypt2(default.Encryption1(name,'八神太一'),'AiL5qSa5eW65rWY5') from db_lzy.digital_monster;
With aliases:
select num as `数码宝贝序号`, name as `数码宝贝名称`, default.Base64code1(name) as `base64编码`, default.Base64Decode2(default.Base64code1(name)) as `base64解码`, default.Encryption1(name,'八神太一') as `加密后`, default.Decrypt2(default.Encryption1(name,'八神太一'),'AiL5qSa5eW65rWY5') as `解密后` from db_lzy.digital_monster ;
After executing:
hive (db_lzy)> select num as `数码宝贝序号`, name as `数码宝贝名称`, default.Base64code1(name) as `base64编码`, default.Base64Decode2(default.Base64code1(name)) as `base64解码`, default.Encryption1(name,'八神太一') as `加密后`, default.Decrypt2(default.Encryption1(name,'八神太一'),'AiL5qSa5eW65rWY5') as `解密后` from db_lzy.digital_monster ;
OK
数码宝贝序号 数码宝贝名称 base64编码 base64解码 加密后 解密后
1 亚古兽 5Lqa5Y+k5YW9 亚古兽 5LiA5aSq56We5YWr5YW95Y+k5Lqa 亚古兽
2 暴龙兽 5pq06b6Z5YW9 暴龙兽 5LiA5aSq56We5YWr5YW96b6Z5pq0 暴龙兽
3 机械暴龙兽 5py65qKw5pq06b6Z5YW9 机械暴龙兽 5LiA5aSq56We5YWr5YW96b6Z5pq05qKw5py6 机械暴龙兽
4 丧尸暴龙兽 5Lin5bC45pq06b6Z5YW9 丧尸暴龙兽 5LiA5aSq56We5YWr5YW96b6Z5pq05bC45Lin 丧尸暴龙兽
5 战斗暴龙兽 5oiY5paX5pq06b6Z5YW9 战斗暴龙兽 5LiA5aSq56We5YWr5YW96b6Z5pq05paX5oiY 战斗暴龙兽
6 大地暴龙兽 5aSn5Zyw5pq06b6Z5YW9 大地暴龙兽 5LiA5aSq56We5YWr5YW96b6Z5pq05Zyw5aSn 大地暴龙兽
7 闪光暴龙兽 6Zeq5YWJ5pq06b6Z5YW9 闪光暴龙兽 5LiA5aSq56We5YWr5YW96b6Z5pq05YWJ6Zeq 闪光暴龙兽
Time taken: 0.254 seconds, Fetched: 7 row(s)
hive (db_lzy)>
That completes the Hive UDF implemented in two different ways.
Checking the metadata again
mysql> select * from FUNCS;
ERROR 2006 (HY000): MySQL server has gone away
No connection. Trying to reconnect...
Connection id: 2113
Current database: db_hive_metastore
+---------+-----------------------------------+-------------+-------+---------------+-----------+------------+------------+
| FUNC_ID | CLASS_NAME | CREATE_TIME | DB_ID | FUNC_NAME | FUNC_TYPE | OWNER_NAME | OWNER_TYPE |
+---------+-----------------------------------+-------------+-------+---------------+-----------+------------+------------+
| 1 | com.zhiyong.hiveUDF.Base64code1 | 1659674918 | 1 | base64code1 | 1 | NULL | USER |
| 3 | com.zhiyong.hiveUDF.Base64decode1 | 1659674930 | 1 | base64decode1 | 1 | NULL | USER |
| 4 | com.zhiyong.hiveUDF.Base64decode2 | 1659674930 | 1 | base64decode2 | 1 | NULL | USER |
| 5 | com.zhiyong.hiveUDF.Encryption1 | 1659674931 | 1 | encryption1 | 1 | NULL | USER |
| 6 | com.zhiyong.hiveUDF.Encryption2 | 1659674931 | 1 | encryption2 | 1 | NULL | USER |
| 7 | com.zhiyong.hiveUDF.Decrypt1 | 1659674931 | 1 | decrypt1 | 1 | NULL | USER |
| 8 | com.zhiyong.hiveUDF.Decrypt2 | 1659674934 | 1 | decrypt2 | 1 | NULL | USER |
| 24 | com.zhiyong.hiveUDF.Base64code2 | 1659701054 | 1 | base64code2 | 1 | NULL | USER |
+---------+-----------------------------------+-------------+-------+---------------+-----------+------------+------------+
8 rows in set (0.02 sec)
mysql>
Hive's metadata tables in MySQL now record these UDFs. If the Jar is placed under a path configured in hive-site.xml (for example via hive.aux.jars.path), the add jar step is no longer needed after every Hive restart.