2016-09-27 59 views

回答

1

您引用的代码是使用Broadcast.unpersist()方法。如果您检查了Spark API Broadcast.unpersist()方法,它会显示“在执行程序上异步删除此广播的缓存副本。如果在调用此广播后使用该广播,则需要将它重新发送给每个执行程序。有一个重载方法unpersist(布尔阻塞),它将阻塞,直到非完成已完成。所以它取决于你如何在Spark应用程序中使用Broadcast变量。在火花中,如果您改变广播变量,则不会自动重新广播。司机必须重新发送。 Spark文档说你不应该修改广播变量(Immutable)以避免在执行器节点处理时出现任何不一致,但是如果你想控制广播变量的生命周期,可以使用unpersist()和destroy()方法。请参考火花jira https://issues.apache.org/jira/browse/SPARK-6404

+0

感谢您的信息。我的用例是我想从远程服务器下载一些Key-Value数据,将其存储为散列表,将其发送/广播给所有执行者进行本地查找。然后说2分钟后,检查远程服务器的新数据,如果有新数据,获取它,添加hashmap,并发送/广播到所有执行者用本地查找与新数据。我想我可以使用2个广播变量来实现这一点。 – sunillp

+0

首次广播具有初始数据的broadcast_var1。稍后,在数据更改时,将广播带有新数据的broadcast_var2,并且broadcast_var1将被取消(blocking = true)。之前使用“broadcast_var1”的执行程序现在将得到异常,并基于该开关切换到“broadcast_var2”。下一次,在数据更改时,同样的事情会重复,但两个广播变量的角色将会改变。你认为这是可能的/有效的? – sunillp

+1

应该使用广播变量向执行者发送一次大数据,而不是频繁地发送。通过在每2分钟内调用unpersist(blocking = true),流处理的性能将会受到阻碍。它也取决于您对广播变量值的逻辑。您的键值数据有多大?你不能把它放入一个闭包中,以便它可以序列化到执行者? – abaghel