4 Consuming Messages - Reference Documentation

Authors: Bud Byrd

Version: 3.1.3

4 Consuming Messages

Messages are consumed by creating a message consumer class. Consumer classes are placed in the grails-app/rabbit-consumers directory, and their file names must end with Consumer.groovy. These classes are Spring beans, so other Spring beans such as services and the grailsApplication instance can be injected into them. Each consumer class must be configured as described in the sections that follow. If a consumer is misconfigured, the plugin logs an error and does not register the consumer to any queues.

4.1 Message Handlers

One of the requirements for a consumer to be registered to the RabbitMQ server is that a message handler be declared in the consumer class. The message handler is the mechanism by which messages are consumed.

4.1.1 Basic Usage

In its most basic form, a message handler method takes in the body of the received message, and a MessageContext object that contains the message parameters received from the RabbitMQ server, along with the consumer's channel that the handler should publish messages through.

This is the most generic form of a message handler:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    // ...

    def handleMessage(def body, MessageContext context) {
        // Do work
    }
}

4.1.2 Typed Message Handlers

By default, the plugin attempts to convert the body of a received message from a byte array to a more useful type, such as a String. Before routing the message to a handler, the plugin runs through a list of converters that each attempt to convert the message. When a conversion succeeds, the plugin checks whether a handler accepting the converted type has been defined.

For example, consider this JSON blob:

{"foo":"bar","hi":"there"}

If the above message is received, the converter for the Map class type will convert the byte array to a Map of the JSON data. If a valid handler for the Map type is defined, the handler will receive the converted JSON so that the handler does not need to handle the conversion.

The following handlers would accept the converted Map:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    // ...

    def handleMessage(Map body, MessageContext context) {
        // Do work
    }

    def handleMessage(def body, MessageContext context) {
        // Since def is a generic type (Object)
    }
}

If a converter was successfully able to convert the message body, but no handler was defined to handle the class type, other converters will get a chance to convert the message body. In the above example, if only a handler for the String type was defined, the handler will receive the JSON blob as a String.

If no converter is able to convert the message body, the plugin will fall back to passing the handler the raw byte array received from the RabbitMQ server.

If no handler is defined that can handle the received message (including the raw byte array), an error will be logged and the message will be rejected.

The plugin has built-in converters for Integer, Map, List, GString, and String types. The plugin allows users to define their own converters to convert other object types, which will be discussed later in this guide.
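The conversion and fallback rules in this section can be modeled with a short, self-contained sketch. This is an illustration only and not the plugin's implementation: the converters map, the resolve closure, and the order in which converters are tried are all assumptions made for the example.

```groovy
import groovy.json.JsonSlurper

// Hypothetical converters, tried in order. Each returns null when it cannot convert.
def converters = [
    (Integer): { byte[] raw ->
        def text = new String(raw, 'UTF-8')
        return text.isInteger() ? text.toInteger() : null
    },
    (Map): { byte[] raw ->
        try {
            def parsed = new JsonSlurper().parseText(new String(raw, 'UTF-8'))
            return parsed instanceof Map ? parsed : null
        } catch (Exception ignored) {
            return null
        }
    },
    (String): { byte[] raw -> return new String(raw, 'UTF-8') }
]

// Returns the value a handler would receive, or null when the message would be rejected.
// handlerTypes lists the body types for which handlers are defined (assumed input).
def resolve = { byte[] raw, List<Class> handlerTypes ->
    for (convert in converters.values()) {
        def converted = convert.call(raw)
        // A conversion only counts when some handler accepts the converted type
        if (converted != null && handlerTypes.any { it.isInstance(converted) }) {
            return converted
        }
    }
    // No converter produced a usable type: fall back to the raw byte array
    if (handlerTypes.any { it.isInstance(raw) }) {
        return raw
    }
    return null
}

def json = '{"foo":"bar","hi":"there"}'.getBytes('UTF-8')
assert resolve(json, [Map]) instanceof Map       // a Map handler receives the parsed JSON
assert resolve(json, [String]) instanceof String // only a String handler: receives the text
assert resolve(json, [Date]) == null             // no usable handler: message rejected
```

In this model, a handler declared with def corresponds to the type Object, so it accepts the first successful conversion.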

4.1.3 Short-Form Usage

In addition to the 2-parameter handler method signature, there are 2 shortcut versions available for use. One form only takes the converted message body, and the other only takes the MessageContext object. The order of preference when multiple valid handlers are defined is:
  1. Long form (body, context)
  2. Short form (body)
  3. MessageContext form

The short-form handlers are shown below:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    // ...

    def handleMessage(String message) {
        // Do work
    }

    def handleMessage(MessageContext context) {
        // Do work
    }
}

The MessageContext-only handler will only be called if there is no other handler defined that can possibly handle any conversion of the message body.

4.1.4 MessageContext Object

The message context is just an object that encapsulates the data relevant to the received message. Below is a list of properties of the class.
Property      Description
body          Incoming message in its raw byte format.
channel       The RabbitMQ channel the message handler should use to publish messages. This is especially important when using transactions.
consumerTag   Consumer tag
envelope      Properties of the message's delivery (see RabbitMQ's documentation)
properties    Properties of the message (see RabbitMQ's documentation)

4.1.5 RPC-Style Messages

When a client publishes a message and waits for a return reply, this is considered an RPC-style operation. Typically, the server-side of the operation (in this case, a message consumer/handler) must respond to the client on a queue that the client requested manually. This plugin provides a very convenient method to respond to the client without having to manually construct a response message.

The client will provide a response queue to the server to reply to. This queue name is stored in the MessageContext.properties.replyTo variable. If a client publishes this variable, and the handler returns some data, the plugin will convert the data returned from the message handler and build a response message for you.

The following example illustrates responding via a handler's returned data.

import com.budjb.rabbitmq.consumer.MessageContext

class RpcExampleConsumer {
    def handleMessage(String message, MessageContext messageContext) {
        println "received ${message}"

        // The returned value is sent back to the client via its replyTo queue
        return "response"
    }
}

Alternatively, the rabbitMessagePublisher can be used to respond.

import com.budjb.rabbitmq.consumer.MessageContext

class RpcExampleConsumer {
    def rabbitMessagePublisher

    def handleMessage(String message, MessageContext messageContext) {
        println "received ${message}"

        // Publish the reply manually; nothing is returned from the handler
        rabbitMessagePublisher.send {
            routingKey = messageContext.properties.replyTo
            body = "response"
        }
    }
}

When the plugin builds the reply message, it only converts the data returned from the message handler and publishes it to the reply queue. If you need to set any other message properties, such as headers or the content type, you must build the response message manually using the rabbitMessagePublisher and refrain from returning data from the message handler.

4.2 Subscribing To Queues

Message consumers can subscribe to either queues or exchanges. When a consumer is registered to a queue, the consumer will receive messages from the queue as the RabbitMQ server determines that it's the consumer's turn to receive a message, since there may be multiple listeners on the same queue.

Each message consumer class must have a configuration defined. There are 2 methods to specify the configuration:

  • The consumer class can have a Map assigned to a static variable named rabbitConfig. To subscribe to queues, the only required configuration option is the queue variable, which is the name of the queue to subscribe to.
  • The application's configuration file can contain the configuration with the path rabbitmq.consumers.<ClassName>. See the section below for more details.

Here is a simple example of a consumer subscribing to a queue.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "queue": "test.queue"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

There are many options available to influence how the consumer works, which can be found in the reference.

4.3 Subscribing To Exchanges

Subscribing to exchanges differs from subscribing to queues because there are several types of exchanges, each with different behavior. RabbitMQ's libraries do not provide a direct way to subscribe to an exchange. The plugin provides this by creating a temporary queue at runtime, binding it to the exchange, and consuming from that queue. The binding requirements differ between the types of exchanges.

Note that these temporary queues are randomly named, exclusive, and auto-delete, so this feature is not suitable when messages left on the queue must persist after the application shuts down.

4.3.1 Fanout Exchanges

A fanout exchange will forward a received message to every queue bound to it. There are no binding criteria for this kind of exchange. This is the simplest type of exchange, and is also the easiest to configure.

Fanout Example

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "fanout.exchange"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

4.3.2 Topic Exchanges

A topic exchange will forward messages to queues based on the binding criteria the queue used to register to the exchange. In RabbitMQ terms, this is called a routing key. The routing key can be either a direct match, or utilize wildcards to do a partial topic match. If a routing key is omitted, the queue will receive no messages. Use "#" to receive all messages from an exchange. More information can be found in the RabbitMQ documentation.

Topic Example

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "topic.exchange",
        "binding": "foo.bar.#"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}
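To make the wildcard rules concrete, the following sketch models how a topic binding is matched against a routing key under the standard AMQP rules, where "*" matches exactly one word and "#" matches zero or more words. The matching is performed by the RabbitMQ server, not by the plugin; the topicMatches and matchWords functions are hypothetical names used only for this illustration.

```groovy
// Illustrative model of AMQP topic matching; the real matching happens in the broker.
boolean topicMatches(String binding, String routingKey) {
    // Routing keys and bindings are dot-separated lists of words
    return matchWords(binding.tokenize('.'), routingKey.tokenize('.'))
}

boolean matchWords(List<String> pattern, List<String> words) {
    if (pattern.isEmpty()) {
        return words.isEmpty()
    }
    def head = pattern.head()
    def rest = pattern.tail()
    if (head == '#') {
        // "#" either matches zero words, or consumes one word and stays active
        return matchWords(rest, words) || (!words.isEmpty() && matchWords(pattern, words.tail()))
    }
    if (words.isEmpty()) {
        return false
    }
    // "*" matches any single word; otherwise the words must be equal
    return (head == '*' || head == words.head()) && matchWords(rest, words.tail())
}

assert topicMatches('foo.bar.#', 'foo.bar')          // "#" matches zero words
assert topicMatches('foo.bar.#', 'foo.bar.baz.qux')  // "#" matches several words
assert !topicMatches('foo.bar.#', 'foo.baz')         // literal words must match
assert !topicMatches('foo.*', 'foo.bar.baz')         // "*" matches exactly one word
```

With the binding from the example above, the consumer would receive messages whose routing key starts with the words foo and bar.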

4.3.3 Direct Exchanges

A direct exchange will forward messages to queues based on binding criteria configured similarly to topic exchanges. The difference is that direct routing does not support wildcards: the routing key of a message must match the binding exactly.

Direct Example

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "direct.exchange",
        "binding": "example"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

4.3.4 Headers Exchanges

Headers exchanges work similarly to topic exchanges. A headers exchange will forward messages to queues based on header values contained in messages. Additionally, a queue can be bound on multiple header values, along with an option to require one or all of the headers to match.

Headers Example

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "headers.exchange",
        "binding": [ "foo": "bar", "hi": "there" ],
        "match": "any"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}
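The difference between requiring one header and requiring all headers to match can be sketched as follows. This is a simplified model of what the RabbitMQ server does, not plugin code. The headersMatch function is a hypothetical name, and the "all" value is an assumption taken from AMQP's x-match argument; only "any" appears in the example above.

```groovy
// Illustrative model of headers-exchange matching; the real matching happens in the broker.
boolean headersMatch(Map binding, String match, Map headers) {
    // Collect the bound headers that are present in the message with an equal value
    def hits = binding.findAll { key, value ->
        headers.containsKey(key) && headers[key] == value
    }
    // "all" (assumed value) requires every bound header; otherwise one hit is enough
    return match == 'all' ? hits.size() == binding.size() : !hits.isEmpty()
}

def binding = [foo: 'bar', hi: 'there']
assert headersMatch(binding, 'any', [foo: 'bar'])               // one matching header suffices
assert !headersMatch(binding, 'all', [foo: 'bar'])              // "hi" is missing
assert headersMatch(binding, 'all', [foo: 'bar', hi: 'there'])  // every bound header matches
```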

4.4 Multi-Server

When using a multi-server setup, it is important to consider what server a consumer should listen on. Use the connection property in the rabbitConfig to specify which server the consumer should be bound to. If the connection is omitted, the consumer will be bound to the default connection.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "queue": "test.queue",
        "connection": "server1" // Where "server1" is a connection configured with that name.
    ]

    def handleMessage(def body, MessageContext context) {
        // Do work
    }
}

4.5 Central Configuration

It is also possible to define the consumer's configuration outside of the consumer in the Grails Config.groovy file. All of the configuration options described above are valid for this type of configuration. This functionality is valuable when a consumer's configuration needs to be determined at runtime, instead of being hardcoded in the consumer class itself.

To configure a consumer in the Grails application's configuration file, an entry matching the consumer's class name should be present under the rabbitmq.consumers key, as follows:

// …
rabbitmq {
    consumers {
        ExampleConsumer {
            queue = "test.queue"
        }
    }
}
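As a further illustration, the sketch below places a second option from the earlier sections, connection, in the central configuration and shows the path at which the values end up. Groovy's ConfigSlurper is used here only to make the fragment runnable on its own; how the plugin itself reads the configuration is not shown, and the "server1" connection name is carried over from the multi-server example.

```groovy
// Parse a configuration fragment to show the resulting structure (illustration only).
def config = new ConfigSlurper().parse('''
rabbitmq {
    consumers {
        ExampleConsumer {
            queue = "test.queue"
            connection = "server1"
        }
    }
}
''')

// The options are found under rabbitmq.consumers.<ClassName>
def consumerConfig = config.rabbitmq.consumers.ExampleConsumer
assert consumerConfig.queue == 'test.queue'
assert consumerConfig.connection == 'server1'
```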