2016-08-15 308 views
2

使用带有pyspark的Apache Spark 2.0,我有一个包含1000行数据的DataFrame,并且希望将该DataFrame分割/分割成2个独立的DataFrame;在Apache Spark中分割数据帧

  • 第一个数据帧应包含第750行
  • 第二个数据帧应该包含剩余的250行

注:随机种子是不够的,因为我打算重复这种分裂方法多次,并希望控制哪些数据用于第一个和第二个DataFrame。

我发现take(n)方法对生成第一个结果很有用。
但我似乎无法找到正确的方式(或任何方式)获取第二个DataFrame。

任何指针在正确的方向将不胜感激。

在此先感谢。

更新:我现在已经设法通过排序和再次应用take(n)来找到解决方案。这仍然感觉虽然次优解:

# First DataFrame, simply take the first 750 rows 
part1 = spark.createDataFrame(df.take(750)) 
# Second DataFrame, sort by key descending, then take 250 rows 
part2 = spark.createDataFrame(df.rdd.sortByKey(False).toDF().take(250)) 
# Then reverse the order again, to maintain the original order 
part2 = part2.rdd.sortByKey(True).toDF() 
# Then rename the columns as they have been reset to "_1" and "_2" by the sorting process 
part2 = part2.withColumnRenamed("_1", "label").withColumnRenamed("_2", "features") 

回答

3

你是对使用取,因为它绘制数据的驱动程序,然后重新分配createDataFrame它在集群质疑。如果您的驱动程序没有足够的内存来存储数据,则效率低下并可能失败。

下面是创建一个行索引列和片上解决方案:

from pyspark.sql.functions import monotonicallyIncreasingId 

idxDf = df.withColumn("idx", monotonicallyIncreasingId()) 
part1 = idxDf.filter('idx < 750') 
part2 = idxDf.filter('idx >= 750')