2015-11-19 47 views
2

弗林克0.10.0刚刚发布最近。我有一些代码需要从0.9.1迁移。但出现以下错误:弗林克InvalidTypesException:在的TypeVariable“类”“K”的类型不能确定

org.apache.flink.api.common.functions.InvalidTypesException:类'fi.aalto.dmg.frame.FlinkPairWorkloadOperator'中TypeVariable'K'的类型无法确定。这很可能是一种类型的删除问题。只有在返回类型中的所有变量都可以从输入类型推导出来的情况下,类型提取才支持具有通用变量的类型。

下面是代码:

public class FlinkPairWorkloadOperator<K,V> implements PairWorkloadOperator<K,V> { 

    private DataStream<Tuple2<K, V>> dataStream; 

    public FlinkPairWorkloadOperator(DataStream<Tuple2<K, V>> dataStream1) { 
     this.dataStream = dataStream1; 
    } 



    public FlinkGroupedWorkloadOperator<K, V> groupByKey() { 
     KeyedStream<Tuple2<K, V>, K> keyedStream = this.dataStream.keyBy(new KeySelector<Tuple2<K, V>, K>() { 
      @Override 
      public K getKey(Tuple2<K, V> value) throws Exception { 
       return value._1(); 
      } 
     }); 
     return new FlinkGroupedWorkloadOperator<>(keyedStream); 
    } 
} 

要了解InvalidTypesException是如何发生的,我有抛出此异常也另一个例子,我有没有关于它的想法。在这个演示中,该程序与scala.Tuple2一起使用,但不能链接Tuple2。

public class StreamingWordCount { 
    public static void main(String[] args) throws Exception { 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     DataStream<String> counts = env 
      .socketTextStream("localhost", 9999) 
      .flatMap(new Splitter()); 

     DataStream<Tuple2<String, Integer>> pairs = mapToPair(counts, mapToStringIntegerPair); 
     pairs.print(); 
     env.execute("Socket Stream WordCount"); 
    } 

    public static class Splitter implements FlatMapFunction<String, String> { 
     @Override 
     public void flatMap(String sentence, Collector<String> out) throws Exception { 
      for (String word: sentence.split(" ")) { 
       out.collect(word); 
      } 
     } 
    } 

    public static <K,V,T> DataStream<Tuple2<K,V>> mapToPair(DataStream<T> dataStream , final MapPairFunction<T, K, V> fun){ 
     return dataStream.map(new MapFunction<T, Tuple2<K, V>>() { 
      @Override 
      public Tuple2<K, V> map(T t) throws Exception { 
       return fun.mapPair(t); 
      } 
     }); 
    } 

    public interface MapPairFunction<T, K, V> extends Serializable { 
    Tuple2<K,V> mapPair(T t); 
    } 

    public static MapPairFunction<String, String, Integer> mapToStringIntegerPair = new MapPairFunction<String, String, Integer>() { 
     public Tuple2<String, Integer> mapPair(String s) { 
      return new Tuple2<String, Integer>(s, 1); 
     } 
    }; 
} 
+0

你能张贴'FlinkPairWorkloadOperator'的完整代码?在这个文件中https://github.com/wangyangjun/RealtimeStreamBenchmark/blob/master/StreamBench/flink/src/main/java/fi/aalto/dmg/frame/FlinkPairWorkloadOperator.java –

+0

reduceByKey和groupByKey功能。目前我使用Object而不是K. –

+0

我可以重现您的问题。将研究它。 –

回答

1

的问题是,你在弗林克的Java API组合使用scala.Tuple2代替org.apache.flink.api.java.tuple.Tuple2。 Java API的TypeExtractor不理解Scala元组。因此,它不能提取类型变量K的类型。

如果您改用org.apache.flink.api.java.tuple.Tuple2,那么TypeExtractor将能够解析类型变量。

+0

感谢您的帮助。有没有办法解决这个问题,如果我使用scala.Tuple2?比如使用ResultTypeQueryable?因为我有一个需要在Spark中实现的高级API,它需要scala.Tuple2。 –

+0

嗨,我刚刚更新了我的问题。我有一个与scala.Tuple2一起使用的演示,但是通过flink Tuple2获得相同的例外。 –

+0

你为什么不简单使用Flink的Scala API?有了这个,你可以轻松处理'scala.Tuples'。即使'ResultTypeQueryable'你会不会解决问题Java API中,因为你必须要到什么地方了'TypeInformation [K]'你不能从'GenericTypeInfo [scala.Tuple2]'获得。你必须显式传递一个'TypeInformation [K]'到'FlinkPairWorkloadOperator'中。 –