
Selecting the columns that satisfy a condition. I am running the following notebook in Zeppelin:

%spark.pyspark 
l = [('user1', 33, 1.0, 'chess'), ('user2', 34, 2.0, 'tenis'), ('user3', None, None, ''), ('user4', None, 4.0, ' '), ('user5', None, 5.0, 'ski')] 
df = spark.createDataFrame(l, ['name', 'age', 'ratio', 'hobby']) 
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- ratio: double (nullable = true)
 |-- hobby: string (nullable = true)

+-----+----+-----+-----+
| name| age|ratio|hobby|
+-----+----+-----+-----+
|user1|  33|  1.0|chess|
|user2|  34|  2.0|tenis|
|user3|null| null|     |
|user4|null|  4.0|     |
|user5|null|  5.0|  ski|
+-----+----+-----+-----+

from pyspark.sql.functions import count

agg_df = df.select(*[(1.0 - (count(c)/count('*'))).alias(c) for c in df.columns])
agg_df.show() 

+----+---+-------------------+-----+
|name|age|              ratio|hobby|
+----+---+-------------------+-----+
| 0.0|0.6|0.19999999999999996|  0.0|
+----+---+-------------------+-----+

Now I want to select from agg_df only the columns whose value is < 0.35. In this case it should return ['name', 'ratio', 'hobby'].

I don't know how to do this. Any hints?


Which column value do you want to compare against?

Answers


You mean < 0.35? This should do it:

>>> [ key for (key,value) in agg_df.collect()[0].asDict().items() if value < 0.35 ] 
['hobby', 'ratio', 'name'] 
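
As a small side note (my own variant, not part of the original answer): if you want the result in the original column order, as in the expected ['name', 'ratio', 'hobby'], iterate over agg_df.columns instead of the dict items:

>>> row = agg_df.first().asDict()   # agg_df has a single row of null ratios
>>> [c for c in agg_df.columns if row[c] < 0.35]   # keeps the original column order
['name', 'ratio', 'hobby']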

To replace blank strings with null, use the udf function below.

from pyspark.sql.functions import udf 
process = udf(lambda x: None if not x else (x if x.strip() else None)) 
df.withColumn('hobby', process(df.hobby)).show() 
+-----+----+-----+-----+
| name| age|ratio|hobby|
+-----+----+-----+-----+
|user1|  33|  1.0|chess|
|user2|  34|  2.0|tenis|
|user3|null| null| null|
|user4|null|  4.0| null|
|user5|null|  5.0|  ski|
+-----+----+-----+-----+
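
As an alternative sketch (not from the original answer), the same blank-to-null replacement can be done with the built-in when/trim functions, which avoids the overhead of a Python udf:

from pyspark.sql.functions import when, trim, col, lit

# empty or whitespace-only hobby -> null; existing nulls pass through unchanged
df.withColumn('hobby',
              when(trim(col('hobby')) == '', lit(None)).otherwise(col('hobby'))).show()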

How can I replace empty strings with None before running the aggregation? In hobby I should then have 2 null values.


@SofianeCherchalli I've updated the answer to replace blank strings with null. Could you accept the answer? :)


Done. Sorry for the late reply.


This is the function I was looking for, adapted from rogue-one's hints. Not sure whether it is the fastest or most optimized approach:

from pyspark.sql.functions import udf, count
from functools import reduce

def filter_columns(df, threshold=0.35):
    process = udf(lambda x: None if not x else (x if x.strip() else None))  # udf for stripping string values
    string_cols = [c for c in df.columns if df.select(c).dtypes[0][1] == 'string']  # string columns
    new_df = reduce(lambda df, x: df.withColumn(x, process(x)), string_cols, df)  # process all string columns

    agg_df = new_df.select(*[(1.0 - (count(c)/count('*'))).alias(c) for c in new_df.columns])  # null ratio per column
    cols_match_threshold = [key for (key, value) in agg_df.collect()[0].asDict().items() if value < threshold]  # columns with value < threshold

    return new_df.select(cols_match_threshold)



filter_columns(df, 0.35).show() 
+-----+-----+
|ratio| name|
+-----+-----+
|  1.0|user1|
|  2.0|user2|
| null|user3|
|  4.0|user4|
|  5.0|user5|
+-----+-----+
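
For what it's worth, here is a possible udf-free variant of the same function (my own sketch building on the when/trim idea above, not something from the original answers):

from pyspark.sql.functions import when, trim, col, count, lit
from functools import reduce

def filter_columns_no_udf(df, threshold=0.35):
    # blank or whitespace-only strings -> null, using built-in functions only
    blank_to_null = lambda c: when(trim(col(c)) == '', lit(None)).otherwise(col(c))
    string_cols = [c for c, t in df.dtypes if t == 'string']
    new_df = reduce(lambda d, c: d.withColumn(c, blank_to_null(c)), string_cols, df)

    # fraction of nulls per column, then keep only the columns below the threshold
    ratios = new_df.select(*[(1.0 - count(c) / count('*')).alias(c) for c in new_df.columns])
    row = ratios.first().asDict()
    return new_df.select([c for c in new_df.columns if row[c] < threshold])

Called as filter_columns_no_udf(df, 0.35), it should select the same name and ratio columns as the version above.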