2017-02-22 36 views
0

我们有一个火花流应用,我们从卡夫卡收到DSTREAM和需要存储到dynamoDB ....我与两个方面做试验如下星火地图VS foreachRdd

代码描述requestsWithState是DSTREAM

代码段1与foreachRDD:

requestsWithState.foreachRDD { rdd => 
    println("Data being populated to Pulsar") 
    rdd.foreach { case (id, eventStream) => 
    println("id is " + id + " Event is " + eventStream) 
    DBUtils.putItem(dynamoConnection, id, eventStream.toString()) 
    } 
} 

代码段2与图:

requestsWithState.map (rdd => { rdd match { 
     case (id, eventStream) => { 
      println("id is " + id + " Event is " + eventStream) 
      val dynamoConnection = setupDynamoClientConnection() 
      DBUtils.putItem(dynamoConnection, id, eventStream.toString()) 
     } 
     } 
    }) 

requestsWithState.print(1) 

代码Snippet1工作的罚款,并填充数据库...第二个代码片段不起作用....我们新手的火花,很想知道它背后的原因和方式让它工作? ........我们正在试验的原因(我们知道这是一个转换,foreachRdd是一个操作)foreachRdd对于集群负载较重的用例非常缓慢,我们发现如果映射更快我们可以得到它的工作.....请帮助我们获取地图代码工作

+0

你应该(几乎)从来没有在地图或flatMap副作用! – JiriS

回答

0

Map是Spark中的一种转换(惰性转换),除非在此之后调用spark动作,否则不会执行。 火花改造和行动,是指下面的链接 http://spark.apache.org/docs/latest/programming-guide.html#transformations

+0

.....我在地图requestsWithState.print(1)后有一个动作,但它仍然不起作用我已经更新了相应的问题请看看 – user2359997

+0

RDD是不可变的,所以地图会返回一个新的rdd。因此,请尝试使用'code' requestsWithState = requestsWithState.map(rdd => {rdd match {id,eventStream)=> {println(“id is”+ id +“Event is”+ eventStream) val dynamoConnection = setupDynamoClientConnection() DBUtils.putItem(dynamoConnection,ID,eventStream.toString()) } } }) requestsWithState.print(1)'code' – Neetika

0

map版本没有任何行动,.map不是一个动作,而是转型。

如果不采取措施,转换不会被执行。

参见例如http://training.databricks.com/visualapi.pdfhttp://spark.apache.org/docs/latest/programming-guide.html#transformations

+0

我确实有地图requestsWithState.print后的动作(1)但仍然不起作用我已经更新了相应的问题请看看 – user2359997

+0

嗨,你添加一个动作requestWithState.print(1)? 我知道这是一个古老的问题,但我正在通过您的使用案例进行调整。谢谢 –

0

DStream.map返回另一个流。您应该在该流上调用打印,而不是原始打印。

所以在斯卡拉:

val transformedStream = requestsWithState.map (rdd => { rdd match { 
     case (id, eventStream) => { 
      println("id is " + id + " Event is " + eventStream) 
      val dynamoConnection = setupDynamoClientConnection() 
      DBUtils.putItem(dynamoConnection, id, eventStream.toString()) 
     } 
     } 
    }) 

transformedStream.print(1)