1

我使用pyspark与Kafka接收器来处理推文流。我的应用程序的其中一个步骤包括致电Google Natural Language API以获取每条推文的情绪分数。但是,我看到API每次处理的推文都会接到几个电话(我在Google云端控制台中看到了电话号码)。另外,如果我打印tweetIDs(映射函数内),我会得到相同的ID 3或4次。在我的应用程序结束时,推文被发送到卡夫卡的另一个主题,我得到了正确的推文数(没有重复的ID),所以原则上一切正常,但我不知道如何避免调用Google API每次推文不止一次。火花流媒体重复网络电话

这是否与Spark或Kafka中的一些配置参数有关?

这里是我的控制台输出的一个例子:

TIME 21:53:36: Google Response for tweet 801181843500466177 DONE! 
TIME 21:53:36: Google Response for tweet 801181854766399489 DONE! 
TIME 21:53:36: Google Response for tweet 801181844808966144 DONE! 
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE! 
TIME 21:53:37: Google Response for tweet 801181843500466177 DONE! 
TIME 21:53:37: Google Response for tweet 801181854766399489 DONE! 
TIME 21:53:37: Google Response for tweet 801181844808966144 DONE! 
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE! 

但在卡夫卡接收我只得到4周处理的tweet(这是接受,因为他们只有4个独特的鸣叫正确的事)。

执行此代码是:

def sendToKafka(rdd,topic,address): 
    publish_producer = KafkaProducer(bootstrap_servers=address,\ 
          value_serializer=lambda v: json.dumps(v).encode('utf-8')) 
    records = rdd.collect() 
    msg_dict = defaultdict(list) 
    for rec in records: 
     msg_dict["results"].append(rec) 
    publish_producer.send(resultTopic,msg_dict) 
    publish_producer.close() 


kafka_stream = KafkaUtils.createStream(ssc, zookeeperAddress, "spark-consumer-"+myTopic, {myTopic: 1}) 

dstream_tweets=kafka_stream.map(lambda kafka_rec: get_json(kafka_rec[1]))\ 
       .map(lambda post: add_normalized_text(post))\ 
       .map(lambda post: tagKeywords(post,tokenizer,desired_keywords))\ 
       .filter(lambda post: post["keywords"] == True)\ 
       .map(lambda post: googleNLP.complementTweetFeatures(post,job_id)) 

dstream_tweets.foreachRDD(lambda rdd: sendToKafka(rdd,resultTopic,PRODUCER_ADDRESS)) 
+0

你已经做了什么?你可以把你的代码粘贴到这个问题上吗? –

+0

我用代码更新了问题。 googleNLP.complementTweetFeatures()向Google API发出一个请求并返回响应。 –

回答

1

我已经找到了解决这个!我刚刚与缓存DSTREAM:

dstream_tweets.cache() 

的多个网络调用的发生是因为星火重新计算该DSTREAM内RDDS在我的脚本执行后操作之前。当我缓存()DStream时,只需要计算一次;并且由于它被保存在内存中,以后的函数可以在不重新计算的情况下访问这些信息(在这种情况下,需要重新计算再次调用API,因此值得花费更多的内存使用量)。