2017-07-14 31 views
0

我有一个RDD的字符串。每行对应各种日志。映射函数写在全局火花rdd

我有一个单一函数中的多个正则表达式匹配RDD的行以应用适应的正则表达式。

我想在RDD上映射这个独特的函数,因此它可以快速处理每一行,并将每行处理存储在另一个全局rdd中。

问题是,因为我希望这项任务能够并行化,所以我的全局RDD必须可以同时访问以添加每条处理过的行。

我想知道是否有其他方式来做到这一点或任何事情!我期待着提高我的火花技能。

例如,这就是我想做的事:

我有这样一个txt:

错误:Hahhaha PARAM_ERROR = 8 param_err2 = HTTPS

警告:HUHUHUHUH param_warn = tchu param_warn2 = wifi

我的正则表达式函数会将包含“ERROR”的行与数组匹配,例如Array("Error","8","https")

而另一个正则表达式函数将匹配包含“警告”与阵列例如Array("Warning","tchu","wifi")

行最后,我想获得用于处理每一个线条RDD[Array[String]]

如何让它与Spark并行?

+0

“我有一个单一功能的多个正则表达式匹配/情况下,RDD的线条应用适应正则表达式” - 可以你编辑你的文章以包含这个函数的_signature_? –

回答

2

首先,理解在Spark中没有什么像“全局RDD”,也没有理由需要类似的东西。在使用Spark时,您应该考虑根据转换成另一个RDD,而不是根据更新 RDD(这是不可能的 - RDD是不可变的)。每个这样的转换将由Spark分布式执行(并行)。

在这种情况下,如果我正确理解你的要求,你会想map每个记录到以下结果之一:

  • Array[String],其中第一项是"ERROR",或:
  • 一个Array[String]其中第一项是"WARNING",或:
  • 如果没有模式匹配的记录,删除

要做到这一点,你可以使用RDDmap(f)collect(f)方法:

// Sample data: 
val rdd = sc.parallelize(Seq(
    "ERROR : Hahhaha param_error=8 param_err2=https", 
    "WARNING : HUHUHUHUH param_warn=tchu param_warn2=wifi", 
    "Garbage - not matching anything" 
)) 

// First we can split in " : " to easily identify ERROR vs. WARNING 
val splitPrefix = rdd.map(line => line.split(" : ")) 

// Implement these parsing functions as you see fit; 
// The input would be the part following the " : ", 
// and the output should be a list of the values (not including the ERROR/WARNING) 
def parseError(v: String): List[String] = ??? // example input: "Hahhaha param_error=8 param_err2=https" 
def parseWarning(v: String): List[String] = ??? // example input: "HUHUHUHUH param_warn=tchu param_warn2=wifi" 

// Now we can use these functions in a pattern-matching function passed to RDD.collect, 
// which will transform each value that matches one of the cases, and will filter out 
// values that don't match anything 
val result: RDD[List[String]] = splitPrefix.collect { 
    case Array(l @ "ERROR", v) => l :: parseError(v) 
    case Array(l @ "WARNING", v) => l :: parseWarning(v) 
    // NOT adding a default case, so records that didn't match will be removed 
}  

// If you really want Array[String] and not List[String]:  
val arraysRdd: RDD[Array[String]] = result.map(_.toArray) 
+0

哦,是的该死的,这正是我想要的!非常感谢,我不知道我们可以使用像这样的收集功能。每天都在Spark上进步,谢天谢地! :p – tricky