2015-10-14 16 views
2

我寻找其满足以下性质的队列算法:算法基于共享字典数据结构并发队列(单个消费者,多个生产者)

  1. 进程仅使用一个共享的字典通信(键值-store)
  2. 不使用超过加载和存储(CAS没有任何其他的原子操作,例如)
  3. 支持多种生产
  4. 支持单次消费
  5. 生产商可以在任何时候死亡和队列必须保持正常运行
  6. 消费者也可以随时死去,后来被重新启动,但绝不会有不止一个消费过程同时

这是运行意味着作为关于合适算法的一般问题,因为我想在几种不同的情况下使用它。但是,为了帮助可视化的要求,这里有一个例子用例:

  • 我有两个页面的网站:producer.html和consumer.html
  • producer.html可以同时在多个标签
  • 打开
  • 每个producer.html增加事件队列consumer.html的
  • 一个副本是开放的,消耗这些事件(聚集和它们流式传输到一个Web服务器,例如)

如果多个生产者 - 标签是由用户而不是页面打开的,这些标签没有可用的引用,所以通常的通信方法(postMessage或直接调用其他标签的JS代码)都没有了。他们仍然可以互相沟通的方式之一是通过LocalStorage建议在这里:Javascript; communication between tabs/windows with same origin。但是LocalStorage不是“线程安全的”,详细here

注:有可能实现在浏览器(闪存,...)交叉表通信等方式,但这些都是这个问题的目的,因为他们将不把我的其他使用 - 案例。这实际上只是我试图找到的一般队列算法的一个示例用例。

一对夫妇更参数:

  • 生产商的数量将永远不会非常大(几十或几百也许),所以人数的比例读取和写入需要相对于生产商的数量是不是真的担心。
  • 我不知道有多少生产者我可能有,并没有立即明确的方式来分配一个数字或索引给他们。 (许多互斥算法(Lamport's Bakery,Eisenberg&McGuire,Szymański's,...)为每个进程维护一个状态数组,但这不一定是一种自然的方法,尽管我不想在事前排除这些方法(如果他们可以的话)使用共享字典以某种方式实现...)
  • 算法应该100%可靠。所以,我想避免像Lamport's first Fast Mutex algorithm (page 2 in the PDF)这样的延迟,因为我没有任何实时保证。
  • 如果队列是FIFO,将会非常有帮助,但这不是严格要求。
  • 该算法不应该受到任何专利缠身等

更新:

Two-Lock Concurrent Queue Algorithm by Michael and Scott看起来像它可以工作,但我需要两两件事来实现:

  • 锁定机制使用共享字典,可以经受一个锁持有人的崩溃
  • 一种可靠的方法来分配一个新的节点(如果我将分配移动到锁定的部分,我可以生成新的随机密钥,直到找到一个尚未使用的密钥,但可能有更好的方法?)

更新2:

看来,我是不是约为字典不够具体:

这真的只不过是一个微不足道的键值店更多。它提供功能get(key)读取密钥的值,put(key, value)更改密钥的值,delete(key)删除密钥。在我的一些用例中,我也可以遍历键,但如果可能的话,我想避免它的一般性。密钥是任意的,生产者和消费者可以根据需要创建或计算它们。字典不提供任何自动生成唯一密钥的功能。

示例包括HTML LocalStorage,Google AppEngine的数据存储,Java Map,Python字典或甚至仅包含单个目录的文件系统(其中键为文件名和文件内容的值)。

+0

你能描述字典和队列及更详细的角色,请的关系? – HuStmpHrrr

+0

@HuStmpHrrr字典是进程之间唯一的“共享内存”。每个进程都可以读取和写入由键标识的值。该字典不提供任何并发管理(如原子增量和获取或比较和设置)。因此,队列实现必须以一种聪明的方式存储需要与字典中的其他进程共享的所有数据。除此之外,两者之间没有任何关系。我希望这有帮助。 –

+0

MPSC队列实现可以基于CAS或锁定(我不知道其他方式)。现有的锁定算法需要CAS或每个进程变量,以* static *列表形式排列。您拥有**既不支持CAS **,也不支持静态进程列表**。看来,你想要的是根本不可能的。 – Tsyvarev

回答

0

只需使用RDBMS。这在MS SQL中非常简单,对于PostgresSQL你必须使用RETURNING关键字,而对于MySQL你可能必须使用触发器。

CREATE TABLE Q ([Key] BIGINT IDENTITY(1,1) PRIMARY KEY, [Message] NVARCHAR(4000)) 

INSERT INTO Q OUTPUT inserted.* VALUE(@message) 

DELETE TOP(1) Q WITH (READPAST) OUTPUT deleted.* 

如果您真的希望算法解决方案,只需使用环形缓冲区。

const int MAX_Q_SIZE = 20000000; 
static string[] Q = new string[MAX_Q_SIZE]; 
static long ProducerID = 0; 
static long ConsumerID = 0; 
public static long Produce(string message) { 
    long key = Interlocked.Increment(ref ProducerID); 
    int idx = (int)(key % MAX_Q_SIZE); 
    Q[idx] = message; 
    return key; 
} 
public static string Consume() { 
    long key = Interlocked.Increment(ref ConsumerID); 
    int idx = (int)(key % MAX_Q_SIZE); 
    string message = Q[idx]; 
    return message; 
} 
+0

不幸的是,我真的只有一本没有锁定的字典。否则,当然,这将是微不足道的... –

+0

@MarkusA。 - 我想你需要解释这个“没有锁定的共享字典”是什么。这是一个单独的过程吗?生产者和消费者如何访问它,向它发送数据,从中接收数据?生产者生成自己的密钥还是字典?消费者是否可以遍历字典的键?了解字典的API必须解决这样的问题。 –

+0

好点。我添加了对原始问题的更新。另外,我添加了一个可能有所帮助的示例答案。抱歉不清楚。 –

1

后颇有几分进一步阅读和睡眠的东西了一晚上,我想出了一个办法是应该能够完成我的需要,但它可能不是最优雅:

本文Wait-Free Algorithms for Fast, Long-Lived Renaming by Moir and Anderson概括兰波特的快速互斥算法#2(第6页here)为以下结构单元(图2):
            building block
当数处理输入的这部分代码,其中最多只有一个将停止,至多n-1将向右移动和最多n-1将下移

在兰波特的算法,停止意味着处理获取的锁,而移动向右或向左只会发送过程回这部分代码的开始。为了释放锁,一个过程简单地将Y返回到。 (实际上并不完全正确......请参阅下面的“更新”...)

这个问题的最大问题是,如果有任何进程在锁定期间死亡(即释放之前),该块将会只是永远保持锁定。

另一个问题是每个进程都需要分配一个唯一的进程ID p

永久锁定问题可以通过借用Moir和Anderson的想法来解决,也就是发送进程结束向右或向下移动而不是回到这个结构块,导致结构像这样(图3中的纸):
            enter image description here
除了在这种情况下,我将不会使用该网格分配进程ID为M &一个DID(虽然我大概可以解决p w的唯一值问题w ith)。相反,网格中的每个框都将对应一个非常简单的队列。如果一个进程停止在一个盒子上,它就获得了相应队列的尾部锁定(例如根据algorithm by Michael and Scott),并继续将一个新元素排入该队列。完成后,它将框的值设置为false,以允许其他进程使用此队列。这样,如果在释放锁之前存在较高的争用或进程死亡,则会根据需要动态创建新的队列。

消费者进程不需要担心在出列元素时锁定队列头,因为这是唯一的过程。所以,它只是遍历盒子树来找到所有队列,并且轻松地帮助自己掌握它们所包含的元素。需要注意的一点是,虽然每个队列都是FIFO,但队列之间没有同步,所以组合队列不一定是FIFO。

如果我们现在改变布尔Ÿ一个时间戳(或/指示),消费者也可以到期锁后一些安全超时重新激活死队列。

有关使用字典执行情况的说明:

  • 共享变量Xÿ与键名X_123Y_123,其中,字典可以是条目是盒子的数量。
  • p可以简单地是任何唯一的随机字符串,并将存储为键值X_123
  • 布尔值或时间戳也被存储为键值Y_123。生产者过程解释为Y_123为缺失条目null/。
  • 箱号需要从移动模式计算。一种方法是从左上角的开始。如果该过程在该框中停止,那么我们就完成了。如果不是,则当前数字(从开始)向左移1(即乘以2),并且如果过程向下移动,也增加1.可以使用a计算更小(并且更少)的数字不同的编号方案(我仍然需要解决),但是这个应该可以工作。
  • 这些队列然后由具有键H_123一个条目保持在其值的队列的当前头部的索引和与关键T_123一个条目保存尾部的索引。如果它们不存在,它们都默认为0。
  • 要排队的项放入队列,尾部索引是从T_123(假设它产生48)中,用键Q_123_48的条目读放入字典,其包含排队的项的值。之后,T_123由1
  • 项目入队后增加的Y_123项设置回/(不删!)
  • 出队的项目,头指数从H_123(假设它产生39)读取并且与尾部索引T_123比较。如果它较小,则可以在Q_123_39处获得一个项目,然后将其从字典中读取和删除。之后,H_123会加1。
  • 要遍历box-tree,消费者将从左上角的框开始。对于每个框(例如)时,如果一个键Y_123存在于字典(即使它包含的值/或),消费者从相应的队列队列中取出物品,然后递归地向右和向下移动到相邻的框。如果没有密钥Y_123存在,该框尚未被任何进程使用,并且不需要考虑(并且下面的框或右边都没有)。

我还没有实际执行这个,但我会做下一步。我只是想发布一下,看看它是否可以激发其他方法,或者是否有人可以看到这个想法有什么问题。

更新:

我只注意到一个复杂:这是可能的,如果两个进程试图同时获取了队列锁,都将失败,并移动到下一个块。这将使该队列永远锁定,因为没有人会留下设置Y回到null/。

这就是为什么“长寿命重命名”由M &算法A以及兰波特的算法#2使用的Ÿ - 值的数组,其中每个进程都有其自己的入口,它也如果重置原因它移动到另一个块。 Y然后仅被视为如果所有条目都是

因为我不知道前手我多少进程都会有,我可以实现这个只有在字典有枚举密钥的一些方法(密钥将被Y_123_456其中是价值每个过程的p)。

但是,罕见的争用和上述超时机制,复活死队列,该问题可能导致只读存储器效率低下的一点点,而不是一个大问题。

更新2:

一种更好的方式来标记盒会是这样的模式:
            enter image description here
如果我们所说的移动总数ñ(也算入左上角的方块,即n  ≥  1)和向右移动ř数,然后就可以计算在盒数使用

           盒=(ñ  × ( n  -   1))/ 2  +  [R

相关问题