2017-10-17 33 views

回答

0

终于找到了一个优雅的方式来实现这一点。 Hadoop的配置创建一个广播可变

Configuration configuration = JavaSparkContext.toSparkContext(context).hadoopConfiguration(); 
Broadcast<SerializableWritable<Configuration>> bc = context.broadcast(new SerializableWritable<Configuration>(configuration)); 

通过这个广播变量的变换或行动,并使用下面的代码片段获得的Hadoop文件系统:

FileSystem fileSystem = FileSystem.get(bc.getValue().value()); 

希望这帮助,如果别人是在同一船。

干杯!

1
JavaPairInputDStream<String, byte[]> input = KafkaUtils.createJDQDirectStream(ssc, String.class, byte[].class, 
     StringDecoder.class, DefaultDecoder.class, kafkaParams, Collections.singleton(topicName)); 

JavaPairDStream<String, byte[]> output = input.transformToPair(new Function<JavaPairRDD<String, byte[]>, JavaPairRDD<String, byte[]>>() { 
    public JavaPairRDD<String, byte[]> call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception { 
     JavaSparkContext sc = JavaSparkContext.fromSparkContext(stringJavaPairRDD.context()); 
     stringJavaPairRDD.saveAsTextFile("hdfs://"); 
     return stringJavaPairRDD; 
    } 
}); 
+0

感谢张的回复,感谢您发布一个方法来做到这一点。但在我的情况下,中间数据不是RDD,也不是流数据。 –

+0

我终于从你的答案中选择了一些想法,并能够得到解决方案。作为另一个答案发布......谢谢! –