2016-07-15 77 views
0

我想将从频道接收的数据广播到频道列表。频道列表是动态的,可以在运行阶段修改。在Go中通过多个频道广播频道

作为Go中的新开发者,我写了这段代码。我发现我想要的东西很重。有一个更好的方法吗?

package utils 

import "sync" 

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels 
type StringChannelBroadcaster struct { 
    Source  chan string 
    Subscribers map[string]*StringChannelSubscriber 
    stopChannel chan bool 
    mutex  sync.Mutex 
    capacity uint64 
} 

// NewStringChannelBroadcaster creates a StringChannelBroadcaster 
func NewStringChannelBroadcaster(capacity uint64) (b *StringChannelBroadcaster) { 
    return &StringChannelBroadcaster{ 
     Source:  make(chan string, capacity), 
     Subscribers: make(map[string]*StringChannelSubscriber), 
     capacity: capacity, 
    } 
} 

// Dispatch starts dispatching message 
func (b *StringChannelBroadcaster) Dispatch() { 
    b.stopChannel = make(chan bool) 
    for { 
     select { 
     case val, ok := <-b.Source: 
      if ok { 
       b.mutex.Lock() 
       for _, value := range b.Subscribers { 
        value.Channel <- val 
       } 
       b.mutex.Unlock() 
      } 
     case <-b.stopChannel: 
      return 
     } 
    } 
} 

// Stop stops the Broadcaster 
func (b *StringChannelBroadcaster) Stop() { 
    close(b.stopChannel) 
} 

// StringChannelSubscriber defines a subscriber to a StringChannelBroadcaster 
type StringChannelSubscriber struct { 
    Key  string 
    Channel chan string 
} 

// NewSubscriber returns a new subsriber to the StringChannelBroadcaster 
func (b *StringChannelBroadcaster) NewSubscriber() *StringChannelSubscriber { 
    key := RandString(20) 
    newSubscriber := StringChannelSubscriber{ 
     Key:  key, 
     Channel: make(chan string, b.capacity), 
    } 
    b.mutex.Lock() 
    b.Subscribers[key] = &newSubscriber 
    b.mutex.Unlock() 

    return &newSubscriber 
} 

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster 
func (b *StringChannelBroadcaster) RemoveSubscriber(subscriber *StringChannelSubscriber) { 
    b.mutex.Lock() 
    delete(b.Subscribers, subscriber.Key) 
    b.mutex.Unlock() 
} 

谢谢

朱利安

+0

有时候,代码感觉“沉重”,因为低级别操作周围没有语法糖包装。这对我来说似乎是一种正常的做法;你想看到什么“更轻”? –

回答

1

我想你可以把它简化一下:摆脱stopChannelStop方法。您可以关闭Source而不是调用Stop,并检测Dispatch(ok将为false)以退出(您可以实际在源通道范围内)。

您可以摆脱Dispatch,并且只需在for循环中启动NewStringChannelBroadcaster的goroutine,因此外部代码无需单独启动调度循环。

您可以使用通道类型作为映射键,因此您的映射可以变为​​(空结构,因为您不需要映射值)。因此,您的NewSubscriber可以采用通道类型参数(或创建一个新通道并返回),并将其插入地图中,而不需要随机字符串或StringChannelSubscriber类型。

我也做了一些改进,如关闭用户渠道:

package main 

import "sync" 

import (
    "fmt" 
    "time" 
) 

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels 
type StringChannelBroadcaster struct { 
    Source  chan string 
    Subscribers map[chan string]struct{} 
    mutex  sync.Mutex 
    capacity uint64 
} 

// NewStringChannelBroadcaster creates a StringChannelBroadcaster 
func NewStringChannelBroadcaster(capacity uint64) *StringChannelBroadcaster { 
    b := &StringChannelBroadcaster{ 
     Source:  make(chan string, capacity), 
     Subscribers: make(map[chan string]struct{}), 
     capacity: capacity, 
    } 
    go b.dispatch() 
    return b 
} 

// Dispatch starts dispatching message 
func (b *StringChannelBroadcaster) dispatch() { 
    // for iterates until the channel is closed 
    for val := range b.Source { 
     b.mutex.Lock() 
     for ch := range b.Subscribers { 
      ch <- val 
     } 
     b.mutex.Unlock() 
    } 
    b.mutex.Lock() 
    for ch := range b.Subscribers { 
     close(ch) 
     // you shouldn't be calling RemoveSubscriber after closing b.Source 
     // but it's better to be safe than sorry 
     delete(b.Subscribers, ch) 
    } 
    b.Subscribers = nil 
    b.mutex.Unlock() 
} 

func (b *StringChannelBroadcaster) NewSubscriber() chan string { 
    ch := make(chan string, b.capacity) 
    b.mutex.Lock() 
    if b.Subscribers == nil { 
     panic(fmt.Errorf("NewSubscriber called on closed broadcaster")) 
    } 
    b.Subscribers[ch] = struct{}{} 
    b.mutex.Unlock() 

    return ch 
} 

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster 
func (b *StringChannelBroadcaster) RemoveSubscriber(ch chan string) { 
    b.mutex.Lock() 
    if _, ok := b.Subscribers[ch]; ok { 
     close(ch)     // this line does have to be inside the if to prevent close of closed channel, in case RemoveSubscriber is called twice on the same channel 
     delete(b.Subscribers, ch) // this line doesn't need to be inside the if 
    } 
    b.mutex.Unlock() 
} 

func main() { 
    b := NewStringChannelBroadcaster(0) 

    var toberemoved chan string 

    for i := 0; i < 3; i++ { 
     i := i 

     ch := b.NewSubscriber() 
     if i == 1 { 
      toberemoved = ch 
     } 
     go func() { 
      for v := range ch { 
       fmt.Printf("receive %v: %v\n", i, v) 
      } 
      fmt.Printf("Exit %v\n", i) 
     }() 
    } 

    b.Source <- "Test 1" 
    b.Source <- "Test 2" 
    // This is a race condition: the second reader may or may not receive the first two messages. 
    b.RemoveSubscriber(toberemoved) 
    b.Source <- "Test 3" 

    // let the reader goroutines receive the last message 
    time.Sleep(2 * time.Second) 

    close(b.Source) 

    // let the reader goroutines write close message 
    time.Sleep(1 * time.Second) 
} 

https://play.golang.org/p/X-NcikvbDM

编辑:我加你的编辑关闭Source后调用RemoveSubscriber当修复恐慌,但你不应该不要这样做,你应该在通道关闭后让结构和其中的所有内容都被垃圾回收。 如果在关闭Source之后调用NewSubscriber,我也增加了一个恐慌。以前你可以做到这一点,它会泄漏创建的频道,大概是在那个频道上永远屏蔽的goroutine。

如果您可以在已关闭的广播公司上致电NewSubscriber(或RemoveSubscriber),这可能意味着您的代码在某处存在错误,因为您坚持不应该是广播公司。