Queue Consumers
Overview
A Queue Consumer is a service that consumes messages from a particular queue and processes them. It is used in conjunction with a Queue defined in a Process Connection Diagram.
This deep-dive takes the form of an exercise that explains how a Queue Consumer is defined, configured and executed.
What You Will Learn
- What Queue Consumers are
- How they are executed
- How to create a new Queue Consumer
Defining a Queue
When an Enactor application needs to interact directly with a Queue, it usually identifies the Queue through a Connected Application.
- When it needs to write to a Queue, it identifies the Queue via a Connected Application output.
- When it needs to read from a Queue, it identifies the Queue via a Connected Application input.
For the purpose of this deep-dive, let's imagine a scenario where we write messages to a Queue, and a Queue Connector then transfers those messages to a different queue. Ultimately, the Enactor application reads from this final queue using a Queue Consumer.
The relevant Process Connection Diagram for this would look similar to the following.
- The connected application ExampleQueueWriteApp would be used for finding the queue when writing to the ExampleQueueToWrite queue.
- The connected application ExampleQueueReaderApp would be used for finding the queue when reading from the ExampleReadFromQ queue.
Both queues will use LocalJDBC as their provider, which means both queues use the local database to retain their messages.
To achieve this, the Provider ID of the connection is set to the LocalJDBC provider.
For a Queue Consumer to function, we would only need the reader side of these in the connection diagram. However, there needs to be an external way of writing to this queue in that case.
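The flow described above can be pictured with a minimal in-memory sketch in Python. This is not Enactor code: the `transfer` function and the deque-backed queues are hypothetical stand-ins for the Queue Connector and the LocalJDBC-backed queues, and only the queue names are taken from the example.

```python
from collections import deque

# Hypothetical in-memory stand-ins for the two queues in the example diagram.
queues = {
    "ExampleQueueToWrite": deque(),
    "ExampleReadFromQ": deque(),
}


def transfer(source: str, target: str) -> int:
    """Move every pending message from source to target, preserving order.

    Stands in for the Queue Connector; returns the number of messages moved.
    """
    moved = 0
    while queues[source]:
        queues[target].append(queues[source].popleft())
        moved += 1
    return moved


# A writer puts messages on the first queue...
queues["ExampleQueueToWrite"].extend(["message-1", "message-2"])
# ...the connector moves them across...
moved = transfer("ExampleQueueToWrite", "ExampleReadFromQ")
# ...and a consumer would read them from the final queue in the original order.
received = list(queues["ExampleReadFromQ"])
```

If only the reader side exists in the connection diagram, the `transfer` step disappears and something external has to append to the final queue directly.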
Queue Consumer Process
The Enactor platform provides an out-of-the-box process named MessageService/ConfigurableQueueConsumerService, which can be configured as a service that consumes a queue.
There is also a multi-threaded version of this implementation, which is not covered in this document.
The Configurable Queue Consumer Service accepts a ServiceDefinition as an input in which we can define the details of the queue that needs to be consumed and how the payload should be handled.
Queue Consumer Service Type
To define a ServiceDefinition, we must first define a ServiceType for that service.
The ServiceType can be configured in EM or imported as configuration data via File Import.
Property | Description |
---|---|
Definition Type | This needs to be set as 'queueConsumerServiceDefinition' in order to let the underlying processes know the specific definition type. |
Definition Namespace | This also needs to be specified as 'http://www.enactor.com/core' in order to be able to resolve the correct definition entity type |
Implementation Type | This needs to be specified as 'Process' for the Queue Consumer as the service is executed in the form of a process in this case. |
Implementation Name | This needs to be specified as 'MessageService/ConfigurableQueueConsumerService' to utilize the built-in Queue Consumer |
Runtime Context | This important property specifies the runtime in which the service is executed. For example, Enactor POS uses the 'Enactor Pos' runtime whereas Enactor EM Processing uses 'Enactor Web Retail Processing'. Set this value according to where the service needs to run. If a particular service needs to run in two different runtime contexts, two separate service types need to be created. |
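
The fixed values in the table can be summarised as a small checklist. The sketch below is plain Python, not an Enactor configuration format: the dictionary keys and the `check_service_type` helper are hypothetical names chosen for illustration, while the values are the ones listed in the table.

```python
# Values a Queue Consumer Service Type must carry, taken from the table above.
# The key names are illustrative only and are not Enactor property identifiers.
REQUIRED = {
    "definitionType": "queueConsumerServiceDefinition",
    "definitionNamespace": "http://www.enactor.com/core",
    "implementationType": "Process",
    "implementationName": "MessageService/ConfigurableQueueConsumerService",
}


def check_service_type(service_type: dict) -> list:
    """Return the names of properties that are missing or hold the wrong value."""
    problems = [key for key, value in REQUIRED.items()
                if service_type.get(key) != value]
    # The runtime context varies per deployment, so only its presence is checked.
    if not service_type.get("runtimeContext"):
        problems.append("runtimeContext")
    return problems


candidate = dict(REQUIRED, runtimeContext="Enactor Pos")
problems = check_service_type(candidate)
```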
Queue Consumer Service Definition
A service definition is the granular configuration entity for a service that is run. To execute a Queue Consumer as a service, we need a ServiceDefinition of the Queue Consumer Service Type.
Property | Description |
---|---|
Service Type | This should be a Queue Consumer Service Type |
Device Type | This defines the device types this service should run on. Note that the Runtime Context configured in the Service Type works in conjunction with this setting, and both need to be set accordingly for the service to start. |
Auto Start | This should be ticked for the service to start automatically. If not, the service can be started using the Service Status maintenance app. |
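
One way to read the interaction between Runtime Context, Device Type and Auto Start is as three conditions that must all hold. The Python sketch below encodes that reading. It is an assumption made for illustration, not the platform's actual start-up logic, and the function and parameter names are hypothetical.

```python
def starts_automatically(service_runtime: str,
                         definition_device_types: set,
                         auto_start: bool,
                         current_runtime: str,
                         current_device_type: str) -> bool:
    """Assumed rule: the service starts on its own only if Auto Start is ticked,
    the Service Type's runtime context matches the running application, and the
    device type is one the Service Definition lists."""
    return (auto_start
            and service_runtime == current_runtime
            and current_device_type in definition_device_types)


# Service Type targets the POS runtime; Service Definition targets POS devices.
on_pos = starts_automatically("Enactor Pos", {"POS"}, True, "Enactor Pos", "POS")
# Same configuration evaluated inside a different runtime.
on_em = starts_automatically("Enactor Pos", {"POS"}, True,
                             "Enactor Web Retail Processing", "POS")
```

With Auto Start unticked the function returns false everywhere, which corresponds to starting the service by hand from the Service Status maintenance app.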
Property | Description |
---|---|
Connected Process Id | The Connected Process Application Id in the connection diagram which points to the queue to read. In the above example, this should be 'ExampleQueueReaderApp' |
Connection Point Id | This is the input ID of the connection point in the above application. For the above example, this should be 'ExampleQueueReadInput' |
Failed Connected Process Id | The Connected Process Application Id that identifies a separate queue to which failed messages are redirected if processing fails. |
Failed Connection Point Id | This is the input that points from the queue for the failed messages |
Payload Handler Process Id | This is a process that is capable of handling the specific messages expected in this queue. This process usually needs to be developed to handle the specific messages as required. Please refer to the Payload Handling Process section for more information. |
Payload Qualified Names | This defines a list of entities that this queue understands. This is required to be in place in order for the Enactor applications to deserialise data in the queue. |
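
To illustrate what a qualified name refers to, the sketch below uses Python's standard `xml.etree.ElementTree` to read the namespace and local name of a payload's root element and compare them against a list of known entities. The allow-list structure and the helper names are hypothetical, and the real format of Payload Qualified Names may differ. The sample payload is a shortened form of the device entity that appears in the log output at the end of this document.

```python
import xml.etree.ElementTree as ET

# Hypothetical allow-list of (namespace, local name) pairs the queue understands.
KNOWN_PAYLOADS = {("http://www.enactor.com/retail", "device")}


def qualified_name(xml_text: str):
    """Return (namespace, local name) of the root element of an XML payload."""
    tag = ET.fromstring(xml_text).tag  # ElementTree reports '{namespace}local'
    if tag.startswith("{"):
        namespace, local = tag[1:].split("}", 1)
        return namespace, local
    return "", tag  # element without a namespace


def is_known(xml_text: str) -> bool:
    """True when the payload's root element is in the allow-list."""
    return qualified_name(xml_text) in KNOWN_PAYLOADS


payload = (
    '<retail:device xmlns:retail="http://www.enactor.com/retail">'
    "<retail:deviceId>CMTEST4</retail:deviceId>"
    "</retail:device>"
)
```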
Payload Handler Process
A process should be created that can handle the messages in the defined queue. The Queue Consumer process invokes this process programmatically, passing each message read from the queue as an input.
The process should have the following defined at minimum.
Inputs
- enactor.coreUI.Entity (type = com.enactor.core.entities.IEntity)
Outputs
The outputs defined are for Enactor internal messaging and do not need to be configured when the process is used externally.
Outcomes
- Success - The Queue Consumer commits the Queue Session and continues.
- Fail - The Queue Consumer rolls back the Queue Session, and the message is read again if the messaging session is configured as a transactional session.
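The relationship between the two outcomes and the Queue Session can be sketched as a small consumer loop. The Python below is a simplified in-memory model, not the ConfigurableQueueConsumerService itself: `QueueSession`, `consume` and `flaky_handler` are hypothetical names, and the session is assumed to be transactional.

```python
from collections import deque


class QueueSession:
    """Hypothetical transactional session over an in-memory queue."""

    def __init__(self, messages):
        self._queue = deque(messages)
        self._in_flight = None

    def read(self):
        """Take the next message, or None when the queue is empty."""
        self._in_flight = self._queue.popleft() if self._queue else None
        return self._in_flight

    def commit(self):
        """Acknowledge the in-flight message so it is never read again."""
        self._in_flight = None

    def rollback(self):
        """Return the in-flight message to the head of the queue."""
        if self._in_flight is not None:
            self._queue.appendleft(self._in_flight)
            self._in_flight = None


def consume(session, handler, max_reads=10):
    """Read messages and map the handler outcome to commit or rollback."""
    handled = []
    for _ in range(max_reads):  # bounded so a failing message cannot loop forever
        message = session.read()
        if message is None:
            break
        if handler(message) == "Success":
            session.commit()
            handled.append(message)
        else:  # "Fail": the message goes back and is read again
            session.rollback()
    return handled


attempts = {}


def flaky_handler(message):
    """Fail the first attempt for message 'b', succeed otherwise."""
    attempts[message] = attempts.get(message, 0) + 1
    if message == "b" and attempts[message] == 1:
        return "Fail"
    return "Success"


handled = consume(QueueSession(["a", "b", "c"]), flaky_handler)
```

In this model message 'b' is delivered twice, which is why a payload handler that can fail should tolerate seeing the same message again.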
The following is an example payload handler implementation that simply logs the queue message and returns Success.
Conclusion
The Queue Consumer is a generic process that can be configured and used via the Configurable Queue Consumer Service, which lets us consume a Queue while retaining compatibility and following best practices. What happens after a message is consumed must be defined and developed, but consuming the message itself can be achieved through configuration alone.
Exercise
As an exercise, complete the following.
- Create two queues connected via a queue connector, where messages can be written to one queue and are then transferred to the second queue.
- Create a simple process to write a message to this queue. A simple process can be created by implementing any existing extension point that can be invoked via any standard function.
The following is an example of such an implementation.
The process MessageService/WriteEntityXMLMessageToQueue can be used to write an entity as a message to a Queue. Enactor database queues expect messages to be of type com.enactor.messageService.message.IMessage, whereas other external messaging system implementations may not.
In this example, the Device object that gets automatically mapped into process inputs in POS is used as the input entity so that no additional inputs need to be defined.
The actions com.enactor.coreUI.queues.actions.ResolveQueueConnectionAction and com.enactor.coreUI.queues.actions.ResolveQueueNameAction are used in this process to create the input data required for the WriteEntityXMLMessageToQueue process.
- Create a Queue Consumer Payload Handler process to consume a message and log it.
- Create a Queue Consumer Service Type.
- Create a Queue Consumer Service Definition to read messages from the queue that the Queue Connector passes the messages to, and pass them on to the payload handler process created above.
- Execute the function that writes the message to a queue and validate that the queue consumer consumes the message from the other queue and logs it.
You should see a log printed similar to the following.
2025-03-17 21:14:46.465 [MessageService/ConfigurableQueueConsumerService] INFO Process.Example.ProcessExampleQueue - Logger=Process.Example.ProcessExampleQueue Level=INFO Message=<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<retail:device xmlns:core="http://www.enactor.com/core" xmlns:hta="http://docs.oasis-open.org/ns/bpel4people/ws-humantask/api/200803" xmlns:htd="http://docs.oasis-open.org/ns/bpel4people/ws-humantask/200803" xmlns:htt="http://docs.oasis-open.org/ns/bpel4people/ws-humantask/types/200803" xmlns:ns10="http://www.enactor.com/crm/customerLoyalty/service" xmlns:ns4="http://www.enactor.com/crm" xmlns:ns5="http://www.enactor.com/addressLookup/service" xmlns:ns6="http://www.enactor.com/retail/storedRetailTransaction/service" xmlns:retail="http://www.enactor.com/retail" xmlns:sref="http://docs.oasis-open.org/wsbpel/2.0/serviceref" xmlns:tools="http://www.enactor.com/tools" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<retail:deviceId>CMTEST4</retail:deviceId>
<retail:deviceName>CMTEST4</retail:deviceName>
<retail:deviceNotInUse>false</retail:deviceNotInUse>
<retail:deviceType>POS</retail:deviceType>
<retail:hostName>localhost</retail:hostName>
<retail:locationId>0001</retail:locationId>
<retail:notes></retail:notes>
<retail:dataSource>
<retail:url></retail:url>
<retail:driverClassName></retail:driverClassName>
<retail:username></retail:username>
</retail:dataSource>
<retail:lastUpdated>2025-02-06T10:57:33.503Z</retail:lastUpdated>
<retail:httpPort>8080</retail:httpPort>
<retail:rmiPort>2099</retail:rmiPort>
</retail:device>
:Execute Action:'UILogMessageAction' State:'Start' Process:'Example/ProcessExampleQueue'