2017-06-15 78 views
0

我试图设置一个新的流来连接Tika螺栓到warc螺栓。为warc螺栓设置一个新的流

import com.digitalpebble.stormcrawler.tika.ParserBolt; 
import com.digitalpebble.stormcrawler.warc.WARCHdfsBolt; 

builder.setBolt("tika", new ParserBolt(), numWorkers) 
    .localOrShuffleGrouping("shunt","tika"); 

WARCHdfsBolt warcbolt = getWarcBolt("XX"); 

builder.setBolt("warc", warcbolt, numWorkers) 
    .localOrShuffleGrouping("tika", "warc"); 

在蒂卡定义我已经修改了outputDeclarerFields功能如下定义我的新“WARC”流:

@Override 
public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    declarer.declare(new Fields("url", "content", "metadata", "text")); 
    declarer.declareStream(StatusStreamName, new Fields("url", "metadata", "status")); 
    declarer.declareStream("warc", new Fields("url", "content", "metadata", "text")); 
} 

然而,当我启动在本地模式拓扑我得到:

14308 [主] WARN oasdsSlot - SLOT debian8:1027开始在 状态空 - 空分配14308 [主] WARN oasdsSlot - SLOT debian8:1028挑动克状态EMPTY - 分配空14308 [主要] WARN oasdsSlot - SLOT debian8:1029在状态EMPTY开始 - 分配空14309 [主要] INFO oaslAsyncLocalizer - 清洁 未使用的拓扑结构在 /TMP/a1e3b7f5-e251- 40ae-a032-b0839ca103c8/supervisor/stormdist 14318 [main] INFO oasdsSupervisor - 在主机debian8上启动主管,ID为 f42c64cd-7c36-40ab-9f85-4b7751ed2d6a。 15030 [main] WARN o.a.s.d.nimbus - 拓扑提交异常。 (拓扑 name ='xxCrawler')#error {:导致nil:通过[{:type org.apache.storm.generated.InvalidTopologyException:message nil
:at [org.apache.storm.daemon.common $ validate_structure_BANG_调用 common.clj 185]}]:跟踪 [org.apache.storm.daemon.common $ validate_structure_BANG_调用 common.clj 185]
[org.apache.storm.daemon.common $ system_topology_BANG_调用常用 。 CLJ 378]
[org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782 submitTopologyWithOpts nimbus.clj 1694]
[org.apache.storm.daemon.nimbus $ mk_reified_nimbus $重ify__10782 submitTopology nimbus.clj 1726]
[sun.reflect.NativeMethodAccessorImpl invoke0 NativeMethodAccessorImpl.java -2]
[sun.reflect.NativeMethodAccessorImpl调用 NativeMethodAccessorImpl.java 62]
[sun.reflect.DelegatingMethodAccessorImpl调用 DelegatingMethodAccessorImpl .java 43] [java.lang.reflect.Method invoke Method.java 498] [clojure.lang.Reflector invokeMatchingMethod Reflector.java 93] [clojure.lang.Reflector invokeInstanceMethod Reflector.java 28] [org.apache。 storm.testing $ submit_local_topology invoke testing.clj 310]
[org.apache.storm.LocalCluster $ _submitTopology调用LocalCluster.clj 49] [org.apache.storm.LocalCluster submitTopology零-1]
[com.digitalpebble.stormcrawler.ConfigurableTopology提交 ConfigurableTopology.java 76]
[com.digitalpebble.stormcrawler.ConfigurableTopology提交 ConfigurableTopology.java 65] [xx.xx.xx.xx.xxTopology运行 xxTopology.java 111]
[com.digitalpebble.stormcrawler.ConfigurableTopology开始 ConfigurableTopology.java 50] [xx.xx.xx.xx.xx拓扑学主 xxTopology.java 53]]} 15035 [main]错误 oassoazsNIOServerCnxnFactory - 线程线程[main,5,main]死亡 org.apache.storm.generated。InvalidTopologyException:null at org.apache.storm.daemon.common $ validate_structure_BANG_.invoke(common.clj:185) 〜[storm-core-1.1.0.jar:1.1.0] at org.apache.storm。 daemon.common $ system_topology_BANG_.invoke(common.clj:378) 〜[storm-core-1.1.0.jar:1.1.0] at org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782.submitTopologyWithOpts(nimbus .clj:1694) 〜[storm-core-1.1.0.jar:1.1.0] at org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782.submitTopology(nimbus.clj:1726) 〜[storm -core-1.1.0.jar:1.1.0] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)〜[?:1.8.0_131] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java :62) 〜[:?1.8.0_131] 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 〜[:?1.8.0_131] 在java.lang.reflect.Method.invoke(方法.java:498)〜[?:1.8.0_131] at clojure.lang.Reflector。在clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) 〜[clojure-1.7.0.jar :?] 。 invokeInstanceMethod(Reflector.java:28) 〜[clojure-1.7.0.jar :?] at org.apache.storm.testing $ submit_local_topology.invoke(testing.clj:310) 〜[storm-core-1.1。 0.jar:1.1.0] 在org.apache.storm.LocalCluster $ _submitTopology.invoke(LocalCluster.clj:49) 〜[风暴核-1.1.0.jar:1.1.0] 在org.apache.storm.LocalCluster.submitTopology(未知来源)〜[风暴芯1.1.0.jar:1.1.0] 在com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:76) 〜[xx-crawler-1.1.jar :?] at com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:65) 〜[xx-1.1.jar :?] at xx.xx.xx.xx .xxTopology.run(xxTopology.java:111)〜[xx-crawler-1.1.jar :?] at com.digitalpebble.stormcrawler.ConfigurableTopology.start(ConfigurableTopology.java:50) 〜[xx-crawler-1.1。 jar:?] at xx.xx.xx.xx.xxTopology.main(xxTopology.java:53)〜[xx-crawler-1.1.jar :?]

任何帮助,将不胜感激!

需要注意的是,如果我使用StatusStreamName(“状态”)流连接蒂卡和WARC螺栓它工作正常。

谢谢

艾蒂安

回答

0

WARCs从原始的,非解析的内容生成。您应该将WARC连接到Fetcher的输出而不是解析器螺栓。

你并不需要声明一个新的流只为WARC,你可以简单地将螺栓WARC连接到走出蒂卡螺栓的默认流。

我看到你的代码

进口com.digitalpebble.stormcrawler.tika.ParserBolt;

这将表明您依赖于默认实现(不会生成'warc'流)。你能否忘记用你的修改实现来替换它?

+0

亲爱的朱利安, 非常感谢有用的意见。 我想要的只是存档到WARC文件只有选定的网页。选择基于解析器中的正则表达式。然后,如果我有一个匹配项,则将该页面的内容(使用“warc”流)进行存档。因此解析器和warc螺栓之间的连接。 这种方法对您听起来是否正确?或者我应该去实施呢? 再次感谢您! Etienne – EJO

+0

嗨艾蒂安。已经编辑了我的答案。当我最初回答解析器螺栓也生成二进制内容时,忘记了,所以你根本不需要修改蒂卡螺栓。您可以编写一个定制螺栓来检查元数据的内容,并仅传递在解析过程中设置的具有K/V的元组。 如果您仅处理HTML页面,请使用JsoupParser插件。 由于导入不正确,BTW是最初的问题。如果是这样,请将我的答案标记为正确。谢谢! –