我一直试图在PySpark上做一个简单的随机森林回归模型。我在R上有一个很好的机器学习经验。然而,对我而言,Pyspark上的ML似乎完全不同 - 尤其是当涉及到分类变量,字符串索引和OneHotEncoding的处理时(当只有数字变量时,我才能够仅通过以下示例执行RF回归)。虽然有很多示例可用于处理分类变量,例如this和this,但是我没有成功,因为它们中的大多数都超出了我的头(可能是因为我对Python ML不熟悉)。我会感谢任何能够帮助解决这个问题的人。PySpark上的分类输入随机森林回归
这里是我的尝试:inputfile is here
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema = "true").load('filename.csv')
train.cache()
train.dtypes
输出是:
DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double]
接下来,我选择我的兴趣变量:
IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
train = train.fillna("XXX")
train = train.select([column for column in train.columns if column in IMP])
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double"))
train.cache()
输出是:
DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double]
我的因变量是ConversionPayOut
,之前的字符串类型现在转换为double类型。
从这里开始我的困惑: 根据this post,我明白我必须将我的分类字符串类型变量转换为onehot编码向量。这是我尝试在那:
首先一个StringIndexing:
`
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(junk) for column in list(set(junk.columns)-set(['Carrier','ConversionPayOut','Fraud'])) ]
pipeline = Pipeline(stages=indexers)
train_catind = pipeline.fit(train).transform(train)
train_catind.show()
`
StringIndexing的输出:
`
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device| Browser| OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
| TH| 20.0| A| Lava| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 7.0|
| BR| 217.0| A| LG| chrome|Android| 26.2680574| 0.0| 0.0| 2.0| 0.0| 0.0| 5.0|
| TH| 20.0| A|Generic| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|`
Next, I think, I have to do the OneHOtEncoding of the String Indexes:
`
from pyspark.ml.feature import OneHotEncoder, StringIndexer
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns) ]
pipeline = Pipeline(stages=indexers_ON)
train_OHE = pipeline.fit(train_catind).transform(train_catind)
train_OHE.show()
`
出后一个热编码是这样的:
`
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device| Browser| OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
| TH| 20.0| A| Lava| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 7.0| (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[7],[1.0])|
| BR| 217.0| A| LG| chrome|Android| 26.2680574| 0.0| 0.0| 2.0| 0.0| 0.0| 5.0| (1,[0],[1.0])| (9,[2],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[5],[1.0])|
| TH| 20.0| A|Generic| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0| (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[0],[1.0])|
`
我无能,如何继续前进。实际上,我对Spark Machine Learning包需要我们做这种单热编码以及哪些不需要编码是毫无头绪的。
如果StackOverflow社区可以阐明如何前进,那么对PySpark的所有新手来说,这将是非常好的学习。
谢谢你的回答。这与我所尝试的类似。但运行VectorAssembler后,我遇到了新的错误。你能看看这个问题吗? https://stackoverflow.com/questions/46377686/how-to-match-and-replace-in-pyspark-when-columns-contain-vectors – kasa
@kasa你可以请尝试这段代码,让我们知道如果你是仍然得到相同的错误? – Prem