using System;
using System.Net;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Documents;
using System.Windows.Ink;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Animation;
using System.Windows.Shapes;
using System.Collections.Generic;
using System.Reflection;
using System.Globalization;
using System.Collections.ObjectModel;
using System.Reflection.Emit;
using System.Threading;

namespace Wex.Lib.Reactive
{
    #region Internal Classes from System.Reactive

    /// <summary>
    /// An <see cref="IDisposable"/> whose <see cref="Dispose"/> runs a caller-supplied action.
    /// </summary>
    internal class AnonymousDisposable : IDisposable
    {
        // Fields

        // Swapped to null on the first Dispose call so the action runs at most once.
        private Action dispose;

        // Methods

        /// <summary>
        /// Creates a disposable that invokes <paramref name="dispose"/> when disposed.
        /// </summary>
        /// <param name="dispose">Action to run on disposal.</param>
        /// <exception cref="ArgumentNullException"><paramref name="dispose"/> is null.</exception>
        public AnonymousDisposable(Action dispose)
        {
            if (dispose == null)
            {
                throw new ArgumentNullException("dispose");
            }

            this.dispose = dispose;
        }

        /// <summary>
        /// Runs the disposal action the first time it is called; later calls do nothing.
        /// </summary>
        public void Dispose()
        {
            // Take ownership of the action and clear the field in one step, so a second
            // Dispose (for example one from the auto-detach path and one from the
            // subscriber) does not run the action again.
            Action action = Interlocked.Exchange<Action>(ref this.dispose, null);
            if (action != null)
            {
                action();
            }
        }
    }

    /// <summary>
    /// An <see cref="IObservable{T}"/> whose subscription logic is supplied as a delegate.
    /// </summary>
    /// <typeparam name="T">Type of the values pushed to observers.</typeparam>
    internal class AnonymousObservable<T> : IObservable<T>
    {
        // Fields
        private readonly Func<IObserver<T>, IDisposable> subscribe;

        // Methods

        /// <summary>
        /// Creates an observable that delegates <see cref="Subscribe"/> to <paramref name="subscribe"/>.
        /// </summary>
        /// <param name="subscribe">Function invoked for each subscription; its result is returned to the subscriber.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
        public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
        {
            if (subscribe == null)
            {
                throw new ArgumentNullException("subscribe");
            }

            this.subscribe = subscribe;
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> through a wrapper that disposes the
        /// subscription once a completion or error notification has been forwarded.
        /// </summary>
        /// <param name="observer">Observer that receives the notifications.</param>
        /// <returns>The disposable produced by the subscribe delegate.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            // The subscription handle does not exist until the delegate returns, but the
            // wrapper may need it earlier (the delegate can complete synchronously). The
            // subject is used to hand the handle over once it is known.
            // NOTE(review): AsyncSubject<T> is declared outside this file; this relies on
            // it delivering the value passed to OnNext to the wrapper's subscriptions - confirm.
            AsyncSubject<IDisposable> signal = new AsyncSubject<IDisposable>();
            AutoDetachObserver wrapper = new AutoDetachObserver(observer, signal);
            IDisposable subscription = this.subscribe(wrapper);
            signal.OnNext(subscription);
            return subscription;
        }

        // Nested Types

        /// <summary>
        /// Forwards notifications to the wrapped observer and disposes the subscription
        /// after a terminal notification.
        /// </summary>
        private class AutoDetachObserver : AbstractObserver<T>
        {
            // Fields
            private readonly IObserver<T> observer;
            private readonly IObservable<IDisposable> signal;

            // Methods

            /// <summary>
            /// Creates the wrapper.
            /// </summary>
            /// <param name="observer">Observer to forward notifications to.</param>
            /// <param name="signal">Source of the subscription handle to dispose on termination.</param>
            public AutoDetachObserver(IObserver<T> observer, IObservable<IDisposable> signal)
            {
                this.observer = observer;
                this.signal = signal;
            }

            /// <summary>Forwards completion, then detaches.</summary>
            protected override void Completed()
            {
                try
                {
                    this.observer.OnCompleted();
                }
                finally
                {
                    // Detach even when the downstream observer throws, so the
                    // subscription is not left alive.
                    this.Detach();
                }
            }

            /// <summary>Forwards the error, then detaches.</summary>
            /// <param name="exception">Error to forward.</param>
            protected override void Error(Exception exception)
            {
                try
                {
                    this.observer.OnError(exception);
                }
                finally
                {
                    // Same reasoning as in Completed.
                    this.Detach();
                }
            }

            /// <summary>Forwards a value unchanged.</summary>
            /// <param name="value">Value to forward.</param>
            protected override void Next(T value)
            {
                this.observer.OnNext(value);
            }

            // Disposes the subscription handle delivered through the signal.
            private void Detach()
            {
                this.signal.Subscribe<IDisposable>(delegate(IDisposable d)
                {
                    // The subscribe delegate may have returned null; nothing to dispose then.
                    if (d != null)
                    {
                        d.Dispose();
                    }
                });
            }
        }
    }

    /// <summary>
    /// Base observer that ignores every notification arriving after the first
    /// <see cref="OnCompleted"/> or <see cref="OnError"/>.
    /// </summary>
    /// <typeparam name="T">Type of the observed values.</typeparam>
    internal abstract class AbstractObserver<T> : IObserver<T>
    {
        // Fields

        // Set by the first terminal notification. It is a plain field with no
        // synchronization.
        private bool isStopped;

        // Methods
        public AbstractObserver()
        {
            this.isStopped = false;
        }

        /// <summary>Called once for the first completion notification received while not stopped.</summary>
        protected abstract void Completed();

        /// <summary>Called once for the first error notification received while not stopped.</summary>
        /// <param name="exception">The reported error.</param>
        protected abstract void Error(Exception exception);

        /// <summary>Called for each value received while not stopped.</summary>
        /// <param name="value">The received value.</param>
        protected abstract void Next(T value);

        /// <summary>Marks the observer stopped and calls <see cref="Completed"/>, unless already stopped.</summary>
        public void OnCompleted()
        {
            if (!this.isStopped)
            {
                this.isStopped = true;
                this.Completed();
            }
        }

        /// <summary>Marks the observer stopped and calls <see cref="Error"/>, unless already stopped.</summary>
        /// <param name="exception">The reported error.</param>
        public void OnError(Exception exception)
        {
            if (!this.isStopped)
            {
                this.isStopped = true;
                this.Error(exception);
            }
        }

        /// <summary>Calls <see cref="Next"/> with the value, unless already stopped.</summary>
        /// <param name="value">The received value.</param>
        public void OnNext(T value)
        {
            if (!this.isStopped)
            {
                this.Next(value);
            }
        }
    }

    #endregion
}
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.
This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL).