While making a HUGE update to my CacheRepository project, I needed a way to have a dynamic number of semaphores, each locking on a specified cache key. The SemaphoreSlim is great, but I needed a wrapper around it that allowed me to have one for each unique cache key being fetched.
The easiest solution was just to have a concurrent dictionary of string to semaphore, but under high load that would keep growing, and I did not want to waste memory. Instead I created a class that does keep a dictionary of semaphores, but removes each one from the dictionary and stores it in a queue for reuse once nothing is holding or waiting on it.
Enough talking! Below is the code, and as always it comes with unit tests! :)
KeyedSemaphoreSlim Implementation
/// <summary>
/// Provides asynchronous mutual exclusion per string key. Each key in use is
/// backed by its own <see cref="SemaphoreSlim"/>; once nothing holds or waits
/// on a key, its semaphore is removed from the map and queued for reuse by
/// the next new key, so memory does not grow with the number of distinct keys.
/// </summary>
public class KeyedSemaphoreSlim : IDisposable
{
    // Guards _wrapperQueue, _wrapperMap, _isDisposed and every wrapper's use count.
    private readonly object _lock = new object();

    // Idle wrappers waiting to be assigned to a new key.
    private readonly Queue<SemaphoreWrapper> _wrapperQueue
        = new Queue<SemaphoreWrapper>();

    // Wrappers that are currently held or waited on, indexed by key.
    private readonly Dictionary<string, SemaphoreWrapper> _wrapperMap
        = new Dictionary<string, SemaphoreWrapper>();

    private bool _isDisposed;

    /// <summary>
    /// Asynchronously waits for exclusive access to <paramref name="key"/>.
    /// </summary>
    /// <param name="key">The key to lock on.</param>
    /// <param name="cancelToken">Token that cancels the wait.</param>
    /// <returns>
    /// A task whose result releases the key when disposed. The result is the
    /// shared per-key wrapper instance, so it must be disposed exactly once
    /// per successful wait.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    public Task<IDisposable> WaitAsync(
        string key,
        CancellationToken cancelToken = default(CancellationToken))
    {
        if (key == null)
            throw new ArgumentNullException(nameof(key));

        lock (_lock)
        {
            if (_isDisposed)
                throw new ObjectDisposedException(nameof(KeyedSemaphoreSlim));

            SemaphoreWrapper wrapper;
            if (!_wrapperMap.TryGetValue(key, out wrapper))
            {
                // Prefer recycling an idle wrapper over allocating a new semaphore.
                wrapper = _wrapperQueue.Count == 0
                    ? new SemaphoreWrapper(Release, Abandon)
                    : _wrapperQueue.Dequeue();
                wrapper.Key = key;
                _wrapperMap[key] = wrapper;
            }

            // The async method runs synchronously up to its first incomplete
            // await, so the use count is incremented while the lock is held.
            return wrapper.WaitAsync(cancelToken);
        }
    }

    /// <summary>
    /// Disposes every semaphore owned by this instance, both those assigned
    /// to a key and those idle in the reuse queue. Safe to call repeatedly.
    /// </summary>
    public void Dispose()
    {
        lock (_lock)
        {
            if (_isDisposed)
                return;
            _isDisposed = true;

            foreach (var wrapper in _wrapperMap.Values)
                wrapper.InternalDispose();
            foreach (var wrapper in _wrapperQueue)
                wrapper.InternalDispose();

            _wrapperMap.Clear();
            _wrapperQueue.Clear();
        }
    }

    // Called when a holder disposes its wrapper: frees the semaphore slot and
    // recycles the wrapper once nothing else uses it.
    private void Release(SemaphoreWrapper wrapper)
    {
        lock (_lock)
        {
            // The semaphore is already disposed; releasing it would throw
            // from the holder's Dispose call.
            if (_isDisposed)
                return;

            if (wrapper.Release())
                Recycle(wrapper);
        }
    }

    // Called when a wait ends without acquiring the semaphore (for example on
    // cancellation): drops the waiter's use count without releasing a slot.
    private void Abandon(SemaphoreWrapper wrapper)
    {
        lock (_lock)
        {
            if (_isDisposed)
                return;

            if (wrapper.Abandon())
                Recycle(wrapper);
        }
    }

    // Moves an unused wrapper from the key map to the reuse queue.
    // The caller must hold _lock.
    private void Recycle(SemaphoreWrapper wrapper)
    {
        _wrapperMap.Remove(wrapper.Key);
        _wrapperQueue.Enqueue(wrapper);
    }

    /// <summary>
    /// Pairs a binary semaphore with the number of callers currently holding
    /// or waiting on it. Disposing the wrapper releases the key through the
    /// owning <see cref="KeyedSemaphoreSlim"/>.
    /// </summary>
    private class SemaphoreWrapper : IDisposable
    {
        private readonly Action<SemaphoreWrapper> _parentRelease;
        private readonly Action<SemaphoreWrapper> _parentAbandon;
        private readonly SemaphoreSlim _semaphoreSlim;

        // Holders plus pending waiters; only modified under the parent's lock.
        private int _useCount;

        public SemaphoreWrapper(
            Action<SemaphoreWrapper> parentRelease,
            Action<SemaphoreWrapper> parentAbandon)
        {
            _parentRelease = parentRelease;
            _parentAbandon = parentAbandon;
            _semaphoreSlim = new SemaphoreSlim(1, 1);
        }

        /// <summary>The key this wrapper is currently assigned to.</summary>
        public string Key { get; set; }

        /// <summary>
        /// Registers the caller as a user of this wrapper and waits for the
        /// semaphore. If the wait fails or is cancelled, the registration is
        /// withdrawn so the wrapper can still be recycled.
        /// </summary>
        public async Task<IDisposable> WaitAsync(CancellationToken cancelToken)
        {
            _useCount++;
            try
            {
                await _semaphoreSlim.WaitAsync(cancelToken).ConfigureAwait(false);
            }
            catch
            {
                // The slot was never acquired, so only the use count is undone.
                _parentAbandon(this);
                throw;
            }
            return this;
        }

        /// <summary>
        /// Releases the semaphore and drops one use.
        /// </summary>
        /// <returns>True when no holder or waiter remains.</returns>
        public bool Release()
        {
            _semaphoreSlim.Release();
            _useCount--;
            return _useCount == 0;
        }

        /// <summary>
        /// Drops one use without releasing the semaphore.
        /// </summary>
        /// <returns>True when no holder or waiter remains.</returns>
        public bool Abandon()
        {
            _useCount--;
            return _useCount == 0;
        }

        /// <summary>Releases the key held by the caller.</summary>
        public void Dispose()
        {
            _parentRelease(this);
        }

        /// <summary>Disposes the underlying semaphore.</summary>
        public void InternalDispose()
        {
            _semaphoreSlim.Dispose();
        }
    }
}
KeyedSemaphoreSlim Tests
/// <summary>
/// Verifies per-key ordering and semaphore reuse of KeyedSemaphoreSlim.
/// </summary>
public class KeyedSemaphoreSlimTests
{
    // Upper bound, in milliseconds, on how long a released waiter may take to
    // complete. It is only reached when the test is about to fail.
    private const int CompletionTimeoutMs = 5000;

    /// <summary>
    /// Walks several keys through acquire/release cycles and checks that
    /// waiters on the same key complete one at a time, that each key shares a
    /// single semaphore instance, and that idle instances are reused for new
    /// keys in the order they were released.
    /// </summary>
    [Fact]
    public async Task WaitAsync()
    {
        using (var semaphore = new KeyedSemaphoreSlim())
        {
            // Use three keys to create three semaphores.
            var ta1 = semaphore.WaitAsync("A");
            var tb1 = semaphore.WaitAsync("B");
            var ta2 = semaphore.WaitAsync("A");
            var tb2 = semaphore.WaitAsync("B");
            var ta3 = semaphore.WaitAsync("A");
            var tc1 = semaphore.WaitAsync("C");

            // Assert that the first entry for each key is complete.
            Assert.True(ta1.IsCompleted);
            Assert.True(tb1.IsCompleted);
            Assert.False(ta2.IsCompleted);
            Assert.False(tb2.IsCompleted);
            Assert.False(ta3.IsCompleted);
            Assert.True(tc1.IsCompleted);

            // Complete the first entry by disposing.
            var ra1 = await ta1;
            ra1.Dispose();
            var rb1 = await tb1;
            rb1.Dispose();
            var rc1 = await tc1;
            rc1.Dispose();

            // Show that the second entry is now complete. Waiters finish
            // asynchronously, so wait on the tasks instead of sleeping.
            await AssertCompletesAsync(ta2);
            await AssertCompletesAsync(tb2);
            Assert.False(ta3.IsCompleted);

            // Complete the second entry by disposing.
            var ra2 = await ta2;
            ra2.Dispose();
            var rb2 = await tb2;
            rb2.Dispose();

            // Show that the third entry is now complete.
            await AssertCompletesAsync(ta3);

            // Complete the third entry by disposing.
            var ra3 = await ta3;
            ra3.Dispose();

            // Assert that each key shares a unique semaphore instance.
            Assert.Same(ra1, ra2);
            Assert.Same(ra2, ra3);
            Assert.Same(rb1, rb2);
            Assert.NotSame(ra1, rb1);
            Assert.NotSame(ra1, rc1);

            // Get four new keys.
            var td1 = semaphore.WaitAsync("D");
            var te1 = semaphore.WaitAsync("E");
            var tf1 = semaphore.WaitAsync("F");
            var tg1 = semaphore.WaitAsync("G");

            // Assert that they are all complete.
            Assert.True(td1.IsCompleted);
            Assert.True(te1.IsCompleted);
            Assert.True(tf1.IsCompleted);
            Assert.True(tg1.IsCompleted);

            // Complete the first entry for each.
            var rd1 = await td1;
            rd1.Dispose();
            var re1 = await te1;
            re1.Dispose();
            var rf1 = await tf1;
            rf1.Dispose();
            var rg1 = await tg1;
            rg1.Dispose();

            // Assert that the first three reuse the original
            // instances in order of their disposal.
            Assert.Same(rc1, rd1);
            Assert.Same(rb2, re1);
            Assert.Same(ra1, rf1);

            // Show that the fourth key is a new semaphore.
            Assert.NotSame(rg1, rd1);
            Assert.NotSame(rg1, re1);
            Assert.NotSame(rg1, rf1);
        }
    }

    // Fails the test unless the given task completes within the timeout.
    private static async Task AssertCompletesAsync(Task task)
    {
        var finished = await Task.WhenAny(task, Task.Delay(CompletionTimeoutMs));
        Assert.Same(task, finished);
    }
}
Enjoy,
Tom
Hi Tom, great code, but why are you not disposing _wrapperQueue in KeyedSemaphoreSlim.Dispose?
ReplyDelete