2016-06-19 100 views

Dataflow AvroCoder java.lang.IllegalArgumentException

I wrote a custom coder factory for a parameterized type, as described in "Dataflow output parameterized type to avro file":

p.getCoderRegistry().registerCoder(MyOutput.class, new CoderFactory() {
    @Override
    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
        AvroCoder tCoder = (AvroCoder) componentCoders.get(0);
        AvroCoder sCoder = (AvroCoder) componentCoders.get(1);
        Schema schema = makeMyOutputSchema(tCoder.getSchema(),
                sCoder.getSchema());
        return AvroCoder.of(MyOutput.class, schema);
    }

    @Override
    public List<Object> getInstanceComponents(Object value) {
        MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
        return Arrays.asList(new Object[] {myOutput.foo, myOutput.bar});
    }
});

The generated schema looks like this:

{
    "type":"record",
    "name":"MyOutput",
    "namespace":"myNamespace",
    "fields":[
        {"name":"baz","type":"boolean"},
        {
            "name":"foo",
            "type":{
                "type":"record",
                "name":"Foo",
                "namespace":"myNamespace",
                "fields":[{"name":"id","type":"string"}]
            }
        },
        {
            "name":"bar",
            "type":{
                "type":"record",
                "name":"Bar",
                "namespace":"myNamespace",
                "fields":[{"name":"id","type":"string"}]
            }
        }
    ]
}

The schema parses correctly, but when I try to run the pipeline I get:

com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Unable to get field id from class null 
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:186) 
    at com.google.cloud.dataflow.sdk.testing.TestPipeline.run(TestPipeline.java:106) 
    at mypackage.GenericsTest.testGenerics(GenericsTest.java:116) 
Caused by: java.lang.IllegalArgumentException: Unable to get field id from class null 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.getField(AvroCoder.java:710) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:548) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:567) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:430) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.<init>(AvroCoder.java:189) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:144) 
    at mypackage.GenericsTest$1.create(GenericsTest.java:102) 
    at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoderFromFactory(CoderRegistry.java:797) 
    at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:748) 
    at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:719) 
    at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:696) 
    at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:178) 
    at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:147) 
    at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48) 

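The pipeline is just a dummy pipeline that applies a single transform, which outputs MyOutput objects. It runs correctly when the transform outputs a non-parameterized type.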

public static class MyTransform extends PTransform<
        PCollection<String>,
        PCollection<MyOutput<Foo, Bar>>> {
    @Override
    public PCollection<MyOutput<Foo, Bar>> apply(
            PCollection<String> input) {
        PCollection<MyOutput<Foo, Bar>> output = input.apply(
            ParDo.of(new DoFn<String, MyOutput<Foo, Bar>>() {
                @Override
                public void processElement(ProcessContext c) {
                    c.output(new MyOutput<Foo, Bar>(new Foo(), new Bar()));
                }
            }));
        return output;
    }
}

Why am I getting this error?

static class MyOutput<T, S> { 
    T foo; 
    S bar; 
    Boolean baz; 
    public MyOutput() {} 
    public MyOutput(T foo, S bar) {this.foo=foo; this.bar=bar; this.baz=false;} 
} 
@DefaultCoder(AvroCoder.class) 
static class Bar { 
    String id; 
    public Bar() {this.id="t";} 
} 
@DefaultCoder(AvroCoder.class) 
static class Foo { 
    String id; 
    public Foo() {this.id="s";} 
} 

static Schema makeMyOutputSchema(Schema tSchema, Schema sSchema) { 
    Schema schema = new Schema.Parser().parse("{\"type\":\"record\"," 
    + "\"name\":\"MyOutput\"," 
    + "\"namespace\":\"impersonation\"," 
    + "\"fields\":[" 
    + " {\"name\":\"baz\", \"type\": \"boolean\"}," 
    + " {\"name\":\"foo\", \"type\": " + tSchema.toString() + "}," 
    + " {\"name\":\"bar\", \"type\": " + sSchema.toString() + "}" 
    + "]}"); 
    LOG.info(schema.toString()); 
    return schema; 
} 
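
For anyone trying to reproduce this: the trace fails inside AvroDeterminismChecker.getField while looking up the field id. One thing that may be relevant, and this is only an assumption, not something confirmed in this thread, is Java type erasure. Reflection on MyOutput sees the field foo as the type variable T with raw type Object, and Object has no id field. The sketch below uses only java.lang.reflect and does not call any Dataflow or Avro code; ErasureDemo and its helper methods are hypothetical names, and Foo and MyOutput are simplified stand-ins for the classes above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureDemo {
    // Simplified stand-ins for the classes in the question (assumption: same field names).
    static class Foo { String id = "s"; }
    static class MyOutput<T, S> { T foo; S bar; Boolean baz; }

    /** Returns the erased (raw) class of a declared field. */
    static Class<?> rawFieldType(Class<?> owner, String name) throws NoSuchFieldException {
        return owner.getDeclaredField(name).getType();
    }

    /** True if the field's declared generic type is an unresolved type variable such as T. */
    static boolean isTypeVariable(Class<?> owner, String name) throws NoSuchFieldException {
        Type generic = owner.getDeclaredField(name).getGenericType();
        return generic instanceof TypeVariable;
    }

    /** True if the class itself declares a field with the given name. */
    static boolean declaresField(Class<?> type, String name) {
        for (Field f : type.getDeclaredFields()) {
            if (f.getName().equals(name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // Reflection on the generic class only knows "T", which erases to Object.
        System.out.println(rawFieldType(MyOutput.class, "foo"));
        System.out.println(isTypeVariable(MyOutput.class, "foo"));
        // The concrete class has "id"; the erased type does not.
        System.out.println(declaresField(Foo.class, "id"));
        System.out.println(declaresField(Object.class, "id"));
    }
}
```

If this is indeed what trips the checker, it would also be consistent with the pipeline running fine for non-parameterized output types, where reflection can see the concrete field classes.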

Can you share the implementation of 'makeMyOutputSchema' and what 'MyOutput' looks like? Those would help with reproducing this.


Updated. Thanks for taking a look. – natb1
