2015-05-06 65 views
1

我有一个流,其中包含一个模块,其行为与source:file模块的行为类似,只是使用redisMetaDataStore和Spring Integration的FileSystemPersistentAcceptOnceFileListFilter只检测一次文件,以便模块可以记住哪些文件已经在容器重启之间被读取。Spring XD源代码失败处理

文件persisting.xml

<beans:bean id="redisMetadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore" > 
     <beans:constructor-arg ref="redisConnectionFactory" /> 
    </beans:bean> 

    <beans:bean id="persistentAcceptOnceFileFilter" class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter"> 
     <beans:constructor-arg ref="redisMetadataStore" /> 
     <beans:constructor-arg value="${metadataPrefix}" /> 
    </beans:bean> 

    <beans:bean id="antPatternFileFilter" class="org.springframework.integration.file.filters.SimplePatternFileListFilter"> 
     <beans:constructor-arg value="${pattern}" /> 
    </beans:bean> 

    <beans:bean id="compositeFileFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter"> 
     <beans:constructor-arg> 
      <beans:list> 
       <beans:ref bean="antPatternFileFilter" /> 
       <beans:ref bean="persistentAcceptOnceFileFilter" /> 
      </beans:list> 
     </beans:constructor-arg> 
    </beans:bean> 

    <file:inbound-channel-adapter 
      auto-startup="false" 
      channel="file" directory="${dir}" filter="compositeFileFilter"> 
     <poller fixed-delay="${fixedDelay}" time-unit="SECONDS"/> 
    </file:inbound-channel-adapter> 

    <channel id="file"/> 

    <chain id="extractContents" input-channel="file" output-channel="output"> 
     <header-enricher> 
      <header name="contentType" value="application/octet-stream"/> 
     </header-enricher> 
     <file:file-to-bytes-transformer/> 
    </chain> 

    <channel id="output"/> 

我遇到的问题是,应该Redis的容器下去,persistentAcceptOnceFileFilter会抛出异常时,它被调用时,在errorChannel登陆他们。

不幸的是,在我的组织内,发布在errorChannel中的所有消息都是由一个服务接收的,该服务通过电子邮件发送给团队,并在我们的售票系统中发布故障单,这意味着如果redis在夜间发生故障,我们的售票系统将会有数以万计的消息被一些糟糕的团队成员不得不经历和关闭所淹没。

要解决这个问题,我认为有错误处理,这样,而不是击中错误通道延伸FileSystemPersistentAcceptOnceFileListFilter,我们干脆拒绝检测,并在XD-容器日志离开消息的所有文件,但是这是一个解决方案我不喜欢这样做,因为我想通过配置完全解决这个问题,并且完全避免了我们的票务/通知系统。相反,我宁愿允许一个单一的发送到errorChannel(所以我们得到一个通知/票),然后停止这个模块是一个组件(或至少,功能上相同,最好通过配置)的流。有没有办法做到这一点?

回答

1

由于我没有看到自定义error-channel,我假设您的票务系统是由日志子系统(通过默认的errorChannel及其日志处理程序订阅者)驱动的。

由于默认errorChannel是发布/订阅,您可以订阅一个第二流它...

<int:transformer input-channel="errorChannel" expression="@fileChannelAdapter.stop()" 
    output-channel="control" /> 

<int:control-bus input-channel="control" /> 

和文件通道适配器上设置id="fileChannelAdapter"

+0

这工作完美,谢谢! 我做了一些补充,以便在恢复redis时重启模块:1)添加了connectionChecker bean,它将RedisConnection.ping()功能封装到一个方便的布尔方法中。 2)添加到模块上下文中: “ \t \t ' –

+0

酷 - 您可能想考虑为[spring-xd-modules] (https://github.com/spring-projects/spring-xd-modules)。 –