2014-10-10 41 views

Amazon Kinesis GetRecords API best approach

I have set up a Kinesis stream in Amazon Web Services. I also wanted to accomplish the following tasks:

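  1. Put records into a single stream with a single shard (C# API) - succeeded
  2. Write a sample application in which multiple producers write to different streams - succeeded
  3. Set up a sample application in which multiple workers put data into a single stream - succeeded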

In addition, I want to be able to enforce SequenceNumberOrdering on the records.

But the real pain is the consumer side: the GetRecords operation of the Kinesis C# API.

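I created a sample application that reads records. The problem is that it does not stop iterating even when there are no records in the Kinesis stream. Saving the SequenceNumber in a database or a file and reading it back again is time-consuming, so what is the advantage of using a Kinesis stream with GetRecords?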

Why does it keep iterating even when there is no data in the stream?

For reference, this is the code I used:

// Namespaces these methods rely on (AWS SDK for .NET)
using System;
using System.Collections.Generic;
using System.Text;
using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

private static void GetFilesKinesisStream()
{
    IAmazonKinesis kinesis = AWSClientFactory.CreateAmazonKinesisClient();
    try
    {
        // Step #1 - list the streams in this account
        ListStreamsResponse listStreams = kinesis.ListStreams();
        int numStreams = 0;
        if (listStreams.StreamNames != null &&
            listStreams.StreamNames.Count > 0)
        {
            numStreams = listStreams.StreamNames.Count;
            Console.WriteLine("You have " + numStreams + " Amazon Kinesis Streams.");
            Console.WriteLine(string.Join(",\n", listStreams.StreamNames.ToArray()));

            // Step #2 - describe the stream and pick its first shard
            DescribeStreamRequest describeRequest = new DescribeStreamRequest();
            describeRequest.StreamName = "******************";

            DescribeStreamResponse describeResponse = kinesis.DescribeStream(describeRequest);
            List<Shard> shards = describeResponse.StreamDescription.Shards;
            foreach (Shard s in shards)
            {
                Console.WriteLine("shard: " + s.ShardId);
            }

            string primaryShardId = shards[0].ShardId;

            // Start reading at a known sequence number in that shard
            GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
            iteratorRequest.StreamName = "*********************";
            iteratorRequest.ShardId = primaryShardId;
            iteratorRequest.ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER;
            iteratorRequest.StartingSequenceNumber = "49544005271533118105145368110776211536226129690186743810";

            GetShardIteratorResponse iteratorResponse = kinesis.GetShardIterator(iteratorRequest);
            string iterator = iteratorResponse.ShardIterator;

            Console.WriteLine("Iterator: " + iterator);
            // Step #3 - get records in this iterator
            GetShardRecords(kinesis, iterator);

            Console.WriteLine("All records read.");
            Console.ReadLine();
        }
    }
    catch (AmazonKinesisException ex)
    {
        if (ex.ErrorCode != null && ex.ErrorCode.Equals("AuthFailure"))
        {
            Console.WriteLine("The account you are using is not signed up for Amazon Kinesis.");
            Console.WriteLine("You can sign up for Amazon Kinesis at http://aws.amazon.com/kinesis");
        }
        else
        {
            Console.WriteLine("Caught Exception: " + ex.Message);
            Console.WriteLine("Response Status Code: " + ex.StatusCode);
            Console.WriteLine("Error Code: " + ex.ErrorCode);
            Console.WriteLine("Error Type: " + ex.ErrorType);
            Console.WriteLine("Request ID: " + ex.RequestId);
        }
    }
}

private static void GetShardRecords(IAmazonKinesis client, string iteratorId)
{
    // Create the request
    GetRecordsRequest getRequest = new GetRecordsRequest();
    getRequest.Limit = 100;
    getRequest.ShardIterator = iteratorId;

    // Call the "get" operation and fetch up to Limit records from this iterator position
    GetRecordsResponse getResponse = client.GetRecords(getRequest);
    // Get a reference to the next iterator for this shard
    string nextIterator = getResponse.NextShardIterator;
    // Retrieve the records
    List<Record> records = getResponse.Records;

    // Print out each record's data value
    foreach (Record r in records)
    {
        // Pull out the (JSON) data in this record
        string s = Encoding.UTF8.GetString(r.Data.ToArray());
        Console.WriteLine("Record: " + s);
        Console.WriteLine("Partition Key: " + r.PartitionKey);
    }

    if (null != nextIterator)
    {
        // If there's another iterator, call the operation again
        GetShardRecords(client, nextIterator);
    }
}

Answer

Why does a Kinesis consumer keep iterating after the data has "ended"?

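Because there is no "end". Kinesis is somewhat like a queue, but not quite. Think of it as a moving time window of recorded events. You don't consume records; you passively examine whatever records are currently inside the window (24 hours by default). Because the window keeps moving, a consumer that reaches the "last" record simply carries on watching in real time. New records can arrive at any moment, and the consumer has no way of knowing that there are no producers left.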
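
One way to live with this is to poll in a loop instead of recursing, and to give up after a number of consecutive empty batches. The following is only a sketch under assumptions: `FetchBatch`, `ShardBatch` and `BoundedPoller` are hypothetical names, and the delegate stands in for a single GetRecords call so the loop can be read and tried without an AWS account. An empty batch is a heuristic, not proof that the stream has ended.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for one GetRecords call: takes a shard iterator and
// returns that batch of records plus the iterator to use for the next call.
public delegate ShardBatch FetchBatch(string shardIterator);

public sealed class ShardBatch
{
    public IList<string> Records { get; set; } = new List<string>();
    public string NextShardIterator { get; set; }
}

public static class BoundedPoller
{
    // Polls iteratively and stops when the iterator becomes null or after
    // maxEmptyPolls consecutive empty batches. Returns the number of records handled.
    public static int Drain(FetchBatch fetch, string iterator, Action<string> handle,
                            int maxEmptyPolls, TimeSpan pause)
    {
        int handled = 0;
        int emptyPolls = 0;
        while (iterator != null && emptyPolls < maxEmptyPolls)
        {
            ShardBatch batch = fetch(iterator);
            if (batch.Records.Count == 0)
            {
                // Nothing new right now: count it and wait before asking again.
                emptyPolls++;
                Thread.Sleep(pause);
            }
            else
            {
                // Records arrived, so reset the idle counter.
                emptyPolls = 0;
                foreach (string record in batch.Records)
                {
                    handle(record);
                    handled++;
                }
            }
            iterator = batch.NextShardIterator;
        }
        return handled;
    }
}
```

In the question's code, the delegate body would wrap `client.GetRecords(getRequest)` and copy `Records` and `NextShardIterator` out of the response. A loop also avoids the ever-growing call stack of the recursive version.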

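If you want to stop based on some condition, that condition has to be carried in your payload. For example, if you want to stop once you reach "now", part of the payload could be a timestamp, and the consumer checks how close that timestamp is to its current time.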
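
A minimal sketch of that timestamp check, assuming a made-up payload layout of `<ISO-8601 UTC timestamp>|<body>`. The layout, the `CatchUpCheck` class and the tolerance parameter are illustrative assumptions, not part of Kinesis.

```csharp
using System;
using System.Globalization;

public static class CatchUpCheck
{
    // Assumed (hypothetical) payload layout: "<ISO-8601 UTC timestamp>|<body>".
    public static bool TryGetTimestamp(string payload, out DateTimeOffset timestamp)
    {
        timestamp = default(DateTimeOffset);
        int separator = payload.IndexOf('|');
        if (separator <= 0)
        {
            return false; // no timestamp prefix present
        }
        return DateTimeOffset.TryParse(
            payload.Substring(0, separator),
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal,
            out timestamp);
    }

    // True when the record was stamped within 'tolerance' of 'now',
    // i.e. the consumer has reached the present and may stop.
    // Records stamped slightly in the future (clock skew) also count as caught up.
    public static bool IsCaughtUp(string payload, DateTimeOffset now, TimeSpan tolerance)
    {
        DateTimeOffset stamped;
        return TryGetTimestamp(payload, out stamped) && now - stamped <= tolerance;
    }
}
```

A consumer would call `IsCaughtUp(payload, DateTimeOffset.UtcNow, TimeSpan.FromSeconds(30))` on each decoded record and leave its read loop on the first `true`. Payloads without a parsable timestamp are treated as not caught up, so they never stop the loop by accident.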

I want to run the service every 30 minutes. How do I retrieve the records that come after the ones I have already consumed? – 2014-11-03 05:57:29

Record the last sequence number you read, and resume from there with the shard iterator type "AFTER_SEQUENCE_NUMBER". – engineerC 2014-11-03 15:24:31
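
The checkpointing suggested in that last comment might be sketched as below. This is an illustration under assumptions: `Checkpoint` and `IteratorStart` are hypothetical helpers, a plain text file stands in for whatever store you prefer, and falling back to TRIM_HORIZON (the oldest record still in the stream) on the first run is one possible choice rather than a requirement.

```csharp
using System;
using System.IO;

// Describes how the next run should request its shard iterator.
public sealed class IteratorStart
{
    public string IteratorType { get; set; }
    public string StartingSequenceNumber { get; set; }
}

public static class Checkpoint
{
    // Persist the sequence number of the last record that was fully processed.
    public static void Save(string path, string sequenceNumber)
    {
        File.WriteAllText(path, sequenceNumber);
    }

    // Decide where to resume: after the saved sequence number if there is one,
    // otherwise (assumption) from the oldest record still in the stream.
    public static IteratorStart Resume(string path)
    {
        string saved = File.Exists(path) ? File.ReadAllText(path).Trim() : "";
        if (saved.Length == 0)
        {
            return new IteratorStart { IteratorType = "TRIM_HORIZON" };
        }
        return new IteratorStart
        {
            IteratorType = "AFTER_SEQUENCE_NUMBER",
            StartingSequenceNumber = saved
        };
    }
}
```

With the SDK types from the question, the two values would go into `GetShardIteratorRequest.ShardIteratorType` and `StartingSequenceNumber`, and `Save` would be called with `Record.SequenceNumber` after each processed batch. Sequence numbers are per shard, so a stream with several shards needs one checkpoint per shard.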