
I am trying to concatenate two Spark DataFrames of equal length, as shown below, but the map function on the Spark RDD is not working.

DF1:

| A | 
| 1 | 
| 2 | 
| 3 | 
| 4 | 

DF2:

| B | 
| a | 
| b | 
| c | 
| d | 

Result DF:

| A | B | 
| 1 | a | 
| 2 | b | 
| 3 | c | 
| 4 | d | 

For this, I am using the following code:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// pair the two RDDs row by row, then merge each pair into a single Row
val combinedRow = df1.rdd.zip(df2.select("B").rdd).map {
  case (df1Data, df2Data) => Row.fromSeq(df1Data.toSeq ++ df2Data.toSeq)
}
val combinedSchema = StructType(df1.schema.fields ++ df2.select("B").schema.fields)
val resultDF = spark.sqlContext.createDataFrame(combinedRow, combinedSchema)

But the code makes no progress. It does not throw any exception either; it simply hangs. Any suggestions on what might be wrong? Thanks in advance.
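
For reference, a minimal, self-contained sketch of this approach; the sample data, the local SparkSession setup, and the explicit action at the end are added here as assumptions:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local[*]").appName("zip-concat").getOrCreate()
import spark.implicits._

val df1 = Seq(1, 2, 3, 4).toDF("A")
val df2 = Seq("a", "b", "c", "d").toDF("B")

// RDD.zip requires both RDDs to have the same number of partitions and the
// same number of elements per partition; two equal-size local Seqs
// parallelized by the same session satisfy this
val combinedRow = df1.rdd.zip(df2.select("B").rdd).map {
  case (r1, r2) => Row.fromSeq(r1.toSeq ++ r2.toSeq)
}
val combinedSchema = StructType(df1.schema.fields ++ df2.select("B").schema.fields)
val resultDF = spark.createDataFrame(combinedRow, combinedSchema)

// zip and map are lazy; an action such as show() is what actually runs the job
resultDF.show()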

EDIT:

The logs produced after the last successfully executed statement:

[main] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 13.848847 ms 
[broadcast-exchange-0] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 14.323824 ms 
[broadcast-exchange-0] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_35 stored as values in memory (estimated size 1024.1 KB, free 871.5 MB) 
[broadcast-exchange-0] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_35_piece0 stored as bytes in memory (estimated size 417.0 B, free 871.5 MB) 
[dispatcher-event-loop-3] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_35_piece0 in memory on 192.168.20.181:38202 (size: 417.0 B, free: 872.9 MB) 
[broadcast-exchange-0] INFO org.apache.spark.SparkContext - Created broadcast 35 from run at ThreadPoolExecutor.java:1142 
[main] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 27.697751 ms 
[main] INFO org.apache.spark.SparkContext - Starting job: show at Train.scala:180 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 19 (show at Train.scala:180) with 1 output partitions 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 31 (show at Train.scala:180) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 31 (MapPartitionsRDD[106] at show at Train.scala:180), which has no missing parents 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_36 stored as values in memory (estimated size 14.3 KB, free 871.5 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_36_piece0 stored as bytes in memory (estimated size 6.4 KB, free 871.5 MB) 
[dispatcher-event-loop-2] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_36_piece0 in memory on 192.168.20.181:38202 (size: 6.4 KB, free: 872.9 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 36 from broadcast at DAGScheduler.scala:996 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 31 (MapPartitionsRDD[106] at show at Train.scala:180) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 31.0 with 1 tasks 
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 31.0 (TID 1267, localhost, executor driver, partition 0, PROCESS_LOCAL, 5961 bytes) 
[Executor task launch worker for task 1267] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 31.0 (TID 1267) 
[Executor task launch worker for task 1267] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 32.758147 ms 
[Executor task launch worker for task 1267] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 20 (head at Train.scala:161) with 1 output partitions 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 32 (head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 32 (MapPartitionsRDD[110] at head at Train.scala:161), which has no missing parents 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_37 stored as values in memory (estimated size 26.9 KB, free 871.4 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_37_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.4 MB) 
[dispatcher-event-loop-3] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_37_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 37 from broadcast at DAGScheduler.scala:996 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 32 (MapPartitionsRDD[110] at head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 32.0 with 1 tasks 
[dispatcher-event-loop-2] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 32.0 (TID 1268, localhost, executor driver, partition 0, PROCESS_LOCAL, 5813 bytes) 
[Executor task launch worker for task 1268] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 32.0 (TID 1268) 
[Executor task launch worker for task 1268] INFO org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD - closed connection 
[Executor task launch worker for task 1268] INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 32.0 (TID 1268). 1979 bytes result sent to driver 
[task-result-getter-3] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 32.0 (TID 1268) in 132 ms on localhost (executor driver) (1/1) 
[task-result-getter-3] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 32.0, whose tasks have all completed, from pool 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ResultStage 32 (head at Train.scala:161) finished in 0.128 s 
[Executor task launch worker for task 1267] INFO org.apache.spark.scheduler.DAGScheduler - Job 20 finished: head at Train.scala:161, took 0.140223 s 
[Executor task launch worker for task 1267] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 8.366053 ms 
[Executor task launch worker for task 1267] INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 31.0 (TID 1267). 1501 bytes result sent to driver 
[task-result-getter-0] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 31.0 (TID 1267) in 393 ms on localhost (executor driver) (1/1) 
[task-result-getter-0] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 31.0, whose tasks have all completed, from pool 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ResultStage 31 (show at Train.scala:180) finished in 0.393 s 
[main] INFO org.apache.spark.scheduler.DAGScheduler - Job 19 finished: show at Train.scala:180, took 0.413534 s 
[main] INFO org.apache.spark.SparkContext - Starting job: show at Train.scala:180 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 21 (show at Train.scala:180) with 4 output partitions 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 33 (show at Train.scala:180) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 33 (MapPartitionsRDD[106] at show at Train.scala:180), which has no missing parents 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_38 stored as values in memory (estimated size 14.3 KB, free 871.4 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_38_piece0 stored as bytes in memory (estimated size 6.4 KB, free 871.4 MB) 
[dispatcher-event-loop-2] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_38_piece0 in memory on 192.168.20.181:38202 (size: 6.4 KB, free: 872.9 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 38 from broadcast at DAGScheduler.scala:996 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 4 missing tasks from ResultStage 33 (MapPartitionsRDD[106] at show at Train.scala:180) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 33.0 with 4 tasks 
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 33.0 (TID 1269, localhost, executor driver, partition 1, PROCESS_LOCAL, 5961 bytes) 
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 33.0 (TID 1270, localhost, executor driver, partition 2, PROCESS_LOCAL, 5961 bytes) 
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 33.0 (TID 1271, localhost, executor driver, partition 3, PROCESS_LOCAL, 5961 bytes) 
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 3.0 in stage 33.0 (TID 1272, localhost, executor driver, partition 4, PROCESS_LOCAL, 5961 bytes) 
[Executor task launch worker for task 1269] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 33.0 (TID 1269) 
[Executor task launch worker for task 1271] INFO org.apache.spark.executor.Executor - Running task 2.0 in stage 33.0 (TID 1271) 
[Executor task launch worker for task 1272] INFO org.apache.spark.executor.Executor - Running task 3.0 in stage 33.0 (TID 1272) 
[Executor task launch worker for task 1270] INFO org.apache.spark.executor.Executor - Running task 1.0 in stage 33.0 (TID 1270) 
[Executor task launch worker for task 1269] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 55.127045 ms 
[Executor task launch worker for task 1271] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 22 (head at Train.scala:161) with 1 output partitions 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 34 (head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 34 (MapPartitionsRDD[117] at head at Train.scala:161), which has no missing parents 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_39 stored as values in memory (estimated size 26.9 KB, free 871.4 MB) 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned shuffle 10 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 31267 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 31268 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25303 
[dispatcher-event-loop-3] INFO org.apache.spark.storage.BlockManagerInfo - Removed broadcast_34_piece0 on 192.168.20.181:38202 in memory (size: 22.8 KB, free: 872.9 MB) 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25298 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25304 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 31269 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned shuffle 11 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25299 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25301 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25300 
[dispatcher-event-loop-2] INFO org.apache.spark.storage.BlockManagerInfo - Removed broadcast_37_piece0 on 192.168.20.181:38202 in memory (size: 12.7 KB, free: 872.9 MB) 
[dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo - Removed broadcast_33_piece0 on 192.168.20.181:38202 in memory (size: 22.6 KB, free: 872.9 MB) 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25305 
[dispatcher-event-loop-3] INFO org.apache.spark.storage.BlockManagerInfo - Removed broadcast_36_piece0 on 192.168.20.181:38202 in memory (size: 6.4 KB, free: 873.0 MB) 
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25302 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_39_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.6 MB) 
[dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_39_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 39 from broadcast at DAGScheduler.scala:996 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 34 (MapPartitionsRDD[117] at head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 34.0 with 1 tasks 
[Executor task launch worker for task 1272] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 92.57204 ms 
[Executor task launch worker for task 1269] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 23 (head at Train.scala:161) with 1 output partitions 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 35 (head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 35 (MapPartitionsRDD[122] at head at Train.scala:161), which has no missing parents 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_40 stored as values in memory (estimated size 26.9 KB, free 871.6 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_40_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.5 MB) 
[dispatcher-event-loop-1] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_40_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 40 from broadcast at DAGScheduler.scala:996 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 35 (MapPartitionsRDD[122] at head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 35.0 with 1 tasks 
[Executor task launch worker for task 1270] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 24 (head at Train.scala:161) with 1 output partitions 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 36 (head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 36 (MapPartitionsRDD[124] at head at Train.scala:161), which has no missing parents 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_41 stored as values in memory (estimated size 26.9 KB, free 871.5 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_41_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.5 MB) 
[dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_41_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 41 from broadcast at DAGScheduler.scala:996 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 36 (MapPartitionsRDD[124] at head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 36.0 with 1 tasks 
[Executor task launch worker for task 1272] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 25 (head at Train.scala:161) with 1 output partitions 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 37 (head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List() 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 37 (MapPartitionsRDD[126] at head at Train.scala:161), which has no missing parents 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_42 stored as values in memory (estimated size 26.9 KB, free 871.5 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_42_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.5 MB) 
[dispatcher-event-loop-1] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_42_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB) 
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 42 from broadcast at DAGScheduler.scala:996 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 37 (MapPartitionsRDD[126] at head at Train.scala:161) 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 37.0 with 1 tasks 

And it gets stuck here.

I cannot reproduce the "no progress" behavior. – eliasah

@eliasah .. I mean that it does nothing, it just gets stuck. I don't think the code is the problem. I have added the logs produced after the statements that did execute successfully. – Ishan

Did you execute an action afterwards? :| – eliasah

Answers

I cannot reproduce any error with your code; it should work fine.

You can also simply join the two DataFrames by assigning an id to both of them:

import org.apache.spark.sql.functions.monotonically_increasing_id

// give both DataFrames a matching id column, join on it, then drop it
df1.withColumn("id", monotonically_increasing_id())
  .join(df2.withColumn("id", monotonically_increasing_id()), "id")
  .drop("id")
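
One caveat with this approach: monotonically_increasing_id guarantees ids that increase, not ids that are consecutive, so the join keys may not line up once the DataFrames span more than one partition. A sketch of a safer variant using row_number over a window, at the cost of pulling all rows into a single partition:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// row_number yields consecutive 1-based ids on both sides, so the join keys match;
// ordering by monotonically_increasing_id preserves each DataFrame's row order
val w = Window.orderBy(monotonically_increasing_id())
val resultDF = df1.withColumn("id", row_number().over(w))
  .join(df2.withColumn("id", row_number().over(w)), "id")
  .drop("id")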

Hope this helps!

Thank you Shankar. But I think the problem lies with Spark. It did not execute your code either. – Ishan

What do you mean, error? This works fine for small data. How many rows do you have? –

Yes, I know. The problem is not in the code, I think. I only have 10 rows. – Ishan

You can also use zipWithIndex on both RDDs:

import spark.implicits._

val df1 = sc.parallelize(Seq("1", "2", "3", "4")).toDF("A")
val df2 = sc.parallelize(Seq("a", "b", "c", "d")).toDF("B")

// key every row by its position so the two RDDs can be joined positionally
val zip1 = df1.rdd.zipWithIndex.map { case (row, index) => (index, row.mkString) }
val zip2 = df2.rdd.zipWithIndex.map { case (row, index) => (index, row.mkString) }
zip1.join(zip2).map { case (_, value) => value }.collect()
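
If the combined rows are needed as a DataFrame again, a follow-up sketch under the same assumptions (spark.implicits._ in scope, column names as above); sorting by the positional key matters because join does not preserve order:

val resultDF = zip1.join(zip2)
  .sortByKey()                          // restore the original row order
  .map { case (_, (a, b)) => (a, b) }
  .toDF("A", "B")                       // requires import spark.implicits._
resultDF.show()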