using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using Pfz.Caching;
using Pfz.DynamicObjects;
using Pfz.DynamicObjects.Internal;
using Pfz.Extensions;
using Pfz.Remoting.Instructions;
using Pfz.Serialization;
using Pfz.Threading;
namespace Pfz.Remoting
{
/// <summary>
/// Class used to access objects remotely.
/// </summary>
public class RemotingClient:
RemotingCommon
{
#region Fields
// Channeller that carries the connection; assigned in Start and handed to Disposer.Dispose in Dispose.
private IChanneller _channeller;
// NOTE(review): not read or written anywhere in this file; presumably maintained by other remoting classes - confirm.
internal long _id;
// Created by both constructors and disposed in Dispose; _CollectWrappers locks its DisposeLock and calls RemoveIds on it.
internal BidirectionalDictionary _objectsUsedByTheOtherSide;
// Proxies created for remote objects and remote delegates, keyed by the id received from the other side.
internal Dictionary<long, RemotingProxy> _wrappers = new Dictionary<long, RemotingProxy>();
// Held while adding entries to _wrappers.
internal object _wrappersLock = new object();
// Signalled by _Collected and by Dispose to wake the _CollectWrappers loop; that loop disposes it when it exits.
internal AutoResetEvent _collectWrappersEvent = new AutoResetEvent(false);
// When true, _Collected does not signal _collectWrappersEvent. It is never assigned in this file.
internal bool _weCausedActualCollection;
// Guards every access to _registeredEvents made in this file.
internal readonly object _registeredEventsLock = new object();
// Event -> handler -> weak list of objects. Dispose removes each handler from each listed object.
// May be null: _Collected sets it to null when no non-empty entries remain.
internal Dictionary<EventInfo, Dictionary<Delegate, WeakList<object>>> _registeredEvents;
// Storage for the ThreadData instances used by _GetThreadData and _RunAsServer; disposed in Dispose.
private BaseDisposableThreadLocal _threadDatas = new BaseDisposableThreadLocal();
// Serializable placeholder sent as user-channel data when the caller passed null.
// CreateUserChannel substitutes it for null, and _RunAsServer turns it back into null before raising
// OnUserChannelCreated; _RunAsServer treats channels whose data is really null as internal remoting channels.
[Serializable]
private sealed class FakeNull
{
}
// Shared FakeNull instance used by CreateUserChannel.
private static readonly FakeNull _fakeNull = new FakeNull();
#endregion
#region Constructor
/// <summary>
/// Initializes a RemotingClient through the parameterless base constructor.
/// Subscribes to GCUtils.Collected and creates the dictionary of objects used by the other side.
/// </summary>
public RemotingClient()
{
    GCUtils.Collected += this._Collected;
    this._objectsUsedByTheOtherSide = new BidirectionalDictionary();
}

/// <summary>
/// Initializes a RemotingClient, forwarding the given parameters to the base constructor.
/// Subscribes to GCUtils.Collected and creates the dictionary of objects used by the other side.
/// </summary>
/// <param name="parameters">Parameters passed unchanged to the base class.</param>
public RemotingClient(RemotingParameters parameters)
    : base(parameters)
{
    GCUtils.Collected += this._Collected;
    this._objectsUsedByTheOtherSide = new BidirectionalDictionary();
}
#endregion
#region Dispose
/// <summary>
/// Frees the resources of this RemotingClient.
/// </summary>
/// <param name="disposing">
/// When true, unsubscribes from GCUtils.Collected, wakes the wrapper-collection loop, disposes the
/// channeller, the dictionary of objects used by the other side and the per-thread data, removes every
/// registered event handler and raises <see cref="Disposed"/>. The base implementation is always called.
/// </param>
protected override void Dispose(bool disposing)
{
    if (disposing)
    {
        GCUtils.Collected -= _Collected;

        // Use the local copy for the call as well as for the null check: _CollectWrappers hands the
        // field to Disposer.Dispose(ref ...) when its loop exits, so the field can change (or the
        // event can be closed) between the check and the Set().
        var collectWrappersEvent = _collectWrappersEvent;
        if (collectWrappersEvent != null)
        {
            try
            {
                collectWrappersEvent.Set();
            }
            catch(ObjectDisposedException)
            {
                // The collection loop already finished and closed the event; there is nothing to wake.
            }
        }

        Disposer.Dispose(ref _channeller);
        Disposer.Dispose(ref _objectsUsedByTheOtherSide);
        Disposer.Dispose(ref _threadDatas);

        // _registeredEventsLock is readonly and initialized inline, so it is never null.
        lock(_registeredEventsLock)
        {
            var registeredEvents = _registeredEvents;
            if (registeredEvents != null)
            {
                foreach(var pair in registeredEvents)
                {
                    var eventInfo = pair.Key;
                    foreach(var pair2 in pair.Value)
                    {
                        var handler = pair2.Key;
                        foreach(object obj in pair2.Value)
                            eventInfo.RemoveEventHandler(obj, handler);
                    }
                }
            }
        }

        // Copy the delegate so a concurrent unsubscribe cannot null it between the check and the call.
        var disposed = Disposed;
        if (disposed != null)
            disposed(this, EventArgs.Empty);
    }

    base.Dispose(disposing);
}
#endregion
#region _Collected
/// <summary>
/// Handler for GCUtils.Collected (subscribed in the constructors).
/// Wakes the wrapper-collection loop, unless _weCausedActualCollection is set, and then rebuilds
/// _registeredEvents without the handlers whose weak lists are empty.
/// </summary>
private void _Collected()
{
    if (!_weCausedActualCollection)
    {
        // Use the local copy for the call as well as for the null check: _CollectWrappers hands the
        // field to Disposer.Dispose(ref ...) when its loop exits, so the field can change (or the
        // event can be closed) between the check and the Set().
        var collectWrappersEvent = _collectWrappersEvent;
        if (collectWrappersEvent != null)
        {
            try
            {
                collectWrappersEvent.Set();
            }
            catch(ObjectDisposedException)
            {
                // The collection loop already finished and closed the event; there is nothing to wake.
            }
        }
    }

    try
    {
        lock(_registeredEventsLock)
        {
            var oldRegisteredEvents = _registeredEvents;
            if (oldRegisteredEvents != null)
            {
                // Build fresh dictionaries instead of removing entries while enumerating.
                var newRegisteredEvents = new Dictionary<EventInfo, Dictionary<Delegate, WeakList<object>>>();
                foreach(var pair in oldRegisteredEvents)
                {
                    var newDelegateDictionary = new Dictionary<Delegate, WeakList<object>>();
                    foreach(var pair2 in pair.Value)
                    {
                        if (pair2.Value.Count > 0)
                            newDelegateDictionary.Add(pair2.Key, pair2.Value);
                    }

                    if (newDelegateDictionary.Count > 0)
                        newRegisteredEvents.Add(pair.Key, newDelegateDictionary);
                }

                // Keep the field null when nothing is registered anymore.
                if (newRegisteredEvents.Count > 0)
                    _registeredEvents = newRegisteredEvents;
                else
                    _registeredEvents = null;
            }
        }
    }
    catch
    {
        // Failures are only propagated while both this client and its channeller are still alive;
        // during or after disposal they are swallowed.
        var channeller = _channeller;
        if (!WasDisposed && channeller != null && !channeller.WasDisposed)
            throw;
    }
}
#endregion
#region Properties
#region ExecutingClient
// One value per thread; _RunAsServer sets it around each instruction it runs and restores the previous value.
[ThreadStatic]
private static RemotingClient _executingClient;

/// <summary>
/// Gets the RemotingClient that invoked the current method, directly or indirectly.
/// Returns null when the current method was not invoked by a remote call.
/// </summary>
public static RemotingClient ExecutingClient
{
    get { return _executingClient; }
}
#endregion
#region Channeller
/// <summary>
/// Gets the channeller used by this remoting client.
/// It is assigned by Start, so it is null before Start is called.
/// </summary>
public IChanneller Channeller
{
    get { return _channeller; }
}
#endregion
#region IsStarted
// Set to true by Start; never reset in this file.
private bool _isStarted;

/// <summary>
/// Gets a value indicating whether Start was already called.
/// </summary>
public bool IsStarted
{
    get { return _isStarted; }
}
#endregion
#endregion
#region Methods
#region GetFromRemoteObject
/// <summary>
/// Gets the RemotingClient that created the given remote object.
/// </summary>
/// <param name="obj">The object to inspect; may be null.</param>
/// <returns>
/// The RemotingClient of the RemotingProxy behind <paramref name="obj"/>, or null when
/// <paramref name="obj"/> is not a BaseImplementedProxy backed by a RemotingProxy.
/// </returns>
public static RemotingClient GetFromRemoteObject(object obj)
{
    var implementedProxy = obj as BaseImplementedProxy;
    if (implementedProxy != null)
    {
        object innerProxy = implementedProxy._proxyObject;
        var remotingProxy = innerProxy as RemotingProxy;
        if (remotingProxy != null)
            return remotingProxy.RemotingClient;
    }

    return null;
}
#endregion
#region Start
/// <summary>
/// Starts this remoting client over the given channeller.
/// Parameters will not accept changes anymore.
/// </summary>
/// <param name="channeller">The channeller to communicate through. Can't be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="channeller"/> is null.</exception>
/// <exception cref="RemotingException">Start was already called on this client.</exception>
public void Start(IChanneller channeller)
{
    // ArgumentNullException derives from ArgumentException, so existing catch blocks keep working,
    // and it carries the parameter name.
    if (channeller == null)
        throw new ArgumentNullException("channeller", "channeller can't be null.");

    CheckThread();

    if (_isStarted)
        throw new RemotingException("This RemotingClient is already running.");

    _parameters._isReadOnly = true;
    _isStarted = true;
    _channeller = channeller;

    // Subscribe before starting the channeller so no incoming channel is missed.
    channeller.ChannelCreated += _RunAsServer;
    channeller.Start();

    // Background loop that asks the other side which objects are no longer referenced.
    UnlimitedThreadPool.Run(_CollectWrappers);
}
/// <summary>
/// Starts this RemotingClient by connecting a HybridChanneller to the given host and port,
/// using DefaultBufferSizePerChannel as the buffer size.
/// </summary>
/// <param name="hostname">Host name passed to HybridChanneller.Connect.</param>
/// <param name="port">Port passed to HybridChanneller.Connect.</param>
public void Start(string hostname, int port)
{
    var connectedChanneller = HybridChanneller.Connect(hostname, port, DefaultBufferSizePerChannel);
    Start(connectedChanneller);
}
#endregion
#region Disconnect
/// <summary>
/// Disconnects the active connection by disposing the current channeller, if there is one.
/// This may end up disposing the RemotingClient if it does not support reconnections.
/// </summary>
public void Disconnect()
{
    var activeChanneller = _channeller;
    if (activeChanneller == null)
        return;

    activeChanneller.Dispose();
}
#endregion
#region _CreateConnectionIfNeeded
// NOTE(review): this lock object is not used anywhere in this file - confirm whether it can be removed.
private object _connectLock = new object();

/// <summary>
/// Verifies that this client can be used: calls CheckUndisposed and then requires that Start was called.
/// Despite its name, it does not open any connection.
/// </summary>
internal void _CreateConnectionIfNeeded()
{
    CheckUndisposed();

    if (_isStarted)
        return;

    throw new RemotingException("You must call Start().");
}
#endregion
#region _GetWrappedDelegate
/// <summary>
/// Builds a local delegate of type wrappedDelegate.DelegateType backed by a RemotingProxyDelegate,
/// and registers that proxy in _wrappers under the delegate id.
/// </summary>
/// <returns>The delegate produced by DelegateProxier.Proxy.</returns>
internal object _GetWrappedDelegate(WrappedDelegate wrappedDelegate)
{
    long remoteId = wrappedDelegate.Id;

    RemotingProxyDelegate delegateProxy = new RemotingProxyDelegate(this, remoteId);
    var implementedDelegate = DelegateProxier.Proxy(delegateProxy, wrappedDelegate.DelegateType);
    delegateProxy.ImplementedDelegate = implementedDelegate;

    lock (_wrappersLock)
    {
        _wrappers.Add(remoteId, delegateProxy);
    }

    return implementedDelegate;
}
#endregion
#region _GetWrappedObject
/// <summary>
/// Builds a local object implementing wrappedObject.InterfaceTypes backed by a RemotingProxyObject,
/// and registers that proxy in _wrappers under the wrapped object's id.
/// </summary>
/// <returns>The object produced by InterfaceProxier.Proxy.</returns>
internal object _GetWrappedObject(Wrapped wrappedObject)
{
    RemotingProxyObject objectProxy = new RemotingProxyObject(this, wrappedObject);
    var implementedObject = InterfaceProxier.Proxy(objectProxy, wrappedObject.InterfaceTypes);
    objectProxy.ImplementedObject = implementedObject;

    lock (_wrappersLock)
    {
        _wrappers.Add(wrappedObject.Id, objectProxy);
    }

    return implementedObject;
}
#endregion
#region _GetReferencedObject
/// <summary>
/// Resolves a reference sent by the other side to the local proxy already registered for its id.
/// </summary>
/// <param name="serializer">Not used by this method.</param>
/// <param name="reference">The reference whose Id is looked up in _wrappers.</param>
/// <returns>
/// The ImplementedObject of a RemotingProxyObject, or the ImplementedDelegate of a RemotingProxyDelegate.
/// </returns>
/// <exception cref="RemotingException">No wrapper is registered for the reference id.</exception>
internal object _GetReferencedObject(RemotingSerializer serializer, Reference reference)
{
    RemotingProxy result;
    bool found;

    // _GetWrappedDelegate and _GetWrappedObject add to _wrappers under _wrappersLock; Dictionary is
    // not safe for a read concurrent with a write, so the lookup takes the same lock.
    lock(_wrappersLock)
        found = _wrappers.TryGetValue(reference.Id, out result);

    if (!found)
        throw new RemotingException("Got a reference to an inexisting wrapper.");

    RemotingProxyObject proxyObj = result as RemotingProxyObject;
    if (proxyObj != null)
        return proxyObj.ImplementedObject;

    RemotingProxyDelegate delegateProxy = (RemotingProxyDelegate)result;
    return delegateProxy.ImplementedDelegate;
}
#endregion
#region _GetThreadData
/// <summary>
/// Gets the ThreadData registered for the current thread in _threadDatas, creating it over a new
/// channel from the channeller and registering it when the thread has none yet.
/// </summary>
private ThreadData _GetThreadData()
{
    var currentThread = Thread.CurrentThread;
    var storage = _threadDatas;

    lock (storage._lock)
    {
        DisposeAssurer assurer;
        if (!storage._dictionary.TryGetValue(currentThread, out assurer))
        {
            // First use on this thread: open a channel and remember the ThreadData for later calls.
            var created = new ThreadData(_channeller.CreateChannel(), this);
            assurer = DisposeAssurer.Create((IDisposable)created);
            storage._dictionary.Add(currentThread, assurer);
            return created;
        }

        return (ThreadData)assurer.Value;
    }
}
#endregion
#region _Invoke
/// <summary>
/// Sends the instruction to the other side and returns the value of its result.
/// </summary>
/// <param name="instruction">The instruction to send.</param>
/// <returns>The Value of the received RemotingResult.</returns>
/// <remarks>Throws the exception carried by the result, when there is one.</remarks>
internal object _Invoke(Instruction instruction)
{
    RemotingResult result = _InvokeAndGetResult(instruction);
    return result.Value;
}

/// <summary>
/// Sends the instruction to the other side, copies the returned out/ref values into
/// <paramref name="outParameters"/> and returns the value of the result.
/// </summary>
/// <param name="instruction">The instruction to send.</param>
/// <param name="methodInfo">The invoked method; used to locate its by-ref parameters.</param>
/// <param name="outParameters">The argument array that receives the out/ref values.</param>
/// <returns>The Value of the received RemotingResult.</returns>
/// <remarks>Throws the exception carried by the result, when there is one, before touching outParameters.</remarks>
internal object _Invoke(Instruction instruction, MethodInfo methodInfo, object[] outParameters)
{
    RemotingResult result = _InvokeAndGetResult(instruction);
    _ProcessOut(methodInfo, result.OutValues, outParameters);
    return result.Value;
}

// Shared body of both _Invoke overloads: serializes the instruction on the current thread's channel,
// runs every Instruction received while waiting, and returns the first non-Instruction object as the
// RemotingResult, throwing the exception it carries when there is one.
private RemotingResult _InvokeAndGetResult(Instruction instruction)
{
    var threadData = _GetThreadData();
    threadData.Serialize(instruction);

    object received;
    while(true)
    {
        try
        {
            received = threadData.Deserialize();
        }
        catch(Exception exception)
        {
            // A failed read disposes this client with the failure, then surfaces it to the caller.
            Dispose(exception);
            throw;
        }

        var callback = received as Instruction;
        if (callback == null)
            break;

        callback.Run(this, threadData);
    }

    RemotingResult result = (RemotingResult)received;

    var remoteException = result.Exception;
    if (remoteException != null)
        throw remoteException;

    return result;
}
#endregion
#region _CollectWrappers
// Background loop started by Start through UnlimitedThreadPool.Run.
// Each time _collectWrappersEvent is signalled (by _Collected or by Dispose) it sends
// InstructionCollect.Instance to the other side, reads back an array of ids and removes those ids
// from _objectsUsedByTheOtherSide. The loop ends when WasDisposed is true or when anything throws;
// on exit the event itself is disposed.
private void _CollectWrappers()
{
try
{
// Created lazily on the first wake-up and then reused for every round.
ThreadData threadData = null;
while(true)
{
_collectWrappersEvent.WaitOne();
// Dispose also sets the event, so this check is what ends the loop after disposal.
if (WasDisposed)
return;
if (threadData == null)
threadData = _GetThreadData();
lock(_objectsUsedByTheOtherSide.DisposeLock)
{
// Checked again now that the lock is held.
if (WasDisposed)
return;
// NOTE(review): skips the round when fewer than two entries exist; presumably one entry is
// always present and never collectable - confirm.
if (_objectsUsedByTheOtherSide._dictionary1.Count < 2)
continue;
threadData.Serialize(InstructionCollect.Instance);
// NOTE(review): uses DeserializeNoLock while DisposeLock is held, presumably because the regular
// Deserialize would try to take that lock again - confirm.
long[] collectedIds = (long[])threadData.DeserializeNoLock();
_objectsUsedByTheOtherSide.RemoveIds(collectedIds);
}
}
}
catch(Exception exception)
{
// Any failure in the loop disposes the whole client with that exception.
Dispose(exception);
}
finally
{
// The loop owns the event's lifetime: it is disposed here, not in Dispose(bool).
Disposer.Dispose(ref _collectWrappersEvent);
}
}
#endregion
#region _RunAsServer
/// <summary>
/// Handler for the channeller's ChannelCreated event (subscribed in Start).
/// Channels created with data are user channels and are forwarded to OnUserChannelCreated; channels
/// without data are served here by reading and running instructions until reading or running one throws.
/// </summary>
/// <param name="sender">Not used.</param>
/// <param name="args">Carries the new channel and the data it was created with.</param>
private void _RunAsServer(object sender, ChannelCreatedEventArgs args)
{
    var data = args.Data;
    if (data != null)
    {
        // FakeNull is the placeholder CreateUserChannel sends for null; give the user the real null back.
        if (data is FakeNull)
            args.Data = null;

        OnUserChannelCreated(args);
        return;
    }

    try
    {
        var threadData = new ThreadData(args.Channel, this);
        _threadDatas._Value = threadData;

        while(true)
        {
            var instruction = (Instruction)threadData.Deserialize();

            // Expose this client through ExecutingClient only while the instruction runs, restoring
            // whatever value the thread had before.
            var oldExecutingClient = _executingClient;
            try
            {
                _executingClient = this;
                instruction.Run(this, threadData);
            }
            finally
            {
                _executingClient = oldExecutingClient;
            }
        }
    }
    catch(Exception exception)
    {
        // Read the field once: Dispose(bool) hands _channeller to Disposer.Dispose(ref ...), so it can
        // be null here. A null channeller means disposal is already in progress, so nothing is done.
        var channeller = _channeller;
        if (!WasDisposed && channeller != null && channeller.WasDisposed)
            Dispose(exception);
    }
}
#endregion
#region CreateUserChannel
/// <summary>
/// Creates a channel to communicate with the other side, without opening a new TCP/IP port.
/// </summary>
/// <param name="serializableData">
/// Data sent along with the channel creation. When null, an internal placeholder is sent instead.
/// </param>
/// <returns>The channel created by the channeller.</returns>
public IChannel CreateUserChannel(object serializableData = null)
{
    _CreateConnectionIfNeeded();

    object dataToSend = serializableData ?? _fakeNull;
    return _channeller.CreateChannel(dataToSend);
}
#endregion
#region InvokeStaticMethod
/// <summary>
/// Invokes a registered static method on the other side.
/// </summary>
/// <param name="methodName">The name the method is registered under.</param>
/// <param name="parameters">The arguments sent with the invocation.</param>
/// <returns>The value returned by the remote invocation.</returns>
public object InvokeStaticMethod(string methodName, params object[] parameters)
{
    _CreateConnectionIfNeeded();

    var instruction = new InstructionInvokeStaticMethod
    {
        MethodName = methodName,
        Parameters = parameters
    };

    return _Invoke(instruction);
}
#endregion
#region Create
/// <summary>
/// Creates a registered object on the other side.
/// </summary>
/// <param name="name">The name the object is registered under.</param>
/// <param name="parameters">The arguments sent with the creation request.</param>
/// <returns>The value returned by the remote creation.</returns>
public object Create(string name, params object[] parameters)
{
    _CreateConnectionIfNeeded();

    var instruction = new InstructionCreateObject
    {
        Name = name,
        Parameters = parameters
    };

    return _Invoke(instruction);
}
/// <summary>
/// Creates an interface registered on the other side, using the full name of
/// <typeparamref name="T"/> as the registered name and no constructor arguments.
/// </summary>
/// <typeparam name="T">The type whose FullName identifies the registration.</typeparam>
/// <returns>The created object cast to <typeparamref name="T"/>.</returns>
public T Create<T>()
{
    string registeredName = typeof(T).FullName;
    object created = Create(registeredName);
    return (T)created;
}
#endregion
#region Out Values in General
// Cache of by-ref parameter positions per method. The value is null for methods without by-ref parameters.
private static readonly Dictionary<MethodInfo, int[]> _outIndexesDictionary = new Dictionary<MethodInfo, int[]>();
// Guards _outIndexesDictionary.
private static readonly ReaderWriterLockSlim _outIndexesDictionaryLock = new ReaderWriterLockSlim();

/// <summary>
/// Gets the positions of the by-ref (out/ref) parameters of the given method, computing and caching
/// them on first use.
/// </summary>
/// <returns>The zero-based parameter positions, or null when the method has no by-ref parameters.</returns>
internal static int[] _GetOutIndexes(MethodInfo methodInfo)
{
    int[] outIndexes;
    bool found;

    // Fast path: most calls hit the cache under the read lock.
    using (_outIndexesDictionaryLock.ReadLock())
    {
        found = _outIndexesDictionary.TryGetValue(methodInfo, out outIndexes);
    }

    if (found)
        return outIndexes;

    using (_outIndexesDictionaryLock.UpgradeableLock())
    {
        // Another thread may have added the entry between the two locks.
        if (_outIndexesDictionary.TryGetValue(methodInfo, out outIndexes))
            return outIndexes;

        var byRefPositions = new List<int>();
        var parameters = methodInfo.GetParameters();
        for (int position = 0; position < parameters.Length; position++)
        {
            if (parameters[position].ParameterType.IsByRef)
                byRefPositions.Add(position);
        }

        // Methods without by-ref parameters are cached as null.
        outIndexes = byRefPositions.Count > 0 ? byRefPositions.ToArray() : null;

        using (_outIndexesDictionaryLock.WriteLock())
        {
            _outIndexesDictionary.Add(methodInfo, outIndexes);
        }

        return outIndexes;
    }
}
/// <summary>
/// Copies the out/ref values returned by the other side into the caller's argument array, at the
/// positions of the method's by-ref parameters.
/// </summary>
/// <param name="methodInfo">The invoked method; used to locate its by-ref parameters.</param>
/// <param name="resultOutParameters">The returned values, one per by-ref parameter in order; may be null.</param>
/// <param name="outParameters">The argument array to update; may be null.</param>
private static void _ProcessOut(MethodInfo methodInfo, object[] resultOutParameters, object[] outParameters)
{
    // Nothing was returned, or there is no array to write into.
    if (resultOutParameters == null || outParameters == null)
        return;

    // _GetOutIndexes returns null for methods without by-ref parameters (the same case
    // _GetOutValues guards against); iterating it would throw a NullReferenceException.
    var outIndexes = _GetOutIndexes(methodInfo);
    if (outIndexes == null)
        return;

    int resultIndex = -1;
    foreach(int index in outIndexes)
    {
        resultIndex++;
        outParameters[index] = resultOutParameters[resultIndex];
    }
}
/// <summary>
/// Extracts, from an argument array, the values sitting at the positions of the method's by-ref
/// (out/ref) parameters.
/// </summary>
/// <param name="methodInfo">The method whose by-ref parameters are located.</param>
/// <param name="outParameters">The argument array to read from; may be null.</param>
/// <returns>
/// One value per by-ref parameter, in parameter order; null when <paramref name="outParameters"/>
/// is null or the method has no by-ref parameters.
/// </returns>
internal static object[] _GetOutValues(MethodInfo methodInfo, object[] outParameters)
{
    if (outParameters == null)
        return null;

    var outIndexes = _GetOutIndexes(methodInfo);
    if (outIndexes == null)
        return null;

    object[] values = new object[outIndexes.Length];
    int position = 0;
    foreach (int parameterIndex in outIndexes)
    {
        values[position] = outParameters[parameterIndex];
        position++;
    }

    return values;
}
#endregion
#endregion
#region Events
#region Disposed
/// <summary>
/// Event invoked when this RemotingClient is disposed; it is raised from Dispose(bool) when
/// disposing is true.
/// To guarantee that it will be invoked, set this event in the RemotingClientParameters before
/// creating the client.
/// NOTE(review): RemotingClientParameters is not visible in this file (the constructor takes
/// RemotingParameters) - confirm the type name.
/// </summary>
public event EventHandler Disposed;
#endregion
#endregion
}
}