2013-01-04 124 views
2

延迟队列是一个队列,其中每个消息都有一个与其相关的延迟时间,并且消息只能在其延迟期满时才被采用。队列的头部是那些延迟过去最长的消息。如果没有延迟已经过期,则没有头,并且出队将返回空值。使用一个或多个标准FIFO队列实现延迟队列

实际上,我正在使用Azure编写云应用程序,而在Azure中,只有FIFO队列可用,而不是优先级/延迟队列。所以我来到这里,看看有没有人可以给我一些指引,让我可以从正确的方向开始。我搜索了很多,但只发现了Java中的延迟队列实现,没有一般的关于延迟队列的标准教程/研究论文。

编辑:

我有什么码?
其实,我必须首先设计这些东西,并将其呈现给我的经理,一旦我们完成设计,那么只有我可以开始编码。

更多有关场景
其基于主/从模式的分布式应用程序的详细信息。主服务器生成消息并将它们放入Azure服务总线队列中,并且有多个从服务器(运行在多台机器上)从队列中读取并执行它们。如果万一主站发生故障,则其中一个从站作为主站并开始生成消息。我不想在主数据库中存储任何状态信息,因为如果主数据库出现故障,所有状态信息也会随之一起存储。

+0

请出示一些源代码..你有什么尝试?什么不工作/你卡在哪里? – Yahia

+2

如果你有一个Java实现,它不应该太难转换为C#。甚至可能比从头开始写自己更容易。 – Servy

+0

延迟队列的C#实现将类似于Java实现。 –

回答

9

Windows Azure Queue消息在将消息插入队列时指定了延迟,以秒为单位。在超时延迟被触发之前,消息将不可见。请参阅this MSDN article以查看API详细信息。

隐形超时也在各种语言的SDK实现中实现。由于您正在使用C#,因此调用的内容如下所示。需要注意的AddMessage()第三个参数指定了隐形超时:

 var acct = CloudStorageAccount.DevelopmentStorageAccount; 
     var queueClient = acct.CreateCloudQueueClient(); 
     var queue = queueClient.GetQueueReference("myqueue"); 
     queue.CreateIfNotExist(); 

     var msg = new CloudQueueMessage("test message"); 
     queue.AddMessage(msg, TimeSpan.FromHours(2), TimeSpan.FromMinutes(30)); 
0

你怎么样建立一个队列两步过程中退出的项目。这里是高层次的过程:

  • 出队FIFO队列中的第一项;将其隐身时间设置为N分钟(无论您决定隐身应该是什么) - 这允许您在一段时间内隐藏物品,就好像它不存在于队列中一样。这是我所指的NextVisibleTime属性。

  • 检查DequeueCount属性 - 如果出列计数为0,那么这是第一次出现该项目。忽略该项目并继续前进。由于它的隐身性已经确定,它不会再被提取,直到时间到了。如果出队计数大于或等于1,它将被出队一次,并且必须在所需的时间内被设置为不可见。

这应该允许你实现延迟队列。我也可以考虑其他方法。例如队列中的每个项目作为创建时间;这可以用来动态计算物品需要保持不可见的时间。要更改物业的隐身性,请检查此方法:http://msdn.microsoft.com/en-us/library/microsoft.windowsazure.storageclient.cloudqueue.updatemessage.aspx

3

因此,首先我们需要实施优先级队列。这是我不久前写的一个。这可能不是理想的;它有一个很小的API,它可能会表现得更好,但它是一个足够的起点:包装,当

public class PriorityQueue<TPriority, TElement> 
{ 
    SortedDictionary<TPriority, Queue<TElement>> dictionary = new SortedDictionary<TPriority, Queue<TElement>>(); 
    public PriorityQueue() 
    { 
    } 

    public Tuple<TPriority, TElement> Peek() 
    { 
     var firstPair = dictionary.First(); 
     return Tuple.Create(firstPair.Key, firstPair.Value.First()); 
    } 

    public TElement Pop() 
    { 
     var firstPair = dictionary.First(); 
     TElement output = firstPair.Value.Dequeue(); 

     if (!firstPair.Value.Any()) 
      dictionary.Remove(firstPair.Key); 

     return output; 
    } 

    public void Push(TPriority priority, TElement element) 
    { 
     Queue<TElement> queue; 
     if (dictionary.TryGetValue(priority, out queue)) 
     { 
      queue.Enqueue(element); 
     } 
     else 
     { 
      var newQueue = new Queue<TElement>(); 
      newQueue.Enqueue(element); 
      dictionary.Add(priority, newQueue); 
     } 
    } 
} 

延迟队列很简单:

public class DelayQueue<T> 
{ 
    private PriorityQueue<DateTime, T> queue = new PriorityQueue<DateTime, T>(); 
    public void Enqueue(T item, int delay) 
    { 
     queue.Push(DateTime.Now.AddMilliseconds(delay), item); 
    } 

    public T Dequeue() 
    { 
     if (queue.Peek().Item1 > DateTime.Now) 
      return queue.Pop(); 
     else 
      return default(T); 
    } 
}