我想听听使用阿卡流SQS阿卡流的状态,我得到的消息从它的Q 使用此代码段:如何管理使用TTL
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ConfigUtils.dtsConfiguration.ioThreadPoolSize))
val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder
.standard()
.withCredentials(new ClasspathPropertiesFileCredentialsProvider())
.withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, ConfigUtils.dtsConfiguration.regionName))
.build()
val future = SqsSource(sqsEndpoint)(awsSqsClient)
.takeWhile(_ => true)
.mapAsync(parallelism = 2)(m => {
val msgBody = SqsMessage.deserializeJson(m.getBody)
msgBody match {
case Right(body) => //for each stream add (body.ID,body.Record.FileContent) or concatenate the new fileContent
// with current map (of same id)
// that for each key in the map - if the filecontent size > 100 kb remove the relevant tuple from
the map and perform an operation on it
}
Future(m, Ack())
})
.to(SqsAckSink(sqsEndpoint)(awsSqsClient))
.run()
我已经在评论代码中的特定点需要操作流。
我需要基本上是每个记录获取从SQS要做到这一点:
我希望把它的内容到地图[诠释,字符串]这个int代表键,该字符串是记录的内容。 (对于其他键我将连接它的内容,直到它的大小大于1kb)
(就像保存每个键的状态一样)。
然后我想要执行以下操作:
我想为每一个元组(不断地流) 执行操作时,它的内容大小> 1KB,然后从地图中移除。
我还需要一个ttl的记录,没有更新太空peiod说30秒的地图。
可以使用阿卡流吗?
谢谢。