2015-10-30 48 views
1

我试图做基于每个分区的一些工作,我想作为输入返回相同的数据:如何使Apache Spark mapPartition正常工作?

from urllib3 import HTTPConnectionPool 

rdd = sc.parallelize(["peter", "john", "harris"]) 
def sendPartition(iterator): 
    pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10) 

    for record in iterator: 
     r = pool.request('GET', '/ajax/services/search/web', fields={'q': 'urllib3', 'v': '1.0'}) 

    return iterator 


rdd.mapPartitions(sendPartition).count() 

我收到此错误:

TypeError: 'NoneType' object is not iterable

PS:这只是简化我想要实现的目标。我想为每个元素执行对ElasticSearch的复杂地理政治请求(所以我不能使用Spark Elasticsearch连接器)。在这个地图分区之前,我有一个巨大的过滤器,地图等管道。

PPS:我重新启动了我的火花,现在我得到了“0”作为比错误更好的输出,但是我期望它是“3”。

+1

请提供一个可重现的例子。用您提供的代码重现问题是不可能的。如果我不得不猜测代码中的某个点,那么就有'sc.parallelize(None)'等价的东西。在旁边注意,Spark不是这样的好工具。 – zero323

+0

@ zero323我添加了一些细节。 – poiuytrez

+2

据我可以告诉它在1.3-1.5和当前主控上工作得很好。 – zero323

回答

0

关于类型错误,它看起来不像它可以使用问题中包含的代码进行复制。我的猜测是在某点None值已被传递给RDD构造函数或从sendPartition返回。

作为输出的空RDD问题是您如何使用分区迭代器的结果。 PySpark使用itertools.chain将数据传递给mapPartition,其行为与Scala Iterator的行为方式大致相同。

import itertools 

iter = itertools.chain(range(10)) 
iter.next() 
## 0 

完成之后一个for

for x in iter: 
    x 

你结束了一个空chain

type(iter) 
## itertools.chain 

iter.nex() 
## Traceback (most recent call last) 
##  ... 
## StopIteration: 

虽然StopIteration正常迭代逻辑的一部分的处理没有数据回来。

有几种方法来处理这其中最干净的是提取的功能和使用列表理解

def make_request(record, pool): 
    r = pool.request('GET', '/ajax/services/search/web', 
     fields={'q': 'urllib3', 'v': '1.0'}) 
    return r.read() # Or any other data you need. 

def sendPartition(iterator): 
    pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10) 
    return [make_request(record, pool) for record in iterator] 

请注意,如果你想用你来读取数据连接池退出mapPartitions之前。这意味着没有懒惰的评估(如发电机)。就我个人而言,我会考虑异步请求(例如,使用3.5中的async/await,其他地方的RxPy),并在退出之前进行评估。

相关问题