2017-10-07 102 views
0

D中很容易使用std.container.dlist创建队列类型。使用队列在D中的线程之间进行通信

我想有多个线程,但让他们与队列通信,而不是消息传递(https://tour.dlang.org/tour/en/multithreading/message-passing)。据我所知,这些消息的设计始终是在代码中的特定点处接收数据;接收线程将阻塞,直到收到预期的数据。 (编辑:我被告知有关receiveTimeout但没有超时,只是一个检查是真的在这种情况下更合适(也许超时0?)。我也不知道如果多个消息API将做什么消息发送任何任何接收之前,我将不得不与该打。)

void main() { 
    spawn(&worker, thisTid); 

    // This line will block until the expected message is received. 
    receive (
     (string message) { 
      writeln("Received the message: ", text); 
     }, 
    ) 
} 

我所需要的仅仅接收数据,如果有一些。事情是这样的:

void main() { 
    Queue!string queue// custom `Queue` type based on DList 

    spawn(&worker, queue); 

    while (true) { 
     // Go through any messages (while consuming `queue`) 
     for (string message; queue) { 
      writeln("Received a message: ", text); 
     } 
     // Do other stuff 
    } 
} 

我一直在使用shared变量(https://tour.dlang.org/tour/en/multithreading/synchronization-sharing)尝试,但DMD抱怨说:“不许别名可变线程本地的数据。”或其他一些错误,具体情况。

这将如何在D中完成?或者,有没有办法使用消息来进行这种沟通?

回答

0

我已经得到了我需要的答案。

简单地说,使用core.thread而不是std.concurrencystd.concurrency为您管理邮件,并且不允许您自己管理邮件。 core.threadstd.concurrency在内部使用的。

较长的答案,这里是我如何完全实现它。

我创建了Queue类型,它基于Singly Linked List,但保留最后一个元素的指针。根据Walter Brights愿景(https://www.youtube.com/watch?v=cQkBOCo8UrE),Queue也使用标准组件inputRange和outputRange(或者至少我认为它)。 Queue也被构建为允许一个线程写入,另一个线程在内部很少进行读取操作,因此应该很快。
队列我共享这里https://pastebin.com/ddyPpLrp

一个简单的实现为具有第二螺纹读取输入:

Queue!string inputQueue = new Queue!string; 
ThreadInput threadInput = new ThreadInput(inputQueue); 
threadInput.start; 

while (true) { 
    foreach (string value; inputQueue) { 
     writeln(value); 
    } 
} 

ThreadInput被定义为这样的:

class ThreadInput : Thread { 
    private Queue!string queue; 

    this(Queue!string queue) { 
     super(&run); 
     this.queue = queue; 
    } 

    private void run() { 
     while (true) { 
      queue.put(readln); 
     } 
    } 
} 

代码https://pastebin.com/w5jwRVrL
Queue再次https://pastebin.com/ddyPpLrp

1

这不回答具体问题,但TI不明朗起来什么,我认为是消息传递API的误解......

就叫receiveTimeout而不是纯receive

http://dpldocs.info/experimental-docs/std.concurrency.receiveTimeout.html

+0

不,这不是我真正需要的,但我确实想念'receiveTimeout'我不知道如何。如果我无法获得其他任何工作,我可能会使用'receiveTimeout'来完成我所需要的工作。 –

+0

给receiveTimout一个负值,如-1会做你想做的。请参阅:https://stackoverflow.com/a/31624806/2026276 – Bauss

0

我使用这个:

shared class Queue(T) { 

    private T[] queue; 

    synchronized void opOpAssign(string op)(T object) if(op == "~") { 
     queue ~= object; 
    } 

    synchronized size_t length(){ 
     return queue.length; 
    } 

    synchronized T pop(){ 
     assert(queue.length, "Please check queue length, is 0"); 
     auto first = queue[0]; 
     queue = queue[1..$]; 
     return first; 
    } 

    synchronized shared(T[]) consume(){ 
     auto copy = queue; 
     queue = []; 
     return copy; 
    } 

}