我需要统计每天登录到系统的唯一用户数量,数据通过 Kafka 进入。我们使用 mapWithState 功能维护计数,每 10 秒把计算结果发送到 Kafka。我需要在每天午夜重置这些计数。有没有办法清除 mapWithState 状态中的所有数据?(如何在 Spark Streaming 中每天重置 mapWithState?)
public class UserCounts {

    /**
     * Idle time after which Spark evicts a key's state. 48h guarantees that
     * yesterday's entries disappear even if a user never logs in again.
     */
    private static final long STATE_TIMEOUT_MINUTES = 48L * 60L;

    /**
     * Builds a JavaStreamingContext that counts daily unique users arriving via Kafka.
     *
     * Expected args: [0] checkpoint directory, [1] Kafka broker list,
     * [2] input topic(s, comma separated), [3] output topic, [4] Spark master URL.
     *
     * Daily-reset strategy: the per-user state stores (epochDay, count). When an
     * event arrives on a new day the stored count is discarded, which effectively
     * resets every active counter at midnight without any external trigger. The
     * snapshot stream is filtered to today's entries before counting, so keys from
     * previous days (still pending timeout eviction) do not inflate the result.
     */
    private static JavaStreamingContext createContext(String[] args) {
        String checkpointDir = args[0];
        String brokers = args[1];
        String inputTopic = args[2];
        String outputTopic = args[3];
        String masterNode = args[4];

        // Producer configuration; values are Objects so the typed
        // KafkaProducer(Map<String, Object>) constructor can be used (the
        // original used a raw-typed constructor call).
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Create the context with a 10-second micro-batch interval.
        SparkConf sparkConf = new SparkConf().setAppName("UserCounts").setMaster(masterNode);
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(inputTopic.split(",")));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);

        // Checkpoint into the SAME directory main() recovers from; the original
        // checkpointed into "." which made getOrCreate() recovery ineffective.
        ssc.checkpoint(checkpointDir);

        // Direct Kafka stream over the configured topics.
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

        // Each record is a CSV line; field index 3 is assumed to be the user id
        // (TODO confirm against the producer's record schema).
        JavaDStream<String> lines = messages.map(Tuple2::_2);
        JavaDStream<String> userIds = lines.map(line -> line.split(",")[3]);
        JavaPairDStream<String, Integer> usersDstream = userIds.mapToPair(s -> new Tuple2<>(s, 1));

        // State = (epochDay the count belongs to, count). If the stored day differs
        // from the current day, the count restarts from zero — the midnight reset.
        Function3<String, Optional<Integer>, State<Tuple2<Long, Integer>>, Tuple2<String, Integer>> mappingFunc =
                (userId, one, state) -> {
                    long today = java.time.LocalDate.now().toEpochDay();
                    if (state.isTimingOut()) {
                        // Key is being evicted after the idle timeout; calling
                        // update() here is illegal, so just report the last count.
                        return new Tuple2<>(userId, state.get()._2());
                    }
                    int previous = (state.exists() && state.get()._1() == today)
                            ? state.get()._2()
                            : 0; // new key, or first event of a new day -> reset
                    int sum = previous + one.orElse(0);
                    state.update(new Tuple2<>(today, sum));
                    return new Tuple2<>(userId, sum);
                };

        // The timeout eventually purges keys that stopped producing events, so the
        // state store does not grow without bound across days.
        JavaMapWithStateDStream<String, Integer, Tuple2<Long, Integer>, Tuple2<String, Integer>> stateDstream =
                usersDstream.mapWithState(
                        StateSpec.function(mappingFunc).timeout(Durations.minutes(STATE_TIMEOUT_MINUTES)));

        // Snapshot of ALL state entries; keep only today's before counting uniques.
        JavaPairDStream<String, Tuple2<Long, Integer>> stateSnapshotStream = stateDstream.stateSnapshots();
        JavaDStream<Long> dailyUniqueUsers = stateSnapshotStream
                .filter(entry -> entry._2()._1() == java.time.LocalDate.now().toEpochDay())
                .count();

        dailyUniqueUsers.foreachRDD(rdd -> {
            System.out.println("# events = " + rdd.count());
            String date = String.valueOf(System.currentTimeMillis());
            rdd.foreachPartition(partition -> {
                // One short-lived producer per partition; try/finally guarantees
                // it is closed even if a send fails mid-iteration.
                KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                try {
                    while (partition.hasNext()) {
                        Long value = partition.next();
                        System.out.println("data >>>>>>>>>>" + value);
                        String data = "{\"timestamp\":\"" + date
                                + "\",\"usersCount\":\"" + value + "\"}";
                        producer.send(new ProducerRecord<>(outputTopic, null, data));
                    }
                } finally {
                    producer.close();
                }
            });
        });
        return ssc;
    }

    /**
     * Entry point. args[0] is the checkpoint directory, used both for recovery
     * via getOrCreate() and for checkpointing inside createContext().
     */
    public static void main(String[] args) throws Exception {
        String checkpointPath = args[0];
        Function0<JavaStreamingContext> createContextFunc = () -> createContext(args);
        JavaStreamingContext ssc =
                JavaStreamingContext.getOrCreate(checkpointPath, createContextFunc);
        ssc.start();
        ssc.awaitTermination();
    }
}
这里不适合用 `mapWithState`。要在 00:00 重置所有键,你必须能在每个批次中访问到全部的键,为此你需要使用 `updateStateByKey`。 –
你能分享一些代码示例吗?我不明白该如何在 00:00 触发 updateStateByKey。 –
它不是在 00:00 触发状态更新——`updateStateByKey` 会在你定义的每个批次间隔被触发。你需要自己检查当前时间,并在跨天时为所有键重置状态。 –