我们有一个火花流应用,我们从卡夫卡收到DSTREAM和需要存储到dynamoDB ....我与两个方面做试验如下星火地图VS foreachRdd
代码描述requestsWithState是DSTREAM
代码段1与foreachRDD:
requestsWithState.foreachRDD { rdd =>
println("Data being populated to Pulsar")
rdd.foreach { case (id, eventStream) =>
println("id is " + id + " Event is " + eventStream)
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
代码段2与图:
requestsWithState.map (rdd => { rdd match {
case (id, eventStream) => {
println("id is " + id + " Event is " + eventStream)
val dynamoConnection = setupDynamoClientConnection()
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
})
requestsWithState.print(1)
代码Snippet1工作的罚款,并填充数据库...第二个代码片段不起作用....我们新手的火花,很想知道它背后的原因和方式让它工作? ........我们正在试验的原因(我们知道这是一个转换,foreachRdd是一个操作)foreachRdd对于集群负载较重的用例非常缓慢,我们发现如果映射更快我们可以得到它的工作.....请帮助我们获取地图代码工作
你应该(几乎)从来没有在地图或flatMap副作用! – JiriS