2016-06-07 69 views
7

我正在尝试解析一个表示消息的传入字节流:需要对该字节流进行拆分,并为每一段创建一个消息结构。(标题:Rx.Net 消息解析器)

消息始终以0x81(BOM)开始,以0x82(EOM)结尾。

start: 0x81 
header: 3 bytes 
data: arbitrary length 
stop: 0x82 

数据部分使用转义字节 0x1B(ESC)进行转义:数据部分中凡是值等于控制字节 {ESC, BOM, EOM} 之一的字节,前面都会加上一个 ESC 前缀。

标题部分未转义,并可能包含控制字节。

我想使用Rx.Net以功能反应式的方式对此进行编码,方法是使用IObservable<byte>并将其转换为IObservable<Message>

实现这一点最惯用(idiomatic)的方式是什么?

一些例子:

[81 01 02 03 82] single message 
[81 82 81 82 82] single message, header = [82 81 82] 
[81 01 02 1B 82] single message, header = [01 02 1B]. 
[81 01 02 03 1B 82 82] single message, header = [01 02 03], (unescaped) data = [82] 
[81 01 02 03 1B 1B 82 82] single message + dangling [82] which should be ignored. 
          header = [01 02 03], (unescaped) data = [1B] 

下面是一张描述上述解析过程的状态机图: enter image description here

回答

2

如果你想要的是一种“更函数式”的写法,那么下面的内容可能会有帮助;不过 @Evk 的答案同样能通过这些测试。

首先,我想提一个建议:为了减轻回答者提供可验证答案的负担,对于这类复杂问题,提问时最好附上一套供实现去通过的测试用例。

这样的事情会非常有帮助。

// Raw wire bytes fed to the parser, one byte per virtual tick starting at tick 1.
// Five back-to-back frames followed by a stray EOM that must be ignored.
byte[] wire =
{
    0x81, 0x01, 0x02, 0x03, 0x82,             // m1 (ticks 1-5):   BOM, header, EOM
    0x81, 0x82, 0x81, 0x82, 0x82,             // m2 (ticks 6-10):  header made of control bytes
    0x81, 0x01, 0x02, 0x1B, 0x82,             // m3 (ticks 11-15): header ends with the escape byte
    0x81, 0x01, 0x02, 0x03, 0x1B, 0x82, 0x82, // m4 (ticks 16-22): escaped 0x82 as data, then EOM
    0x81, 0x01, 0x02, 0x03, 0x1B, 0x1B, 0x82, // m5 (ticks 23-29): escaped 0x1B as data, then EOM
    0x82                                      // tick 30: ignored (a BOM was expected)
};

var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
    wire.Select((value, position) => ReactiveTest.OnNext<byte>(position + 1, value)).ToArray());

var observer = scheduler.CreateObserver<Message>();

// Swap in CurrentAnswer(source) here to run the same suite against the other implementation.
MyAnswer(source)
    .Subscribe(observer);

scheduler.Start();

// Each message is expected on the tick of its terminating EOM byte.
var expected = new[]
{
    ReactiveTest.OnNext(5, new Message { Header = new byte[] { 0x01, 0x02, 0x03 }, Data = new byte[0] }),
    ReactiveTest.OnNext(10, new Message { Header = new byte[] { 0x82, 0x81, 0x82 }, Data = new byte[0] }),
    ReactiveTest.OnNext(15, new Message { Header = new byte[] { 0x01, 0x02, 0x1B }, Data = new byte[0] }),
    ReactiveTest.OnNext(22, new Message { Header = new byte[] { 0x01, 0x02, 0x03 }, Data = new byte[] { 0x82 } }),
    ReactiveTest.OnNext(29, new Message { Header = new byte[] { 0x01, 0x02, 0x03 }, Data = new byte[] { 0x1B } }),
};

ReactiveAssert.AreElementsEqual(expected, observer.Messages);

我还写了一个 Message 类的版本,用来验证代码:

/// <summary>
/// A parsed protocol message: a fixed three-byte header followed by unescaped data.
/// Two messages are equal when their headers and their data have identical contents.
/// </summary>
/// <remarks>
/// The hash code is derived from the array contents so that it agrees with
/// <see cref="Equals(object)"/>. Because <see cref="Header"/> and <see cref="Data"/> are
/// mutable, do not change them while the instance is used as a key in a hashed collection.
/// </remarks>
public class Message
{
    /// <summary>Begin-of-message marker.</summary>
    public static readonly byte BOM = 0x81;

    /// <summary>End-of-message marker.</summary>
    public static readonly byte EOM = 0x82;

    /// <summary>Escape byte that prefixes control bytes inside the data section.</summary>
    public static readonly byte Control = 0x1B;

    // The header is never escaped and always has exactly this many bytes.
    private const int HeaderLength = 3;

    /// <summary>The three header bytes, copied verbatim.</summary>
    public byte[] Header { get; set; }

    /// <summary>The data bytes with escape prefixes removed.</summary>
    public byte[] Data { get; set; }

    /// <summary>
    /// Builds a message from the bytes found between (and excluding) BOM and EOM.
    /// </summary>
    /// <param name="bytes">Three header bytes followed by the still-escaped data bytes.</param>
    /// <returns>A message whose data has had its escape prefixes stripped.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="bytes"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="bytes"/> is shorter than the header.</exception>
    public static Message Create(byte[] bytes)
    {
        if (bytes == null)
        {
            throw new ArgumentNullException(nameof(bytes));
        }
        if (bytes.Length < HeaderLength)
        {
            throw new ArgumentException("bytes<3", nameof(bytes));
        }

        var header = new byte[HeaderLength];
        Array.Copy(bytes, header, HeaderLength);

        var body = new List<byte>();
        var escapeNext = false;
        for (int i = HeaderLength; i < bytes.Length; i++)
        {
            var b = bytes[i];

            if (b == Control && !escapeNext)
            {
                // An unescaped escape byte is dropped and protects the byte that follows it.
                escapeNext = true;
            }
            else
            {
                body.Add(b);
                escapeNext = false;
            }
        }

        return new Message { Header = header, Data = body.ToArray() };
    }

    /// <summary>Renders header and data as comma-separated upper-case hex values.</summary>
    public override string ToString()
    {
        return string.Format("Message(Header=[{0}], Data=[{1}])", ByteArrayString(Header), ByteArrayString(Data));
    }

    // A null array renders as an empty list so ToString never throws.
    private static string ByteArrayString(byte[] bytes)
    {
        return bytes == null
            ? string.Empty
            : string.Join(",", bytes.Select(b => b.ToString("X")));
    }

    /// <summary>Content-based equality; false for null and for objects that are not messages.</summary>
    public override bool Equals(object obj)
    {
        // "as" yields null both for a null argument and for a foreign type;
        // the typed overload rejects null, so both cases return false.
        return Equals(obj as Message);
    }

    /// <summary>Compares header and data element by element.</summary>
    protected bool Equals(Message other)
    {
        if (ReferenceEquals(other, null))
        {
            return false;
        }
        return IsSequenceEqual(Header, other.Header)
            && IsSequenceEqual(Data, other.Data);
    }

    // Null-tolerant sequence comparison: two nulls are equal, null never equals non-null.
    private static bool IsSequenceEqual<T>(IEnumerable<T> expected, IEnumerable<T> other)
    {
        if (expected == null && other == null)
        {
            return true;
        }
        if (expected == null || other == null)
        {
            return false;
        }
        return expected.SequenceEqual(other);
    }

    /// <summary>Hash derived from the array contents, consistent with <see cref="Equals(object)"/>.</summary>
    public override int GetHashCode()
    {
        unchecked
        {
            return (ContentHash(Header) * 397) ^ ContentHash(Data);
        }
    }

    // Order-sensitive hash over the bytes; arrays with equal contents hash equally.
    private static int ContentHash(byte[] bytes)
    {
        if (bytes == null)
        {
            return 0;
        }
        unchecked
        {
            int hash = 17;
            foreach (var b in bytes)
            {
                hash = (hash * 31) + b;
            }
            return hash;
        }
    }
}

现在所有的基础设施(plumbing)都已就绪,我可以专注于问题本身了。

/// <summary>
/// Splits a byte stream into messages using Rx operators plus an <see cref="Accumulator"/>
/// that tracks the position and escape state within the current frame.
/// </summary>
/// <param name="source">The raw byte stream.</param>
/// <returns>One <see cref="Message"/> per frame found in <paramref name="source"/>.</returns>
public static IObservable<Message> MyAnswer(IObservable<byte> source)
{
    // Publish shares a single subscription to the source across the repeated inner pipelines.
    return source.Publish(shared =>
    {
        // Deferred so that every (re)subscription starts with a fresh Accumulator
        // and begins consuming only once a BOM is seen.
        var annotatedBytes = Observable.Defer(() =>
            shared
                .SkipWhile(value => value != Message.BOM)
                .Scan(new Accumulator(), (state, value) => state.Accumulate(value)));

        // One frame: stop at the unescaped EOM, drop the leading BOM, collect the rest.
        var singleMessage = annotatedBytes
            .TakeWhile(state => !state.IsEndOfMessage())
            .Where(state => !state.IsBeginingOfMessage())
            .Select(state => state.Value())
            .ToArray()
            .Where(frame => frame.Any())
            .Select(frame => Message.Create(frame));

        // Resubscribe after each frame to pick up the next one.
        // NOTE(review): behavior once the source itself completes is not exercised by the tests - confirm.
        return singleMessage.Repeat();
    });
}
/// <summary>
/// Mutable cursor over the bytes of one frame. It records the most recent byte, its
/// 1-based position, and whether that byte was preceded by an unescaped escape byte.
/// </summary>
public class Accumulator
{
    // Positions below this value count as header; this assumes the first accumulated
    // byte is the BOM, so positions 2-4 hold the three header bytes.
    private const int FirstDataPosition = 5;

    private int _index;
    private byte _current;
    private bool _isCurrentEscaped;
    private bool _isNextEscaped;

    /// <summary>Advances the cursor by one byte and returns this same instance.</summary>
    public Accumulator Accumulate(byte b)
    {
        _index += 1;
        _current = b;

        // The escape flag computed for the previous byte now applies to this one.
        _isCurrentEscaped = _isNextEscaped;

        // Only an unescaped escape byte outside the header escapes its successor.
        _isNextEscaped = b == Message.Control && !_isCurrentEscaped && !IsHeader();
        return this;
    }

    /// <summary>The most recently accumulated byte.</summary>
    public byte Value() => _current;

    private bool IsHeader() => _index < FirstDataPosition;

    /// <summary>True when the current byte is the first one and it is a BOM.</summary>
    public bool IsBeginingOfMessage() => _index == 1 && _current == Message.BOM;

    /// <summary>True when the current byte is an unescaped EOM located past the header.</summary>
    public bool IsEndOfMessage()
    {
        if (IsHeader())
        {
            return false;
        }
        return _current == Message.EOM && !_isCurrentEscaped;
    }
}

为了完整起见,下面给出 @Evk 答案的核心部分,这样你就可以方便地在两种实现之间来回切换。

/// <summary>
/// Imperative parser: runs a small hand-written state machine over the byte stream and
/// emits a <see cref="Message"/> each time an unescaped EOM closes a frame.
/// </summary>
/// <param name="source">The raw byte stream.</param>
/// <returns>
/// The parsed messages. Errors and completion of <paramref name="source"/> are forwarded
/// to the subscriber.
/// </returns>
public static IObservable<Message> CurrentAnswer(IObservable<byte> source)
{
    return Observable.Create<Message>(o =>
    {
        // Parser state lives inside Create, so every subscription gets its own copy.
        bool nextIsEscaped = false;
        bool readingHeader = false;
        bool readingBody = false;
        List<byte> body = new List<byte>();
        List<byte> header = new List<byte>();

        return source.Subscribe(
            b =>
            {
                if (b == 0x81 && !nextIsEscaped && !readingHeader)
                {
                    // Start of a frame. Drop any partial frame left over from an unescaped
                    // BOM inside a body; otherwise the header count could never reach 3
                    // again and the parser would stay in header mode forever.
                    header.Clear();
                    body.Clear();
                    readingHeader = true;
                    readingBody = false;
                    nextIsEscaped = false;
                }
                else if (b == 0x82 && !nextIsEscaped && !readingHeader)
                {
                    // End of a frame; a stray EOM with nothing buffered is ignored.
                    readingHeader = false;
                    readingBody = false;
                    if (header.Count > 0 || body.Count > 0)
                    {
                        o.OnNext(new Message()
                        {
                            Header = header.ToArray(),
                            Data = body.ToArray()
                        });
                        header.Clear();
                        body.Clear();
                    }
                    nextIsEscaped = false;
                }
                else if (b == 0x1B && !nextIsEscaped && readingBody)
                {
                    // Escapes only apply inside the body. A stray ESC between frames must
                    // not cause the next BOM to be treated as escaped data.
                    nextIsEscaped = true;
                }
                else
                {
                    if (readingHeader)
                    {
                        // Header bytes are taken verbatim, control bytes included.
                        header.Add(b);
                        if (header.Count == 3)
                        {
                            readingHeader = false;
                            readingBody = true;
                        }
                    }
                    else if (readingBody)
                    {
                        body.Add(b);
                    }
                    nextIsEscaped = false;
                }
            },
            // Forward termination so subscribers observe source errors and completion.
            o.OnError,
            o.OnCompleted);
    });
}
+0

感谢您的精心解答! 您在声明这实质上是一台状态机时是正确的。我甚至在我的问题中添加了一个图表。 我理解你的解决方案,但我希望有一种更直观的方式来描述Rx中的状态机。 –

+1

看起来像 Stateless 这样的状态机库会非常适合你的需求(用它替换我的 Accumulator 类),然后你只需要在它外面包一层 Rx。我试了一下,用 7 个 Rx 操作符加一个状态机就得到了一个不错的解决方案:可测试、干净、封装良好。不知道你还想要什么。 –

+0

我只是希望能有一种函数式的方式来表达状态机。我还希望主管道可以写成 `stream.SplitOnMessageBoundaries().Select(UnescapeMessageBuffer).Select(CreateMessage);` 这样的形式。 –

2

你只需使用基本的构建模块即可:Observable.Create 和 Subscribe。首先,我们先准备一段辅助代码,用来把流读取成 byte[] 的可观察序列(关于如何做到这一点,有许多不同的示例):

static class Extensions {
    /// <summary>
    /// Exposes a stream as an observable sequence of byte chunks, reading up to
    /// <paramref name="bufferSize"/> bytes at a time. The sequence completes at end of stream.
    /// </summary>
    /// <param name="stream">The stream to read from.</param>
    /// <param name="bufferSize">Maximum number of bytes per read; must be positive.</param>
    /// <returns>An observable of freshly allocated arrays, one per successful read.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is not positive.</exception>
    public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize) {
        if (stream == null) {
            throw new ArgumentNullException(nameof(stream));
        }
        if (bufferSize <= 0) {
            throw new ArgumentOutOfRangeException(nameof(bufferSize));
        }

        // One scratch buffer is reused for every read; each chunk is copied out below.
        var buffer = new byte[bufferSize];

        // NOTE(review): FromAsyncPattern may be marked obsolete in newer System.Reactive
        // releases in favour of Task-based reads - confirm against the Rx version in use.
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);

        return Observable.While(
                () => stream.CanRead,
                Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                    .Select(readBytes => buffer.Take(readBytes).ToArray()))
            // A zero-length read signals end of stream. Without this the loop would spin
            // forever emitting empty arrays, because CanRead stays true on an exhausted stream.
            .TakeWhile(chunk => chunk.Length > 0);
    }
}

然后定义消息类:

// Plain data holder for one parsed frame; the parsing code fills both arrays.
class Message { 
    // Header bytes of the frame (the parser collects exactly three).
    public byte[] Header { get; set; } 
    // Body bytes of the frame with escape prefixes already removed by the parser.
    public byte[] Body { get; set; } 
} 

然后将它放入小控制台应用程序:

/// <summary>
/// Console demo: reads a canned frame from a memory stream, parses it with a small
/// state machine wrapped in Observable.Create, and prints each message's header and body.
/// </summary>
static void Main(string[] args) {
    // original stream; disposed when the demo exits
    using (var stream = new MemoryStream(new byte[] { 0x81, 0x01, 0x02, 0x03, 0x1B, 0x1B, 0x82, 0x82 })) {
        // your initial IObservable<byte[]>
        IObservable<byte[]> bytes = stream.AsyncRead(128); // or any other buffer size

        // your IObservable<Message>
        var observable = Observable.Create<Message>(observer => {
            // Parser state lives inside Create, so every subscription gets its own copy.
            bool nextIsEscaped = false;
            bool readingHeader = false;
            bool readingBody = false;
            List<byte> body = new List<byte>();
            List<byte> header = new List<byte>();

            return bytes.Subscribe(
                buffer => {
                    foreach (var b in buffer) {
                        if (b == 0x81 && !nextIsEscaped && !readingHeader) {
                            // Start of a frame. Drop any partial frame left over from an
                            // unescaped BOM inside a body, otherwise the header count could
                            // never reach 3 again and parsing would stall.
                            header.Clear();
                            body.Clear();
                            readingHeader = true;
                            readingBody = false;
                            nextIsEscaped = false;
                        }
                        else if (b == 0x82 && !nextIsEscaped && !readingHeader) {
                            // End of a frame; a stray EOM with nothing buffered is ignored.
                            readingHeader = false;
                            readingBody = false;
                            if (header.Count > 0 || body.Count > 0) {
                                observer.OnNext(new Message() {
                                    Header = header.ToArray(),
                                    Body = body.ToArray()
                                });
                                header.Clear();
                                body.Clear();
                            }
                            nextIsEscaped = false;
                        }
                        else if (b == 0x1B && !nextIsEscaped && readingBody) {
                            // Escapes only apply inside the body. A stray ESC between frames
                            // must not cause the next BOM to be treated as escaped data.
                            nextIsEscaped = true;
                        }
                        else {
                            if (readingHeader) {
                                // Header bytes are taken verbatim, control bytes included.
                                header.Add(b);
                                if (header.Count == 3) {
                                    readingHeader = false;
                                    readingBody = true;
                                }
                            }
                            else if (readingBody) {
                                body.Add(b);
                            }
                            nextIsEscaped = false;
                        }
                    }
                },
                // Forward termination so subscribers observe read errors and end of stream.
                observer.OnError,
                observer.OnCompleted);
        });

        // Keep the subscription alive until a key is pressed, then tear it down.
        using (observable.Subscribe(msg => {
            Console.WriteLine("Header: " + BitConverter.ToString(msg.Header));
            Console.WriteLine("Body: " + BitConverter.ToString(msg.Body));
        })) {
            Console.ReadKey();
        }
    }
}
+0

感谢您的回答! 我理解你的方法,但对我来说它显得非常过程式:它基本上是一个被 Rx 包装起来的状态机读取器。 我想知道是否还有其他更函数式的方法。 –

+0

你的问题本身确实带有状态机的成分:你需要知道当前字节是什么、它前面的字节是什么,甚至可能还要知道再前面的字节(转义字节本身是否被转义了?)。在我的测试中,此代码符合您的要求 –