2017-03-28 59 views
0

我在写一个Spark作业,它接收来自多个源的数据,过滤不良的输入行并输出稍微修改后的输入版本。该作业有两个附加要求:Spark累加器混淆

  • 我必须跟踪每个来源的错误输入行数,以通知这些上游提供商。
  • 我必须支持每个源的输出限制。

工作看起来很简单,我使用累加器来跟踪问题,以跟踪每个源过滤的行数。但是,当我执行最后的.limit(N)时,我的累加器行为发生了变化。下面是触发对单一来源的行为的一些条纹下来示例代码:

from pyspark.sql import Row, SparkSession 
from pyspark.sql.types import * 
from random import randint 

def filter_and_transform_parts(rows, filter_int, accum): 
    for r in rows: 
     if r[0] == filter_int: 
      accum.add(1) 
      continue 

     yield r[0], r[1] + 1, r[2] + 1 

def main(): 
    spark= SparkSession \ 
      .builder \ 
      .appName("Test") \ 
      .getOrCreate() 

    sc = spark.sparkContext 
    accum = sc.accumulator(0) 

    # 20 inputs w/ tuple having 4 as first element                                               
    inputs = [(4, randint(1, 10), randint(1, 10)) if x % 5 == 0 else (randint(6, 10), randint(6, 10), randint(6, 10)) for x in xrange(100)] 

    rdd = sc.parallelize(inputs) 
    # filter out tuples where 4 is first element                                               
    rdd = rdd.mapPartitions(lambda r: filter_and_transform_parts(r, 4, accum)) 


    # if not limit, accumulator value is 20                                                
    # if limit and limit_count <= 63, accumulator value is 0                                            
    # if limit and limit_count >= 64, accumulator value is 20                                            
    limit = True 
    limit_count = 63 

    if limit: 
     rdd = rdd.map(lambda r: Row(r[0], r[1], r[2])) 
     df_schema = StructType([StructField("val1", IntegerType(), False), 
           StructField("val2", IntegerType(), False), 
           StructField("val3", IntegerType(), False)]) 
     df = spark.createDataFrame(rdd, schema=df_schema) 
     df = df.limit(limit_count) 
     df.write.mode("overwrite").csv('foo/') 
    else: 
     rdd.saveAsTextFile('foo/') 

    print "Accum value: {}".format(accum.value) 

if __name__ == "__main__": 
    main() 

的问题是,我的蓄能器有时会报告过滤的行数,有时并不取决于限制规定,并投入数为一个来源。但是,在所有情况下,被过滤的行都不会进入输出,这意味着过滤器发生了,并且累加器应该有一个值。

如果您可以对此有所了解,那会非常有帮助,谢谢!

更新

  • 添加rdd.persist()调用后mapPartitions作出累加器行为是一致的。

回答

0

其实,limit_count的值是什么并不重要。

之所以有时Accum value为0,是因为你performe 蓄电池转换(例如:rdd.map,rdd.mapPartitions)。 (:rdd.foreach如)

让我们做出改变你的代码一点点:

from pyspark.sql import * 
from random import randint 


def filter_and_transform_parts(rows, filter_int, accum): 
    for r in rows: 
     if r[0] == filter_int: 
      accum.add(1) 


def main(): 
    spark = SparkSession.builder.appName("Test").getOrCreate() 

    sc = spark.sparkContext 
    print(sc.applicationId) 
    accum = sc.accumulator(0) 

    inputs = [(4, x * 10, x * 100) if x % 5 == 0 else (randint(6, 10), x * 10, x * 100) for x in xrange(100)] 
    rdd = sc.parallelize(inputs) 
    rdd.foreachPartition(lambda r: filter_and_transform_parts(r, 4, accum)) 

    limit = True 
    limit_count = 10 or 'whatever' 

    if limit: 
     rdd = rdd.map(lambda r: Row(val1=r[0], val2=r[1], val3=r[2])) 
     df = spark.createDataFrame(rdd) 
     df = df.limit(limit_count) 
     df.write.mode("overwrite").csv('file:///tmp/output') 
    else: 
     rdd.saveAsTextFile('file:///tmp/output') 

    print "Accum value: {}".format(accum.value) 


if __name__ == "__main__": 
    main() 

总积累值

,在累的作品,以及内部 行动

星火只担保一直等于20

欲了解更多信息:

http://spark.apache.org/docs/2.0.2/programming-guide.html#accumulators

+0

感谢您的链接到相关的文档。尽管如此,我仍然有点困惑,你是否发现以下陈述是正确的? saveAsTextFile是一个动作,并触发我的上游累加器正确填充。 df.write不是一个动作,也不会对累加器作任何保证(这会导致它根据极限有时填充的情况) – cbrown