Archive for the ‘Threading’ Category

Distributed Lock with AppFabric Caching

July 30, 2013

When building distributed applications, you often need to synchronize access to shared resources. For example, you might have multiple instances of a web service running on a web farm, and only one of them should perform a given task at a time. Steve Marx proposed a solution for Windows Azure that relies on leases, but unfortunately it does not work on-premises. Other caching products, such as Redis, offer this functionality out of the box, so I thought it would be useful to have it for AppFabric.

I’ve developed a DataCache extension method that works the same way as ServiceStack’s Redis Client. Here’s how to use it:

DataCache cache = factory.GetCache("MyCache");
using (cache.AcquireLock("MyLock"))
{
    // Lock acquired, perform synchronized work here
}

The AcquireLock method will block until it can acquire an exclusive lock or until the optional timeout value is reached. Here is the implementation:

public static IDisposable AcquireLock(this DataCache cache, string key, TimeSpan? timeout = null)
{
    if (cache == null)
    {
        throw new ArgumentNullException("cache");
    }
    if (key == null)
    {
        throw new ArgumentNullException("key");
    }

    return new DataCacheLock(cache, key, timeout);
}

Very simple so far. The core of the logic is in the private DataCacheLock class:

private class DataCacheLock : IDisposable
{
    private DataCache _cache;            
    private DataCacheLockHandle _lockHandle;
    private string _key;

    public DataCacheLock(DataCache cache, string key, TimeSpan? timeout)
    {
        _cache = cache;
        _key = key;
        RetryUntilTrue(() =>
        {
            try
            {
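                // The second argument is how long AppFabric keeps the object locked
                // before releasing it automatically, not how long to wait for the lock.
                cache.GetAndLock(key, timeout ?? TimeSpan.MaxValue, out _lockHandle);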
                return true;
            }
            catch (DataCacheException ex)
            {
                if (ex.ErrorCode == DataCacheErrorCode.KeyDoesNotExist)
                {
                    try
                    {
                        cache.Add(key, string.Empty);
                    }
                    catch (DataCacheException)
                    {
                        // Most likely another client created the key first; retry the lock.
                    }
                    return false;
                }
                else if (ex.ErrorCode == DataCacheErrorCode.ObjectLocked)
                {
                    return false;
                }
                throw;
            }
        }, timeout);
    }

    public void Dispose()
    {
        if (_lockHandle != null)
        {
            _cache.Unlock(_key, _lockHandle);
            _lockHandle = null;
        }
    }           
}

Essentially, a client tries to GetAndLock the specified key. If the key doesn’t exist, the client creates it and tries again. If the key exists but is already locked, the client retries until it gets the lock or the timeout is reached; without a timeout it retries indefinitely.

I am not a big fan of implementing logic around exceptions, but the AppFabric Caching API leaves us no other choice here. You’ve probably noticed the helper method RetryUntilTrue. Here’s how it is implemented:

private static readonly Random _random = new Random();
private static readonly object _randomLock = new object();

private static void RetryUntilTrue(Func<bool> action, TimeSpan? timeout)
{
    int i = 0;
    DateTime start = DateTime.UtcNow;
    while (!timeout.HasValue || DateTime.UtcNow - start < timeout.Value)
    {
        i++;
        if (action())
        {
            return;
        }

        // Random is not thread-safe, so serialize access to the shared instance.
        int delay;
        lock (_randomLock)
        {
            delay = _random.Next((int)Math.Pow(i, 2), (int)Math.Pow(i + 1, 2) + 1);
        }
        Thread.Sleep(delay);
    }
    throw new TimeoutException(string.Format("Exceeded timeout of {0}", timeout.Value));
}

This makes the client retry at randomized intervals that grow with each attempt: the wait after attempt i is between i² and (i + 1)² milliseconds.
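To make the retry intervals concrete, the sketch below computes the bounds of each wait directly from the Random.Next call in RetryUntilTrue. BackoffBounds and its two methods are names made up for this illustration; they are not part of the extension method.

```csharp
using System;

// Hypothetical helper that mirrors the arithmetic used by RetryUntilTrue.
public static class BackoffBounds
{
    // Inclusive bounds, in milliseconds, of the sleep that follows the given
    // 1-based attempt. Random.Next(min, max) excludes max, so the "+ 1" in the
    // original call makes (attempt + 1)^2 the largest possible delay.
    public static Tuple<int, int> ForAttempt(int attempt)
    {
        if (attempt < 1)
            throw new ArgumentOutOfRangeException("attempt");

        int min = attempt * attempt;
        int max = (attempt + 1) * (attempt + 1);
        return Tuple.Create(min, max);
    }

    // Worst-case total time slept after the given number of failed attempts.
    public static int MaxTotalDelay(int attempts)
    {
        int total = 0;
        for (int i = 1; i <= attempts; i++)
            total += ForAttempt(i).Item2;
        return total;
    }
}
```

For instance, ForAttempt(10) gives a wait between 100 and 121 ms, so a contended lock is polled quickly at first and more slowly as the attempts add up. Note that nothing caps the delay, so a very long wait keeps stretching the interval.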

One thing to keep in mind: if you don’t specify a timeout and the client holding the lock terminates abruptly (so Dispose is never called), all your other clients will wait for the lock indefinitely. A possible way to avoid this would be to cap the lock timeout at 1 minute internally and add a timer to the DataCacheLock class that calls ResetObjectTimeout every 30 seconds to extend the object’s timeout by another minute.
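The timer part of that idea could look roughly like the sketch below. LeaseRenewer is a hypothetical class, not something AppFabric provides, and it deliberately knows nothing about the cache: it just invokes a callback at a fixed interval until it is disposed. Inside DataCacheLock the callback would be where the timeout gets extended, for example with the ResetObjectTimeout call mentioned above. Whether that call also prolongs a lock taken with GetAndLock is an assumption you should check against the AppFabric documentation.

```csharp
using System;
using System.Threading;

// Hypothetical helper: runs a renewal callback periodically until disposed.
public sealed class LeaseRenewer : IDisposable
{
    private readonly Action _renew;
    private readonly Timer _timer;
    private int _disposed;

    public LeaseRenewer(Action renew, TimeSpan interval)
    {
        if (renew == null)
            throw new ArgumentNullException("renew");
        if (interval <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException("interval");

        _renew = renew;
        // First tick after one interval, then one tick per interval.
        _timer = new Timer(OnTick, null, interval, interval);
    }

    private void OnTick(object state)
    {
        if (Volatile.Read(ref _disposed) != 0)
            return;

        try
        {
            _renew();
        }
        catch (Exception)
        {
            // A failed renewal is swallowed here; the lease then simply
            // expires on its own once its timeout elapses.
        }
    }

    public void Dispose()
    {
        // Make Dispose idempotent and stop scheduling further ticks.
        if (Interlocked.Exchange(ref _disposed, 1) == 0)
            _timer.Dispose();
    }
}
```

With a 1 minute lock timeout and a 30 second renewal interval, a client that crashes stops renewing, and the lock becomes available again about a minute later instead of never.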

Generic ConcurrentPool

July 9, 2013

This situation is fairly common: you need to re-use instances of the same class over and over, and you wish you didn’t have to pay the price for repeated allocations and garbage collection. Unfortunately, the .NET Framework doesn’t come with a generic pool class, so you have to create your own. I’ve seen very complicated implementations as well as others that weren’t thread-safe.

The following generic implementation relies on the thread-safe ConcurrentBag<T> introduced in .NET 4.0 for storage. The capacity parameter specifies how many instances are created up front; it is not an upper bound on the pool’s size.

public class ConcurrentPool<T> where T : class
{
    private ConcurrentBag<T> _bag = new ConcurrentBag<T>();
    private Func<T> _factoryMethod;

    public ConcurrentPool(int capacity)
    {
        if (capacity < 0)
            throw new ArgumentOutOfRangeException("capacity");

        if (typeof(T).GetConstructor(Type.EmptyTypes) == null)
            throw new ArgumentException(string.Format("Type '{0}' " +
                "doesn't have a parameterless constructor. " +
                "Specify a factoryMethod.", typeof(T).Name));

        _factoryMethod = () => Activator.CreateInstance<T>();
        Initialize(capacity);
    }

    public ConcurrentPool(int capacity, Func<T> factoryMethod)
    {
        if (capacity < 0)
            throw new ArgumentOutOfRangeException("capacity");

        if (factoryMethod == null)
            throw new ArgumentNullException("factoryMethod");

        _factoryMethod = factoryMethod;
        Initialize(capacity);
    }

    private void Initialize(int capacity)
    {
        for (int i = 0; i < capacity; i++)
        {
            _bag.Add(_factoryMethod());
        }
    }

    public virtual T Take()
    {
        T item;
        return _bag.TryTake(out item) ? item : _factoryMethod();
    }

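    public virtual void Add(T item)
    {
        // ConcurrentBag<T> accepts null, which Take() would later hand back to a caller.
        if (item == null)
            throw new ArgumentNullException("item");

        _bag.Add(item);
    }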
}

New instances are created on demand whenever the pool is empty. Note that the Take and Add methods are virtual, so a subclass can run its own logic when an object is taken from or returned to the pool. You can also implement the IProducerConsumerCollection<T> interface on ConcurrentPool if you need it (I’ve omitted it to keep this article short).
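As an illustration of what overriding Add can be used for, here is a minimal sketch of a pool that resets each object as it is returned. StringBuilderPool is a hypothetical subclass invented for this example, and a trimmed copy of ConcurrentPool<T> (argument checks and the parameterless-constructor overload left out) is included only so that the snippet compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;

// Trimmed copy of the ConcurrentPool<T> from this article.
public class ConcurrentPool<T> where T : class
{
    private readonly ConcurrentBag<T> _bag = new ConcurrentBag<T>();
    private readonly Func<T> _factoryMethod;

    public ConcurrentPool(int capacity, Func<T> factoryMethod)
    {
        _factoryMethod = factoryMethod;
        for (int i = 0; i < capacity; i++)
            _bag.Add(_factoryMethod());
    }

    public virtual T Take()
    {
        T item;
        return _bag.TryTake(out item) ? item : _factoryMethod();
    }

    public virtual void Add(T item)
    {
        _bag.Add(item);
    }
}

// Hypothetical subclass: empties every StringBuilder handed back to the pool.
public class StringBuilderPool : ConcurrentPool<StringBuilder>
{
    public StringBuilderPool(int capacity)
        : base(capacity, () => new StringBuilder())
    {
    }

    public override void Add(StringBuilder item)
    {
        if (item == null)
            throw new ArgumentNullException("item");

        item.Clear();   // reset state so the next Take() gets an empty builder
        base.Add(item);
    }
}
```

Resetting on Add rather than on Take means a returned object never keeps a reference to stale data while it sits in the pool. The caller still has to stop using the object once it has been handed back.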

Any other IProducerConsumerCollection<T> could be used as the underlying collection (ConcurrentQueue<T> or ConcurrentStack<T>, for example). I used ConcurrentBag<T> here because, in my measurements, it gave the best retrieval performance of the three.
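If you want to compare the collections yourself, one option is to pass the storage in from outside. The following is only a sketch of that idea: CollectionBackedPool<T> is a hypothetical variant, not the class from this article, and it skips the pre-filling step to stay short.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical variant of the pool that receives its storage as a parameter.
public class CollectionBackedPool<T> where T : class
{
    private readonly IProducerConsumerCollection<T> _items;
    private readonly Func<T> _factoryMethod;

    public CollectionBackedPool(IProducerConsumerCollection<T> items, Func<T> factoryMethod)
    {
        if (items == null)
            throw new ArgumentNullException("items");
        if (factoryMethod == null)
            throw new ArgumentNullException("factoryMethod");

        _items = items;
        _factoryMethod = factoryMethod;
    }

    public T Take()
    {
        T item;
        return _items.TryTake(out item) ? item : _factoryMethod();
    }

    public void Add(T item)
    {
        if (item == null)
            throw new ArgumentNullException("item");

        // TryAdd succeeds for ConcurrentBag, ConcurrentQueue and ConcurrentStack.
        // If some other collection refuses the item, it is simply not pooled
        // and is left to the garbage collector.
        _items.TryAdd(item);
    }
}
```

The choice of collection also changes which object comes back: a ConcurrentStack<T> returns the most recently added instance first, while a ConcurrentQueue<T> returns the oldest one. For example, new CollectionBackedPool<byte[]>(new ConcurrentStack<byte[]>(), () => new byte[4096]) would keep handing out the buffer that was returned last.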