2017-08-02 54 views
4

在 Reactive Extensions 中按预定义的顺序对可观察序列排序:假设我有一个类型 T:

/// <summary>
/// Element type used by the example: an identifier paired with a character payload.
/// </summary>
class T
{
    // Arbitrary but unique for each character (Guids in real-life)
    public int identifier;

    // In real life not a char; char was chosen here for easy demo purposes
    public char character;
}

并且我有一个预定义的、有序的标识符序列:

// Predefined order in which items must be emitted, expressed as item identifiers.
int[] identifierSequence = { 9, 3, 4, 4, 7 };

现在我需要对一个 IObservable<T> 进行排序,该可观察序列产生以下对象序列:

{identifier: 3, character 'e'}, 
{identifier: 9, character 'h'}, 
{identifier: 4, character 'l'}, 
{identifier: 4, character 'l'}, 
{identifier: 7, character 'o'} 

这样,排序后得到的 IObservable 将依次产生 hello。我不想使用 ToArray,因为我希望对象一到达(并且轮到它时)就能收到,而不是等到所有对象都被观察到之后。更具体地说,我希望按如下方式收到它们:

Input: e h l l o 
Output: he l l o 

用响应式(Rx)的方式实现这一点的正确做法是什么?我能想到的最好办法是这样的:

// Items that arrived before their turn, queued per identifier so that several
// items sharing an identifier (e.g. the two 4s) do not overwrite each other.
Dictionary<int, Queue<T>> buffer = new Dictionary<int, Queue<T>>();
// Position in identifierSequence of the identifier we are currently waiting for.
int curIndex = 0;

inputObserable.SelectMany(item =>
{
    // Park the incoming item under its identifier.
    if (!buffer.TryGetValue(item.identifier, out Queue<T> parked))
    {
        parked = new Queue<T>();
        buffer[item.identifier] = parked;
    }
    parked.Enqueue(item);

    // Yields every buffered item whose turn has come, stopping at the first gap.
    IEnumerable<T> GetReadyElements()
    {
        // The bound check prevents indexing past the end once the whole sequence has been emitted.
        while (curIndex < identifierSequence.Length)
        {
            int nextItemIdentifier = identifierSequence[curIndex];
            if (!buffer.TryGetValue(nextItemIdentifier, out Queue<T> ready) || ready.Count == 0)
            {
                break;
            }

            T nextItem = ready.Dequeue();
            curIndex++;
            yield return nextItem;
        }
    }

    return GetReadyElements();
});

编辑:

Schlomo 指出了我的代码中一些确实存在的问题,因此我把他的答案标记为正确答案。为了让他的代码更实用,我对其做了一些修改:

  • 标识符类型和对象类型泛型化
  • 用迭代代替递归,以防止在非常大的可观察序列上可能出现堆栈溢出
  • 将匿名类型转换为实际的类,以提高可读性
  • 尽可能只在字典中查找一次值并将其存入变量,而不是多次查找
  • 修正了代码格式

这给了我下面的代码:

/// <summary>
    /// Re-emits the items of <paramref name="source"/> in the order given by <paramref name="identifierSequence"/>.
    /// Items that arrive before their turn are buffered; items sharing an identifier are emitted in arrival order.
    /// The result completes as soon as every position of the sequence has been emitted.
    /// </summary>
    /// <typeparam name="T">The type produced by the source observable</typeparam>
    /// <typeparam name="TId">The type of the identifiers</typeparam>
    /// <param name="source">The source observable</param>
    /// <param name="identifierSequence">Identifiers in the order in which the matching items must be emitted</param>
    /// <param name="identifierFunc">Extracts the identifier of an item</param>
    /// <returns>An observable emitting the source items ordered by <paramref name="identifierSequence"/></returns>
    /// <exception cref="ArgumentNullException">Any argument is null</exception>
    public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }
        if (identifierFunc == null)
        {
            throw new ArgumentNullException(nameof(identifierFunc));
        }

        var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());
        return source.Scan(initialState, (oldState, item) =>
            {
                // Add the new item to the buffer, behind any earlier items with the same identifier.
                TId itemIdentifier = identifierFunc(item);
                ImmutableList<T> valuesList;
                if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList))
                {
                    valuesList = ImmutableList<T>.Empty;
                }
                ImmutableDictionary<TId, ImmutableList<T>> buffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item));

                // Move every item whose turn has come from the buffer into the output,
                // stopping at the first identifier for which nothing has arrived yet.
                int index = oldState.Index;
                List<T> output = new List<T>();
                while (index < identifierSequence.Count)
                {
                    TId key = identifierSequence[index];
                    ImmutableList<T> nextValues;
                    if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty)
                    {
                        // No values available yet
                        break;
                    }

                    // Oldest first, so items with equal identifiers keep their arrival order.
                    output.Add(nextValues[0]);

                    // Drop the entry entirely once it is drained so empty lists do not pile up.
                    buffer = nextValues.Count == 1
                        ? buffer.Remove(key)
                        : buffer.SetItem(key, nextValues.RemoveAt(0));
                    index++;
                }

                return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output);
            })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();

                // Every position has been emitted: append the completion signal.
                if (output.Index == identifierSequence.Count)
                {
                    notifications.Add(Notification.CreateOnCompleted<T>());
                }

                return notifications;
            })
            .Dematerialize();
    }

    /// <summary>
    /// Read-only holder for the three values tracked while ordering: the position being
    /// waited on, the buffered items, and the items ready to be emitted.
    /// </summary>
    class OrderByIdentifierSequenceState<T, TId>
    {
        /// <summary>Stores the supplied values unchanged in the corresponding properties.</summary>
        public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
        {
            Index = index;
            Buffer = buffer;
            Output = output;
        }

        /// <summary>Index of the T we are currently waiting on.</summary>
        public int Index { get; }

        /// <summary>Items that have arrived but that we are not ready for yet.</summary>
        public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }

        /// <summary>Items that can be safely emitted.</summary>
        public IEnumerable<T> Output { get; }
    }

然而,该代码仍然有几个问题:

  • 状态(主要是 ImmutableDictionary)被不断复制,这可能非常昂贵。可能的解决方案:为每个观察者维护一份单独的状态,而不是为每个收到的项目创建一份。
  • 当源观察中不存在identifierSequence中的一个或多个元素时,会出现问题。这目前阻止了有序的观察,它永远不会完成。可能的解决方案:超时,当源可观察性完成时抛出异常,当源可观察性完成时返回所有可用项,...
  • 当源可观察序列包含的元素多于 identifierSequence 时,会发生内存泄漏。存在于源可观察序列中但不在 identifierSequence 中的项目目前会被添加到字典中,并且在源可观察序列完成之前不会被移除。这是潜在的内存泄漏。可能的解决方案:在把项目添加到字典之前先检查它是否在 identifierSequence 中;或者绕过排序代码,立即输出该项目;……

我的解决方案:

/// <summary>
    /// Takes the items from the source observable, and returns them in the order specified in identifierSequence.
    /// Items that share an identifier are returned in the order in which they arrived.
    /// If an item is missing from the source observable, the returned observable returns items up until the missing item and then blocks until the source observable is completed.
    /// All available items are then returned in order. Note that this means that while a correct order is guaranteed, there might be missing items in the result observable.
    /// If there are items in the source observable that are not in identifierSequence, these items will be ignored.
    /// </summary>
    /// <typeparam name="T">The type that is produced by the source observable</typeparam>
    /// <typeparam name="TId">The type of the identifiers used to uniquely identify a T</typeparam>
    /// <param name="source">The source observable</param>
    /// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered</param>
    /// <param name="identifierFunc">A function that takes a T and outputs its unique identifier</param>
    /// <returns>An observable with the same elements as the source, but ordered by the sequence of items in identifierSequence</returns>
    /// <exception cref="ArgumentNullException">Any argument is null</exception>
    public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }
        if (identifierFunc == null)
        {
            throw new ArgumentNullException(nameof(identifierFunc));
        }

        if (identifierSequence.Count == 0)
        {
            return Observable.Empty<T>();
        }

        HashSet<TId> identifiersInSequence = new HashSet<TId>(identifierSequence);

        return Observable.Create<T>(observer =>
        {
            // Current index of the pending item in identifierSequence (state is per subscription).
            int index = 0;
            // Items we have received but are not ready for yet, first-in-first-out per identifier.
            Dictionary<TId, Queue<T>> buffer = new Dictionary<TId, Queue<T>>();

            // Emits buffered items for consecutive pending positions.
            // With skipMissing == false it stops at the first position that has no buffered item;
            // with skipMissing == true such positions are skipped (used for the final flush).
            void EmitAvailable(bool skipMissing)
            {
                while (index < identifierSequence.Count)
                {
                    TId key = identifierSequence[index];
                    Queue<T> pending;
                    if (buffer.TryGetValue(key, out pending) && pending.Count > 0)
                    {
                        // Update the state before notifying, so it is consistent while the observer runs.
                        T next = pending.Dequeue();
                        index++;
                        observer.OnNext(next);
                    }
                    else if (skipMissing)
                    {
                        index++;
                    }
                    else
                    {
                        break;
                    }
                }
            }

            return source
                // identifierFunc is user code: running it inside Select routes its exceptions to OnError.
                .Select(item => new KeyValuePair<TId, T>(identifierFunc(item), item))
                // Items whose identifier is not part of the sequence are ignored.
                .Where(entry => identifiersInSequence.Contains(entry.Key))
                .Subscribe(
                    entry =>
                    {
                        // Everything has been emitted already; do not buffer late items.
                        if (index >= identifierSequence.Count)
                        {
                            return;
                        }

                        Queue<T> queue;
                        if (!buffer.TryGetValue(entry.Key, out queue))
                        {
                            queue = new Queue<T>();
                            buffer[entry.Key] = queue;
                        }
                        queue.Enqueue(entry.Value);

                        EmitAvailable(skipMissing: false);

                        if (index == identifierSequence.Count)
                        {
                            observer.OnCompleted();
                        }
                    },
                    observer.OnError,
                    () =>
                    {
                        // When the source observable is completed, return the remaining available items in order.
                        EmitAvailable(skipMissing: true);

                        // Mark observable as completed
                        observer.OnCompleted();
                    });
        });
    }

回答

4

请注意,您的实现有几个问题:

  1. 如果两个 'l' 都在轮到它们之前到达,其中一个会被吞掉,进而卡住整个序列。你的字典应该映射到一个集合,而不是单个项目。
  2. 没有 OnCompleted 消息。
  3. 多个订阅者可能会把状态搞乱。试试这个(其中 GetPatternMatchOriginal 是你的代码):

-

// Build the ordered observable once from the original implementation.
var stateMachine = src.GetPatternMatchOriginal(new int[] { 9, 3, 4, 4, 7 }); 

// Subscribe to the same observable twice, taking three items each time.
stateMachine.Take(3).Dump(); //Linqpad 
stateMachine.Take(3).Dump(); //Linqpad 

第一个输出是 h e l,第二个输出是 l o。它们都应该输出 h e l。

下面这个实现解决了这些问题,并且借助不可变数据结构做到了没有副作用:

public static class X
{
    /// <summary>
    /// Re-emits the items of <paramref name="source"/> in the order given by <paramref name="identifierSequence"/>.
    /// Items that arrive early are buffered; items sharing an identifier are emitted in arrival order.
    /// The result completes once every position of the sequence has been emitted.
    /// </summary>
    /// <param name="source">The source observable</param>
    /// <param name="identifierSequence">Identifiers in the order in which the matching items must be emitted</param>
    /// <returns>An observable emitting the source items ordered by <paramref name="identifierSequence"/></returns>
    public static IObservable<T> GetStateMachine(this IObservable<T> source, int[] identifierSequence)
    {
        //State is held in an anonymous type:
        // Index shows what character we're waiting on,
        // Buffer holds characters that have arrived that we aren't ready yet for
        // Output holds characters that can be safely emitted.
        return source
            .Scan(
                new { Index = 0, Buffer = ImmutableDictionary<int, ImmutableList<T>>.Empty, Output = Enumerable.Empty<T>() },
                (state, item) =>
                {
                    // Add the new item to the buffer (single lookup instead of ContainsKey + indexer).
                    ImmutableList<T> arrived;
                    if (!state.Buffer.TryGetValue(item.Identifier, out arrived))
                    {
                        arrived = ImmutableList<T>.Empty;
                    }
                    var buffer = state.Buffer.SetItem(item.Identifier, arrived.Add(item));

                    // Iteratively move every item whose turn has come into the output.
                    // A loop is used instead of recursion so long runs cannot overflow the stack.
                    var index = state.Index;
                    var emitted = new List<T>();
                    while (index < identifierSequence.Length)
                    {
                        var key = identifierSequence[index];
                        ImmutableList<T> waiting;
                        if (!buffer.TryGetValue(key, out waiting) || waiting.IsEmpty)
                        {
                            break;
                        }

                        // Oldest first, so items with equal identifiers keep their arrival order.
                        emitted.Add(waiting[0]);
                        buffer = waiting.Count == 1
                            ? buffer.Remove(key)
                            : buffer.SetItem(key, waiting.RemoveAt(0));
                        index++;
                    }

                    // The cast keeps the anonymous type identical to the seed's.
                    return new { Index = index, Buffer = buffer, Output = (IEnumerable<T>)emitted };
                })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();
                if (output.Index == identifierSequence.Length)
                {
                    notifications.Add(Notification.CreateOnCompleted<T>());
                }
                return notifications;
            })
            .Dematerialize();
    }
}

然后你可以像这样调用它:

// Build the ordered observable and dump whatever it emits.
var stateMachine = src.GetStateMachine(new int[] { 9, 3, 4, 4, 7 }); 
stateMachine.Dump(); //LinqPad 

// Push the items in an order that differs from the identifier sequence above.
src.OnNext(new T { Identifier = 4, Character = 'l' }); 
src.OnNext(new T { Identifier = 4, Character = 'l' }); 
src.OnNext(new T { Identifier = 7, Character = 'o' }); 
src.OnNext(new T { Identifier = 3, Character = 'e' }); 
src.OnNext(new T { Identifier = 9, Character = 'h' }); 
+0

感谢您的回答,尤其是您使用我的代码发现的问题。我喜欢你解决这些问题的方式,但是我无法承受不断复制ImmutableDictionary的性能开销。我已经更新了我的初始文章,其中包含一个修改后的代码版本,用于维护每个观察者的状态,而不是每个收到的项目。 – Wouter

1

好问题 :-)

对于多个相同的键,在我看来模式匹配可以按任意顺序进行。这是我想出来的:

编辑:修改为在字典中查找预期项目。

public static class MyExtensions
{
    /// <summary>
    /// Emits values from <paramref name="source"/> in the order of <paramref name="keys"/>.
    /// One slot is created per entry of <paramref name="keys"/>; each incoming value fills the earliest
    /// unfilled slot for its key, and filled slots are emitted as soon as all slots before them have been emitted.
    /// Values whose key has no unfilled slot are ignored. The result completes when every slot has been
    /// emitted or when the source completes, whichever happens first.
    /// </summary>
    /// <typeparam name="TSource">The type of the source values</typeparam>
    /// <typeparam name="TKey">The type of the keys</typeparam>
    /// <param name="source">The source observable</param>
    /// <param name="keys">The keys in the order in which the matching values must be emitted</param>
    /// <param name="keySelector">Extracts the key of a value</param>
    /// <param name="keyComparer">Comparer for keys; the default equality comparer is used when null</param>
    /// <returns>An observable emitting the matched values in the order of <paramref name="keys"/></returns>
    /// <exception cref="ArgumentNullException">source, keys or keySelector is null</exception>
    public static IObservable<TSource> MatchByKeys<TSource, TKey>(this IObservable<TSource> source, IEnumerable<TKey> keys, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> keyComparer = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (keys == null) throw new ArgumentNullException(nameof(keys));
        if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
        if (keyComparer == null) keyComparer = EqualityComparer<TKey>.Default;

        return Observable.Create<TSource>(observer =>
        {
            // All slots in output order; each slot is also reachable through its key below.
            var pattern = new LinkedList<SingleAssignment<TSource>>();
            // Unfilled slots per key, in output order.
            var matchesByKey = new Dictionary<TKey, LinkedList<SingleAssignment<TSource>>>(keyComparer);
            foreach (var key in keys)
            {
                var match = new SingleAssignment<TSource>();
                pattern.AddLast(match);
                LinkedList<SingleAssignment<TSource>> matches;
                if (!matchesByKey.TryGetValue(key, out matches))
                {
                    matches = new LinkedList<SingleAssignment<TSource>>();
                    matchesByKey.Add(key, matches);
                }
                matches.AddLast(match);
            }

            // Nothing to match: complete immediately without subscribing to the source.
            if (pattern.First == null)
            {
                observer.OnCompleted();
                return Disposable.Empty;
            }

            var sourceSubscription = new SingleAssignmentDisposable();
            Action dispose = () =>
            {
                sourceSubscription.Dispose();
                pattern.Clear();
                matchesByKey.Clear();
            };

            sourceSubscription.Disposable = source.Subscribe(
                value =>
                {
                    // Only user-supplied code (keySelector, keyComparer) is protected here.
                    // Calls on the downstream observer are deliberately left outside the try block,
                    // so an exception thrown by the observer is not fed back into its own OnError.
                    try
                    {
                        var key = keySelector(value);
                        LinkedList<SingleAssignment<TSource>> matches;
                        if (!matchesByKey.TryGetValue(key, out matches)) return;
                        matches.First.Value.Value = value;
                        matches.RemoveFirst();
                        if (matches.First == null) matchesByKey.Remove(key);
                    }
                    catch (Exception ex)
                    {
                        dispose();
                        observer.OnError(ex);
                        return;
                    }

                    // Emit the leading run of filled slots.
                    while (pattern.First != null && pattern.First.Value.HasValue)
                    {
                        var match = pattern.First.Value;
                        pattern.RemoveFirst();
                        observer.OnNext(match.Value);
                    }

                    // Slots remain: keep waiting. Otherwise every slot was emitted and we are done.
                    if (pattern.First != null) return;
                    dispose();
                    observer.OnCompleted();
                },
                error =>
                {
                    dispose();
                    observer.OnError(error);
                },
                () =>
                {
                    dispose();
                    observer.OnCompleted();
                });
            return Disposable.Create(dispose);
        });
    }

    /// <summary>
    /// Holder for a value that can be assigned exactly once.
    /// </summary>
    private sealed class SingleAssignment<T>
    {
        /// <summary>True once <see cref="Value"/> has been assigned.</summary>
        public bool HasValue { get; private set; }

        private T _value;

        /// <summary>
        /// The stored value. Reading before assignment or assigning twice throws
        /// <see cref="InvalidOperationException"/>.
        /// </summary>
        public T Value
        {
            get
            {
                if (!HasValue) throw new InvalidOperationException("No value has been set.");
                return _value;
            }
            set
            {
                if (HasValue) throw new InvalidOperationException("Value has already been set.");
                HasValue = true;
                _value = value;
            }
        }
    }
}

测试代码:

// Source subject and the ordered view over it.
var src = new Subject<T>(); 
var ordered = src.MatchByKeys(new[] { 9, 3, 4, 4, 7 }, t => t.Identifier); 
// Collects whatever the ordered observable emits.
var result = new List<T>(); 
using (ordered.Subscribe(result.Add)) 
{ 
    // Push the items in an order that differs from the key sequence above, then complete.
    src.OnNext(new T { Identifier = 3, Character = 'e' }); 
    src.OnNext(new T { Identifier = 9, Character = 'h' }); 
    src.OnNext(new T { Identifier = 4, Character = 'l' }); 
    src.OnNext(new T { Identifier = 4, Character = 'l' }); 
    src.OnNext(new T { Identifier = 7, Character = 'o' }); 
    src.OnCompleted(); 
} 
// Print the collected characters as a single string.
Console.WriteLine(new string(result.Select(t => t.Character).ToArray())); 
+0

感谢您的回答!我喜欢你的代码如此整洁、写得这么好 :) 不过,虽然我认为你的答案可能消耗更少的内存,但我认为它在最坏情况下的性能是 O(N^2),因为在链表中查找是 O(N),而我的答案是 O(N)(在哈希表中查找是 O(1))。 – Wouter

+0

完全同意,我也喜欢哈希。完美的解决方案还应当能够跟踪具有相同键/标识符的多个项目。明天再说吧,反正只是为了好玩。 – tinudu

+0

已修改,使其成为 O(N)。我现在满意了。你呢? – tinudu

0

假设你有这样的代码:

// Input items, in the order in which the source produces them.
IObservable<T> source = new[]
{
    new T { identifier = 3, character = 'e' },
    new T { identifier = 9, character = 'h' },
    new T { identifier = 4, character = 'l' },
    new T { identifier = 4, character = 'l' },
    new T { identifier = 7, character = 'o' }
}.ToObservable();

// Required output order, expressed as item identifiers.
int[] identifierSequence = { 9, 3, 4, 4, 7 };

……那么下面的代码是可行的:

// Re-emits the items of source in the order given by identifierSequence.
// Defer builds the Scan pipeline (and therefore its mutable seed lists) anew for every
// subscriber, so concurrent or repeated subscriptions do not share buffered items.
IObservable<T> query =
    Observable.Defer(() =>
        source
            .Scan(new { index = 0, pendings = new List<T>(), outputs = new List<T>() }, (a, t) =>
            {
                var i = a.index;
                var o = new List<T>();
                a.pendings.Add(t);

                // Release pending items while the next expected identifier is available.
                // The bound check stops the search once the whole sequence has been emitted,
                // instead of indexing past the end when surplus items are still pending.
                while (i < identifierSequence.Length)
                {
                    var expected = identifierSequence[i];
                    var r = a.pendings.FirstOrDefault(x => x.identifier == expected);
                    if (r == null)
                    {
                        break;
                    }

                    o.Add(r);
                    a.pendings.Remove(r);
                    i++;
                }

                return new { index = i, a.pendings, outputs = o };
            })
            .SelectMany(x => x.outputs));