2016-02-24 81 views

Multiple invokable constructors

I'm using the Storm Flux (0.10.0) DSL to deploy the following topology (simplified to keep only the relevant parts):

--- 
name: "my-topology" 

components:
  - id: "esConfig"
    className: "java.util.HashMap"
    configMethods:
      - name: "put"
        args:
          - "es.nodes"
          - "${es.nodes}"

bolts:
  - id: "es-bolt"
    className: "org.elasticsearch.storm.EsBolt"
    constructorArgs:
      - "myindex/docs"
      - ref: "esConfig"
    parallelism: 1

# ... other bolts, spouts and streams here 

As you can see, one of the bolts I use is org.elasticsearch.storm.EsBolt, which has the following constructors (see code):

public EsBolt(String target) { ... } 
public EsBolt(String target, boolean writeAck) { ... } 
public EsBolt(String target, Map configuration) { ... } 

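The last one should be called, since I pass a String and a Map in constructorArgs. However, when I deploy the topology I get the following exception, as if Flux could not infer the right constructor from the argument types (String, Map):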

storm jar mytopology-1.0.0.jar org.apache.storm.flux.Flux --local mytopology.yml --filter mytopology.properties 

... 
Version: 0.10.0 
Parsing file: mytopology.yml 
958 [main] INFO o.a.s.f.p.FluxParser - loading YAML from input stream... 
965 [main] INFO o.a.s.f.p.FluxParser - Performing property substitution. 
969 [main] INFO o.a.s.f.p.FluxParser - Not performing environment variable substitution. 
1252 [main] INFO o.a.s.f.FluxBuilder - Detected DSL topology... 
1431 [main] WARN o.a.s.f.FluxBuilder - Found multiple invokable constructors for class class org.elasticsearch.storm.EsBolt, given arguments [myindex/docs, {es.nodes=localhost}]. Using the last one found. 
Exception in thread "main" java.lang.IllegalArgumentException 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
    at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:291) 
    at org.apache.storm.flux.FluxBuilder.buildBolts(FluxBuilder.java:372) 
    at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:88) 
    at org.apache.storm.flux.Flux.runCli(Flux.java:153) 
    at org.apache.storm.flux.Flux.main(Flux.java:98) 

Any ideas about what might be happening? Here is how Storm Flux finds a compatible constructor. The magic happens in the canInvokeWithArgs method.

Here are the Flux debug logs, which show how FluxBuilder searches for the most suitable constructor:

Version: 0.10.0 
Parsing file: mytopology.yml 
559 [main] INFO o.a.s.f.p.FluxParser - loading YAML from input stream... 
566 [main] INFO o.a.s.f.p.FluxParser - Performing property substitution. 
569 [main] INFO o.a.s.f.p.FluxParser - Not performing environment variable substitution. 
804 [main] INFO o.a.s.f.FluxBuilder - Detected DSL topology... 
1006 [main] DEBUG o.a.s.f.FluxBuilder - Found constructor arguments in definition: java.util.ArrayList 
1006 [main] DEBUG o.a.s.f.FluxBuilder - Checking arguments for references. 
1010 [main] DEBUG o.a.s.f.FluxBuilder - Target class: org.elasticsearch.storm.EsBolt 
1011 [main] DEBUG o.a.s.f.FluxBuilder - found constructor with same number of args.. 
1011 [main] DEBUG o.a.s.f.FluxBuilder - Comparing parameter class class java.lang.String to object class class java.lang.String to see if assignment is possible. 
1011 [main] DEBUG o.a.s.f.FluxBuilder - Yes, they are the same class. 
1012 [main] DEBUG o.a.s.f.FluxBuilder - ** invokable --> true 
1012 [main] DEBUG o.a.s.f.FluxBuilder - found constructor with same number of args.. 
1012 [main] DEBUG o.a.s.f.FluxBuilder - Comparing parameter class class java.lang.String to object class class java.lang.String to see if assignment is possible. 
1012 [main] DEBUG o.a.s.f.FluxBuilder - Yes, they are the same class. 
1012 [main] DEBUG o.a.s.f.FluxBuilder - ** invokable --> true 
1012 [main] DEBUG o.a.s.f.FluxBuilder - Skipping constructor with wrong number of arguments. 
1012 [main] WARN o.a.s.f.FluxBuilder - Found multiple invokable constructors for class class org.elasticsearch.storm.EsBolt, given arguments [myindex/docs, {es.nodes=localhost}]. Using the last one found. 
1014 [main] DEBUG o.a.s.f.FluxBuilder - Found something seemingly compatible, attempting invocation... 
1044 [main] DEBUG o.a.s.f.FluxBuilder - Comparing parameter class class java.lang.String to object class class java.lang.String to see if assignment is possible. 
1044 [main] DEBUG o.a.s.f.FluxBuilder - They are the same class. 
1044 [main] DEBUG o.a.s.f.FluxBuilder - Comparing parameter class boolean to object class class java.util.HashMap to see if assignment is possible. 
Exception in thread "main" java.lang.IllegalArgumentException 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422) 
    at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:291) 
... 
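
Reading the debug output above, each two-argument constructor is reported as invokable right after the first parameter (String) matches, and the second parameter is only compared once invocation is attempted. That would explain why (String, boolean) and (String, Map) both look compatible. The sketch below is not Flux code: FakeBolt is a hypothetical stand-in that only copies EsBolt's constructor signatures, and allArgsAssignable and findStrictMatch are made-up helper names. It reproduces the IllegalArgumentException with plain reflection and shows one way a matcher could check every parameter before picking a constructor.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConstructorMatchSketch {

    // Hypothetical stand-in: same constructor signatures as EsBolt, nothing else.
    public static class FakeBolt {
        public final String chosen;
        public FakeBolt(String target) { chosen = "(String)"; }
        public FakeBolt(String target, boolean writeAck) { chosen = "(String, boolean)"; }
        public FakeBolt(String target, Map configuration) { chosen = "(String, Map)"; }
    }

    // Maps a few primitive types to their wrappers. Other primitives are left
    // as-is, so they never match an argument (a simplification for this sketch).
    public static Class<?> box(Class<?> type) {
        if (type == boolean.class) return Boolean.class;
        if (type == int.class) return Integer.class;
        if (type == long.class) return Long.class;
        if (type == double.class) return Double.class;
        return type;
    }

    // True only if EVERY argument fits the parameter at the same position.
    public static boolean allArgsAssignable(Class<?>[] paramTypes, List<Object> args) {
        if (paramTypes.length != args.size()) return false;
        for (int i = 0; i < paramTypes.length; i++) {
            Object arg = args.get(i);
            if (arg == null) {
                if (paramTypes[i].isPrimitive()) return false;
                continue;
            }
            if (!box(paramTypes[i]).isInstance(arg)) return false;
        }
        return true;
    }

    // Returns the first public constructor whose parameters all accept the
    // arguments, or null if there is none.
    public static Constructor<?> findStrictMatch(Class<?> target, List<Object> args) {
        for (Constructor<?> c : target.getConstructors()) {
            if (allArgsAssignable(c.getParameterTypes(), args)) return c;
        }
        return null;
    }

    public static void main(String[] unused) throws Exception {
        Map<String, String> esConfig = new HashMap<>();
        esConfig.put("es.nodes", "localhost");
        List<Object> args = Arrays.asList("myindex/docs", esConfig);

        // Invoking (String, boolean) with a HashMap fails the same way as in the trace.
        Constructor<?> wrong = FakeBolt.class.getConstructor(String.class, boolean.class);
        try {
            wrong.newInstance(args.toArray());
        } catch (IllegalArgumentException e) {
            System.out.println("(String, boolean) rejected the HashMap argument");
        }

        // Checking all parameters leaves only (String, Map).
        Constructor<?> right = findStrictMatch(FakeBolt.class, args);
        FakeBolt bolt = (FakeBolt) right.newInstance(args.toArray());
        System.out.println("strict match chose " + bolt.chosen);
    }
}
```

With a check like this, only the (String, Map) constructor would be considered invokable for the arguments in the topology, so the "Using the last one found" fallback would never come into play.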

Answer


As a crude workaround, I ended up extending EsBolt to expose only the constructors I need and avoid the conflict.

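package com.mypackage;

import java.util.Map;
import org.elasticsearch.storm.EsBolt;

// Thin wrapper around EsBolt that deliberately omits the (String, boolean)
// constructor, so Flux sees only one two-argument constructor.
public class EsBoltWrapper extends EsBolt {

    public EsBoltWrapper(String target) {
        super(target);
    }

    // Raw Map kept to match the EsBolt(String, Map) signature shown above.
    public EsBoltWrapper(String target, Map configuration) {
        super(target, configuration);
    }
}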

Now my topology looks like this:

bolts:
  - id: "es-bolt"
    className: "com.mypackage.EsBoltWrapper" # THE NEW CLASS
    constructorArgs:
      - "myindex/docs"
      - ref: "esConfig"
    parallelism: 1

This looks like a bug in Storm Flux.