我有一个spring的xd模块,用于从s3中拉取文件并逐行分割并在处理后将其删除(ExpressionAdvice)。我的文件中有大约100万条消息(行) s3。文件被下载到xd容器框中,并且我检查了md5sum和它的相同并且具有相同的行。我看到只有260k个奇怪的消息来到输出通道,这是处理器。我丢失了大约740条消息。有时它随机一次,我看到所有消息,如100万在我的输出通道,有时只有250k。我正在测量这个使用计数器为我的stream.File下载,但我觉得它被删除之前处理所有记录/行在10秒内,我的文件大小是700Mb左右。请让我知道表达式建议是否在处理前删除。文件在处理之前被删除
module.aws-s3-source.count=1 and module.aws-s3-source.concurrency=70
stream1 as-s3-source |processor|sink
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-aws="http://www.springframework.org/schema/integration/aws"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/aws http://www.springframework.org/schema/integration/aws/spring-integration-aws-1.0.xsd">
<context:property-placeholder location="classpath*:test-${region}.properties" />
<int:poller fixed-delay="${fixedDelay}" default="true">
<int:advice-chain>
<ref bean="pollAdvise"/>
</int:advice-chain>
</int:poller>
<bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
<constructor-arg ref="healthCheckStrategy"/>
</bean>
<bean id="healthCheckStrategy" class="test.ServiceHealthCheckPollSkipStrategy">
<property name="url" value="${url}"/>
<property name="doHealthCheck" value="${doHealthCheck}"/>
<property name="restTemplate" ref="restTemplate"/>
</bean>
<bean id="restTemplate"
class="org.springframework.web.client.RestTemplate">
<constructor-arg ref="requestFactory"/>
</bean>
<bean id="requestFactory"
class="test.BatchClientHttpRequestFactory">
<constructor-arg ref="verifier"/>
</bean>
<bean id="verifier"
class="test.NullHostnameVerifier">
</bean>
<bean id="encryptedDatum" class="test.EncryptedSecuredDatum"/>
<bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
<property name="proxyHost" value="${proxyHost}"/>
<property name="proxyPort" value="${proxyPort}"/>
<property name="preemptiveBasicProxyAuth" value="false"/>
</bean>
<bean id="s3Operations" class="test.CustomC1AmazonS3Operations">
<constructor-arg index="0" ref="clientConfiguration"/>
<property name="awsEndpoint" value="s3.amazonaws.com"/>
<property name="temporaryDirectory" value="${temporaryDirectory}"/>
<property name="awsSecurityKey" value=""/>
</bean>
<bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
</bean>
<int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
bucket="${bucket}"
s3-operations="s3Operations"
credentials-ref="credentials"
file-name-wildcard="${fileNameWildcard}"
remote-directory="${prefix}"
channel="splitChannel"
local-directory="${localDirectory}"
accept-sub-folders="false"
delete-source-files="true"
archive-bucket="${archiveBucket}"
archive-directory="${archiveDirectory}">
</int-aws:s3-inbound-channel-adapter>
<int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpression" value="payload.delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:splitter>
<int:channel-interceptor pattern="*" order="3">
<bean class="org.springframework.integration.channel.interceptor.WireTap">
<constructor-arg ref="loggingChannel" />
</bean>
</int:channel-interceptor>
<int:logging-channel-adapter id="loggingChannel" log-full-message="true" level="INFO"/>
<int:channel id="output"/>
</beans>
更新2:
我的流是象下面 AWS-S3-源|处理器| HTTP客户端|处理器>队列:testQueue
1)现在我分裂流象下面这样:
aws-s3-source> queue:s3Queue
我能够非常快的读取所有我的1万条消息。 2)现在,我增加了一个流像下面我看到的问题又是S3停止拉文件和邮件都将丢失每次
queue:s3Queue>processor|http-client| processor> queue:testQueue
3)观察是当我添加HTTP客户端这一问题再次发生,即。来自输入源的一些消息丢失。
4)现在我将文件分割成125个MB的5个文件,而不是660mb一个文件.IE 200点多K的记录5 files.I没有看到这个问题让我的所有消息
我也看到很多消息在http-client之前堵塞在队列中。 我在想这是关于xd内存或线程的事情吗?
不,我正在监视我的本地文件夹文件只能在完全上传后拉动,并且拉出的文件和s3中的文件具有相同的md5hash和大小以及行数。我观察到的一个观察结果是文件在8 -10秒它被删除了。我觉得它不可能在10秒内处理100万条记录 – constantlearner
那么,在分离器发送所有行之前,建议也不可能删除文件。您需要添加调试。 –
让我检查添加调试器 – constantlearner