2016-12-30 40 views
1

我正在使用Spark Streaming创建系统以丰富云数据库中的传入数据。示例 -在PySpark环境中创建缓存的最佳方式

Incoming Message: {"id" : 123} 
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"} 

我对驱动程序类代码如下:

from Sample.Job import EnrichmentJob 
from Sample.Job import FunctionJob 
import pyspark 
from pyspark.streaming.kafka import KafkaUtils 
from pyspark import SparkContext, SparkConf, SQLContext 
from pyspark.streaming import StreamingContext 
from pyspark.sql import SparkSession 

from kafka import KafkaConsumer, KafkaProducer 
import json 

class SampleFramework(): 

    def __init__(self): 
     pass 

    @staticmethod 
    def messageHandler(m): 
     return json.loads(m.message) 

    @staticmethod 
    def processData(rdd): 

     if (rdd.isEmpty()): 
      print("RDD is Empty") 
      return 

     # Expand 
     expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich) 

     # Score 
     scored_rdd = expanded_rdd.map(FunctionJob.function) 

     # Publish RDD 


    def run(self, ssc): 

     self.ssc = ssc 

     directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \ 
                  {"metadata.broker.list": META, 
                  "bootstrap.servers": SERVER}, \ 
                  messageHandler= SampleFramework.messageHandler) 

     directKafkaStream.foreachRDD(SampleFramework.processData) 

     ssc.start() 
     ssc.awaitTermination() 

代码的富集工作如下: 类EnrichmentJob:

cache = {} 

@staticmethod 
def enrich(data): 

    # Assume that Cloudant Connector using the available config 
    cloudantConnector = CloudantConnector(config, config["cloudant"]["host"]["req_db_name"]) 
    final_data = [] 
    for row in data: 
     id = row["id"] 
     if(id not in EnrichmentJob.cache.keys()): 
      data = cloudantConnector.getOne({"id": id}) 
      row["data"] = data 
      EnrichmentJob.cache[id]=data 
     else: 
      data = EnrichmentJob.cache[id] 
      row["data"] = data 
     final_data.append(row) 

    cloudantConnector.close() 

    return final_data 

我的问题是 - 有没有办法保持[1]“主存储器上的全局缓存,所有工作人员都可以访问”或者[2]“每个工作人员的本地缓存,这样他们仍然保持在前台achRDD设置“?

我已经探讨了以下 -

  1. 广播变量 - 在这里,我们去了[1]的方式。据我了解,它们的意图是只读和不可变的。我已经检查了这个reference,但它引用了一个不执行/保持广播变量的例子。这是一个很好的做法吗?

  2. 静态变量 - 这里我们走[2]的方法。被引用的类(在本例中为“Enricher”)以静态变量字典的形式维护缓存。但事实证明,ForEachRDD函数为每个传入的RDD生成一个全新的进程,并删除了以前启动的静态变量。这是上面编码的那个。

我现在所拥有的两种可能的解决方案 -

  1. 维护文件系统上的脱机缓存。
  2. 在我的驱动程序节点上执行此丰富任务的整个计算。这会导致整个数据以驱动程序结束并在那里维护。缓存对象将作为映射函数的参数发送到浓缩作业。

这里显然第一个看起来比第二个好,但我希望得出结论,这两个是唯一的方法,在承诺之前。任何指针将不胜感激!

回答

1

有什么方法来维持[1]“主内存,所有员工都可以访问的全局高速缓存”

号没有“主存”,它可以被所有访问工人。每个工作人员在一个单独的进程中运行,并使用套接字与外部世界通信。更不用说在非本地模式下分离不同物理节点。

有一些技术可以应用于实现工作区域缓存内存映射数据(使用SQLite是最简单的),但它需要一些额外的努力来实现正确的方式(避免冲突等)。

或[2]“每个工人的本地缓存,以便它们保持在foreachRDD设置”?

您可以使用标准缓存技术,其范围仅限于单个工作进程。根据配置(静态与dynamic resource allocationspark.python.worker.reuse),可能会或可能不会保留多个任务和批次之间的配置。

考虑以下,简化,例如:

  • map_param.py

    from pyspark import AccumulatorParam 
    from collections import Counter 
    
    class CounterParam(AccumulatorParam): 
        def zero(self, v: Counter) -> Counter: 
         return Counter() 
    
        def addInPlace(self, acc1: Counter, acc2: Counter) -> Counter: 
         acc1.update(acc2) 
         return acc1 
    
  • my_utils.py

    from pyspark import Accumulator 
    from typing import Hashable 
    from collections import Counter 
    
    # Dummy cache. In production I would use functools.lru_cache 
    # but it is a bit more painful to show with accumulator 
    cached = {} 
    
    def f_cached(x: Hashable, counter: Accumulator) -> Hashable: 
        if cached.get(x) is None: 
         cached[x] = True 
         counter.add(Counter([x])) 
        return x 
    
    
    def f_uncached(x: Hashable, counter: Accumulator) -> Hashable: 
        counter.add(Counter([x])) 
        return x 
    
  • main.py

    from pyspark.streaming import StreamingContext 
    from pyspark import SparkContext 
    
    from counter_param import CounterParam 
    import my_utils 
    
    from collections import Counter 
    
    def main(): 
        sc = SparkContext("local[1]") 
        ssc = StreamingContext(sc, 5) 
    
        cnt_cached = sc.accumulator(Counter(), CounterParam()) 
        cnt_uncached = sc.accumulator(Counter(), CounterParam()) 
    
        stream = ssc.queueStream([ 
         # Use single partition to show cache in work 
         sc.parallelize(data, 1) for data in 
         [[1, 2, 3], [1, 2, 5], [1, 3, 5]] 
        ]) 
    
        stream.foreachRDD(lambda rdd: rdd.foreach(
         lambda x: my_utils.f_cached(x, cnt_cached))) 
        stream.foreachRDD(lambda rdd: rdd.foreach(
         lambda x: my_utils.f_uncached(x, cnt_uncached))) 
    
        ssc.start() 
        ssc.awaitTerminationOrTimeout(15) 
        ssc.stop(stopGraceFully=True) 
    
        print("Counter cached {0}".format(cnt_cached.value)) 
        print("Counter uncached {0}".format(cnt_uncached.value)) 
    
    if __name__ == "__main__": 
        main() 
    

实例运行:

bin/spark-submit main.py 
Counter cached Counter({1: 1, 2: 1, 3: 1, 5: 1}) 
Counter uncached Counter({1: 3, 2: 2, 3: 2, 5: 2}) 

正如你可以看到,我们得到预期的结果:

  • 对于 “缓存” 的对象累加器每唯一的密钥更新一次每个工作者进程(分区)。
  • 对于未缓存的对象,累加器在每次发生密钥时都会更新。