0
我想将一列的值与具有参考值范围的另一列进行比较。pyspark比较列值与另一列包含值范围
我曾尝试使用下面的代码尝试:
from pyspark.sql.functions import udf, size
from pyspark.sql.types import *
df1 = sc.parallelize([([1], [1, 2, 3]), ([2], [4, 5, 6,7])]).toDF(["value", "Reference_value"])
intersect = lambda type: (udf(
lambda x, y: (
list(set(x) & set(y)) if x is not None and y is not None else None),
ArrayType(type)))
integer_intersect = intersect(IntegerType())
# df1.select(
# integer_intersect("value", "Reference_value"),
# size(integer_intersect("value", "Reference_value"))).show()
df1=df1.where(size(integer_intersect("value", "Reference_value")) > 0)
df1.show()
上面的代码工作,如果我们像下面创建数据框:
,因为价值和refernce_value列ARRAY_TYPE与long_type 但如果我读数据框与csv然后我无法转换为数组类型。这里DF1从CSV
df1 is as follows df1=
category value Reference value
count 1 1
n_timer n20 n40,n20
frames 54 56
timer n8 n3,n6,n7
pdf FALSE TRUE
zip FALSE FALSE
我想用“Reference_value”列比较“值”列,并推导出两个新的dataframes其中一个数据帧是过滤行,如果值列不在设定的基准读取值。
输出DF2 =
category value Reference value
count 1 1
n_timer n20 n40,n20
zip FALSE FALSE
输出DF3 =
category value Reference value
frames 54 56
timer n8 n3,n6,n7
pdf FALSE TRUE
是有像array_contains任何更简单的方法。我尝试过Array_contains,但不工作
from pyspark.sql.functions import array_contains
df.where(array_contains("Reference_value", df1["vale"]))