How to process a stream of events using JMS

How to process a stream of events sent to a JMS message broker

You can manage a stream of events using message brokers such as Kafka, ActiveMQ, or RabbitMQ. Each broker has its own advantages and disadvantages, but comparing them is outside the scope of this article.

In this hypothetical use case, we will simulate some physical devices that generate a stream of events. The events will be sent to a JMS broker, in this case ActiveMQ. Each event carries the device ID and a status indicating whether the device is in a healthy state.

A message queue receives the event data stream whilst a Martini service subscribes to the queue, processes the events, and triggers an action depending on the data in the event.
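
The flow described above can be sketched in plain Java. This is only an illustration under stated assumptions: an in-memory `BlockingQueue` stands in for the ActiveMQ queue, and the names `EventFlowSketch`, `DeviceEvent`, and `processEvents` are hypothetical and not part of the demo. The "action" here is simply collecting the IDs of devices that report an unhealthy state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an in-memory queue stands in for the ActiveMQ queue.
final class EventFlowSketch {

    // One device event: which device sent it and whether it is healthy.
    static final class DeviceEvent {
        final String deviceId;
        final boolean healthy;

        DeviceEvent(String deviceId, boolean healthy) {
            this.deviceId = deviceId;
            this.healthy = healthy;
        }
    }

    // Drains the queue and returns the IDs of devices that reported an unhealthy state.
    static List<String> processEvents(BlockingQueue<DeviceEvent> queue) {
        List<String> alerts = new ArrayList<>();
        DeviceEvent event;
        while ((event = queue.poll()) != null) {
            if (!event.healthy) {
                alerts.add(event.deviceId); // the action triggered by the event data
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        BlockingQueue<DeviceEvent> queue = new LinkedBlockingQueue<>();
        queue.add(new DeviceEvent("device-1", true));
        queue.add(new DeviceEvent("device-2", false));
        System.out.println(processEvents(queue));
    }
}
```

In the demo itself the broker and the subscribing Martini service play the roles of the queue and the `processEvents` method.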

Download this demo to Martini and learn how to process events from IoT devices and trigger actions based on the data received

  • If you haven't already done so, get yourself a Martini.
  • Within either Martini Desktop or Martini Online, select the "Event Processing" demo from the Welcome Screen and click Install:

screenshot-welcome-screen

  • The Package demo010-processing-events-iot will be downloaded automatically. Right-click the Package and click Start. A green icon indicates that it has started:

screenshot-package

  • Documentation containing a Test Procedure to run the demo is included in the Package readme file \resources\readme\readme.md. The readme file is automatically opened when the Package is started.
screenshot-documentation
  • The Test Procedure will guide you through sending events from 3 mock devices to a message queue. Each device sends a status code indicating whether it is available or in error. The Martini service subscribed to the event queue listens for changes in the status of a device and logs any changes to a database.

Line by Line: How this JMS event processing demo works

Sending Messages to the JMS Endpoint

Open the service SimulateDeviceEvents under the \\code\demo010\services\ folder
    • On lines 3-5: The Jms.publishString function is called to send messages to our JMS endpoint
      • The function takes 3 parameters
        • 1st parameter: The name of the destination to send the message to
        • 2nd parameter: The message to send
        • 3rd parameter: The properties to send with the message
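
To make the three-parameter shape concrete, here is a minimal sketch of a publish call that takes a destination name, a message body, and a map of properties. It is a hypothetical in-memory stand-in, not Martini's Jms.publishString implementation. The class `PublishSketch`, the destination name `queue://demo.events`, and the property names `device` and `status` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a broker client; it is not Martini's Jms implementation.
final class PublishSketch {

    // A published message: the text body plus its string properties.
    static final class Message {
        final String body;
        final Map<String, String> properties;

        Message(String body, Map<String, String> properties) {
            this.body = body;
            this.properties = properties;
        }
    }

    // Messages grouped by destination name, in publish order.
    private final Map<String, List<Message>> destinations = new LinkedHashMap<>();

    // Mirrors the three parameters described above: destination, message, properties.
    void publishString(String destination, String message, Map<String, String> properties) {
        destinations.computeIfAbsent(destination, name -> new ArrayList<>())
                    .add(new Message(message, new LinkedHashMap<>(properties))); // copy the properties
    }

    // Returns the messages published to a destination, or an empty list if there are none.
    List<Message> messagesAt(String destination) {
        return destinations.getOrDefault(destination, List.of());
    }

    public static void main(String[] args) {
        PublishSketch broker = new PublishSketch();
        Map<String, String> props = new LinkedHashMap<>();
        props.put("device", "device-1");   // assumed property name
        props.put("status", "available");  // assumed property name
        broker.publishString("queue://demo.events", "status update", props);
        System.out.println(broker.messagesAt("queue://demo.events").size());
    }
}
```

Carrying the device data in message properties rather than only in the body lets a subscriber inspect it property by property.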

Processing Messages

Open the service ProcessEvents under the \\code\demo010\services\ folder
    • On line 3: An Iterate step processes each property received from the service
    • On lines 4-5: A Fork step checks the property name; the case to match is device
    • On line 6: The Cache.cacheGet function is called to check whether a value already exists in memory
    • On line 7: A Fork step checks whether a value exists in memory
      • Case: value does not exist
        • On lines 9-10: Insert a record into the database and save the value in memory
        • On line 11: Log the current value
      • Case: value exists
        • On line 13: A Fork step compares the current value to the value in memory
          • Case: current value and memory value are the same
            • On line 15: Log that the current value is unchanged
          • Case: current value and memory value are not the same
            • On lines 17-18: Insert a record into the database and update the value in memory
            • On line 19: Log the current value
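
The branching above can be summarised in a short Java sketch. It assumes a `HashMap` as a stand-in for the in-memory cache and a list as a stand-in for the database table. The class `StatusChangeTracker`, its method names, and the log wording are hypothetical; the real service uses Martini steps and functions rather than this code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a map stands in for the cache and a list for the database table.
final class StatusChangeTracker {
    private final Map<String, String> cache = new HashMap<>();
    private final List<String> database = new ArrayList<>();

    // Handles one event and returns the line that would be logged.
    String process(String deviceId, String status) {
        String cached = cache.get(deviceId);
        if (cached == null) {
            // Case: no value in memory yet, so record it and remember it.
            database.add(deviceId + "=" + status);
            cache.put(deviceId, status);
            return "new status for " + deviceId + ": " + status;
        }
        if (cached.equals(status)) {
            // Case: same as the remembered value, so nothing is written.
            return "status unchanged for " + deviceId + ": " + status;
        }
        // Case: the value changed, so record it and update the remembered value.
        database.add(deviceId + "=" + status);
        cache.put(deviceId, status);
        return "status changed for " + deviceId + ": " + status;
    }

    // Returns a copy of the records written so far.
    List<String> records() {
        return new ArrayList<>(database);
    }

    public static void main(String[] args) {
        StatusChangeTracker tracker = new StatusChangeTracker();
        System.out.println(tracker.process("device-1", "available"));
        System.out.println(tracker.process("device-1", "available"));
        System.out.println(tracker.process("device-1", "error"));
        System.out.println(tracker.records());
    }
}
```

Only the first event for a device and genuine status changes reach the database, so repeated identical events do not add rows.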