Job Queueing With Commands
Work that can be processed in the background, such as sending emails, generating PDFs, or creating reports, and that doesn't require a result to be available immediately, is a prime candidate for the Job Queue functionality in FastEndpoints.
Job queues allow you to schedule Commands to be executed in the background while limiting how many command instances of the same type can be executed at the same time. It is sometimes crucial to be in control of the degree of parallelism for certain types of tasks such as long-running or CPU intensive work to prevent the server from grinding to a halt, as well as to stay within access control limits of third-party services.
Queueing A Job
Similarly to the Command Bus, the same ICommand and its companion ICommandHandler<TCommand> are used to define the data contract and the execution logic, such as the following:
sealed class MyCommand : ICommand
{
    ...
}

sealed class MyCommandHandler : ICommandHandler<MyCommand>
{
    public Task ExecuteAsync(MyCommand command, CancellationToken ct)
    {
        ...
    }
}
When you need to queue a command as a job, instead of executing it immediately, simply call the extension method QueueJobAsync() on the command DTO like so:
await new MyCommand { ... }.QueueJobAsync();
A background job encapsulating the command is created and added to a queue for that type of command. There's a job queue for each command type in your application. If there are 10 command types, there will be 10 independent queues processing jobs.
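For instance, an endpoint handler might queue an email job and return immediately instead of doing the work inline. This is a sketch; SendEmailCommand and the route are illustrative names, not part of the library:

```csharp
sealed class SendWelcomeEmailEndpoint : EndpointWithoutRequest
{
    public override void Configure()
    {
        Post("users/{id}/welcome-email");
        AllowAnonymous();
    }

    public override async Task HandleAsync(CancellationToken ct)
    {
        // queue the job and respond right away instead of blocking the request
        // while the email is being sent (SendEmailCommand is a hypothetical ICommand)
        await new SendEmailCommand { UserID = Route<string>("id") }.QueueJobAsync();
        await Send.OkAsync("email queued!");
    }
}
```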
Execution Options
At the time of queueing, it's possible to specify a future point in time after which the command/job is to be executed, instead of immediately (the default behavior if you don't specify anything). This does not, however, mean that the job will be executed at that exact time; it just won't be executed before that time.
The default expiry time of jobs is 4 hours from the time of creation, which you can override as shown below. If for some reason the job doesn't execute/complete successfully before the expiry time, it will be considered stale/incomplete and can be purged from the queue (which is discussed in the job persistence section below).
.QueueJobAsync(
    executeAfter: DateTime.UtcNow.AddMinutes(30),
    expireOn: DateTime.UtcNow.AddHours(8));
Enabling Job Queues
Job queues are not enabled by default and must be configured at startup. Since job queues are designed to be reliable and not lose data in case of server restarts/crashes, it's required of you to implement a storage provider on any database/storage medium of your choice. How to implement the storage provider is discussed below. For now, let's focus on the startup configuration.
var bld = WebApplication.CreateBuilder();
bld.Services
   .AddFastEndpoints()
   .AddJobQueues<JobRecord, JobStorageProvider>(); //ignore the generic arguments for now

var app = bld.Build();
app.UseFastEndpoints();
app.UseJobQueues();
app.Run();
Job queues can be used independently of FastEndpoints REST endpoints by installing just the following NuGet package. There's no need to install it if your project already references the main FastEndpoints library.
dotnet add package FastEndpoints.JobQueues
Per Queue Execution Limits
By default, each queue processes multiple commands in parallel, and the default concurrency limit is the number of logical processors of the machine. For example, if the server has 4 cores/threads, at most 4 commands of the same type will execute at the same time. You can customize the max concurrency setting at startup like this:
.UseJobQueues(o => o.MaxConcurrency = 2);
Queued jobs can be given a certain amount of time to execute. Command executions exceeding that time limit automatically get cancelled and retried. By default, however, commands are allowed to execute without any time limit. You can specify a maximum execution time like so:
.UseJobQueues(o => o.ExecutionTimeLimit = TimeSpan.FromSeconds(10));
Limits specified as above apply to all command types, but you can override them per type if needed:
.UseJobQueues(o =>
{
    //general per queue limits
    o.MaxConcurrency = 2;
    o.ExecutionTimeLimit = TimeSpan.FromSeconds(10);

    //applicable only to MyCommand
    o.LimitsFor<MyCommand>(
        maxConcurrency: 8,
        timeLimit: TimeSpan.FromSeconds(5));
});
That's all the configuration needed (other than implementing the storage provider discussed below). As with the command bus, there's no need to register individual commands & handlers. They are auto discovered by the library.
Job Persistence
In order to provide the storage mechanism for job queues, two interfaces must be implemented.
- IJobStorageRecord for the job storage entity. (see example)
- IJobStorageProvider<TStorageRecord> for the storage provider. (see example)
The storage record entity is simply a POCO containing the actual command DTO together with some metadata. As for the storage provider class, it simply needs to delegate data access to whatever database/storage engine stores the jobs, as shown with the MongoDB example below:
sealed class JobStorageProvider : IJobStorageProvider<JobRecord>
{
    private readonly DbContext db;

    public bool DistributedJobProcessingEnabled => false;

    public JobStorageProvider(DbContext db)
    {
        this.db = db; //inject the dbContext
    }

    public Task StoreJobAsync(JobRecord job, CancellationToken ct)
    {
        // persist the provided job record to the database
        return db.SaveAsync(job, ct);
    }

    public async Task<IEnumerable<JobRecord>> GetNextBatchAsync(PendingJobSearchParams<JobRecord> p)
    {
        // return a batch of pending jobs to be processed next
        return await db
            .Find<JobRecord>()
            .Match(p.Match)     //use the provided boolean lambda expression to match entities
            .Limit(p.Limit)     //either use the provided limit or choose your own
            .ExecuteAsync(p.CancellationToken); //pass the provided cancellation token
    }

    public Task MarkJobAsCompleteAsync(JobRecord job, CancellationToken ct)
    {
        // either replace the supplied job record in the db,
        // or do a partial update of just the 'IsComplete' property,
        // or delete the entity now if batch deleting later is not preferred.
        return db
            .Update<JobRecord>()
            .MatchID(job.ID)
            .Modify(r => r.IsComplete, true)
            .ExecuteAsync(ct);
    }

    public Task CancelJobAsync(Guid trackingId, CancellationToken ct)
    {
        // do a partial update of just the 'IsComplete' property,
        // or delete the entity now if batch deleting later is not preferred.
        return db
            .Update<JobRecord>()
            .Match(r => r.TrackingID == trackingId)
            .Modify(r => r.IsComplete, true)
            .ExecuteAsync(ct);
    }

    public Task OnHandlerExecutionFailureAsync(JobRecord job, Exception e, CancellationToken c)
    {
        // this is called whenever execution of a command's handler fails.
        // do nothing here if you'd like it to be automatically retried,
        // or update the 'ExecuteAfter' property to reschedule it to a future time,
        // or delete (or mark as complete) the entity if a retry is unnecessary.
        return db
            .Update<JobRecord>()
            .MatchID(job.ID)
            .Modify(r => r.ExecuteAfter, DateTime.UtcNow.AddMinutes(1))
            .ExecuteAsync(c);
    }

    public Task PurgeStaleJobsAsync(StaleJobSearchParams<JobRecord> p)
    {
        // this method is called hourly.
        // do whatever you like with the stale (completed/expired) jobs.
        return db.DeleteAsync(p.Match, p.CancellationToken);
    }
}
The full source code of the above example is available on GitHub.
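For reference, a minimal storage record entity to pair with the above provider might look like the following. This is a sketch assuming the MongoDB.Entities Entity base class (which supplies the ID); the authoritative list of required members is the IJobStorageRecord interface itself:

```csharp
sealed class JobRecord : Entity, IJobStorageRecord
{
    public string QueueID { get; set; }        // identifies which command type's queue the job belongs to
    public Guid TrackingID { get; set; }       // used for cancellation & result retrieval
    public object Command { get; set; }        // the embedded command DTO
    public DateTime ExecuteAfter { get; set; } // don't execute before this time
    public DateTime ExpireOn { get; set; }     // consider the job stale after this time
    public bool IsComplete { get; set; }       // whether execution has finished
}
```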
Using a document database such as MongoDB, LiteDB, etc. may be more suitable for implementing a job storage provider rather than using EF Core & SQL Server, as the EF Core DbContext needs additional configuration in order to support embedding command objects as well as supporting multithreading when being used as a singleton. See this example project that shows how to configure a pooled db context factory, which is the recommended way to use EF Core DbContext in storage providers.
Job Cancellations
Queued jobs can be cancelled anytime, from anywhere, using their Tracking Id. When cancelled, the job will not be picked up for execution. If the job is already running, cancellation is requested via the cancellation token passed down to the command handler. Periodically check whether the cancellation token has been signalled and gracefully stop execution without throwing exceptions. See example here.
var trackingId = await new LongRunningCommand().QueueJobAsync();
await JobTracker<LongRunningCommand>.CancelJobAsync(trackingId);
Use either the JobTracker<TCommand> generic class or inject an IJobTracker<TCommand> instance from the DI Container to access the CancelJobAsync() method.
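A handler that cooperates with cancellation as described above might be sketched like this. LongRunningCommand, its Items property, and ProcessItemAsync are illustrative names, not part of the library:

```csharp
sealed class LongRunningCommandHandler : ICommandHandler<LongRunningCommand>
{
    public async Task ExecuteAsync(LongRunningCommand cmd, CancellationToken ct)
    {
        foreach (var item in cmd.Items)
        {
            // stop gracefully if the job has been cancelled,
            // without throwing an OperationCanceledException
            if (ct.IsCancellationRequested)
                return;

            await ProcessItemAsync(item, ct); // hypothetical unit of work
        }
    }
}
```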
Jobs With Results
A command that returns a result (ICommand<TResult>) can also be queued up as a job. The result can be retrieved from anywhere via the JobTracker using the job's Tracking Id. To enable support for job results, simply implement the following addon interfaces:
sealed class JobRecord : IJobStorageRecord, IJobResultStorage
{
    ...
    public object? Result { get; set; } // a property for storing the result
}

sealed class JobStorageProvider : IJobStorageProvider<JobRecord>, IJobResultProvider
{
    ...

    public Task StoreJobResultAsync<TResult>(Guid trackingId, TResult result, CancellationToken ct)
    {
        // 1.) retrieve the job by trackingId.
        // 2.) set the result on the job like so:
        ((IJobResultStorage)job).SetResult(result);
        // 3.) persist the job entity back to the database.
    }

    public Task<TResult?> GetJobResultAsync<TResult>(Guid trackingId, CancellationToken ct)
    {
        // 1.) retrieve the job by trackingId.
        // 2.) extract the result from the job like so:
        var result = ((IJobResultStorage)job).GetResult<TResult>();
        // 3.) return the result.
    }
}
Once the storage record entity and provider are set up, you can queue commands that return results as usual and use the job tracker to retrieve the result as follows:
// queue the command as a job and obtain the tracking id
var trackingId = await new MyCommand { ... }.QueueJobAsync();
// retrieve the result of the command using the tracking id
var result = await JobTracker<MyCommand>.GetJobResultAsync<MyResult>(trackingId);
Use either the JobTracker<TCommand> generic class or inject an IJobTracker<TCommand> instance from the DI Container to access the GetJobResultAsync() method. The result will be default for value types and null for reference types until the command handler completes its work. Click here for an EF Core example.
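For completeness, a command that returns a result can be defined like any other ICommand<TResult>. MyCommand, MyResult, and the handler body below are illustrative:

```csharp
sealed class MyCommand : ICommand<MyResult>
{
    public string CustomerID { get; set; }
}

sealed class MyResult
{
    public string Message { get; set; }
}

sealed class MyCommandHandler : ICommandHandler<MyCommand, MyResult>
{
    public async Task<MyResult> ExecuteAsync(MyCommand cmd, CancellationToken ct)
    {
        // do the actual work here...
        return new MyResult { Message = $"processed customer: {cmd.CustomerID}" };
    }
}
```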
Tracking Job Execution Progress
With job progress tracking, the command handler can provide intermediate progress data during execution. This progress data can be retrieved from anywhere using the job's Tracking Id. To use this functionality, ensure the following:
- The job storage record and storage provider must be set up to handle results as explained above.
- Command classes must implement the ITrackableJob<JobResult<TResult>> interface (instead of ICommand<TResult>), where your actual result is wrapped in a customizable JobResult<T> wrapper.
An example command/job can be written like so:
sealed class MyJob : ITrackableJob<JobResult<MyEndResult>>
{
    public Guid TrackingID { get; set; } // required by the interface
    public string MyName { get; set; }
}

sealed class MyEndResult
{
    public string MyMessage { get; set; }
}
The command handler for the above would look like this:
sealed class MyJobHandler(IJobTracker<MyJob> tracker) // inject the job tracker
    : ICommandHandler<MyJob, JobResult<MyEndResult>>
{
    public async Task<JobResult<MyEndResult>> ExecuteAsync(MyJob job, CancellationToken ct)
    {
        var jobResult = new JobResult<MyEndResult>(totalSteps: 100); // set the total number of steps

        for (var i = 0; i < 100; i++)
        {
            // update & store the current progress via the tracker
            jobResult.CurrentStep = i;
            jobResult.CurrentStatus = $"completed step: {i}";
            await tracker.StoreJobResultAsync(job.TrackingID, jobResult, ct);
        }

        jobResult.CurrentStatus = "all done!";
        jobResult.Result = new() { MyMessage = $"thank you {job.MyName}!" }; // set the end-result
        return jobResult; // return the job-result instance
    }
}
Here's an example endpoint that can be used to poll the ongoing progress of the above job and ultimately retrieve the end-result:
sealed class JobProgressEndpoint : EndpointWithoutRequest<string>
{
    public override void Configure()
    {
        Post("job/progress/{trackingId:guid}");
        AllowAnonymous();
    }

    public override async Task HandleAsync(CancellationToken c)
    {
        var trackingId = Route<Guid>("trackingId");
        var jobResult = await JobTracker<MyJob>
            .GetJobResultAsync<JobResult<MyEndResult>>(trackingId, c);

        if (jobResult is null)
        {
            await Send.OkAsync("job execution hasn't begun yet!");
            return;
        }

        switch (jobResult.IsComplete)
        {
            case false:
                await Send.OkAsync($"[{jobResult.ProgressPercentage}%] |" +
                                   $" status: {jobResult.CurrentStatus}");
                break;
            case true:
                await Send.OkAsync($"end result: {jobResult.Result.MyMessage}");
                break;
        }
    }
}
Working examples of all of the above can be found here.
Distributed Job Processing
By default, job queues are designed for a single worker instance. If you deploy multiple instances of your application that share the same underlying data store, more than one worker could pick up and execute the same job simultaneously. Distributed job processing solves this by ensuring each job is only processed by a single worker, even when multiple instances are running.
The approach is lease based. When a worker picks up a job, it "claims" it by setting a future timestamp on the record. Other workers skip claimed records. If the claiming worker crashes before completing the job, the lease expires and the job becomes available for another worker to pick up automatically.
No external tools or distributed locks are needed. The data store itself acts as the synchronization mechanism via its own native atomic operations.
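Conceptually, the eligibility check that determines whether a worker may pick up a record boils down to the conditions below. This is an illustrative expression assuming the JobRecord entity from earlier, not the library's exact internals:

```csharp
// illustrative only: what makes a job record eligible for pickup
string queueId = "...";            // the queue (command type) being polled
DateTime now = DateTime.UtcNow;

Expression<Func<JobRecord, bool>> eligible =
    r => r.QueueID == queueId &&   // belongs to this command type's queue
         !r.IsComplete &&          // not yet finished or cancelled
         r.ExecuteAfter <= now &&  // the scheduled time has arrived
         r.ExpireOn >= now &&      // the job hasn't gone stale
         r.DequeueAfter <= now;    // the lease isn't currently held by another worker
```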
Step 1: Enable Distributed Mode On The Storage Provider
Set DistributedJobProcessingEnabled to true on your storage provider. This tells the engine to start polling the storage immediately at startup to discover jobs added by other workers. When false (the default, suitable for single-instance deployments), polling only begins after the first job is queued on that instance, avoiding unnecessary database queries for unused queues.
sealed class JobStorageProvider : IJobStorageProvider<JobRecord>
{
    public bool DistributedJobProcessingEnabled => true;
    ...
}
Step 2: Update The Job Storage Record
Add a DequeueAfter property to your job storage record class. This property acts as a lease timestamp. A value in the past (or DateTime.MinValue) means the job is eligible for processing. A value in the future means it's claimed by a worker.
sealed class JobRecord : IJobStorageRecord
{
    ...
    public DateTime DequeueAfter { get; set; } // add this property
}
Ensure this property is mapped to a column/field in your database. Setting a database index on DequeueAfter along with the other query fields (QueueID, IsComplete, ExecuteAfter, ExpireOn) is recommended for performance.
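With EF Core, for example, such an index could be configured along the following lines. This is a sketch for the model configuration, assuming the record is mapped as an entity:

```csharp
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    // composite index covering the fields GetNextBatchAsync filters on
    modelBuilder.Entity<JobRecord>()
                .HasIndex(r => new
                {
                    r.QueueID,
                    r.IsComplete,
                    r.ExecuteAfter,
                    r.ExpireOn,
                    r.DequeueAfter
                });
}
```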
Step 3: Implement Atomic Claiming In GetNextBatchAsync
In distributed scenarios, your GetNextBatchAsync implementation must use a database-level atomic operation that finds matching records and sets their DequeueAfter to a future time in a single step. The engine supplies a pre-built Match expression via the search parameters that includes all eligibility checks (including DequeueAfter <= now), which can simply be passed down for databases that support it, such as MongoDB. The ExecutionTimeLimit value from the search parameters can be used as a guide for determining a suitable lease duration.
A plain "read then update" approach has a race window where two workers can read the same record before either writes. You MUST use an atomic operation provided by your database.
Sample: MongoDB
MongoDB's atomic update-and-retrieve is a single-document operation, so claiming a batch requires calling it in a loop. The following example uses the MongoDB.Entities library:
public async Task<ICollection<JobRecord>> GetNextBatchAsync(PendingJobSearchParams<JobRecord> p)
{
    var now = DateTime.UtcNow;
    var leaseTime = p.ExecutionTimeLimit == Timeout.InfiniteTimeSpan
                        ? TimeSpan.FromMinutes(30)
                        : p.ExecutionTimeLimit + TimeSpan.FromMinutes(1);
    var claimed = new List<JobRecord>();

    for (var i = 0; i < p.Limit; i++)
    {
        var record = await db
            .UpdateAndGet<JobRecord>()
            .Match(p.Match)
            .Modify(r => r.DequeueAfter, now + leaseTime)
            .ExecuteAsync(p.CancellationToken);

        if (record is null)
            break; // no more available records

        claimed.Add(record);
    }

    return claimed;
}
Each UpdateAndGet call atomically matches a single unclaimed record (using the pre-built p.Match expression) and sets its DequeueAfter in one operation. No two workers can claim the same record. The loop runs at most Limit times and stops early when no more eligible records are found.
Sample: EF Core + PostgreSQL
PostgreSQL supports UPDATE ... RETURNING combined with FOR UPDATE SKIP LOCKED for atomic claiming:
public async Task<ICollection<JobRecord>> GetNextBatchAsync(PendingJobSearchParams<JobRecord> p)
{
    var now = DateTime.UtcNow;
    var leaseTime = p.ExecutionTimeLimit == Timeout.InfiniteTimeSpan
                        ? TimeSpan.FromMinutes(30)
                        : p.ExecutionTimeLimit + TimeSpan.FromMinutes(1);

    return await db.Jobs
                   .FromSqlInterpolated(
                       $"""
                        UPDATE "Jobs"
                        SET "DequeueAfter" = {now + leaseTime}
                        WHERE "Id" IN (
                            SELECT "Id" FROM "Jobs"
                            WHERE "QueueID" = {p.QueueID}
                              AND "IsComplete" = false
                              AND "ExecuteAfter" <= {now}
                              AND "ExpireOn" >= {now}
                              AND "DequeueAfter" <= {now}
                            ORDER BY "ExecuteAfter"
                            FOR UPDATE SKIP LOCKED
                            LIMIT {p.Limit}
                        )
                        RETURNING *
                        """)
                   .AsNoTracking()
                   .ToListAsync(p.CancellationToken);
}
FOR UPDATE SKIP LOCKED ensures that rows already being claimed by another worker's in-flight transaction are silently skipped, eliminating contention entirely.
Sample: EF Core + SQL Server
SQL Server uses UPDLOCK, ROWLOCK, READPAST hints and UPDATE ... OUTPUT for the same effect:
public async Task<ICollection<JobRecord>> GetNextBatchAsync(PendingJobSearchParams<JobRecord> p)
{
    var now = DateTime.UtcNow;
    var leaseTime = p.ExecutionTimeLimit == Timeout.InfiniteTimeSpan
                        ? TimeSpan.FromMinutes(30)
                        : p.ExecutionTimeLimit + TimeSpan.FromMinutes(1);

    return await db.Jobs
                   .FromSqlInterpolated(
                       $"""
                        WITH cte AS (
                            SELECT TOP ({p.Limit}) *
                            FROM Jobs WITH (UPDLOCK, ROWLOCK, READPAST)
                            WHERE QueueID = {p.QueueID}
                              AND IsComplete = 0
                              AND ExecuteAfter <= {now}
                              AND ExpireOn >= {now}
                              AND DequeueAfter <= {now}
                            ORDER BY ExecuteAfter
                        )
                        UPDATE cte
                        SET DequeueAfter = {now + leaseTime}
                        OUTPUT inserted.*
                        """)
                   .AsNoTracking()
                   .ToListAsync(p.CancellationToken);
}
READPAST is SQL Server's equivalent of SKIP LOCKED. It skips rows that are currently locked by another transaction.
Step 4: Reset DequeueAfter On Execution Failure
When a command handler throws an exception, OnHandlerExecutionFailureAsync is called. In distributed scenarios, you must reset the DequeueAfter property so the job becomes eligible for pickup again (by any worker). If you're also rescheduling the retry via ExecuteAfter, set DequeueAfter to the same value or to DateTime.MinValue:
public Task OnHandlerExecutionFailureAsync(JobRecord job, Exception e, ...)
{
    var retryAt = DateTime.UtcNow.AddMinutes(1);

    return db
        .Update<JobRecord>()
        .MatchID(job.ID)
        .Modify(r => r.ExecuteAfter, retryAt)
        .Modify(r => r.DequeueAfter, retryAt) // release the lease
        .ExecuteAsync(c);
}
If you don't reset DequeueAfter, the job will remain "claimed" until the lease naturally expires, which delays the retry unnecessarily.
How Crash Recovery Works
If a worker crashes mid-execution without calling MarkJobAsCompleteAsync or OnHandlerExecutionFailureAsync, the job's DequeueAfter remains set to the lease expiry time. Once that time passes, the record satisfies the DequeueAfter <= now condition again, and any worker can claim and re-process it. No manual intervention is required.