以下是我想,像你的当前实现:
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
q = EM::Queue.new
workers = Array.new(10) { Worker.new q }
上面的问题,如果我理解正确的话,就是你不想工人新职位(工种的工作是已经在制作人时间表早些时候到达),而不是任何reload_credentials作业。以下内容应该为此服务(最后要谨慎的话)。
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
class LockingDispatcher
def initialize channel, queue
@channel = channel
@queue = queue
@backlog = []
@channel.subscribe method(:dispatch_with_locking)
@locked = false
end
def dispatch_with_locking item
if locked?
@backlog << item
else
# You probably want to move the specialization here out into a method or
# block that's passed into the constructor, to make the lockingdispatcher
# more of a generic processor
case item.type
when :reload_credentials
lock
deferrable = CredentialReloader.new(item).start
deferrable.callback { unlock }
deferrable.errback { unlock }
else
dispatch_without_locking item
end
end
end
def dispatch_without_locking item
@queue << item
end
def locked?
@locked
end
def lock
@locked = true
end
def unlock
@locked = false
bl = @backlog.dup
@backlog.clear
bl.each { |item| dispatch_with_locking item }
end
end
channel = EM::Channel.new
queue = EM::Queue.new
dispatcher = LockingDispatcher.new channel, queue
workers = Array.new(10) { Worker.new queue }
因此,输入到第一系统自带在q
,但在这个新系统中谈到在channel
。 queue
仍然用于工作人员之间的工作分配,但queue
未在刷新凭证操作进行时填充。不幸的是,由于我没有花更多的时间,我还没有推广LockingDispatcher
,因此它没有与调度CredentialsReloader
的物品类型和代码相结合。我会把它留给你。
你应该在这里注意到,虽然这服务于我对原始请求的理解,但通常放松这种要求会更好。有迹象表明,本质上不能没有在要求改变被消灭几个突出问题:
- 系统不等待执行作业开始凭据工作
- 前完成系统将处理凭证的工作非常糟糕的爆发 - 其他可能可处理的项目将不会。
- 如果证书代码存在错误,积压可能会填满内存并导致失败。一个简单的超时可能足以避免灾难性后果,如果代码是可中止的,并且后续消息可以充分处理以避免进一步的死锁。
它实际上听起来像你有一些在系统中的userid的概念。如果您仔细考虑了需求,则很可能您只需要将与属于凭据处于刷新状态的用户ID相关的项目积压起来。这是一个不同的问题,涉及不同类型的调度。尝试为那些用户锁定待办事项的哈希值,回调凭证完成以将这些待办事项排除在工作人员身上或类似的安排。
祝你好运!
但是,可以多次接收reload_credentials消息。不应该像2线程?保持排队和正在处理的一个? – Geo
是的,如果在处理另一个reload_credentials时收到reload_credentials,它将像其他消息一样排队。 – simulacre
应该像第一个那样处理多个reload_credentials消息。通过将reload_credentials放入EM.defer块中,您可以在另一个线程中执行它。只要你的'处理'代码是非阻塞的,你就会不断收到消息。使用EM兼容库来确保您不会阻止。或者使用EM.defer进行处理。 – simulacre