Event Processing

The Events generated by the application need to be dispatched to the components that update the query databases, search engines or any other resources that need them: the Event Handlers. It is the responsibility of the Event Bus to dispatch Event Messages to all components interested. On the receiving end, Event Processors are responsible for handling those events, which includes invocation of the appropriate Event Handlers.

Publishing Events

In the vast majority of cases, the Aggregates will publish events by applying them. However, occasionally, it is necessary to publish an event (possibly from within another component), directly to the Event Bus. To publish an event, simply wrap the payload describing the event in an EventMessage. The GenericEventMessage.asEventMessage(Object) method allows you to wrap any object into an EventMessage. If the passed object is already an EventMessage, it is simply returned.

Event Bus

The EventBus is the mechanism that dispatches events to the subscribed event handlers. Axon provides two implementation of the Event Bus: SimpleEventBus and EmbeddedEventStore. While both implementations support subscribing and tracking processors (see Events Processors below), the EmbeddedEventStore persists events, which allows you to replay them at a later stage. The SimpleEventBus has a volatile storage and 'forgets' events as soon as they have been published to subscribed components.

When using the Configuration API, the SimpleEventBus is used by default. To configure the EmbeddedEventStore instead, you need to supply an implementation of a StorageEngine, which does the actual storage of Events.

    Configurer configurer = DefaultConfigurer.defaultConfiguration();
    configurer.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine());

Event Processors

Event Handlers define the business logic to be performed when an Event is received. Event Processors are the components that take care of the technical aspects of that processing. They start a Unit of Work and possibly a transaction, but also ensure that correlation data can be correctly attached to all messages created during Event processing.

Event Processors come in roughly two forms: Subscribing and Tracking. The Subscribing Event Processors subscribe themselves to a source of Events and are invoked by the thread managed by the publishing mechanism. Tracking Event Processors, on the other hand, pull their messages from a source using a thread that it manages itself.

Assigning handlers to processors

All processors have a name, which identifies a processor instance across JVM instances. Two processors with the same name, can be considered as two instances of the same processor.

All Event Handlers are attached to a Processor whose name is the package name of the Event Handler's class.

For example, the following classes:

  • org.axonframework.example.eventhandling.MyHandler,

  • org.axonframework.example.eventhandling.MyOtherHandler, and

  • org.axonframework.example.eventhandling.module.MyHandler

will trigger the creation of two Processors:

  • org.axonframework.example.eventhandling with 2 handlers, and

  • org.axonframework.example.eventhandling.module with a single handler

The Configuration API allows you to configure other strategies for assigning classes to processors, or even assign specific instances to specific processors.

Configuring processors

By default, Axon will use Subscribing Event Processors. It is possible to change how Handlers are assigned and how processors are configured using the EventHandlingConfiguration class of the Configuration API.

The EventHandlingConfiguration class defines a number of methods that can be used to define how processors need to be configured.

  • registerEventProcessorFactory allows you to define a default factory method that creates Event Processors for which no explicit factories have been defined.

  • registerEventProcessor(String name, EventProcessorBuilder builder) defines the factory method to use to create a Processor with given name. Note that such Processor is only created if name is chosen as the processor for any of the available Event Handler beans.

  • registerTrackingProcessor(String name) defines that a processor with given name should be configured as a Tracking Event Processor, using default settings. It is configured with a TransactionManager and a TokenStore.

  • usingTrackingProcessors() sets the default to Tracking Processors instead of Subscribing ones.

Tracking Processors, unlike Subscribing ones, need a Token Store to store their progress in. Each message a Tracking Processor receives through its Event Stream is accompanied by a Token. This Token allows the processor to reopen the Stream at any later point, picking up where it left off with the last Event.

The Configuration API takes the Token Store, as well as most other components Processors need from the Global Configuration instance. If no TokenStore is explicitly defined, an InMemoryTokenStore is used, which is not recommended in production.

Distributing Events

In some cases, it is necessary to publish events to an external system, such as a message broker.

Spring AMQP

Axon provides out-of-the-box support to transfer Events to and from an AMQP message broker, such as Rabbit MQ.

Forwarding events to an AMQP Exchange

The SpringAMQPPublisher forwards events to an AMQP Exchange. It is initialized with a SubscribableMessageSource, which is generally the EventBus or EventStore. Theoretically, this could be any source of Events that the publisher can Subscribe to.

To configure the SpringAMQPPublisher, simply define an instance as a Spring Bean. There is a number of setter methods that allow you to specify the behavior you expect, such as Transaction support, publisher acknowledgements (if supported by the broker), and the exchange name.

The default exchange name is 'Axon.EventBus'.

Note

Note that exchanges are not automatically created. You must still declare the Queues, Exchanges and Bindings you wish to use. Check the Spring documentation for more information.

Reading Events from an AMQP Queue

Spring has extensive support for reading messages from an AMQP Queue. However, this needs to be 'bridged' to Axon, so that these messages can be handled from Axon as if they are regular Event Messages.

The SpringAMQPMessageSource allows Event Processors to read messages from a Queue, instead of the Event Store or Event Bus. It acts as an adapter between Spring AMQP and the SubscribableMessageSource needed by these processors.

The easiest way to configure the SpringAMQPMessageSource, is by defining a bean which overrides the default onMessage method and annotates it with @RabbitListener, as follows:

@Bean
public SpringAMQPMessageSource myMessageSource(Serializer serializer) {
    return new SpringAMQPMessageSource(serializer) {
        @RabbitListener(queues = "myQueue")
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            super.onMessage(message, channel);
        }
    };
}

Spring's @RabbitListener annotation tells Spring that this method needs to be invoked for each message on the given Queue ('myQueue' in the example). This method simply invokes the super.onMessage() method, which performs the actual publication of the Event to all the processors that have been subscribed to it.

To subscribe Processors to this MessageSource, pass the correct SpringAMQPMessageSource instance to the constructor of the Subscribing Processor:

// in an @Configuration file:
@Autowired
public void configure(EventHandlingConfiguration ehConfig, SpringAmqpMessageSource myMessageSource) {
    ehConfig.registerSubscribingEventProcessor("myProcessor", c -> myMessageSource);
}

Note that Tracking Processors are not compatible with the SpringAMQPMessageSource.

Asynchronous Event Processing

The recommended approach to handle Events asynchronously is by using a Tracking Event Processor. This implementation can guarantee processing of all events, even in case of a system failure (assuming the Events have been persisted).

However, it is also possible to handle Events asynchronously in a SubscribingProcessor. To achieve this, the SubscribingProcessor must be configured with an EventProcessingStrategy. This strategy can be used to change how invocations of the Event Listeners should be managed.

The default strategy (DirectEventProcessingStrategy) invokes these handlers in the thread that delivers the Events. This allows processors to use existing transactions.

The other Axon-provided strategy is the AsynchronousEventProcessingStrategy. It uses an Executor to asynchronously invoke the Event Listeners.

Even though the AsynchronousEventProcessingStrategy executes asynchronously, it is still desirable that certain events are processed sequentially. The SequencingPolicy defines whether events must be handled sequentially, in parallel or a combination of both. Policies return a sequence identifier of a given event. If the policy returns an equal identifier for two events, this means that they must be handled sequentially by the event handler. A null sequence identifier means the event may be processed in parallel with any other event.

Axon provides a number of common policies you can use:

  • The FullConcurrencyPolicy will tell Axon that this event handler may handle all events concurrently. This means that there is no relationship between the events that require them to be processed in a particular order.

  • The SequentialPolicy tells Axon that all events must be processed sequentially. Handling of an event will start when the handling of a previous event is finished.

  • SequentialPerAggregatePolicy will force domain events that were raised from the same aggregate to be handled sequentially. However, events from different aggregates may be handled concurrently. This is typically a suitable policy to use for event listeners that update details from aggregates in database tables.

Besides these provided policies, you can define your own. All policies must implement the SequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the sequence identifier for a given event. Events for which an equal sequence identifier is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently. For performance reasons, policy implementations should return null if the event may be processed in parallel to any other event. This is faster, because Axon does not have to check for any restrictions on event processing.

It is recommended to explicitly define an ErrorHandler when using the AsynchronousEventProcessingStrategy. The default ErrorHandler propagates exceptions, but in an asynchronous execution, there is nothing to propagate to, other than the Executor. This may result in Events not being processed. Instead, it is recommended to use an ErrorHandler that reports errors and allows processing to continue. The ErrorHandler is configured on the constructor of the SubscribingEventProcessor, where the EventProcessingStrategy is also provided.

Last updated