<?xml version="1.0"?>
<?xml-stylesheet type="text/css" href="http://www.eqqon.com/skins/common/feed.css?270"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="en">
		<id>http://www.eqqon.com/index.php?action=history&amp;feed=atom&amp;title=Streaming_between_Threads_or_Processes</id>
		<title>Streaming between Threads or Processes - Revision history</title>
		<link rel="self" type="application/atom+xml" href="http://www.eqqon.com/index.php?action=history&amp;feed=atom&amp;title=Streaming_between_Threads_or_Processes"/>
		<link rel="alternate" type="text/html" href="http://www.eqqon.com/index.php?title=Streaming_between_Threads_or_Processes&amp;action=history"/>
		<updated>2026-04-26T21:10:26Z</updated>
		<subtitle>Revision history for this page on the wiki</subtitle>
		<generator>MediaWiki 1.16.0</generator>

	<entry>
		<id>http://www.eqqon.com/index.php?title=Streaming_between_Threads_or_Processes&amp;diff=297&amp;oldid=prev</id>
		<title>Henon: New page: == Threading and Streaming == To let one thread send data to other threads without interfering with them we combined a '''Queue''' with an '''AutoResetEvent''' in our '''Stream''' class. T...</title>
		<link rel="alternate" type="text/html" href="http://www.eqqon.com/index.php?title=Streaming_between_Threads_or_Processes&amp;diff=297&amp;oldid=prev"/>
				<updated>2007-10-30T15:46:01Z</updated>
		
		<summary type="html">&lt;p&gt;New page: == Threading and Streaming == To let one thread send data to other threads without interfering with them we combined a &amp;#39;&amp;#39;&amp;#39;Queue&amp;#39;&amp;#39;&amp;#39; with an &amp;#39;&amp;#39;&amp;#39;AutoResetEvent&amp;#39;&amp;#39;&amp;#39; in our &amp;#39;&amp;#39;&amp;#39;Stream&amp;#39;&amp;#39;&amp;#39; class. T...&lt;/p&gt;
&lt;p&gt;&lt;b&gt;New page&lt;/b&gt;&lt;/p&gt;&lt;div&gt;== Threading and Streaming ==&lt;br /&gt;
To let one thread send data to other threads without blocking them, we combined a '''Queue''' with an '''AutoResetEvent''' in our '''Stream''' class. The stream maintains its own receiver thread, which sleeps (without polling) until a token is available. The receiver thread then fires the OnReceive event, which runs the processing routines without delaying the sender. This decouples the execution contexts of the sender and the receiver, which is needed for real-time applications. The generic Queue&amp;lt;T&amp;gt; was chosen to avoid boxing and unboxing of value types.&lt;br /&gt;
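&lt;br /&gt;
The sleep-until-signalled behaviour described above can be seen in isolation in the following minimal sketch. It is not part of the Stream class: the names '''EventDemo''' and '''Run''' are invented for illustration, and the lock around the queue is an assumption of this sketch.&lt;br /&gt;
&amp;lt;pre&amp;gt;&lt;br /&gt;
using System;&lt;br /&gt;
using System.Collections.Generic;&lt;br /&gt;
using System.Threading;&lt;br /&gt;
&lt;br /&gt;
public static class EventDemo&lt;br /&gt;
{&lt;br /&gt;
    // Hypothetical helper: hands the tokens to a worker thread and returns&lt;br /&gt;
    // them in the order in which the worker dequeued them.&lt;br /&gt;
    public static List&amp;lt;int&amp;gt; Run(int[] tokens)&lt;br /&gt;
    {&lt;br /&gt;
        Queue&amp;lt;int&amp;gt; queue = new Queue&amp;lt;int&amp;gt;();&lt;br /&gt;
        AutoResetEvent signal = new AutoResetEvent(false);&lt;br /&gt;
        List&amp;lt;int&amp;gt; seen = new List&amp;lt;int&amp;gt;();&lt;br /&gt;
        bool done = false;&lt;br /&gt;
&lt;br /&gt;
        Thread worker = new Thread(delegate()&lt;br /&gt;
        {&lt;br /&gt;
            while (true)&lt;br /&gt;
            {&lt;br /&gt;
                signal.WaitOne(); // blocks without polling until Set is called&lt;br /&gt;
                lock (queue)&lt;br /&gt;
                {&lt;br /&gt;
                    while (queue.Count &amp;gt; 0)&lt;br /&gt;
                        seen.Add(queue.Dequeue());&lt;br /&gt;
                    if (done) return;&lt;br /&gt;
                }&lt;br /&gt;
            }&lt;br /&gt;
        });&lt;br /&gt;
        worker.Start();&lt;br /&gt;
&lt;br /&gt;
        foreach (int t in tokens)&lt;br /&gt;
        {&lt;br /&gt;
            lock (queue) { queue.Enqueue(t); }&lt;br /&gt;
            signal.Set(); // wake the worker; the sender does not wait for processing&lt;br /&gt;
        }&lt;br /&gt;
        lock (queue) { done = true; }&lt;br /&gt;
        signal.Set(); // final wake-up so the worker can see the done flag&lt;br /&gt;
        worker.Join();&lt;br /&gt;
        return seen;&lt;br /&gt;
    }&lt;br /&gt;
&lt;br /&gt;
    public static void Main()&lt;br /&gt;
    {&lt;br /&gt;
        foreach (int t in Run(new int[] { 1, 2, 3 }))&lt;br /&gt;
            Console.WriteLine(t);&lt;br /&gt;
    }&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/pre&amp;gt;&lt;br /&gt;
Several calls to Set may collapse into a single wake-up, which is why the worker drains the whole queue each time it wakes.&lt;br /&gt;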
&lt;br /&gt;
== Stream&amp;lt;T&amp;gt; ==&lt;br /&gt;
&amp;lt;pre&amp;gt;&lt;br /&gt;
using System;&lt;br /&gt;
using System.Threading;&lt;br /&gt;
using System.Collections;&lt;br /&gt;
using System.Collections.Generic;&lt;br /&gt;
&lt;br /&gt;
namespace Threading&lt;br /&gt;
{&lt;br /&gt;
    public class Stream&amp;lt;T&amp;gt;&lt;br /&gt;
    {&lt;br /&gt;
        AutoResetEvent e;&lt;br /&gt;
        Queue&amp;lt;T&amp;gt; q;&lt;br /&gt;
        Thread receiver;&lt;br /&gt;
        volatile bool abort; // written by StopReceiving, read by the receiver thread&lt;br /&gt;
&lt;br /&gt;
        public Stream()&lt;br /&gt;
        {&lt;br /&gt;
            q = new Queue&amp;lt;T&amp;gt;();&lt;br /&gt;
            e = new AutoResetEvent(false);&lt;br /&gt;
            StartReceiving();&lt;br /&gt;
        }&lt;br /&gt;
&lt;br /&gt;
        public AutoResetEvent WaitHandle&lt;br /&gt;
        {&lt;br /&gt;
            get { return e; }&lt;br /&gt;
            set { e = value; }&lt;br /&gt;
        }&lt;br /&gt;
&lt;br /&gt;
        public void Send(T value)&lt;br /&gt;
        {&lt;br /&gt;
            // Queue&amp;lt;T&amp;gt; is not thread-safe, so every access is guarded by a lock.&lt;br /&gt;
            lock (q) { q.Enqueue(value); }&lt;br /&gt;
            e.Set();&lt;br /&gt;
        }&lt;br /&gt;
&lt;br /&gt;
        // Throws InvalidOperationException if the queue is empty.&lt;br /&gt;
        public T Receive()&lt;br /&gt;
        {&lt;br /&gt;
            lock (q) { return q.Dequeue(); }&lt;br /&gt;
        }&lt;br /&gt;
&lt;br /&gt;
        public void StopReceiving() { abort = true; e.Set(); }&lt;br /&gt;
&lt;br /&gt;
        public void StartReceiving()&lt;br /&gt;
        {&lt;br /&gt;
            abort = false;&lt;br /&gt;
            receiver = new Thread(delegate()&lt;br /&gt;
            {&lt;br /&gt;
                while (true)&lt;br /&gt;
                {&lt;br /&gt;
                    e.WaitOne(); // sleep until Send or StopReceiving signals&lt;br /&gt;
                    if (abort) return; // leave the thread cleanly instead of aborting it&lt;br /&gt;
                    Action&amp;lt;T&amp;gt; handler = OnReceive;&lt;br /&gt;
                    if (handler == null) continue;&lt;br /&gt;
                    while (true)&lt;br /&gt;
                    {&lt;br /&gt;
                        T item;&lt;br /&gt;
                        lock (q)&lt;br /&gt;
                        {&lt;br /&gt;
                            if (q.Count == 0) break;&lt;br /&gt;
                            item = q.Dequeue();&lt;br /&gt;
                        }&lt;br /&gt;
                        handler(item); // run outside the lock so Send is never blocked by processing&lt;br /&gt;
                    }&lt;br /&gt;
                }&lt;br /&gt;
            });&lt;br /&gt;
            receiver.Start();&lt;br /&gt;
        }&lt;br /&gt;
&lt;br /&gt;
        public event Action&amp;lt;T&amp;gt; OnReceive;&lt;br /&gt;
    }&lt;br /&gt;
}&lt;br /&gt;
&lt;br /&gt;
&amp;lt;/pre&amp;gt;&lt;br /&gt;
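&lt;br /&gt;
== Usage ==&lt;br /&gt;
A minimal usage sketch, assuming the Stream&amp;lt;T&amp;gt; class above is compiled into the same project. The class name '''StreamUsage''' and the ManualResetEvent used to wait for the last token are assumptions made for this example only.&lt;br /&gt;
&amp;lt;pre&amp;gt;&lt;br /&gt;
using System;&lt;br /&gt;
using System.Collections.Generic;&lt;br /&gt;
using System.Threading;&lt;br /&gt;
using Threading; // namespace of the Stream&amp;lt;T&amp;gt; class listed above&lt;br /&gt;
&lt;br /&gt;
public static class StreamUsage&lt;br /&gt;
{&lt;br /&gt;
    public static void Main()&lt;br /&gt;
    {&lt;br /&gt;
        Stream&amp;lt;int&amp;gt; stream = new Stream&amp;lt;int&amp;gt;();&lt;br /&gt;
        List&amp;lt;int&amp;gt; received = new List&amp;lt;int&amp;gt;();&lt;br /&gt;
        ManualResetEvent allReceived = new ManualResetEvent(false);&lt;br /&gt;
&lt;br /&gt;
        // This handler runs on the receiver thread of the stream, not on the sender thread.&lt;br /&gt;
        stream.OnReceive += delegate(int value)&lt;br /&gt;
        {&lt;br /&gt;
            received.Add(value);&lt;br /&gt;
            if (received.Count == 3) allReceived.Set();&lt;br /&gt;
        };&lt;br /&gt;
&lt;br /&gt;
        stream.Send(1);&lt;br /&gt;
        stream.Send(2);&lt;br /&gt;
        stream.Send(3);&lt;br /&gt;
&lt;br /&gt;
        allReceived.WaitOne(); // wait until the receiver has handled all three tokens&lt;br /&gt;
        stream.StopReceiving(); // ends the receiver thread so the process can exit&lt;br /&gt;
        Console.WriteLine(received.Count);&lt;br /&gt;
    }&lt;br /&gt;
}&lt;br /&gt;
&amp;lt;/pre&amp;gt;&lt;br /&gt;
The handler is attached before the first Send so that no token waits in the queue for a later wake-up.&lt;br /&gt;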
&lt;br /&gt;
[[Category:CSharp]]&lt;/div&gt;</summary>
		<author><name>Henon</name></author>	</entry>

	</feed>