我正在使用Spark Streaming创建系统以丰富云数据库中的传入数据。示例 -在PySpark环境中创建缓存的最佳方式
Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}
我对驱动程序类代码如下:
from Sample.Job import EnrichmentJob
from Sample.Job import FunctionJob
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from kafka import KafkaConsumer, KafkaProducer
import json
class SampleFramework():
def __init__(self):
pass
@staticmethod
def messageHandler(m):
return json.loads(m.message)
@staticmethod
def processData(rdd):
if (rdd.isEmpty()):
print("RDD is Empty")
return
# Expand
expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich)
# Score
scored_rdd = expanded_rdd.map(FunctionJob.function)
# Publish RDD
def run(self, ssc):
self.ssc = ssc
directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \
{"metadata.broker.list": META,
"bootstrap.servers": SERVER}, \
messageHandler= SampleFramework.messageHandler)
directKafkaStream.foreachRDD(SampleFramework.processData)
ssc.start()
ssc.awaitTermination()
代码的富集工作如下: 类EnrichmentJob:
cache = {}
@staticmethod
def enrich(data):
# Assume that Cloudant Connector using the available config
cloudantConnector = CloudantConnector(config, config["cloudant"]["host"]["req_db_name"])
final_data = []
for row in data:
id = row["id"]
if(id not in EnrichmentJob.cache.keys()):
data = cloudantConnector.getOne({"id": id})
row["data"] = data
EnrichmentJob.cache[id]=data
else:
data = EnrichmentJob.cache[id]
row["data"] = data
final_data.append(row)
cloudantConnector.close()
return final_data
我的问题是 - 有没有办法保持[1]“主存储器上的全局缓存,所有工作人员都可以访问”或者[2]“每个工作人员的本地缓存,这样他们仍然保持在前台achRDD设置“?
我已经探讨了以下 -
广播变量 - 在这里,我们去了[1]的方式。据我了解,它们的意图是只读和不可变的。我已经检查了这个reference,但它引用了一个不执行/保持广播变量的例子。这是一个很好的做法吗?
静态变量 - 这里我们走[2]的方法。被引用的类(在本例中为“Enricher”)以静态变量字典的形式维护缓存。但事实证明,ForEachRDD函数为每个传入的RDD生成一个全新的进程,并删除了以前启动的静态变量。这是上面编码的那个。
我现在所拥有的两种可能的解决方案 -
- 维护文件系统上的脱机缓存。
- 在我的驱动程序节点上执行此丰富任务的整个计算。这会导致整个数据以驱动程序结束并在那里维护。缓存对象将作为映射函数的参数发送到浓缩作业。
这里显然第一个看起来比第二个好,但我希望得出结论,这两个是唯一的方法,在承诺之前。任何指针将不胜感激!