2017-04-25 51 views
0

我已经在此处看到类似于我的问题,但是在尝试一些可接受的答案时,我的代码仍然出现错误。我有一个包含三列的数据框 - 创建_at,文本和单词(这只是文本的标记版本)。请看下图:如果文本列包含指定列表中的单词,则过滤pyspark数据框

enter image description here

现在,我公司['Starbucks', 'Nvidia', 'IBM', 'Dell']的名单,我只是想保持在文本包括上述的那些话行。

我已经尝试了一些东西,但没有成功:

small_DF.filter(lambda x: any(word in x.text for word in test_list)) 

返回:类型错误:条件应该是字符串或列

我试图创建一个函数,并使用foreach()

def filters(line): 
    return(any(word in line for word in test_list)) 
df = df.foreach(filters) 

将df变成'Nonetype'

而最后一个我想:

df = df.filter((col("text").isin(test_list)) 

这将返回一个空的数据帧,因为我没有得到任何错误,这是很好的,但显然不是我想要的。

回答

0

我认为filter is not working becuase it expect a boolean output from lambda function and isin just with column。您正试图将单词列表与单词列表进行比较。这里是什么,我想可以给你一些方向 -

# prepare some test data ==> 

words = [x.lower() for x in ['starbucks', 'Nvidia', 'IBM', 'Dell']] 
data = [['i love Starbucks'],['dell laptops rocks'],['help me I am stuck!']] 
df = spark.createDataFrame(data).toDF('text') 


from pyspark.sql.types import * 

def intersect(row): 
    # convert each word in lowecase 
    row = [x.lower() for x in row.split()] 
    return True if set(row).intersection(set(words)) else False 


filterUDF = udf(intersect,BooleanType()) 
df.where(filterUDF(df.text)).show() 

输出:

+------------------+ 
|    text| 
+------------------+ 
| i love Starbucks| 
|dell laptops rocks| 
+------------------+ 
+0

我试过了你在我的数据框中写的UDF,用我的数据替换了df.where和df.text;但是,我收到错误:AttributeError:'NoneType'对象没有'split'属性。 对于交集函数,从技术上说,你传入一列(df.text)作为参数,对吗?是错误,因为它不是逐行迭代? – sjc725

+0

你的列文本是否有空值,可以解释为NoneType错误。你将不得不在udf中处理这个“if在None中返回False”。对于相交的行值,即“我爱星巴克”。如果你发布你已经尝试过的样本数据,这可能会有帮助。数据的图像不是很有帮助。 – Pushkr

0

上dataframes你.filter返回一个错误,因为它是SQL过滤功能(期待BooleanType()列)不RDD上的过滤功能。您列"text"

small_DF.rdd.filter(lambda x: any(word in x.text for word in test_list)) 

您不必使用UDF,你可以在pyspark使用正则表达式.rlike

from pyspark.sql import HiveContext 
hc = HiveContext(sc) 
import pyspark.sql.functions as psf 

words = [x.lower() for x in ['starbucks', 'Nvidia', 'IBM', 'Dell']] 
data = [['i love Starbucks'],['dell laptops rocks'],['help me I am stuck!']] 
df = hc.createDataFrame(data).toDF('text') 
df.filter(psf.lower(df.text).rlike('|'.join(words))) 
如果你想使用RDD一个,只需添加 .rdd
相关问题