2014-06-17 44 views
6

我使用AWS SDK for Java和我使用的缓冲异步SQS客户端批量请求让我降低成本。如何强制AmazonSQSBufferedAsyncClient刷新消息?

当我的应用程序关闭时,我想确保没有消息在缓冲区中等待,但没有.flush()方法,我可以在客户端上看到。

是否调用时AmazonSQSBufferedAsyncClient.shutdown()刷新我的邮件?我看着source code,目前还不清楚。该方法调用,它具有每QueueBuffershutdown(),但里面QueueBuffer.shutdown()它说

public void shutdown() { 
    //send buffer does not require shutdown, only 
    //shut down receive buffer 
    receiveBuffer.shutdown(); 
} 

此外,.shutdown()该文件说:

关闭此客户端对象,释放任何资源,可能是 保持开放。 这是一个可选的方法,而来电者预计不会 称之为,但如果他们想明确地释放任何打开 资源。一旦客户端已经关闭,它不应该用于 发出更多请求。

对于这个应用程序,我需要确保没有消息在被缓冲时丢失。我是否需要使用正常的AmazonSQSClient而不是缓冲/异步手动来处理这个问题?

+1

我发现这似乎是同一个问题,目前还没有答案:https://forums.aws.amazon.com/thread.jspa?threadID=122189 – Daenyth

回答

1

随着37年1月11日版本的SDK,还有就是用于此目的的QueueBufferConfig的配置参数。

AmazonSQSBufferedAsyncClient bufClient = 
    new AmazonSQSBufferedAsyncClient(
     realAsyncClient, 
     new QueueBufferConfig() 
      .withFlushOnShutdown(true) 
    ); 
1

有显式调用冲洗的方法,但它是无法访问的,实际上我找不到在亚马逊代码方法的调用。似乎缺少某些东西。

当你调用shutdown它执行以下代码异步客户端上:

public void shutdown() { 
    for(QueueBuffer buffer : buffers.values()) { 
     buffer.shutdown(); 
    } 
    realSQS.shutdown(); 
} 

而且QueueBuffer#shutdown()方法是这样的:

/** 
* Shuts down the queue buffer. Once this method has been called, the 
* queue buffer is not operational and all subsequent calls to it may fail 
* */ 
public void shutdown() { 
    //send buffer does not require shutdown, only 
    //shut down receive buffer 
    receiveBuffer.shutdown(); 
} 

所以好像他们是故意不调用sendBuffer.shutdown()方法,该方法将刷新缓冲区中尚未发送的每条消息。

你找到的情况下,当你关闭SQS客户端,它失去了消息?看起来他们已经意识到这一点,这种情况不应该发生,但如果你想确保你可以用反射来调用这种方法,那真的很讨厌,但它会满足你的需求。

AmazonSQSBufferedAsyncClient asyncSqsClient = <your initialization code of the client>; 
    Field buffersField = ReflectionUtils.findField(AmazonSQSBufferedAsyncClient.class, "buffers"); 
    ReflectionUtils.makeAccessible(buffersField); 
    LinkedHashMap<String, Object> buffers = (LinkedHashMap<String, Object>) ReflectionUtils.getField(buffersField, asyncSqsClient); 
    for (Object buffer : buffers.values()) { 
     Class<?> clazz = Class.forName("com.amazonaws.services.sqs.buffered.QueueBuffer"); 
     SendQueueBuffer sendQueueBuffer = (SendQueueBuffer) ReflectionUtils.getField(ReflectionUtils.findField(clazz, "sendBuffer"), buffer); 
     sendQueueBuffer.flush();//finally 
    } 

这样的事情应该工作,我猜。让我知道!