
I'm trying to tune the Spark job below, because I keep running into a Java heap space error.

Looking at the Spark UI, there is a cogroup that behaves in a very strange way. Up to that stage everything looks well balanced (at the moment I have hard-coded 48 partitions). Inside the method loadParentMPoint there is the cogroup transformation; basically, when I run the next count, the cogroup is computed: 48 tasks are scheduled, but 47 of them terminate immediately (apparently with nothing to process), while the remaining one starts doing the shuffle read, until it fills up the heap space and an exception is thrown.

I have launched the process several times with the same data set, and the outcome is always the same. Every time it is just one executor doing the work, while previously the load was balanced.

Why do I get this behavior? Maybe I am missing something? I tried to repartition the data before the cogroup, because I assumed it was unbalanced, but it did not help; the same happened when I tried partitionBy.
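For reference, the repartition/partitionBy attempts looked roughly like the sketch below (a sketch only; it assumes they were applied to rddNew, the left-hand RDD of the cogroup shown in the excerpt further down):

    import org.apache.spark.HashPartitioner

    // Hypothetical sketch of the attempts described above; neither changed the behavior,
    // the shuffle read still landed on a single executor.
    val rddNewRepartitioned = rddNew.repartition(48)                       // plain shuffle into 48 partitions
    val rddNewHashed        = rddNew.partitionBy(new HashPartitioner(48))  // explicit hash partitioner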

This is a code excerpt:

class BillingOrderGeneratorProcess extends SparkApplicationErrorHandler { 

    implicit val ctx = sc 
    val log = LoggerFactory.getLogger(classOf[BillingOrderGeneratorProcess]) 
    val ipc = new Handler[ConsumptionComputationBigDataIPC] 
    val billingOrderDao = new Handler[BillingOrderDao] 
    val mPointDao = new Handler[MeasurementPointDAO] 
    val billingOrderBDao = new Handler[BillingOrderBDAO] 
    val ccmDiscardBdao = new Handler[CCMDiscardBDAO] 
    val ccmService = new Handler[ConsumptionComputationBillingService] 
    val registry = new Handler[IncrementalRegistryTableData] 
    val podTimeZoneHelper = new Handler[PodDateTimeUtils] 
    val billingPodStatusDao = new Handler[BillingPodStatusBDAO] 
    val config = new Handler[PropertyManager] 
    val paramFacade = new Handler[ConsumptionParameterFacade] 
    val consumptionMethods = new Handler[ConsumptionMethods] 
    val partitions = config.get.defaultPartitions() 
    val appName = sc.appName 
    val appId = sc.applicationId 
    val now = new DateTime 

    val extracted = ctx.accumulator(0l, "Extracted from planning") 
    val generated = ctx.accumulator(0l, "Billing orders generated") 
    val discarded = ctx.accumulator(0l, "Billing orders discarded") 

    // initialize staging 
    val staging = new TxStagingTable(config.get().billingOrderGeneratorStagingArea()) 
    staging.prepareReading 

    val rddExtractedFromPlanning = staging 
     .read[ExtractedPO]() 
     .repartition(48) 
     .setName("rddExtractedFromPlanning") 
     .cache 

    val rddExtracted = rddExtractedFromPlanning
      .filter { x =>
        extracted += 1
        (x.getExtracted == EExtractedType.EXTRACTED ||
          x.getExtracted == EExtractedType.EXTRACTED_BY_USER ||
          x.getExtracted == EExtractedType.EXTRACTED_BY_TDC)
      }
      .map { x =>
        log.info("1:extracted>{}", x)
        val bo = MapperUtil.mapExtractedPOtoBO(x)
        bo
      }

    val podWithExtractedAndLastBillingOrderPO = rddExtracted
      .map { e =>
        val billOrdr = CCMIDGenerator.newIdentifier(CCMIDGenerator.Context.GENERATOR, e.getPod, e.getCycle(), e.getExtractionDate())
        val last = billingOrderDao.get.getLastByPodExcludedActual(e.getPod, billOrdr)
        log.info("2:last Billing order>{}", last)
        (e.getPod, e, last)
      }
      .setName("podWithExtractedAndLastBillingOrderPO")
      .cache()

    val podWithExtractedAndLastBillingOrder = podWithExtractedAndLastBillingOrderPO.map(e => (e._1, (e._2, MapperUtil.mapBillingOrderPOtoBO(e._3)))) 

    val rddRegistryFactoryKeys = podWithExtractedAndLastBillingOrderPO 
     .map(e => (e._1,1)) 
     .reduceByKey(_+_) 
     .keys 

    val rddRegistryFactory = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryKeys, List()) 

    val rddExtractedWithMPoint = ConsumptionComputationUtil
      .groupPodWithMPoint(podWithExtractedAndLastBillingOrder, rddRegistryFactory)
      .filter { e =>
        val mPoint = e._3
        val condition = mPoint != null
        if (!condition) log.error("MPoint is NULL for POD -> " + e._1)
        condition
      }
      .setName("rddExtractedWithMPoint")
      .cache

    rddExtractedWithMPoint.count 

    val rddExtractedWithMPointWithParent = ConsumptionComputationUtil
      .groupWithParent(rddExtractedWithMPoint)
      .map {
        case (pod, extracted, measurementPoint, billOrder, parentMpointId, factory) =>
          if (!parentMpointId.isEmpty) {
            val mPointParent = mPointDao.get.findByMPoint(parentMpointId.get)
            log.info("2.1:parentMpoin>Mpoint=" + parentMpointId + " parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, mPointParent.getPod, factory)
          } else {
            log.info("2.1:parentMpoin>Mpoint=null parent for pod -> " + pod)
            (pod, extracted, measurementPoint, billOrder, null, factory)
          }
      }
      .setName("rddExtractedWithMPointWithParent")
      .cache()

    rddExtractedWithMPointWithParent.count 

    val rddRegistryFactoryParentKeys = rddExtractedWithMPointWithParent 
     .filter(e => Option(e._5).isDefined) 
     .map(e => (e._5,1)) 
     .reduceByKey(_+_) 
     .keys 

    rddRegistryFactoryParentKeys.count 

    val rddRegistryFactoryParent = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryParentKeys, List()) 

    rddRegistryFactoryParent.count 

    val imprb = new Handler[IncrementalMeasurementPointRegistryBuilder] 

    val rddNew = rddExtractedWithMPointWithParent.map({ 
     case (pod, extracted, measurementPoint, billingOrder, parentPod, factory) => 
     (parentPod, (pod, extracted, measurementPoint, billingOrder, factory)) 
    }) 
    rddNew.count 

    // This is the cogroup described above: when the next count triggers it, 48 tasks are
    // scheduled, 47 of them finish immediately, and the remaining one performs the whole
    // shuffle read until the heap space is exhausted.
    val p = rddNew.cogroup(rddRegistryFactoryParent) 
    p.count 

    val rddExtractedWithMPointWithMpointParent = p
      .filter { case (pod, (inputs, mpFactories)) => inputs.nonEmpty }
      .flatMap { case (pod, (inputs, mpFactories)) =>
        val factory = mpFactories.headOption // eventually one or none factory
        val results = inputs.map { e =>
          val measurementPointTupla = factory.flatMap { f =>
            Option((imprb.get.buildSparkDecorator(new MeasurementPointFactoryAdapter(f)).getMeasurementPointByDate(e._2.getRequestDate), f))
          }
          val tupla = measurementPointTupla.getOrElse(null)
          val toBeBilled = if (tupla != null && tupla._1 != null) false else true
          val m = if (tupla != null && tupla._1 != null) tupla._1 else null
          val f = if (tupla != null && tupla._2 != null) tupla._2 else null
          (e._1, e._2, e._3, e._4, m, toBeBilled, e._5, f)
        }
        results
      }
      .setName("rddExtractedWithMPointWithMpointParent")
      .cache()

    rddExtractedWithMPointWithMpointParent.foreach({ e => 
     log.info("2.2:parentMpoint>MpointComplete=" + e._5 + " parent for pod -> " + e._1) 
    }) 
} 

These are the stages of the two RDDs involved in the cogroup operation. rddNew:

(Spark UI screenshot: rddNew stage)

rddRegistryFactory:

(Spark UI screenshot: rddRegistryFactory stage)

And this is the stage of the cogroup:

(Spark UI screenshot: cogroup stage)

This is the storage situation:

(Spark UI screenshot: Storage tab)

And this is the Executors tab:

(Spark UI screenshot: Executors tab)

Note: I added the count actions only for debugging purposes.

UPDATE:

  • I tried to remove the cache and launch the process again; now every executor has roughly 100 MB used for stored data, but the behavior is the same: the shuffle read happens on a single executor.
  • I also tried a join between the same two RDDs instead of the cogroup, just to find out whether my problem is related only to cogroup or extends to all wide transformations; the behavior was exactly the same (a sketch of this test follows below).
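The join test mentioned in the second bullet was, roughly, the following sketch (same two pair RDDs that feed the cogroup; the exact value types do not matter here):

    // Sketch of the join test: replacing the cogroup with a join between the same two
    // RDDs still produced a single task doing the whole shuffle read.
    val joined = rddNew.join(rddRegistryFactoryParent)
    joined.count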

It looks like your cache calls are creating memory pressure. Why do you need to cache here? Have you tried running without the cache? –

I added two images showing the Storage and the Executors situation. Maybe there is a bit of heap pressure, but the behavior is strange; can it really be just cache misuse? – Giorgio

There are various factors at play, not just one; please remove the cache and see. –

Answers

2

I solved it: the problem was related to partitioning. Basically, in the RDD calling the cogroup operation all the keys had the same value, so when the cogroup happened Spark tried to hash-partition both RDDs, putting the keys of both RDDs on the same executor in order to cogroup them.
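The effect can be reproduced with a minimal, self-contained sketch (hypothetical data, not the real RDDs): when every record carries the same key, the hash partitioner maps all of them to one partition, so a single task ends up doing the entire shuffle read.

    import org.apache.spark.HashPartitioner

    // Every record shares one key, so hash(key) % 48 is identical for all of them:
    // the cogroup collapses into a single non-empty partition, i.e. a single busy task.
    val skewedLeft  = sc.parallelize(Seq.fill(100000)(("SAME_KEY", 1)))
    val skewedRight = sc.parallelize(Seq(("SAME_KEY", "factory")))
    val grouped     = skewedLeft.cogroup(skewedRight, new HashPartitioner(48))

    // 47 partitions are empty, one holds everything.
    grouped
      .mapPartitionsWithIndex((i, it) => Iterator((i, it.size)))
      .collect()
      .foreach { case (i, n) => println(s"partition $i -> $n groups") }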

So how did you solve it? –

Basically the problem was not the cogroup itself, but the fact that at the moment the cogroup was executed the data was not partitioned at all; in fact I found a bug that caused all the data to end up in the same partition, so the fix was simply to fix the data that went into the cogroup. – Giorgio
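For anyone hitting the same symptom, a quick way to confirm this kind of skew (a sketch reusing the RDD names from the question) is to look at how records are spread across partitions, or at the key distribution, right before the cogroup:

    // Records per partition of the cogroup input: one huge partition confirms the skew.
    rddNew
      .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
      .collect()
      .sortBy(-_._2)
      .take(5)
      .foreach { case (idx, n) => println(s"partition $idx -> $n records") }

    // Key distribution on a sample (cheap enough while debugging):
    rddNew.keys
      .sample(withReplacement = false, fraction = 0.01)
      .countByValue()
      .toSeq.sortBy(-_._2).take(5)
      .foreach(println)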

@Giorgio, hi, I am facing memory problems while using 'cogroup'; any advice or hints would be great, since I am new to this and do not know how to fix it. Please find the question here: [https://stackoverflow.com/questions/47180307/how-to-use-cogroup-for-large-datasets] – Vignesh

2
  • I strongly believe this Java heap space error is due to caching RDDs that, based on the Storage tab in your last screenshot, do not seem necessary.

(Spark UI screenshot: Storage tab, from the question above)

Depending on how many times a dataset is accessed, and on the amount of work involved in recomputing it, recomputation can be faster than the price paid by the increased memory pressure.

It goes without saying that if you read a dataset only once there is no point in caching it; doing so will actually make your job slower.

  • For counting for debugging purposes, you can use countApprox() instead of count. Once the testing is done you can remove it for the actual use of your job (a short sketch follows below).
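For example, something along these lines (a sketch using one of the RDDs from the question):

    // countApprox returns a PartialResult within the given timeout (in ms) instead of
    // waiting for an exact count; accurate enough while debugging.
    val approx = rddExtractedWithMPoint.countApprox(2000L, 0.95)
    log.info("approx count: " + approx.initialValue.mean)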

I ran it, but the behavior is the same: the shuffle read happens on one executor. The executors are now free of stored data (each holds only about 100 MB). Any other suggestions? – Giorgio

Are you sure the heap space error shows up again? –

Yes, exactly as before; please look at the post, I have updated it with the next tests I did. – Giorgio