2012-05-31 22 views
4

我正在寻找一种方法来强制JGroups使用特定的服务器作为协调器,如果该服务器不存在,则选择一个新的协调器,直到指定的那个人重新加入集群并且它成为协调员。如何强制JGroups使哪个节点成为协调者?

在这种情况下,我们收集了一些信息,我们通过协调器监听主题获取更新,但是提取&处理这些更新可能是资源密集型的,所以我们不希望它向服务器提供任何服务外面的世界。因此,在集群前面的负载均衡器中,我们已将其设置为不发送给协调器。但是因为协调员是随机选出的,所以我们基本上需要关闭集群,直到只有单个机器在那里,然后启动集群的其余部分。

+1

延长'org.jgroups.protocols.pbcast.GMS'并覆盖'地址determineCoordinator()' 。您可以添加一个getter/setter并指定首选地址作为协调者。如果节点在线(“成员”的一部分),它将被选中,如果没有调用super.determineCoordinator()。这或多或少。 – bestsss

回答

5

目前没有办法做到这一点。 Jgroups花费了相当多的时间确保协调器可以是组中的任何节点。维护和监视组成员列表健康状况的所有任务都由组中所有成员共享,以确保协调员的职责不会过多地影响协调员的绩效。标准的GMS(Group MembershipService)协议栈类是负责协调器选择的。目前它只是视图列表中的第一个主机。

为了得到这种行为,你将不得不实现你自己的协议栈。有趣的是,我一直在为Jgroups开发一个协议栈,它大致实现了你所要求的,但是它尚未准备好迎接黄金时段。

然而,其他人可能对这个问题采取了一些措施。我会建议在jgroups mailing list上发帖并询问相同的问题。

0

您可以将所需的节点设置为协调器。 : github sample

,我添加同步块更改完成所有节点和完整代码:

public static final String GMS_DELTA_VIEW_FIELD_NAME = "use_delta_views"; 

/** 
* Change coordinator to {@code desiredCoordinator}. Must be invoked from coordinator. 
* @param desiredCoordinator 
* @return {@code true} if changes success, {@code false} overwise 
*/ 
boolean changeCoordinator(JChannel currentChannel, Address desiredCoordinator) { 

    if(!Util.isCoordinator(currentChannel.getAddress)) { 
     throw new RuntimeException("The current node is not coordinator."); 
    } 

    ArrayList<Address> newMembersOrder = Lists.newArrayList(currentView.getMembers());   

    // Switch desired node to first place 
    Collections.swap(newMembersOrder, 0, newMembersOrder.indexOf(desiredCoordinator));   

    // Create new view 
    long newId = currentView.getViewId().getId() + 1; 
    View newView = new View(newMembersOrder.get(0), newId, newMembersOrder); 

    GMS gms = (GMS)clusterChannel.getProtocolStack().findProtocol(GMS.class); 
    CustomProtocol protocol = new CustomProtocol(newMembersOrder.stream() 
      .filter(item -> !item.equals(currentChannel.getAddress())) 
      .collect(Collectors.toSet())); 

    boolean oldUseDeltaViews = (Boolean)gms.getValue(GMS_DELTA_VIEW_FIELD_NAME); 
    try { 
     // Disable using_delta_views at GMS 
     gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, false); 

     // Insert custom protocol below GMS for synchronizing with VIEW_ACK events 
     currentChannel.getProtocolStack().insertProtocolInStack(protocol, gms, ProtocolStack.BELOW); 
     gms.castViewChange(newView, null, newMembersOrder); 

     // Wait no more than 30 seconds to all VIEW_ACK responses 
     if (!protocol.collector.waitForAllAcks(TimeUnit.SECONDS.toMillis(30))) {     
      return false; 
     } 

     return true; 
    } 
    finally { 
     // Repair old state 
     gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, oldUseDeltaViews); 
     currentChannel.getProtocolStack().removeProtocol(protocol); 
    } 
} 

private class CustomProtocol extends Protocol implements UpHandler { 

    AckCollector collector; 

    public CustomProtocol(Collection<Address> waitedAddresses) { 
     collector = new AckCollector(waitedAddresses); 
    } 

    @Override 
    public Object up(Event evt) { 

     if(evt.getType() == Event.MSG) { 
      final Message msg=(Message)evt.getArg(); 
      GmsHeader hdr=(GmsHeader)msg.getHeader(proto_id); 
      if(hdr != null && hdr.getType() == GmsHeader.VIEW_ACK) {      
       collector.ack(msg.getSrc()); 
      } 
     } 

     return super.up(evt); 
    } 
} 
相关问题