2017-08-02 106 views
0

我想在Spark中使用MongoDB提供的数据运行k-means。 我有一个工作实施例中,其作用对一个平面文件:如何在kmeans中映射Spark中的MongoDB数据?

sc = SparkContext(appName="KMeansExample") # SparkContext 
data = sc.textFile("/home/mhoeller/kmeans_data.txt") 
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')])) 
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random") 

这是平面文件的格式为:

现在我想用MongoDB的更换简单文件:

spark = SparkSession \ 
.builder \ 
.appName("myApp") \ 
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \ 
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \ 
.getOrCreate() 

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load() 

# <<<< Here I am missing the parsing >>>>> 

clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random") 

我喜欢了解如何映射df中的数据,以便它可以用作kmeans的输入。

的数据库的 “布局” 是:

| - _id:字符串(可为空=真)
| - field0:二进制(可为空=真)
| - FIELD1:二进制(可空=真)
| - 场2:二进制(可为空=真)
| - 场3:二进制(可为空=真)
| - 字段4:二进制(可为空=真)
| - 字段5:二进制(空值=真)
| - 基尔D6:二进制(可为空=真)
| - 字段7:二进制(可为空=真)
| - 字段8:二元的(可为空=真)
| - 字段9:二元的(可为空=真)

回答

1

我喜欢了解如何映射df中的数据,以便它可以用作kmeans的输入。

根据您的代码段,我假定您使用PySpark。

如果你看看clustering.KMeans的Python API文档,你可以看到,第一个参数需要使用MongoDB Spark Connector

df = spark.read.format("com.mongodb.spark.sql.DefaultSource") 
       .option("uri","mongodb://127.0.0.1/ycsb.usertable") 
       .load() 

从MongoDB的负载数据,你有什么要RDD of Vector or convertible sequence types

后,你下面的代码执行在df是一个DataFrame,所以我们需要将它转换成可转换为Vector类型的东西。

因为你在你的文本文件例如使用numpy.array,我们可以继续使用这种阵列式学习转变。

根据提供的layout,首先我们需要删除_id列,因为它不需要进行聚类训练。有关更多信息,另请参阅Vector数据类型。

通过以上信息,让我们来看看它:

# Drop _id column and get RDD representation of the DataFrame 
rowRDD = df.drop("_id").rdd 

# Convert RDD of Row into RDD of numpy.array 
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row])) 

# Feed into KMeans 
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random") 

如果你想保持布尔值(真/假),而不是整数(1/0),那么你可以删除int部分。如下图所示:

parsedRdd = rowRDD.map(lambda row: array([x for x in row])) 

把所有的人都在一起:

from numpy import array 
from pyspark.mllib.clustering import KMeans 
import org.apache.spark.sql.SparkSession 

spark = SparkSession \ 
.builder \ 
.appName("myApp") \ 
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \ 
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \ 
.getOrCreate() 

df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load() 

rowRDD = df.drop("_id").rdd 
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row])) 

clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random") 
clusters.clusterCenters 
相关问题