2014-10-09 32 views
2

让我从这开始......我根本不了解Python;我在圈子里,我根本不明白。我完全接受替代和简单的方法。Python扭曲的海螺 - 如何停止与多个连接的反应堆?

我的目标:连接到不同的服务器,在每个服务器上运行相同的命令,并且以后(如现在/不在)使用输出来提高生产力。真棒。

我有什么:在某处找到了一些代码(我会试着找到一个链接并更新它)。我修改了一下。它连接到不同的服务器,运行相同的命令。

问题:一旦一切完成,我不知道如何停止反应堆。而我真的想要停止它不按cntrl+c。我想我需要推迟一些事情,但我不知道在哪里或在哪里。我觉得像SSHChannel关闭的时候,需要以某种方式冒泡到SSHConnection,停止服务...所以交通工具可以知道怎么了?我一直想以某种方式将每个reactor.connectTCP(server, 22, factory)都推迟。我觉得我可能需要一个控制器类。我尝试了这些东西,但没有正确尝试。也许gatherResults可能会有所帮助,但是,我不知道该怎么说。

from twisted.conch.ssh import transport, connection, userauth, channel, common 
from twisted.internet import defer, protocol, reactor 
import sys, struct 

USER = 'username' 
PASS = 'thisisforpersonalusesoicanstoreit!' 
CMD = 'echo "merely this and nothing more"' 


from twisted.python import log 
import sys 
log.startLogging(sys.stdout) 


class ClientCommandTransport(transport.SSHClientTransport): 
    def __init__(self, username, password, command): 
     self.username = username 
     self.password = password 
     self.command = command 

    def verifyHostKey(self, pubKey, fingerprint): 
     print fingerprint 
     return defer.succeed(True) 

    def connectionSecure(self): 
     self.requestService(
      PasswordAuth(self.username, self.password, 
         ClientConnection(self.command)))  

class PasswordAuth(userauth.SSHUserAuthClient): 
    def __init__(self, user, password, connection): 
     userauth.SSHUserAuthClient.__init__(self, user, connection) 
     self.password = password 

    def getPassword(self, prompt=None): 
     return defer.succeed(self.password) 

class ClientConnection(connection.SSHConnection): 
    def __init__(self, cmd, *args, **kwargs): 
     connection.SSHConnection.__init__(self) 
     self.command = cmd 

    def serviceStarted(self): 
     self.openChannel(CommandChannel(self.command, conn=self)) 

class CommandChannel(channel.SSHChannel): 
    name = 'session' 

    def __init__(self, command, *args, **kwargs): 
     channel.SSHChannel.__init__(self, *args, **kwargs) 
     self.command = command 
     self.data = '' 

    def channelOpen(self, data): 
     self.conn.sendRequest(
      self, 'exec', common.NS(self.command), wantReply=True).addCallback(
                  self._gotResponse) 

    def _gotResponse(self, _): 
     self.conn.sendEOF(self) 
     self.loseConnection() 

    def dataReceived(self, data): 
     #self.data += data 
     print data 

    def request_exit_status(self, data): 
     (status,) = struct.unpack('>L', data) 
     # print 'exit status = ', status 

class ClientCommandFactory(protocol.ClientFactory): 
    def __init__(self, command=CMD): 
     self.username = USER 
     self.password = PASS 
     self.command = command 

    def buildProtocol(self, addr): 
     protocol = ClientCommandTransport(
      self.username, self.password, self.command) 
     return protocol  


masters = ['server1','server2','server3','server4','server5'] 

factory = ClientCommandFactory() 

for server in masters: 
    print server 
    reactor.connectTCP(server, 22, factory) 

reactor.run() 

我也有推迟getPage一个http请求剧(没有工作),但我似乎无法与电抗器和SSH连接到重新应用它。

这些都是资源,我真希望我能看得懂:


与下面的一个答案......我测试了向下传递到工厂的参考,并最终停止在SSHChannelclosed()校长如果工厂没有它的数组中再连接(或任何python调用数组)。

我厂更新到现在也包括这种方法:

class ClientCommandFactory(protocol.ClientFactory): 

    def clientConnectionLost(self, connector, reason): 
     print reason 

我看了一下日志,因为我对所发生的事情,一般兴趣......(有些是我自己的声明,一些是默认的)

014-10-16 13:42:58-0500 [SSHChannel session (0) on SSHService ssh-connection on ClientCommandTransport,client] closed last TCP connection 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] service stopped 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] connection lost 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion: Connection lost. 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] ] 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] connection lost 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion: Connection lost. 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] ] 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] Stopping factory <__main__.ClientCommandFactory instance at 0x02323030> 
2014-10-16 13:42:58-0500 [-] Main loop terminated. 

所以......它说连接失去了不洁净的方式。有没有更好的办法,我应该阻止事情..?

+0

我在一个星期前写了这个问题...决定放弃paramiko。刚刚意识到我需要使用相同的连接运行多个命令(并且'&&'连接不够)。我仍然不知道该怎么做。最后,我想根据命令的响应来处理断开连接,但没有任何问题。 – 2014-10-15 15:48:19

+0

您是否检查[this](http://stackoverflow.com/questions/13920962/what-is-the-correct-way-to-close-a-twisted-conch-ssh-connection?lq=1)? 这可能是因为你必须自己实施一些事情来妥善地关闭连接。无论如何,这与工作完成后停止反应堆无关。 – koleS 2014-10-18 12:15:32

+0

@ koleS - 谢谢,我没有看到。我尝试在下面有人给我的答案中应用这些东西,但是反应器偶尔停止得太快,因为在第一个或第二个完成后没有添加所有连接。到处都有这么多事情发生,我不确定在哪里或如何捕捉或管理事情。 – 2014-10-18 18:16:46

回答

0

所以首先这是行不通的,因为connectTCP接受与IP address字符串作为第一个参数,你从这个名单通过元素:

masters = ['server1','server2','server3','server4','server5'] 

所有任务后停止反应器完成是很常见的扭曲的用例。一种方法是存储工厂中执行的任务计数器。每次实例化该工厂协议的实例时,每增加一个该协议实例(任务)返回的结果,减少计数器,当计数器达到0时停止反应堆。示例代码:

from twisted.conch.ssh import transport, connection, userauth, channel, common 
from twisted.internet import defer, protocol, reactor 
import sys, struct 

USER = 'username' 
PASS = 'thisisforpersonalusesoicanstoreit!' 
CMD = 'echo "merely this and nothing more"' 


from twisted.python import log 
import sys 
log.startLogging(sys.stdout) 


class ClientCommandTransport(transport.SSHClientTransport): 
    def __init__(self, username, password, command, factory): 
     self.username = username 
     self.password = password 
     self.command = command 
     self.factory = factory 

    def verifyHostKey(self, pubKey, fingerprint): 
     print fingerprint 
     return defer.succeed(True) 

    def connectionSecure(self): 
     self.requestService(
      PasswordAuth(self.username, self.password, 
         ClientConnection(self.command, self.factory))) 

class PasswordAuth(userauth.SSHUserAuthClient): 
    def __init__(self, user, password, connection): 
     userauth.SSHUserAuthClient.__init__(self, user, connection) 
     self.password = password 

    def getPassword(self, prompt=None): 
     return defer.succeed(self.password) 

class ClientConnection(connection.SSHConnection): 
    def __init__(self, cmd, *args, **kwargs): 
     connection.SSHConnection.__init__(self) 
     self.command = cmd 
     self.factory = factory 

    def serviceStarted(self): 
     self.openChannel(CommandChannel(self.command, self.factory, conn=self)) 

class CommandChannel(channel.SSHChannel): 
    name = 'session' 

    def __init__(self, command, factory, *args, **kwargs): 
     channel.SSHChannel.__init__(self, *args, **kwargs) 
     self.command = command 
     self.data = '' 
     self.factory = factory 
     self.factory.num_connections += 1 
     self.factory.connections.append(self) 

    def channelOpen(self, data): 
     self.conn.sendRequest(
      self, 'exec', common.NS(self.command), wantReply=True).addCallback(
                  self._gotResponse) 

    def _gotResponse(self, _): 
     self.conn.sendEOF(self) 
     self.loseConnection() 
     self.factory.num_connections -= 1 
     self.factory.connections.remove(self) 
     if self.factory.num_connections == 0: 
      reactor.stop() 

    def dataReceived(self, data): 
     #self.data += data 
     print data 

    def request_exit_status(self, data): 
     (status,) = struct.unpack('>L', data) 
     # print 'exit status = ', status 

class ClientCommandFactory(protocol.ClientFactory): 
    def __init__(self, command=CMD): 
     self.username = USER 
     self.password = PASS 
     self.command = command 
     self.connections = [] 
     self.num_connections = 0 

    def buildProtocol(self, addr): 
     protocol = ClientCommandTransport(
      self.username, self.password, self.command, self) 
     return protocol  


masters = ['server1','server2','server3','server4','server5'] 

factory = ClientCommandFactory() 

for server in masters: 
    print server 
    reactor.connectTCP(server, 22, factory) 

reactor.run() 

我在这里做的是我在工厂self.connectionsself.num_connections中添加了两个变量来存储对工厂中连接的引用并计算连接数。然后在工厂的buildProtocol工厂将自己传递给ClientCommandTransport,该工厂依次将工厂参考传递给ClientConnection,后者最终将需要的参考传递给工厂,即CommandChannel。每当一个CommandChannel的实例被实例化时,它就有了对工厂的引用,所以它将连接数增加1,并将其自身添加到存储在工厂中的连接列表中。我认为当一个任务/命令完成时,会触发_gotResponse回调。因此,每当它被触发时,它就像之前一样丢失连接,但是现在,此外,它减少了连接计数器并从工厂中删除对自身的引用。它还检查是否有其他开放连接,如果没有停止反应堆。

我还没有测试过这个代码,但它是Twisted中的一个常见模式,即工厂保存了它创建的协议实例的引用列表,以便每个实例都可以通过工厂访问其他实例,并且能够一旦所有事例都已完成,停止反应堆。

注意,这个层次也有些深,Factory - >ClientCommandTransport - >ClientConnection - >CommandChannel,我不知道这是否是最佳的解决方案,通过参考出厂一路下跌。

其中一个变量是实际上是多余的 - 你可以存储或者只self.num_connections和增加/减少,或self.connections,添加/从列表中删除实例并使用len(self.connections),看是否还有任何打开的连接。

+0

哦...所以我发布的确实起作用。我只是无法阻止反应堆,一旦我想完成的事情。我现在没有时间通过​​你的整个回答,但我会稍微回来。 – 2014-10-15 18:41:20

+0

你的答案看起来不错,但...有点什么我在想什么,但我不能这样做是出于某种原因。不是“冒泡”断开连接,我想我需要向下传递信息。只要您确信自己提出的想法,就不需要为我测试代码。 – 2014-10-15 18:47:31

+0

如果你认为我的答案'看起来不错',那么你可以接受它。我不明白你的意思是什么东西向下(什么东西,向下到什么点等)。我认为连接的数量必须存储在工厂中,并且每次调用'buildProtocol'时都会实例化所有其他类,因此工厂是唯一可以跟踪连接数量的实体。 [Here](http://krondo.com/blog/?p=1209)你可以找到一个很好的扭曲教程。其中的一个部分,即[这一个](http://bit.ly/1Dd2wWk)提出了计算任务的想法。 – koleS 2014-10-15 20:00:37