我试图做基于每个分区的一些工作,我想作为输入返回相同的数据:如何使Apache Spark mapPartition正常工作?
from urllib3 import HTTPConnectionPool
rdd = sc.parallelize(["peter", "john", "harris"])
def sendPartition(iterator):
pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10)
for record in iterator:
r = pool.request('GET', '/ajax/services/search/web', fields={'q': 'urllib3', 'v': '1.0'})
return iterator
rdd.mapPartitions(sendPartition).count()
我收到此错误:
TypeError: 'NoneType' object is not iterable
PS:这只是简化我想要实现的目标。我想为每个元素执行对ElasticSearch的复杂地理政治请求(所以我不能使用Spark Elasticsearch连接器)。在这个地图分区之前,我有一个巨大的过滤器,地图等管道。
PPS:我重新启动了我的火花,现在我得到了“0”作为比错误更好的输出,但是我期望它是“3”。
请提供一个可重现的例子。用您提供的代码重现问题是不可能的。如果我不得不猜测代码中的某个点,那么就有'sc.parallelize(None)'等价的东西。在旁边注意,Spark不是这样的好工具。 – zero323
@ zero323我添加了一些细节。 – poiuytrez
据我可以告诉它在1.3-1.5和当前主控上工作得很好。 – zero323