2014-05-17 42 views
1

我使用Spring集成TCP服务器,它保持连接到几千客户端。我需要服务器来限制客户端在负载过重的情况下不会丢失消息。春季集成 - 可靠的TCP高容量应用程序

我的服务器配置:

<task:executor id="myTaskExecutor" 
    pool-size="4-8" 
    queue-capacity="0" 
    rejection-policy="CALLER_RUNS" /> 

<int-ip:tcp-connection-factory id="serverTcpConFact" 
    type="server" 
    port="60000" 
    using-nio="true" 
    single-use="false" 
    so-timeout="300000" 
    task-executor="myTaskExecutor" /> 

<int-ip:tcp-inbound-channel-adapter id="tcpInboundAdapter" 
    channel="tcpInbound" 
    connection-factory="serverTcpConFact" /> 

<channel id="tcpInbound" /> 

<service-activator input-channel="tcpInbound" 
    ref="myService" 
    method="test" /> 

<beans:bean id="myService" class="org.test.tcpserver.MyService" /> 

由于连接工厂默认任务执行者是无界的,我用一个汇集任务执行,以防止内存不足的错误。

一个简单的客户端负载测试:

public class TCPClientTest { 
    static Socket socket; 
    static List<Socket> sl = new ArrayList<>(); 
    static DataOutputStream out; 

    public static void main(String[] args) throws Exception { 
     for (int i = 0; i < 10000; i++) { 
      socket = new Socket("localhost", 60000); 
      sl.add(socket); 
      out = new DataOutputStream(socket.getOutputStream()); 
      out.writeBytes("connection " + i + "\r\n"); 
      System.out.println("Using connection #" + i); 
     } 
     System.in.read(); 
    } 
} 

当我运行它,服务器只接收大约10-20消息,然后客户端获得“连接被拒绝:连接”异常。之后,即使在连接超时之后,服务器也不能再接受任何新的连接。增加池大小仅有助于获得更多的消息。

编辑

我使用Spring集成3.0.2.RELEASE。对于生产,我使用8-40个线程,但它仅在几百次连接之后才会使此测试失败。

MyService.test()并没有做太多......

public class MyService { 
    public void test(byte[] input) { 
     System.out.println("Received: " + new String(input)); 
    } 
} 

Here is the log with trace level logging.

Sources

+1

什么版本的Spring Integration? 'MyService.test()'做了什么?由于你只是在每个套接字上发送一条短消息,所以我不希望这个测试用例有任何线程问题(尽管4-8个线程可能完全不适合具有这个套接字数量的实际应用程序)。我建议你打开服务器端的跟踪级日志记录。 –

+0

@Gary Russell编辑我的问题。 – John29

回答

2

我看看有什么问题,请打开JIRA issue

问题是在执行程序中带有0长度队列的CALLER_RUNS拒绝策略。

有一个线程可以处理所有IO事件(通常为myTaskExecutor-1);当读取事件触发时,他会排队执行以读取数据;阅读器线程排队执行以组装数据(这将阻塞,直到完成消息 - 在您的情况下由CRLF终止)到达)。

在这种情况下,当没有线程可用时,CALLER_RUNS策略意味着IO选择器线程进行读取操作,并成为汇编程序线程,这会阻止等待因为被阻塞而不会到达的数据,在调度不同的线程后阻止读取数据。因为他被封锁,他无法处理新的接受事件。

这里是我的测试日志显示的问题...

TRACE: [May-18 10:43:38,923][myTaskExecutor-1] tcp.connection.TcpNioServerConnectionFactory - Port 60000 SelectionCount: 2 
DEBUG: [May-18 10:43:38,923][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Reading... 
DEBUG: [May-18 10:43:38,924][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Running an assembler 
TRACE: [May-18 10:43:38,924][myTaskExecutor-1] tcp.connection.TcpNioConnection - localhost:58509:60000:bdc36c59-c31b-470e-96c3-6270e7c46a2f Nio message assembler running... 
DEBUG: [May-18 10:43:38,926][myTaskExecutor-1] tcp.serializer.ByteArrayCrLfSerializer - Available to read:0 

第二行显示选择的线程被用来做读取;他检测到此套接字需要汇编程序,并成为汇编程序,阻塞,等待数据。

你真的相信会有使用无限任务执行程序的问题吗?这些事件通常非常短暂,因此线程将很快被回收。

将执行程序的队列容量增加到0以上也应该有所帮助,但它不会完全保证问题不会发生(尽管大的队列大小不太可能被击中)。

我还不确定如何解决这个问题,除了为IO选择器和读取器线程使用专用的任务执行程序以使它们永远不会用作汇编程序。

+1

感谢您的快速响应和解释。我认为,在真实世界的场景中,使用无限任务执行程序不太可能会遇到问题,但是如果使用它,至少在与java.lang.OutOfMemoryError约4000个连接后出现相同的测试会崩溃。我认为,无论负载如何,都可以运作。我会打开一个JIRA问题。 – John29

+0

我想写一些像[CallerBlocksPolicy](https://gist.github.com/garyrussell/63038cdd886e00ef01b8)一段时间(以涵盖其他用例),但从来没有解决它。 8个线程仍然不足以满足您的测试用例,因为它试图启动汇编程序超时。然而,如同主题中显示的80个线程,在创建近9000个连接之后,我刚刚杀死了你的测试并且仍然很强大。 –

+0

我试过了你的'CallerBlocksPolicy',但正如你所说,它仍然不能很好地处理8个线程。当你说它试图启动一个汇编程序时,它意味着什么超时?我不介意增加线程的数量,但我不明白为什么8个线程仍然会导致测试失败,即使我们不再使用'CALLER_RUNS'拒绝策略?当然,池中的线程数量应该会影响性能,但为什么它也会影响稳定性? – John29

0

昨天我写了一个示例,只是为了使用spring集成创建tcp高性能服务器代码。我使用JMeter TCP采样器对1000个并发客户端请求进行了成功测试。

这里是代码 - https://github.com/rajeshgheware/spring-integration-samples包括JMeter测试配置文件。

我与具有英特尔核i5 M520 2.4GHz的在64位笔记本1000次并发TCP客户端请求(在服务器代码和JMeter测试此机器上运行)成功地测试

我也试图与1500个并发客户端请求,但观察到该服务器无法履行许多请求。我将继续尝试增强此代码以服务10000个并发客户端请求(我知道我可能需要从亚马逊获得用于此测试的优秀EC2计算机:))