Consuming a custom stream (IEnumerable<T>)


I'm using a custom Stream implementation that exposes an IEnumerable<T> as a stream. I'm using this EnumerableStream implementation to perform the conversion.

I'm using it to stream data over WCF in streaming mode. I can convert the IEnumerable to a stream without any problem. Once I'm on the client side, I can deserialize and get all the data; however, I can't find the condition that tells me when to stop looping over my stream. I'm getting:

System.Runtime.Serialization.SerializationException: End of Stream encountered before parsing was completed.

Here's a sample of what I'm trying to achieve:

    class Program
    {
        public static void Main()
        {
            var ListToSend = new List<List<string>>();
            var ListToReceive = new List<List<string>>();
            ListToSend = SimulateData().ToList();
            using (Stream stream = GetStream(ListToSend))
            {
                var formatter = new BinaryFormatter();
                while (stream.CanRead || 1 == 1 || true...) // What should I put in here to stop once I read everything???
                {
                    List<string> row = formatter.Deserialize(stream) as List<string>;
                    ListToReceive.Add(row);
                }
                Printer(ListToReceive);
                Console.WriteLine("Done");
            }
        }

        private static void Printer(List<List<string>> data)
        {
            Console.WriteLine("Printing");
            foreach (var row in data)
            {
                foreach (var cell in row)
                {
                    Console.Write(cell + "/t");
                }
                Console.WriteLine("-------------------------------------------------------------------------------");
            }
        }

        private static Stream GetStream(IEnumerable<List<string>> data)
        {
            return EnumerableStream.Create(data, DeserializerCallback);
        }

        private static List<byte> DeserializerCallback(object obj)
        {
            var binFormatter = new BinaryFormatter();
            var mStream = new MemoryStream();
            binFormatter.Serialize(mStream, obj);
            return mStream.ToArray().ToList();
        }

        private static IEnumerable<List<string>> SimulateData()
        {
            Random randomizer = new Random();
            for (var i = 0; i < 10; i++)
            {
                var row = new List<string>();
                for (var j = 0; j < 1000; j++)
                {
                    row.Add((randomizer.Next(100)).ToString());
                }
                yield return row;
            }
        }
    }

I did not include the custom stream. I created a fiddle for those that want to see the entire code.

  • Do I need to add something in the custom stream itself to notify that all the data have been read?
  • Is it because the formats of the deserializer and the serializer are not the same? (I don't think so.)
  • I also want to know why, when I put a breakpoint in the read function, the buffer size is changing randomly.
  • PLEASE, stop answering the question by wrapping the code with a try and catch, it's NOT the answer that I want. I want a clean solution that does not crash. Thank you.

It would be great if someone could enlighten me!

 


Do I need to add something in the custom stream itself to notify that all the data have been read?

You can, but that wouldn't help in the WCF scenario where the received Stream is a different class.

There are two standard (official, by design) ways of determining the end of the Stream data:

(1) ReadByte returning -1

Returns

The unsigned byte cast to an Int32, or -1 if at the end of the stream.

(2) Read returning 0 when called with count > 0

Returns

The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.

Unfortunately, both of them consume the current byte (advance to the next one), which breaks the deserializer.
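To make the two signals and the side effect concrete, here is a minimal sketch using a plain MemoryStream. The class EndOfStreamDemo and its method names are made up for this illustration; only Stream.Read, Stream.ReadByte and MemoryStream.Position are real APIs.

```csharp
using System;
using System.IO;

public static class EndOfStreamDemo
{
    // Drains the stream, then reports what ReadByte and Read return at the end.
    public static (int readByteResult, int readResult) Probe(byte[] data)
    {
        using (var stream = new MemoryStream(data))
        {
            var buffer = new byte[16];

            // Read returns 0 only once the end of the stream has been reached.
            while (stream.Read(buffer, 0, buffer.Length) > 0) { }

            int readByteResult = stream.ReadByte();                  // -1 at the end
            int readResult = stream.Read(buffer, 0, buffer.Length);  // 0 at the end
            return (readByteResult, readResult);
        }
    }

    // Shows the side effect: probing with ReadByte on a non-empty stream
    // moves the position forward, so the probed byte is lost to the next reader.
    public static long PositionAfterProbe(byte[] data)
    {
        using (var stream = new MemoryStream(data))
        {
            stream.ReadByte();
            return stream.Position;
        }
    }
}
```

Calling EndOfStreamDemo.Probe on any byte array yields -1 and 0, while PositionAfterProbe on a non-empty array yields 1: the probe itself has eaten the first byte that a deserializer would have needed.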

What are the possible solutions?

First, you can implement a serialization/deserialization format (protocol) that lets the reader know whether there are more elements to deserialize. For instance, List<T> stores Count before its elements, T[] stores Length before its elements, and so on. Since EnumerableStream<T> does not know the count in advance, one simple solution is to emit a single fake byte before each element:

    private bool SerializeNext()
    {
        if (!_source.MoveNext())
            return false;

        _buf.Enqueue(1); // <-- marker byte: another element follows
        foreach (var b in _serializer(_source.Current))
            _buf.Enqueue(b);

        return true;
    }

This would allow you to use

    while (stream.ReadByte() != -1)
    {
        // ...
    }
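As a self-contained sketch of this marker-byte idea, the example below frames string records with a leading byte and reads them back until ReadByte returns -1. It deliberately swaps BinaryFormatter for BinaryWriter/BinaryReader so that it runs on current .NET versions; MarkerProtocolDemo and its methods are hypothetical names, not part of EnumerableStream.

```csharp
using System.Collections.Generic;
using System.IO;

public static class MarkerProtocolDemo
{
    // Writer side: one marker byte (value 1) precedes every record.
    public static byte[] Write(IEnumerable<string> records)
    {
        using (var ms = new MemoryStream())
        using (var writer = new BinaryWriter(ms))
        {
            foreach (var record in records)
            {
                writer.Write((byte)1);  // "another record follows"
                writer.Write(record);   // length-prefixed string
            }
            writer.Flush();
            return ms.ToArray();
        }
    }

    // Reader side: ReadByte consumes only the marker, so the record
    // payload that follows it is still intact for the reader.
    public static List<string> Read(Stream stream)
    {
        var result = new List<string>();
        var reader = new BinaryReader(stream);
        while (stream.ReadByte() != -1)
        {
            result.Add(reader.ReadString());
        }
        return result;
    }
}
```

The loop ends cleanly because the only byte ever consumed by the end-of-data check is a marker that carries no payload. The cost is one extra byte per element and a format that both sides must agree on.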

Second, if you want to keep the current format, a more general solution is to implement a custom stream that wraps another stream and adds a PeekByte method with the same semantics as the standard ReadByte, but without consuming the current byte:

    public class SequentialStream : Stream
    {
        private Stream source;
        private bool leaveOpen;
        private int? nextByte;

        public SequentialStream(Stream source, bool leaveOpen = false)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (!source.CanRead) throw new ArgumentException("Non readable source.", nameof(source));
            this.source = source;
            this.leaveOpen = leaveOpen;
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing && !leaveOpen)
                source.Dispose();
            base.Dispose(disposing);
        }

        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => false;
        public override long Length => throw new NotSupportedException();
        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
        public override void Flush() { }
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();
        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

        public int PeekByte()
        {
            if (nextByte == null)
                nextByte = source.ReadByte();
            return nextByte.Value;
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (count <= 0) return 0;
            if (nextByte != null)
            {
                if (nextByte.Value < 0) return 0;
                buffer[offset] = (byte)nextByte.Value;
                if (count > 1)
                {
                    int read = source.Read(buffer, offset + 1, count - 1);
                    if (read == 0)
                        nextByte = -1;
                    else
                        nextByte = null;
                    return read + 1;
                }
                else
                {
                    nextByte = null;
                    return 1;
                }
            }
            else
            {
                int read = source.Read(buffer, offset, count);
                if (read == 0)
                    nextByte = -1;
                return read;
            }
        }
    }

This basically implements a read-only, forward-only stream with a read-ahead of 0 or 1 bytes.

The usage will be like this:

    using (var stream = new SequentialStream(GetStream(ListToSend)))
    {
        // ...
        while (stream.PeekByte() != -1)
        {
            // ...
        }
        // ...
    }

P.S. What about

I also want to know why when I put a break point in the read function, the buffer size is changing randomly.

It's not random. BinaryFormatter internally uses a BinaryReader to read typed values such as Int32, Byte and String, and passes the size it needs as count: 4 for an Int32, 1 for a Byte, and for a String the number of encoded bytes (which it knows because that length is stored in the stream ahead of the string data and is read first).
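One way to observe this without a debugger is to wrap the source in a stream that records the count argument of every Read call. The sketch below uses BinaryReader directly rather than BinaryFormatter; CountLoggingStream and ReadSizeDemo are hypothetical helpers written for this illustration, and the exact sequence of sizes depends on the runtime's BinaryReader implementation.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical helper: remembers the 'count' passed to every Read call.
public sealed class CountLoggingStream : Stream
{
    private readonly Stream _inner;
    public List<int> RequestedCounts { get; } = new List<int>();

    public CountLoggingStream(Stream inner) { _inner = inner; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        RequestedCounts.Add(count);
        return _inner.Read(buffer, offset, count);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}

public static class ReadSizeDemo
{
    // Writes an Int32, a Byte and a String, reads them back through the
    // logging stream, and returns the sizes the reader asked for.
    public static List<int> Run()
    {
        var ms = new MemoryStream();
        using (var writer = new BinaryWriter(ms, Encoding.UTF8, leaveOpen: true))
        {
            writer.Write(42);        // Int32, 4 bytes
            writer.Write((byte)7);   // single byte
            writer.Write("hello");   // length prefix, then 5 UTF-8 bytes
        }
        ms.Position = 0;

        var logging = new CountLoggingStream(ms);
        var reader = new BinaryReader(logging);
        reader.ReadInt32();
        reader.ReadByte();
        reader.ReadString();
        return logging.RequestedCounts;
    }
}
```

Printing the list returned by ReadSizeDemo.Run() should show small, type-dependent sizes (for example a request of 4 for the Int32) rather than one fixed buffer size, which is the pattern a breakpoint in the custom stream's Read method reveals.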
