
Resetting mapWithState daily in Spark Streaming

I am counting the number of unique users who log in to the system each day. The data comes in through Kafka, and we maintain the counts using the mapWithState function. Every 10 seconds the computed counts are sent back to Kafka. I need to reset these numbers every day at midnight. Is there a way to clear all the data held in the mapWithState state?

public class UserCounts { 


private static JavaStreamingContext createContext(String[] args) { 

    String brokers = args[1]; 
    String inputTopic = args[2]; 
    String outputTopic = args[3]; 
    String masterNode = args[4]; 

    HashMap<String, Object> props = new HashMap<>(); 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer"); 

    // Create the context with a 10 second batch interval 
    SparkConf sparkConf = new SparkConf().setAppName("UserCounts").setMaster(masterNode); 
    //sparkConf.set("spark.driver.bindAddress", "127.0.0.1"); 


    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10)); 

    Set<String> topicsSet = new HashSet<>(Arrays.asList(inputTopic.split(","))); 
    Map<String, String> kafkaParams = new HashMap<>(); 
    kafkaParams.put("metadata.broker.list", brokers); 
    ssc.checkpoint(args[0]);   // checkpoint to the same directory passed to getOrCreate so recovery works 


    // Create direct kafka stream with brokers and inputTopic 
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
      ssc, 
      String.class, 
      String.class, 
      StringDecoder.class, 
      StringDecoder.class, 
      kafkaParams, 
      topicsSet 
    ); 
    JavaDStream<String> lines = messages.map(Tuple2::_2); 
    // Extract the userId (the 4th comma-separated field) from each record 
    JavaDStream<String> userIds = lines.map(x -> { 
     String[] array = x.split(","); 
     return array[3]; 
    }); 

    JavaPairDStream<String, Integer> usersDstream = userIds.mapToPair(s -> new Tuple2<>(s, 1)); 

    // Mapping function that updates the cumulative count for each userId 
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = 
      (userId, one, state) -> { 
       int sum = one.orElse(0) + (state.exists() ? state.get() : 0); 
       Tuple2<String, Integer> output = new Tuple2<>(userId, sum); 
       state.update(sum); 
       return output; 
      }; 

    // DStream of cumulative counts that get updated in every batch 
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream = 
      usersDstream.mapWithState(StateSpec.function(mappingFunc)); 

    JavaPairDStream<String, Integer> stateSnapshotStream = stateDstream.stateSnapshots(); 



    stateSnapshotStream.count().foreachRDD(rdd -> { 

     System.out.println("# events = " + rdd.count()); 
     String date = String.valueOf(System.currentTimeMillis()); 

     rdd.foreachPartition(partition -> { 
        KafkaProducer<String, String> producer = new KafkaProducer<>(props); 
        while (partition.hasNext()) { 
         Long value = partition.next(); 
         System.out.println("data >>>>>>>>>>" + value.toString()); 
         String data = "{\"timestamp\":\"" + date + "\",\"usersCount\":\"" + value.toString() + "\"}"; 
         producer.send(new ProducerRecord<String, String>(outputTopic, null, data)); 
        } 
        producer.close(); 

       } 
     ); 


    }); 



    return ssc; 
} 

public static void main(String[] args) throws Exception { 

    String checkpointPath = args[0]; 

    Function0<JavaStreamingContext> createContextFunc = () -> createContext(args); 

    JavaStreamingContext ssc = 
      JavaStreamingContext.getOrCreate(checkpointPath, createContextFunc); 

    ssc.start(); 
    ssc.awaitTermination(); 

    } 
} 

'mapWithState' is not the way to go here. You need to be able to touch every key in every batch in order to reset them at 00:00. You need 'updateStateByKey' for that.


Could you please share a code sample? I don't see how to trigger updateStateByKey at 00:00.


It does not get triggered at 00:00; 'updateStateByKey' is invoked at every batch interval you define. You will need to check the current time yourself and reset the state for all keys.
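
A minimal sketch of what that last comment describes, assuming the state carries the day string next to the count so the update function can detect the day change itself. This is not from the thread and reuses usersDstream (and the imports) from the code above:

// updateStateByKey invokes the update function for every key in every batch,
// so the function can compare the stored day with the current day and start
// the sum over after midnight. The (day, count) state layout is an assumption.
Function2<List<Integer>, Optional<Tuple2<String, Integer>>, Optional<Tuple2<String, Integer>>>
  updateFunc = (newValues, state) -> {
   String today = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
   int sum = 0;
   if (state.isPresent() && state.get()._1.equals(today)) {
    sum = state.get()._2;          // same day: keep the running total
   }                               // new day: sum restarts at 0, i.e. the reset
   for (Integer v : newValues) {
    sum += v;
   }
   return Optional.of(new Tuple2<>(today, sum));
  };

JavaPairDStream<String, Tuple2<String, Integer>> dailyCounts =
  usersDstream.updateStateByKey(updateFunc);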

Answers


You can store the time information in the State and reset the count easily.


Could you give some sample code?
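
The answer above did not include code; here is a minimal sketch of one way to read it, keeping mapWithState but widening the state to a (day, count) pair. The layout and the date check are assumptions, not something the answerer posted, and the snippet reuses usersDstream from the question's code:

// Sketch: the state holds a Tuple2 of (day, count). When the stored day differs
// from the current day, the running count starts over from this batch's value.
Function3<String, Optional<Integer>, State<Tuple2<String, Integer>>, Tuple2<String, Integer>>
  mappingFunc = (userId, one, state) -> {
   String today = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
   int previous = 0;
   if (state.exists() && state.get()._1.equals(today)) {
    previous = state.get()._2;     // same day: continue accumulating
   }                               // new day: previous stays 0, i.e. the reset
   int sum = previous + one.orElse(0);
   state.update(new Tuple2<>(today, sum));
   return new Tuple2<>(userId, sum);
  };

JavaMapWithStateDStream<String, Integer, Tuple2<String, Integer>, Tuple2<String, Integer>> stateDstream =
  usersDstream.mapWithState(StateSpec.function(mappingFunc));

One caveat: stateSnapshots() would still report keys that were last updated on a previous day until they receive data again or time out, so a timeout or a snapshot filter would still be needed for an exact daily count.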


I kept using mapWithState, but changed the key to userId + date, added a 24-hour timeout, and filter the snapshot data for the current date. Please let me know if this solution can be improved. The Date object I create inside the filter is one of the things that still concerns me.

public class UserCounts { 


private static JavaStreamingContext createContext(String[] args) { 

    String brokers = args[1]; 
    String inputTopic = args[2]; 
    String outputTopic = args[3]; 
    String masterNode = args[4]; 
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); 
    HashMap<String, Object> props = new HashMap<>(); 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.StringSerializer"); 

    // Create the context with a 10 second batch interval 
    SparkConf sparkConf = new SparkConf().setAppName("UserCounts").setMaster(masterNode); 
    //sparkConf.set("spark.driver.bindAddress", "127.0.0.1"); 


    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10)); 

    Set<String> topicsSet = new HashSet<>(Arrays.asList(inputTopic.split(","))); 
    Map<String, String> kafkaParams = new HashMap<>(); 
    kafkaParams.put("metadata.broker.list", brokers); 
    ssc.checkpoint(args[0]);   // checkpoint to the same directory passed to getOrCreate so recovery works 


    // Create direct kafka stream with brokers and inputTopic 
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
      ssc, 
      String.class, 
      String.class, 
      StringDecoder.class, 
      StringDecoder.class, 
      kafkaParams, 
      topicsSet 
    ); 
    JavaDStream<String> lines = messages.map(Tuple2::_2); 
    // userId values can repeat across days, so the date is appended to the key and
    // the snapshot is filtered by the current date. Keys are removed by the
    // mapWithState timeout after 24 hours.
    JavaDStream<String> userIds = lines.map(x -> { 
     String[] array = x.split(","); 
     String userId = array[3]; 
     String timestamp = array[11]; 

     return userId + "-" + timestamp.substring(0, 10); 
    }); 

    JavaPairDStream<String, Integer> usersDstream = userIds.mapToPair(s -> new Tuple2<>(s, 1)); 

    // Mapping function that updates the cumulative count for each key 
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = 
      (userId, one, state) -> { 
       int sum = one.orElse(0) + (state.exists() ? state.get() : 0); 
       Tuple2<String, Integer> output = new Tuple2<>(userId, sum); 
       state.update(sum); 
       return output; 
      }; 

    // DStream of cumulative counts that get updated in every batch.
    // The 24-hour timeout makes sure the generated keys are eventually removed.
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream = 
      usersDstream.mapWithState(StateSpec.function(mappingFunc).timeout(Durations.seconds(86400))); 

    JavaPairDStream<String, Integer> stateSnapshotStream = stateDstream.stateSnapshots(); 



    stateSnapshotStream.filter(x -> { 
     // The Date object has to be created inside the filter (not at class or method
     // level) because the current date must change when a new day starts
     String currentDate = format.format(new Date()); 
     return x._1.endsWith(currentDate); 
    }).count().foreachRDD(rdd -> { 

     System.out.println("# events = " + rdd.count()); 
     String date = String.valueOf(System.currentTimeMillis()); 

     rdd.foreachPartition(partition -> { 
        KafkaProducer<String, String> producer = new KafkaProducer<>(props); 
        while (partition.hasNext()) { 
         Long value = partition.next(); 
         System.out.println("data >>>>>>>>>>" + value.toString()); 
         String data = "{\"timestamp\":\"" + date + "\",\"usersCount\":\"" + value.toString() + "\"}"; 
         producer.send(new ProducerRecord<String, String>(outputTopic, null, data)); 
        } 
        producer.close(); 

       } 
     ); 


    }); 



    return ssc; 
} 

public static void main(String[] args) throws Exception { 

    String checkpointPath = args[0]; 

    Function0<JavaStreamingContext> createContextFunc = () -> createContext(args); 

    JavaStreamingContext ssc = 
      JavaStreamingContext.getOrCreate(checkpointPath, createContextFunc); 

    ssc.start(); 
    ssc.awaitTermination(); 

} 

}
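
Regarding the Date object created inside the filter: one possibility, sketched here under the assumption that the rest of the pipeline stays the same (reusing format and stateSnapshotStream from the createContext above), is to move the date computation into a transformToPair call. The transform function body runs on the driver once per batch interval, so the current-day string is computed once per batch instead of once per record, and the per-record filter closure just captures it:

// Sketch: compute the current-day string once per batch on the driver and let
// the filter closure capture it.
JavaPairDStream<String, Integer> todaysSnapshots =
  stateSnapshotStream.transformToPair(rdd -> {
   String currentDate = format.format(new Date());
   return rdd.filter(x -> x._1.endsWith(currentDate));
  });

todaysSnapshots would then go through the same count().foreachRDD(...) publishing step as above. Also note that the mapWithState timeout is an idle timeout: a key is dropped roughly 24 hours after it last received data, not 24 hours after midnight. With the date suffix in the key that is usually fine, since yesterday's keys stop receiving data once the day rolls over.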