Scheduling tons of orchestrator functions concurrently in C#

I had a call with an engineer on another team who wanted advice for how to schedule a large number of concurrent orchestration functions in Durable Functions. I shared with him some sample code for how we do it in our own internal performance tests and…


This content originally appeared on DEV Community and was authored by Chris Gillum

I had a call with an engineer on another team who wanted advice for how to schedule a large number of concurrent orchestration functions in Durable Functions. I shared with him some sample code for how we do it in our own internal performance tests and decided it might be useful to share publicly too.

First, here is the orchestration that we're running for the test. It's just a basic sequential orchestrator that calls a SayHello activity function 5 times.

[FunctionName(nameof(HelloSequence))]
public static async Task<List<string>> HelloSequence(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var outputs = new List<string>
    {
        await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),
        await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"),
        await context.CallActivityAsync<string>(nameof(SayHello), "London"),
        await context.CallActivityAsync<string>(nameof(SayHello), "Amsterdam"),
        await context.CallActivityAsync<string>(nameof(SayHello), "Mumbai")
    };

    return outputs;
}

[FunctionName(nameof(SayHello))]
public static string SayHello([ActivityTrigger] string name) => $"Hello {name}!";

And here is the HTTP trigger function we use to trigger a performance run. It takes a count parameter from the query string as the number of concurrent "HelloSequence" orchestrations to run. In our tests, we'll often run more than 100K orchestrations concurrently.

[FunctionName(nameof(StartManySequences))]
public static async Task<IActionResult> StartManySequences(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
    [DurableClient] IDurableClient starter,
    ILogger log)
{
    if (!int.TryParse(req.Query["count"], out int count) || count < 1)
    {
        return new BadRequestObjectResult("A 'count' query string parameter is required and it must contain a positive number.");
    }

    string prefix = await ScheduleManyInstances(starter, nameof(HelloSequence), count, log);
    return new OkObjectResult($"Scheduled {count} orchestrations prefixed with '{prefix}'.");
}

This method calls into a ScheduleManyInstances helper method, which does the actual scheduling. Before I get into our implementation, however, I think it would be useful to describe what not to do.

Naïve implementation #1: Sequential

The most common naïve implementation is to use a for-loop with an await in each iteration.

public static async Task<string> ScheduleManyInstances(
    IDurableOrchestrationClient client,
    string orchestrationName,
    int count,
    ILogger log)
{
    log.LogWarning($"Scheduling {count} orchestration(s)...");
    DateTime utcNow = DateTime.UtcNow;
    string prefix = utcNow.ToString("yyyyMMdd-hhmmss");

    for (int i = 0; i < count; i++)
    {
        // Start each instance one-at-a-time
        string instanceId = $"{prefix}-{i:X16}";
        await client.StartNewAsync(orchestrationName, instanceId);
    }

    log.LogWarning($"All {count} orchestrations were scheduled successfully!");
    return prefix;
}

The above is really slow because you're only enqueuing a single orchestration start message at a time. If you're scheduling a large number of orchestrations, then the client that invoked this HTTP function will probably time-out before all the orchestrations are scheduled. Ideally we'd schedule orchestrations in parallel, which leads us to the next bad practice.

Naïve implementation #2: Too much parallelism

Next we try using Task.WhenAll to schedule all the orchestrations in parallel. This is better than scheduling orchestrations sequentially because it allows us to queue up new work much more quickly. However, it has a major scalability problem, which I'll describe below.

public static async Task<string> ScheduleManyInstances(
    IDurableOrchestrationClient client,
    string orchestrationName,
    int count,
    ILogger log)
{
    log.LogWarning($"Scheduling {count} orchestration(s)...");
    DateTime utcNow = DateTime.UtcNow;
    string prefix = utcNow.ToString("yyyyMMdd-hhmmss");

    // Run all StartNewAsync tasks concurrently
    var startTasks = new Task[count];
    for (int i = 0; i < count; i++)
    {
        string instanceId = $"{prefix}-{i:X16}";
        startTasks[i] = client.StartNewAsync(orchestrationName, instanceId);
    }

    await Task.WhenAll(startTasks);

    log.LogWarning($"All {count} orchestrations were scheduled successfully!");
    return prefix;
}

The problem is that if count is a large number (like 100K), you will very quickly exhaust both threads and outbound TCP connections on your VM. This is because the .NET thread scheduler will try to aggressively allocate a huge number of threads to satisfy your concurrency demands. Each of those threads will also try to open a connection to Azure Storage concurrently, requiring a new TCP connection. The result is often that the function will fail, making this implementation highly unreliable.

Naïve implementation #3: Throttled Parallelism for Parallel.For

This next approach uses Parallel.For to use a throttled concurrency approach.

public static async Task<string> ScheduleManyInstances(
    IDurableOrchestrationClient client,
    string orchestrationName,
    int count,
    ILogger log)
{
    log.LogWarning($"Scheduling {count} orchestration(s)...");
    DateTime utcNow = DateTime.UtcNow;
    string prefix = utcNow.ToString("yyyyMMdd-hhmmss");

    // Use up to 200 threads to schedule orchestrations concurrently
    var maxConcurrencyOptions = new ParallelOptions { MaxDegreeOfParallelism = 200 };
    Parallel.For(0, count, maxConcurrencyOptions, i =>
    {
        string instanceId = $"{prefix}-{i:X16}";

        // Use GetAwaiter().GetResult() to block since Parallel.For() doesn't support async
        client.StartNewAsync(orchestrationName, instanceId).GetAwaiter().GetResult();
    });

    log.LogWarning($"All {count} orchestrations were scheduled successfully!");
    return prefix;
}

This works much better than the first two solutions. It fixes the TCP connection exhaustion issue by giving the system enough time to reuse existing TCP connections. It also addresses the thread starvation issue by ensuring we don't use more than 200 threads at the same time.

However, the Parallel.For solution is still inefficient because each degree of parallelism is occupying a dedicated thread, and those threads will get blocked waiting for the StartNewAsync call to complete. Threads are expensive in terms of CPU and memory and take time to allocate. Ideally we'd do this work in a non-blocking way that allows us to aggressively reuse threads.

Final implementation: Throttled parallelism

The solution we use is a variation of the above that provides much better thread reuse by using a fully async implementation. I defined a ParallelForEachAsync helper extension method to achieve this.

public static async Task<string> ScheduleManyInstances(
    IDurableOrchestrationClient client,
    string orchestrationName,
    int count,
    ILogger log)
{
    log.LogWarning($"Scheduling {count} orchestration(s)...");
    DateTime utcNow = DateTime.UtcNow;
    string prefix = utcNow.ToString("yyyyMMdd-hhmmss");

    await Enumerable.Range(0, count).ParallelForEachAsync(200, i =>
    {
        string instanceId = $"{prefix}-{i:X16}";
        return client.StartNewAsync(orchestrationName, instanceId);
    });

    log.LogWarning($"All {count} orchestrations were scheduled successfully!");
    return prefix;
}

Here is the ParallelForEachAsync extension method implementation, which includes the async parallel throttling behavior.

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> items, int maxConcurrency, Func<T, Task> action)
{
    List<Task> tasks;
    if (items is ICollection<T> itemCollection)
    {
        // optimization to reduce the number of memory allocations
        tasks = new List<Task>(itemCollection.Count);
    }
    else
    {
        tasks = new List<Task>();
    }

    using var semaphore = new SemaphoreSlim(maxConcurrency);
    foreach (T item in items)
    {
        tasks.Add(InvokeThrottledAction(item, action, semaphore));
    }

    await Task.WhenAll(tasks);
}

static async Task InvokeThrottledAction<T>(T item, Func<T, Task> action, SemaphoreSlim semaphore)
{
    await semaphore.WaitAsync();
    try
    {
        await action(item);
    }
    finally
    {
        semaphore.Release();
    }
}

As you can see, we use a SemaphoreSlim to ensure we don't execute more than maxConcurrency concurrent actions at the same time. Because the code path is fully async, we can now run many operations in parallel with a much smaller number of threads. Right now we have maxConcurrency set to 200, but it's very possible that a larger number could have worked as well.

This ParallelForEachAsync extension method is generic and can be used for any code that needs to execute asynchronous tasks with a cap on concurrency. You could even use it inside your orchestrator functions when scheduling activities or sub-orchestrations! This is useful, for example, if you need to throttle activity function concurrency in a distributed way to protect downstream resources, like databases, which can only handle a certain number of concurrent operations.

Anyways, I hope this is helpful for anyone doing performance work with Durable Functions. If there is a simple way to do this in .NET and I just didn't know about it, I'm interested in learning about that too!


This content originally appeared on DEV Community and was authored by Chris Gillum


Print Share Comment Cite Upload Translate Updates
APA

Chris Gillum | Sciencx (2021-04-26T18:44:57+00:00) Scheduling tons of orchestrator functions concurrently in C#. Retrieved from https://www.scien.cx/2021/04/26/scheduling-tons-of-orchestrator-functions-concurrently-in-c/

MLA
" » Scheduling tons of orchestrator functions concurrently in C#." Chris Gillum | Sciencx - Monday April 26, 2021, https://www.scien.cx/2021/04/26/scheduling-tons-of-orchestrator-functions-concurrently-in-c/
HARVARD
Chris Gillum | Sciencx Monday April 26, 2021 » Scheduling tons of orchestrator functions concurrently in C#., viewed ,<https://www.scien.cx/2021/04/26/scheduling-tons-of-orchestrator-functions-concurrently-in-c/>
VANCOUVER
Chris Gillum | Sciencx - » Scheduling tons of orchestrator functions concurrently in C#. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2021/04/26/scheduling-tons-of-orchestrator-functions-concurrently-in-c/
CHICAGO
" » Scheduling tons of orchestrator functions concurrently in C#." Chris Gillum | Sciencx - Accessed . https://www.scien.cx/2021/04/26/scheduling-tons-of-orchestrator-functions-concurrently-in-c/
IEEE
" » Scheduling tons of orchestrator functions concurrently in C#." Chris Gillum | Sciencx [Online]. Available: https://www.scien.cx/2021/04/26/scheduling-tons-of-orchestrator-functions-concurrently-in-c/. [Accessed: ]
rf:citation
» Scheduling tons of orchestrator functions concurrently in C# | Chris Gillum | Sciencx | https://www.scien.cx/2021/04/26/scheduling-tons-of-orchestrator-functions-concurrently-in-c/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.