2017-09-03

I want to compare the values of one column against another column that holds a set of reference values. (pyspark: compare a column's value with another column containing a range of values)

I have tried with the code below:

from pyspark.sql.functions import udf, size 
from pyspark.sql.types import * 

df1 = sc.parallelize([([1], [1, 2, 3]), ([2], [4, 5, 6, 7])]).toDF(["value", "Reference_value"]) 

# UDF that returns the intersection of two array columns (None-safe) 
intersect = lambda type: (udf(
    lambda x, y: (
     list(set(x) & set(y)) if x is not None and y is not None else None), 
    ArrayType(type))) 
integer_intersect = intersect(IntegerType()) 

# df1.select(
#  integer_intersect("value", "Reference_value"), 
#  size(integer_intersect("value", "Reference_value"))).show() 

# keep only rows where the two arrays share at least one element 
df1 = df1.where(size(integer_intersect("value", "Reference_value")) > 0) 
df1.show() 

The code above works if the dataframe is created as shown, because the value and Reference_value columns are of array type (with long-type elements). But if I read the dataframe from a CSV, I am unable to convert the columns to array type. Here df1 is read from a CSV:

df1 is as follows, df1 =

category   value   Reference_value 

count      1       1 
n_timer    n20     n40,n20 
frames     54      56 
timer      n8      n3,n6,n7 
pdf        FALSE   TRUE 
zip        FALSE   FALSE 

I want to compare the "value" column with the "Reference_value" column and derive two new dataframes: one containing the rows whose value appears in the reference set, and the other containing the rows whose value does not.

Expected output df2 =

category   value   Reference_value 

count      1       1 
n_timer    n20     n40,n20 
zip        FALSE   FALSE 

Expected output df3 =

category   value   Reference_value 

frames     54      56 
timer      n8      n3,n6,n7 
pdf        FALSE   TRUE 

Is there any simpler way, something like array_contains? I tried array_contains, but it does not work:

from pyspark.sql.functions import array_contains 
df1.where(array_contains("Reference_value", df1["value"])) 

Answer
# One can copy-paste the code below to reproduce the inputs and outputs 

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql.functions import udf, size, split 
from pyspark.sql.types import * 

sc = SparkContext.getOrCreate() 
sqlContext = SQLContext.getOrCreate(sc) 

df1 = sc.parallelize([("count", "1", "1"), ("n_timer", "n20", "n40,n20"),
                      ("frames", "54", "56"), ("timer", "n8", "n3,n6,n7"),
                      ("pdf", "FALSE", "TRUE"), ("zip", "FALSE", "FALSE")]
                     ).toDF(["category", "value", "Reference_value"]) 
df1.show() 

# split the comma-separated strings into array<string> columns 
df1 = df1.withColumn("Reference_value", split("Reference_value", r",\s*")) 
df1 = df1.withColumn("value", split("value", r",\s*")) 

# UDF that returns the intersection of two array columns (None-safe) 
intersect = lambda type: (udf(
    lambda x, y: (
     list(set(x) & set(y)) if x is not None and y is not None else None), 
    ArrayType(type))) 
string_intersect = intersect(StringType()) 

# df2: rows whose value appears in the reference set; df3: the rest 
df2 = df1.where(size(string_intersect("value", "Reference_value")) > 0) 
df3 = df1.where(size(string_intersect("value", "Reference_value")) <= 0) 
df2.show() 
df3.show() 

input df1= 
+--------+-----+---------------+ 
|category|value|Reference_value| 
+--------+-----+---------------+ 
|   count|    1|              1| 
| n_timer|  n20|        n40,n20| 
|  frames|   54|             56| 
|   timer|   n8|       n3,n6,n7| 
|     pdf|FALSE|           TRUE| 
|     zip|FALSE|          FALSE| 
+--------+-----+---------------+ 

df2= 
+--------+-------+---------------+ 
|category|  value|Reference_value| 
+--------+-------+---------------+ 
|   count|    [1]|            [1]| 
| n_timer|  [n20]|     [n40, n20]| 
|     zip|[FALSE]|        [FALSE]| 
+--------+-------+---------------+ 

df3= 
+--------+-------+---------------+ 
|category|  value|Reference_value| 
+--------+-------+---------------+ 
|  frames|   [54]|           [56]| 
|   timer|   [n8]|   [n3, n6, n7]| 
|     pdf|[FALSE]|         [TRUE]| 
+--------+-------+---------------+ 