我已经在Amazon WebServices中设置了Kinesis流。我也想完成以下任务:Amazon Kinesis GetRecords Api最佳方法
- 将记录到单流路与单碎片(C#API) - 成功
- 我也写了示例应用程序中,多个生产商正在对不同的流 - 成功
- 我也设置示例应用程序来执行多发工人把数据放到单流路 - 成功
此外,我希望能够强制执行SequenceNumberOrdering在Reacords。
但真正的痛苦是使用Kinesis C#Api的GetRecords使用者操作。
我为记录创建了一个示例应用程序。问题是即使Kinesis Stream中没有记录,它也不会停止迭代。将SequenceNumber保存在数据库或某个文件中并再次检索文件非常耗时 - 使用Kinesis Stream for GetRecords有什么优势?
为什么即使Stream中没有数据,它也会继续迭代?
我对REFERENCE使用了以下一段代码;
private static void GetFilesKinesisStream()
{
IAmazonKinesis kinesis = AWSClientFactory.CreateAmazonKinesisClient();
try
{
ListStreamsResponse listStreams = kinesis.ListStreams();
int numBuckets = 0;
if (listStreams.StreamNames != null &&
listStreams.StreamNames.Count > 0)
{
numBuckets = listStreams.StreamNames.Count;
Console.WriteLine("You have " + numBuckets + " Amazon Kinesis Streams.");
Console.WriteLine(string.Join(",\n", listStreams.StreamNames.ToArray()));
DescribeStreamRequest describeRequest = new DescribeStreamRequest();
describeRequest.StreamName = "******************";
DescribeStreamResponse describeResponse = kinesis.DescribeStream(describeRequest);
List<Shard> shards = describeResponse.StreamDescription.Shards;
foreach (Shard s in shards)
{
Console.WriteLine("shard: " + s.ShardId);
}
string primaryShardId = shards[0].ShardId;
GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
iteratorRequest.StreamName = "*********************";
iteratorRequest.ShardId = primaryShardId;
iteratorRequest.ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER;
iteratorRequest.StartingSequenceNumber = "49544005271533118105145368110776211536226129690186743810";
GetShardIteratorResponse iteratorResponse = kinesis.GetShardIterator(iteratorRequest);
string iterator = iteratorResponse.ShardIterator;
Console.WriteLine("Iterator: " + iterator);
//Step #3 - get records in this iterator
GetShardRecords(kinesis, iterator);
Console.WriteLine("All records read.");
Console.ReadLine();
}
// sr.WriteLine("You have " + numBuckets + " Amazon S3 bucket(s).");
}
catch (AmazonKinesisException ex)
{
if (ex.ErrorCode != null && ex.ErrorCode.Equals("AuthFailure"))
{
Console.WriteLine("The account you are using is not signed up for Amazon EC2.");
Console.WriteLine("You can sign up for Amazon EC2 at http://aws.amazon.com/ec2");
}
else
{
Console.WriteLine("Caught Exception: " + ex.Message);
Console.WriteLine("Response Status Code: " + ex.StatusCode);
Console.WriteLine("Error Code: " + ex.ErrorCode);
Console.WriteLine("Error Type: " + ex.ErrorType);
Console.WriteLine("Request ID: " + ex.RequestId);
}
}
}
private static void GetShardRecords(IAmazonKinesis client, string iteratorId)
{
//create reqest
GetRecordsRequest getRequest = new GetRecordsRequest();
getRequest.Limit = 100;
getRequest.ShardIterator = iteratorId;
//call "get" operation and get everything in this shard range
GetRecordsResponse getResponse = client.GetRecords(getRequest);
//get reference to next iterator for this shard
string nextIterator = getResponse.NextShardIterator;
//retrieve records
List<Record> records = getResponse.Records;
//print out each record's data value
foreach (Record r in records)
{
//pull out (JSON) data in this record
string s = Encoding.UTF8.GetString(r.Data.ToArray());
Console.WriteLine("Record: " + s);
Console.WriteLine("Partition Key: " + r.PartitionKey);
}
if (null != nextIterator)
{
//if there's another iterator, call operation again
GetShardRecords(client, nextIterator);
}
}
我想每30分钟运行一次Servie,如何在已经使用记录后检索记录。 – 2014-11-03 05:57:29
记录您读取的最后一个序列号,并使用分片迭代器类型“AFTER_SEQUENCE_NUMBER”从此处恢复。 – engineerC 2014-11-03 15:24:31