有显式调用冲洗的方法,但它是无法访问的,实际上我找不到在亚马逊代码方法的调用。似乎缺少某些东西。
当你调用shutdown它执行以下代码异步客户端上:
public void shutdown() {
for(QueueBuffer buffer : buffers.values()) {
buffer.shutdown();
}
realSQS.shutdown();
}
而且QueueBuffer#shutdown()方法是这样的:
/**
* Shuts down the queue buffer. Once this method has been called, the
* queue buffer is not operational and all subsequent calls to it may fail
* */
public void shutdown() {
//send buffer does not require shutdown, only
//shut down receive buffer
receiveBuffer.shutdown();
}
所以好像他们是故意不调用sendBuffer.shutdown()方法,该方法将刷新缓冲区中尚未发送的每条消息。
你找到的情况下,当你关闭SQS客户端,它失去了消息?看起来他们已经意识到这一点,这种情况不应该发生,但如果你想确保你可以用反射来调用这种方法,那真的很讨厌,但它会满足你的需求。
AmazonSQSBufferedAsyncClient asyncSqsClient = <your initialization code of the client>;
Field buffersField = ReflectionUtils.findField(AmazonSQSBufferedAsyncClient.class, "buffers");
ReflectionUtils.makeAccessible(buffersField);
LinkedHashMap<String, Object> buffers = (LinkedHashMap<String, Object>) ReflectionUtils.getField(buffersField, asyncSqsClient);
for (Object buffer : buffers.values()) {
Class<?> clazz = Class.forName("com.amazonaws.services.sqs.buffered.QueueBuffer");
SendQueueBuffer sendQueueBuffer = (SendQueueBuffer) ReflectionUtils.getField(ReflectionUtils.findField(clazz, "sendBuffer"), buffer);
sendQueueBuffer.flush();//finally
}
这样的事情应该工作,我猜。让我知道!
我发现这似乎是同一个问题,目前还没有答案:https://forums.aws.amazon.com/thread.jspa?threadID=122189 – Daenyth