Streaming between Threads or Processes

From eqqon

Revision as of 15:46, 30 October 2007 by Henon (Talk | contribs)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to: navigation, search

Threading and Streaming

To let one thread send data to other threads without interfering with them we combined a Queue with an AutoResetEvent in our Stream class. The stream maintains it's own receiver thread which sleeps (without polling) until some token is available. Then the OnReceive event is fired which executes the processing routines without delaying the sender. This decouples the sender and the receiver's execution contexts from each other which is needed for realtime applications. For optimal efficiency the generic Queue<T> has been chosen to avoid a lot of boxing and unboxing operations.

Stream<T>

using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

namespace Threading
{
    public class Stream<T>
    {
        AutoResetEvent e;
        Queue<T> q;
        Thread receiver;
        bool abort;

        public Stream()
        {
            q = new Queue<T>();
            e = new AutoResetEvent(false);
            StartReceiving();
        }

        public AutoResetEvent WaitHandle
        {
            get { return e; }
            set { e = value; }
        }

        public void Send(T value)
        {
            q.Enqueue(value);
            e.Set();
        }

        public T Receive()
        {
            return q.Dequeue();
        }

        public void StopReceiving() { abort = true; e.Set(); }

        public void StartReceiving()
        {
            abort = false;
            receiver = new Thread(delegate()
            {
                //Console.WriteLine("Starting Receiver");
                while (true)
                {
                    //Console.Write(".");
                    e.WaitOne();
                    if (abort) Thread.CurrentThread.Abort();
                    if (OnReceive == null) continue;
                    while (q.Count > 0)
                        OnReceive(q.Dequeue());
                }
            });
            receiver.Start();
        }

        public event Action<T> OnReceive;
    }
}