2017-06-16 16 views
0

大部分的spark数据集函数都是每行操作。但是,我想分配ML任务的执行以在Spark上运行 - 大多数ML任务自然是作为表的函数的操作,而不是行的各种功能。 (我研究过MLLib--它的方式太有限了,而且在许多情况下,通过在可能适合于单个内核的许多内核上进行分布式操作,执行速度要慢几个数量级)。是否可以在远程节点上执行表操作? (与行操作相比)

ML算法处理行的集合,而不是单行,这一点很重要,所以我想将一个表物化为节点上的内存。 (我小指诺言它会适合核心)。我怎样才能做到这一点?

功能,我想这样做:

def mlsubtask(table, arg2, arg3): 
    data = table.collect() 
    ... 

sc = SparkContext(...) 
sqlctx = SQLContext(sc) 
... 
df = sqlctx.sql("SELECT ...") 
results = sc.parallelize([(df,arg2,arg3),(df,arg2,arg3),(df,arg2,arg3)]).map(mlsubtask).collect() 

如果可以执行执行这样的:

sc = SparkContext(...) 
sqlctx = SQLContext(sc) 
... 
df = sqlctx.sql("SELECT ...") 
df = df.collect() 
results = sc.parallelize([(df,arg2,arg3),(df,arg2,arg3),(df,arg2,arg3)]).map(mlsubtask).collect() 

...但是这带来的数据到客户端,这在然后重新序列化,相当低效。

回答

0

对于单任务:

def mlsubtask(iter_rows): 
    data_table = list(iter_rows) # Or other way of bringing into memory. 
    ... 

df.repartition(1).mapPartitions(mlsubtask) 
相关问题