RabbitMQ Native Plugin - Reference Documentation

Authors: Bud Byrd

Version: 3.1.3

1 Introduction

The RabbitMQ Native plugin for Grails provides RabbitMQ integration using the Java libraries provided by the RabbitMQ project. It provides easy configuration of exchanges and queues via a project's configuration, simple configuration of consumers, and a convenience class to easily publish messages.

This plugin strives to provide an easy convention to follow to quickly begin consuming messages, but still provide easy access to the underlying layer to allow users of this plugin to do whatever they need to do outside of those conventions. It also provides a powerful message body converter system that allows the user to quickly write and plug in converters for custom data types.

This guide details the configuration of the plugin, and may not fully explain the use and conventions of the RabbitMQ services or its Java library. More information can be found in their Java API guide and JavaDoc.

2 Quick Start

This quick start walks through the minimum steps needed to begin using the plugin. The plugin offers many configuration options, but this section demonstrates only its most basic usage.

Create The Application

Create a project named RabbitExample. You can do this by entering:
grails create-app RabbitExample

Add The Plugin

In grails-app/conf/BuildConfig.groovy, under the plugins section, add:
plugins {
    // ...

    compile name: "rabbitmq-native", version: "latest.release"

    // …
}

Then, refresh the project's dependencies:

grails refresh-dependencies

Configuring

In grails-app/conf/Config.groovy, add:
rabbitmq {
    connection = {
        connection host: "changeme", username: "changeme", password: "changeme"
    }
    queues = {
        queue name: "testqueue"
    }
}
Be sure to replace the placeholder values with those of your RabbitMQ server and user credentials.

Consumer

Create a consumer by using the following command:
grails create-consumer com.example.Test

Update the consumer to reflect the following:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class TestConsumer {
    /**
     * Consumer configuration.
     */
    static rabbitConfig = [
        queue: "testqueue"
    ]

    /**
     * Handle an incoming RabbitMQ message.
     *
     * @param body    The converted body of the incoming message.
     * @param context Properties of the incoming message.
     * @return
     */
    def handleMessage(def body, MessageContext context) {
        println body
        return "Hello to you, too!"
    }
}

Controller

Create a controller by using the following command:
grails create-controller com.example.Test

Update the controller to reflect the following:

package com.example

import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

class TestController {
    RabbitMessagePublisher rabbitMessagePublisher

    def index() {
        render rabbitMessagePublisher.rpc {
            routingKey = "testqueue"
            body = "Hello!"
        }
    }
}

Run it!

Run the grails application.
grails run-app

You can see the application in action by hitting the test controller. If you're running this on your localhost, your URL may be similar to http://localhost:8080/RabbitExample/test/index. You should see the message "Hello!" printed to the application's output console, and your web browser should display the message "Hello to you, too!"

3 Configuration

Configuration of the connection to the RabbitMQ server is done in your project's grails-app/conf/Config.groovy file.

Below is the list of general configuration properties.

| Configuration Property | Required | Description | Type | Default |
|---|---|---|---|---|
| autoStart | No | If true, will start any consumers during the bootstrap phase of the application startup. | Boolean | true |
| enabled | No | If false, will register the plugin's beans, but prevent the plugin from connecting to the RabbitMQ broker(s) and consuming messages. | Boolean | true |

If autoStart is set to false, all of the connections to the broker will be initiated, but the message consumers will not automatically start consuming messages. To start consuming, use the rabbitContext bean and call rabbitContext.startConsumers().
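
As a minimal sketch of deferring consumer startup, assume autoStart has been set to false in the rabbitmq configuration block. The service name ConsumerStartupService and its beginConsuming method are hypothetical; only rabbitContext.startConsumers() comes from the plugin as described above.

```groovy
// Hypothetical service; place the call wherever your application decides it is ready.
class ConsumerStartupService {
    // Injected by Spring by bean name. Left untyped so the sketch has no
    // compile-time dependency on the plugin's classes.
    def rabbitContext

    // Call once the application is ready to process messages.
    void beginConsuming() {
        rabbitContext.startConsumers()
    }
}
```

This keeps the connections open from startup while letting the application choose the moment at which messages begin to flow.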

3.1 Server Connection

The plugin expects the connection configuration to the RabbitMQ server to be defined. A bare minimum configuration example looks like:

rabbitmq {
    connection = {
        connection host: "example.com", username: "foo", password: "bar"
    }
}

Connections to many different RabbitMQ servers can be configured. A multi-server configuration looks like:

rabbitmq {
    connection = {
        connection host: "rabbit1.example.com", username: "foo", password: "bar"
        connection host: "rabbit2.example.com", username: "foo", password: "bar"
    }
}

The following table enumerates all the configuration options available to the connection configuration:

| Configuration Property | Required | Description | Type | Default |
|---|---|---|---|---|
| host | Yes | Hostname or IP address of the RabbitMQ server to connect to. | String | none |
| username | Yes | Username to log into the RabbitMQ server with. | String | none |
| password | Yes | Password to log into the RabbitMQ server with. | String | none |
| name | No | Name of the connection. This is used while sending messages to a specific RabbitMQ server when using multiple servers. | String | none |
| isDefault | No | A connection with this set to true will be the server messages are sent to if no specific connection is specified when sending the message. | boolean | false |
| port | No | Port to connect to the RabbitMQ server with. | Integer | 5672 |
| virtualHost | No | Name of the virtual host to connect to the RabbitMQ server with. | String | none |
| ssl | No | Whether to use SSL when connecting to a RabbitMQ server. | boolean | false |
| threads | No | Threadpool size. If greater than 0, determines how many messages can be processed concurrently at any given time. If set to 0, consumers can consume as many messages as they are configured to. | Integer | 0 |
| automaticReconnect | No | If true, will cause the application to automatically reconnect to a server when its connection is dropped. | boolean | true |
| requestedHeartbeat | No | Heartbeat interval, in seconds. A value of 0 disables heartbeats. | Integer | 0 |
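
The options above can be combined in a multi-server setup. The following is a sketch only: the host names, connection names, port, virtual host, and heartbeat value are made-up placeholders, and the option names are the ones listed in the table.

```groovy
// grails-app/conf/Config.groovy
rabbitmq {
    connection = {
        // "server1" receives any message that does not name a connection.
        connection name: "server1", isDefault: true, host: "rabbit1.example.com", username: "foo", password: "bar"

        // "server2" uses a non-default port, a virtual host, and a 30-second heartbeat.
        connection name: "server2", host: "rabbit2.example.com", port: 5673, virtualHost: "reports", requestedHeartbeat: 30, username: "foo", password: "bar"
    }
}
```

Naming each connection makes it possible to refer to a specific server from queue definitions, consumers, and the publisher.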

3.2 Defining Queues

The plugin allows authors to define the exchanges and queues programmatically inside the configuration. This allows the application to configure its own queues without someone having to manually create the exchanges and queues prior to running the application.

Queue configuration is also done in the Config.groovy file under the rabbitmq block, much as the server connection is configured. Usage is best illustrated with an example:

rabbitmq {
    queues = {
        queue name: "example.queue", durable: true, exchange: "example.exchange"
    }
}

The above code will define a queue named example.queue, and its durable flag will be set.

Be sure to note that the queues property is a closure. You must ensure that the = is present for this feature to function properly.

If using the multi-server feature of the plugin, it is important to consider what server the queue should be defined in. The connection property specifies which server to create the queue in, illustrated below:

rabbitmq {
    queues = {
        // Assume there is a connection with the name "server1"…
        queue name: "example.queue", connection: "server1", durable: true
    }
}

If there are many queues to be defined for a specific server connection, the connection method provides a convenient way to group the queue definitions.

rabbitmq {
    queues = {
        // Assume there is a connection with the name "server1"…
        connection name: "server1", {
            queue name: "example.queue", durable: true
            queue name: "example2.queue", durable: true
        }
    }
}

Below is a table of all of the options available when defining queues:

| Property | Required | Description | Type | Default |
|---|---|---|---|---|
| arguments | No | Extra arguments used to create the queue. See the RabbitMQ documentation for more information. | Map | none |
| autoDelete | No | Whether to automatically delete the queue once there are no more consumers listening to it. | boolean | false |
| binding | No | Used in conjunction with exchanges. See the section below for more information. | Mixed | none |
| durable | No | Whether the queue should be persisted to disk on the RabbitMQ server so that it survives server restarts. | boolean | false |
| exchange | No | Binds a queue to an exchange in conjunction with the binding property. Ignored if used inside an exchange declaration. | String | none |
| match | No | Required when binding to a headers exchange. Either "any" or "all". | String | none |
| name | Yes | Name of the queue. | String | none |
| connection | No | Name of the connection to create the queue with. Uses the default connection if omitted. | String | none |

3.3 Defining Exchanges

Defining exchanges is very similar to defining queues. The following code illustrates how to define an exchange:
rabbitmq {
    queues = {
        exchange name: "example.exchange", type: "topic"
    }
}

The above example will create an exchange with the name example.exchange and of the type topic. Below is a list of all the options available when creating exchanges:

| Property | Required | Description | Type | Default |
|---|---|---|---|---|
| arguments | No | Extra arguments used to create the exchange. See the RabbitMQ documentation for more information. | Map | none |
| autoDelete | No | Whether to automatically delete the exchange once there are no longer any queues bound to it. | boolean | false |
| durable | No | Whether the exchange should be persisted to disk on the RabbitMQ server so that it survives server restarts. | boolean | false |
| name | Yes | Name of the exchange. | String | none |
| type | Yes | One of "fanout", "topic", "direct", or "headers". | String | none |
| connection | No | Name of the connection to create the exchange with. Uses the default connection if omitted. | String | none |

The same connection considerations exist for exchanges as with queues. The same connection method works for exchanges as well.

3.4 Binding Queues and Exchanges

Queues can be bound to an exchange by setting the exchange property when declaring the queue to the name of the exchange to bind to. This is the preferred method if the application being configured is not responsible for defining and configuring the exchange being bound to.
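
A sketch of this first method, combining the exchange and binding properties from the queue options table. It assumes that a topic exchange named example.exchange already exists on the broker, for example because another application created it.

```groovy
// grails-app/conf/Config.groovy
rabbitmq {
    queues = {
        // The exchange is not declared here; only the queue and its binding are.
        queue name: "example.queue", exchange: "example.exchange", binding: "sample.binding.#"
    }
}
```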

Queues can also be bound to an exchange by declaring the queues inside of a closure passed as the last parameter of an exchange definition. This is a convenient method to do queue binding when your application is responsible for defining and configuring the exchange.

rabbitmq {
    queues = {
        exchange name: "example.exchange", type: "topic", {
            queue name: "example.queue", binding: "sample.binding.#"
        }
    }
}
This example will create a topic exchange named example.exchange, as well as create a queue named example.queue. The queue will be bound to the exchange with the topic or routing key of "sample.binding.#".

Queues need to have their binding defined specifically for the type of exchange they are bound to.

Fanout Exchanges

Fanout exchanges are the easiest to configure bindings for, since they require none. A fanout exchange simply sends every message it receives to every queue bound to it.
rabbitmq {
    queues = {
        exchange name: "example.exchange", type: "fanout", {
            queue name: "example.queue"
        }
    }
}

Topic Exchanges

Topic exchanges require queues to define a topic. Topics can be an exact match, but their strength is in their partial matching ability. See the RabbitMQ documentation for details about this kind of exchange.
rabbitmq {
    queues = {
        exchange name: "example.exchange", type: "topic", {
            queue name: "example.queue", binding: "example.binding.#"
        }
    }
}

Direct Exchanges

Direct exchanges are similar to topic exchanges, except that their "topics" only function with direct name matching. The appropriate name for the binding in this case is "routing key". Queues must define a routing key when binding to this type of exchange.
rabbitmq {
    queues = {
        exchange name: "example.exchange", type: "direct", {
            queue name: "example.queue", binding: "exampleRoutingKey"
        }
    }
}

Headers Exchanges

Headers exchanges are like topic exchanges, but with the ability to define multiple match keywords. The binding for queues allows the queue to match on all or one of multiple header values. The queue must also set the match property for this exchange type, and the value must be one of "any" or "all".
rabbitmq {
    queues = {
        exchange name: "example.exchange", type: "headers", {
            queue name: "example.queue", match: "any", binding: [
                "header1": "header-value-1",
                "header2": "header-value-2"
            ]
        }
    }
}

4 Consuming Messages

Consuming messages is accomplished by creating a message consumer class. Consumer classes are placed in the grails-app/rabbit-consumers directory, and their file names must end with Consumer.groovy. These classes are Spring beans, meaning that other Spring beans, such as services and the grailsApplication instance, can be injected into them. Each consumer class must be configured in a certain way. If a consumer is misconfigured, the plugin will log an error stating so and will not register the consumer to any queues.

4.1 Message Handlers

One of the requirements for a consumer to be registered to the RabbitMQ server is that a message handler be declared in the consumer class. The message handler is the mechanism by which messages are consumed.

4.1.1 Basic Usage

In its most basic form, a message handler method takes in the body of the received message, and a MessageContext object that contains the message parameters received from the RabbitMQ server, along with the consumer's channel that the handler should publish messages through.

This is the most generic form of a message handler:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    // ...

    def handleMessage(def body, MessageContext context) {
        // Do work
    }
}

4.1.2 Typed Message Handlers

The logic surrounding the message consumer classes will by default attempt to intelligently convert the body of the received message from a byte array to a converted type, such as a String. Before routing the message to the consumer and handler, the plugin will run through a list of converters that will attempt to convert the message, and if the conversion was successful, determine if an appropriate handler has been defined.

For example, consider this JSON blob:

{"foo":"bar","hi":"there"}

If the above message is received, the converter for the Map class type will convert the byte array to a Map of the JSON data. If a valid handler for the Map type is defined, the handler will receive the converted JSON so that the handler does not need to handle the conversion.

The following handlers would accept the converted Map:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    // ...

    def handleMessage(Map body, MessageContext context) {
        // Do work
    }

    def handleMessage(def body, MessageContext context) {
        // Since def is a generic type (Object)
    }
}

If a converter was successfully able to convert the message body, but no handler was defined to handle the class type, other converters will get a chance to convert the message body. In the above example, if only a handler for the String type was defined, the handler will receive the JSON blob as a String.

If no converter is able to convert the message body, the plugin will fall back to passing the handler the raw byte array received from the RabbitMQ server.

If no handler is defined that can handle the received message (including the raw byte array), an error will be logged and the message will be rejected.

The plugin has built-in converters for Integer, Map, List, GString, and String types. The plugin allows users to define their own converters to convert other object types, which will be discussed later in this guide.
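
The fallback rules in this section can be modelled in a few lines of plain Groovy. This is a simplified illustration of the described behaviour, not the plugin's code: the converters map, the selectBody closure, and the converter order (Map before String) are all assumptions made for the sketch.

```groovy
import groovy.json.JsonSlurper

// Hypothetical stand-ins for converters: target type -> closure returning the
// converted value, or null when the body cannot be converted to that type.
def converters = [
    (Map)   : { byte[] raw ->
        try {
            def parsed = new JsonSlurper().parseText(new String(raw, 'UTF-8'))
            parsed instanceof Map ? parsed : null
        } catch (Exception ignored) {
            null
        }
    },
    (String): { byte[] raw -> new String(raw, 'UTF-8') }
]

// Returns the value a handler would receive, or null if the message would be rejected.
def selectBody = { byte[] raw, Collection<Class> handlerTypes ->
    for (convert in converters.values()) {
        def converted = convert(raw)
        // A successful conversion only counts if some handler accepts the resulting type.
        if (converted != null && handlerTypes.any { it.isInstance(converted) }) {
            return converted
        }
    }
    // Otherwise fall back to the raw byte array, if a handler accepts it.
    handlerTypes.any { it.isInstance(raw) } ? raw : null
}

def json = '{"foo":"bar","hi":"there"}'.getBytes('UTF-8')
assert selectBody(json, [Map]) instanceof Map         // a Map handler gets the parsed JSON
assert selectBody(json, [String]) instanceof String   // a String-only handler gets the text
assert selectBody(json, [byte[].class]).is(json)      // a byte[] handler gets the raw body
assert selectBody(json, [Integer]) == null            // no suitable handler: rejected
```

A handler declared with def accepts Object, so in this model it would receive the first successful conversion, here the Map.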

4.1.3 Short-Form Usage

In addition to the 2-parameter handler method signature, there are 2 shortcut versions available for use. One form only takes the converted message body, and the other only takes the MessageContext object. The order of preference when multiple valid handlers are defined is:
  1. Long form (body, context)
  2. Short form (body)
  3. MessageContext form

The short-form handlers are shown below:

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    // ...

    def handleMessage(String message) {
        // Do work
    }

    def handleMessage(MessageContext context) {
        // Do work
    }
}

The MessageContext-only handler will only be called if there is no other handler defined that can possibly handle any conversion of the message body.

4.1.4 MessageContext Object

The message context is just an object that encapsulates the data relevant to the received message. Below is a list of properties of the class.
| Property | Description |
|---|---|
| body | Incoming message in its raw byte format. |
| channel | The RabbitMQ channel the message handler should use to publish messages. This is especially important when using transactions. |
| consumerTag | Consumer tag |
| envelope | Properties of the message's delivery (see RabbitMQ's documentation) |
| properties | Properties of the message (see RabbitMQ's documentation) |

4.1.5 RPC-Style Messages

When a client publishes a message and waits for a reply, this is considered an RPC-style operation. Typically, the server side of the operation (in this case, a message consumer/handler) must manually respond to the client on a queue that the client specified. This plugin provides a very convenient method to respond to the client without having to manually construct a response message.

The client will provide a response queue to the server to reply to. This queue name is stored in the MessageContext.properties.replyTo variable. If a client publishes this variable, and the handler returns some data, the plugin will convert the data returned from the message handler and build a response message for you.

The following example illustrates responding via a handler's returned data.

class RpcExampleConsumer {
    def handleMessage(String message, MessageContext messageContext) {
        println "received ${message}"

        return "response" // this message will be sent back to the client via its replyTo queue
    }
}

Alternatively, the rabbitMessagePublisher can be used to respond.

class RpcExampleConsumer {
    def rabbitMessagePublisher

    def handleMessage(String message, MessageContext messageContext) {
        println "received ${message}"

        rabbitMessagePublisher.send {
            routingKey = messageContext.properties.replyTo
            body = "response"
        }
    }
}

Allowing the plugin to build a reply message only converts the data returned from the message handler and publishes it to the reply queue. If you need to set any of the other message properties, like headers, content-types, etc, you must manually build the response message using the rabbitMessagePublisher, and refrain from returning data from the message handler.

4.2 Subscribing To Queues

Message consumers can subscribe to either queues or exchanges. When a consumer is registered to a queue, the consumer will receive messages from the queue as the RabbitMQ server determines that it's the consumer's turn to receive a message, since there may be multiple listeners on the same queue.

Each message consumer class must have a configuration defined. There are 2 methods to specify the configuration:

  • The consumer class can have a Map assigned to a static variable named rabbitConfig. To subscribe to queues, the only required configuration option is the queue variable, which is the name of the queue to subscribe to.
  • The application's configuration file can contain the configuration with the path rabbitmq.consumers.<ClassName>. See the section below for more details.

Here is a simple example of a consumer subscribing to a queue.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "queue": "test.queue"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

There are many options available to influence how the consumer works, which can be found in the reference.

4.3 Subscribing To Exchanges

Subscribing to exchanges is different from subscribing to queues, as there are different types of exchanges with different behavior. RabbitMQ's libraries do not provide a direct way to subscribe to an exchange; however, the plugin makes this possible by creating a temporary queue that is bound to the exchange. The binding requirements differ between the different types of exchanges.

At runtime, the plugin creates the temporary queue, binds it to the exchange, and has the message consumer consume from it. It is important to note that these queues are randomly named, exclusive, and auto-deleted, so this feature is not suitable if messages left on the queue must persist when the application shuts down.

4.3.1 Fanout Exchanges

A fanout exchange will forward a received message to every queue bound to it. There are no binding criteria for this kind of exchange. This is the simplest type of exchange, and is also the easiest to configure.

Fanout Example

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "fanout.exchange"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

4.3.2 Topic Exchanges

A topic exchange will forward messages to queues based on the binding criteria the queue used to register to the exchange. In RabbitMQ terms, this is called a routing key. The routing key can be either a direct match, or utilize wildcards to do a partial topic match. If a routing key is omitted, the queue will receive no messages. Use "#" to receive all messages from an exchange. More information can be found in the RabbitMQ documentation.

Topic Example

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "topic.exchange",
        "binding": "foo.bar.#"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}
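
To make the wildcard rules concrete, here is a small self-contained sketch of topic matching as RabbitMQ defines it: routing keys are dot-separated words, "*" stands for exactly one word, and "#" stands for zero or more words. The function names are hypothetical, and the code only illustrates the rules; it is not how the broker or the plugin implements routing.

```groovy
// Returns true when a dot-separated routing key satisfies a topic binding pattern.
boolean topicMatches(String pattern, String routingKey) {
    matchWords(pattern.tokenize('.'), routingKey.tokenize('.'))
}

boolean matchWords(List<String> pattern, List<String> words) {
    if (pattern.isEmpty()) {
        return words.isEmpty()
    }
    String head = pattern.first()
    List<String> rest = pattern.tail()
    if (head == '#') {
        // "#" may absorb any number of leading words, including none.
        return (0..words.size()).any { n -> matchWords(rest, words.drop(n)) }
    }
    if (words.isEmpty()) {
        return false
    }
    // "*" accepts any single word; anything else must match literally.
    (head == '*' || head == words.first()) && matchWords(rest, words.tail())
}

assert topicMatches('foo.bar.#', 'foo.bar')
assert topicMatches('foo.bar.#', 'foo.bar.baz.qux')
assert !topicMatches('foo.bar.#', 'foo.baz')
assert topicMatches('foo.*', 'foo.bar')
assert !topicMatches('foo.*', 'foo.bar.baz')
assert topicMatches('#', 'anything.at.all')
```

With the binding "foo.bar.#" from the example above, the consumer would therefore receive messages published with routing keys such as foo.bar or foo.bar.baz, but not foo.baz.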

4.3.3 Direct Exchanges

A direct exchange will forward messages to queues based on binding criteria configured similarly to topic exchanges. The difference in this case is that direct routing does not utilize wildcards in their routing keys.

Direct Example

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "direct.exchange",
        "binding": "example"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}

4.3.4 Headers Exchanges

Headers exchanges work similarly to topic exchanges. A headers exchange will forward messages to queues based on header values contained in messages. Additionally, a queue can be bound on multiple header values, along with an option to require one or all of the headers to match.

Headers Example

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "headers.exchange",
        "binding": [
            "foo": "bar",
            "hi": "there"
        ],
        "match": "any"
    ]

    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}
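
The difference between the two match values can be sketched in plain Groovy. The headersMatch function below is hypothetical and only models the rule described above (one bound header matching versus all of them matching); it is not the broker's or the plugin's implementation.

```groovy
// Returns true when a message's headers satisfy a headers-exchange binding.
// match == "all": every bound header must be present with an equal value.
// match == "any": at least one bound header must be present with an equal value.
boolean headersMatch(Map criteria, String match, Map messageHeaders) {
    def hits = criteria.findAll { name, value ->
        messageHeaders.containsKey(name) && messageHeaders[name] == value
    }
    match == 'all' ? hits.size() == criteria.size() : !hits.isEmpty()
}

def criteria = [foo: 'bar', hi: 'there']
assert headersMatch(criteria, 'any', [foo: 'bar'])
assert !headersMatch(criteria, 'all', [foo: 'bar'])
assert headersMatch(criteria, 'all', [foo: 'bar', hi: 'there', extra: 'ignored'])
assert !headersMatch(criteria, 'any', [foo: 'other'])
```

With "match": "any", as in the example consumer, a message carrying only the header foo with the value bar would be delivered; with "match": "all" it would not.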

4.4 Multi-Server

When using a multi-server setup, it is important to consider what server a consumer should listen on. Use the connection property in the rabbitConfig to specify which server the consumer should be bound to. If the connection is omitted, the consumer will be bound to the default connection.

package com.example

import com.budjb.rabbitmq.consumer.MessageContext

class ExampleConsumer {
    static rabbitConfig = [
        "queue": "test.queue",
        // Where "server1" is a connection configured with that name.
        "connection": "server1"
    ]

    def handleMessage(def body, MessageContext context) {
        // Do work
    }
}

4.5 Central Configuration

It is also possible to define the consumer's configuration outside of the consumer in the Grails Config.groovy file. All of the configuration options described above are valid for this type of configuration. This functionality is valuable when a consumer's configuration needs to be determined at runtime, instead of being hardcoded in the consumer class itself.

To configure a consumer in the Grails application's configuration file, an entry matching the consumer's class name should be present under the rabbitmq.consumers key, as follows:

// …
rabbitmq {
    consumers {
        ExampleConsumer {
            queue = "test.queue"
        }
    }
}
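
One way a value might be supplied at startup rather than hardcoded is sketched below. The system property name example.queue.name is made up for illustration; any source that Config.groovy can read would serve the same purpose.

```groovy
// grails-app/conf/Config.groovy
rabbitmq {
    consumers {
        ExampleConsumer {
            // "example.queue.name" is a hypothetical JVM system property;
            // "test.queue" is used when the property is not set.
            queue = System.getProperty("example.queue.name", "test.queue")
        }
    }
}
```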

5 Publishing Messages

Publishing messages through the plugin is achieved by using the rabbitMessagePublisher bean. This Spring bean uses a closure-based configuration method both to send messages without waiting for a response and to send RPC-style messages. There are many options available to the rabbitMessagePublisher which are documented in the reference, but this guide will only demonstrate basic usage.

In a multi-server setup, it is important to consider what server to send a message to. Like configuration and consumers, the connection property is used to route the message to the proper server connection.
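
A minimal sketch of routing a message to a named connection, assuming a connection named "server1" has been configured. The service name, method name, and routing key are hypothetical; the connection property is set in the same closure style as the other publisher options.

```groovy
// Hypothetical service; "server1" is assumed to be the name of a configured connection.
class ReportService {
    // Injected by Spring by bean name. Left untyped so the sketch has no
    // compile-time dependency on the plugin's classes.
    def rabbitMessagePublisher

    void publishReport(String report) {
        rabbitMessagePublisher.send {
            connection = "server1"
            routingKey = "reports.queue"
            body = report
        }
    }
}
```

If the connection property is omitted, the message goes to the default connection.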

5.1 Sending Messages

Sending a message means publishing a message to an exchange or queue and not waiting for a response (fire and forget). The only required parameters to the publisher are a queue or exchange to publish the message to, and the body of the message.

Send Example

import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher

    def sendSomeMessage() {
        rabbitMessagePublisher.send {
            exchange = "some.exchange"
            routingKey = "some.routingKey"
            body = "hi!"
        }
    }
}

RabbitMQ expects the body of a message to be a byte array. Message converters also work when publishing messages, so if the body is an object of any type other than a byte array, a suitable message converter, if one exists, will be found and run against the message body.

5.2 RPC Messages

Publishing an RPC message is as easy as sending messages, except that the response message is returned from the method.

RPC Example

import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher

    def sendSomeMessage() {
        def result = rabbitMessagePublisher.rpc {
            exchange = "some.exchange"
            routingKey = "some.routingKey"
            body = "hi!"
            timeout = 5000
        }
    }
}

The timeout option is especially important for RPC-style messages. The timeout property is the amount of time (in milliseconds) the client will wait for the server to respond. If the timeout is reached, a TimeoutException will be thrown. If the timeout is set to 0, the client will wait indefinitely for a response. The default value of the timeout, if not passed, is 5 seconds.

More options for the RabbitMessagePublisher can be found in the Quick Reference.

5.3 Bulk Publishing

By default, the publisher opens a new channel for each message it publishes. This is acceptable when sending individual messages, but when bulk publishing is required, it is much more efficient to open a single channel and use it for batches of messages.

As an example, during testing it took about 65 milliseconds to send one message. If 1000 messages are sent, the total time to publish is about 65 seconds! Using a single channel for the same operation takes about 1 millisecond per message, cutting the time down to 1 second for all 1000 messages.

These times are just an example based on testing, but they will differ based on network latency and server load.
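
The arithmetic behind those figures, written out as a short Groovy script. The per-message times are the example values quoted above, not measurements you should expect to reproduce.

```groovy
int messages = 1000
int newChannelPerMessageMs = 65    // a new channel opened for every message
int sharedChannelPerMessageMs = 1  // one channel reused for the whole batch

int newChannelTotalMs = messages * newChannelPerMessageMs
int sharedChannelTotalMs = messages * sharedChannelPerMessageMs

assert newChannelTotalMs == 65000    // about 65 seconds
assert sharedChannelTotalMs == 1000  // about 1 second

println "Speed-up factor: ${newChannelTotalMs.intdiv(sharedChannelTotalMs)}"
```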

While a channel may be manually created by authors using the rabbitContext, the publisher provides an easy way to send many messages with a single channel. See the example below.

rabbitMessagePublisher.withChannel { channel ->
    1000.times { i ->
        send {
            routingKey = "foobar"
            body = "Bulk message $i"
        }
    }
}

There is also a withChannel method that takes in a connection name for multi-server setups.

rabbitMessagePublisher.withChannel("connection1") { channel ->
    1000.times { i ->
        send {
            routingKey = "foobar"
            body = "Bulk message $i on connection1"
        }
    }
}

All of the send and rpc methods available with the publisher can be used with withChannel.

5.4 Publisher Confirms

The publisher allows authors to enable publisher confirms for a batch of messages using a similar mechanism as withChannel. These methods use RabbitMQ's waitForConfirms and waitForConfirmsOrDie methods. See RabbitMQ's documentation for more information about how publisher confirms work.

The reference details all of the various methods that the publisher contains to support confirms, but below are a couple of basic examples.

The withConfirms methods utilize the waitForConfirms methods from the RabbitMQ Java library. withConfirms will block until all messages sent in the provided closure have been acked or n'acked by the server. If a timeout is specified, it will only wait for the amount of time specified before throwing a TimeoutException.

rabbitMessagePublisher.withConfirms { channel ->
    send {
        routingKey = "foobar"
        body = "I am a test message"
    }

    send {
        routingKey = "barbaz"
        body = "I am also a test message"
    }
}

The withConfirmsOrDie methods utilize the waitForConfirmsOrDie methods from the RabbitMQ Java library. withConfirmsOrDie will block until all messages sent in the provided closure have been acked or n'acked by the server. The difference with this method is that if any n'ack is received, an exception is thrown. As with withConfirms, a timeout can be used.

rabbitMessagePublisher.withConfirmsOrDie { channel ->
    send {
        routingKey = "foobar"
        body = "I am a test message"
    }

    send {
        routingKey = "barbaz"
        body = "I am also a test message"
    }
}

There are versions of both of these types of methods that take a connection name and/or a timeout.

6 Message Converters

Message converters are classes that are responsible for converting objects to and from byte arrays. The plugin provides 5 built-in converters that cover basic types:
  • Integer
  • Map
  • List
  • GString
  • String

These converters allow message handlers to consume and return data without having to convert that data themselves. They are also used with the RabbitMessagePublisher class.

6.1 Custom Message Converters

The plugin provides a way for authors to create their own message converters. A custom message converter must be placed in the grails-app/rabbit-converters path, and must end with Converter.groovy.

Message converters should extend the MessageConverter abstract class. MessageConverter is a generic class, meaning when extending it, you need to pass it the object class type the message converter will be responsible for converting.

A message converter can notify the plugin just what abilities it has. Specifically, there are methods that return whether it can convert an object to or from a byte array. A message converter need not provide two-way conversion.

Message converters may also provide the plugin with a MIME-type that is typically indicative of the object type it is responsible for. It does not make sense for all object types to have a MIME-type associated with it, but this is useful to give the plugin hints if the conversion mode is set to attempt conversion only based on the content-type property a message contains.

Below is an example converter for the String object type. Custom converters should follow the same format.

import com.budjb.rabbitmq.converter.MessageConverter

class StringMessageConverter extends MessageConverter<String> {
    /**
     * Returns whether the message converter can convert a value from a String to a byte array.
     *
     * @return boolean
     */
    @Override
    public boolean canConvertFrom() {
        return true
    }

    /**
     * Returns whether the message converter can convert a value from a byte array to a String.
     *
     * @return boolean
     */
    @Override
    public boolean canConvertTo() {
        return true
    }

    /**
     * Converts a value from a byte array to a String.
     *
     * @param input Value to convert.
     * @return Value converted to a String, or null if the conversion failed.
     */
    @Override
    public String convertTo(byte[] input) {
        return new String(input)
    }

    /**
     * Converts a value from a String to a byte array.
     *
     * @param input Value to convert.
     * @return Value converted to a byte array, or null if the conversion failed.
     */
    @Override
    public byte[] convertFrom(String input) {
        return input.getBytes()
    }

    /**
     * The MIME-type typically associated with the object type, if one exists.
     *
     * @return MIME-type typically associated with this object type, or null if one does not exist.
     */
    @Override
    public String getContentType() {
        return 'text/plain'
    }
}
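As a further illustration, here is a minimal sketch of a converter for a type with no obvious MIME-type, following the same shape as the String example above. UuidMessageConverter is a hypothetical name chosen for this sketch, and the abstract MessageConverter class declared at the top is only a stand-in that mirrors the five methods overridden in the String example, so that the snippet runs on its own. In a real project, the stand-in would be removed in favor of importing com.budjb.rabbitmq.converter.MessageConverter.

```groovy
import java.nio.charset.StandardCharsets

// Stand-in for the plugin's abstract class (assumption: it mirrors only the
// methods overridden in the String example). Remove it in a real project and
// import com.budjb.rabbitmq.converter.MessageConverter instead.
abstract class MessageConverter<T> {
    abstract boolean canConvertFrom()
    abstract boolean canConvertTo()
    abstract T convertTo(byte[] input)
    abstract byte[] convertFrom(T input)
    abstract String getContentType()
}

// Hypothetical converter for java.util.UUID values.
class UuidMessageConverter extends MessageConverter<UUID> {
    @Override
    boolean canConvertFrom() {
        return true
    }

    @Override
    boolean canConvertTo() {
        return true
    }

    // Returns null when the bytes do not hold a valid UUID string.
    @Override
    UUID convertTo(byte[] input) {
        try {
            return UUID.fromString(new String(input, StandardCharsets.UTF_8))
        } catch (IllegalArgumentException ignored) {
            return null
        }
    }

    @Override
    byte[] convertFrom(UUID input) {
        return input.toString().getBytes(StandardCharsets.UTF_8)
    }

    // No MIME-type is commonly associated with a UUID, so none is reported.
    @Override
    String getContentType() {
        return null
    }
}
```

Returning null from convertTo() follows the contract stated in the String example's comments, where null signals a failed conversion.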

7 Advanced Usage

While the plugin effectively wraps the functionality of the RabbitMQ library, the end user has direct access to all of the underlying library objects and connection instances.

7.1 Spring Beans

There are several beans defined by the plugin to perform its various operations.

Bean Name                  Purpose
rabbitContext              A front-end class to the plugin that aggregates useful functionality for users of the plugin. Besides the rabbitMessagePublisher, this is likely the only bean users will need to access, if at all.
connectionManager          Manages the lifecycle of connection instances, including loading, starting, stopping, unloading, and retrieval.
messageConverterManager    Handles loading message converters and acts as the entry point when message conversion is required.
consumerManager            Manages the lifecycle of consumer instances, including loading, starting, stopping, unloading, and retrieval.
queueBuilder               Responsible for creating exchanges and queues defined in the application's configuration.
rabbitMessagePublisher     Used to send messages to a RabbitMQ broker.

7.2 Rabbit Context

Besides the rabbitMessagePublisher, the rabbitContext is the bean users will most likely interact with. While you may never need to use this bean, it can be useful. As with any Spring bean, the rabbitContext can be injected into any Spring managed bean, such as services, controllers, and rabbit consumers.

The rabbitContext is intended to be used as a front-end to all of the other beans defined by the plugin to hide some of the complexity of interacting with the system. As such, in most cases the rabbitContext proxies requests to the appropriate manager bean to accomplish the requested task.

In some cases, interactions with multiple managers are necessary to safely carry out the action. Therefore, it is best practice to use the rabbitContext rather than the other beans directly, unless it does not provide the functionality you need. If you find that you are frequently resorting to using one of the other beans, I encourage you to post an issue on the GitHub project.

The following subsections describe some of the more useful functionality the rabbitContext provides.

7.2.1 Native Objects

The main goal of the plugin is to wrap the RabbitMQ library, hiding the complexity of its usage and bringing it more in line with the conventions of Grails applications, while still allowing users to gain access to the underlying RabbitMQ objects if needed. rabbitContext provides several methods to gain direct access to the RabbitMQ Java library Connection and Channel objects.

To create a new Channel, use the createChannel methods.

// This creates a new channel with the default connection
Channel channel = rabbitContext.createChannel()

// The same using a different connection based on its name
Channel channel = rabbitContext.createChannel("connection1")

If the createChannel methods are used, it is important that these channels are closed. The plugin handles opening and closing channels that it manages as part of publishing or consuming messages, but channels created with the createChannel methods are not managed by the plugin. It is the author's responsibility to close them; otherwise, connection leaks may occur and memory leaks most likely will.
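One way to make sure a manually created channel is always closed is to wrap its use in a try/finally block. The sketch below is only an illustration of that pattern: withChannel is a hypothetical helper that is not part of the plugin, and FakeChannel is a stand-in for the RabbitMQ Channel class so that the snippet runs without a broker. In an application, the factory closure would call rabbitContext.createChannel() instead.

```groovy
// Stand-in for com.rabbitmq.client.Channel (assumption: only the two calls
// used below are mimicked) so the sketch runs without a broker.
class FakeChannel {
    boolean open = true
    List<String> published = []

    void basicPublish(String exchange, String routingKey, Object props, byte[] body) {
        published << new String(body)
    }

    void close() {
        open = false
    }
}

// Hypothetical helper, not provided by the plugin: runs the work closure
// with a freshly created channel and closes the channel even if the work
// throws an exception.
def withChannel(Closure channelFactory, Closure work) {
    def channel = channelFactory()
    try {
        return work(channel)
    } finally {
        channel.close()
    }
}

// In an application: withChannel({ rabbitContext.createChannel() }) { ... }
def fake = new FakeChannel()
withChannel({ fake }) { channel ->
    channel.basicPublish('', 'testqueue', null, 'hello'.bytes)
}
assert !fake.open
```

Because the close happens in the finally block, an exception thrown while using the channel still propagates to the caller, but the channel does not leak.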

Use the getConnection methods to gain access to the Connection objects.

// To retrieve the Connection from the default connection
Connection connection = rabbitContext.getConnection()

// To retrieve the Connection from a specific connection based on its name
Connection connection = rabbitContext.getConnection("connection1")

7.2.2 Starting and Stopping Connections

The plugin handles starting and stopping connections automatically when the application is started or shut down, but sometimes applications may need to manually stop connections based on certain conditions or business logic. The rabbitContext contains several methods to manage the life cycle of connections.

Method              Description
startConnections    Starts all registered connections. If some connections are already started, the remainder will also be started.
stopConnections     Stops all connections and all consumers.
startConnection     Starts a connection based on its name.
stopConnection      Stops a connection based on its name, and stops any consumers on the connection.

7.2.3 Starting and Stopping Consumers

Much like connections, the rabbitContext provides several methods to start and stop consumers if necessary.

Method                    Description
startConsumers            Starts all registered consumers. If some consumers are already started, the remainder will also be started.
stopConsumers             Stops all consumers.
startConsumer             Starts a consumer based on its class name.
stopConsumer              Stops a consumer based on its class name.
startConsumers(String)    Starts all consumers on a specific connection, based on the connection name.
stopConsumers(String)     Stops all consumers on a specific connection, based on the connection name.

7.2.4 Status Report

The RabbitContext contains a method getStatusReport(), which will build an object structure containing information about all RabbitMQ connections and all consumers. This information is useful to monitor the status of the RabbitMQ application system, as it contains running states and statistics including how many concurrent threads a consumer is configured for, how many are actually active, and how many are actively processing messages. Below is an example of its output gathered from one of the plugin's tests, serialized as JSON.

[
    {
        "consumers": [
            {
                "fullName": "com.budjb.rabbitmq.test.AllTopicConsumer",
                "load": 0,
                "name": "AllTopicConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "topic-queue-all",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.ReportingConsumer",
                "load": 0,
                "name": "ReportingConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "reporting",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.SleepingConsumer",
                "load": 0,
                "name": "SleepingConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "sleeping",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.SpecificTopicConsumer",
                "load": 0,
                "name": "SpecificTopicConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "topic-queue-specific",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.StringConsumer",
                "load": 0,
                "name": "StringConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "string-test",
                "runningState": "RUNNING"
            },
            {
                "fullName": "com.budjb.rabbitmq.test.SubsetTopicConsumer",
                "load": 0,
                "name": "SubsetTopicConsumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "topic-queue-subset",
                "runningState": "RUNNING"
            }
        ],
        "host": "localhost",
        "name": "connection1",
        "port": 5672,
        "runningState": "RUNNING",
        "virtualHost": "test1.rabbitmq.budjb.com"
    },
    {
        "consumers": [
            {
                "fullName": "com.budjb.rabbitmq.test.Connection2Consumer",
                "load": 0,
                "name": "Connection2Consumer",
                "numConfigured": 1,
                "numConsuming": 1,
                "numProcessing": 0,
                "queue": "connection2-queue",
                "runningState": "RUNNING"
            }
        ],
        "host": "localhost",
        "name": "connection2",
        "port": 5672,
        "runningState": "RUNNING",
        "virtualHost": "test2.rabbitmq.budjb.com"
    }
]
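A report like this can be scanned for consumers that are not running. The following is a minimal sketch that works on plain lists and maps shaped like the JSON above; the data is a trimmed, made-up sample, the STOPPED value is assumed purely for illustration, and consumersNotRunning is a hypothetical helper name. The exact types returned by getStatusReport() are not shown in this guide, so only property access and toString() are used here.

```groovy
// Hypothetical helper: returns "connection/consumer" labels for every
// consumer whose runningState is anything other than RUNNING.
List<String> consumersNotRunning(List connections) {
    List<String> result = []
    connections.each { connection ->
        connection.consumers.each { consumer ->
            if (consumer.runningState.toString() != 'RUNNING') {
                result << (connection.name + '/' + consumer.name)
            }
        }
    }
    return result
}

// Trimmed sample shaped like the JSON above. In an application, this would
// be the value returned by rabbitContext.getStatusReport().
def report = [
    [name: 'connection1', runningState: 'RUNNING', consumers: [
        [name: 'StringConsumer', queue: 'string-test', runningState: 'RUNNING'],
        [name: 'SleepingConsumer', queue: 'sleeping', runningState: 'STOPPED']
    ]],
    [name: 'connection2', runningState: 'RUNNING', consumers: [
        [name: 'Connection2Consumer', queue: 'connection2-queue', runningState: 'RUNNING']
    ]]
]

println consumersNotRunning(report)
```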

7.3 Transactions

The plugin provides a bit of automation around channel transactions. When a consumer is defined with the transacted property set to true, a transaction is automatically started on the channel passed to the message handler. When the message handler completes successfully, the transaction is automatically committed. If an unhandled exception is thrown from the message handler, the transaction is automatically rolled back.

It is especially important that any messages published from a message handler use the Channel instance passed in the MessageContext for this functionality to work.

Since the Channel is passed in the MessageContext, the author has full control over committing and rolling back transactions.
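The commit and rollback behavior described above can be pictured with the sketch below. It is not the plugin's implementation, only an illustration of the sequence of calls: runTransacted is a hypothetical helper, and RecordingChannel is a stand-in that records the transaction calls an author could otherwise make on the RabbitMQ Channel (txSelect, txCommit, and txRollback).

```groovy
// Stand-in for com.rabbitmq.client.Channel that records transaction calls,
// so the sketch runs without a broker.
class RecordingChannel {
    List<String> calls = []

    void txSelect() { calls << 'txSelect' }
    void txCommit() { calls << 'txCommit' }
    void txRollback() { calls << 'txRollback' }
}

// Hypothetical helper: starts a transaction, runs the handler, commits on
// success, and rolls back if the handler throws. Returns true on commit.
boolean runTransacted(channel, Closure handler) {
    channel.txSelect()
    try {
        handler(channel)
        channel.txCommit()
        return true
    } catch (Throwable ignored) {
        channel.txRollback()
        return false
    }
}

def good = new RecordingChannel()
def committed = runTransacted(good) { ch -> ch.calls << 'publish' }
assert committed
assert good.calls == ['txSelect', 'publish', 'txCommit']

def bad = new RecordingChannel()
def rolledBack = !runTransacted(bad) { ch -> throw new RuntimeException('handler failed') }
assert rolledBack
assert bad.calls == ['txSelect', 'txRollback']
```

Swallowing the exception after the rollback is a choice made for this sketch only; it keeps the example short and says nothing about how the plugin reports handler failures.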

8 Upgrading

When an update to the plugin introduces incompatibilities, this section identifies what authors need to consider when upgrading to the new version.

8.1 From 3.0.X

The upgrade from 3.0.X to 3.1.X includes a refactor of how the consumer and connection managers work. The storage of consumer handlers was moved from the connection classes into the consumer manager in order to support the new start/stop functionality, and it made sense for consumer handlers to reside with the appropriately named manager.

Both of the managers also now have underlying interfaces that define how a manager should behave. In addition, the objects being managed are now called Contexts, which also have interfaces that define their general behavior. The intention is that these changes make the plugin more stable, with fewer changes that break backwards compatibility being introduced moving forward.

Here are the items authors may need to address when upgrading to this version of the plugin.

  • The rabbitContext.getConnection() methods now return the RabbitMQ Connection object instead of the ConnectionContext instance. This was done to make the methods more consistent with the createChannel methods and to encapsulate the ConnectionContext objects inside the manager. The ConnectionContext objects can now be retrieved from the connectionManager bean.
  • Many of the methods inside of the ConsumerManager and ConnectionManager have been renamed to adhere to a common interface. This should only affect projects that use these beans directly.
  • Some of the methods inside of the RabbitContext have been renamed to match the interface used by the various managers.

Overall, users of the plugin should see no impact if the rabbitContext or any of the other beans are not used in their projects.

If the use of the plugin's beans is limited to the rabbitContext, the impact should be minimal: some minor changes to method names, and some refactored code if retrieving a ConnectionContext. Users of the other manager beans will need to account for the changed interfaces implemented by those beans.

8.2 From 2.0.X

The upgrade from any version in the 2.0.X range to 3.0.X includes a massive refactoring of the internals of the plugin. If users did not extend or override the RabbitContext, RabbitMessageBuilder, or any of the other helper classes, the impact is limited to a couple of package name changes.

Below are the changes that were made, at a high level:

  • RabbitMessageBuilder is deprecated. The class still exists and its interface is the same, however, the code in the class has been ripped out and now proxies requests to the rabbitMessagePublisher bean. The builder will be removed at some point in the near future.
  • RabbitContext used to contain a significant amount of code related to management of message converters, consumers, and connections. That functionality has been broken out into their own respective classes. The RabbitContext now serves only as a class to aggregate functionality useful to users of the plugin, and should still be used to simplify interfacing with the plugin rather than using the underlying beans directly.
  • Introduced the rabbitMessagePublisher as a replacement for the RabbitMessageBuilder. This bean can be injected into other spring managed beans, such as services, controllers, and rabbit consumers. Its functionality follows the builder closely, and users may send messages based on configurations made through closures or through the new RabbitMessageProperties object. The short-hand convenience methods are still available as well.
  • Introduced the messageConverterManager bean to handle all operations pertaining to message converters.
  • Introduced the consumerManager bean to handle all operations pertaining to message consumers.
  • Introduced the connectionManager bean to handle all operations pertaining to RabbitMQ connections and channels.
  • Introduced the queueBuilder bean to handle creating configured exchanges and queues.
  • The MessageContext class has been moved into a new package: com.budjb.rabbitmq.consumer.
  • The AutoAck enum has been moved into a new package: com.budjb.rabbitmq.consumer.
  • The ConsumerConfiguration class has been moved into a new package: com.budjb.rabbitmq.consumer.
  • The MessageConvertMethod class has been moved into a new package: com.budjb.rabbitmq.consumer.
  • The ConnectionContext class has been moved into a new package: com.budjb.rabbitmq.connection.
  • The ConnectionConfiguration class has been moved into a new package: com.budjb.rabbitmq.connection.
  • The MessageConverter abstract class has been moved into a new package: com.budjb.rabbitmq.converter.
  • All of the bundled message converters have been moved into a new package: com.budjb.rabbitmq.converter.
  • The RabbitMessageBuilder class has been moved into a new package: com.budjb.rabbitmq.publisher.
  • A large number of unit tests and some integration tests have been added to the project. These tests rely on the Spock mocking framework, but the test files and dependencies are not exported with the plugin so that a new plugin dependency on Spock is not created.
  • The closures used for publishing messages have had their resolving strategy changed from OWNER_FIRST to DELEGATE_FIRST. This should not have much of an impact, but in some cases closures may need to explicitly qualify some properties with delegate.

9 Changelog

Version 3.1.3 - 4/20/2016

  • Add status reports that provide information about all connections and their consumers, including load statistics.
  • Upgrade project to Grails 2.5.4.
  • Remove gpars dependency.
  • Make Grails 2.3 the minimum version of Grails this plugin is intended for.
  • Fix consumer configuration from application config parsing issue (#73).
  • Refactor message conversion for incoming messages so that conversion to a type only happens if an appropriate handler exists.

Version 3.1.2 - 8/6/2015

  • Added graceful shutdown support. See rabbitContext.shutdown().
  • Added methods to check running state on most managers and contexts.
  • Updated rabbitmq Java library to 3.5.4.
  • Added the gpars plugin as a dependency.

Version 3.1.1 - 5/13/2015

  • Refactored the code to load a consumer's configuration from a static variable so that it works correctly when the consumer is annotated with @Transactional. (#55)
  • Add setter methods for the message TTL (expiration). (#56)
  • Fix bug where missing connection configuration values do not allow the use of default values.
  • Remove checked exception from ConsumerManagerImpl that does not exist in its interface. (#59)

Version 3.1.0 - 3/13/2015

  • Update the RabbitMQ Client Java library to 3.5.0.
  • Fix an issue that caused unclean shutdowns when redeploying an application using the plugin. (#54)
  • Added the ability to start and stop individual connections. (#49)
  • Added the ability to start and stop individual consumers. (#49)
  • Added the ability to start and stop consumers based on the connection they're tied to. (#49)
  • Moved consumer adapter storage from the connection context to the consumer manager.
  • Handle Throwable types that were not being handled before in the consumer handling so that channels are not closed if one of the unhandled errors occurs. (#47)
  • Added travis-ci continuous integration for all commits to the plugin.

Version 3.0.4 - 2/18/2015

  • Fix a null pointer exception when a consumer has no configuration.
  • Add a unit test to test behavior when a consumer has no configuration.
  • Add an integration test to test behavior when sending a message directly to a queue.

Version 3.0.3 - 2/15/2015

  • Introduced the rabbitMessagePublisher bean to replace the RabbitMessageBuilder.
  • Deprecated the RabbitMessageBuilder.
  • Massive refactor of the internals of the plugin. See the upgrading page for more detailed information about what has changed.
  • Added the ability to configure consumers centrally in the application's configuration file (thanks Erwan Arzur).
  • Updated RabbitMQ library version to 3.4.3.

Version 3.0.2

  • Internal release, see 3.0.3.

Version 3.0.1

  • Internal release, see 3.0.3.

Version 3.0.0

  • Internal release, see 3.0.3.

Version 2.0.10 - 9/11/2014

  • Fix bug with converters that prevented converters later in the processing list from executing if another converter is unable to marshal data from bytes.
  • Add enabled flag to the configuration. If false, completely disables the plugin from starting.

Version 2.0.9 - 9/9/2014

  • Additional fix for memory leak associated with RPC calls and auto-recovering connections.

Version 2.0.8 - 9/8/2014

  • Fix bug introduced by rushing the previous fix. Mark consuming = true.

Version 2.0.7 - 9/8/2014

  • Add basicCancel() to RabbitMessageBuilder in an attempt to address a memory leak.
  • Improve cleaning up of resources in RPC calls.

Version 2.0.6 - 8/13/2014

  • Updated copyright notices.
  • Added GString message converter.
  • Updated publishing guide docs to make RabbitMessageBuilder usage more clear (thanks marcDeSantis @GitHub).

Version 2.0.5 - 8/01/2014

  • Added heartbeat configuration for connections (thanks LuisMuniz @GitHub).
  • Refactored Hibernate session support so that Hibernate is no longer a dependency of the plugin, and will now work with or without Hibernate present.

Version 2.0.4 - 7/28/2014

  • Added multi-server support to all aspects of the plugin.
  • Added SSL support for connections.
  • Added auto-reconnect support for dropped connections.
  • Added logic to wrap a Hibernate session around calls to consumers.
  • Updated the RabbitMQ library to version 3.3.0.
  • Added logging for connection/channel reconnects and channel shutdowns.
  • Changed format for connection configurations. The old style is still supported, but will likely be removed at some point.

Version 1.0.3 - 1/7/2014

  • Modified the logic to check for the existence of callbacks in consumers.

Version 1.0.2 - 1/6/2014

  • Added a cached thread pool so the user does not need to account for the number of threads consumers require. Set the default to 0 so that this is the default.
  • Added callbacks for messages: onReceive, onSuccess, onFailure, and onComplete.

Version 1.0.1 - 11/28/2013

  • Remove the maven group from the plugin definition class.

Version 1.0.0 - 11/27/2013

  • Version bump for general release.

Version 0.2.1 - 11/27/2013

  • Fixed a bug with the message handler discovery method that caused generically-typed handlers to get called incorrectly.

Version 0.2.0 - 11/27/2013

  • Refactored queue/exchange configuration. It is now possible to configure queue binding without having to also configure the exchange being bound to.
  • Added the match property to queue configuration to support headers exchange binding. This breaks backwards compatibility.
  • Renamed the routingKey property of the consumer configuration to binding to match queue configuration. This breaks backwards compatibility.

Version 0.1.8 - 10/31/2013

  • Moved the trigger to start consumers on application launch to the bootstrap.

Version 0.1.7 - 10/30/2013

  • Added the prefetchCount option to the consumer configuration. Defaults to 1.
  • Added the threads option to the connection configuration. Defaults to 5.

Version 0.1.6 - 10/29/2013

  • Fixed logic to determine if a consumer is valid.
  • Added support for short-form handlers that only take a single parameter.

Version 0.1.5 - 10/28/2013

  • body parameter to the RabbitMessageBuilder is no longer required. It now defaults to an empty byte array.

Version 0.1.4 - 10/28/2013

  • Fix a class visibility issue in the artefact handlers for this plugin.

Version 0.1.3 - 10/23/2013

  • Touch up the consumer template.

Version 0.1.2 - 10/22/2013

  • Add the ability to create multiple consumers at the same time with the create-consumer script (thanks Aaron Brown!).
  • Also create a unit test when creating consumers (thanks Michael Rice!).

Version 0.1.1 - 10/22/2013

  • Throw an exception if the connection configuration is missing on application start (thanks Michael Rice!).
  • Add the create-consumer script (thanks Aaron Brown!).

Version 0.1 - 10/17/2013

  • Code complete/experimental release.