32

Is there a way to keep duplicates in a collected set in Hive, or to simulate the kind of aggregate collection Hive provides by some other method? I want to aggregate all of the items in a column that have the same key into an array, with duplicates. In other words: COLLECT_SET() in Hive, but keeping duplicates?

That is:

hash_id | num_of_cats 
===================== 
ad3jkfk   4 
ad3jkfk   4 
ad3jkfk   2 
fkjh43f   1 
fkjh43f   8 
fkjh43f   8 
rjkhd93   7 
rjkhd93   4 
rjkhd93   7 

should return:

hash_agg | cats_aggregate 
=========================== 
ad3jkfk Array<int>(4,4,2) 
fkjh43f Array<int>(1,8,8) 
rjkhd93 Array<int>(7,4,7) 
+0

**If this isn't clear**: please let me know. I'm still trying to figure this out :( – batman

Answers

23

As of Hive 0.13, use COLLECT_LIST(col), which keeps duplicates:

SELECT 
    hash_id, COLLECT_LIST(num_of_cats) AS aggr_set 
FROM 
    tablename 
WHERE 
    blablabla 
GROUP BY 
    hash_id 
; 
+2

GROUP BY hash_id is missing – Tagar

22

There's nothing built in, but creating user-defined functions, including aggregates, isn't that bad. The only rough part is trying to make them type-generic, but here is a collect example.

package com.example; 

import java.util.ArrayList; 
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; 
import org.apache.hadoop.hive.ql.metadata.HiveException; 
import org.apache.hadoop.hive.ql.parse.SemanticException; 
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; 
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; 
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector; 
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; 

public class CollectAll extends AbstractGenericUDAFResolver 
{ 
    @Override 
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis) 
      throws SemanticException 
    { 
     if (tis.length != 1) 
     { 
      throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected."); 
     } 
     if (tis[0].getCategory() != ObjectInspector.Category.PRIMITIVE) 
     { 
      throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + tis[0].getTypeName() + " was passed as parameter 1."); 
     } 
     return new CollectAllEvaluator(); 
    } 

    public static class CollectAllEvaluator extends GenericUDAFEvaluator 
    { 
     private PrimitiveObjectInspector inputOI; 
     private StandardListObjectInspector loi; 
     private StandardListObjectInspector internalMergeOI; 

     @Override 
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) 
       throws HiveException 
     { 
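      // Hive calls init once per mode: PARTIAL1 (raw rows in, partial list out),
      // PARTIAL2/FINAL (partial lists in), COMPLETE (raw rows in, final list out).
      // The ObjectInspector returned here describes this stage's output.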
      super.init(m, parameters); 
      if (m == Mode.PARTIAL1) 
      { 
       inputOI = (PrimitiveObjectInspector) parameters[0]; 
       return ObjectInspectorFactory 
         .getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils 
         .getStandardObjectInspector(inputOI)); 
      } 
      else 
      { 
       if (!(parameters[0] instanceof StandardListObjectInspector)) 
       { 
        inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils 
          .getStandardObjectInspector(parameters[0]); 
        return (StandardListObjectInspector) ObjectInspectorFactory 
          .getStandardListObjectInspector(inputOI); 
       } 
       else 
       { 
        internalMergeOI = (StandardListObjectInspector) parameters[0]; 
        inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector(); 
        loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI); 
        return loi; 
       } 
      } 
     } 

     static class ArrayAggregationBuffer implements AggregationBuffer 
     { 
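      // Per-group state: the values collected so far.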
      ArrayList<Object> container; 
     } 

     @Override 
     public void reset(AggregationBuffer ab) 
       throws HiveException 
     { 
      ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>(); 
     } 

     @Override 
     public AggregationBuffer getNewAggregationBuffer() 
       throws HiveException 
     { 
      ArrayAggregationBuffer ret = new ArrayAggregationBuffer(); 
      reset(ret); 
      return ret; 
     } 

     @Override 
     public void iterate(AggregationBuffer ab, Object[] parameters) 
       throws HiveException 
     { 
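      // Called once per input row; the value is copied because Hive may reuse
      // the underlying object between rows.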
      assert (parameters.length == 1); 
      Object p = parameters[0]; 
      if (p != null) 
      { 
       ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
       agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI)); 
      } 
     } 

     @Override 
     public Object terminatePartial(AggregationBuffer ab) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); 
      ret.addAll(agg.container); 
      return ret; 
     } 

     @Override 
     public void merge(AggregationBuffer ab, Object o) 
       throws HiveException 
     { 
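      // Folds a partial list (produced by terminatePartial on another task)
      // into this buffer.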
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o); 
      for(Object i : partial) 
      { 
       agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI)); 
      } 
     } 

     @Override 
     public Object terminate(AggregationBuffer ab) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); 
      ret.addAll(agg.container); 
      return ret; 
     } 
    } 
} 

Then in Hive, just issue the following, and you should be able to use it as expected:

add jar Whatever.jar; 
CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll'; 

hive> SELECT hash_id, collect_all(num_of_cats) FROM test GROUP BY hash_id; 
OK 
ad3jkfk [4,4,2] 
fkjh43f [1,8,8] 
rjkhd93 [7,4,7] 

It's worth noting that the order of the elements should be considered undefined, so if you intend to use this to feed information into n_grams you may need to expand it a bit to sort the data as needed.
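
If a deterministic order matters, one minimal sketch (assuming the collect_all function registered above and Hive's built-in sort_array, available since Hive 0.9) is to sort the collected array:

SELECT hash_id, sort_array(collect_all(num_of_cats)) AS sorted_cats 
FROM test 
GROUP BY hash_id; 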

+0

Nice answer :) I ended up trying it and had a few problems. Looking closer at your code I found what I was doing wrong (type-generic *is* hard), and I think this will work. – batman

12

This modifies Jeff Mc's code to remove the restriction (presumably inherited from collect_set) that the input must be a primitive type. This version can collect structs, maps and arrays as well as primitives.

package com.example; 

import java.util.ArrayList; 
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; 
import org.apache.hadoop.hive.ql.metadata.HiveException; 
import org.apache.hadoop.hive.ql.parse.SemanticException; 
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; 
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; 
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector; 
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; 

public class CollectAll extends AbstractGenericUDAFResolver 
{ 
    @Override 
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis) 
      throws SemanticException 
    { 
     if (tis.length != 1) 
     { 
      throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected."); 
     } 
     return new CollectAllEvaluator(); 
    } 

    public static class CollectAllEvaluator extends GenericUDAFEvaluator 
    { 
     private ObjectInspector inputOI; 
     private StandardListObjectInspector loi; 
     private StandardListObjectInspector internalMergeOI; 

     @Override 
     public ObjectInspector init(Mode m, ObjectInspector[] parameters) 
       throws HiveException 
     { 
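       // Same flow as the original answer, but inputOI is kept as a plain
       // ObjectInspector, so structs, maps, arrays and primitives all work.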
      super.init(m, parameters); 
      if (m == Mode.PARTIAL1) 
      { 
       inputOI = parameters[0]; 
       return ObjectInspectorFactory 
         .getStandardListObjectInspector(ObjectInspectorUtils 
         .getStandardObjectInspector(inputOI)); 
      } 
      else 
      { 
       if (!(parameters[0] instanceof StandardListObjectInspector)) 
       { 
        inputOI = ObjectInspectorUtils 
          .getStandardObjectInspector(parameters[0]); 
        return (StandardListObjectInspector) ObjectInspectorFactory 
          .getStandardListObjectInspector(inputOI); 
       } 
       else 
       { 
        internalMergeOI = (StandardListObjectInspector) parameters[0]; 
        inputOI = internalMergeOI.getListElementObjectInspector(); 
        loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI); 
        return loi; 
       } 
      } 
     } 

     static class ArrayAggregationBuffer implements AggregationBuffer 
     { 
      ArrayList<Object> container; 
     } 

     @Override 
     public void reset(AggregationBuffer ab) 
       throws HiveException 
     { 
      ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>(); 
     } 

     @Override 
     public AggregationBuffer getNewAggregationBuffer() 
       throws HiveException 
     { 
      ArrayAggregationBuffer ret = new ArrayAggregationBuffer(); 
      reset(ret); 
      return ret; 
     } 

     @Override 
     public void iterate(AggregationBuffer ab, Object[] parameters) 
       throws HiveException 
     { 
      assert (parameters.length == 1); 
      Object p = parameters[0]; 
      if (p != null) 
      { 
       ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
       agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI)); 
      } 
     } 

     @Override 
     public Object terminatePartial(AggregationBuffer ab) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); 
      ret.addAll(agg.container); 
      return ret; 
     } 

     @Override 
     public void merge(AggregationBuffer ab, Object o) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o); 
      for(Object i : partial) 
      { 
       agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI)); 
      } 
     } 

     @Override 
     public Object terminate(AggregationBuffer ab) 
       throws HiveException 
     { 
      ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; 
      ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); 
      ret.addAll(agg.container); 
      return ret; 
     } 
    } 
} 
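
As a hedged usage sketch (the table and column names here are made up for illustration; named_struct is Hive's built-in struct constructor), collecting whole structs might look like:

SELECT id, collect_all(named_struct('a', col_a, 'b', col_b)) AS structs 
FROM some_table 
GROUP BY id; 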
+0

This may be a version issue, but I just tried installing this into our repo and compiling it; when it is invoked in hive, it errors out with the following: 'Diagnostic Messages for this Task: caused by java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAc...' – jlemaitre

11

Since Hive 0.13, there is a built-in UDAF called collect_list() that achieves this. See here.

+1

Strangely, 'collect_list' cannot collect non-primitive types (in Hive 0.13.1). Otherwise using a built-in function like this would be great. –

+3

The klout team has a GREAT repo of UDFs you can browse. Among them is a collect function that handles non-primitives. https://github.com/klout/brickhouse/tree/master/src/main/java/brickhouse/udf/collect – jlemaitre
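
For reference, a sketch of registering the Brickhouse collect UDAF; the jar path is a placeholder and the class name is an assumption based on the repo layout linked above, so verify it against the jar you build:

add jar brickhouse.jar;   -- placeholder path 
CREATE TEMPORARY FUNCTION collect AS 'brickhouse.udf.collect.CollectUDAF'; 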

+0

@jlemaitre, thanks for the link! "Among them is a collect function that handles non-primitives" - which one is it? Thanks in advance. – Tagar

1

Here is the exact Hive query that does this job (works only in Hive > 0.13):

SELECT hash_id, collect_list(num_of_cats) FROM tablename GROUP BY hash_id; 

1

For what it's worth (though I know this is an older post), Hive 0.13.0 features a new collect_list() function that does not deduplicate.
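
A quick contrast sketch (reusing the sample tablename from the answers above; collect_set deduplicates, collect_list keeps every value):

SELECT hash_id, 
    collect_set(num_of_cats) AS unique_cats,  -- e.g. ad3jkfk -> [4,2] 
    collect_list(num_of_cats) AS all_cats     -- e.g. ad3jkfk -> [4,4,2] 
FROM tablename 
GROUP BY hash_id; 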

+0

Can you explain the function a bit more? Something of this length would usually work better as a comment on an answer (which, unfortunately, you can't do yet because you don't have enough reputation). –

0

Workaround to collect structs

Suppose you have a table

tableWithStruct(
id string, 
obj struct <a:string,b:string>) 

Now create another table:

CREATE EXTERNAL TABLE tablename (
id string, 
temp array<string> 
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|' 

The insert query:

insert into table tablename select id, collect(concat_ws('|', cast(obj.a as string), cast(obj.b as string))) from tableWithStruct group by id; 

Now create another table at the same location as tablename:

CREATE EXTERNAL TABLE tablename_final (
id string, 
array_list array<struct<a:string,b:string>> 
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|' 

When you select from tablename_final you will get the desired output.
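
To sanity-check the result, a query along these lines (standard Hive array indexing and struct field access, using the names defined above) should show the reassembled structs:

SELECT id, array_list[0].a, array_list[0].b FROM tablename_final; 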

-1

Just wondering - in the following statement -

SELECT 
    hash_id, COLLECT_LIST(num_of_cats) AS aggr_set 
FROM 
    tablename 
WHERE 
    blablabla 
GROUP BY 
    hash_id 
; 

we want to sort and limit the elements of num_of_cats - how do we go about it? Because in big data we deal with petabytes of data, we may not need all of it in such cases, just the top 10 or some limit.
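
One common pattern for this (a sketch, not from the answers above; it assumes Hive 0.11+ windowing functions and the sample tablename) is to rank rows per key first and collect only the top N:

SELECT hash_id, collect_list(num_of_cats) AS top_cats 
FROM (
    SELECT hash_id, num_of_cats, 
        row_number() OVER (PARTITION BY hash_id ORDER BY num_of_cats DESC) AS rn 
    FROM tablename 
) ranked 
WHERE rn <= 10 
GROUP BY hash_id; 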

+0

If you have a new question, please ask it by clicking the Ask Question button (http://stackoverflow.com/questions/ask). Include a link to this question if it helps provide context. - [From review](/review/low-quality-posts/10597681) –

+1

OK sir - it's just that I don't have the points to add a comment - I'll try to be more systematic next time. –

+0

Thanks - once you have enough reputation you will be able to post comments. –