
Random Forest Regression with categorical inputs on PySpark

I have been trying to build a simple random forest regression model on PySpark. I have good machine learning experience in R. However, ML on PySpark seems entirely different to me, especially when it comes to handling categorical variables, string indexing, and OneHotEncoding (I was able to perform RF regression just by following examples only when all the variables were numeric). While there are many examples available for handling categorical variables, such as this and this, I have had no success with them, since most of them went over my head (probably because of my unfamiliarity with Python ML). I would be grateful to anyone who can help fix this.

Here is my attempt (the input file is here):

from pyspark.mllib.linalg import Vectors 
from pyspark.ml import Pipeline 
from pyspark.ml.feature import StringIndexer, VectorIndexer 
from pyspark.ml.classification import DecisionTreeClassifier 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
from pyspark.sql.types import Row 
from pyspark.sql.functions import col, round 
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema = "true").load('filename.csv') 
train.cache() 
train.dtypes 

The output is:

DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double] 

Next, I select my variables of interest:

IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"] 
train = train.fillna("XXX") 
train = train.select([column for column in train.columns if column in IMP]) 
from pyspark.sql.types import DoubleType 
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double")) 
train.cache() 

The output is:

DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double] 

My dependent variable is ConversionPayOut, which was of string type earlier and is now cast to the double type.

This is where my confusion begins: according to this post, I understand that I have to convert my categorical string-type variables into one-hot encoded vectors. Here is my attempt at that:

First, the StringIndexing:


from pyspark.ml import Pipeline 
from pyspark.ml.feature import StringIndexer 
# one StringIndexer per categorical column (every column except the numeric ones)
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(train) for column in list(set(train.columns)-set(['Carrier','ConversionPayOut','Fraud']))]
pipeline = Pipeline(stages=indexers) 
train_catind = pipeline.fit(train).transform(train) 
train_catind.show() 


The output of the StringIndexing:


+-------+-------+-----------+-------+-------+-------+----------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device|Browser|     OS|ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+-------+-------+----------------+-----+-----------------+-------------+-------------+--------+------------+
|     TH|   20.0|          A|   Lava| chrome|Android|            41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|
|     BR|  217.0|          A|     LG| chrome|Android|      26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|
|     TH|   20.0|          A|Generic| chrome|Android|            41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|
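(As far as I can tell from the docs, StringIndexer assigns indices by descending label frequency, so index 0.0 goes to the most frequent value. If that is right, the mapping should be inspectable on the fitted indexer models. A quick check of my understanding; note that which indexer sits at position 0 depends on the set ordering above, so indexers[0] is only a placeholder:)

# inspect the learned label-to-index mapping of one fitted StringIndexerModel;
# labels are ordered by descending frequency, so labels[0] maps to index 0.0
print(indexers[0].labels)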


Next, I think, I have to do the OneHotEncoding of the string indexes:


from pyspark.ml.feature import OneHotEncoder, StringIndexer 
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns) ] 
pipeline = Pipeline(stages=indexers_ON) 
train_OHE = pipeline.fit(train_catind).transform(train_catind) 
train_OHE.show() 


The output after one-hot encoding looks like this:


+-------+-------+-----------+-------+-------+-------+----------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device|Browser|     OS|ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+-------+-------+----------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|     TH|   20.0|          A|   Lava| chrome|Android|            41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[7],[1.0])|
|     BR|  217.0|          A|     LG| chrome|Android|      26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|        (1,[0],[1.0])|    (9,[2],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[5],[1.0])|
|     TH|   20.0|          A|Generic| chrome|Android|            41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[0],[1.0])|

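If I understand the display correctly, an entry like (9,[1],[1.0]) is a SparseVector written as (size, [indices], [values]). A quick sanity check of my reading (assuming Spark 2.x, where the ML one-hot encoder produces pyspark.ml.linalg vectors):

from pyspark.ml.linalg import Vectors

# (9,[1],[1.0]): a length-9 vector with a single 1.0 at index 1
v = Vectors.sparse(9, [1], [1.0])
print(v.toArray())  # [0. 1. 0. 0. 0. 0. 0. 0. 0.]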

I am clueless about how to move forward. In fact, I am clueless about which Spark Machine Learning algorithms require us to do this one-hot encoding and which ones do not.
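One thing I came across (and I am not sure whether this is correct) is that tree-based models like random forests may not need one-hot encoding at all in Spark ML: they can apparently split on the indexed columns directly, provided the feature vector carries categorical metadata, for example via VectorIndexer. A minimal sketch of what I mean, assuming none of my categorical columns has more than 32 levels:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.regression import RandomForestRegressor

# assemble the raw index columns (no one-hot encoding) into one vector
assembler = VectorAssembler(
    inputCols=["Carrier", "Country_index", "TrafficType_index",
               "Device_index", "Browser_index", "OS_index"],
    outputCol="rawFeatures")

# VectorIndexer tags low-cardinality features as categorical via metadata,
# which tree-based models can then split on directly
indexer = VectorIndexer(inputCol="rawFeatures", outputCol="features",
                        maxCategories=32)

# maxBins must be at least as large as the number of categories in any feature
rf = RandomForestRegressor(labelCol="ConversionPayOut",
                           featuresCol="features", maxBins=32)

pipeline = Pipeline(stages=[assembler, indexer, rf])
model = pipeline.fit(train_catind)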

It would be great learning for all the newbies on PySpark if the StackOverflow community could shed some light on how to move forward.

Answer


To run a random forest on your preprocessed data, you can proceed with the code below.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# use VectorAssembler to combine all the feature columns into a single vector column
assemblerInputs = ["Carrier","Fraud","Country_index_Vec","TrafficType_index_Vec","Device_index_Vec","Browser_index_Vec","OS_index_Vec"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
pipeline = Pipeline(stages=[assembler])
df = pipeline.fit(train_OHE).transform(train_OHE)
df = df.withColumn("label", df["ConversionPayOut"])

# randomly split the data into training and test datasets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=111)

# ConversionPayOut is continuous, so this is a regression problem:
# use RandomForestRegressor rather than RandomForestClassifier
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
rf_model = rf.fit(train_data)

# make predictions on the test data
predictions = rf_model.transform(test_data)
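
To sanity-check the fit, you could evaluate the predictions on the held-out test split, e.g. with RMSE (a sketch using RegressionEvaluator):

from pyspark.ml.evaluation import RegressionEvaluator

# compute RMSE on the test data; metricName also accepts "mse", "mae" and "r2"
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on test data = %g" % rmse)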


Hope this helps!


Thank you for the answer. This is similar to what I had tried. But after running the VectorAssembler, I ran into new errors. Could you please take a look at this question? https://stackoverflow.com/questions/46377686/how-to-match-and-replace-in-pyspark-when-columns-contain-vectors – kasa


@kasa Could you please try this code and let us know if you still get the same error? – Prem