|
using System;
using System.Net;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Documents;
using System.Windows.Ink;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Animation;
using System.Windows.Shapes;
using System.Collections.Generic;
using System.Reflection;
using System.Globalization;
using System.Collections.ObjectModel;
using System.Reflection.Emit;
using System.Threading;
namespace Slex.Lib.Reactive
{
#region Internal Classes from System.Reactive
/// <summary>
/// An <see cref="IDisposable"/> whose disposal logic is supplied by the caller
/// as an <see cref="Action"/>.
/// </summary>
/// <remarks>
/// The action is invoked at most once: the first call to <see cref="Dispose"/>
/// takes the action and clears the field, so later (or concurrent) calls do nothing.
/// </remarks>
internal class AnonymousDisposable : IDisposable
{
    // Fields
    // Pending dispose action; swapped to null by the first Dispose call.
    private Action dispose;

    // Methods
    /// <summary>
    /// Creates a disposable that runs <paramref name="dispose"/> when disposed.
    /// </summary>
    /// <param name="dispose">
    /// Action to run on the first <see cref="Dispose"/> call. A null action makes
    /// <see cref="Dispose"/> a no-op.
    /// </param>
    public AnonymousDisposable(Action dispose)
    {
        this.dispose = dispose;
    }

    /// <summary>
    /// Runs the dispose action if it has not been run yet; otherwise does nothing.
    /// </summary>
    public void Dispose()
    {
        // Atomically take the action out of the field so that repeated or
        // racing Dispose calls cannot execute it more than once.
        Action pending = Interlocked.Exchange<Action>(ref this.dispose, null);
        if (pending != null)
        {
            pending();
        }
    }
}
/// <summary>
/// An <see cref="IObservable{T}"/> whose subscription behaviour is supplied by the
/// caller as a delegate.
/// </summary>
/// <remarks>
/// Each subscriber is wrapped in an auto-detaching observer that disposes the
/// subscription after forwarding a terminal notification (completion or error).
/// </remarks>
internal class AnonymousObservable<T> : IObservable<T>
{
    // Fields
    // Delegate that performs the actual subscription and returns its handle.
    private readonly Func<IObserver<T>, IDisposable> subscribe;

    // Methods
    /// <summary>
    /// Creates an observable backed by the given subscribe delegate.
    /// </summary>
    /// <param name="subscribe">
    /// Invoked once per <see cref="Subscribe"/> call with the wrapped observer;
    /// its return value is handed back to the subscriber as the subscription handle.
    /// </param>
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        this.subscribe = subscribe;
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> through an auto-detaching wrapper.
    /// </summary>
    /// <param name="observer">Observer that receives the forwarded notifications.</param>
    /// <returns>The handle returned by the subscribe delegate.</returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // The wrapper is created before the subscription handle exists, so the
        // handle is delivered to it afterwards through the subject.
        // NOTE(review): whether a terminal notification raised while the delegate
        // is still running later sees the handle depends on AsyncSubject semantics,
        // which are defined outside this file - confirm.
        AsyncSubject<IDisposable> signal = new AsyncSubject<IDisposable>();
        AutoDetachObserver detaching = new AutoDetachObserver(observer, signal);
        IDisposable subscription = this.subscribe(detaching);
        signal.OnNext(subscription);
        return subscription;
    }

    // Nested Types
    /// <summary>
    /// Forwards notifications to the wrapped observer and disposes the subscription
    /// handle (received via the signal) after a terminal notification.
    /// </summary>
    private class AutoDetachObserver : AbstractObserver<T>
    {
        // Fields
        private readonly IObserver<T> observer;
        // Source of the subscription handle to dispose on termination.
        private readonly IObservable<IDisposable> signal;

        // Methods
        public AutoDetachObserver(IObserver<T> observer, IObservable<IDisposable> signal)
        {
            this.observer = observer;
            this.signal = signal;
        }

        /// <summary>Forwards completion, then detaches.</summary>
        protected override void Completed()
        {
            try
            {
                this.observer.OnCompleted();
            }
            finally
            {
                // Detach even if the downstream observer throws, so the
                // subscription is not leaked.
                this.DisposeSubscription();
            }
        }

        /// <summary>Forwards the error, then detaches.</summary>
        protected override void Error(Exception exception)
        {
            try
            {
                this.observer.OnError(exception);
            }
            finally
            {
                // Detach even if the downstream observer throws, so the
                // subscription is not leaked.
                this.DisposeSubscription();
            }
        }

        /// <summary>Forwards a value to the wrapped observer.</summary>
        protected override void Next(T value)
        {
            this.observer.OnNext(value);
        }

        // Subscribes to the signal and disposes every handle it delivers.
        private void DisposeSubscription()
        {
            this.signal.Subscribe<IDisposable>(delegate(IDisposable handle)
            {
                handle.Dispose();
            });
        }
    }
}
/// <summary>
/// Base class for observers that stop forwarding notifications once a terminal
/// notification (completion or error) has been received.
/// </summary>
/// <remarks>
/// Derived classes implement <see cref="Next"/>, <see cref="Error"/> and
/// <see cref="Completed"/>; the public <c>On*</c> methods gate calls to them with
/// a stopped flag. The flag is a plain field read and written without locking.
/// </remarks>
internal abstract class AbstractObserver<T> : IObserver<T>
{
    // Set by the first OnCompleted/OnError; once set, every later
    // notification is dropped.
    private bool isStopped;

    /// <summary>Creates an observer that is not yet stopped.</summary>
    public AbstractObserver()
    {
        isStopped = false;
    }

    /// <summary>
    /// Forwards <paramref name="value"/> to <see cref="Next"/> unless the observer
    /// has already stopped.
    /// </summary>
    public void OnNext(T value)
    {
        if (isStopped)
        {
            return;
        }

        Next(value);
    }

    /// <summary>
    /// Marks the observer as stopped and forwards <paramref name="exception"/> to
    /// <see cref="Error"/>; does nothing if it has already stopped.
    /// </summary>
    public void OnError(Exception exception)
    {
        if (isStopped)
        {
            return;
        }

        isStopped = true;
        Error(exception);
    }

    /// <summary>
    /// Marks the observer as stopped and calls <see cref="Completed"/>; does
    /// nothing if it has already stopped.
    /// </summary>
    public void OnCompleted()
    {
        if (isStopped)
        {
            return;
        }

        isStopped = true;
        Completed();
    }

    /// <summary>Handles a value; only called while the observer is not stopped.</summary>
    protected abstract void Next(T value);

    /// <summary>Handles an error; called at most once, and not after completion.</summary>
    protected abstract void Error(Exception exception);

    /// <summary>Handles completion; called at most once, and not after an error.</summary>
    protected abstract void Completed();
}
#endregion
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
Architect, Developer, Speaker | Wannabe GUT inventor & Data Scientist | Microsoft MVP in C#