Task.Factory.StartNew is great, but often abused.
Do you need to make a call that could take a long time to complete, but you don't care about the results? If so, then you need to make an async call. Should you make these calls by constantly creating and starting new Tasks? No, as this could use up a lot of resources, exhaust your thread pool, or possibly even tear down your app domain.
I was recently introduced to System.Collections.Concurrent.BlockingCollection, and I absolutely love that class. However, 99% of my use cases with BlockingCollections are actually more specific to queuing. My solution: create a generic BlockingQueue!
Simple File Example
/// <summary>
/// Describes one pending file write: where to write and what to write.
/// </summary>
public class SimpleFile
{
    /// <summary>Destination path of the file.</summary>
    public string Path { get; set; }

    /// <summary>Text to store in the file.</summary>
    public string Contents { get; set; }
}
/// <summary>
/// A <see cref="BlockingQueue{T}"/> whose workers write each queued
/// <see cref="SimpleFile"/> to disk.
/// </summary>
public class SimpleFileQueue : BlockingQueue<SimpleFile>
{
    /// <summary>Creates the queue and forwards the worker count to the base class.</summary>
    /// <param name="threadCount">Number of worker threads passed to the base constructor.</param>
    public SimpleFileQueue(int threadCount) : base(threadCount) { }

    /// <summary>
    /// Writes <see cref="SimpleFile.Contents"/> to <see cref="SimpleFile.Path"/>,
    /// creating the file or overwriting an existing one.
    /// </summary>
    /// <param name="model">The file to write.</param>
    protected override void ProcessModel(SimpleFile model)
    {
        System.IO.File.WriteAllText(model.Path, model.Contents);
    }

    /// <summary>
    /// Reports a failure raised while processing the queue. Failures are sent to the
    /// trace listeners instead of being dropped, so a failed write leaves a record.
    /// </summary>
    /// <param name="ex">The exception that was caught by the worker.</param>
    protected override void HandleException(Exception ex)
    {
        // Single-argument overload: the text is treated as a message, not a format
        // string, so braces inside the exception text are safe.
        System.Diagnostics.Trace.TraceError("SimpleFileQueue failed to process an item: " + ex);
    }
}
/// <summary>
/// Static convenience wrapper around a single shared <see cref="SimpleFileQueue"/>.
/// </summary>
public static class SimpleFileExample
{
    /// <summary>The shared queue, constructed with a thread count of 3.</summary>
    public static readonly SimpleFileQueue Queue = new SimpleFileQueue(3);

    /// <summary>
    /// Wraps the arguments in a <see cref="SimpleFile"/> and adds it to <see cref="Queue"/>.
    /// </summary>
    /// <param name="path">Value assigned to <see cref="SimpleFile.Path"/>.</param>
    /// <param name="content">Value assigned to <see cref="SimpleFile.Contents"/>.</param>
    public static void EnqueueSimpleFile(string path, string content)
    {
        var pendingWrite = new SimpleFile();
        pendingWrite.Path = path;
        pendingWrite.Contents = content;

        Queue.Enqueue(pendingWrite);
    }
}
BlockingQueue<T> Implementation
/// <summary>
/// Base class for a background work queue. Items handed to <see cref="Enqueue"/> are
/// buffered in a <see cref="ConcurrentQueue{T}"/> and passed one at a time to
/// <see cref="ProcessModel"/> by a fixed set of worker tasks started in the constructor.
/// </summary>
/// <typeparam name="T">Type of the queued work item.</typeparam>
public abstract class BlockingQueue<T> : IDisposable
{
    #region Private Members

    // Milliseconds a worker blocks waiting for an item before looping to re-check cancellation.
    private const int Timeout = 60000;

    private bool _disposed;
    private readonly CancellationTokenSource _tokenSource;
    private readonly ConcurrentQueue<T> _queue;
    private readonly BlockingCollection<T> _collection;
    private readonly Task[] _tasks;

    #endregion

    #region Public Properties

    /// <summary>Number of items currently waiting to be processed.</summary>
    public int Count
    {
        // Read from the underlying queue rather than the BlockingCollection wrapper so the
        // property stays usable after Dispose has released the wrapper.
        get { return _queue.Count; }
    }

    /// <summary>True once cancellation has been requested via <see cref="Cancel"/> or <see cref="Dispose"/>.</summary>
    public bool IsCanceled
    {
        get { return _tokenSource.IsCancellationRequested; }
    }

    /// <summary>True when every worker task has finished (normally or by faulting).</summary>
    public bool IsCompleted
    {
        get { return _tasks.All(t => t.IsCompleted); }
    }

    #endregion

    #region Constructor

    /// <summary>Creates the queue and starts its workers.</summary>
    /// <param name="threadCount">Number of worker tasks to start; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="threadCount"/> is less than 1. Without a worker the queue would report
    /// itself as completed immediately and reject every item.
    /// </exception>
    protected BlockingQueue(int threadCount)
    {
        if (threadCount < 1)
            throw new ArgumentOutOfRangeException("threadCount", threadCount, "At least one worker thread is required.");

        _tokenSource = new CancellationTokenSource();
        _queue = new ConcurrentQueue<T>();
        _collection = new BlockingCollection<T>(_queue);
        _tasks = new Task[threadCount];

        for (var i = 0; i < threadCount; i++)
        {
            // Each worker blocks for the lifetime of the queue, so ask for a dedicated thread
            // (LongRunning) instead of pinning a thread-pool thread, and name the default
            // scheduler explicitly so an ambient scheduler of the caller is never picked up.
            _tasks[i] = Task.Factory.StartNew(
                ProcessQueue,
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }
    }

    // No finalizer: the class owns only managed objects, which finalizer code must not touch.

    #endregion

    #region Abstracts

    /// <summary>Called on a worker thread for any exception raised while taking or processing an item.</summary>
    /// <param name="ex">The exception that was caught.</param>
    protected abstract void HandleException(Exception ex);

    /// <summary>Called on a worker thread to process one dequeued item.</summary>
    /// <param name="model">The item taken from the queue; never null.</param>
    protected abstract void ProcessModel(T model);

    #endregion

    #region Methods

    /// <summary>Adds an item to the queue for background processing.</summary>
    /// <param name="model">The item to queue.</param>
    /// <exception cref="InvalidOperationException">
    /// All workers have finished, or the queue has been canceled.
    /// </exception>
    public void Enqueue(T model)
    {
        if (IsCompleted)
            throw new InvalidOperationException("BlockingQueue has been Completed");
        if (IsCanceled)
            throw new InvalidOperationException("BlockingQueue has been Canceled");

        _collection.Add(model);
    }

    /// <summary>Requests cancellation; workers stop once they observe the request. Safe to call repeatedly.</summary>
    public void Cancel()
    {
        if (!IsCanceled)
            _tokenSource.Cancel(false);
    }

    /// <summary>Requests cancellation and blocks until every worker task has finished.</summary>
    public void CancelAndWait()
    {
        Cancel();
        Task.WaitAll(_tasks);
    }

    /// <summary>
    /// Worker loop: repeatedly takes an item and passes it to <see cref="ProcessModel"/>
    /// until cancellation is requested.
    /// </summary>
    private void ProcessQueue()
    {
        var token = _tokenSource.Token;

        while (!token.IsCancellationRequested)
        {
            try
            {
                T model;
                var taken = _collection.TryTake(out model, Timeout, token);
                if (taken && model != null)
                    ProcessModel(model);
            }
            catch (OperationCanceledException ex)
            {
                // Only the queue's own cancellation ends the worker. A cancellation raised
                // inside ProcessModel for an unrelated reason is reported like any other
                // failure so the worker keeps serving the queue.
                if (token.IsCancellationRequested)
                    break;

                HandleException(ex);
            }
            catch (Exception ex)
            {
                HandleException(ex);
            }
        }
    }

    /// <summary>Releases the queue's synchronization objects; runs only after all workers have finished.</summary>
    private void ReleaseResources(Task[] finishedWorkers)
    {
        _collection.Dispose();
        _tokenSource.Dispose();
    }

    #endregion

    #region IDisposable

    /// <summary>
    /// Cancels the queue and releases its resources. Does not block: the owned objects are
    /// disposed by a continuation that runs once every worker has left its loop, so no
    /// worker can observe a disposed collection or token source.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
            return;
        _disposed = true;

        Cancel();

        Task.Factory.ContinueWhenAll(
            _tasks,
            ReleaseResources,
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.Default);

        GC.SuppressFinalize(this);
    }

    #endregion
}
Enjoy,
Tom
Does this handle persistence and locking?
ReplyDelete
Locking yes, persistence no.
ReplyDelete