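JGroups does not form a cluster over UDP

2013-03-07

I want to create a leader election protocol using JGroups, so that N instances of my program can elect a master node and all clients can obtain the master's IP address. Roughly, the current implementation has each instance try to acquire a lock on a lock channel; the instance that succeeds becomes the master, and all the others switch to being clients.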
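The lock-based election described above can be modelled inside a single JVM to make the intended behaviour concrete. This is an illustration, not JGroups code: a plain `java.util.concurrent.locks.ReentrantLock` stands in for the cluster-wide lock returned by `lockService.getLock("serverLock")`, threads stand in for program instances, and the class name `LockElectionSketch` and the 200 ms timeout are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockElectionSketch {
    /** Starts n simulated nodes that race for one lock; returns the names of the winners. */
    static List<String> elect(int n) throws InterruptedException {
        // Local stand-in (assumption) for the JGroups lock lockService.getLock("serverLock")
        Lock serverLock = new ReentrantLock();
        ConcurrentLinkedQueue<String> masters = new ConcurrentLinkedQueue<>();
        List<Thread> nodes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            String name = "Node" + i;
            Thread node = new Thread(() -> {
                try {
                    // Same pattern as lock.tryLock(4, TimeUnit.SECONDS) in the question
                    if (serverLock.tryLock(200, TimeUnit.MILLISECONDS)) {
                        masters.add(name); // winner never unlocks and acts as master
                    }
                    // otherwise this node would switch to client mode
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, name);
            nodes.add(node);
            node.start();
        }
        for (Thread node : nodes) {
            node.join();
        }
        return new ArrayList<>(masters);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("masters: " + elect(5));
    }
}
```

Because the winner keeps the lock, every later `tryLock` times out, so exactly one node ends up as master. In the real program that guarantee only holds across machines if all instances share one JGroups cluster view, which is what fails in the rest of this question.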

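import java.net.InetAddress; 
import java.net.NetworkInterface; 
import java.util.*; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicBoolean; 
import java.util.concurrent.locks.Lock; 
import java.util.logging.Level; 
import java.util.logging.Logger; 
import java.util.regex.Pattern; 
import org.jgroups.*; 
import org.jgroups.blocks.locking.LockService; 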

public class AutoDiscovery 
{ 
    static org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AutoDiscovery.class); //used for logging purposes (see log4j library) 
    /* this variable indicates whether I have become the master or I'm just a client*/ 
    public volatile AtomicBoolean becomeMaster = new AtomicBoolean(false); 
    /* The address of the server if we are a client or of ourself if we are 
    * server */ 
    public String serverAddress; 
    /* A channel on which to acquire a lock, so that only one can become server */ 
    private JChannel lockChannel; 
    /* A shared channel for communication between client and master */ 
    private JChannel communicationChannel; 
    private LockService lockService; 
    /* A thread which tries to acquire a lock */ 
    private Thread acquiringThread; 
    /* A thread which listens for the server ip which may change */ 
    private Thread listeningThread; 
    /* A thread which lists the status and initializes the acquiring thread*/ 
    private Thread statusThread; 
    private String name; 
    /* If we pass from being a client to being a server we must stop the listening 
    * thread however we cannot call listeningThread.stop() but instead we change 
    * the stopListening boolean to true */ 
    private volatile boolean stopListening = false; 
    /* This lock communicates I have finally become either master or client so 
    * the serverAddress and becomeMaster variables are correctly set */ 
    public final Object finishedLock = new Object(); 

    public static void main(String[] args) throws Exception 
    { 
     Thread.currentThread().setName("MyMainThread"); 
     Random rand = new Random(); 

     AutoDiscovery master = new AutoDiscovery("Node" + rand.nextInt(10)); 

     master.lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml")); 
     master.lockChannel.connect("lock-channel"); 

     master.communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml")); 
     master.communicationChannel.connect("communication-channel"); 

     master.lockService = new LockService(master.lockChannel); 
     master.startStatusPrinterThread(); 
    } 

    public AutoDiscovery(String name) 
    { 
     this.name = name; 
    } 

    public AutoDiscovery() 
    { 
     try 
     { 
      Thread.currentThread().setName("MyMainThread"); 
      Random rand = new Random(); 

      this.name = ("Node" + rand.nextInt(10)); 

      lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml")); 
      lockChannel.connect("lock-channel"); 

      communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml")); 
      communicationChannel.connect("communication-channel"); 

      lockService = new LockService(lockChannel); 
      startStatusPrinterThread(); 
     } 
     catch (Exception ex) 
     { 
      Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void startAcquiringThread() 
    { 
     acquiringThread = new Thread() 
     { 
      @Override 
      public void run() 
      { 
       while (true) 
       { 
        //if you have become Master send your ip every now and then 
        if (becomeMaster.get()) 
        { 
         try 
         { 
          communicationChannel.send(new Message(null, null, "serverip " + serverAddress)); 
         } 
         catch (Exception ex) 
         { 
          Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex); 
         } 
        } 
        else 
        { 
         try 
         { 
          Thread.currentThread().setName(name + "AcquiringThread"); 
          Lock lock = lockService.getLock("serverLock"); 
          if (lock.tryLock(4, TimeUnit.SECONDS)) 
          { 
           becomeMaster.set(true); 
           stopListening = true; 
           /* Now that I'm server I must find out my own ip address on which to listen */ 
           Enumeration<NetworkInterface> networkInterfaces; 
           try 
           { 
            networkInterfaces = NetworkInterface.getNetworkInterfaces(); 
            for (NetworkInterface netint : Collections.list(networkInterfaces)) 
            { 
             Enumeration<InetAddress> inetAddresses = netint.getInetAddresses(); 
             for (InetAddress inetAddress : Collections.list(inetAddresses)) 
             { 
              if (isIPAddress(inetAddress.getHostAddress()) 
                && !inetAddress.getHostAddress().equals("127.0.0.1")) 
              { 
               serverAddress = inetAddress.getHostAddress(); 
              } 
             } 
            } 
            /* I notify to the rest of the program I have correctly initialized 
            * becomeMaster and serverAddress */ 
            synchronized (finishedLock) 
            { 
             finishedLock.notify(); 
            } 
           } 
           catch (Exception e) 
           { 
            Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, e); 
            System.exit(0); 
           } 
           log.info(Thread.currentThread().getName() + ": I acquired lock! will become master! my ip is " + serverAddress); 
          } 
          else 
          { 
           becomeMaster.set(false); 
           stopListening = false; 
           if (listeningThread == null || !listeningThread.isAlive()) 
           { 
             if (!stopListening) // ??? this condition might be useless: stopListening was set to false just above 
            { 
             startListeningThread(); 
            } 
           } 
          } 
         } 
         catch (Exception e) 
         { 
          e.printStackTrace(); 
         } 
        } 
        try 
        { 
         sleep(5000L); 
        } 
        catch (InterruptedException ex) 
        { 
         Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex); 
        } 
       } 
      } 
     }; 
     acquiringThread.setDaemon(true); 
     acquiringThread.start(); 
    } 

    public void startListeningThread() 
    { 
     listeningThread = new Thread() 
     { 
      @Override 
      public void run() 
      { 
       try 
       { 
        while (true) 
        { 
         Thread.currentThread().setName(name + "ListeningThread"); 
         communicationChannel.setReceiver(new ReceiverAdapter() 
         { 
          @Override 
          public void receive(Message msg) 
          { 
           if (msg.getObject() != null) 
           { 
            String leaderServerAddress = (msg.getObject().toString().substring(9)); 
            if (isIPAddress(leaderServerAddress)) 
            { 
             serverAddress = leaderServerAddress; 
             log.info(name + " Master server has ip" + serverAddress); 
             /* I notify to the rest of the program I have correctly initialized 
             * becomeMaster and serverAddress */ 
             synchronized (finishedLock) 
             { 
              finishedLock.notify(); 
             } 
            } 
            else 
            { 
             log.info(name + ": discarded message " + msg.getObject().toString()); 
            } 
           } 
          } 
         }); 
         sleep(10000L); 
         if (stopListening) 
         { 
          return; 
         } 
        } 
       } 
       catch (Exception e) 
       { 
        e.printStackTrace(); 
       } 
      } 
     }; 
     listeningThread.setDaemon(true); 
     listeningThread.start(); 
    } 

    private void startStatusPrinterThread() 
    { 
     statusThread = new Thread() 
     { 
      @Override 
      public void run() 
      { 
       Thread.currentThread().setName(name + "StatusPrinterThread"); 
       startAcquiringThread(); 
       while (true) 
       { 
        try 
        { 
         if (becomeMaster.get()) 
         { 
          log.info(name + " startStatusPrinterThread(): I am happily a Master!"); 
         } 
         else 
         { 
          if (!acquiringThread.isAlive()) 
          { 
           startAcquiringThread(); 
          } 
         } 
         sleep(5000L); 
        } 
        catch (InterruptedException e) 
        { 
         e.printStackTrace(); 
        } 
       } 
      } 
     }; 
     statusThread.setDaemon(true); 
     statusThread.start(); 
    } 

    private static boolean isIPAddress(String str) 
    { 
     Pattern ipPattern = Pattern.compile("^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\." 
       + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\." 
       + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\." 
       + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])$"); 
     return ipPattern.matcher(str).matches(); 
    } 
} 
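In `startAcquiringThread()` the new master keeps the last IPv4 address it enumerates that is not exactly 127.0.0.1, on any interface, including interfaces that are down. A stricter selection can be sketched with standard `java.net` calls only. `AddressPicker` and its filtering rules are illustrative assumptions; on a machine with several LAN interfaces it still just takes the first match, which is not necessarily the interface JGroups is bound to.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;

public class AddressPicker {
    /** True for IPv4 addresses that other machines on the LAN could plausibly reach. */
    static boolean isCandidate(InetAddress addr) {
        return addr instanceof Inet4Address
                && !addr.isLoopbackAddress()   // all of 127.0.0.0/8, not only 127.0.0.1
                && !addr.isLinkLocalAddress()  // 169.254.0.0/16 auto-configured addresses
                && !addr.isAnyLocalAddress();  // 0.0.0.0
    }

    /** Returns the first candidate on an interface that is up and not loopback, or null. */
    static String pickServerAddress() throws SocketException {
        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
        if (interfaces == null) {
            return null; // older JDKs may return null when no interfaces are found
        }
        for (NetworkInterface nif : Collections.list(interfaces)) {
            if (!nif.isUp() || nif.isLoopback()) {
                continue;
            }
            for (InetAddress addr : Collections.list(nif.getInetAddresses())) {
                if (isCandidate(addr)) {
                    return addr.getHostAddress();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) throws SocketException {
        System.out.println("server address: " + pickServerAddress());
    }
}
```

Using `Inet4Address` and the `InetAddress` predicates avoids the hand-written regular expression in `isIPAddress()` for this particular check.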

My current udp.xml is:

<config xmlns="urn:org:jgroups" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd"> 
    <UDP 
     mcast_port="${jgroups.udp.mcast_port:45588}" 
     tos="8" 
     ucast_recv_buf_size="20M" 
     ucast_send_buf_size="640K" 
     mcast_recv_buf_size="25M" 
     mcast_send_buf_size="640K" 
     loopback="true" 
     level="WARN" 
     log_discard_msgs="false" 
     max_bundle_size="64K" 
     max_bundle_timeout="30" 
     ip_ttl="${jgroups.udp.ip_ttl:8}" 
     enable_diagnostics="true" 
     thread_naming_pattern="cl" 

     timer_type="new" 
     timer.min_threads="4" 
     timer.max_threads="10" 
     timer.keep_alive_time="3000" 
     timer.queue_max_size="500" 

     thread_pool.enabled="true" 
     thread_pool.min_threads="2" 
     thread_pool.max_threads="8" 
     thread_pool.keep_alive_time="5000" 
     thread_pool.queue_enabled="true" 
     thread_pool.queue_max_size="10000" 
     thread_pool.rejection_policy="discard" 

     oob_thread_pool.enabled="true" 
     oob_thread_pool.min_threads="1" 
     oob_thread_pool.max_threads="8" 
     oob_thread_pool.keep_alive_time="5000" 
     oob_thread_pool.queue_enabled="false" 
     oob_thread_pool.queue_max_size="100" 
     oob_thread_pool.rejection_policy="Run"/> 

    <PING timeout="2000" 
      num_initial_members="3"/> 
    <MERGE2 max_interval="30000" 
      min_interval="10000"/> 
    <FD_SOCK/> 
    <FD_ALL/> 
    <VERIFY_SUSPECT timeout="1500" /> 
    <BARRIER /> 
    <pbcast.NAKACK exponential_backoff="300" 
        xmit_stagger_timeout="200" 
        use_mcast_xmit="false" 
        discard_delivered_msgs="true"/> 
    <UNICAST /> 
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" 
        max_bytes="4M"/> 
    <pbcast.GMS print_local_addr="true" join_timeout="3000" 
       view_bundling="true"/> 
    <UFC max_credits="2M" 
     min_threshold="0.4"/> 
    <MFC max_credits="2M" 
     min_threshold="0.4"/> 
    <FRAG2 frag_size="60K" /> 
    <pbcast.STATE_TRANSFER /> 
    <CENTRAL_LOCK /> 
    <!-- pbcast.FLUSH /--> 
</config> 

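The above works when I run N instances of the program on the same machine (N-1 members become clients and 1 becomes master). When I run it on two different machines connected to the same LAN, however, each member apparently creates its own channel after calling JChannel.connect() with the same cluster name, and no common cluster is formed. As a result, when a master sends messages to the clients, the other master sees a different physical address for the same cluster name, and all messages are dropped.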
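JGroups discovery over UDP (the `PING` protocol in this stack) depends on IP multicast actually travelling between the two machines. One way to test that outside JGroups is a plain `MulticastSocket` probe: run it without arguments on one machine to listen, and with `send` on the other. `McastProbe` is a hypothetical helper, and the group `228.8.8.9` and port `45599` are arbitrary placeholders chosen so the probe does not interfere with the real cluster on port 45588.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

public class McastProbe {
    // Hypothetical test group and port (assumptions), not the cluster's real settings
    static final String GROUP = "228.8.8.9";
    static final int PORT = 45599;

    static byte[] encode(String sender) {
        return ("probe from " + sender).getBytes(StandardCharsets.UTF_8);
    }

    static void send(String sender) throws IOException {
        try (MulticastSocket socket = new MulticastSocket()) {
            socket.setTimeToLive(8); // mirrors ip_ttl=8 in udp.xml
            byte[] data = encode(sender);
            socket.send(new DatagramPacket(data, data.length, InetAddress.getByName(GROUP), PORT));
        }
    }

    static void listen(int timeoutMs) throws IOException {
        try (MulticastSocket socket = new MulticastSocket(PORT)) {
            // joinGroup(InetAddress) is deprecated since Java 14 but still works
            socket.joinGroup(InetAddress.getByName(GROUP));
            socket.setSoTimeout(timeoutMs);
            byte[] buf = new byte[256];
            while (true) {
                DatagramPacket packet = new DatagramPacket(buf, buf.length);
                try {
                    socket.receive(packet);
                } catch (SocketTimeoutException e) {
                    return; // nothing arrived within the timeout
                }
                System.out.println(packet.getAddress().getHostAddress() + ": "
                        + new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length > 0 && args[0].equals("send")) {
            send(InetAddress.getLocalHost().getHostName());
        } else {
            listen(30_000);
        }
    }
}
```

Start `java McastProbe` on one machine, then `java McastProbe send` on the other within 30 seconds (adding `-Djava.net.preferIPv4Stack=true` to both if IPv6 is suspected). If the listener prints nothing, multicast is probably blocked or leaving through the wrong interface, which would be consistent with each node ending up in its own single-member view.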

So I get warnings like these:

7683 [Incoming-1,communication-channel,pc-home-41714] WARN org.jgroups.protocols.pbcast.NAKACK - [JGRP00011] pc-home-41714: dropped message 293 from non-member cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d (view=[pc-home-41714|0] [pc-home-41714]) 

1207996 [TransferQueueBundler,communication-channel,pc-home-5280] WARN org.jgroups.protocols.UDP - pc-home-5280: no physical address for cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d, dropping message 
1209526 [TransferQueueBundler,lock-channel,pc-home-59082] WARN org.jgroups.protocols.UDP - pc-home-59082: no physical address for efbe6408-0e21-d119-e2b8-f1d5762d9b45, dropping message 

If I change loopback="true" to loopback="false" in udp.xml, they all join the same cluster, but then they produce errors like:

55539 [Node0StatusPrinterThread] INFO plarz.net.planningig.autodiscovery.AutoDiscovery - Node0 startStatusPrinterThread(): I am happily a Master! 
59077 [TransferQueueBundler,lock-channel,pc-test-6919] ERROR org.jgroups.protocols.UDP - pc-test-6919: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:43109 (130 bytes):, cause: java.io.IOException: Network is unreachable 
59505 [TransferQueueBundler,communication-channel,pc-test-35303] ERROR org.jgroups.protocols.UDP - pc-test-35303: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:55053 (139 bytes):, cause: java.io.IOException: Network is unreachable 
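The destination in these errors, `fe80:0:0:0:226:18ff:fece:6ccc%2`, is an IPv6 link-local address (the `fe80::/10` prefix) with a zone index, which suggests the transport ended up addressing the peer over IPv6; that is an interpretation of the log, not something it states. The sketch below classifies such a literal using `java.net` only and prints whether `java.net.preferIPv4Stack` is actually set in the running JVM. The property is generally read once when the networking classes initialise, so it belongs on the `java` command line of every instance. `Ipv6Check` is a hypothetical helper name.

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class Ipv6Check {
    /** True if the literal is an IPv6 link-local (fe80::/10) address. */
    static boolean isIpv6LinkLocal(String literal) throws UnknownHostException {
        // For an IP literal, getByName only parses the text; no DNS lookup happens.
        InetAddress addr = InetAddress.getByName(literal);
        return addr instanceof Inet6Address && addr.isLinkLocalAddress();
    }

    public static void main(String[] args) throws UnknownHostException {
        // Destination copied from the UDP error log above, with the port removed
        String dest = "fe80:0:0:0:226:18ff:fece:6ccc%2";
        System.out.println(dest + " link-local? " + isIpv6LinkLocal(dest));
        // Prints null unless -Djava.net.preferIPv4Stack=... reached this JVM
        System.out.println("java.net.preferIPv4Stack = "
                + System.getProperty("java.net.preferIPv4Stack"));
    }
}
```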

Someone on another forum suggested passing -Djava.net.preferIPv4Stack=true to disable IPv6, but in my case it made no difference. – dendini 2013-03-07 14:37:19


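Check what your machine's host name resolves to. You can try -Djgroups.bind_addr. Check your firewall and allow all traffic between the machines (you can turn it off until things work and tune it later). Check that your system's multicast route points at the correct network interface. – akostadinov 2013-03-15 14:38:18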
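The first check in that comment, what the machine's host name resolves to, can be done from Java. A frequent pitfall (an assumption here, not confirmed in this thread) is a host name mapped to a loopback address such as 127.0.1.1 in /etc/hosts, which other machines cannot reach. `HostnameCheck.diagnose` is an illustrative helper, not part of JGroups.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostnameCheck {
    /** Classifies a resolved address from the point of view of a remote peer. */
    static String diagnose(InetAddress addr) {
        if (addr.isLoopbackAddress()) {
            return "loopback " + addr.getHostAddress() + ": other machines cannot reach this";
        }
        if (addr.isLinkLocalAddress()) {
            return "link-local " + addr.getHostAddress() + ": reachable only on the local link";
        }
        return "ok " + addr.getHostAddress();
    }

    public static void main(String[] args) throws UnknownHostException {
        // Resolves this machine's own host name through the normal resolver (hosts file, DNS)
        InetAddress local = InetAddress.getLocalHost();
        System.out.println(local.getHostName() + " -> " + diagnose(local));
    }
}
```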

Answer

Error:

[Server:ha-server-3] 13:59:13,122 WARNING [org.jgroups.protocols.UDP] (OOB-15,null) null: no physical address for 766de5c9-8ac2-6d30-89ef-78d39aa5f7eb, dropping message

In my case, it was caused by having multiple JBoss clusters on the same network whose members had the same names. For example, ha-server-1 and ha-server-2 existed in two different clusters on different machines:

Cluster-1 (10.10.10.10):
 |
 +- ha-server-1
 +- ha-server-2

Cluster-2 (10.10.10.20):
 |
 +- ha-server-1
 +- ha-server-2

I resolved the problem by changing the ha-server names. Note: the two were independent clusters. I assume it happened because of a multicast issue in JGroups. Any further explanation from an expert would be appreciated.

http://icfun.blogspot.com/2013/10/no-physical-address-for-766de5c9-8ac2.html