Hello and welcome. In this post, we will explain a very useful approach for using knowledge sessions to perform Complex Event Processing (CEP) in a distributed environment for high availability.
Complex Event Processing is a method of tracking and analyzing streams of data from multiple sources, identifying special occurrences of events in time and inferring events or patterns that suggest more complicated circumstances. The goal is to identify meaningful events and respond to them as quickly as possible.
This set of functionality is easy to implement in Drools thanks to the Drools Fusion component. However, when thinking of running a CEP system in a production environment, we usually find a few requirements that need special consideration:
We want to handle thousands of events at the same time, as fast as possible (so using a persistent session is not something we want most of the time)
About 90% of cases require events to be correlated within a small window of time, from a few seconds up to maybe a day or two
We need a way to avoid losing all data if a server fails. However, because we want to be as fast as possible, a persistent session might not be the best solution.
In order to meet all these requirements, one possible solution is to replicate the knowledge session across different nodes, each receiving events from a single unified source (such as a JMS broker), as the following image shows:
This approach has one big flaw: all knowledge sessions are independent from each other. That means that either no common data exists between two or more sessions, or, if it does, rules are fired as many times as there are nodes in the cluster. Rules often reference outside systems where actions are to be performed, and when a complex event is detected, we usually want the rule to fire just once.
Another approach taken to solve this issue is using a single persistent session shared among many nodes through the database:
This is the safest way to go. When using a persistent session, all the contents of the knowledge session are stored in a database. Every time an event occurs, the session is reloaded from the database, the event is added, the rules are re-evaluated, and the session is persisted again.
The one problem with this type of configuration is, sadly, performance. When you persist the knowledge session, you basically serialize all (or most) of the data that exists in the knowledge session into a blob field in the session info table. Every time you add a new event, the blob grows (unless you configure an object marshalling strategy, but some ID data referencing the object is always stored with the session’s blob, even if the rest of the event is stored elsewhere). And when you have thousands to maybe millions of tiny events, all happening really fast, that persistence step becomes slow.
Also, in most cases, the information about the primordial events (the very basic events that compose a more complex event) is not of much interest unless they actually fire a rule. Most of the time they’re only relevant as a collection. So we decided to implement the following approach to manage our knowledge sessions: we created a non-persistent knowledge session that:
Notifies a group of sessions when a fact is inserted, updated, or deleted
Notifies a group of sessions when it has already fired a rule for a specific group of facts
Checks whether any other session in the group has already fired a rule for a specific group of facts before firing it again.
To do so, our class (called HAKieSession) receives a special registry interface (called HAKieSessionRegistry) that implements all those events. We provide a special implementation of that registry class that works using JMS, so our schematic looks like this:
It’s very similar to the first implementation, except we don’t only listen for events from the JMS broker: we also send events to it, to notify other sessions when a rule has been fired or when the working memory has changed.
In this way, we strike a balance between performance and high availability. If one of the nodes fails, the others can continue with the operations. Only one node will fire the rules at a time, but any of the nodes can fire the rules if the others fail. Only if all nodes fail would you lose event information. However, backup functionality could be added to this implementation that creates a database backup only when a server is idle.
But how is it done?
The implementation of this HAKieSession was rather simple, thanks to the elegant API that the KieSession provides. We first wrote the HAKieSessionRegistry with the following methods:
ruleAlreadyFired: Checks if a rule has already been fired by another node. Returns true if so
fireRuleFired: Creates a record that this particular node has fired a rule, to prevent other nodes from firing it.
WorkingMemoryEventListener methods: To create notifications of each working memory change to other nodes.
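Based on that description, the registry interface might look roughly like the sketch below. To keep the example self-contained, the fact-notification callbacks (which in the real project come from Drools’ WorkingMemoryEventListener) are omitted, and the firing key is reduced to a rule name plus the facts involved; the actual signatures in the project may differ:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the registry interface described above.
interface HAKieSessionRegistrySketch {
    /** Returns true if another node has already fired this rule for these facts. */
    boolean ruleAlreadyFired(String ruleName, Object... facts);

    /** Records that this node fired the rule, so other nodes skip it. */
    void fireRuleFired(String ruleName, Object... facts);
}

// A trivial in-memory implementation, useful for a single node or for tests.
class InMemoryKieSessionRegistry implements HAKieSessionRegistrySketch {
    private final Set<String> fired = new HashSet<>();

    // Build a key identifying one firing: rule name + the facts it matched.
    private String key(String ruleName, Object... facts) {
        return ruleName + ":" + Arrays.deepHashCode(facts);
    }

    @Override
    public boolean ruleAlreadyFired(String ruleName, Object... facts) {
        return fired.contains(key(ruleName, facts));
    }

    @Override
    public void fireRuleFired(String ruleName, Object... facts) {
        fired.add(key(ruleName, facts));
    }
}
```

The JMS implementation described later replaces the in-memory set with notifications shared across nodes, but the contract stays the same.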
Then, we simply created an extension of the stateful knowledge session that registers the registry class, first as a working memory event listener, and then creates an internal AgendaFilter. The AgendaFilter checks whether a rule has already been fired by another node (using the registry class) before actually firing it.
The following UML diagram shows all these classes together. It’s just that simple!
The only things the HAKieSession class overrides from the original stateful knowledge session are the fireAllRules and fireUntilHalt methods. It merely decorates them to make sure that you always use its internal AgendaFilter. If you already call it with an AgendaFilter, it generates a composite agenda filter that applies your AgendaFilter’s condition AND its internal AgendaFilter’s condition.
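That decoration can be sketched roughly as follows. To keep the example self-contained, we declare a minimal stand-in for Drools’ org.kie.api.runtime.rule.AgendaFilter (whose single accept method decides whether an activation may fire, and which receives a Match in the real API rather than a rule name); the real HAKieSession uses the Drools types directly:

```java
// Minimal stand-in for Drools' AgendaFilter, simplified to a rule name.
interface AgendaFilterSketch {
    boolean accept(String ruleName);
}

// Composite filter: an activation fires only when BOTH the caller's filter
// and the internal HA filter (which consults the registry) accept it.
class CompositeAgendaFilter implements AgendaFilterSketch {
    private final AgendaFilterSketch userFilter;
    private final AgendaFilterSketch haFilter;

    CompositeAgendaFilter(AgendaFilterSketch userFilter, AgendaFilterSketch haFilter) {
        this.userFilter = userFilter;
        this.haFilter = haFilter;
    }

    @Override
    public boolean accept(String ruleName) {
        return userFilter.accept(ruleName) && haFilter.accept(ruleName);
    }
}

// Sketch of how fireAllRules(AgendaFilter) could decorate the caller's filter.
class HAKieSessionSketch {
    private final AgendaFilterSketch haFilter;

    HAKieSessionSketch(AgendaFilterSketch haFilter) {
        this.haFilter = haFilter;
    }

    int fireAllRules(AgendaFilterSketch userFilter) {
        AgendaFilterSketch effective = (userFilter == null)
                ? haFilter
                : new CompositeAgendaFilter(userFilter, haFilter);
        return delegateFire(effective);
    }

    // Placeholder for super.fireAllRules(filter) in the real class: here we
    // just evaluate the filter against a single hypothetical rule.
    private int delegateFire(AgendaFilterSketch filter) {
        return filter.accept("example-rule") ? 1 : 0;
    }
}
```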
Perhaps the most complex component is the actual implementation of the HAKieSessionRegistry interface we created (JMSKieSessionRegistry). It handles the initialization of, and connection to, a Topic used to exchange all rule-firing and working-memory-change information. It also implements MessageListener to read information sent through JMS by other instances of the JMSKieSessionRegistry class running on other nodes.
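In rough terms, each registry instance publishes a small message to the topic whenever a rule fires or the working memory changes, and applies the messages it receives from other nodes to its local state. The actual class works with javax.jms types (MessageListener, ObjectMessage); the sketch below keeps only the message-handling logic, with a hypothetical RegistryMessage type standing in for the JMS message payload:

```java
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

// Hypothetical payload exchanged through the JMS topic.
class RegistryMessage implements Serializable {
    enum Type { RULE_FIRED, FACT_INSERTED, FACT_UPDATED, FACT_DELETED }

    final Type type;
    final String nodeId;   // sender, so a node can ignore its own broadcasts
    final String payload;  // rule-firing key or serialized fact, simplified here

    RegistryMessage(Type type, String nodeId, String payload) {
        this.type = type;
        this.nodeId = nodeId;
        this.payload = payload;
    }
}

// Sketch of the receiving side. The real class implements
// javax.jms.MessageListener and unwraps an ObjectMessage; here we
// take the payload directly.
class RegistryMessageHandler {
    private final String localNodeId;
    private final Set<String> firedByOtherNodes = new HashSet<>();

    RegistryMessageHandler(String localNodeId) {
        this.localNodeId = localNodeId;
    }

    void onMessage(RegistryMessage msg) {
        if (msg.nodeId.equals(localNodeId)) {
            return; // ignore messages this node published itself
        }
        if (msg.type == RegistryMessage.Type.RULE_FIRED) {
            firedByOtherNodes.add(msg.payload);
        }
        // FACT_* messages would be replayed into the local working memory here.
    }

    boolean ruleAlreadyFired(String firingKey) {
        return firedByOtherNodes.contains(firingKey);
    }
}
```

The internal AgendaFilter would consult ruleAlreadyFired before letting an activation fire, which is how a rule ends up firing on only one node at a time.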
All the code is available through these URLs to anyone who wishes to download it and try it:
I hope you find it useful. Feel free to contact us to get more information, or to share your experience!