HIVE 自定义函数之UDF/UDAF/UDTF

原创  2017年04月10日 02:15:09
  • 689

UDF

UDF:用户自定义函数,表示以一行数据中的一列或者多列数据作为参数然后返回结果是一个值的函数,例如round()和floor(). 
示例:比较两个逗号分隔的字符串是否相同。

JAVA代码中一定要继承UDF类并实现evaluate()函数,在查询过程中对应的么一个用到这个函数的地方都会对这个类进行实例化,对每行输入都会调用到evaluate()函数,而且用户是可以重载evaluate方法的。 
废话不多说,上代码

import org.apache.hadoop.hive.ql.exec.UDF;

public class UDFTest extends UDF {


    private String[] isBlank(String value, String split) {
        String[] fields = value.split(split);
        return fields;
    }

    /**
     * 判断按照指定符号分隔的两个字段是否一致
     *
     * @param aids  第一个字段
     * @param bids  第二个字段
     * @param split 分隔符号
     * @return 如果返回值是1 则两条数据相同,如果返回0 则不同
     */
    public int evaluate(String aids, String bids, String split) {
        int result = 0;
        String[] values = isBlank(aids, split);
        String[] values1 = isBlank(bids, split);
        int length = values.length;
        if (length == values1.length) {
            for (int i = 0; i < length; i++) {
                if (values[i] == values1[i]) {
                    result = 1;
                } else {
                    return 0;
                }
            }
        } else {
            return 0;
        }
        return result;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

UDF类源码



package org.apache.hadoop.hive.ql.exec;

import org.apache.hadoop.hive.ql.udf.UDFType;

/**
 * A User-defined function (UDF) for use with Hive.
 * <p>
 * New UDF classes need to inherit from this UDF class (or from {@link
 * org.apache.hadoop.hive.ql.udf.generic.GenericUDF GenericUDF} which provides more flexibility at
 * the cost of more complexity).
 * <p>
 * Requirements for all classes extending this UDF are:
 * <ul>
 * <li>Implement one or more methods named {@code evaluate} which will be called by Hive (the exact
 * way in which Hive resolves the method to call can be configured by setting a custom {@link
 * UDFMethodResolver}). The following are some examples:
 * <ul>
 * <li>{@code public int evaluate();}</li>
 * <li>{@code public int evaluate(int a);}</li>
 * <li>{@code public double evaluate(int a, double b);}</li>
 * <li>{@code public String evaluate(String a, int b, Text c);}</li>
 * <li>{@code public Text evaluate(String a);}</li>
 * <li>{@code public String evaluate(List<Integer> a);} (Note that Hive Arrays are represented as
 * {@link java.util.List Lists} in Hive.
 * So an {@code ARRAY<int>} column would be passed in as a {@code List<Integer>}.)</li>
 * </ul>
 * </li>
 * <li>{@code evaluate} should never be a void method. However it can return {@code null} if
 * needed.
 * <li>Return types as well as method arguments can be either Java primitives or the corresponding
 * {@link org.apache.hadoop.io.Writable Writable} class.</li>
 * </ul>
 * One instance of this class will be instantiated per JVM and it will not be called concurrently.
 *
 * @see Description
 * @see UDFType
 */
@UDFType(deterministic = true)
public class UDF {

  /**
   * The resolver to use for method resolution.
   */
  private UDFMethodResolver rslv;

  /**
   * The constructor.
   */
  public UDF() {
    rslv = new DefaultUDFMethodResolver(this.getClass());
  }

  /**
   * The constructor with user-provided {@link UDFMethodResolver}.
   */
  protected UDF(UDFMethodResolver rslv) {
    this.rslv = rslv;
  }

  /**
   * Sets the resolver.
   *
   * @param rslv The method resolver to use for method resolution.
   */
  public void setResolver(UDFMethodResolver rslv) {
    this.rslv = rslv;
  }

  /**
   * Get the method resolver.
   */
  public UDFMethodResolver getResolver() {
    return rslv;
  }

  /**
   * This can be overridden to include JARs required by this UDF.
   *
   * @see org.apache.hadoop.hive.ql.udf.generic.GenericUDF#getRequiredJars()
   *      GenericUDF.getRequiredJars()
   *
   * @return an array of paths to files to include, {@code null} by default.
   */
  public String[] getRequiredJars() {
    return null;
  }

  /**
   * This can be overridden to include files required by this UDF.
   *
   * @see org.apache.hadoop.hive.ql.udf.generic.GenericUDF#getRequiredFiles()
   *      GenericUDF.getRequiredFiles()
   *
   * @return an array of paths to files to include, {@code null} by default.
   */
  public String[] getRequiredFiles() {
    return null;
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102

在源码中我们可以看到UDF有 有参和无参两个构造函数,在构造函数中我们发现一个类叫做UDFMethodResolver 在javadoc中是这样描述的

** 
* The UDF Method resolver interface. A user can plugin a resolver to their UDF 
* by implementing the functions in this interface. Note that the resolver is 
* stored in the UDF class as an instance variable. We did not use a static 
* variable because many resolvers maintain the class of the enclosing UDF as 
* state and are called from a base class e.g. UDFBaseCompare. This makes it 
* very easy to write UDFs that want to do resolution similar to the comparison 
* operators. Such UDFs just need to extend UDFBaseCompare and do not have to 
* care about the UDFMethodResolver interface. Same is true for UDFs that want 
* to do resolution similar to that done by the numeric operators. Such UDFs 
* simply have to extend UDFBaseNumericOp class. For the default resolution the 
* UDF implementation simply needs to extend the UDF class. 
*/

UDFMethodResolver是UDF类的解析接口可以自定义输入数据的类型,如果UDF的构造中没有传入实现了UDFMethodResolver接口的类的话,会指定一个默认的DefaultUDFMethodResolver,当然,也可以通过使用setResolver(UDFMethodResolver rslv)函数指定一个自定义的UDFMethodResolver,这个函数也是可覆写的。

到这里一个简单的UDF就写完了,然而我们怎么去使用它呢? 
如下面的例子 
部署 Jar包 
hdfs -dfs -put czy_hivetest.jar ‘hdfs:///user/hadoop/hiveUDF’ 
创建永久函数 
需在Hive中执行sql语句,格式如下:

CREATE FUNCTION [db_name.]function_name AS class_name
[USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
  • 1
  • 2

例如

CREATE FUNCTION db_name.getResult AS ' test.UDFTest'
USING JAR  'hdfs:///user/hadoop/hiveUDF/czy_hivetest.jar';
  • 1
  • 2

创建临时函数

CREATE TEMPORARY FUNCTION getResult AS ' test.UDFTest';
 SELECT getResult ("22,11,33", "11,33,22",",") FROM YDBTest;
 DROP TEMPORARY FUNCTION getResult ;
  • 1
  • 2
  • 3

创建一个临时函数,指向对应的UDF类就可以在之后的sql中使用这个函数啦,使用完了记得drop掉这个函数哦。 
函数需要属于某个库,如库名为test,当其他库调用的时候,需要加上库名,如“test.getResult ” 
调用方式: select test.getResult (‘127.0.0.1’,’192.168.0.1’,’.’) as result; 
如果是临时函数的话,则不能添加库名。

UDAF

UDAF:聚合函数,所有的聚合函数,用户自定义函数和内置函数,都统称为用户自定义聚合函数(UDAF)。 
聚合函数接受从0行到多行的0个列到多个列,然后返回单一值,例如求和函数sum()

自定义一个UDAF必须要继承UDAF类,并且必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。 
一个计算函数必须实现的5个方法的具体含义如下: 
init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。 
iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。 
terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。 
merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。 
terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。

示例代码如下

package test;

import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class UDAFTest {

    public static class TestUDAFEvaluator implements UDAFEvaluator {
        //用於存放結果的內部類
        public static class PartialResult {
            String skuids;
            String delimiter;
        }

        //            声明一个结果对象
        private PartialResult partial;

        //        初始化结果对象
        public void init() {
            partial = null;
        }

        //            判断输入值并确定分隔符,单次计算
        public boolean iterate(String item_sku_id, String deli) {
//            如果為空直接返回
            if (item_sku_id == null) {
                return true;
            }
            if (partial == null) {
                partial = new PartialResult();
                partial.skuids = new String("");
//                如果没制定分隔符的话默认设置为“,”指定了的话设置为指定符号作为分隔符
                if (deli == null || deli.equals("")) {
                    partial.delimiter = new String(",");
                } else {
                    partial.delimiter = new String(deli);
                }

            }
//            如果partial.skuids有值将分隔符连接到partial.skuids后
            if (partial.skuids.length() > 0) {
                partial.skuids = partial.skuids.concat(partial.delimiter);
            }
//                在分隔符后连接一个item_sku_id
            partial.skuids = partial.skuids.concat(item_sku_id);
            return true;
        }

        //        返回当前聚集结果
        public PartialResult terminatePartial() {
            return partial;
        }

        //        合并一个聚集结果和另一个聚集结果
        public boolean merge(PartialResult other) {
            if (other == null) {
                return true;
            }
            if (partial == null) {
                partial = new PartialResult();
                partial.skuids = new String(other.skuids);
                partial.delimiter = new String(other.delimiter);
            } else {
                if (partial.skuids.length() > 0) {
                    partial.skuids = partial.skuids.concat(partial.delimiter);
                }
                partial.skuids = partial.skuids.concat(other.skuids);
            }
            return true;
        }

        //返回最终结果
        public String terminate() {
            return new String(partial.skuids);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

UDTF

UDTF:表生成函数,和其他函数类别一样,所有的表生成函数包括用户定义和内置的都统称为用户自定义表生成函数(UDTF)。 
表生成函数接受0个或多个输入,然后产生多列或多行输出例如array() 
不过HIVE只允许表生成函数以特定的方式使用,例如我们无法从表中产生其他的列。

本类继承自GenericUDTF接口并实现了三个函数,initialize初始化校验参数是否正确,process处理并返回结果,forward将结果返回。

代码示例

package test;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

/**
 * Created by Administrator on 2017/4/10 0010.
 */
public class UDTFTest extends GenericUDTF {


    public void process(Object[] args) throws HiveException {


        String input = args[0].toString();
        String[] test = input.split(";");
        for (int i = 0; i < test.length; i++) {
            try {
                String[] result = test[i].split(":");
                forward(result);
            } catch (Exception e) {
                continue;
            }
        }
    }

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        if (argOIs.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    public void close() throws HiveException {

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

另:本文中示例所需的jar包的Maven依赖

         <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
        </dependency>
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐