2017-02-22 20 views
1

我想知道是否有人可以帮助我理解对象处理分区的方式Bag。简而言之,我试图将目前在Bag中的项目分组,以便每个组都位于其自己的分区中。令我困惑的是Bag.groupby()方法要求多个分区。这不应该被分组功能所暗示吗?例如,两个分区如果分组函数返回一个布尔值?将dask.bag项目分组到不同的分区

>>> a = dask.bag.from_sequence(range(20), npartitions = 1) 
>>> a.npartitions 
1 
>>> b = a.groupby(lambda x: x % 2 == 0) 
>>> b.npartitions 
1 

我很明显在这里丢失了一些东西。有没有办法将Bag项目分组到不同的分区?

回答

0

Dask包可能会在一个分区内放置几个组。

In [1]: import dask.bag as db 

In [2]: b = db.range(10, npartitions=3).groupby(lambda x: x % 5) 

In [3]: partitions = b.to_delayed() 

In [4]: partitions 
Out[4]: 
[Delayed(('groupby-collect-f00b0aed94fd394a3c61602f5c3a4d42', 0)), 
Delayed(('groupby-collect-f00b0aed94fd394a3c61602f5c3a4d42', 1)), 
Delayed(('groupby-collect-f00b0aed94fd394a3c61602f5c3a4d42', 2))] 

In [5]: for part in partitions: 
    ...:  print(part.compute()) 
    ...:  
[(0, [0, 5]), (3, [3, 8])] 
[(1, [1, 6]), (4, [4, 9])] 
[(2, [2, 7])] 
+0

的确,这就是让我困惑的原因。我怎样才能让每个组在自己的分区?我可以重新洗牌(并可能更改分区数量)吗? – ajmazurie

+0

尝试将每个组隔离到一个单独的分区对我来说听起来很奇怪。也许还有另一种方法可以完成你想要做的事情。你能解释为什么这是你的原始问题的目标吗? – MRocklin

+0

典型的用例是使用分区表示整个数据集的不同子集,以独立处理(显然在分区间共享相同的模式)。目前,分区仅作为用于优化并行化的内部构件来呈现,而Apache Spark方法则可以表示较大数据集的逻辑子集。 举个例子,你可以考虑时间戳'dask.bag'项目,根据它们共享相同的日期,将它们分组为分区。每天有一个分区有利于下游分析和存储。 – ajmazurie