2015-11-08 41 views
1

我有类似于如下代码:Spark MLLib的LassoWithSGD不能缩放?

val fileContent = sc.textFile("file:///myfile") 

val dataset = fileContent.map(row => { 
    val explodedRow = row.split(",").map(s => s.toDouble) 

    new LabeledPoint(explodedRow(13), Vectors.dense(

    Array(explodedRow(10), explodedRow(11), explodedRow(12)) 
))}) 

val algo = new LassoWithSGD().setIntercept(true) 

val lambda = 0.0 
algo.optimizer.setRegParam(lambda) 
algo.optimizer.setNumIterations(100) 
algo.optimizer.setStepSize(1.0) 

val model = algo.run(dataset) 

我我的虚拟服务器上的20个内核上运行这个在云中。该文件是具有几百万行的“本地”(即不在HDFS中)文件。我以本地模式运行它,用sbt运行(即我不使用集群,我不使用spark-submit)。

当我将spark.master = local [*]设置从local [8]增加到local [40]时,我预料到这会变得越来越快。相反,无论我使用什么设置,它都需要相同的时间量(但我从Spark UI注意到,我的执行程序在任何给定时间都有最大数量的活动任务,等于预期的数量,即本地〜8 [8],对于本地[40]〜40等等 - 因此似乎并行化起作用)。

默认的分区数我的数据集RDD是4。我试图迫使分区数到20,没有成功 - 事实上,它减缓了套索算法下来,甚至更多...

是我的期望的缩放过程不正确?有人可以帮我解决这个问题吗?

回答

3

我对扩展过程的期望是否正确?

好吧,那种。我希望你不介意我用一点Python来证明我的观点。

  1. 让我们可以大方的说几百万行实际上是多万。使用40 000 000个值(截距+ 3特征+每行标签),它可以提供大约380 MB的数据(Java Doubledouble-precision 64-bit IEEE 754 floating point)。让我们创建一些虚拟的数据:

    import numpy as np 
    
    n = 10 * 1000**2 
    X = np.random.uniform(size=(n, 4)) # Features 
    y = np.random.uniform(size=(n, 1)) # Labels 
    theta = np.random.uniform(size=(4, 1)) # Estimated parameters 
    
  2. 梯度下降的每一步(因为默认miniBatchFractionLassoWithSGD为1.0它不是真正随机的)忽略正规化需要这样的操作。

    def step(X, y, theta): 
        return ((X.dot(theta) - y) * X).sum(0) 
    

    所以让我们看看需要多长时间我们的数据本地概念:

    %timeit -n 15 step(X, y, theta) 
    ## 15 loops, best of 3: 743 ms per loop 
    

    比第二每步少,无需任何额外的优化。直观地说,它非常快速,并且不容易匹配。只是为了好玩,让我们看看它有多少需要得到数据封闭形式的解决方案是这样

    %timeit -n 15 np.linalg.inv(X.transpose().dot(X)).dot(X.transpose()).dot(y) 
    ## 15 loops, best of 3: 1.33 s per loop 
    
  3. 现在,让我们回到火花。可以并行计算单个点的残差。因此,当增加并行处理的分区数时,这是一个线性缩放的部分。

    问题是您必须在本地聚合数据,序列化,传输到驱动程序,反序列化和本地减少以在每个步骤后获得最终结果。然后你计算新的theta,序列化回送等等。

    所有这些都可以通过正确使用小批量和一些进一步的优化来改善,但是在一天结束时,您将受到整个系统延迟的限制。值得注意的是,当您在工作人员一方增加并行性时,您也会增加必须依次在驱动程序上执行的工作量,反之亦然。这种或那种方式Amdahl's law会咬你。

    上面的所有内容都忽略了实际的实现。

    现在让我们执行另一个实验。首先是一些虚拟数据:

    nCores = 8 # Number of cores on local machine I use for tests 
    rdd = sc.parallelize([], nCores) 
    

    和bechmark:

    %timeit -n 40 rdd.mapPartitions(lambda x: x).count() 
    ## 40 loops, best of 3: 82.3 ms per loop 
    

    这意味着,与8个内核,没有任何真正的处理或网络流量,我们得到的地步,我们无法做到通过提高并行好得多在火花(743ms/8 =每分区92.875ms假设并行部分的线性可扩展性)

不仅仅是上文总结:

  • 如果使用渐变下降可以在本地方便地使用封闭解决方案处理数据,这只是浪费时间。如果你想增加并行/减少延迟,你可以使用好的线性代数库
  • Spark被设计来处理大量的数据而不是减少延迟。如果你的数据在几年的老智能手机的内存适合这是一个好兆头,是不正确的工具
  • 如果计算是便宜的常数,成本成为限制因素

旁注:

  • 相对大量的每台机器的核心是一般来讲不是最好的选择,除非你可以用IO吞吐量匹配这个