2017-10-06

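Pyspark: auto-increment ID for alternating groups of values

I want to use PySpark to create a new column in a Spark DataFrame that holds an auto-incrementing ID based on alternating groups of boolean values. Say I have the following DataFrame: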

df.show()
+-----+------------+-------------+
|id   |par_id      |is_on        |
+-----+------------+-------------+
|40002|1           |true         |
|40003|2           |true         |
|40004|null        |false        |
|40005|17          |true         |
|40006|2           |true         |
|40007|17          |true         |
|40008|240         |true         |
|40009|1861        |true         |
|40010|1862        |true         |
|40011|2           |true         |
|40012|null        |false        |
|40013|1863        |true         |
|40014|626         |true         |
|40016|208         |true         |
|40017|2           |true         |
|40018|null        |false        |
|40019|2           |true         |
|40020|1863        |true         |
|40021|2           |true         |
|40022|2           |true         |
+-----+------------+-------------+

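I want to extend this DataFrame with an incremental ID column called id2, derived from the is_on attribute. That is, each consecutive run of identical boolean values should get the next ID. The resulting DataFrame should look like this: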

df.show()
+-----+------------+-------------+-----+
|id   |par_id      |is_on        |id2  |
+-----+------------+-------------+-----+
|40002|1           |true         |1    |
|40003|2           |true         |1    |
|40004|null        |false        |2    |
|40005|17          |true         |3    |
|40006|2           |true         |3    |
|40007|17          |true         |3    |
|40008|240         |true         |3    |
|40009|1861        |true         |3    |
|40010|1862        |true         |3    |
|40011|2           |true         |3    |
|40012|null        |false        |4    |
|40013|1863        |true         |5    |
|40014|626         |true         |5    |
|40016|208         |true         |5    |
|40017|2           |true         |5    |
|40018|null        |false        |6    |
|40019|2           |true         |7    |
|40020|1863        |true         |7    |
|40021|2           |true         |7    |
|40022|2           |true         |7    |
+-----+------------+-------------+-----+
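
To make the numbering rule concrete, here is a minimal plain-Python sketch of it (not Spark code). The helper name `run_ids` is hypothetical and only illustrates the rule: the ID goes up by one each time the value differs from the one before it.

```python
from itertools import groupby


def run_ids(values):
    """Return one ID per element; the ID increases by 1 at every value change."""
    ids = []
    # groupby() yields one group per run of consecutive equal values.
    for run_id, (_, run) in enumerate(groupby(values), start=1):
        ids.extend(run_id for _ in run)
    return ids


is_on = [True, True, False, True, True, False, True]
print(run_ids(is_on))  # [1, 1, 2, 3, 3, 4, 5]
```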

Do you have any suggestions? How would I write a user-defined function for this?

Answer

    # PySpark test script: number alternating runs of is_on with a stateful UDF.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    spark = (SparkSession.builder.master("local").appName("durga prasad")
             .config("spark.sql.warehouse.dir", "/home/hadoop/spark-2.0.1-bin-hadoop2.7/bin/test_warehouse")
             .getOrCreate())
    # Without inferSchema every column, including is_on, is read as a string.
    df = spark.read.csv("/home/hadoop/stack_test.txt", sep=",", header=True)

    # UDF state. This relies on all rows being processed in order by a single
    # Python worker (one partition); it is not reliable on a distributed DataFrame.
    group_id = 1       # current group number
    prev_value = None  # is_on value of the previous row

    def test_fun(value):
        global group_id, prev_value
        # Start a new group whenever is_on differs from the previous row.
        if prev_value is not None and value != prev_value:
            group_id += 1
        prev_value = value
        return group_id

    testUDF = udf(test_fun, IntegerType())  # wrap the function as a UDF returning an int
    df.select("id", "par_id", "is_on", testUDF("is_on").alias("id2")).show()


    # Output
    +-----+------+-----+---+
    |   id|par_id|is_on|id2|
    +-----+------+-----+---+
    |40002|     1| true|  1|
    |40003|     2| true|  1|
    |40004|  null|false|  2|
    |40005|    17| true|  3|
    |40006|     2| true|  3|
    |40007|    17| true|  3|
    |40008|   240| true|  3|
    |40009|  1861| true|  3|
    |40010|  1862| true|  3|
    |40011|     2| true|  3|
    |40012|  null|false|  4|
    |40013|  1863| true|  5|
    |40014|   626| true|  5|
    |40016|   208| true|  5|
    |40017|     2| true|  5|
    |40018|  null|false|  6|
    |40019|     2| true|  7|
    |40020|  1863| true|  7|
    |40021|     2| true|  7|
    |40022|     2| true|  7|
    +-----+------+-----+---+