2013-10-29 30 views
4

信号灯在Golang与信道来实现:Golang:如何超时信号量?

一个例子是这样的: https://sites.google.com/site/gopatterns/concurrency/semaphores

语境:

我们有几百个服务器,并有共享,我们要限制资源进入。因此,对于给定的资源,我们希望使用信号量来限制对这些服务器只有5个并发访问的访问。为了做到这一点,我们计划使用锁服务器。当一台机器访问资源时,它将首先向锁服务器注册它正在通过密钥访问资源。然后,当它完成时,它会向锁服务器发送另一个请求,表示完成并释放信号量。这确保我们将对这些资源的访问限制为最大数量的并发访问。

问题:想要优雅地处理这个,如果出现错误。

问题

你如何去对信号实现超时?

例子:

比方说,我有5旗语大小有同时存在10个进程试图获取信号量的锁所以在这种情况下,只有5将收购它。

有时候,进程会死而没有响应(真正的原因有点复杂的解释,但基本上有时进程可能无法解锁它),因此信号量中的空间现在永久锁定,从而导致问题。

所以我想对这个超时。这里有一些问题:

的过程将从秒2之间的任何地方运行长达60分钟。

我们有一些竞争条件,因为如果超时,然后进程试图解开它,那么我们已经揭开了信号,而不是一次两次。反之亦然,我们先解锁它,然后超时。

如何采取张贴以上,并把它变成与超时线程安全的旗语的建议图案?

+0

您的要求有多严格?您是否想要限制对资源的访问,或者如果超过5台服务器同时访问共享资源,是否存在硬故障模式? –

+0

有多种转到这里计数信号的例子:https://github.com/tarndt/sema – voidlogic

回答

1

这是一个有点难以弄清楚你想实现什么,但我可以告诉你想拥有的并发够程访问共享资源,并处理它优雅,如果事情不顺利。我对你如何处理这个问题有几点建议。

1)从同步包使用WaitGroup:http://golang.org/pkg/sync/#example_WaitGroup

使用此策略,你每次调用之前基本上添加到柜台到一个新的goroutine并使用延迟,以确保它从柜台中删除(因此,无论它会因为另一个原因超时或返回,它仍然会从柜台上移除)。然后,您使用wg.Wait()命令来确保它在所有返回例程返回之前不会再继续。下面是一个例子:http://play.golang.org/p/wnm24TcBZg请注意,如果没有wg.Wait(),它将不会在正在返回主程序和终止之前等待结束程序完成。

2)使用time.Ticker自动超时:http://golang.org/pkg/time/#Ticker

这种方法基本上都会设置一个计时器,这将在一定的时间间隔断火。您可以使用此计时器来控制基于时间的事件。基本上这必须在for循环中运行,等待通道被打勾,如下例所示:http://play.golang.org/p/IHeqmiFBSS

再次,不完全确定您要完成的工作,但您可以考虑将这两个以便如果您的过程超时并且处于循环中,则股票将捕获它并在一段时间后返回并调用延迟函数,以便等待它的代码部分继续前进。希望这至少有一点帮助。

+0

对不起,我会尽量解释更好。但是这基本上是一个分布式信号量服务器。限制在几百台机器上同时访问资源。因此推迟/ wait.Syncgroup不起作用。 time.AfterFunc或time.Ticker是一个好主意,但是,如果超时,然后处理回来,解开它呢? – samol

+0

弗兰,我只是修改我的问题,我希望现在更有意义? – samol

+0

好的,这里是我想出来的:http://play.golang.org/p/Q2VX25ov4T 它不是那么一回事,但我认为它确实有点你所要求的。评论几乎可以解释所发生的一切,但随时问问你是否有更多问题。代码有点复杂,所以它并不实际在游乐场中运行,但它会在我的系统上运行,直至遇到死锁,但您应该可以根据自己的目的对其进行修改以避免这种情况。 – Verran

0

一些假设:

  • 你需要各地 5个服务器,让过去在锁定时间服务器。
  • 访问该资源的时间较短且类似。

使用配额服务器而不是锁定服务器。以5倍的平均(平均值,第75等)访问/锁定时间补充配额(一个简单的计数器)。只有在小于最大值的情况下才补充配额。平均来说,您将保持大约5个并发访问/锁定。

一些高级功能:

  • 如果共享资源可以检测到它自己的负载它可以告诉它可能需要更多或更少的并发访问配额服务器。
  • 当配额服务器完成后,服务器可以对配额服务器执行ping操作。这不是必需的,但是可以更快地释放资源。
1

由于您正在制作分布式锁定服务,我假设您的锁定服务器在端口上进行侦听,并且当您接受()您循环的连接时,将等待每个连接的goroutine中的命令。当套接字丢失时,该goroutine退出(即:远程节点崩溃)

因此,假设这是事实,您可以做几件事情。

1)创建与深度匹配的沟道多少个并发锁 2)时锁定,将消息发送到该信道(它将如果全框) 3)当解锁,刚读从一个消息通道 4)你可以“推迟释放()”(如果你已经锁定,释放会消耗一条消息)

这里是一个粗略的工作示例,除socket之外的所有东西。 希望它是有道理的。 http://play.golang.org/p/DLOX7m8m6q

package main 

import "fmt" 

import "time" 

type Locker struct { 
    ch chan int 
    locked bool 
} 

func (l *Locker) lock(){ 
    l.ch <- 1 
    l.locked=true 
} 
func (l *Locker) unlock() { 
    if l.locked { // called directly or via defer, make sure we don't unlock if we don't have the lock 
     l.locked = false // avoid unlocking twice if socket crashes after unlock 
     <- l.ch 
    } 
} 

func dostuff(name string, locker Locker) { 
    locker.lock() 
    defer locker.unlock() 
    fmt.Println(name,"Doing stuff") 
    time.Sleep(1 * time.Second) 
} 

func main() { 
    ch := make(chan int, 2) 
    go dostuff("1",Locker{ch,false}) 
    go dostuff("2",Locker{ch,false}) 
    go dostuff("3",Locker{ch,false}) 
    go dostuff("4",Locker{ch,false}) 
    time.Sleep(4 * time.Second) 
} 
+0

嘿大卫,谢谢你的回答。然而,dostuff部分实际上是在客户端。我们需要超时信号量的原因是客户端可能死机(服务器可能停机)。所以信号量本身必须知道和超时锁定。不知道这是否有道理? – samol

+0

@samol所以有'dostuff'等待在客户端告诉说,它的完成*或*客户端的连接走开,在后一种情况下,你可以假设它的失败,并释放资源(或之后释放额外的超时或在看到客户端是否重新连接之后,无论如何)。即要求客户至少在使用资源的时间内保持TCP连接。 –

0

也许这会有所帮助,但我认为这是实现扩张太
我会感谢有关代码的任何建议。

package main 

import (
    "fmt" 
    "time" 
    "math/rand" 
    "strconv" 
) 

type Empty interface{} 

type Semaphore struct { 
    dur time.Duration 
    ch chan Empty 
} 

func NewSemaphore(max int, dur time.Duration) (sem *Semaphore) { 
    sem = new(Semaphore) 
    sem.dur = dur 
    sem.ch = make(chan Empty, max) 
    return 
} 

type Timeout struct{} 

type Work struct{} 

var null Empty 
var timeout Timeout 
var work Work 

var global = time.Now() 

func (sem *Semaphore) StartJob(id int, job func()) { 
    sem.ch <- null 
    go func() { 
     ch := make(chan interface{}) 
     go func() { 
      time.Sleep(sem.dur) 
      ch <- timeout 
     }() 
     go func() { 
      fmt.Println("Job ", strconv.Itoa(id), " is started", time.Since(global)) 
      job() 
      ch <- work 
     }() 
     switch (<-ch).(type) { 
     case Timeout: 
      fmt.Println("Timeout for job ", strconv.Itoa(id), time.Since(global)) 
     case Work: 
      fmt.Println("Job ", strconv.Itoa(id), " is finished", time.Since(global)) 
     } 
     <-sem.ch 
    }() 
} 

func main() { 
    rand.Seed(time.Now().Unix()) 
    sem := NewSemaphore(3, 3*time.Second) 
    for i := 0; i < 10; i++ { 
     id := i 
     go sem.StartJob(i, func() { 
      seconds := 2 + rand.Intn(5) 
      fmt.Println("For job ", strconv.Itoa(id), " was allocated ", seconds, " secs") 
      time.Sleep(time.Duration(seconds) * time.Second) 
     }) 
    } 
    time.Sleep(30 * time.Second) 
}