2015-11-21 21 views
1

我使用Scala和星火管理的记录的大数量及每个这些记录在管理一个大aggregateByKey有以下形式:星火:如何在一台机器

single record => (String, Row) 

每由组成45种不同类型的值(String,Integer,Long)。

要聚合他们我使用:

myRecords.aggregateByKey (List [Any]()) (
     (aggr, value) => aggr ::: (value :: Nil), 
     (aggr1, aggr2) => aggr1 ::: aggr2 
) 

的问题是,我得到constanly消息:

15/11/21 17:54:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 147767 ms exceeds timeout 120000 ms 

15/11/21 17:54:14 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 147767 ms 

[Stage 3:====>    (875 + 24)/3252] 

15/11/21 17:57:10 WARN BlockManager: Putting block rdd_14_876 failed 

...and finally... 

15/11/21 18:00:27 ERROR Executor: Exception in task 876.0 in stage 3.0 (TID 5465) 
java.lang.OutOfMemoryError: GC overhead limit exceeded 

我可以猜测的是,聚集这么大,匹配新记录的关键是需要越来越多的时间,直到某个任务由于找不到添加记录值的正确位置而超时。

我打了不同的参数,从​​,如:

spark.default.parallelism => to reduce the size of tasks augmenting this value 

spark.executor.memory => usually I put much less then driver memory 

spark.driver.memory => the whole driver memory (single machine tho) 

--master local[number of cores] 

任何想法如何在过程结束时得到不乱内存/超时?

UPDATE

我想基于合并两个CSV文件:

1)基于基于3列CSV列 2)合并加入行,加入他们的行列值 3 )集料/基团此接合&合并的文件与在2做在所述单个集合数据从3一些东西键) 4))

这是代码:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.storage.StorageLevel._ 
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext} 
import org.apache.spark.{SparkConf, SparkContext} 

object MyRecords { 

    def createKey(k1: String, k2: String, k3: String):String = { 
    Seq(k1, k2, k3).iterator.map (r => if (r == null) "" else r.trim.toUpperCase).mkString ("") 
    } 

    def main(args: Array[String]): Unit = { 

    val df1FilePath = args (0) 
    val df2FilePath = args (1) 

    val sc = new SparkContext (new SparkConf ()) 
    val sqlContext = new SQLContext (sc) 
    import sqlContext.implicits._ 

    val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df1FilePath).as("one") 

    df1.registerTempTable("df1") 

    val df2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df2FilePath) 

    val df2Renamed = df2.select(
     col ("v0").as ("y_v0"), 
     col ("v1").as ("y_v1"), 
     col ("v2").as ("y_v2"), 
     col ("v3").as ("y_v3"), 
     col ("v4").as ("y_v4"), 
     col ("v5").as ("y_v5"), 
     col ("v6").as ("y_v6"), 
     col ("v7").as ("y_v7"), 
     col ("v8").as ("y_v8"), 
     col ("v9").as ("y_v9"), 
     col ("v10").as ("y_v10"), 
     col ("v11").as ("y_v11"), 
     col ("v12").as ("y_v12"), 
     col ("v13").as ("y_v13"), 
     col ("v14").as ("y_v14"), 
     col ("v15").as ("y_15"), 
     col ("v16").as ("y_16"), 
     col ("v17").as ("y_17"), 
     col ("v18").as ("y_18"), 
     col ("v19").as ("y_19"), 
     col ("v20").as ("y_20"), 
     col ("v21").as ("y_21"), 
     col ("v22").as ("y_22"), 
     col ("v23").as ("y_23"), 
     col ("v24").as ("y_24"), 
     col ("v25").as ("y_25"), 
     col ("v26").as ("y_26"), 
     col ("v27").as ("y_27"), 
     col ("v28").as ("y_28"), 
     col ("v29").as ("y_29"), 
     col ("v30").as ("y_30"), 
     col ("v31").as ("y_31"), 
     col ("v32").as ("y_32") 
    ).as("two") 

    df2Renamed.registerTempTable("df2") 

    val dfJoined = dfArchive.join(df2Renamed, $"one.v0" === $"two.y_v0", "fullouter").as("j") 

    dfJoined.registerTempTable("joined") 

    val dfMerged = sqlContext.sql("SELECT * FROM joined").map(r => 
     if (r.getAs("y_v1") != null) { 
     (createKey (r.getAs("y_v2"), r.getAs("y_v3"), r.getAs("y_v4")), r) 
     } else { 
     (createKey (r.getAs("v2"), r.getAs("v3"), r.getAs("v4")), r) 
     }) 

    dfMerged.groupByKey().collect().foreach(println) 

    sc.stop() 
    } 
} 
+0

你可以多显示一下''Row''吗?还有''myRecords''?我认为你的聚合可以变得更有效率 –

回答

2

因为所有你做的,是通过关键组最好是使用groupByKey代替aggregateByKey,尤其是一,其创建的临时对象的像value :: Nil数量庞大(为什么不直接value :: aggr?)。

由于它不执行地图端聚合,它应该减少垃圾收集器的压力(请参阅SPARK-772)。

参见:Is groupByKey ever preferred over reduceByKey

编辑

关于您在更新它并没有真正意义提供的代码。如果您想使用DataFrames,则没有理由首先使用RDDs对数据进行分组。通过保留Strings和转码值可增加内存使用量并强调GC,也可以复制数据。它看起来像你所需要的大致是这样的(用spark-csv一个小的帮助):

// Load data, optionally add .option("inferSchema", "true") 
val df1 = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("delimiter", "\t") 
    .load(file1Path) 

val df2 = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("delimiter", "\t") 
    .load(file2Path) 

// Join and cache 
val df = df1.join(
    df2, 
    // Join condition 
    df1("foo") === df2("foo") && 
    df1("bar") === df2("bar") && 
    df1("baz") === df2("baz"), 
    "fullouter") 
df.registerTempTable("df") 
sqlContext.cacheTable("df") 

// Perform all the required casting using safe cast methods 
// and replace existing columns 
df.withColumn("some_column", $"some_column".cast(IntegerType)) 

您可以根据需要执行您可以将数据帧without physically grouping the data上执行任何聚合。如果你想子集简单地使用wherefilter

+3

我已经期待在这里看到你;)你的答案是高质量的。 – javadba

+0

同样的问题:'java.lang.OutOfMemoryError:Java堆空间java.util.Arrays.copyOf(Arrays.java:2271)' – Randomize

+0

'java.lang.OutOfMemoryError:超出GC开销限制'和'java.lang.OutOfMemoryError :Java中的堆空间不是同一个问题。实际上,为什么你首先将这些数据转换为行和组?或为此收集。 – zero323