Spark: how to handle a big aggregateByKey on a single machine

I'm using Scala and Spark to manage a large number of records, and each of those records has the following form:
single record => (String, Row)
Every Row is composed of 45 values of different types (String, Integer, Long).
To aggregate them I'm using:
myRecords.aggregateByKey(List[Any]())(
  (aggr, value) => aggr ::: (value :: Nil),
  (aggr1, aggr2) => aggr1 ::: aggr2
)
The problem is that I constantly get messages like:
15/11/21 17:54:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 147767 ms exceeds timeout 120000 ms
15/11/21 17:54:14 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 147767 ms
[Stage 3:====> (875 + 24)/3252]
15/11/21 17:57:10 WARN BlockManager: Putting block rdd_14_876 failed
...and finally...
15/11/21 18:00:27 ERROR Executor: Exception in task 876.0 in stage 3.0 (TID 5465)
java.lang.OutOfMemoryError: GC overhead limit exceeded
What I can guess is that the aggregated lists get so big that matching the key of a new record takes longer and longer, until some task hits a timeout because it can't find the right place where to add the record's value.
I've played with different parameters, such as:
spark.default.parallelism => to reduce the size of tasks augmenting this value
spark.executor.memory => usually I put much less than the driver memory
spark.driver.memory => the whole driver memory (single machine though)
--master local[number of cores]
Any idea how to get to the end of the process without running out of memory or hitting timeouts?
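For context, a hypothetical single-machine submission with these knobs might look like the following (class name, jar, paths, and values are all placeholders, not recommendations). Note that in `local[*]` mode the executor runs inside the driver JVM, so `--driver-memory` is what actually bounds the heap and `spark.executor.memory` has no effect there:

```shell
# Placeholder values; tune cores, memory, and parallelism to the machine.
spark-submit \
  --master "local[8]" \
  --driver-memory 12g \
  --conf spark.default.parallelism=200 \
  --class MyRecords \
  my-records.jar records1.tsv records2.tsv
```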
UPDATE
I'm trying to merge two CSV files:

1) join them based on a common column
2) merge the joined rows, combining their column values, and build a key from 3 of the columns
3) aggregate/group the joined & merged file by the key made in 2)
4) do some work on each single aggregated group from 3)
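The composite key used in steps 2)–3) is just the three column values normalized and concatenated; here is a standalone sketch of the `createKey` helper from the code below, runnable without Spark:

```scala
// Null-safe normalization: trim and upper-case each part, then concatenate.
def createKey(k1: String, k2: String, k3: String): String =
  Seq(k1, k2, k3).iterator
    .map(r => if (r == null) "" else r.trim.toUpperCase)
    .mkString("")

println(createKey(" foo ", null, "Bar")) // prints FOOBAR
```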
This is the code:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object MyRecords {

  def createKey(k1: String, k2: String, k3: String): String = {
    Seq(k1, k2, k3).iterator.map(r => if (r == null) "" else r.trim.toUpperCase).mkString("")
  }

  def main(args: Array[String]): Unit = {
    val df1FilePath = args(0)
    val df2FilePath = args(1)

    val sc = new SparkContext(new SparkConf())
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df1 = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true").option("delimiter", "\t")
      .load(df1FilePath).as("one")
    df1.registerTempTable("df1")

    val df2 = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true").option("delimiter", "\t")
      .load(df2FilePath)

    // Rename every column of df2 with a "y_" prefix (v0 -> y_v0, ..., v32 -> y_v32)
    // so the two sides of the join can be told apart.
    val df2Renamed = df2.select(df2.columns.map(c => col(c).as("y_" + c)): _*).as("two")
    df2Renamed.registerTempTable("df2")

    val dfJoined = df1.join(df2Renamed, $"one.v0" === $"two.y_v0", "fullouter").as("j")
    dfJoined.registerTempTable("joined")

    val dfMerged = sqlContext.sql("SELECT * FROM joined").map(r =>
      if (r.getAs("y_v1") != null) {
        (createKey(r.getAs("y_v2"), r.getAs("y_v3"), r.getAs("y_v4")), r)
      } else {
        (createKey(r.getAs("v2"), r.getAs("v3"), r.getAs("v4")), r)
      })

    dfMerged.groupByKey().collect().foreach(println)

    sc.stop()
  }
}
Can you show us a bit more of `Row`? And of `myRecords`? I think your aggregation could be made more efficient –