2013-02-12 74 views
9

使用Amazon SWF在服务器之间传递消息?使用Amazon SWF在服务器之间进行通信

  1. 在服务器AI要运行脚本一个
  2. 当这个过程完成我想要将消息发送到服务器B运行一个脚本B
  3. 如果成功完成,我希望它清除作业从工作流队列

我真的很难搞清楚我可以如何组合使用Boto和SWF来做到这一点。我没有完成一些完整的代码,但是我所追求的是如果任何人都可以更多地解释涉及的内容。

  • 我该如何告诉服务器B检查脚本的完成? A?
  • 如何确保服务器A不会选择完成脚本 A并尝试运行脚本B(因为服务器B应该运行此操作)?
  • 我该如何实际通知脚本完成的SWF?你是一面旗帜,还是一条消息,还是什么?

正如你所看到的,我真的很困惑这一切,如果有人可以对此有所了解,我会非常感激。

回答

17

我想你会问一些非常好的问题,这些问题突出显示了SWF如何作为服务提供帮助。简而言之,你不会告诉你的服务器在他们之间协调工作。您的决策者在SWF服务的帮助下为您编排所有这些内容。

您的工作流程会为实现如下:

  1. 注册您的工作流程,以及它与服务(一次性)的活动。
  2. 执行决策者和工人。
  3. 让你的工人和决策者奔跑。
  4. 开始一个新的工作流程。

有许多方法可以将凭据提供给boto.swf的代码。对于这个练习的目的,我建议他们出口到环境中运行下面的代码之前:

export AWS_ACCESS_KEY_ID=<your access key> 
export AWS_SECRET_ACCESS_KEY=<your secret key> 

1)要注册的域名,工作流程和活动执行以下命令:

# ab_setup.py 
import boto.swf.layer2 as swf 

DOMAIN = 'stackoverflow' 
ACTIVITY1 = 'ServerAActivity' 
ACTIVITY2 = 'ServerBActivity' 
VERSION = '1.0' 

swf.Domain(name=DOMAIN).register() 
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register() 
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register() 
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register() 

2)实施并运行决策者和工人。

# ab_decider.py 
import time 
import boto.swf.layer2 as swf 

DOMAIN = 'stackoverflow' 
ACTIVITY1 = 'ServerAActivity' 
ACTIVITY2 = 'ServerBActivity' 
VERSION = '1.0' 

class ABDecider(swf.Decider): 

    domain = DOMAIN 
    task_list = 'default_tasks' 
    version = VERSION 

    def run(self): 
     history = self.poll() 
     # Print history to familiarize yourself with its format. 
     print history 
     if 'events' in history: 
      # Get a list of non-decision events to see what event came in last. 
      workflow_events = [e for e in history['events'] 
           if not e['eventType'].startswith('Decision')] 
      decisions = swf.Layer1Decisions() 
      # Record latest non-decision event. 
      last_event = workflow_events[-1] 
      last_event_type = last_event['eventType'] 
      if last_event_type == 'WorkflowExecutionStarted': 
       # At the start, get the worker to fetch the first assignment. 
       decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()), 
        ACTIVITY1, VERSION, task_list='a_tasks') 
      elif last_event_type == 'ActivityTaskCompleted': 
       # Take decision based on the name of activity that has just completed. 
       # 1) Get activity's event id. 
       last_event_attrs = last_event['activityTaskCompletedEventAttributes'] 
       completed_activity_id = last_event_attrs['scheduledEventId'] - 1 
       # 2) Extract its name. 
       activity_data = history['events'][completed_activity_id] 
       activity_attrs = activity_data['activityTaskScheduledEventAttributes'] 
       activity_name = activity_attrs['activityType']['name'] 
       # 3) Optionally, get the result from the activity. 
       result = last_event['activityTaskCompletedEventAttributes'].get('result') 

       # Take the decision. 
       if activity_name == ACTIVITY1: 
        # Completed ACTIVITY1 just came in. Kick off ACTIVITY2. 
        decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()), 
         ACTIVITY2, VERSION, task_list='b_tasks', input=result) 
       elif activity_name == ACTIVITY2: 
        # Server B completed activity. We're done. 
        decisions.complete_workflow_execution() 

      self.complete(decisions=decisions) 
      return True 

工作人员要简单得多,如果你不想要,你不需要使用继承。

# ab_worker.py 
import os 
import time 
import boto.swf.layer2 as swf 

DOMAIN = 'stackoverflow' 
ACTIVITY1 = 'ServerAActivity' 
ACTIVITY2 = 'ServerBActivity' 
VERSION = '1.0' 

class MyBaseWorker(swf.ActivityWorker): 

    domain = DOMAIN 
    version = VERSION 
    task_list = None 

    def run(self): 
     activity_task = self.poll() 
     print activity_task 
     if 'activityId' in activity_task: 
      # Get input. 
      # Get the method for the requested activity. 
      try: 
       self.activity(activity_task.get('input')) 
      except Exception, error: 
       self.fail(reason=str(error)) 
       raise error 

      return True 

    def activity(self, activity_input): 
     raise NotImplementedError 

class WorkerA(MyBaseWorker): 
    task_list = 'a_tasks' 

    def activity(self, activity_input): 
     result = str(time.time()) 
     print 'worker a reporting time: %s' % result 
     self.complete(result=result) 

class WorkerB(MyBaseWorker): 
    task_list = 'b_tasks' 

    def activity(self, activity_input): 
     result = str(os.getpid()) 
     print 'worker b returning pid: %s' % result 
     self.complete(result=result) 

3)运行你的决策者和工人。 您的决策者和工作人员可能从单独的主机或同一台机器运行。打开四个端子并运行你的演员:

首先你决胜局

$ python -i ab_decider.py 
>>> while ABDecider().run(): pass 
... 

然后工人A,你可以从服务器A做到这一点:从服务器B

$ python -i ab_workers.py 
>>> while WorkerA().run(): pass 

然后工人B,可能但如果你从笔记本电脑上运行它们,它也会工作得很好:

$ python -i ab_workers.py 
>>> while WorkerB().run(): pass 
... 

4)最后,启动工作流程。

$ python 
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
[GCC 4.4.3] on linux2 
Type "help", "copyright", "credits" or "license" for more information. 
>>> import boto.swf.layer2 as swf 
>>> workflows = swf.Domain(name='stackoverflow').workflows() 
>>> workflows 
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>] 
>>> execution = workflows[0].start(task_list='default_tasks') 
>>> 

切换回来看你的演员会发生什么。闲置一分钟后,他们可能会与服务断开连接。如果发生这种情况,请按向上箭头键+输入以重新输入轮询循环。

您现在可以转到AWS管理控制台的SWF面板,查看执行过程如何执行并查看其历史记录。或者,您可以通过命令行查询它。

>>> execution.history() 
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
{'startToCloseTimeout': '300', 'taskList': {'name': ... 

这只是与活动串行执行的工作流程的例子,但它也可以为决胜局schedule and coordinate parallel execution of activities

我希望这至少可以让你开始。对于一个稍微复杂的串行工作流程示例,我建议使用looking at this

+0

感谢特点你这么多,这是一个非常全面的答复。 – Jimmy 2013-03-11 09:57:22

+0

非常感谢! – Vor 2013-05-12 23:41:39

+0

@ oozie-伟大的答案,真棒课堂。需要在博托文档 - – Yarin 2013-06-27 17:22:14

1

您可以使用SNS, 当一个脚本完成它应该触发SNS,这将触发到服务器B

+0

SNS没有我要求不幸 – Jimmy 2013-02-12 12:10:22

5

的通知,我没有任何的示例代码共享,但你绝对可以使用SWF协调跨两台服务器的脚本执行。这样做的主要想法是创建的代码三块是跟SWF:

  • ,知道哪些脚本来执行第一,一旦第一个脚本执行完毕做什么的组件。这被称为SWF术语中的“决定者”。
  • 两个组件,每个都知道如何执行想要在每台机器上运行的特定脚本。这些被称为SWF条款中的“活动工作者”。

第一个组件决策者调用两个SWF API:PollForDecisionTask和RespondDecisionTaskCompleted。轮询请求将为决策者组件提供执行工作流的当前历史记录,基本上是脚本运行者的“我在哪里”状态信息。你编写了查看这些事件的代码,并确定应执行哪个脚本。执行脚本的这些“命令”将以活动任务的调度的形式出现,该任务作为调用RespondDecisionTaskCompleted的一部分返回。

您编写的第二个组件,活动工作者分别调用两个SWF API:PollForActivityTask和RespondActivityTaskCompleted。轮询请求将为活动工作者指示它应该执行它所了解的脚本以及SWF所称的活动任务。从轮询请求返回给SWF的信息可以包括作为活动任务计划的一部分发送给SWF的单个执行特定数据。您的每个服务器都将独立轮询SWF中的活动任务,以指示该主机上的本地脚本的执行情况。一旦工作人员完成脚本的执行,它就会通过RespondActivityTaskCompleted API调用回SWF。

从您的活动工作人员到SWF的回调导致将新历史记录发送给我已经提到的判定组件。它会查看历史记录,看到第一个脚本已完成,并安排第二个脚本执行。一旦看到第二个完成,它可以使用另一种类型的决定来“关闭”工作流程。

您启动了通过调用StartWorkflowExecution API在每个主机上执行脚本的整个过程。这创建了SWF中整个过程的记录,并将第一个历史记录发送给决策程序,以安排第一个主机上第一个脚本的执行。

希望这会提供更多关于如何使用SWF完成此类工作流程的上下文。如果你还没有,我会看看SWF页面上的开发指南以获取更多信息。

1

很好的例子,

另外,如果你不想你的证书导出到环境中,您可以在类中调用:

swf.set_default_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) 
相关问题