2016-05-16 57 views
1

我已经在我的计算机中运行代码并使用频繁模式挖掘。我使用FP-growth,但是pyspark抛出一个错误,我不知道如何解决它,所以有人可以使用pyspark帮助我吗?FP-growth - 交易中的项目必须是唯一的

首先,我得到

data = sc.textFile(somewhere) 

这一步没有错误的数据 然后

transactions = data.map(lambda line: line.strip().split(' ')) 

接下来是

model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10) 

它抛出一个错误

An error occurred while calling o19.trainFPGrowthModel.:org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 3, localhost): org.apache.spark.SparkException: Items in a transaction must be unique but got WrappedArray(, , A, , Seq, 0xBB20C554Ack, 0xE6A8BA01Win, 0x7D78TcpLen, 20). 

我的数据是这样的

transactions.take(1) 

[[u'03/07', 
    u' 10', 
    u' 22', 
    u' 04.439824', 
    u' 139', 
    u' 1', 
    u' 1', 
    u' spp_sdf', 
    u' SDFCombinationAlert', 
    u' Classification', 
    u' SenstiveData', 
    u' Priority', 
    u' 2', 
    u' PROTO', 
    u' 254', 
    u' 197.218.177.69', 
    u' 172.16.113.84']] 

回答

4

那么,你得到的例外是非常不言自明。传递给FP增长的每个存储桶都必须包含一组项目,因此不会有重复项目。因此,例如,这不是有效的输入:

transactions = sc.parallelize([["A", "A", "B", "C"], ["B", "C", "A", "A"]]) 
FPGrowth.train(transactions, minSupport=0.2, numPartitions=10) 
## Py4JJavaError: An error occurred while calling o71.trainFPGrowthModel. 
## ... 
## Caused by: org.apache.spark.SparkException: 
## Items in a transaction must be unique but got WrappedArray(A, A, B, C). 

您已确保项目在通过这些下游之前是唯一的。

unique = transactions.map(lambda x: list(set(x))).cache() 
FPGrowth.train(unique, minSupport=0.2, numPartitions=10) 

注意

  • 是奔跑FPGrowth前一个好主意,cache数据。
  • 主观上它不是您使用的数据的最佳选择。
+0

是的,我找到了问题的根源,我将确保数据的唯一性,并且非常感谢您。 –

+0

另一个问题,我如何将结果保存到本地文件?我一直尝试saveAsTextFile()和write(),但它返回一个错误,谢谢。 –

+0

使用'model.save'。 – zero323

相关问题