
TypeError when calling LogisticRegressionWithLBFGS.train from Spark MLlib

I want to call LogisticRegressionWithLBFGS.train from Spark MLlib, passing it training data for a multiclass logistic regression. My training data is built as:

trainingData = sXYdf.rdd.map(lambda x: reg.LabeledPoint(x[0]-1,x[1:])) 
trainingData.take(2) 

The output LabeledPoints (2 rows) are shown below. (I am not printing the complete label and features, since it is a 2x401 label-feature matrix with the features occupying columns 1-401 and the label in column 0.) The data looks like this:

[LabeledPoint(9.0, [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.56059679589e-06,1.94035947712e-06,-0.00073743872549,-0.0081340379902,-0.0186104473039,-0.0187412865354,-0.018757250817,-0.0190963541667...])] 
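For context, the same mapping on a toy DataFrame looks like the sketch below; the column names and values are invented purely for illustration, and reg is assumed to be pyspark.mllib.regression, as the map above suggests:

from pyspark.sql import SparkSession
from pyspark.mllib import regression as reg

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for sXYdf: column 0 holds the label (1-10), the remaining columns are features.
toy = spark.createDataFrame([(10.0, 0.1, 0.2), (3.0, 0.0, 0.5)], ["label", "f1", "f2"])

# Same mapping as above: shift the label to the 0-9 range and take the remaining columns as features.
toyTraining = toy.rdd.map(lambda x: reg.LabeledPoint(x[0] - 1, x[1:]))
toyTraining.take(2)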

Now, when I call

lrm=LogisticRegressionWithLBFGS.train(trainingData,numClasses=10) 

I get the following error:

TypeError         Traceback (most recent call last) 
<ipython-input-20-9b0c5530b34b> in <module>() 
     1 #lr=LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0.0) 
----> 2 lrm=LogisticRegressionWithLBFGS.train(trainingData,numClasses=10) 

C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\pyspark\mllib\classification.py in train(cls, data, iterations, initialWeights, regParam, regType, intercept, corrections, tolerance, validateData, numClasses) 
    396     else: 
    397      initialWeights = [0.0] * len(data.first().features) * (numClasses - 1) 
--> 398   return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights) 
    399 
    400 

C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\pyspark\mllib\regression.py in _regression_train_wrapper(train_func, modelClass, data, initial_weights) 
    214   weights, intercept, numFeatures, numClasses = train_func(
    215    data, _convert_to_vector(initial_weights)) 
--> 216   return modelClass(weights, intercept, numFeatures, numClasses) 
    217  else: 
    218   weights, intercept = train_func(data, _convert_to_vector(initial_weights)) 

C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\pyspark\mllib\classification.py in __init__(self, weights, intercept, numFeatures, numClasses) 
    174    self._dataWithBiasSize = self._coeff.size/(self._numClasses - 1) 
    175    self._weightsMatrix = self._coeff.toArray().reshape(self._numClasses - 1, 
--> 176                 self._dataWithBiasSize) 
    177 
    178  @property 

TypeError: 'float' object cannot be interpreted as an integer 
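The last frame points at the likely culprit: in classification.py, self._coeff.size/(self._numClasses - 1) uses "/", which under Python 3 is true division and therefore yields a float, and numpy then refuses the float dimension in reshape. A small stand-alone snippet (with made-up sizes) reproduces the same message:

import numpy as np

# Illustration only: 3600 coefficients for 400 features and 10 classes (numbers are invented).
coeff = np.zeros(3600)
num_classes = 10

# Under Python 3, "/" is true division, so this is 400.0 (a float) even though it divides evenly.
data_with_bias_size = coeff.size / (num_classes - 1)

# numpy rejects a float dimension -- uncommenting this raises the same
# "TypeError: 'float' object cannot be interpreted as an integer" as above.
# coeff.reshape(num_classes - 1, data_with_bias_size)

# Integer division keeps the dimension an int and the reshape succeeds.
coeff.reshape(num_classes - 1, coeff.size // (num_classes - 1))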

Added more logs: it looks like there is a problem with creating the Python worker.

17/07/15 19:59:14 WARN TaskSetManager: Stage 123 contains a task of very large size (17658 KB). The maximum recommended task size is 100 KB. 
17/07/15 19:59:24 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 123) 
org.apache.spark.SparkException: Python worker did not connect back in time 
     at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:138) 
     at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:67) 
     at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116) 
     at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
     at java.lang.Thread.run(Unknown Source) 
Caused by: java.net.SocketTimeoutException: Accept timed out 
     at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) 
     at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source) 
     at java.net.AbstractPlainSocketImpl.accept(Unknown Source) 
     at java.net.PlainSocketImpl.accept(Unknown Source) 
     at java.net.ServerSocket.implAccept(Unknown Source) 
     at java.net.ServerSocket.accept(Unknown Source) 
     at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:133) 
     ... 27 more 
17/07/15 19:59:24 WARN TaskSetManager: Lost task 0.0 in stage 123.0 (TID 123, localhost, executor driver): org.apache.spark.SparkException: Python worker did not connect back in time 
     at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:138) 
     at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:67) 
     at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116) 
     at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
     at java.lang.Thread.run(Unknown Source) 
Caused by: java.net.SocketTimeoutException: Accept timed out 
     at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) 
     at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source) 
     at java.net.AbstractPlainSocketImpl.accept(Unknown Source) 
     at java.net.PlainSocketImpl.accept(Unknown Source) 
     at java.net.ServerSocket.implAccept(Unknown Source) 
     at java.net.ServerSocket.accept(Unknown Source) 
     at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:133) 
     ... 27 more 

17/07/15 19:59:24 ERROR TaskSetManager: Task 0 in stage 123.0 failed 1 times; aborting job 
Traceback (most recent call last): 
    File "C:\Users\Sunil\Anaconda3\lib\runpy.py", line 193, in _run_module_as_main 
    "__main__", mod_spec) 
    File "C:\Users\Sunil\Anaconda3\lib\runpy.py", line 85, in _run_code 
    exec(code, run_globals) 
    File "C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 211, in <module> 
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it 
[I 20:01:12.525 NotebookApp] Saving file at /mltclasspyspark.ipynb 

Answer


Well, it seems there is a bug in Spark 2.1.1 with Python 3 that produces the above error (I cannot reproduce it with Python 2.7).

So, if you cannot upgrade to Spark 2.1.2 or 2.2, where the issue is reported as resolved, or use Python 2.7 instead, I suggest modifying your map function as follows, so that your labels are integers rather than floats (although I have not tested it):

trainingData = sXYdf.rdd.map(lambda x: reg.LabeledPoint(int(x[0]-1),x[1:])) 
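If upgrading is an option, a quick sanity check of the running versions helps confirm which combination you are on (this assumes an active SparkContext sc, as in a notebook session):

import sys

# The error was reported under Spark 2.1.1 with Python 3; 2.1.2/2.2 are said to include the fix.
print(sc.version)
print(sys.version)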

Unfortunately, casting to int did not help. Let me try upgrading Spark and give it a shot. – sunny


Thanks for your help. Upgrading to Spark 2.2.0 did the trick, although I kept the type cast. – sunny