1

DataFramesUDF避免竞争条件这里的问题是你如何重新使用UDF的对象,但避免竞争状态?星火1.5.2在对象重新使用

我使用的是UDF我的火花应用程序内和单元测试,由于竞争的条件似乎不确定性。有时,他们有时会通过他们失败...

我试图通过创建并将其传递给UDF求效益,以强制再利用的对象。然而,似乎共享相同的Spark Context和JVM的单独“测试”正在使用这些对象并导致错误。

def reformatDate(input:String,sdfIn:SimpleDateFormat,sdfOut:SimpleDateFormat): String ={ 
    sdfOut.format(sdfIn.parse(input)) 
    } 

    val datePartitionFormat = new SimpleDateFormat("yyyyMMdd") 
    val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd") 
    val validDateFormat = new SimpleDateFormat("yyyy-MM-dd") 

    val partitionToDateUDF = udf(reformatDate(_:String,datePartitionFormat,validDateFormat)) 
    val dTStampToDateUDF= udf(reformatDate(_:String,dTStampFormat,validDateFormat)) 

有时,当我跑我的单元测试,我得到以下错误使用此项功能:

17/01/13 11时45分45秒ERROR执行人:异常的任务0.0舞台2.0 (TID 2)java.lang.NumberFormatException:多点在 sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890) at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)at java.lang。 Double.parseDouble(Double.java:538) java.text.DigitList.getDouble(DigitList。 java:169) java.text.DecimalFormat.parse(DecimalFormat.java:2056)at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1867)at java.text.SimpleDateFormat.parse(SimpleDateFormat.java: 1514)在 java.text.DateFormat.parse(DateFormat.java:364)在 com.baesystems.ai.engineering.threatanalytics.microbatch.processor.transformers.metric.mDnsPreviouslySeenDomainsStartOfDayDF $ .reformatDate(mDnsPreviouslySeenDomainsStartOfDayDF.scala:22)

我使用的功能,像这样:

val df = df2 
    .filter(
    datediff(
     to_date(partitionToDateUDF($"dt")) 
     ,to_date(dTStampToDate($"d_last_seen")) 
    ) < 90 
) 

并且在调试已发现了输入“DF2”为:

+-----------+--------+-------------------------+--------------------------------+ 
|d_last_seen|  dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")| 
+-----------+--------+-------------------------+--------------------------------+ 
| 2016/11/02|20161102|2016-11-02    |2016-11-02      | 
| 2016/11/01|20161102|2016-11-02    |2016-11-01      | 
+-----------+--------+-------------------------+--------------------------------+ 

我使用conf.setMaster(“本地[2]”),会不会是火花使用线程,并且因此共享相同的JVM运行时在本地,但是这种情况在部署时不会发生,因为独立的执行程序将拥有自己的JVM,因此它们拥有自己的对象实例?

回答

2

SimpleDateFormat是不是线程安全的(见例如Why is Java's SimpleDateFormat not thread-safe?)。这意味着,如果你在任何UDF使用它(即使是在一个星星之火工作),你可能会得到意想不到的结果,因为火花将在几个任务其在单独线程多线程访问它在结束了运行使用UDF同一时间。对于本地模式和实际分布式集群都是如此 - 单个副本将被每个执行程序上的多个线程使用。为了克服这一点 - 只需使用不同的格式化程序,其中线程安全的,例如,乔达的DateTimeFormatter

+1

谢谢Tzach, 我想补充一点,这里的整体问题的答案是: 由于多个任务在每个执行程序的多个线程上运行,因此您必须在Spark UDF中保持线程安全。 Tzach为我提供的解决方案是线程安全的。 –