2016-03-06 31 views
1

的C#实现我有三个问题:做一个LinkedRingBuffer线程安全

  1. 你通常觉得我的方法来解决给定的问题是什么?
  2. 您认为我可以进一步提高性能?
  3. 最重要的一点:我如何让我的实现真的线程安全?

首先简化的场景我在: 我通过消息系统与不同设备进行通信。我在相当短的时间内接收并发送数千和数千条消息。我处于多线程环境中,因此许多不同的任务正在发送和期待消息。对于消息接收来说,事件驱动方法让我们在使线程安全的意义上遇到了很多麻烦。 我有几个接收器任务从外部获取消息,并且必须将这些消息传递给许多消费者任务。

所以我想出了一个不同的方法: 为什么没有几千条消息的历史记录,每条新消息都排队,消费者任务可以从最新的项目向后搜索到最后处理的项目以获得所有新到的消息。当然这必须是快速和线程安全的。

我想出了一个连接环形缓冲区的想法,并实施了以下内容:

public class LinkedRingBuffer<T> 
    { 
    private LinkedRingBufferNode<T> firstNode; 
    private LinkedRingBufferNode<T> lastNode; 

    public LinkedRingBuffer(int capacity) 
    { 
     Capacity = capacity; 
     Count = 0; 
    } 

    /// <summary> 
    /// Maximum count of items inside the buffer 
    /// </summary> 
    public int Capacity { get; } 

    /// <summary> 
    /// Actual count of items inside the buffer 
    /// </summary> 
    public int Count { get; private set; } 

    /// <summary> 
    /// Get value of the oldest buffer entry 
    /// </summary> 
    /// <returns></returns> 
    public T GetFirst() 
    { 
     return firstNode.Item; 
    } 

    /// <summary> 
    /// Get value of the newest buffer entry 
    /// </summary> 
    /// <returns></returns> 
    public T GetLast() 
    { 
     return lastNode.Item; 
    } 

    /// <summary> 
    /// Add item at the end of the buffer. 
    /// If capacity is reached the link to the oldest item is deleted. 
    /// </summary> 
    public void Add(T item) 
    { 
     /* create node and set to last one */ 
     var node = new LinkedRingBufferNode<T>(lastNode, item); 
     lastNode = node; 
     /* if it is the first node, the created is also the first */ 
     if (firstNode == null) 
      firstNode = node; 
     /* check for capacity reach */ 
     Count++; 
     if(Count > Capacity) 
     {/* deleted all links to the current first so that its eventually gc collected */ 
      Count = Capacity; 
      firstNode = firstNode.NextNode; 
      firstNode.PreviousNode = null; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the oldest to the newest item 
    /// </summary> 
    public IEnumerable<T> LastToFirst() 
    { 
     var current = lastNode; 
     while(current != null) 
     { 
      yield return current.Item; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the newest to the oldest item 
    /// </summary> 
    public IEnumerable<T> FirstToLast() 
    { 
     var current = firstNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      current = current.NextNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the oldest to given item. 
    /// If item doesn't exist it iterates until it reaches the newest 
    /// </summary> 
    public IEnumerable<T> LastToReference(T item) 
    { 
     var current = lastNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      if (current.Item.Equals(item)) 
       break; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the newest to given item. 
    /// If item doesn't exist it iterates until it reaches the oldest 
    /// </summary> 
    public IEnumerable<T> FirstToReference(T item) 
    { 
     var current = firstNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      if (current.Item.Equals(item)) 
       break; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Represents a linked node inside the buffer and holds the data 
    /// </summary> 
    private class LinkedRingBufferNode<A> 
    { 
     public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item) 
     { 
      Item = item; 
      NextNode = null; 
      PreviousNode = previousNode; 
      if(previousNode != null) 
       previousNode.NextNode = this; 
     } 
     internal A Item { get; } 
     internal LinkedRingBufferNode<A> PreviousNode { get; set; } 
     internal LinkedRingBufferNode<A> NextNode { get; private set; } 
    } 
    } 

但遗憾的是我一种新的多线程环境,所以我怎么会做这个缓冲区线程安全的多个读取和写入?

谢谢!

回答

1

我认为最简单的方法就是在每次执行线程关键代码时都有一个同步object,您将lock打开。 lock块中的代码被称为critical section,并且一次只能由一个线程访问。任何其他希望访问它的线程都将等待,直到锁被释放。

定义和初始化:

private object Synchro; 

public LinkedRingBuffer(int capacity) 
{ 
    Synchro = new object(); 
    // Other constructor code 
} 

用法:

public T GetFirst() 
{ 
    lock(Synchro) 
    { 
     return firstNode.Item; 
    } 
} 

当编写线程安全的代码,lock荷兰国际集团某些部分似乎是显而易见的。但是,如果你不知道是否要lock声明或代码块,用于读取和写入的安全,你需要考虑:此代码

  • 是否能影响其他的行为或结果锁定关键部分。
  • 是否有其他锁定关键部分可以影响此代码的行为或结果。

您还需要重写某些自动实现的属性以具有后台字段。它应该是非常简单的,但是...

你的yield return使用,同时在单一线程上下文相当聪明,高效的,会导致在多线程方面的麻烦。这是因为yield return doesn't release a lock statement(它不应该)。您将不得不在包装中使用yield return进行实现。

你的线程安全的代码看起来是这样的:

public class LinkedRingBuffer<T> 
{ 
    private LinkedRingBufferNode<T> firstNode; 
    private LinkedRingBufferNode<T> lastNode; 
    private object Synchro; 

    public LinkedRingBuffer(int capacity) 
    { 
     Synchro = new object(); 
     Capacity = capacity; 
     Count = 0; 
    } 

    /// <summary> 
    /// Maximum count of items inside the buffer 
    /// </summary> 
    public int Capacity { get; } 

    /// <summary> 
    /// Actual count of items inside the buffer 
    /// </summary> 
    public int Count 
    { 
     get 
     { 
      lock (Synchro) 
      { 
       return _count; 
      } 
     } 
     private set 
     { 
      _count = value; 
     } 
    } 
    private int _count; 

    /// <summary> 
    /// Get value of the oldest buffer entry 
    /// </summary> 
    /// <returns></returns> 
    public T GetFirst() 
    { 
     lock (Synchro) 
     { 
      return firstNode.Item; 
     } 
    } 

    /// <summary> 
    /// Get value of the newest buffer entry 
    /// </summary> 
    /// <returns></returns> 
    public T GetLast() 
    { 
     lock (Synchro) 
     { 
      return lastNode.Item; 
     } 
    } 

    /// <summary> 
    /// Add item at the end of the buffer. 
    /// If capacity is reached the link to the oldest item is deleted. 
    /// </summary> 
    public void Add(T item) 
    { 
     lock (Synchro) 
     { 
      /* create node and set to last one */ 
      var node = new LinkedRingBufferNode<T>(lastNode, item); 
      lastNode = node; 
      /* if it is the first node, the created is also the first */ 
      if (firstNode == null) 
       firstNode = node; 
      /* check for capacity reach */ 
      Count++; 
      if (Count > Capacity) 
      { 
       /* deleted all links to the current first so that its eventually gc collected */ 
       Count = Capacity; 
       firstNode = firstNode.NextNode; 
       firstNode.PreviousNode = null; 
      } 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the oldest to the newest item 
    /// </summary> 
    public IEnumerable<T> LastToFirst() 
    { 
     lock (Synchro) 
     { 
      var materialized = LastToFirstInner().ToList(); 
      return materialized; 
     } 
    } 

    private IEnumerable<T> LastToFirstInner() 
    { 
     var current = lastNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the newest to the oldest item 
    /// </summary> 
    public IEnumerable<T> FirstToLast() 
    { 
     lock (Synchro) 
     { 
      var materialized = FirstToLastInner().ToList(); 
      return materialized; 
     } 
    } 

    private IEnumerable<T> FirstToLastInner() 
    { 
     var current = firstNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      current = current.NextNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the oldest to given item. 
    /// If item doesn't exist it iterates until it reaches the newest 
    /// </summary> 
    public IEnumerable<T> LastToReference(T item) 
    { 
     lock (Synchro) 
     { 
      var materialized = LastToReferenceInner(item).ToList(); 
      return materialized; 
     } 
    } 

    private IEnumerable<T> LastToReferenceInner(T item) 
    { 
     var current = lastNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      if (current.Item.Equals(item)) 
       break; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Iterate through the buffer from the newest to given item. 
    /// If item doesn't exist it iterates until it reaches the oldest 
    /// </summary> 
    public IEnumerable<T> FirstToReference(T item) 
    { 
     lock (Synchro) 
     { 
      var materialized = FirstToReferenceInner(item).ToList(); 
      return materialized; 
     } 
    } 

    private IEnumerable<T> FirstToReferenceInner(T item) 
    { 
     var current = firstNode; 
     while (current != null) 
     { 
      yield return current.Item; 
      if (current.Item.Equals(item)) 
       break; 
      current = current.PreviousNode; 
     } 
    } 

    /// <summary> 
    /// Represents a linked node inside the buffer and holds the data 
    /// </summary> 
    private class LinkedRingBufferNode<A> 
    { 
     public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item) 
     { 
      Item = item; 
      NextNode = null; 
      PreviousNode = previousNode; 
      if (previousNode != null) 
       previousNode.NextNode = this; 
     } 
     internal A Item { get; } 
     internal LinkedRingBufferNode<A> PreviousNode { get; set; } 
     internal LinkedRingBufferNode<A> NextNode { get; private set; } 
    } 
} 

可以有一些优化完成,例如你不需要创建临界区里面的LinkedRingBufferNode对象,但是你必须复制在创建对象之前将lastNode值赋给临界区域内的局部变量。

+0

感谢您的快速回复!所以当迭代器的一个使用ToList通话将创造一个全新的列表这种方法(同步),它可以在foreach迭代之后,是正确的?或者是锁定整个foreach迭代过程?我还担心ToList()调用会对具有几千个项目的缓冲区产生巨大的性能影响,但我会在明天运行一个基准测试。也许只是忘记了foreach,并用锁定的GetPrevious()/ GetNext()调用手动搜索列表会更快。 – sirloin