Disclaimer: the methods and code examples in this article are the fruit of my own investigation and self-learning, and by no means are they ready to be used in a real, production environment. Use at your own risk. To learn more about parallel and asynchronous programming, I recommend reading the official documentation at: https://docs.microsoft.com/dotnet/standard/parallel-processing-and-concurrency

This post is part of a series of 3 articles about performance improvement with parallel programming:

1. Improve the performance with asynchronous functions to run processes in parallel
2. Make use of ConcurrentBag to store the results from asynchronous processes (this article)
3. Parallel programming in C# with class Parallel

 

Following my previous post about parallelizing process execution, I am extending the sample there to show how different processes running in parallel can combine their results into the same object, without the asynchronous execution being a problem. In this case, we are going to gather all the results from the execution in the same data list.

Source code in this article can be found on Github: https://github.com/sgisbert/parallelization

 

Starting point

In the previous post we managed to reduce the processing time from 2 seconds down to 0.6 seconds, although our method was not actually returning any result. For this sample, I am modifying it to return a random number between 1 and 100, while keeping the delay to show the execution time.

Let's modify the Process() method to return an integer (see full file on Github):

private static async Task<int> Process(int id)
{
    return await Task.Run(() =>
    {
        Stopwatch timer = new Stopwatch();
        timer.Start();
        Random random = new Random();
        Thread.Sleep(200);

        int number = random.Next(1, 100);

        Console.WriteLine($"Process {id}: {timer.Elapsed}");
        return number;
    });
}

And add the necessary code to get the results in the main thread:

List<Task<int>> tasks = new List<Task<int>>();
for (int i = 0; i < 10; i++)
{
    tasks.Add(Process(i));
}
Task.WaitAll(tasks.ToArray());

List<int> results = new List<int>();
foreach (var task in tasks)
{
    results.Add(task.Result);
}

Note that, since the return type of Process() changed to Task<int>, we also need to change the type of the list where we store the running tasks to List<Task<int>>, so that we can later access each result through task.Result.
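As a side note, awaiting Task.WhenAll instead of blocking on Task.WaitAll returns all the results directly as an array, in the same order as the tasks, which makes the collecting foreach unnecessary. A minimal sketch of that approach (Work here is a hypothetical stand-in for our Process() method, returning a deterministic value instead of a random one):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

class WhenAllSample
{
    // Hypothetical stand-in for the article's Process():
    // returns id * 2 after a short delay.
    public static async Task<int> Work(int id)
    {
        await Task.Delay(50);
        return id * 2;
    }

    static async Task Main()
    {
        var tasks = Enumerable.Range(0, 10).Select(Work);

        // WhenAll awaits every task and yields their results as an int[],
        // ordered the same way as the tasks that produced them.
        int[] results = await Task.WhenAll(tasks);

        Console.WriteLine(string.Join(",", results));
        // prints: 0,2,4,6,8,10,12,14,16,18
    }
}
```

The sample in this article keeps the explicit Task.WaitAll plus foreach to make the collection step visible.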

The execution result is as follows:

Process 0: 00:00:00.2077483
Process 1: 00:00:00.2076874
Process 3: 00:00:00.2074275
Process 2: 00:00:00.2057897
Process 7: 00:00:00.2097799
Process 6: 00:00:00.2101881
Process 5: 00:00:00.2103866
Process 4: 00:00:00.2109039
Process 9: 00:00:00.2004370
Process 8: 00:00:00.2006408

Result: 76,2,79,86,12,45,27,17,55,5
Completed: 00:00:00.6971794

In this sample, we are processing all the results after all the tasks have finished executing. For this simple example, the extra foreach loop has increased the execution time by almost a tenth of a second, going from the original 0.6s to nearly 0.7s. Imagine a more complex case where you had to analyze hundreds or thousands of results: you could easily lose most of the performance gains obtained from parallelizing the process.

 

Getting the results already processed

To avoid post-processing the results from the asynchronous methods in a loop, we can make use of concurrent data types, which are thread-safe, like ConcurrentBag<T>. There are also concurrent versions of Dictionary, Queue and Stack. With these data types, each process adds its results directly to the same ConcurrentBag as the others, so once back in the main thread there is no need to loop over the results again.
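As a minimal illustration of that thread safety, the sketch below (independent from the article's sample) has ten tasks adding to one shared ConcurrentBag with no locks, then takes an item back out with TryTake:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BagSample
{
    static void Main()
    {
        var bag = new ConcurrentBag<int>();

        // Ten tasks add to the same bag concurrently; ConcurrentBag
        // handles the synchronization internally, so no locks are needed.
        Task[] tasks = new Task[10];
        for (int i = 0; i < 10; i++)
        {
            int value = i; // capture the loop variable by value
            tasks[i] = Task.Run(() => bag.Add(value));
        }
        Task.WaitAll(tasks);

        Console.WriteLine(bag.Count); // prints: 10

        // Items come back out in no guaranteed order, via TryTake.
        if (bag.TryTake(out int item))
            Console.WriteLine($"Took {item}");
    }
}
```

Note that ConcurrentBag<T> keeps items in no guaranteed order, which is fine here because we don't care which task produced which number.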

To do so, let's add the ConcurrentBag as a parameter to our Process() method (see full file on Github):

private static async Task Process(int id, ConcurrentBag<int> cb)
{
    await Task.Run(() =>
    {
        Stopwatch timer = new Stopwatch();
        timer.Start();
        Random random = new Random();
        Thread.Sleep(200);

        int number = random.Next(1, 100);
        cb.Add(number); // thread-safe: no locking needed around the shared bag

        Console.WriteLine($"Process {id}: {timer.Elapsed}");
    });
}

This way we don't need to return a Task<int>, nor process the results again in the main thread; we can use them directly from the ConcurrentBag:

static async Task Main(string[] args)
{
    ConcurrentBag<int> cb = new ConcurrentBag<int>();
    Stopwatch timer = new Stopwatch();
    timer.Start();

    List<Task> tasks = new List<Task>();
    for (int i = 0; i < 10; i++)
    {
        tasks.Add(Process(i, cb));
    }
    Task.WaitAll(tasks.ToArray());

    Console.WriteLine();
    Console.WriteLine($"Result: {string.Join(",", cb)}");
    Console.WriteLine($"Completed: {timer.Elapsed}");
}

Back to the results:

Process 1: 00:00:00.2029103
Process 2: 00:00:00.2028785
Process 0: 00:00:00.2029188
Process 3: 00:00:00.2028485
Process 5: 00:00:00.2007397
Process 7: 00:00:00.2007120
Process 6: 00:00:00.2007211
Process 4: 00:00:00.2008456
Process 9: 00:00:00.2001557
Process 8: 00:00:00.2007588

Result: 82,83,88,99,57,71,13,54,18,40
Completed: 00:00:00.6276547

And once the unnecessary loop to process the results is removed, we are back closer to the 0.6 seconds than before.

 

Conclusions

  • It is possible to execute processes in parallel and gather the results in the same object. This avoids the need to post-process the results, which would add unnecessary execution time.

  • There are thread-safe collection types that can be used to store the results from asynchronous tasks, like ConcurrentBag<T>, ConcurrentDictionary<TKey,TValue>, ConcurrentQueue<T> or ConcurrentStack<T>. Learning how to use them gives us better options to improve our applications.
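For instance, when the results need to be keyed, say by process id, ConcurrentDictionary<TKey,TValue> offers the same kind of thread safety with dictionary semantics. A small hypothetical sketch (the squared values here just stand in for real results):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class DictSample
{
    static void Main()
    {
        // Keyed results: each task stores its value under its own id,
        // so the main thread can look results up by process id afterwards.
        var results = new ConcurrentDictionary<int, int>();

        Task.WaitAll(
            Enumerable.Range(0, 5)
                .Select(id => Task.Run(() => results[id] = id * id))
                .ToArray());

        Console.WriteLine(results[3]); // prints: 9
    }
}
```

The indexer assignment on ConcurrentDictionary is an atomic add-or-update, so concurrent writers with different keys never need external locking.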