2017-09-06 48 views
0

我想听听使用阿卡流SQS阿卡流的状态,我得到的消息从它的Q 使用此代码段:如何管理使用TTL

implicit val system = ActorSystem() 
implicit val mat = ActorMaterializer() 
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ConfigUtils.dtsConfiguration.ioThreadPoolSize)) 
val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder 
    .standard() 
    .withCredentials(new ClasspathPropertiesFileCredentialsProvider()) 
    .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, ConfigUtils.dtsConfiguration.regionName)) 
    .build() 

val future = SqsSource(sqsEndpoint)(awsSqsClient) 
    .takeWhile(_ => true) 
    .mapAsync(parallelism = 2)(m => { 
    val msgBody = SqsMessage.deserializeJson(m.getBody) 
    msgBody match { 
     case Right(body) => //for each stream add (body.ID,body.Record.FileContent) or concatenate the new fileContent 
          // with current map (of same id) 
          // that for each key in the map - if the filecontent size > 100 kb remove the relevant tuple from 
         the map and perform an operation on it 
    } 
    Future(m, Ack()) 
    }) 
    .to(SqsAckSink(sqsEndpoint)(awsSqsClient)) 
    .run() 

我已经在评论代码中的特定点需要操作流。

我需要基本上是每个记录获取从SQS要做到这一点:

我希望把它的内容到地图[诠释,字符串]这个int代表键,该字符串是记录的内容。 (对于其他键我将连接它的内容,直到它的大小大于1kb)

(就像保存每个键的状态一样)。

然后我想要执行以下操作:

我想为每一个元组(不断地流) 执行操作时,它的内容大小> 1KB,然后从地图中移除。

我还需要一个ttl的记录,没有更新太空peiod说30秒的地图。

可以使用阿卡流吗?

谢谢。

回答

0

Flow.statefulMapConcat是一种swizz-army-knife,它可以让你保持状态并有条件地向下游排放价值。如果您只是在元素到达时才高兴地丢弃TTL,那么这也是可行的,但是触发驱逐的勾号有点棘手,并且可以通过实现自定义GraphStage来代替。

这里是一个简化的例子,它将累积每个键的值,直到达到限制值,然后向下游发射。

import scala.collection.immutable.Iterable 
val theThing: Flow[(String, Int), (String, Int), NotUsed] = 
    Flow[(String, Int)] 
    .statefulMapConcat {() => 
     // state kept in factory function scope 
     var state = Map[String, Int]() 

     // for each incoming tuple 
     { 
     case (key, value) => 
      val newValueForKey = state.getOrElse(key, 0) 

      // ... evicting old elements could go here ... 

      if (newValueForKey > 10) { 
      // max size, emit something downstream 
      state = state - key 
      Iterable(key -> newValueForKey) 
      } else { 
      // just update state, don't emit anything 
      state = state + (key -> newValueForKey) 
      Iterable.empty 
      } 
     } 
    } 

要做到这个,你会以某种方式引入蜱元素剔,但这可能还回压的影响,因此,导致回自定义GraphStage。 GraphStages有一个计时器API,允许他们将滴答作为不受背压影响的侧通道。您可以在Akka文档的这一部分找到有关如何实现此类功能的详细信息:http://doc.akka.io/docs/akka/current/scala/stream/stream-customize.html#custom-linear-processing-stages-using-graphstage