The RabbitMQ Native plugin for Grails provides RabbitMQ integration using the Java libraries provided by the RabbitMQ project. It provides easy configuration of exchanges and queues via a project’s configuration, simple configuration of consumers, and a convenience class to easily publish messages.

This plugin strives to provide an easy convention to follow to quickly begin consuming messages, but still provide easy access to the underlying layer to allow users of this plugin to do whatever they need to do outside of those conventions. It also provides a powerful message body converter system that allows the user to quickly write and plug in converters for custom data types.

This guide details the configuration of the plugin, and may not fully explain the use and conventions of the RabbitMQ services or its Java library. More information can be found in their Java API guide and JavaDoc.

1. Quickstart

This quickstart shows how to begin using the plugin with minimal setup. The plugin is highly configurable, but this section is only a very basic demonstration of its usage.

1.1. Create The Application

Create a project named RabbitExample. You can do this by entering:

grails create-app RabbitExample

1.2. Add The Plugin

In the application’s build.gradle file, under the dependencies section, add:

build.gradle
    dependencies {
        // ...

        compile "org.grails.plugins:rabbitmq-native:3.5.1"

        // ...
    }

1.3. Configuring

In grails-app/conf/application.yml, add to the bottom:

grails-app/conf/application.yml
rabbitmq:
    connections:
      - name: main
        host: changeme
        username: changeme
        password: changeme
    queues:
      - name: testqueue

Be sure to replace the placeholder values with the ones for your RabbitMQ server and user credentials.

1.4. Consumer

Create a consumer by using the following command:

grails create-consumer com.example.Test

Update the consumer to reflect the following:

grails-app/rabbit-consumers/com/example/TestConsumer.groovy
package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class TestConsumer {
    /**
     * Consumer configuration.
     */
    static rabbitConfig = [
        queue: "testqueue"
    ]

    /**
     * Handle an incoming RabbitMQ message.
     *
     * @param body    The converted body of the incoming message.
     * @param context Properties of the incoming message.
     * @return
     */
    def handleMessage(def body, MessageContext context) {
        println body
        return "Hello to you, too!"
    }
}

1.5. Controller

Create a controller by using the following command:

grails create-controller com.example.Test

Update the controller to reflect the following:

grails-app/controllers/com/example/TestController.groovy
package com.example

import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

class TestController {
    RabbitMessagePublisher rabbitMessagePublisher

    def index() {
        render rabbitMessagePublisher.rpc {
            routingKey = "testqueue"
            body = "Hello!"
        }
    }
}

1.6. Run it!

Run the Grails application:

./gradlew bootRun

You can see the application in action by hitting the test controller. If you’re running this on your localhost, your URL may be similar to http://localhost:8080/test/index. You should see the message "Hello!" printed to the application’s output console, and your web browser should display the message "Hello to you, too!".

2. Configuration

Configuration of the connection to the RabbitMQ server is done in your project’s grails-app/conf/application.groovy or grails-app/conf/application.yml file.

Below is the list of general configuration properties.

| Configuration Property | Required | Description | Type | Default |
|---|---|---|---|---|
| enabled | No | If false, will register the plugin’s beans, but prevent the plugin from connecting to the RabbitMQ broker(s) and consuming messages. | Boolean | true |
| enableSerializableConverter | No | If true, enables the use of a converter that handles Java serialization. | Boolean | false |

2.1. Server Connection

The plugin expects the connection configuration to the RabbitMQ server to be defined. A bare minimum configuration example looks like:

application.groovy
rabbitmq {
    connections = [
        [
            name: "defaultConnection",
            host: "example.com",
            username: "foo",
            password: "bar"
        ]
    ]
}

application.yml
rabbitmq:
    connections:
      - name: defaultConnection
        host: example.com
        username: foo
        password: bar

The connections section should contain a list of maps, where each entry in the list represents an individual connection.

Connections to many different RabbitMQ servers can be configured. A multi-server configuration looks like:

application.groovy
rabbitmq {
    connections = [
        [
            name: "connection1",
            host: "rabbit1.example.com",
            username: "foo",
            password: "bar"
        ],
        [
            name: "connection2",
            host: "rabbit2.example.com",
            username: "foo",
            password: "bar"
        ]
    ]
}

application.yml
rabbitmq:
    connections:
      - name: connection1
        host: rabbit1.example.com
        username: foo
        password: bar

      - name: connection2
        host: rabbit2.example.com
        username: foo
        password: bar

The following table enumerates all the configuration options available to the connection configuration:

| Configuration Property | Required | Description | Type | Default |
|---|---|---|---|---|
| name | No | Name of the connection, which can be used to tie queues and exchanges to a particular connection. | String | none |
| host | Yes | Hostname or IP address of the RabbitMQ server to connect to. | String | none |
| username | Yes | Username to log into the RabbitMQ server with. | String | none |
| password | Yes | Password to log into the RabbitMQ server with. | String | none |
| isDefault | No | A connection with this set to true will be the server messages are sent to if no specific connection is specified when sending the message. | boolean | false |
| port | No | Port to connect to the RabbitMQ server with. | Integer | 5672 |
| virtualHost | No | Name of the virtual host to connect to the RabbitMQ server with. | String | none |
| ssl | No | Whether to use SSL when connecting to a RabbitMQ server. | boolean | false |
| threads | No | Thread pool size. If greater than 0, determines how many messages can be processed concurrently at any given time. If set to 0, each consumer can consume as many messages as it is configured to. | Integer | 0 |
| automaticReconnect | No | If true, will cause the application to automatically reconnect to a server when its connection is dropped. | boolean | true |
| requestedHeartbeat | No | Heartbeat interval, in seconds. A value of 0 disables heartbeats. | Integer | 0 |
| enableMetrics | No | If true, will create a metric registry and associate it with the connection via DropWizard. | Boolean | false |
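As an illustration of how several of these options combine, the fragment below sketches a two-connection setup in application.groovy. Only the property names come from the table above; the host names, credentials, virtual host name, port, and thread count are placeholder values chosen for the example.

```groovy
// application.groovy (illustrative values only)
rabbitmq {
    connections = [
        [
            name: "connection1",
            host: "rabbit1.example.com",
            username: "foo",
            password: "bar",
            isDefault: true,          // used when no connection is named explicitly
            virtualHost: "example",   // hypothetical virtual host name
            requestedHeartbeat: 30    // seconds; 0 would disable heartbeats
        ],
        [
            name: "connection2",
            host: "rabbit2.example.com",
            username: "foo",
            password: "bar",
            port: 5671,               // assumed TLS listener port for this example
            ssl: true,
            threads: 10               // at most 10 messages processed concurrently
        ]
    ]
}
```

Marking exactly one connection as the default keeps it unambiguous where unqualified queues, exchanges, and messages end up.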

2.2. Defining Queues

The plugin allows authors to define the exchanges and queues programmatically inside the configuration. This allows the application to configure its own queues without someone having to manually create the exchanges and queues prior to running the application.

Queue configuration is also done in the rabbitmq block, much as the server connection is configured. Usage is best illustrated with an example:

application.groovy
rabbitmq {
    connections = [
        // ...
    ]
    queues = [
        [
            name: "example.queue",
            durable: true,
            exchange: "example.exchange"
        ]
    ]
}

application.yml
rabbitmq:
    connections:
        # ...
    queues:
      - name: example.queue
        durable: true
        exchange: example.exchange

The above code will define a queue named example.queue, and its durable flag will be set.

If using the multi-server feature of the plugin, it is important to consider what server the queue should be defined in. The connection property specifies which server to create the queue in, illustrated below:

application.groovy
rabbitmq {
    queues = [
        // Assume there is a connection with the name "server1"...
        [
            name: "example.queue",
            connection: "server1",
            durable: true
        ]
    ]
}

application.yml
rabbitmq:
    queues:
        # Assume there is a connection with the name "server1"...
      - name: example.queue
        connection: server1
        durable: true

Below is a table of all of the options available when defining queues:

| Property | Required | Description | Type | Default |
|---|---|---|---|---|
| name | Yes | Name of the queue. | String | none |
| arguments | No | Extra arguments used to create the queue. See the RabbitMQ documentation for more information. | Map | none |
| autoDelete | No | Whether to automatically delete the queue once there are no more consumers listening to it. | boolean | false |
| binding | No | Used in conjunction with exchanges. See the section below for more information. | Mixed | none |
| durable | No | Whether the queue should be persisted to disk on the RabbitMQ server so that it survives server restarts. | boolean | false |
| exchange | No | Binds a queue to an exchange in conjunction with the binding property. Ignored if used inside an exchange declaration. | String | none |
| match | No | Required when binding to a headers exchange. Either "any" or "all". | String | none |
| connection | No | Name of the connection to create the queue with. Uses the default connection if omitted. | String | none |
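The arguments property is the least self-explanatory of these options, so here is a small sketch of a queue definition that uses it. The x-message-ttl key is a standard RabbitMQ queue argument (message time-to-live in milliseconds); the queue name and the TTL value are illustrative assumptions.

```groovy
// application.groovy (illustrative values only)
rabbitmq {
    queues = [
        [
            name: "example.queue",
            durable: true,
            autoDelete: false,
            arguments: [
                "x-message-ttl": 60000  // RabbitMQ queue argument: message time-to-live in milliseconds
            ]
        ]
    ]
}
```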

2.3. Defining Exchanges

Defining exchanges is very similar to defining queues. The following code illustrates how to define an exchange:

application.groovy
rabbitmq {
    exchanges = [
        [
            name: "example.exchange",
            type: "topic"
        ]
    ]
}

application.yml
rabbitmq:
    exchanges:
      - name: example.exchange
        type: topic

The above example will create an exchange with the name example.exchange and of the type topic. Below is a list of all the options available when creating exchanges:

| Property | Required | Description | Type | Default |
|---|---|---|---|---|
| name | Yes | Name of the exchange. | String | none |
| arguments | No | Extra arguments used to create the exchange. See the RabbitMQ documentation for more information. | Map | none |
| autoDelete | No | Whether to automatically delete the exchange once there are no longer any queues bound to it. | boolean | false |
| durable | No | Whether the exchange should be persisted to disk on the RabbitMQ server so that it survives server restarts. | boolean | false |
| type | Yes | One of "fanout", "topic", "direct", or "headers". | String | none |
| connection | No | Name of the connection to create the exchange with. Uses the default connection if omitted. | String | none |

2.4. Binding Queues To Exchanges

Queues can be bound to an exchange by setting the exchange and binding properties in the queue’s configuration. The value of the binding depends on the type of exchange the queue is being bound to. Each exchange type is explained below.

This basic example will create a topic exchange named example.exchange, as well as create a queue named example.queue. The queue will be bound to the exchange with the topic, or routing key, of sample.binding.#.

application.groovy
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange",
            binding: "sample.binding.#"
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "topic"
        ]
    ]
}

application.yml
rabbitmq:
    queues:
      - name: example.queue
        exchange: example.exchange
        binding: 'sample.binding.#'
    exchanges:
      - name: example.exchange
        type: topic

The # character is used frequently with RabbitMQ bindings. It is also special in YAML, where it starts a comment, so it is important that strings using the # character are quoted so that the YAML engine will treat it as plain text.

Queues need to have their binding defined specifically for the type of exchange they are bound to.

2.4.1. Fanout Exchanges

Fanout exchanges are the easiest to configure bindings for, since they require none. A fanout exchange simply sends every message it receives to every queue bound to it.

application.groovy
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange"
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "fanout"
        ]
    ]
}

application.yml
rabbitmq:
    queues:
      - name: example.queue
        exchange: example.exchange
    exchanges:
      - name: example.exchange
        type: fanout

2.4.2. Topic Exchanges

Topic exchanges require queues to define a topic. Topics can be an exact match, but their strength is in their partial matching ability. See the RabbitMQ documentation for details about this kind of exchange.

application.groovy
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange",
            binding: "example.binding.#"
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "topic"
        ]
    ]
}

application.yml
rabbitmq:
    queues:
      - name: example.queue
        exchange: example.exchange
        binding: 'example.binding.#'
    exchanges:
      - name: example.exchange
        type: topic
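To make the partial matching concrete, the following is a minimal sketch of topic matching in plain Groovy, following the rules in the RabbitMQ documentation: routing keys are split into dot-separated words, "*" stands for exactly one word, and "#" stands for zero or more words. This is not the broker's or the plugin's implementation, and the topicMatches and matchWords helpers are hypothetical names used only for this illustration.

```groovy
// Returns true when a routing key matches a topic binding pattern.
boolean topicMatches(String pattern, String routingKey) {
    matchWords(pattern.tokenize('.'), routingKey.tokenize('.'))
}

// Recursively compares the pattern words against the routing key words.
boolean matchWords(List<String> pattern, List<String> words) {
    if (pattern.isEmpty()) {
        return words.isEmpty()
    }
    String head = pattern.head()
    List<String> rest = pattern.tail()
    if (head == '#') {
        // "#" may absorb zero words, or absorb one word and try again.
        return matchWords(rest, words) || (!words.isEmpty() && matchWords(pattern, words.tail()))
    }
    if (words.isEmpty()) {
        return false
    }
    // "*" accepts any single word; anything else must match literally.
    return (head == '*' || head == words.head()) && matchWords(rest, words.tail())
}

assert topicMatches('example.binding.#', 'example.binding.created.today')
assert !topicMatches('example.*', 'example.binding.created')
```

With the binding used in the configuration above, a queue would therefore receive example.binding, example.binding.created, and any deeper key under that prefix.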

2.4.3. Direct Exchanges

Direct exchanges are similar to topic exchanges, except that their "topics" only function with direct name matching. The appropriate name for the binding in this case is "routing key". Queues must define a routing key when binding to this type of exchange.

application.groovy
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange",
            binding: "exampleRoutingKey"
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "direct"
        ]
    ]
}

application.yml
rabbitmq:
    queues:
      - name: example.queue
        exchange: example.exchange
        binding: exampleRoutingKey
    exchanges:
      - name: example.exchange
        type: direct

2.4.4. Header Exchanges

Header exchanges are like topic exchanges, but with the ability to define multiple match keywords. The binding for queues allows the queue to match on all or one of multiple header values. The queue must also set the match property for this exchange type, and the value must be one of "any" or "all".

application.groovy
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange",
            match: "any",
            binding: [
                "header1": "header-value-1",
                "header2": "header-value-2"
            ]
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "headers"
        ]
    ]
}

application.yml
rabbitmq:
    queues:
      - name: example.queue
        exchange: example.exchange
        match: any
        binding:
            header1: header-value-1
            header2: header-value-2
    exchanges:
      - name: example.exchange
        type: headers
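The difference between the two match values can be sketched in plain Groovy. This is only an illustration of the "any" versus "all" semantics described above, not the broker's implementation, and the headersMatch helper is a hypothetical name.

```groovy
// Decides whether a message's headers satisfy a headers-exchange binding.
// match == 'all' requires every bound header to be present with the same value;
// any other value (here: 'any') requires at least one.
boolean headersMatch(Map binding, String match, Map headers) {
    Closure<Boolean> matches = { Map.Entry entry ->
        headers.containsKey(entry.key) && headers[entry.key] == entry.value
    }
    match == 'all' ? binding.every(matches) : binding.any(matches)
}

def binding = [header1: 'header-value-1', header2: 'header-value-2']

assert headersMatch(binding, 'any', [header1: 'header-value-1'])
assert !headersMatch(binding, 'all', [header1: 'header-value-1'])
```

A message carrying only header1 would reach the queue configured above because it uses match "any"; with "all" it would need both headers.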

2.5. Binding Exchanges to Exchanges

Exchanges can also be bound to other exchanges to create richer routing topologies. More information can be found in the RabbitMQ documentation on exchange-to-exchange bindings.

This basic example will create 2 exchanges, with a binding that will forward messages from source-exchange to target-exchange when messages match the routing key foo.bar.#.

application.groovy
rabbitmq {
    exchanges = [
        [
            name: "source-exchange",
            type: "topic",
            exchangeBindings: [
                [ exchange: "target-exchange", binding: "foo.bar.#" ]
            ]
        ],
        [
            name: "target-exchange",
            type: "direct"
        ]
    ]
}

application.yml
rabbitmq:
    exchanges:
      - name: source-exchange
        type: topic
        exchangeBindings:
          - exchange: target-exchange
            binding: 'foo.bar.#'
      - name: target-exchange
        type: direct

3. Consuming Messages

Consuming messages is accomplished by creating a message consumer class. Consumer classes are placed in the grails-app/rabbit-consumers directory, and their file names must end with Consumer.groovy. These classes are Spring beans, meaning that other Spring beans, such as services or the grailsApplication instance, can be injected into them. Each consumer class must be configured in a certain way. If a consumer is misconfigured, the plugin will log an error stating so and will not register the consumer to any queues.

3.1. Message Handlers

One of the requirements for a consumer to be registered to the RabbitMQ server is that a message handler be declared in the consumer class. The message handler is the mechanism by which messages are consumed.

3.1.1. Basic Usage

In its most basic form, a message handler method takes in the body of the received message, and a MessageContext object that contains the message parameters received from the RabbitMQ server, along with the consumer’s channel that the handler should publish messages through.

This is the most generic form of a message handler:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    // ...

    def handleMessage(def body, MessageContext context) {
        // Do work
    }
}

3.1.2. Typed Message Handlers

The logic surrounding the message consumer classes will by default attempt to intelligently convert the body of the received message from a byte array to a converted type, such as a String. Before routing the message to the consumer and handler, the plugin will run through a list of Message Converters, limited by the class types of the handlers defined in the message consumer, and attempt to find a conversion that works.

For example, consider this JSON blob:

{"foo":"bar","hi":"there"}

If the above message is received, the converter for the Map class type will convert the byte array to a Map of the JSON data. If a valid handler for the Map type is defined, the handler will receive the converted JSON.

The following handlers would accept the converted map:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    // ...

    def handleMessage(Map body, MessageContext context) {
        // Do work
    }

    def handleMessage(def body, MessageContext context) {
        // Since def is a generic type (Object)
    }
}

If no converter is able to convert the message body, the plugin will fall back to passing the handler the raw byte array received from the RabbitMQ server.

More information about message converters can be found in the Message Converters section, including a list of built-in message converters and details of how to create custom converters.

3.1.3. Short-Form Usage

In addition to the 2-parameter handler method signature, there are 2 shortcut versions available for use. One form only takes the converted message body, and the other only takes the MessageContext object.

The short-form handlers are shown below:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    // ...

    def handleMessage(String message) {
        // Do work
    }

    def handleMessage(MessageContext context) {
        // Do work
    }
}

The MessageContext-only handler will only be called if there is no other handler defined that can possibly handle any conversion of the message body.

3.1.4. MessageContext Object

The message context is just an object that encapsulates the data relevant to the received message. Below is a list of properties of the class.

| Property | Description |
|---|---|
| body | Incoming message in its raw byte[] format. |
| channel | The RabbitMQ channel the message handler should use to publish messages. This is especially important when using transactions. |
| consumerTag | Consumer tag |
| envelope | Properties of the message’s delivery (see RabbitMQ’s documentation) |
| properties | Properties of the message (see RabbitMQ’s documentation) |

3.1.5. RPC-Style Messages

When a client publishes a message and waits for a return reply, this is considered an RPC-style operation. Typically, the server-side of the operation (in this case, a message consumer/handler) must respond to the client on a queue that the client requested manually. This plugin provides a very convenient method to respond to the client without having to manually construct a response message.

The client will provide a response queue to the server to reply to. This queue name is stored in the MessageContext.properties.replyTo variable. If a client publishes this variable, and the handler returns some data, the plugin will convert the data returned from the message handler and build a response message for you.

The following example illustrates responding via a handler’s returned data.

import com.budjb.rabbitmq.consumer.MessageContext

class RpcExampleConsumer {
    def handleMessage(String message, MessageContext messageContext) {
        println "received ${message}"

        return "response" // this message will be sent back to the client via its replyTo queue
    }
}

Alternatively, the rabbitMessagePublisher can be used to respond.

import com.budjb.rabbitmq.consumer.MessageContext

class RpcExampleConsumer {
    def rabbitMessagePublisher

    def handleMessage(String message, MessageContext messageContext) {
        println "received ${message}"

        // Publish the reply manually to the client's replyTo queue.
        rabbitMessagePublisher.send {
            routingKey = messageContext.properties.replyTo
            body = "response"
        }
    }
}

Allowing the plugin to build a reply message only converts the data returned from the message handler and publishes it to the reply queue. If you need to set any of the other message properties, such as headers or content type, you must manually build the response message using the rabbitMessagePublisher, and refrain from returning data from the message handler.

3.1.6. Handling Unsupported Messages

As long as there is a converter that can convert an incoming message and a handler defined that accepts the converted object, messages can be delivered. In the case where an incoming message cannot be converted and there is no possible way to provide the message to a handler, an error is logged by default and the message is rejected.

Message consumers that need to conduct some logic when this occurs may implement the UnsupportedMessageHandler interface, which requires the handleUnsupportedMessage method. When no handler and converter exist for a message, this method is called and the MessageContext is passed in, giving the consumer some way to gracefully handle input failures. Objects may even be returned, much like regular handlers, as a response to an RPC call.
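A consumer using this hook might look roughly like the sketch below. The interface name, the method name, and the MessageContext parameter come from the description above; the package of UnsupportedMessageHandler and the Object return type are assumptions based on the other plugin classes shown in this guide, and the queue name and reply text are illustrative.

```groovy
package com.example

import com.budjb.rabbitmq.consumer.MessageContext
// Assumption: the interface lives in the same package as MessageContext.
import com.budjb.rabbitmq.consumer.UnsupportedMessageHandler

class ExampleConsumer implements UnsupportedMessageHandler {
    static rabbitConfig = [
        queue: "test.queue"
    ]

    // Regular handler: only bodies that convert to a Map arrive here.
    def handleMessage(Map body, MessageContext context) {
        // Do work
    }

    // Called when no converter and handler combination can accept the message.
    def handleUnsupportedMessage(MessageContext context) {
        println "Unsupported message of ${context.body.length} bytes"

        // Returned data is used as the reply when the message was an RPC call.
        return "unsupported message format"
    }
}
```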

3.2. Subscribing To Queues

Message consumers can subscribe to either queues or exchanges. When a consumer is registered to a queue, the consumer will receive messages from the queue as the RabbitMQ server determines that it’s the consumer’s turn to receive a message, since there may be multiple listeners on the same queue.

Each message consumer class must have a configuration defined. There are 2 methods to specify the configuration:

  • The consumer class can have a Map assigned to a static variable named rabbitConfig. To subscribe to queues, the only required configuration option is the queue variable, which is the name of the queue to subscribe to.

  • The application’s configuration file can contain the configuration with the path rabbitmq.consumers.<ClassName>. See the section below for more details.

Here is a simple example of a consumer subscribing to a queue.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "queue": "test.queue"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

There are many options available to influence how the consumer works, which can be found in the reference.

3.3. Subscribing to Exchanges

Subscribing to exchanges is different from subscribing to queues, as there are different types of exchanges with different behavior. RabbitMQ’s libraries do not provide a direct way to subscribe to an exchange; however, the plugin makes this possible by creating a temporary queue at runtime that is bound to the exchange and consumed by the message consumer. The binding requirements differ between the different types of exchanges.

It is important to note that these temporary queues are randomly named, exclusive, and auto-delete, so this feature is not suitable if messages left on the queue must persist when the application shuts down.

3.3.1. Fanout Exchanges

A fanout exchange will forward a received message to every queue bound to it. There is no binding criteria for this kind of exchange. This is the simplest type of exchange, and is also the easiest to configure.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "fanout.exchange"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

3.3.2. Topic Exchanges

A topic exchange will forward messages to queues based on the binding criteria the queue used to register to the exchange. In RabbitMQ terms, this is called a routing key. The routing key can be either a direct match, or utilize wildcards to do a partial topic match. If a routing key is omitted, the queue will receive no messages. Use "#" to receive all messages from an exchange. More information can be found in the RabbitMQ documentation.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "topic.exchange",
        "binding": "foo.bar.#"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

3.3.3. Direct Exchanges

A direct exchange will forward messages to queues based on binding criteria configured similarly to topic exchanges. The difference in this case is that direct exchanges do not support wildcards in routing keys; the routing key must match the binding exactly.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "direct.exchange",
        "binding": "example"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

3.3.4. Headers Exchanges

Header exchanges work similarly to topic exchanges. A headers exchange will forward messages to queues based on header values contained in messages. Additionally, a queue can be bound on multiple header values, along with an option to require one or all of the headers to match.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "headers.exchange",
        "binding": [
            "foo": "bar",
            "hi": "there"
        ],
        "match": "any"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

3.4. Multi-Server

When using a multi-server setup, it is important to consider what server a consumer should listen on. Use the connection property in the rabbitConfig to specify which server the consumer should be bound to. If the connection is omitted, the consumer will be bound to the default connection.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "queue": "test.queue",
        "connection": "server1" // Where "server1" is a connection configured with that name.
    ]

    def handleMessage(def body, MessageContext context) {
        // Do work
    }
}

3.5. Central Configuration

It is also possible to define the consumer’s configuration outside of the consumer in the application’s configuration file. All of the configuration options described above are valid for this type of configuration. This functionality is valuable when a consumer’s configuration needs to be determined at runtime, instead of being hardcoded in the consumer class itself.

To configure a consumer in the Grails application’s configuration file, an entry matching the consumer’s class name should be present under the rabbitmq.consumers key, as follows:

application.groovy
rabbitmq {
    consumers {
        ExampleConsumer {
            queue = "test.queue"
        }
    }
}

application.yml
rabbitmq:
    consumers:
        ExampleConsumer:
            queue: test.queue
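Since the same options are accepted here as in rabbitConfig, an exchange subscription can be expressed centrally as well. The fragment below is a sketch that reuses the topic exchange example from earlier in this chapter together with the connection option; the exchange, binding, and connection names are illustrative and assume matching definitions exist elsewhere in the configuration.

```groovy
// application.groovy (illustrative values only)
rabbitmq {
    consumers {
        ExampleConsumer {
            exchange = "topic.exchange"
            binding = "foo.bar.#"
            connection = "server1" // assumes a connection configured with this name
        }
    }
}
```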

3.6. Consumer Event Handlers

It may be useful to execute some logic at certain times during the message delivery lifecycle. To enable this, the MessageConsumerEventHandler trait exists that provides several hooks or event handlers, which are called at various times during the message delivery.

The trait provides empty bodies for all of its methods and so implementations of the trait need only override the specific event handlers that are required.

Table 1. Event Handlers

| Method | Description |
|---|---|
| onReceive(MessageContext) | Called when a message is initially received by the underlying RabbitMQ system and before it is handed to the message consumer class. |
| onSuccess(MessageContext) | Called when a message has been successfully delivered and processed by the message consumer class. |
| onFailure(MessageContext, Throwable) | Called when some unhandled exception occurred during the process of delivering or processing the message. This event handler differs from the rest in that the unhandled exception is provided. |
| onComplete(MessageContext) | Called when the delivery process is complete, whether it was successful or failed. This event handler is called in addition to onSuccess or onFailure. |
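As a rough sketch of how a consumer might use these hooks, the class below overrides two of them and leaves the rest at their empty defaults. The trait name and method parameters are taken from the table above; the trait's package and the void return types are assumptions, and the class name, queue name, and log messages are hypothetical.

```groovy
package com.example

// Assumption: the trait lives in the same package as MessageContext.
import com.budjb.rabbitmq.consumer.MessageConsumerEventHandler
import com.budjb.rabbitmq.consumer.MessageContext

class AuditedConsumer implements MessageConsumerEventHandler {
    static rabbitConfig = [
        queue: "test.queue"
    ]

    def handleMessage(String body, MessageContext context) {
        // Do work
    }

    // Runs only when delivery or processing threw an unhandled exception.
    void onFailure(MessageContext context, Throwable throwable) {
        println "Message handling failed: ${throwable.message}"
    }

    // Runs after every delivery attempt, whether it succeeded or failed.
    void onComplete(MessageContext context) {
        println "Finished handling a message of ${context.body.length} bytes"
    }
}
```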

4. Publishing Messages

Publishing messages through the plugin is achieved by using the rabbitMessagePublisher bean. This Spring bean uses a closure-based configuration method both to send messages without waiting for a response and to send RPC-style messages. There are many options available to the rabbitMessagePublisher which are documented in the reference, but this guide will only demonstrate basic usage.

In a multi-server setup, it is important to consider what server to send a message to. Like configuration and consumers, the connection property is used to route the message to the proper server connection.

4.1. Sending Messages

Sending a message means publishing a message to an exchange or queue and not waiting for a response (fire and forget). The only required parameters to the publisher are a queue or exchange to publish the message to, and the body of the message.

import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher

    def sendSomeMessage() {
        rabbitMessagePublisher.send {
            exchange = "some.exchange"
            routingKey = "some.routingKey"
            body = "hi!"
        }
    }
}

RabbitMQ expects the body of a message to be a byte array. Message converters will also work when publishing messages, so if an object type other than byte[] is encountered, a suitable message converter will be found and run against the message body, if one exists.
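In a multi-server setup, the same call can name the connection to publish on through the connection property mentioned at the start of this chapter. The sketch below assumes a connection named connection2 has been configured; the method name is illustrative.

```groovy
import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher

    def sendToSecondServer() {
        rabbitMessagePublisher.send {
            connection = "connection2" // assumed to match a name under rabbitmq.connections
            exchange = "some.exchange"
            routingKey = "some.routingKey"
            body = "hi!"
        }
    }
}
```

If the connection property is left out, the message goes to the default connection.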

4.2. RPC Messages

Publishing an RPC message is as easy as sending messages, except that the response message is returned from the method.

import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher

    def sendSomeMessage() {
        def result = rabbitMessagePublisher.rpc {
            exchange = "some.exchange"
            routingKey = "some.routingKey"
            body = "hi!"
            timeout = 5000
        }
    }
}

The timeout option is especially important for RPC-style messages. The timeout property is the amount of time (in milliseconds) the client will wait for the server to respond. If the timeout is reached, a TimeoutException will be thrown. If the timeout is set to 0, the client will wait indefinitely for a response. The default value of the timeout, if not passed, is 5 seconds.
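Callers will usually want to handle the timeout rather than let it propagate. The following sketch shows one way to do that; it assumes the exception thrown is java.util.concurrent.TimeoutException, and the method name, the 2 second timeout, and the fallback text are illustrative choices.

```groovy
import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

// Assumption: this is the TimeoutException class the publisher throws.
import java.util.concurrent.TimeoutException

class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher

    def askWithFallback() {
        try {
            return rabbitMessagePublisher.rpc {
                routingKey = "testqueue"
                body = "Hello!"
                timeout = 2000 // milliseconds
            }
        }
        catch (TimeoutException e) {
            // No reply arrived in time; fall back to a local answer.
            return "no reply within 2 seconds"
        }
    }
}
```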

4.3. Bulk Publishing

By default, the publisher opens a new channel for each message it publishes. This is acceptable when sending individual messages, but when bulk publishing is required, it is much more efficient to open a single channel and use it for batches of messages.

As an example, during testing it took about 65 milliseconds to send one message. If 1000 messages are sent, the total time to publish is about 65 seconds! Using a single channel for the same operation takes about 1 millisecond per message, cutting the time down to 1 second for all 1000 messages.

These times are just an example based on testing, but they will differ based on network latency and server load.
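
The arithmetic behind those figures can be checked with a few lines of plain Groovy. The per-message timings are the ones quoted above, not new measurements.

```groovy
// Figures quoted in the text: about 65 ms per message when a new channel is
// opened each time, about 1 ms per message when one channel is reused.
int messages = 1000
int newChannelPerMessageMs = 65
int sharedChannelPerMessageMs = 1

int totalNewChannelMs = messages * newChannelPerMessageMs
int totalSharedChannelMs = messages * sharedChannelPerMessageMs

println "New channel per message: ${totalNewChannelMs} ms in total"
println "Single shared channel:   ${totalSharedChannelMs} ms in total"
```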

While a channel may be manually created by authors using the rabbitContext, the publisher provides an easy way to send many messages with a single channel. See the example below.

rabbitMessagePublisher.withChannel { channel ->
    1000.times { i ->
        send {
            routingKey = "foobar"
            body = "Bulk message $i"
        }
    }
}

There is also a withChannel method that takes in a connection name for multi-server setups.

rabbitMessagePublisher.withChannel("connection1") { channel ->
    1000.times { i ->
        send {
            routingKey = "foobar"
            body = "Bulk message $i on connection1"
        }
    }
}

All of the send and rpc methods available with the publisher can be used with withChannel.

4.4. Publisher Confirms

The publisher allows authors to enable publisher confirms for a batch of messages using a similar mechanism as withChannel. These methods use RabbitMQ’s waitForConfirms and waitForConfirmsOrDie methods. See RabbitMQ’s documentation for more information about how publisher confirms work.

The reference details all of the various methods that the publisher contains to support confirms, but below are a couple of basic examples.

The withConfirms methods utilize the waitForConfirms methods from the RabbitMQ Java library. withConfirms will block until all messages sent in the provided closure have been acked or n’acked by the server. If a timeout is specified, it will only wait for the amount of time specified before throwing a TimeoutException.

rabbitMessagePublisher.withConfirms { channel ->
    send {
        routingKey = "foobar"
        body = "I am a test message"
    }

    send {
        routingKey = "barbaz"
        body = "I am also a test message"
    }
}

The withConfirmsOrDie methods utilize the waitForConfirmsOrDie methods from the RabbitMQ Java library. withConfirmsOrDie will block until all messages sent in the provided closure have been acked or n’acked by the server. The difference with this method is that if any n’ack is received, an exception is thrown. As with withConfirms, a timeout can be used.

rabbitMessagePublisher.withConfirmsOrDie { channel ->
    send {
        routingKey = "foobar"
        body = "I am a test message"
    }

    send {
        routingKey = "barbaz"
        body = "I am also a test message"
    }
}

There are versions of both of these types of methods that take a connection name and/or a timeout.

5. Message Converters

Message converters are classes that are responsible for converting objects to and from byte arrays.
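
At its simplest, a conversion is a round trip between an object and its byte representation. The plain Groovy sketch below shows that round trip for a String without involving the plugin; the choice of UTF-8 is an assumption made for the example.

```groovy
import java.nio.charset.StandardCharsets

String original = "hi!"

// Outgoing direction: object to byte array
byte[] wire = original.getBytes(StandardCharsets.UTF_8)

// Incoming direction: byte array back to an object
String restored = new String(wire, StandardCharsets.UTF_8)

println "Sent ${wire.length} bytes, restored '${restored}'"
```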

5.1. Built-in Message Converters

The plugin provides converters for the following types:

Table 2. Built-in Message Converters

Type | Incoming | Outgoing
Serializable | Yes | Yes
Integer | Yes | Yes
Long | Yes | Yes
Map (JSON) | Yes | Yes
TypeConvertingMap (JSON) | Yes | Yes
List (JSON) | Yes | Yes
String | Yes | Yes
GString | No | Yes

These converters allow message handlers to consume and return data without having to convert that data themselves. They are also used when publishing messages with the RabbitMessagePublisher.

INFO: The message converter for Serializable classes will always be attempted first.

INFO: The Serializable converter is off by default. Set rabbitmq.enableSerializableConverter to true to enable it.

5.2. Custom Message Converters

The plugin provides a way for authors to create their own message converters. A custom message converter must be placed in the grails-app/rabbit-converters path, and must end with Converter.groovy.

A message converter must implement the appropriate interface for the type of conversion it supports. The ByteToObjectConverter interface is used when a converter supports incoming messages from RabbitMQ, while the ObjectToByteConverter interface is used when a converter supports outgoing messages to RabbitMQ. Classes that support both should implement both interfaces.

Message converters advertise which object types and MIME types they support. The conversion system will typically first attempt to match an incoming message with a converter that supports its MIME type, if one was provided. Converters that are aware of the MIME types they support give the conversion system more accurate type detection. Additionally, converters may apply a MIME type to outgoing messages if one has not already been defined.

Below is an example converter for the String object type. Custom converters should follow the same format.

StringMessageConverter
package com.budjb.rabbitmq.converter

import groovy.transform.CompileStatic
import org.springframework.util.MimeType

/**
 * A converter that supports conversion to and from a {@link String}.
 */
@CompileStatic
class StringMessageConverter implements ByteToObjectConverter, ObjectToByteConverter {
    /**
     * Mime type.
     */
    private static final MimeType mimeType = MimeType.valueOf('text/plain')

    /**
     * {@inheritDoc}
     */
    @Override
    boolean supports(Class<?> type) {
        return String.isAssignableFrom(type)
    }

    /**
     * {@inheritDoc}
     */
    @Override
    boolean supports(MimeType mimeType) {
        return mimeType.isCompatibleWith(this.mimeType)
    }

    /**
     * {@inheritDoc}
     */
    @Override
    ByteToObjectResult convert(ByteToObjectInput input) {
        return new ByteToObjectResult(new String(input.getBytes(), input.getCharset()))
    }

    /**
     * {@inheritDoc}
     */
    @Override
    ObjectToByteResult convert(ObjectToByteInput input) {
        return new ObjectToByteResult(
            ((String) input.getObject()).getBytes(input.getCharset()),
            new MimeType(mimeType, input.getCharset())
        )
    }
}

6. Advanced Usage

While the plugin effectively wraps the functionality of the RabbitMQ library, the end user has direct access to all of the underlying library objects and connection instances.

6.1. Spring Beans

There are several beans defined by the plugin to perform its various operations.

Bean Name | Purpose
rabbitContext | A front-end class to the plugin that aggregates useful functionality for users of the plugin. Besides the rabbitMessagePublisher, this is likely the only bean users will need to access, if at all.
connectionManager | Manages the lifecycle of connection instances, including loading, starting, stopping, unloading, and retrieval.
messageConverterManager | Handles loading message converters and acts as the entry point when message conversion is required.
consumerManager | Manages the lifecycle of consumer instances, including loading, starting, stopping, unloading, and retrieval.
queueBuilder | Responsible for creating exchanges and queues defined in the application’s configuration.
rabbitMessagePublisher | Used to send messages to a RabbitMQ broker.

6.2. Spring Application Events

The plugin provides several application events that applications using the plugin may be interested in. They are particularly useful when some logic needs to execute during the startup and shutdown lifecycle of the plugin.

Table 3. Application Events

Event Type | Description
RabbitContextStartingEvent | Published before the RabbitContext is started. This is useful when code needs to execute before connections and consumers start.
RabbitContextStartedEvent | Published after the RabbitContext is started. This is useful when code needs to execute after connections and consumers are started.
RabbitContextStoppingEvent | Published before the RabbitContext is stopped. This is useful when code needs to execute before connections and consumers are stopped.
RabbitContextStoppedEvent | Published after the RabbitContext is stopped. This is useful when code needs to execute after connections and consumers are stopped.
ConsumerManagerStartingEvent | Published before all consumers are started.
ConsumerManagerStartedEvent | Published after all consumers are started.
ConsumerManagerStoppingEvent | Published before all consumers are stopped.
ConsumerManagerStoppedEvent | Published after all consumers are stopped.
ConsumerContextStartingEvent | Published before a specific consumer is started.
ConsumerContextStartedEvent | Published after a specific consumer is started.
ConsumerContextStoppingEvent | Published before a specific consumer is stopped.
ConsumerContextStoppedEvent | Published after a specific consumer is stopped.
ConnectionManagerStartingEvent | Published before all connections are started.
ConnectionManagerStartedEvent | Published after all connections are started.
ConnectionManagerStoppingEvent | Published before all connections are stopped.
ConnectionManagerStoppedEvent | Published after all connections are stopped.
ConnectionContextStartingEvent | Published before a specific connection is started.
ConnectionContextStartedEvent | Published after a specific connection is started.
ConnectionContextStoppingEvent | Published before a specific connection is stopped.
ConnectionContextStoppedEvent | Published after a specific connection is stopped.

Events can be consumed by registering a bean that implements the ApplicationListener interface. The interface is generic and takes the type of event it wants to listen for. Below is an example of a listener that’s interested in the RabbitContextStartingEvent event.

grails-app/init/Application.groovy
package com.example

import com.example.MyRabbitContextStartingEventListener
import grails.boot.GrailsApp
import grails.boot.config.GrailsAutoConfiguration
import org.springframework.context.annotation.Bean

class Application extends GrailsAutoConfiguration {
    static void main(String[] args) {
        GrailsApp.run(Application, args)
    }

    @Bean
    MyRabbitContextStartingEventListener myRabbitContextStartingEventListener() {
        return new MyRabbitContextStartingEventListener()
    }
}

src/main/groovy/com/example/MyRabbitContextStartingEventListener.groovy
package com.example

import com.budjb.rabbitmq.event.RabbitContextStartingEvent
import org.springframework.context.ApplicationListener

class MyRabbitContextStartingEventListener implements ApplicationListener<RabbitContextStartingEvent> {
    @Override
    void onApplicationEvent(RabbitContextStartingEvent event) {
        println "received a RabbitContextStartingEvent event"
    }
}

6.3. Rabbit Context

Besides the rabbitMessagePublisher, the rabbitContext is the bean users will most likely interact with. While you may never need to use this bean, it can be useful. As with any Spring bean, the rabbitContext can be injected into any Spring managed bean, such as services, controllers, and rabbit consumers.

The rabbitContext is intended to be used as a front-end to all of the other beans defined by the plugin to hide some of the complexity of interacting with the system. As such, in most cases the rabbitContext proxies requests to the appropriate manager bean to accomplish the requested task.

In some cases, interactions with multiple managers are necessary to safely carry out the action. Therefore, unless the rabbitContext does not provide the required functionality, it should be considered best-practice to use the rabbitContext instead of the other beans directly. If you find that you are frequently resorting to using one of the other beans, I encourage you to post an issue on the GitHub project.

The following subsections describe some of the more useful functionality the rabbitContext provides.

6.3.1. Native Objects

The main goal of the plugin is to wrap the RabbitMQ library, hiding the complexity of its usage and bringing it in line with the conventions of Grails applications, while still allowing users to reach the underlying RabbitMQ objects when needed. The rabbitContext provides several methods to gain direct access to the RabbitMQ Java library’s Connection and Channel objects.

To create a new Channel, use the createChannel methods.

// This creates a new channel with the default connection
Channel channel = rabbitContext.createChannel()

// The same using a different connection based on its name
Channel channel = rabbitContext.createChannel("connection1")

If the createChannel methods are used, it is important that these channels are closed. The plugin opens and closes the channels it manages as part of publishing or consuming messages, but channels created with the createChannel methods are not managed by the plugin. It is the author’s responsibility to close them; otherwise connection leaks may occur and memory leaks most likely will.
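
One way to make sure a manually created channel is always closed is a try/finally block. The sketch below is only an illustration: the service name, method name, and queue name are hypothetical, and the rabbitContext is injected by bean name. basicPublish and close are methods of the RabbitMQ Java client’s Channel.

```groovy
import com.rabbitmq.client.Channel

// Hypothetical service; only createChannel() comes from the plugin.
class NativeChannelService {
    def rabbitContext // injected by bean name

    def publishRaw() {
        Channel channel = rabbitContext.createChannel()
        try {
            // Publish through the default exchange ("") straight to a queue;
            // "example.queue" is an illustrative name.
            channel.basicPublish("", "example.queue", null, "hi!".getBytes("UTF-8"))
        }
        finally {
            // This channel is not managed by the plugin, so close it here
            channel.close()
        }
    }
}
```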

Use the getConnection methods to gain access to the Connection objects.

// To retrieve the Connection from the default connection
Connection connection = rabbitContext.getConnection()

// To retrieve the Connection from a specific connection based on its name
Connection connection = rabbitContext.getConnection("connection1")

6.3.2. Starting and Stopping Connections

The plugin handles starting and stopping connections automatically when the application is started or shut down, but sometimes applications may need to manually stop connections based on certain conditions or business logic. The rabbitContext contains several methods to manage the life cycle of connections.

Method | Description
startConnections | Starts all registered connections. If some connections are already started, the remainder will also be started.
stopConnections | Stops all connections and all consumers.
startConnection | Starts a connection based on its name.
stopConnection | Stops a connection based on its name, and stops any consumers on the connection.

6.3.3. Starting and Stopping Consumers

Much like connections, the rabbitContext provides several methods to start and stop consumers if necessary.

Method | Description
startConsumers | Starts all registered consumers. If some consumers are already started, the remainder will also be started.
stopConsumers | Stops all consumers.
startConsumer | Starts a consumer based on its class name.
stopConsumer | Stops a consumer based on its class name.
startConsumers(String) | Starts all consumers on a specific connection, based on the connection name.
stopConsumers(String) | Stops all consumers on a specific connection, based on the connection name.

6.3.4. Status Report

The RabbitContext contains a method getStatusReport(), which will build an object structure containing information about all RabbitMQ connections and all consumers. This information is useful to monitor the status of the RabbitMQ application system, as it contains running states and statistics including how many concurrent threads a consumer is configured for, how many are actually active, and how many are actively processing messages. Below is an example of its output gathered from one of the plugin’s tests, serialized as JSON.

[
    {
        "consumers": [
            {
                "fullName": "com.budjb.rabbitmq.test.AllTopicConsumer",
                "load": 0,
                "name": "AllTopicConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "topic-queue-all",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.ReportingConsumer",
                "load": 0,
                "name": "ReportingConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "reporting",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.SleepingConsumer",
                "load": 0,
                "name": "SleepingConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "sleeping",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.SpecificTopicConsumer",
                "load": 0,
                "name": "SpecificTopicConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "topic-queue-specific",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.StringConsumer",
                "load": 0,
                "name": "StringConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "string-test",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.SubsetTopicConsumer",
                "load": 0,
                "name": "SubsetTopicConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "topic-queue-subset",
                "runningState": "RUNNING"
            }
        ],
        "host": "localhost",
        "name": "connection1",
        "port": 5672,
        "runningState": "RUNNING",
        "virtualHost": "test1.rabbitmq.budjb.com"
    },
    {
        "consumers": [
            {
                "fullName": "com.budjb.rabbitmq.test.Connection2Consumer",
                "load": 0,
                "name": "Connection2Consumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "connection2-queue",
                "runningState": "RUNNING"
            }
        ],
        "host": "localhost",
        "name": "connection2",
        "port": 5672,
        "runningState": "RUNNING",
        "virtualHost": "test2.rabbitmq.budjb.com"
    }
]
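
Because the report serializes to plain lists and maps, it is straightforward to scan for problems. The sketch below does not call getStatusReport() itself; it parses a trimmed, made-up report with the same field names as the output above. The "STOPPED" state and all values are assumptions chosen for the example.

```groovy
import groovy.json.JsonSlurper

// Trimmed, invented report using the field names from the output above
def json = '''
[
  {"name": "connection1", "runningState": "RUNNING", "consumers": [
    {"name": "StringConsumer", "numConfigured": 2, "numConsuming": 2, "numProcessing": 2, "runningState": "RUNNING"},
    {"name": "SleepingConsumer", "numConfigured": 1, "numConsuming": 0, "numProcessing": 0, "runningState": "STOPPED"}
  ]}
]
'''
def report = new JsonSlurper().parseText(json)

// Consumers that are not in the RUNNING state
def notRunning = report.collectMany { conn ->
    conn.consumers.findAll { it.runningState != "RUNNING" }
                  .collect { "${conn.name}/${it.name}".toString() }
}

// Consumers whose configured threads are all busy processing messages
def saturated = report.collectMany { conn ->
    conn.consumers.findAll { it.numConfigured > 0 && it.numProcessing >= it.numConfigured }
                  .collect { "${conn.name}/${it.name}".toString() }
}

println "Not running: ${notRunning}"
println "Saturated:   ${saturated}"
```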

6.4. Transactions

The plugin provides a bit of automation around channel transactions. When a consumer is defined with the transacted property set to true, a transaction is automatically started on the channel passed to the message handler. When the message handler completes successfully, the transaction is automatically committed. If an unhandled exception is thrown from the message handler, the transaction is automatically rolled back.

It is especially important that any messages published from a message handler use the Channel instance passed in the MessageContext for this functionality to work.

Since the Channel is passed in the MessageContext, the author has full control over committing and rolling back transactions.
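
A minimal sketch of a transacted consumer follows. The consumer name and queue names are hypothetical, and the handleMessage signature and the channel property of MessageContext are assumed from the conventions described elsewhere in this guide. basicPublish is the RabbitMQ Java client’s Channel method.

```groovy
package com.example

import com.budjb.rabbitmq.consumer.MessageContext

// Hypothetical consumer; names are illustrative only.
class AuditConsumer {
    static rabbitConfig = [
        queue: "example.queue",
        transacted: true
    ]

    def handleMessage(String body, MessageContext messageContext) {
        // Publish on the handler's own channel so the message is part of
        // the transaction the plugin started for this delivery.
        messageContext.channel.basicPublish("", "audit.queue", null, body.getBytes("UTF-8"))

        // If an unhandled exception is thrown after this point, the plugin
        // rolls the transaction back and the publish above is discarded.
    }
}
```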

7. Upgrading

When an update to the plugin introduces incompatible changes, this section identifies what to consider when upgrading to the new version.

7.1. From 3.0.X

The upgrade from 3.0.X to 3.1.X includes a refactor of how the consumer and connection managers work. The storage of consumer handlers was moved from the connection classes into the consumer manager in order to support the new start/stop functionality, and it made sense for consumer handlers to reside with the appropriately named manager.

Both of the managers also now have underlying interfaces that define how a manager should behave. In addition, the objects being managed are now called Contexts, which also have interfaces that define their general behavior. The intention is that with these changes the plugin becomes more stable, with fewer backwards-incompatible changes introduced moving forward.

Here are the items authors may need to address when upgrading to this version of the plugin.

  • The rabbitContext.getConnection() methods now return the RabbitMQ Connection object instead of the ConnectionContext instance. This was done to make the methods more consistent with the createChannel methods and to encapsulate the ConnectionContext objects inside the manager. The ConnectionContext objects can now be retrieved from the connectionManager bean.

  • Many of the methods inside of the ConsumerManager and ConnectionManager have been renamed to adhere to a common interface. This should only affect projects that use these beans directly.

  • Some of the methods inside of the RabbitContext have been renamed to match the interface used by the various managers.

Overall, users of the plugin should see no impact if the rabbitContext or any of the other beans are not used in their projects.

If the use of the plugin’s beans is limited to the rabbitContext, the impact should be minimal: some minor changes to method names, and some refactored code if a ConnectionContext is retrieved. Users of the other manager beans will need to account for the changed interfaces implemented by those beans.

7.2. From 2.0.X

The upgrade from any version in the 2.0.X range to 3.0.X includes a massive refactoring of the internals of the plugin. If users did not extend or override the RabbitContext, RabbitMessageBuilder, or any of the other helper classes, the amount of impact is limited to a couple package name changes.

Below are the changes that were made, at a high level:

  • RabbitMessageBuilder is deprecated. The class still exists and its interface is the same, however, the code in the class has been ripped out and now proxies requests to the rabbitMessagePublisher bean. The builder will be removed at some point in the near future.

  • RabbitContext used to contain a significant amount of code related to management of message converters, consumers, and connections. That functionality has been broken out into their own respective classes. The RabbitContext now serves only as a class to aggregate functionality useful to users of the plugin, and should still be used to simplify interfacing with the plugin rather than using the underlying beans directly.

  • Introduced the rabbitMessagePublisher as a replacement for the RabbitMessageBuilder. This bean can be injected into other spring managed beans, such as services, controllers, and rabbit consumers. Its functionality follows the builder closely, and users may send messages based on configurations made through closures or through the new RabbitMessageProperties object. The short-hand convenience methods are still available as well.

  • Introduced the messageConverterManager bean to handle all operations pertaining to message converters.

  • Introduced the consumerManager bean to handle all operations pertaining to message consumers.

  • Introduced the connectionManager bean to handle all operations pertaining to RabbitMQ connections and channels.

  • Introduced the queueBuilder bean to handle creating configured exchanges and queues.

  • The MessageContext class has been moved into a new package: com.budjb.rabbitmq.consumer.

  • The AutoAck enum has been moved into a new package: com.budjb.rabbitmq.consumer.

  • The ConsumerConfiguration class has been moved into a new package: com.budjb.rabbitmq.consumer.

  • The MessageConvertMethod class has been moved into a new package: com.budjb.rabbitmq.consumer.

  • The ConnectionContext class has been moved into a new package: com.budjb.rabbitmq.connection.

  • The ConnectionConfiguration class has been moved into a new package: com.budjb.rabbitmq.connection.

  • The MessageConverter abstract class has been moved into a new package: com.budjb.rabbitmq.converter.

  • All of the bundled message converters have been moved into a new package: com.budjb.rabbitmq.converter.

  • The RabbitMessageBuilder class has been moved into a new package: com.budjb.rabbitmq.publisher.

  • A large amount of unit tests and some integration tests have been added to the project. These tests rely on the Spock mocking framework, but the test files and dependencies are not exported with the plugin so that a new plugin dependency on Spock is not created.

  • The closures used for publishing messages have had their resolving strategy changed from OWNER_FIRST to DELEGATE_FIRST. This should not have much of an impact, but in some cases closures may need to explicitly qualify some properties in the closures with delegate.
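
The effect of the resolve strategy change in the last item can be seen with plain Groovy closures, independent of the plugin. In the sketch below, MessageProps and Caller are hypothetical stand-ins for the publisher’s properties object and for a class that happens to have a property with the same name as a message property.

```groovy
// Stand-in for the object the publisher sets as the closure's delegate
class MessageProps {
    String routingKey
}

// Stand-in for a calling class that has a property of the same name
class Caller {
    String routingKey = "owner.value"

    String publishWith(int strategy) {
        def props = new MessageProps()
        def config = { routingKey = "from.closure" }
        config.delegate = props
        config.resolveStrategy = strategy
        config()
        // Report what ended up on the delegate
        return props.routingKey
    }
}

// OWNER_FIRST: the assignment lands on Caller.routingKey, not on the delegate
def ownerFirst = new Caller().publishWith(Closure.OWNER_FIRST)

// DELEGATE_FIRST: the assignment lands on MessageProps.routingKey
def delegateFirst = new Caller().publishWith(Closure.DELEGATE_FIRST)

println "OWNER_FIRST: ${ownerFirst}, DELEGATE_FIRST: ${delegateFirst}"
```

Writing delegate.routingKey = "from.closure" inside the closure removes the ambiguity under either strategy.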

8. Reference

8.1. Command Line

8.1.1. create-consumer

The create-consumer command creates a new consumer or consumers.

grails create-consumer com.example.Test

This command is useful to quickly create new consumers, in much the same way as Grails' built-in commands to create new controllers, domains, etc. The generated file is a template that is ready to be configured and used, and includes a generic message handler.

A powerful feature of this script is the ability to create as many consumers as are entered on the command line. For example:

grails create-consumer com.example.First com.example.Second

The above will create two consumers: FirstConsumer and SecondConsumer.

8.2. Consumer Configuration

8.2.1. autoAck

Sets whether incoming messages should be automatically acknowledged.

static rabbitConfig = [
    queue: "example.queue",
    autoAck: AutoAck.POST
]

There are 3 auto-acknowledgement modes:

Enum Value | Effect
AutoAck.MANUAL | The message is never automatically acknowledged. The message handler is responsible for acknowledging the message.
AutoAck.ALWAYS | The message is automatically acknowledged before it is delivered to the message handler.
AutoAck.POST | The message is automatically acknowledged after the message handler successfully completes. If an unhandled exception escapes the handler, the message is rejected. This is the default mode.
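
With AutoAck.MANUAL, the handler has to acknowledge or reject each message itself through the channel. The sketch below is an assumption-laden illustration: the consumer name is hypothetical, and the handleMessage signature and the channel and envelope properties of MessageContext are assumed rather than taken from this section. basicAck and basicReject are methods of the RabbitMQ Java client’s Channel.

```groovy
package com.example

import com.budjb.rabbitmq.consumer.AutoAck
import com.budjb.rabbitmq.consumer.MessageContext

// Hypothetical consumer; names are illustrative only.
class ManualAckConsumer {
    static rabbitConfig = [
        queue: "example.queue",
        autoAck: AutoAck.MANUAL
    ]

    def handleMessage(String body, MessageContext messageContext) {
        // Assumed: MessageContext exposes the delivery's Envelope
        long tag = messageContext.envelope.deliveryTag
        try {
            // ... process the message here ...
            messageContext.channel.basicAck(tag, false)    // acknowledge this message only
        }
        catch (Exception e) {
            messageContext.channel.basicReject(tag, false) // reject without requeueing
        }
    }
}
```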

8.2.2. binding

Set the binding criteria to use when subscribing to an exchange.

static rabbitConfig = [
    exchange: "example.exchange",
    binding: "example.routing.key"
]

The binding criteria may be necessary depending on the type of exchange being subscribed to. This property is ignored when consuming from a queue. See the guide for more information on this property’s usage.

8.2.3. connection

Sets which connection should be used to consume messages from.

static rabbitConfig = [
    queue: "foobar",
    connection: "server1"
]

The connection property should be used in multi-server configurations to specify which connection should be used to consume messages from. If the connection property is omitted, the default connection will be used.

8.2.4. consumers

Set the number of concurrent consumers the message consumer should start.

static rabbitConfig = [
    queue: "example.queue",
    consumers: 10
]

By default, a message consumer class will only start one consumer. This means the consumer can only handle one message at a time. Authors can increase the number of concurrent consumers by increasing the value of this property.

8.2.5. convert

Sets the automatic message body conversion mode.

static rabbitConfig = [
    queue: "example.queue",
    convert: MessageConvertMethod.ALWAYS
]

There are 3 convert modes:

Enum Value | Effect
MessageConvertMethod.DISABLED | The message is never automatically converted. The message handler always receives a byte array.
MessageConvertMethod.HEADER | The message will only be automatically converted if the incoming message has the content-type property set and a matching converter is found based on that content-type.
MessageConvertMethod.ALWAYS | The message will be automatically converted as long as a suitable message converter and message handler are found. This is the default mode.

8.2.6. exchange

Set the exchange to subscribe to.

static rabbitConfig = [
    exchange: "example.exchange"
]

Tells the plugin that the consumer should subscribe to an exchange. The exchange must already exist for the consumer to begin listening to it. Exchanges can be created externally from the application, or via the application’s RabbitMQ configuration. Note that a routing key might be necessary depending on the type of exchange specified.

8.2.7. match

Set the match criteria to use when subscribing to a headers exchange.

static rabbitConfig = [
    exchange: "example.exchange",
    binding: ["foo": "bar"],
    match: "any"
]

The match property determines whether any one header must match or whether all headers must match for the queue’s binding. This property is only used when binding to a headers exchange.
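
The difference between the two modes can be modelled in a few lines of plain Groovy. The headersMatch function below is a hypothetical illustration of the matching rule, not part of the plugin or of RabbitMQ; the broker performs this check itself.

```groovy
// Returns true when a message's headers satisfy a binding under the given mode.
// "any": at least one binding header matches; "all": every binding header matches.
boolean headersMatch(Map bindingHeaders, Map messageHeaders, String match) {
    def hits = bindingHeaders.findAll { key, value -> messageHeaders[key] == value }
    return match == "any" ? !hits.isEmpty() : hits.size() == bindingHeaders.size()
}

def bindingHeaders = [foo: "bar", baz: "qux"]

def partialAny = headersMatch(bindingHeaders, [foo: "bar"], "any")
def partialAll = headersMatch(bindingHeaders, [foo: "bar"], "all")
def fullAll = headersMatch(bindingHeaders, [foo: "bar", baz: "qux", extra: "1"], "all")

println "any/partial: ${partialAny}, all/partial: ${partialAll}, all/full: ${fullAll}"
```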

8.2.8. prefetchCount

Sets the number of messages the consumer will prefetch.

static rabbitConfig = [
    queue: "example.queue",
    prefetchCount: 1
]

Sets the QOS prefetch count property of the consumer’s channel. By default, the consumer will fetch one message at a time. Increasing this value will increase the number of messages the consumer will queue up before it processes them.

8.2.9. queue

Set the queue to consume from.

static rabbitConfig = [
    queue: "example.queue"
]

Tells the plugin that the consumer should consume from a specific queue. The queue must already exist for the consumer to begin listening to it. Queues can be created externally from the application, or via the application’s RabbitMQ configuration.

8.2.10. retry

Sets whether a rejected message should be redelivered.

static rabbitConfig = [
    queue: "example.queue",
    retry: false
]

If a message is rejected, this property is used to determine whether the message should be marked for redelivery.

If retry is enabled and a message is rejected for a reason that repeats every time the message is consumed, such as an unhandled exception caused by the message itself, the message will be redelivered indefinitely. Be careful when using this feature.

8.2.11. transacted

Sets whether automatic transactions should be enabled on the consumer.

static rabbitConfig = [
    queue: "example.queue",
    transacted: true
]

See the Transactions documentation for more information.

8.3. Rabbit Message Publisher

8.3.1. appId

Sets the appId property of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    appId = "example"
    body = "test message"
}

8.3.2. autoConvert

Toggles whether the message will be converted via the message converters when a response is received from an RPC message.

def result = rabbitMessagePublisher.rpc {
    routingKey = "example.queue"
    body = "test message"
    autoConvert = true
}

If this value is true, the same logic to convert messages in the message consumers is used to convert the received message from a byte array to a converted object type, if one exists. If this is false, the raw byte array is returned from the RPC response.

8.3.3. body

Body of the message to send to the RabbitMQ server.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    body = "test message"
}

The body is a required part of the message to transmit. The RabbitMQ server expects the message to be in a byte array format, but the plugin attempts to handle the conversion of objects for you via the Message Converters objects. You may assign any type of object to this property as long as there is a message converter available to convert the object to a byte array.

8.3.4. connection

Sets which connection is used to send the message.

rabbitMessagePublisher.send {
    exchange = "example.exchange"
    routingKey = "example.topic"
    body = "message"
    connection = "server1"
}

The connection property is used to specify which server to send a message with in a multi-server configuration. If the connection is omitted, the default connection will be used.

8.3.5. contentEncoding

Sets the content encoding property of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    contentEncoding = "base64"
    body = "test message"
}

8.3.6. contentType

Sets the content-type property of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    contentType = "text/plain"
    body = "test message"
}

8.3.7. correlationId

Sets the application correlation ID of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    correlationId = "1234"
    body = "test message"
}

8.3.8. deliveryMode

Sets the delivery mode of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    deliveryMode = 1
    body = "test message"
}

Values are either non-persistent (1) or persistent (2).

8.3.9. exchange

Define the exchange to publish a message to.

rabbitMessagePublisher.send {
    exchange = "example.exchange"
    body = "message"
}

Setting this property lets the rabbitMessagePublisher know to publish the message to an exchange. Depending on the type of exchange, the use of the routingKey property may be necessary. See the RabbitMQ documentation for more information on exchanges.

A publish operation with the rabbitMessagePublisher may only be done with either an exchange or a queue, but not both. Attempting to use both will result in an error.

8.3.10. expiration

Sets the expiration property of the message.

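rabbitMessagePublisher.send {
    routingKey = "example.queue"
    expiration = "60000" // message TTL of 60 seconds
    body = "test message"
}

RabbitMQ interprets the expiration property as a per-message TTL, given as a string containing a number of milliseconds.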

8.3.11. headers

Attaches headers to send with the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    headers = [
        "header1": "foo",
        "header2": "bar"
    ]
    body = "test message"
}

The headers property is simply a map with key/value pairs for the header names and their values.

8.3.12. messageId

Sets the messageId property of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    messageId = "1234"
    body = "test message"
}

8.3.13. priority

Sets the priority of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    priority = 5
    body = "test message"
}

Message priority is an integer with values from 0 to 9.

8.3.14. replyTo

Sets the replyTo queue name.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    replyTo = "reply.queue"
    body = "test message"
}

Setting the replyTo property of the message lets the consumer know that the publisher expects a response to the message. The value of this property should be an existing queue that the client is consuming from.

This property is automatically set when the RabbitMessagePublisher.rpc() method is used, and the consumer has access to this property to reply to the message.

8.3.15. routingKey

The routing key is used in conjunction with an exchange to publish a message.

rabbitMessagePublisher.send {
    exchange = "example.exchange"
    routingKey = "example.topic"
    body = "message"
}

The routing key is useful when publishing messages to topic or direct exchanges. On a direct exchange, the routing key must match the binding key that a queue used to bind to the exchange, or the message will be unroutable (and possibly lost). See the RabbitMQ documentation for more information on routing keys and exchanges.

The exchange can be omitted, in which case the routingKey can be used to send a message directly to a queue.
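As a rough illustration of how a topic exchange compares a routing key against a queue's binding, the following standalone Groovy sketch mimics RabbitMQ's matching rules, where * stands for exactly one dot-separated word and # stands for zero or more words. The helper names wordsMatch and topicMatches are hypothetical and are not part of the plugin or the RabbitMQ client library. The broker performs this matching itself, so application code does not need it.

```groovy
// Hypothetical helper: compares a tokenized binding against a tokenized routing key.
boolean wordsMatch(List<String> binding, List<String> key) {
    if (binding.isEmpty()) {
        // A fully consumed binding only matches a fully consumed key.
        return key.isEmpty()
    }
    String head = binding.first()
    List<String> rest = binding.drop(1)
    if (head == '#') {
        // '#' may absorb zero or more words, so try every possible split.
        return (0..key.size()).any { int skipped -> wordsMatch(rest, key.drop(skipped)) }
    }
    if (key.isEmpty()) {
        return false
    }
    // '*' matches exactly one word; anything else must match literally.
    return (head == '*' || head == key.first()) && wordsMatch(rest, key.drop(1))
}

// Hypothetical entry point. tokenize drops empty words, which is a simplification.
boolean topicMatches(String binding, String routingKey) {
    wordsMatch(binding.tokenize('.'), routingKey.tokenize('.'))
}

println topicMatches('example.*', 'example.topic')   // true
println topicMatches('example.*', 'example.a.b')     // false
println topicMatches('example.#', 'example')         // true
```

A direct exchange is the simpler case: the routing key must equal the binding key exactly.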

8.3.16. timeout

Sets the amount of time, in milliseconds, an RPC message will wait before giving up.

rabbitMessagePublisher.rpc {
    routingKey = "example.queue"
    timeout = 20 * 1000 // 20 seconds
    body = "message"
}

The timeout is important when sending RPC messages. A server may fail and never reply, and the client should not wait forever for a response. The default timeout, if not specified, is 5 seconds. If the client does wish to wait indefinitely, setting this value to 0 will cause the RPC call to wait until a reply is received.
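Because the value is a plain millisecond count, unit mistakes are easy to make. A minimal sketch, using only the JDK's TimeUnit class, of deriving the number from a readable duration. The variable names are illustrative only.

```groovy
import java.util.concurrent.TimeUnit

// Convert readable durations into the millisecond counts the timeout property expects.
int twentySeconds = TimeUnit.SECONDS.toMillis(20) as int
int twoMinutes = TimeUnit.MINUTES.toMillis(2) as int

println twentySeconds   // 20000
println twoMinutes      // 120000
```

Either value could then be assigned to timeout inside the publisher closure.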

8.3.17. timestamp

Sets the timestamp property of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    timestamp = java.time.OffsetDateTime.now()
    body = "test message"
}

8.3.18. type

Sets the type property of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    type = "type"
    body = "test message"
}

8.3.19. userId

Sets the userId property of the message.

rabbitMessagePublisher.send {
    routingKey = "example.queue"
    userId = "foobar"
    body = "test message"
}

9. Changelog

9.1. 3.5.x

9.1.1. Version 3.5.1

  • Fix another instance of getCharSet.

9.1.2. Version 3.5.0

  • Update to work with Grails 3.3 (or later).

  • Add Order and Ordered support to message converters.

9.2. 3.4.x

9.2.1. Version 3.4.6

  • Add some error handling around flushing and destroying the persistence context interceptor. Was unable to reproduce the error, but this is an attempt to fix a zombie Hibernate session bug.

9.2.2. Version 3.4.5

  • Added rabbitmq prefix, to avoid conflicts with environment variables. (#130)

9.2.3. Version 3.4.4

  • Move the TypeConvertingMapConverter to a higher converting priority than JsonConverter.

9.2.4. Version 3.4.3

  • Add top-level exception handling for the internal message handler implementation. This fixes an issue where Hibernate exceptions closed RabbitMQ channels.

  • Use a different MimeType constructor with built-in message converters to maintain Grails 3.1.x support.

9.2.5. Version 3.4.2

  • Add configuration option rabbitmq.enableSerializableConverter, which defaults to false. This breaks behavior with the previous 3.4.x releases but brings it back in line with the behavior of previous versions. This was done due to unexpected serialization issues with embedded lazy maps returned from the JsonSlurper.

9.2.6. Version 3.4.1

  • Change from using Spring’s MimeType.getCharset() to MimeType.getCharSet(). The latter is deprecated but using it allows Grails 3.1.x to function correctly.

9.2.7. Version 3.4.0

  • Remove support for legacy connection configuration (rabbitmq.connection).

  • Update RabbitMQ Client library to 4.2.0.

  • Add support for exporting RabbitMQ metrics.

  • Major refactor of the message converter system. These changes are not backwards compatible.

  • Major refactor of the message consumer backend system. Existing consumers are compatible.

  • Migrate documentation to AsciiDoc.

  • Introduce new interfaces for message consumers (UnsupportedMessageHandler and MessageConsumerEventHandler).

  • Added Spring application events for RabbitMQ plugin system start and stop events.

9.3. 3.3.x

9.3.1. Version 3.3.3

  • Upgrade codebase to Grails 3.2.

  • Add ability to bind exchanges to exchanges.

  • Migrate from Calendar to OffsetDateTime for message timestamps.

  • Update amqp-client to version 3.6.6. (#83)

9.3.2. Version 3.3.2

  • Fix message converter/consumer handler issue where message handler methods with primitive types (such as int) would not accept the equivalent boxed type (such as Integer).

9.3.3. Version 3.3.1

  • Reintroduce support for consumer handler methods taking only the MessageContext as a parameter.

9.3.4. Version 3.3.0

This version includes changes to the configuration format introduced in 3.2.0. There were issues with YAML configuration and using the keys as the names of connections, queues, or exchanges where those names had periods in them. There was no way to escape them, and it broke the configuration parsing code. Due to this, the format had to change. This decision was not made lightly and I intend on this being the last change to the configuration format.

  • Changed the configuration format introduced in 3.2.0. This breaks backward compatibility with that version.

9.4. 3.2.x

9.4.1. Version 3.2.0

A huge thanks to Ollie Freeman for his work on this release.

  • First non-beta release of the plugin for Grails 3.

  • Deprecate closure-based queue/exchange configuration.

  • Add new map-based configuration that works well with YAML.

  • Removed RabbitMessageBuilder.

9.5. 3.1.x

All versions below refer to the Grails 2.x plugin. The Grails 3.x plugin forked from the Grails 2.x version after release 3.1.3.

9.5.1. Version 3.1.3

  • Add status reports that provide information about all connections and their consumers, including load statistics.

  • Upgrade project to Grails 2.5.4.

  • Remove gpars dependency.

  • Make Grails 2.3 the minimum version of Grails this plugin is intended for.

  • Fix consumer configuration from application config parsing issue (#73).

  • Refactor message conversion for incoming messages so that conversion to a type only happens if an appropriate handler exists.

9.5.2. Version 3.1.2-beta

  • Experimental upgrade to Grails 3.

9.5.3. Version 3.1.2

  • Added graceful shutdown support. See rabbitContext.shutdown().

  • Added methods to check running state on most managers and contexts.

  • Updated rabbitmq Java library to 3.5.4.

  • Added the gpars plugin as a dependency.

9.5.4. Version 3.1.1

  • Refactored the code to load a consumer’s configuration from a static variable so that it works correctly when the consumer is annotated with @Transactional. (#55)

  • Add setter methods for the message TTL (expiration). (#56)

  • Fix bug where missing connection configuration values do not allow the use of default values.

  • Remove checked exception from ConsumerManagerImpl that does not exist in its interface. (#59)

9.5.5. Version 3.1.0

  • Update the RabbitMQ Client Java library to 3.5.0.

  • Fix an issue that caused unclean shutdowns when redeploying an application using the plugin. (#54)

  • Added the ability to start and stop individual connections. (#49)

  • Added the ability to start and stop individual consumers. (#49)

  • Added the ability to start and stop consumers based on the connection they’re tied to. (#49)

  • Moved consumer adapter storage from the connection context to the consumer manager.

  • Handle Throwable types that were not being handled before in the consumer handling so that channels are not closed if one of the unhandled errors occurs. (#47)

  • Added travis-ci continuous integration for all commits to the plugin.

9.6. 3.0.x

9.6.1. Version 3.0.4

  • Fix a null pointer exception when a consumer has no configuration.

  • Add a unit test to test behavior when a consumer has no configuration.

  • Add an integration test to test behavior when sending a message directly to a queue.

9.6.2. Version 3.0.3

  • Introduced the rabbitMessagePublisher bean to replace the RabbitMessageBuilder.

  • Deprecated the RabbitMessageBuilder.

  • Massive refactor of the internals of the plugin. See the upgrading page for more detailed information about what has changed.

  • Added the ability to configure consumers centrally in the application’s configuration file (thanks Erwan Arzur).

  • Updated RabbitMQ library version to 3.4.3.

9.6.3. Version 3.0.2

  • Internal release, see 3.0.3.

9.6.4. Version 3.0.1

  • Internal release, see 3.0.3.

9.6.5. Version 3.0.0

  • Internal release, see 3.0.3.

9.7. 2.0.x

9.7.1. Version 2.0.10

  • Fix bug with converters that prevented converters later in the processing list from executing if another converter is unable to marshal data from bytes.

  • Add enabled flag to the configuration. If false, completely disables the plugin from starting.

9.7.2. Version 2.0.9

  • Additional fix for memory leak associated with RPC calls and auto-recovering connections.

9.7.3. Version 2.0.8

  • Fix bug introduced by rushing the previous fix. Mark consuming = true.

9.7.4. Version 2.0.7

  • Add basicCancel() to RabbitMessageBuilder in an attempt to address a memory leak.

  • Improve cleaning up of resources in RPC calls.

9.7.5. Version 2.0.6

  • Updated copyright notices.

  • Added GString message converter.

  • Updated publishing guide docs to make RabbitMessageBuilder usage more clear (thanks marcDeSantis @GitHub).

9.7.6. Version 2.0.5

  • Added heartbeat configuration for connections (thanks LuisMuniz @GitHub).

  • Refactored Hibernate session support so that Hibernate is no longer a dependency of the plugin, and will now work with or without Hibernate present.

9.7.7. Version 2.0.4

  • Added multi-server support to all aspects of the plugin.

  • Added SSL support for connections.

  • Added auto-reconnect support for dropped connections.

  • Added logic to wrap a Hibernate session around calls to consumers.

  • Updated the RabbitMQ library to version 3.3.0.

  • Added logging for connection/channel reconnects and channel shutdowns.

  • Changed format for connection configurations. The old style is still supported, but will likely be removed at some point.

9.8. 1.0.x

9.8.1. Version 1.0.3

  • Modified the logic to check for the existence of callbacks in consumers.

9.8.2. Version 1.0.2

  • Added a cached thread pool so the user does not need to account for the number of threads consumers require. The thread count setting now defaults to 0, which selects the cached thread pool.

  • Added callbacks for messages: onReceive, onSuccess, onFailure, and onComplete.

9.8.3. Version 1.0.1

  • Remove the maven group from the plugin definition class.

9.8.4. Version 1.0.0

  • Version bump for general release.

9.9. 0.2.x

9.9.1. Version 0.2.1

  • Fixed a bug with the message handler discovery method that caused generically-typed handlers to get called incorrectly.

9.9.2. Version 0.2.0

  • Refactored queue/exchange configuration. It is now possible to configure queue binding without having to also configure the exchange being bound to.

  • Added the match property to queue configuration to support headers exchange binding. This breaks backwards compatibility.

  • Renamed the routingKey property of the consumer configuration to binding to match queue configuration. This breaks backwards compatibility.

9.10. 0.1.x

9.10.1. Version 0.1.8

  • Moved the trigger to start consumers on application launch to the bootstrap.

9.10.2. Version 0.1.7

  • Added the prefetchCount option to the consumer configuration. Defaults to 1.

  • Added the threads option to the connection configuration. Defaults to 5.

9.10.3. Version 0.1.6

  • Fixed logic to determine if a consumer is valid.

  • Added support for short-form handlers that only take a single parameter.

9.10.4. Version 0.1.5

  • The body parameter to the RabbitMessageBuilder is no longer required. It now defaults to an empty byte array.

9.10.5. Version 0.1.4

  • Fix a class visibility issue in the artefact handlers for this plugin.

9.10.6. Version 0.1.3

  • Touch up the consumer template.

9.10.7. Version 0.1.2

  • Add the ability to create multiple consumers at the same time with the create-consumer script (thanks Aaron Brown!).

  • Also create a unit test when creating consumers (thanks Michael Rice!).

9.10.8. Version 0.1.1

  • Throw an exception if the connection configuration is missing on application start (thanks Michael Rice!).

  • Add the create-consumer script (thanks Aaron Brown!).

9.10.9. Version 0.1

  • Code complete/experimental release.