In-Process Command Bus Pattern
Similarly to the Event Bus, you can take a decoupled, command driven approach with the distinction that a command can only have a single handler which may or may not return a result. Whereas an event can have many handlers and they cannot return results back to the publisher.
1. Define A Command
This is the data contract that will be handed to the command handler. Mark the class with either the ICommand or ICommand
public class GetFullName : ICommand<string>
{
public string FirstName { get; set; }
public string LastName { get; set; }
}2. Define A Command Handler
This is the code that will be executed when a command of the above type is executed. Implement either the ICommandHandler<TCommand, TResult> or ICommandHandler
public class FullNameHandler : ICommandHandler<GetFullName, string>
{
public Task<string> ExecuteAsync(GetFullName command, CancellationToken ct)
{
var result = command.FirstName + " " + command.LastName;
return Task.FromResult(result);
}
}3. Execute The Command
Simply call the ExecuteAsync() extension method on the command object.
var fullName = await new GetFullName()
{
FirstName = "john",
LastName = "snow"
}
.ExecuteAsync();Generic Commands & Handlers
Generic commands & handlers require a bit of special handling. Say for example, you have a generic command type and a generic handler that's supposed to handle that generic command such as the following:
//command
public class MyCommand<T> : ICommand<IEnumerable<T>> { ... }
//handler
public class MyCommandHandler<T> : ICommandHandler<MyCommand<T>, IEnumerable<T>> { ... }In order to make this work, you need to register the association between the two with open generic types like so:
app.Services.RegisterGenericCommand(typeof(MyCommand<>), typeof(MyCommandHandler<>));Once registered, it's business as usual and you can execute generic commands such as this:
var results = await new MyCommand<SomeType>().ExecuteAsync();
var results = await new MyCommand<AnotherType>().ExecuteAsync();Manipulating Endpoint Error State
By implementing command handlers using the CommandHandler<> abstract types instead of the interfaces mentioned above, you are able to manipulate the validation/error state of the endpoint that issued the command like so:
public class GetFullNameEndpoint : EndpointWithoutRequest<string>
{
...
public override async Task HandleAsync(CancellationToken c)
{
AddError("an error added by the endpoint!");
//command handler will be adding/throwing it's own validation errors
Response = await new GetFullName
{
FirstName = "yoda",
LastName = "minch"
}.ExecuteAsync();
}
}public class FullNameHandler : CommandHandler<GetFullName, string>
{
public override Task<string> ExecuteAsync(GetFullName cmd, CancellationToken ct = default)
{
if (cmd.FirstName.Length < 5)
AddError(c => c.FirstName, "first name is too short!");
if (cmd.FirstName == "yoda")
ThrowError("no jedi allowed here!");
ThrowIfAnyErrors();
return Task.FromResult(cmd.FirstName + " " + cmd.LastName);
}
}In this particular case, the client will receive the following error response:
{
"statusCode": 400,
"message": "One or more errors occured!",
"errors": {
"generalErrors": [
"an error added by the endpoint!",
"no jedi allowed here!"
],
"firstName": [
"first name is too short!"
]
}
}Command Middleware Pipeline
If you'd like to make use of the Chain Of Responsibility Pattern, middleware components can be made to wrap around the command handlers in a layered fashion. Rather than calling a command handler directly, the execution of a command is passed through a pipeline of middleware components. Each middleware piece can execute common logic such as logging, validation, error handling, etc. before and after invoking the next piece of middleware in the chain.
Create open-generic middleware components implementing the ICommandMiddleware<TCommand, TResult> interface:
sealed class CommandLogger<TCommand, TResult>(ILogger<TCommand> logger)
: ICommandMiddleware<TCommand, TResult> where TCommand : ICommand<TResult>
{
public async Task<TResult> ExecuteAsync(TCommand command,
CommandDelegate<TResult> next,
CancellationToken ct)
{
logger.LogInformation("Executing command: {name}", command.GetType().Name);
var result = await next();
logger.LogInformation("Got result: {value}", result);
return result;
}
}Then register each component in the exact order you need them executed:
bld.Services.AddCommandMiddleware(
c =>
{
c.Register(typeof(CommandLogger<,>),
typeof(CommandValidator<,>),
typeof(ResultLogger<,>));
});Closed generic middleware can be written like so:
sealed class ClosedGenericMiddleware : ICommandMiddleware<MyCommand, string>
{
...
}Closed generic middleware needs to be registered with the generic overload of the Register() method:
bld.Services.AddCommandMiddleware(c => c.Register<MyCommand, string, ClosedGenericMiddleware>());Dependency Injection
Dependencies in command handlers can be resolved as described here.
Streaming Commands
If a command needs to return a stream of results instead of a single result, mark it with the IStreamCommand
public class GetNumbers : IStreamCommand<int>
{
public int Count { get; set; }
}
public class GetNumbersHandler : IStreamCommandHandler<GetNumbers, int>
{
public async IAsyncEnumerable<int> ExecuteAsync(
GetNumbers command, [EnumeratorCancellation] CancellationToken ct)
{
for (var i = 0; i < command.Count; i++)
{
await Task.Delay(100, ct);
yield return i;
}
}
}Execute the command the same way as a regular command. The only difference is that you iterate the returned IAsyncEnumerable
await foreach (var number in new GetNumbers { Count = 5 }.ExecuteAsync(ct))
{
Console.WriteLine(number);
}Streaming command handlers can also derive from the StreamCommandHandler<TCommand, TResult> base type if they need to manipulate the validation/error state of the endpoint that issued the command.
public class GetNumbersHandler : StreamCommandHandler<GetNumbers, int>
{
public override async IAsyncEnumerable<int> ExecuteAsync(
GetNumbers command, [EnumeratorCancellation] CancellationToken ct = default)
{
if (command.Count < 1)
ThrowError("count must be greater than zero!");
for (var i = 0; i < command.Count; i++)
yield return i;
}
}Generic Streaming Commands
Open-generic stream commands and handlers need to be registered explicitly, just like regular generic commands:
public class GetItems<T> : IStreamCommand<T> { ... }
public class GetItemsHandler<T> : IStreamCommandHandler<GetItems<T>, T> { ... }
app.Services.RegisterGenericStreamCommand(typeof(GetItems<>), typeof(GetItemsHandler<>));Streaming Command Middleware
Streaming commands have their own middleware pipeline. Create middleware by implementing IStreamCommandMiddleware<TCommand, TResult> and call the next delegate to continue the chain.
sealed class StreamLogger<TCommand, TResult>(ILogger<TCommand> logger)
: IStreamCommandMiddleware<TCommand, TResult> where TCommand : IStreamCommand<TResult>
{
public async IAsyncEnumerable<TResult> ExecuteAsync(
TCommand command,
StreamCommandDelegate<TResult> next,
[EnumeratorCancellation] CancellationToken ct)
{
logger.LogInformation("Executing stream command: {name}", command.GetType().Name);
await foreach (var item in next().WithCancellation(ct))
yield return item;
}
}Register stream command middleware separately from regular command middleware:
bld.Services.AddStreamCommandMiddleware(
c =>
{
c.Register(typeof(StreamLogger<,>));
});Command Bus Without FastEndpoints
The Command Bus can be used independently of the FastEndpoints library, even with Blazor WASM projects. Simply install the messaging library like so:
dotnet add package FastEndpoints.Messaging Register the messaging services with the IOC container as shown in the following console application example:
using FastEndpoints; //add this
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
var bld = Host.CreateApplicationBuilder();
bld.Services.AddMessaging(); //add this
var host = bld.Build();
host.Services.UseMessaging(); //add this
var greetUser = new GreetUser();
await greetUser.ExecuteAsync();
await host.RunAsync();
sealed class GreetUser : ICommand
{
public string Message => "Welcome to the App!";
}
sealed class GreetUserHandler(ILogger<GreetUserHandler> logger) : ICommandHandler<GreetUser>
{
public Task ExecuteAsync(GreetUser e, CancellationToken c)
{
logger.LogInformation("{@msg}", e.Message);
return Task.CompletedTask;
}
}