2017-10-18 64 views
4

我一直在尝试使用Crystal和Kemal创建一个非阻塞服务器,它将(a)侦听发送给它的UDP消息流,然后(b)然后转发该消息发送给任何已启动ws连接的浏览器的WebSocket。使用Crystal/Kemal来侦听UDP数据包

到目前为止,我可以管理最好的是:

require "kemal" 
require "socket" 

server = UDPSocket.new 
server.bind "localhost", 1234 
puts "Started..." 

ws "/" do |socket| 

    udp_working = true 

    while udp_working 
     message, client_addr = server.receive 
     socket.send message 
    end 

    socket.on_close do 
     puts "Goodbye..." 
     udp_working = false 
    end 
end 

这一切似乎有点不雅,而事实上,没有按预期工作,因为:

  • 在所有发送的UDP数据包在启动的Crystal服务器和连接到Crystal服务器的第一个Web浏览器之间缓存并发送一个巨大的积压文件
  • 从WebSockets断开连接的浏览器未正确处理,即没有触发socket.on_close,第二循环继续,直到我终止水晶服务器

我希望的是server.on_message型处理,这将使我只有在收到UDP数据包,而不是持续轮询哪些块服务器来运行代码。有没有另外一种方法可以使用Crystal/Kemal来实现这一点?

谢谢!

回答

3

有几个问题你的方法:

首先,socket.on_close不行,因为从来没有达到这条线。 while循环将运行的时间长度只有udp_working == true,并且只会在on_close钩子中设置为false

如果你不想要UDP数据报堆积起来,你需要从头开始接收它们,并且如果没有连接websocket,你可以做任何你想做的事情(也许处理?)。 UDPServer没有on_message挂钩,但receive已经是非阻塞的。所以你可以在一个循环中(在它自己的光纤中)运行它,并在方法返回时采取行动。详情请参阅Crystal Concurrency;还有一个使用TCPSocket的例子,但UDP在这方面应该类似。

3

Crystal为您处理非阻塞,您需要在单独的光纤中编写阻止代码并使用通道进行通信。 Crystal将在幕后使用非阻塞代码和select()调用。此外,您需要一个框架或一些自己的代码来将收到的消息复制到每个侦听的websocket。这通常称为发布/订阅或发布/订阅。


...

ws "/" do |socket| 

    udp_working = true 

    while udp_working 
     message, client_addr = server.receive 
     socket.send message 
    end 

    socket.on_close do 
     puts "Goodbye..." 
     udp_working = false 
    end 
end 

即socket.on_close没有被触发,

您想运行socket.on_close第一,否则事件处理赢得不会被添加,因此直到循环之后才会运行,但由于此循环,循环实际上是无限的。另外,如果在检查udp_working var和调用#send

之间关闭套接字,则由于套接字关闭而导致 socket.send message错误的可能性很小