2012-11-14 149 views
2

我有一个简单的(只是测试)状态机,它接受以下输入字符串abcac。状态机被设置如下:使用TPL并行运行任务

​​
s2 --> 'b' --> s3
s3 --> 'c' --> s4
s2 --> s4 (Epsilon transition)

S1是启动状态
S4是接受国家

我想使用TPL来并行执行s1->s2->s3->s4s1->s2->s3->s4(相互独立)。

如果我通过在 'ABC' 作为机器接受输入,即

> Thread 1 - Consumed: a, from State: 1 to State: 2
> Thread 2 - Consumed: b, from State: 2 to State: 3
> Thread 3 - Epsilon transition from State: 2 to State: 3
> Thread 4 - Consumed: c, from State: 3 to State: 4
> Thread 4 - Accepted in state 4

Time taken = 19

Input 'abc' is valid Press any key to exit

但是,如果我通过 '交流' 我得到这个:

> Thread 1 - Consumed: a, from State: 1 to State: 2
> Thread 2 - Epsilon transition from State: 2 to State: 3
> Thread 3 - Consumed: c, from State: 3 to State: 4
> Thread 3 - Accepted in state 4
> Thread 4 - Consumed: c, from State: 3 to State: 4
> Thread 4 - Accepted in state 4

Time taken = 39

Input 'ac' is not valid (Reason: RejectedAmbiguous) Press any key to exit

出于某种原因,状态机接受两次相同的输入(在状态4接受),因为并行执行的两个线接受不同的输入应是不可能的。

我不会发布所有的代码,因为它太多了,但我会发布主要的位,以便您了解我做错了什么。

public enum eResult 
{ 
    Accepted = 0, 
    RejectedAmbiguous, 
    RejectedNoResults, 
    RejectedNoInitialState 
} 

public eResult Execute() 
{ 
    var startState = States.FirstOrDefault(s => s.Initial); 
    if (startState == null) return eResult.RejectedNoInitialState; 

    tasks.Clear(); 

    CancellationTokenSource cts = new CancellationTokenSource(); 
    Task t = new Task(() => 
     { 
      foreach(Transition tr in getTransitions(startState)) 
      { 
       var tr = trans[n]; 
       var actor = new Actor(tr.FromState, this.input); 
       Task<Actor> task = Task<Actor>.Factory.StartNew(obj => 
        { 
         return doTransitionFunction(tr, cts).Invoke((Actor)obj); 
        }, actor, cts.Token); 
       buildContinuationTask(Transitions[tr], task, cts); 
       tasks.Add(task); 
      } 
     }, cts.Token); 

    t.RunSynchronously(); 

    try 
    { 
     Task.WaitAll(tasks.ToArray()); 
    } 
    catch (AggregateException ae) 
    { 
     foreach (Exception e in ae.Flatten().InnerExceptions) 
     { 
      Console.WriteLine(e.Message); 
     } 
    } 

    eResult result = eResult.Accepted; 

    if (!results.Any()) result = eResult.RejectedNoResults; 
    else if (results.Where(r => r.State.Accepted).Count() > 1) result = eResult.RejectedAmbiguous; 

    return result; 
} 

IEnumerable<Transition> getTransitions(AtomicState state) 
{ 
    return Transitions.Keys.Where(k => k.FromState == state); 
} 

bool isAccept(Actor parcel) 
{ 
    return (parcel.State.Accepted && parcel.Cursor.EOF()); 
} 

Func<object, Actor> doTransitionFunction(Transition transition, CancellationTokenSource cts) 
{ 
    return new Func<object, Actor>(obj => 
    { 
     var ts = (Actor)obj; 
     var cur = ts.Cursor.Peek(); 
     if (transition.Epsilon || transition.Input.Invoke() == cur) 
     { 
      if (!transition.Epsilon) ts.Cursor.MoveNext(); 
      ts.State = Transitions[transition]; 
      OnTransitioned(this, new TransitionedEventArgs(transition.FromState, ts.State, cur, transition.Epsilon, Task.CurrentId)); 
      if (isAccept(ts)) 
      { 
       OnAccepted(this, new AcceptedEventArgs(ts.State, Task.CurrentId)); 
       results.Add(ts); 
       cts.Cancel(); 
      } 
     } 
     return ts; 
    }); 
} 

void buildContinuationTask(AtomicState s, Task<Actor> antecedentTask, CancellationTokenSource cts) 
{ 
    var trans = getTransitions(s).ToArray(); 
    for (int n = 0; n < trans.Count(); n++) 
    { 
     Transition tr = trans[n]; 
     Task<Actor> continuation = antecedentTask.ContinueWith<Actor>(antecdent => 
      { 
       if (!cts.IsCancellationRequested) 
        return doTransitionFunction(tr, cts).Invoke((Actor)antecdent.Result.Clone()); 
       else 
        return (Actor)antecdent.Result.Clone(); 
      }, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Current); 
     buildContinuationTask(Transitions[tr], continuation, cts); 
     tasks.Add(continuation); 
    } 
} 

纠正我,如果这是不可能的,但我希望发生的事情是这样的:

为第1并行任务接受abc输入:

S1是Task<Actor>
s2是s1的延续
s3是s2的延续
s4是s3的延续

对于第二并行任务接受ac

s1是Task<Actor>
s2为S1
S3的延续为S2的延续(这一个是小量移动)
S4是延续s3

这两个任务都有自己的Actor对象副本,它们将从主要先行任务传递到继续任务。

我知道我几乎在那里,我只需要解决这最后的神秘。

+1

只是好奇,你为什么不使用TPL数据流?它会简化*这很多*。 – casperOne

+0

我从来没有听说过TPL DataFlow。感谢您的关注。我会读一读它,看看它能否帮我解决我的问题。 +1。 – Intrepid

+0

您愿意接受一个TPL DataFlow解决方案吗?您可以根据自己的需求选择,或者您想严格遵循基于“基于任务”的解决方案吗? – casperOne

回答

0

在阅读了关于TPL DataFlow后,这是我的尝试,它似乎以我想要的方式工作。

public interface IScrollableCursor 
{ 
    void MoveNext(); 
    void MovePrevious(); 
    void MoveFirst(); 
    void MoveLast(); 
    bool BOF(); 
    bool EOF(); 
    char Peek(); 
    int CurrentPosition { get; } 
} 

[Serializable] 
public abstract class AtomicState 
{ 
    protected int stateId; 
    protected bool accepted; 

    public int StateId 
    { 
     get 
     { 
      return stateId; 
     } 
    } 

    public AtomicState(int stateId) 
    { 
     this.stateId = stateId; 
     this.accepted = false; 
    } 

    public AtomicState(int stateId, bool accepted) 
     : this(stateId) 
    { 
     this.accepted = accepted; 
    } 

    public abstract bool Initial { get; } 

    public bool Accepted 
    { 
     get 
     { 
      return accepted; 
     } 
    } 

} 


[Serializable] 
public struct Actor : ICloneable 
{ 
    private AtomicState state; 
    private IScrollableCursor cursor; 

    public AtomicState State 
    { 
     get 
     { 
      return state; 
     } 
     set 
     { 
      state = value; 
     } 
    } 

    public IScrollableCursor Cursor 
    { 
     get 
     { 
      return cursor; 
     } 
    } 

    public Actor(AtomicState state, IScrollableCursor cursor) 
    { 
     this.state = state; 
     this.cursor = cursor; 
    } 

    public object Clone() 
    { 
     return this.DeepClone(); 
    } 


} 

public class Transition 
{ 
    protected AtomicState fromState; 
    protected Func<Char> input; 
    protected bool epsilon; 

    public AtomicState FromState 
    { 
     get 
     { 
      return fromState; 
     } 
    } 

    public Func<Char> Input 
    { 
     get 
     { 
      return input; 
     } 
    } 

    public bool Epsilon 
    { 
     get 
     { 
      return epsilon; 
     } 
    } 

    public Transition(AtomicState fromState, Func<Char> input) 
    { 
     this.fromState = fromState; 
     this.input = input; 
    } 

    public Transition(AtomicState fromState, bool epsilon) 
     : this(fromState, null) 
    { 
     this.epsilon = epsilon; 
    } 


} 

public class EpsilonTransition : Transition 
{ 
    public EpsilonTransition(AtomicState fromState) 
     : base(fromState, true) 
    { 
    } 
} 



public eResult Execute() 
{ 
    var startState = States.FirstOrDefault(s => s.Initial); 
    if (startState == null) return eResult.RejectedNoInitialState; 

    tasks.Clear(); 

    CancellationTokenSource cts = new CancellationTokenSource(); 

    ExecutionDataflowBlockOptions options = new ExecutionDataflowBlockOptions(); 
    options.MaxDegreeOfParallelism = 4; 
    options.CancellationToken = cts.Token; 

    // transitions an actor onto it's next state 
    TransformBlock<Tuple<Transition, Actor>, Actor> actorTransitioner = new TransformBlock<Tuple<Transition, Actor>, Actor>(tr => 
     { 
      return doTransitionFunction(tr.Item1, cts).Invoke(tr.Item2); 

     }, options); 

    BroadcastBlock<Actor> actorTransitionerBroadcaster = new BroadcastBlock<Actor>(a => { return a; }); 

    ActionBlock<Actor> actorProcessor = new ActionBlock<Actor>(a => 
     { 
      foreach (Transition t in getTransitions(a.State)) 
      { 
       actorTransitioner.Post(new Tuple<Transition, Actor>(t, (Actor)a.Clone())); 
      } 
     }); 

    // link blocks 
    actorTransitioner.LinkTo(actorTransitionerBroadcaster); 
    actorTransitionerBroadcaster.LinkTo(actorProcessor); 

    actorTransitionerBroadcaster.Post(new Actor(startState, input)); 

    try 
    { 
     actorTransitioner.Completion.Wait(); 
    } 
    catch (AggregateException ex) 
    { 
     foreach (Exception ae in ex.Flatten().InnerExceptions) 
     { 
      Console.WriteLine(ae.Message); 
     } 
    } 

    eResult result = eResult.Accepted; 

    if (!results.Any()) result = eResult.RejectedNoResults; 
    else if (results.Where(r => r.State.Accepted).Count() > 1) result = eResult.RejectedAmbiguous; 

    return result; 
} 

我添加了我在尝试中使用的构造,因此更容易被复制。我需要发布整个代码(大约12个课程)。

+0

您可以添加更多关于用原始构造映射的块吗?这将非常有帮助。 – casperOne

+0

我用块映射使用的构造更新了我的答案。 – Intrepid

+0

有趣的是,我的解决方案似乎挂了一旦发现接受路线。我不太确定如何让块完成信号发送,以便将结果返回给调用者。 – Intrepid

0

我想出了一个更简单的解决方案后,设法回答我自己的问题。我推断TPL DataFlow不适用于此,因为它会创建无法确定计算是否已完成的循环数据网络。所以我决定彻底清理并回到绘图板。

我最终发现Paralell.ForEach(),做正是我想要的,利用所有的处理器内核上并行运行的每个转变:

public eResult Execute() 
{ 
    var startState = States.FirstOrDefault(s => s.Initial); 
    if (startState == null) return eResult.RejectedNoInitialState; 

    CancellationTokenSource cts = new CancellationTokenSource(); 
    CancellationToken token = cts.Token; 

    Task t = new Task(() => 
     { 
      Parallel.ForEach(getTransitions(startState), new ParallelOptions { MaxDegreeOfParallelism = 4 }, tr => 
      { 
       var a0 = new Actor(tr.FromState, (IScrollableCursor)this.input.DeepClone()); 
       var a1 = doTransitionFunction(tr, cts).Invoke(a0); 
       if (a0.State != a1.State) 
        processRecursively(a1.State, a0, cts); 

      }); 

     }, cts.Token); 

    t.RunSynchronously(); 


    eResult result = eResult.Accepted; 

    if (!results.Any()) result = eResult.RejectedNoResults; 
    else if (results.Where(r => r.State.Accepted).Count() > 1) result = eResult.RejectedAmbiguous; 

    return result; 


} 


void processRecursively(AtomicState s, Actor a0, CancellationTokenSource cts) 
{ 
    Parallel.ForEach(getTransitions(s), tr => 
     { 
      var a1 = doTransitionFunction(tr, cts).Invoke(a0); 
      if (a0.State != a1.State) 
       processRecursively(a1.State, a1, cts); 
     }); 
}