
I am new to Spark, and I want to save the output of recommendProductsForUsers to an HBase table. I found an example (https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/) showing how to save a JavaPairRDD with saveAsNewAPIHadoopDataset. How can I save the output of the Spark MatrixFactorizationModel recommendProductsForUsers call to HBase?

How do I convert a JavaRDD<Tuple2<Object, Rating[]>> into a JavaPairRDD<ImmutableBytesWritable, Put> so that I can use saveAsNewAPIHadoopDataset?

//Loads the data from hdfs 
    MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(), trainedDataPath); 

//Get recommendations for all users 
    JavaRDD<Tuple2<Object, Rating[]>> ratings3 = sameModel.recommendProductsForUsers(noOfProductsToReturn).toJavaRDD(); 

Do you want to save the model or the recommendations? – eliasah


@eliasah I want to save the recommendations – Ani

Answers


This is how I solved the problem above; I hope it helps someone.

JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts1 = ratings3
        .mapToPair(new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() {

            @Override
            public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> arg0)
                    throws Exception {
                Rating[] userAndProducts = arg0._2;
                System.out.println("***********" + userAndProducts.length + "**************");
                // List<Item> items = new ArrayList<Item>(); // application-specific type, not shown in this snippet
                // Row key: the user id from the tuple
                Put put = new Put(Bytes.toBytes(String.valueOf(arg0._1)));
                String recommendedProduct = "";
                for (Rating r : userAndProducts) {
                    // Some logic here to convert the Ratings into the appropriate Put command
                    // recommendedProduct = String.valueOf(r.product());
                }

                put.addColumn(Bytes.toBytes("recommendation"), Bytes.toBytes("product"),
                        Bytes.toBytes(recommendedProduct));

                return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
            }
        });

      System.out.println("*********** Number of records in JavaPairRdd: "+ hbasePuts1.count() +"**************"); 
      hbasePuts1.saveAsNewAPIHadoopDataset(newApiJobConfig.getConfiguration()); 
      jsc.stop();   
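
Note that the snippet above relies on a newApiJobConfig object that is not shown in the answer. A minimal sketch of how such a Hadoop Job configuration for saveAsNewAPIHadoopDataset with HBase's TableOutputFormat might be built (the table name "recommendations" is an assumption; this needs org.apache.hadoop.hbase.mapreduce.TableOutputFormat and org.apache.hadoop.mapreduce.Job on the classpath):

    // Sketch only: configure the Hadoop Job that saveAsNewAPIHadoopDataset writes through.
    Configuration hbaseConf = HBaseConfiguration.create();
    // "recommendations" is an assumed table name; use your own HBase table here.
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "recommendations");
    Job newApiJobConfig = Job.getInstance(hbaseConf);
    newApiJobConfig.setOutputFormatClass(TableOutputFormat.class);
    newApiJobConfig.setOutputKeyClass(ImmutableBytesWritable.class);
    newApiJobConfig.setOutputValueClass(Put.class);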

By using mapToPair. This is from the same source as the example you provided (I changed the types by hand):

JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = javaRDD.mapToPair(
    new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() {
        @Override
        public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> row) throws Exception {

            // Tuple2 has no getString(); the row key and values come from the tuple fields instead.
            Put put = new Put(Bytes.toBytes(String.valueOf(row._1)));
            // Example values: the first recommended product and its score.
            Rating first = row._2[0];
            put.addColumn(Bytes.toBytes("columnFamily"), Bytes.toBytes("columnQualifier1"), Bytes.toBytes(String.valueOf(first.product())));
            put.addColumn(Bytes.toBytes("columnFamily"), Bytes.toBytes("columnQualifier2"), Bytes.toBytes(String.valueOf(first.rating())));

            return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
        }
    });

It works like this: you create a new instance of Put, passing the row key to its constructor, and then call addColumn for each column. Finally you return the Put you created.
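
If you want to store every recommendation for a user rather than a single value, a minimal sketch of the mapToPair body could write one column per recommended product. The column family "recommendation" follows the first answer; the per-product qualifiers and the value format are assumptions, not something from the question:

    // Sketch only: one column per recommendation, qualified by the product id (assumption).
    Put put = new Put(Bytes.toBytes(String.valueOf(arg0._1)));
    for (Rating r : arg0._2) {
        put.addColumn(Bytes.toBytes("recommendation"),
                Bytes.toBytes(String.valueOf(r.product())),
                Bytes.toBytes(String.valueOf(r.rating())));
    }
    return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);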