我最初的RDD如下所示RDD[(String, List[(String,String)])]
:获取列表[Serializable接口]而不是列表[(字符串,字符串)
(600,List((22,33),(55,88)))
(700,List((12,13),(15,18),(18,88)))
我想从Redis的缓存数据库获得额外的数据每个条目追加。为此,我使用Sedis
,它是Scala的Jedis
的包装。这是我的代码片段:
import org.sedis._
import redis.clients.jedis._
val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
val appended = filtered.map({line => (line._1,
redisPool.withJedisClient { client =>
val additionalData: List[String] = Dress.up(client).hvals("member_id:"+line._1)
line._2.union(additionalData)
})
})
的问题是,appended
是格式RDD[(String, List[Serializable]
而不是RDD[(String, List[(String,String)])]
的。我究竟做错了什么? 另外,我在哪里使用redisPool
内部map
足够高效或有没有其他更好的选择?
什么是'line._2's类型? –
你说你的初始RDD的类型是'RDD [(String,List [(String,String)])]',但是你提供的示例数据不匹配 - '(600,List(22,33), List(55,88))'是类型'(String,List [(String,String)],List [(String,String)])'...并且第二条记录再次不同。由编译器为您提供的_input_推断的类型是'RDD [(带有可序列化的产品)]' –
@TzachZohar:对不起,我的错。它确实是'List((22,33),(55,88))' – HackerDuck