2012-12-05 53 views
41

据我所知,Hive集群vs vs排序方式vs排序方式

  • 排序仅由全球订单的事情排序与减速

  • 订单,但猛推一切都变成一个减速

  • 集群通过智能通过关键字hash东西分配到减速机,并作出排序方式

所以我的问题是不是通过保证全球秩序的集群呢?通过将相同的密钥分配到相同的缩减器中,但相邻的密钥又如何?

我能找到的唯一文件是here,从这个例子看来它似乎是在全球订购它们。但从定义来看,我觉得它并不总是这样做。

回答

110

较短的回答:是的,CLUSTER BY保证全球订购,只要您愿意自己加入多个输出文件。

较长的版本:

  • ORDER BY x:保证整体排序,但只通过一个减速推动所有资料,并参考。对于大型数据集来说,这基本上是不可接受的。您最终得到一个排序文件作为输出。
  • SORT BY x:在N个缩减器中的每一个处订购数据,但每个缩减器都可以接收重叠范围的数据。您最终会得到N个或更多具有重叠范围的排序文件。
  • DISTRIBUTE BY x:确保N个reducer中的每一个获得非重叠范围x,但不排序每个reducer的输出。您最终会得到N个或未分类的文件,其范围不重叠。
  • CLUSTER BY x:确保N个reducer中的每个reducer都获得非重叠范围,然后按减法器上的这些范围进行排序。这给你全球订购,和做(DISTRIBUTE BY xSORT BY x)一样。您最终会得到N个或更多分类不重叠的文件。

有意义吗?所以CLUSTER BY基本上是ORDER BY更具可扩展性的版本。

+0

很好的解释.. – minhas23

+7

正如其他答案所述,根据https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy,“CLUSTER BY”和“DISTRIBUTE BY”不能给你提供非常好的解释。重叠的范围。 'CLUSTER BY'不能保证全球订购。 – yhuai

+0

林想知道...什么被认为是“大数据集”?你能量化吗? – idoda

10

让我首先澄清一下:clustered by仅将您的密钥分发到不同的存储桶中,clustered by ... sorted by获取存储桶排序。

通过一个简单的实验(见下文),您可以看到默认情况下您不会获得全局排序。原因是默认分区程序使用哈希代码分割键,而不管实际的键顺序如何。

但是,您可以完全订购您的数据。

Tom Tom的动机是“Hadoop:权威指南”(第3版,第8章,第274页,Total Sort),他讨论了TotalOrderPartitioner。

我会先回答您的TotalOrdering问题,然后介绍几个与排序相关的Hive实验。

请记住:我在这里描述的是'概念验证',我能够使用Claudera的CDH3发行版处理一个示例。

最初我希望org.apache.hadoop.mapred.lib.TotalOrderPartitioner能够做到这一点。不幸的是,它并不是因为它看起来像Hive按值分区,而不是键。所以我给它打上补丁(应该有子类,但我没有时间为):

更换

public int getPartition(K key, V value, int numPartitions) { 
    return partitions.findPartition(key); 
} 

public int getPartition(K key, V value, int numPartitions) { 
    return partitions.findPartition(value); 
} 

现在你可以集(修补)TotalOrderPartitioner为您的蜂巢分区:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; 

hive> set total.order.partitioner.natural.order=false 

hive> set total.order.partitioner.path=/user/yevgen/out_data2 

我也用

hive> set hive.enforce.bucketing = true; 

hive> set mapred.reduce.tasks=4; 

在我的测试。

文件out_data2告诉TotalOrderPartitioner如何存取值。 您通过对数据进行采样来生成out_data2。在我的测试中,我使用了4个桶和从0到10的按键。我产生out_data2用特别的方法:

import org.apache.hadoop.util.ToolRunner; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.hive.ql.io.HiveKey; 
import org.apache.hadoop.fs.FileSystem; 


public class TotalPartitioner extends Configured implements Tool{ 
    public static void main(String[] args) throws Exception{ 
      ToolRunner.run(new TotalPartitioner(), args); 
    } 

    @Override 
    public int run(String[] args) throws Exception { 
     Path partFile = new Path("/home/yevgen/out_data2"); 
     FileSystem fs = FileSystem.getLocal(getConf()); 

     HiveKey key = new HiveKey(); 
     NullWritable value = NullWritable.get(); 

     SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class); 
     key.set(new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why 
     writer.append(key, value); 
     key.set(new byte[]{1, 6}, 0, 2);//partition at 6 
     writer.append(key, value); 
     key.set(new byte[]{1, 9}, 0, 2);//partition at 9 
     writer.append(key, value); 
     writer.close(); 
     return 0; 
    } 

} 

然后我复制导致out_data2到HDFS(进入/用户/叶夫根/ out_data2)

通过这些设置,我得到了分时段/分选我的数据(见最后一个项目在我的实验列表中)。

这是我的实验。

  • 创建采样数据

    的bash>回波-e “1 \ N3 \ N 2 \ N4 \ N5 \ N7 \ N6 \ n8 \ N9 \ N0”> data.txt中

  • 创建基本测试表:

    配置单元> create table test(x int); 配置单元>将数据本地inpath'data.txt'加载到表测试中;

基本上这个表格包含从0到9的数值,没有顺序。

  • 演示表复制如何工作的(真mapred.reduce.tasks参数其中规定的减少执行任务的最大数量)

    蜂巢>创建表测试2(X INT);

    hive> set mapred.reduce.tasks = 4;

    配置单元>插入覆盖表test2 从测试a选择a.x 在a.x = b.x上加入测试b ; - stupied加入迫使非平凡地图减少

    的bash> hadoop的FS -cat /用户/蜂巢/仓库/ TEST2/000001_0

  • 演示bucketing。您可以看到密钥随机分配,没有任何排序顺序:

    配置单元> create table test3(x int) 将(x)聚类为4个存储桶;

    hive> set hive.enforce.bucketing = true;

    配置单元>插入覆盖表test3 select * from test;

    的bash> hadoop的FS -cat /用户/蜂巢/仓库/ TEST3/000000_0

  • 桶装与排序。成果部分排序,不完全排序

    蜂房>创建用(x)用(x降序)排序 聚类成4桶表TEST4(X INT) ;

    配置单元>插入覆盖表test4 select * from test;

    的bash> hadoop的FS -cat /用户/蜂巢/仓库/ TEST4/000001_0

可以看到,值在排序升序。看起来像CDH3中的Hive bug?

  • 入门部分地由声明无簇排序:

    蜂房>创建表TEST5作为 从测试 选择X 用x 排序用x降序分配;

    的bash> hadoop的FS -cat /用户/蜂巢/仓库/ TEST5/000001_0

  • 使用我的修补TotalOrderParitioner:

    蜂房> set hive.mapred.partitioner = org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    蜂房>设置total.order.partitioner.natural.order =假

    蜂房>设置total.order.partitioner。创建表test6(x int) 通过(x)排序将(x)排序为4个存储桶;创建表test6(x int)

    配置单元>插入覆盖表test6 select * from test;

    的bash> hadoop的FS -cat /用户/蜂巢/仓库/ TEST6/000000_0

    的bash> hadoop的FS -cat /用户/蜂巢/仓库/ TEST6/000001_0

    的bash> hadoop的FS -cat /用户/蜂巢/仓库/ TEST6/000002_0

    的bash> hadoop的FS -cat/user/hive/warehouse/test6/000003_0

4

据我所知,简短的回答是No. 你会得到重叠的范围。

SortBy documentation: “Cluster By是Distribute By和Sort By的缩写。” “ ”具有相同“分布依据”列的所有行将转到同一个还原器。“ 但是没有保证非重叠范围分布的信息。

而且,从DDL BucketedTables documentation: “?如何配置单元穿过桶分配的行通常,铲斗数量由表达式hash_function(bucketing_column)MOD num_buckets确定。” 我想通过Select语句中的Cluster使用相同的原则来在reducer之间分配行,因为它主要用于使用数据填充bucketed表。

我用1 int列“a”创建了表,并在其中插入了从0到9的数字。

然后我设置减速器的数量为2 set mapred.reduce.tasks = 2;

而且从数据从该表与集群by子句 select * from my_tab cluster by a;

选择而接收到的结果,我预计:

0 2 4 6 8 1 3 5 7 9

所以第一减速器(0号)得到偶数(因为他们的模式2给出0)

和第二个还原器(编号1)得到了奇数(因为他们的模式2给出了1)

这就是“分配方式”的工作原理。

然后“排序依据”对每个减速器内部的结果进行排序。

1

CLUSTER BY不会产生全局排序。

接受的答案(由Lars Yencken引用)误导了减速器将会收到非重叠的范围。正如Anton Zaviriukhin正确指出BucketedTables文档,CLUSTER BY基本上是DISTRIBUTE BY(与bucketing相同),再加上每个bucket/reducer中的SORT BY。并且DISTRIBUTE BY只是散列和模块到桶中,而散列函数may保持顺序(如果i> j,则散列值为i>散列值),但散列值的模块不会。

下面是示出重叠范围

http://myitlearnings.com/bucketing-in-hive/

0

通过为每减速器排序不是全局群集一个更好的例子。在许多书中,它也被错误地或混淆地提及。在将每个部门分配给特定的缩减器,然后按每个部门中的员工名称进行分类,并且不关心部署的顺序而不关心使用集群的情况下,它可以更好地执行工作,因为工作负载分布在减速器中。

+0

如果您使用collect_set或collect_list分发后,它会保存命令吗? – vkaul11