(Quick Reference)
Rabbitmq Native Plugin - Reference Documentation
Authors: Bud Byrd
Version: 3.1.3
1 Introduction
The RabbitMQ Native plugin for Grails provides RabbitMQ integration using the Java libraries provided by the RabbitMQ project.
It provides easy configuration of exchanges and queues via a project's configuration, simple configuration of consumers, and
a convenience class to easily publish messages.
This plugin strives to provide an easy convention to follow to quickly begin consuming messages, but still provide easy
access to the underlying layer to allow users of this plugin to do whatever they need to do outside of those conventions.
It also provides a powerful message body converter system that allows the user to quickly write and plug in converters for
custom data types.
This guide details the configuration of the plugin, and may not fully explain the use and conventions of the RabbitMQ services
or its Java library. More information can be found in their
Java API guide and
JavaDoc.
2 Quick Start
This is a quick and dirty how-to detailing how to quickly begin using the plugin. While there is a lot of configurability offered by this plugin, this is a very basic demonstration of its usage.
Create The Application
Create a project named
RabbitExample. You can do this by entering:
grails create-app RabbitExample
Add The Plugin
In
grails-app/conf/BuildConfig.groovy
, under the
plugins
section, add:
plugins {
// ... compile name: "rabbitmq-native", version: "latest.release" // …
}
Then, refresh the project's dependencies:
grails refresh-dependencies
Configuring
In
grails-app/conf/Config.groovy
, add:
rabbitmq {
connection = {
connection host: "changeme", username: "changeme", password: "changeme"
}
queues = {
queue name: "testqueue"
}
}
Be sure to replace the appropriate values for your RabbitMQ server and user credentials.
Consumer
Create the a consumer by using the following command:
grails create-consumer com.example.Test
Update the consumer to reflect the following:
package com.exampleimport com.budjb.rabbitmq.consumer.MessageContextclass TestConsumer {
/**
* Consumer configuration.
*/
static rabbitConfig = [
queue: "testqueue"
] /**
* Handle an incoming RabbitMQ message.
*
* @param body The converted body of the incoming message.
* @param context Properties of the incoming message.
* @return
*/
def handleMessage(def body, MessageContext context) {
println body
return "Hello to you, too!"
}
}
Controller
Create a controller by using the following command:
grails create-controller com.example.Test
Update the controller to reflect the following:
package com.exampleimport com.budjb.rabbitmq.publisher.RabbitMessagePublisherclass TestController {
RabbitMessagePublisher rabbitMessagePublisher def index() {
render rabbitMessagePublisher.rpc {
routingKey = "testqueue"
body = "Hello!"
}
}
}
Run it!
Run the grails application.
You can see the application in action by hitting the test controller. If you're running this on your localhost, your URL may be similar to
http://localhost:8080/RabbitExample/test/index
.
You should see the message "Hello!" printed the application's output console, and your web browser should dispay the message "Hello to you, too!"
3 Configuration
Configuration of the connection to the RabbitMQ server is done in your project's
grails-app/conf/Config.groovy
file.
Below is the list of general configuration properties.
Configuration Property | Required | Description | Type | Default |
---|
autoStart | | If true, will start any consumers during the bootstrap phase of the application startup. | Boolean | true |
enabled | | If false, will register the plugin's beans, but prevent the plugin connecting to the RabbitMQ broker(s) and consuming messages. | String | true |
If autoStart
is set to false, all of the connections to the broker will be initiated, but the message consumers will not
automatically start consuming messages. To start consuming, use the rabbitContext
bean and call rabbitContext.startConsumers()
.
3.1 Server Connection
The plugin expects the connection configuration to the RabbitMQ server to be defined. A bare minimum configuration example looks like:
rabbitmq {
connection = {
connection host: "example.com", username: "foo", password: "bar"
}
}
Connections to many different RabbitMQ servers can be configured. A multi-server configuration looks like:
rabbitmq {
connection = {
connection host: "rabbit1.example.com", username: "foo", password: "bar"
connection host: "rabbit2.example.com", username: "foo", password: "bar"
}
}
The following table enumerates all the configuration options available to the connection configuration:
Configuration Property | Required | Description | Type | Default |
---|
host |  | Hostname or IP address of the RabbitMQ server to connect to. | String | none |
username |  | Username to log into the RabbitMQ server with. | String | none |
password |  | Password to log into the RabbitMQ server with. | String | none |
name | | Name of the connection. This is used while sending messages to a specific RabbitMQ server when using multiple servers. | String | none |
isDefault | | A connection with this set to true will be the server messages are sent to if no specific connection is specified when sending the message. | booleang | false |
port | | Port to connect to the RabbitMQ server with. | Integer | 5672 |
virtualHost | | Name of the virtual host to connect to the RabbitMQ server with. | String | none |
ssl | | Whether to use SSL when connecting to a RabbitMQ server. | boolean | false |
threads | | Threadpool size, if greater than 0, determines how many concurrent messages can be processed at any given time. If set to 0, consumers can consume as many messages as it's configured to. | String | 0 |
automaticReconnect | | If true, will cause the application to automatically reconnect to a server when its connection is dropped. | boolean | true |
requestedHeartbeat | | Heartbeat interval, in seconds. A value of 0 disables heartbeats. | Integer | 0 |
3.2 Defining Queues
The plugin allows authors to define the exchanges and queues programatically inside the configuration. This allows the application to configure its own queues without someone
having to manually create the exchanges and queues prior to running the application.
Queue configuration is also done in the
Config.groovy
file under the rabbitmq block, much as the server connection is configured. Usage is best illustrated with an example:
rabbitmq {
queues = {
queue name: "example.queue", durable: true, exchange: "example.exchange"
}
}
The above code will define a queue named
example.queue, and its durable flag will be set.
Be sure to note that the queues property is a closure. You must ensure that the =
is present for this feature to function properly.
If using the multi-server feature of the plugin, it is important to consider what server the queue should be defined in. The
connection
property
specifies which server to create the queue in, illustrated below:
rabbitmq {
queues = {
// Assume there is a connection with the name "server1"…
queue name: "example.queue", connection: "server1", durable: true
}
}
If there are many queues to be defined for a specific server connection, the connection method provides a convenient way to bunch queue creation.
rabbitmq {
queues = {
// Assume there is a connection with the name "server1"…
connection name: "server1", {
queue name: "example.queue", durable: true
queue name: "example2.queue", durable: true
}
}
}
Below is a table of all of the options available when defining queues:
Property | Required | Description | Type | Default |
---|
arguments | | Extra arguments used to create the queue. See the RabbitMQ documentation for more information. | Map | none |
autoDelete | | Whether to automatically delete the queue once there are no more consumers listening to it. | boolean | false |
binding | | Used in conjunction with exchanges. See the section below for more information. | Mixed | none |
durable | | Whether messages should be persisted to the disk on the RabbitMQ server to survive server restarts. | boolean | false |
exchange | | Binds a queue to an exchange in conjunction with the binding property. Ignored if used inside an exchange declaration. | String | none |
match | | Required when binding to a headers exchange. Either "any" or "all". | String | none |
name |  | Name of the queue. | String | none |
connection | | Name of the connection to create the queue with. Uses the default connection if omitted. | String | none |
3.3 Defining Exchanges
Defining exchanges is very similar to defining queues. The following code illustrates how to define an exchange:
rabbitmq {
queues = {
exchange name: "example.exchange", type: "topic"
}
}
The above example will create an exchange with the name
example.exchange and of the type
topic. Below is a list of all the options availble when creating exchanges:
Property | Required | Description | Type | Default |
---|
arguments | | Extra arguments used to create the exchange. See the RabbitMQ documentation for more information. | Map | none |
autoDelete | | Whether to automatically delete the exchange once there are no longer any queues bound to it. | boolean | false |
durable | | Whether messages should be persisted to the disk on the RabbitMQ server to survive server restarts. | boolean | false |
name |  | Name of the exchange. | String | none |
type |  | One of "fanout", "topic", "direct", or "headers". | String | none |
connection | | Name of the connection to create the exchange with. Uses the default connection if omitted. | String | none |
The same connection considerations exist for exchanges as with queues. The same connection
method works for exchanges as well.
3.4 Binding Queues and Exchanges
Queues can be bound to an exchange by setting the
exchange
property when declaring the queue to the name of the exchange to
bind to. This is the preferred method if the application being configured is not responsible for defining and configuring
the exchange being bound to.
Queues can also be bound to an exchange by declaring the queues inside of a closure passed as the last parameter of an
exchange definition. This is a convenient method to do queue binding when your application is responsible for defining
and configuring the exchange.
rabbitmq {
queues = {
exchange name: "example.exchange", type: "topic", {
queue name: "example.queue", binding: "sample.binding.#"
}
}
}
This example will create a
topic exchange named
example.exchange, as well as create a queue named
example.queue. The queue will be bound to the exchange with the topic or
routing key of "sample.binding.#".
Queues need to have their binding defined specifically for the type of exchange they are bound to.
Fanout Exchanges
Fanout exchanges are the easiest to configure bindings for, since they require none. Fanout exchanges simply send every message it received to every queue bound to it.
rabbitmq {
queues = {
exchange name: "example.exchange", type: "fanout", {
queue name: "example.queue"
}
}
}
Topic Exchanges
Topic exchanges require queues to define a topic. Topics can be an exact match, but their strength is in their partial matching ability. See the
RabbitMQ documentation
for details about this kind of exchange.
rabbitmq {
queues = {
exchange name: "example.exchange", type: "topic", {
queue name: "example.queue", binding: "exmaple.binding.#"
}
}
}
Direct Exchanges
Direct exchanges are similar to topic exchanges, except that their "topics" only function with direct name matching. The appropriate name for the binding in this case is "routing key". Queues must define a
routing key when binding to this type of exchange.
rabbitmq {
queues = {
exchange name: "example.exchange", type: "direct", {
queue name: "example.queue", binding: "exmapleRoutingKey"
}
}
}
Header Exchanges
Header exchanges are like topic exchanges, but with the ability to define multiple match keywords. The binding for queues allows the queue to match on all or one of multiple header values. The queue must also
set the
match
property for this exchange type, and the value must be one of "any" or "all".
rabbitmq {
queues = {
exchange name: "example.exchange", type: "headers", {
queue name: "example.queue", match: "any", binding: [
"header1": "header-value-1",
"header2": "header-value-2"
]
}
}
}
4 Consuming Messages
Consuming messages is accomplished by creating a message consumer class. Consumer classes are placed into the
grails-app/rabbit-consumers
, and must end with
Consumer.groovy
. These classes are Spring beans, meaning
that they can inject other Spring beans such as services, the
grailsApplication
instance, etc. Each consumer
class must be configured in a certain way. If there is a misconfiguration in a consumer, the plugin will log
an error stating so and not register the consumer to any queues.
4.1 Message Handlers
One of the requirements for a consumer to be registered to the RabbitMQ server is that a message handler be declared
in the consumer class. The message handler is the mechanism by which messages are consumed.
4.1.1 Basic Usage
In its most basic form, a message handler method takes in the body of the received message, and a
MessageContext
object that contains the message parameters received from the RabbitMQ server, along with the consumer's channel
that the handler should publish messages through.
This is the most generic form of a message handler:
package com.exampleimport com.budjb.rabbitmq.consumer.MessageContextclass ExampleConsumer {
// ... def handleMessage(def body, MessageContext context) {
// Do work
}
}
4.1.2 Typed Message Handlers
The logic surrounding the message consumer classes will by default attempt to intelligently convert the body of
the received message from a byte array to a converted type, such as a String. Before routing the message to the
consumer and handler, the plugin will run through a list of
converters that will attempt to convert the message,
and if the conversion was successful, determine if an appropriate handler has been defined.
For example, consider this JSON blob:
{"foo":"bar","hi":"there"}
If the above message is received, the converter for the Map class type will convert the byte array to a Map
of the JSON data. If a valid handler for the Map type is defined, the handler will receive the converted JSON
so that the handler does not need to handle the conversion.
The following handlers would accept the converted Map:
package com.exampleimport com.budjb.rabbitmq.consumer.MessageContextclass ExampleConsumer {
// ... def handleMessage(Map body, MessageContext context) {
// Do work
} def handleMessage(def body, MessageContext context) {
// Since def is a generic type (Object)
}
}
If a converter was successfully able to convert the message body, but no handler was defined to handle the
class type, other converters will get a chance to convert the message body. In the above example, if only
a handler for the String type was defined, the handler will receive the JSON blob as a String.If no converter is able to convert the message body, the plugin will fall back to passing the handler the raw
byte array received from the RabbitMQ server.
If no handler is defined that can handle the received message (including the raw byte array), an error will
be logged and the message will be rejected.
The plugin has built-in
converters for
Integer
,
Map
,
List
,
GString
, and
String
types. The plugin allows users
to define their own converters to convert other object types, which will be discussed later in this guide.
4.1.3 Short-Form Usage
In addition to the 2-parameter handler method signature, there are 2 shortcut versions available for use. One form only takes the converted message body, and the other only
takes the
MessageContext
object. The order of preference when multiple valid handlers are defined is:
- Long form (body, context)
- Short form (body)
MessageContext
form
The short-form handlers are shown below:
package com.exampleimport com.budjb.rabbitmq.consumer.MessageContextclass ExampleConsumer {
// ... def handleMessage(String message) {
// Do work
} def handleMessage(MessageContext context) {
// Do work
}
}
The MessageContext
-only handler will only be called if there is no other handler defined that can possibly handle any conversion of the message body.
4.1.4 MessageContext Object
The message context is just an object that encapsulates the data relevant to the received message. Below
is a list of properties of the class.
Property | Description |
---|
body | Incoming message in its raw byte format. |
channel | The RabbitMQ channel the message handler should use to publish messages. This is especially important when using transactions. |
consumerTag | Consumer tag |
envelope | Properties of the message's delivery (see RabbitMQ's documentation) |
properties | Properties of the message (see RabbitMQ's documentation) |
4.1.5 RPC-Style Messages
When a client publishes a message and waits for a return reply, this is considered an RPC-style operation. Typically, the server-side of the operation (in this case, a message consumer/handler) must
respond to the client on a queue that the client requested manually. This plugin provides a very convenient method to respond to the client without having to manually construct a response message.
The client will provide a response queue to the server to reply to. This queue name is stored in the
MessageContext.properties.replyTo
variable. If a client publishes
this variable, and the handler returns some data, the plugin will convert the data returned from the message handler and build a response message for you.
The following example illustrates responding via a handler's returned data.
class RpcExampleConsumer {
def handleMessage(String message, MessageContext messageContext) {
println "received ${message}" return "response" // this message will be sent back to the consumer via its replyTo queue
}
}
Alternatively, the
rabbitMessagePublisher
can be used to respond.
class RpcExampleConsumer {
def rabbitMessagePublisher def handleMessage(String message, MessageContext messageContext) {
println "received ${message}" rabbitMessagePublisher.send {
routingKey: messageContext.properties.replyTo
body: "response"
}
}
}
Allowing the plugin to build a reply message only converts the data returned from the message handler and publishes it to the reply queue. If you need to set any of the other message properties,
like headers, content-types, etc, you must manually build the response message using the rabbitMessagePublisher
, and refrain from returning data from the message handler.
4.2 Subscribing To Queues
Message consumers can subscribe to either queues or exchanges. When a consumer is registered to a queue, the consumer will receive messages from the queue as
the RabbitMQ server determines that it's the consumer's turn to receive a message, since there may be multiple listeners on the same queue.
Each message consumer class must have a configuration defined. There are 2 methods to specify the configuration:
- The consumer class can have a
Map
assigned to a static variable named rabbitConfig
. To subscribe to queues, the only required configuration option is the queue
variable, which is the name of the queue to subscribe to.
- The application's configuration file can contain the configuration with the path
rabbitmq.consumers.<ClassName>
. See the section below for more details.
Here is a simple example of a consumer subscribing to a queue.
package com.exampleimport com.budjb.rabbitmq.consumer.MessageContextclass ExampleConsumer {
static rabbitConfig = [
"queue": "test.queue"
] def handleMessage(def body, MessageContext context) {
// Process message
}
}
There are many options available to influence how the consumer works, which can be found in the reference.
4.3 Subscribing To Exchanges
Subscribing to a exchanges is different from subscribing to queues, as there are different types of exchanges with different behavior.
RabbitMQ's libraries do not provide a direct way to subscribe to an exchange, however the plugin provides a way to subscribe to exchanges directly by creating
a temporary queue that is bound to an exchange. The binding requirements differ between the different types of exchanges.
Using the functionality to subscribe to an exchange works by creating a temporary queue at runtime that is bound to the exchange and consumed from by
the message consumer. It is important to note that these queues are randomly named and are exclusive and auto delete, so this feature is not
suitable if messages that are left on this queue should persist if the application shuts down.
4.3.1 Fanout Exchanges
A fanout exchange will forward a received message to every queue bound to it. There is no binding criteria for this kind of exchange.
This is the simplest type of exchange, and is also the easiest to configure.
Fanout Example
package com.exampleclass ExampleConsumer {
static rabbitConfig = [
"exchange": "fanout.exchange"
] def handleMessage(def body, MessageContext context) {
// Process message
}
}
4.3.2 Topic Exchanges
A topic exchange will forward messages to queues based on the binding criteria the queue used to register to the exchange. In RabbitMQ terms,
this is called a routing key. The routing key can be either a direct match, or utilize wildcards to do a partial topic match. If a routing key
is omitted, the queue will receive no messages. Use
"#"
to receive all messages from an exchange.
More information can be found in the RabbitMQ documentation.
Topic Example
package com.exampleclass ExampleConsumer {
static rabbitConfig = [
"exchange": "topic.exchange",
"binding": "foo.bar.#"
] def handleMessage(def body, MessageContext context) {
// Process message
}
}
4.3.3 Direct Exchanges
A direct exchange will forward messages to queues based on binding criteria configured similarly to topic exchanges.
The difference in this case is that direct routing does not utilize wildcards in their routing keys.
Direct Example
package com.exampleclass ExampleConsumer {
static rabbitConfig = [
"exchange": "direct.exchange",
"binding": "example"
] def handleMessage(def body, MessageContext context) {
// Process message
}
}
Header exchanges work similarly to topic exchanges. A headers exchange will forward messages to queues based on header values contained
in messages. Additionally, a queue can be bound on multiple header values, along with an option to require one or all of the headers
to match.
Headers Example
package com.exampleclass ExampleConsumer {
static rabbitConfig = [
"exchange": "headers.exchange",
"binding": [
"foo": "bar",
"hi": "there"
],
"match": "any"
] def handleMessage(def body, MessageContext context) {
// Process message
}
}
4.4 Multi-Server
When using a multi-server setup, it is important to consider what server a consumer should listen on. Use the
connection
property in the
rabbitConfig
to specify which server the consumer should be bound to. If the
connection
is omitted, the consumer will be bound to the default connection.
package com.exampleclass ExampleConsumer {
static rabbitConfig = [
"queue": "test.queue",
"connection": "server1" // Where "server1" is a connection configured with that name.
] def handleMessage(def body, MessageContext context) {
// Do work
}
}
4.5 Central Configuration
It is also possible to define the consumer's configuration outside of the consumer in the Grails
Config.groovy
file. All of the configuration
options described above are valid for this type of configuration. This functionality is valuable when a consumer's configuration needs to
be determined at runtime, instead of being hardcoded in the consumer class itself.
To configuration a consumer in the Grails application's configuration file, an entry matching the consumer's class name should be present
under the
rabbitmq.consumers
key, as follows:
// …
rabbitmq {
consumers {
ExampleConsumer {
queue = "test.queue"
}
}
}
5 Publishing Messages
Publishing messages through the plugin is achieved by using the
rabbitMessagePublisher
bean. This Spring bean utilizes a closure-based configuration method to
both send messages without waiting for a response, and sending rpc-style messages. There are many options available to the
rabbitMessagePublisher
which are
documented in the reference, but this guide will only demonstrate basic usage.
In a multi-server setup, it is important to consider what server to send a message to. Like configuration and consumers, the connection
property
is used to route the message to the proper server connection.
5.1 Sending Messages
Sending a message means publishing a message to an exchange or queue and not waiting for a response (fire and forget). The only required parameters to the publisher
are a queue or exchange to publish the message to, and the body of the message.
Send Example
import com.budjb.rabbitmq.publisher.RabbitMessagePublisherclass ExampleService {
RabbitMessagePublisher rabbitMessagePublisher def sendSomeMessage() {
rabbitMessagePublisher.send {
exchange = "some.exchange"
routingKey = "some.routingKey"
body = "hi!"
}
}
}
RabbitMQ expects the body of a message to be a byte array. Message converters will also work when publishing messages, so if an object type other than
byte
is
encountered, a suitable message converter will be found and run against the message body, if one exists.
5.2 RPC Messages
Publishing an RPC message is as easy as sending messages, except the returned message is returned from the function.
RPC Example
import com.budjb.rabbitmq.publisher.RabbitMessagePublisherclass ExampleService {
RabbitMessagePublisher rabbitMessagePublisher def sendSomeMessage() {
def result = rabbitMessagePublisher.rpc {
exchange = "some.exchange"
routingKey = "some.routingKey"
body = "hi!"
timeout = 5000
}
}
}
The timeout option is especially important for RPC-style messages. The timeout property is the amount of time (in milliseconds)
the client will wait for the server to respond. If the timeout is reached, a TimeoutException
will be thrown. If the timeout is set to 0, the client
will wait indefinitely for a response. The default value of the timeout, if not passed, is 5 seconds.
More options for the RabbitMessagePublisher
can be found in the Quick Reference.
5.3 Bulk Publishing
By default, the publisher opens a new channel for each message it publishes. This is acceptable when sending
individual messages, but when bulk publishing is required, it is much more efficient to open a single channel
and use it for batches of messages.
As an example, during testing it took about 65 milliseconds to send one message. If 1000 messages are sent,
the total time to publish is about 65 seconds! Using a single channel for the same operation takes about
1 millisecond per message, cutting the time down to 1 second for all 1000 messages.These times are just an example based on testing, but they will differ based on network latency and server load.
While a channel may be manually created by authors using the
rabbitContext
, the publisher provides an easy way
to send many messages with a single channel. See the example below.
rabbitMessagePublisher.withChannel { channel ->
1000.times { i ->
send {
routingKey = "foobar"
body = "Bulk message $i"
}
}
}
There is also a
withChannel
method that takes in a connection name for multi-server setups.
rabbitMessagePublisher.withChannel("connection1") { channel ->
1000.times { i ->
send {
routingKey = "foobar"
body = "Bulk message $i on connection1"
}
}
}
All of the
send
and
rpc
methods available with the publisher can be used with
withChannel
.
5.4 Publisher Confirms
The publisher allows authors to enable publisher confirms for a batch of messages using a similar mechanism as
withChannel
. These methods use RabbitMQ's
waitForConfirms
and
waitForConfirmsOrDie
methods. See
RabbitMQ's documentation for more information about how publisher confirms work.
The reference details all of the various methods that the publisher contains to support confirms, but
below are a couple of basic examples.
The
withConfirms
methods utilize the
waitForConfirms
methods from the RabbitMQ Java library.
withConfirms
will
block until all messages sent in the provided closure have been acked or n'acked by the server. If a timeout
is specified, it will only wait for the amount of time specified before throwing a
TimeoutException
.
rabbitMessagePublisher.withConfirms { channel ->
send {
routingKey = "foobar"
body = "I am a test message"
} send {
routingKey = "barbaz"
body = "I am also a test message"
}
}
The
withConfirmsOrDie
methods utilize the
waitForConfirmsOrDie
methods from the RabbitMQ Java library.
withConfirmsOrDie
will block until all messages sent in the provided closure have been acked or n'acked by the
server. The difference with this method is that if any n'ack is received, an exception is thrown.
As with
withConfirms
, a timeout can be used.
rabbitMessagePublisher.withConfirmsOrDie { channel ->
send {
routingKey = "foobar"
body = "I am a test message"
} send {
routingKey = "barbaz"
body = "I am also a test message"
}
}
There are versions of both of these types of methods that take a connection name and/or a timeout.
6 Message Converters
Message converters are classes that are responsible for converting objects to and from byte arrays. The plugin provides 4 built-in converters that cover basic types:
Integer
Map
List
GString
String
These converters allow message handlers to consume and return data without having to convert that data themselves. They are also used with the
RabbitMessagePublisher
class.
6.1 Custom Message Converters
The plugin provides a way for authors to create their own message converters. A custom message converter must be placed in the
grails-app/rabbit-converters
path, and must end with
Converter.groovy
.
Message converters should extend the
MessageConverter
abstract class.
MessageConverter
is a generic class, meaning when extending it, you need to pass it the object class type the message
converter will be responsible for converting.
A message converter can notify the plugin just what abilities it has. Specifically, there are methods that return whether it can convert an object to or from a byte array. A message converter need
not provide two-way conversion.
Message converters may also provide the plugin with a MIME-type that is typically indicative of the object type it is responsible for. It does not make sense for all object types to have a MIME-type
associated with it, but this is useful to give the plugin hints if the conversion mode is set to attempt conversion only based on the content-type property a message contains.
Below is an example converter for the String object type. Custom converters should follow the same format.
import com.budjb.rabbitmq.converter.MessageConverterclass StringMessageConverter extends MessageConverter<String> {
/**
* Returns whether the message converter can convert a value from a String to a byte array.
*
* @return boolean
*/
@Override
public boolean canConvertFrom() {
return true
} /**
* Returns whether the message converter can convert a value from a byte array to a String.
*
* @return boolean
*/
@Override
public boolean canConvertTo() {
return true
} /**
* Converts a value from a byte array to a String.
*
* @param input Value to convert.
* @return Value converted to a String, or null if the conversion failed.
*/
@Override
public String convertTo(byte[] input) {
return new String(input)
} /**
* Converts a value from a String to a byte array.
*
* @param input Value to convert.
* @return Value converted to a byte array, or null if the conversion failed..
*/
@Override
public byte[] convertFrom(String input) {
return input.getBytes()
} /**
* The MIME-type typically associated with the object type, if one exists.
*
* @return MIME-type typically associated with this object type, or null if one does not exist.
*/
@Override
public String getContentType() {
return 'text/plain'
}}
7 Advanced Usage
While the plugin effectively wraps the functionality of the RabbitMQ library, the end user has direct access to all of the underlying library objects and connection instances.
7.1 Spring Beans
The are several beans defined by the plugin to perform its various operations.
Bean Name | Purpose |
---|
rabbitContext | A front-end class to the plugin that aggregates useful functionality for users of the
plugin. Besides the rabbitMessagePublisher , this is likely the only bean users will
need to access, if at all. |
connectionManager | Manages the lifecycle of connection instances, including loading, starting, stopping,
unloading, and retrieval. |
messageConverterManager | Handles loading message converters and acts as the entry point when message conversion
is required. |
consumerManager | Manages the lifecycle of consumer instances, including loading, starting, stopping,
unloading, and retrieval. |
queueBuilder | Responsible for creating exchanges and queues defined in the application's configuration. |
rabbitMessagePublisher | Used to send messages to a RabbitMQ broker. |
7.2 Rabbit Context
Besides the
rabbitMessagePublisher
, the
rabbitContext
is the bean users will most likely interact with.
While you may never need to use this bean, it can be useful. As with any Spring bean, the
rabbitContext
can be
injected into any Spring managed bean, such as services, controllers, and rabbit consumers.
The
rabbitContext
is intended to be used as a front-end to all of the other beans defined by the plugin to hide
some of the complexity of interacting with the system. As such, in most cases the
rabbitContext
proxies requests
to the appropriate manager bean to accomplish the requested task.
In some cases, interactions with multiple managers are necessary to safely carry out the action. Therefore, unless
the rabbitContext
does not provide the required functionality, it should be considered best-practice to use
the rabbitContext
instead of the other beans directly. If you find that you are frequently resorting to using
one of the other beans, I encourage you to post an issue on the GitHub project.
The follow subsections describe some of the more useful functionality the
rabbitContext
provides.
7.2.1 Native Objects
The main goal of the plugin is to effectively wrap the RabbitMQ library to hide the complexity of its usage and
make using it more inline with the conventions of Grails applications, but also allow users to gain access to the
underlying RabbitMQ objects if needed.
rabbitContext
provides several methods to gain direct access to the
RabbitMQ Java library
Connection
and
Channel
objects.
To create a new
Channel
, use the
createChannel
methods.
// This creates a new channel with the default connection
Channel channel = rabbitContext.createChannel()// The same using a different connection based on its name
Channel channel = rabbitContext.createChannel("connection1")
If the createChannel
methods are used, it is important that these channels are closed. The plugin handles opening
and closing channels that it manages as part of publishing or consuming messages, but channels created with the
createChannel
methods are not managed by the plugin. It is the author's responsibility to close them, or connection
leaks may and memory leaks most likely will occur.
Use the
getConnection
methods to gain access to the
Connection
objects.
// To retrieve the Connection from the default connection
Connection connection = rabbitContext.getConnection()// To retrieve the Connection from a specific connection based on its name
Connection connection = rabbitContext.getConnection("connection1")
7.2.2 Starting and Stopping Connections
The plugin handles starting and stopping connections automatically when the application is started or shut down, but
sometimes applications may need to manually stop connections based on certain conditions or business logic. The
rabbitContext
contains several methods to manage the life cycle of connections.
Method | Description |
---|
startConnections | Starts all registered connections. If some connections are already started, the remainder
will also be started. |
stopConnections | Stops all connections and all consumers. |
startConnection | Starts a connection based on its name. |
stopConnection | Stops a connection based on its name, and stops any consumers on the connection. |
7.2.3 Starting and Stopping Consumers
Much like connections, the
rabbitContext
provides several methods to start and stop consumers if necessary.
Method | Description |
---|
startConsumers | Starts all registered consumers. If some consumers are already started, the remainder
will also be started. |
stopConsumers | Stops all consumers. |
startConsumer | Starts a connection based on its class name. |
stopConsumer | Stops a connection based on its class name. |
startConsumers(String) | Starts all consumers on a specific connection, based on the connection name. |
stopConsumers(String) | Stops all consumers on a specific connection, based on the connection name. |
7.2.4 Status Report
The
RabbitContext
contains a method
getStatusReport()
, which will build an object structure
containing information about all RabbitMQ connections and all consumers. This information is useful
to monitor the status of the RabbitMQ application system, as it contains running states and statistics including
how many concurrent threads a consumer is configured for, how many are actually active, and how many are actively
processing messages. Below is an example of its output gathered from one of the plugin's tests, serialized as JSON.
[
{
"consumers": [
{
"fullName": "com.budjb.rabbitmq.test.AllTopicConsumer",
"load": 0,
"name": "AllTopicConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "topic-queue-all",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.ReportingConsumer",
"load": 0,
"name": "ReportingConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "reporting",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.SleepingConsumer",
"load": 0,
"name": "SleepingConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "sleeping",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.SpecificTopicConsumer",
"load": 0,
"name": "SpecificTopicConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "topic-queue-specific",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.StringConsumer",
"load": 0,
"name": "StringConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "string-test",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.SubsetTopicConsumer",
"load": 0,
"name": "SubsetTopicConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "topic-queue-subset",
"runningState": "RUNNING"
}
],
"host": "localhost",
"name": "connection1",
"port": 5672,
"runningState": "RUNNING",
"virtualHost": "test1.rabbitmq.budjb.com"
},
{
"consumers": [
{
"fullName": "com.budjb.rabbitmq.test.Connection2Consumer",
"load": 0,
"name": "Connection2Consumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "connection2-queue",
"runningState": "RUNNING"
}
],
"host": "localhost",
"name": "connection2",
"port": 5672,
"runningState": "RUNNING",
"virtualHost": "test2.rabbitmq.budjb.com"
}
]
7.3 Transactions
The plugin provides a bit of automation around channel transactions. When a consumer is defined with the
transacted
property set to
true
, a transaction is automatically started on
the channel passed to the message handler. When the message handler completes successfully, the transaction is automatically committed. If an unhandled exception is thrown from the message
handler, the transaction is automatically rolled back.
It is
especially important that any messages published from a message handler use the
Channel
instance passed in the
MessageContext
for this functionality to work.
Since the
Channel
is passed in the
MessageContext
, the author has full control over committing and rolling back transactions.
8 Upgrading
When an update to the plugin is made that has incompatibility considerations, this document will help identify the
various impact considerations when upgrading to a new version.
8.1 From 3.0.X
The upgrade from 3.0.X to 3.1.X includes a refactor of how the consumer and connection managers work.
The storage of consumer handlers was moved from the connection classes into the consumer manager in order
to support the new start/stop functionality, and it made sense for consumer handlers to reside with
the appropriately named manager.
Both of the managers also now have underlying interfaces that define how a manager should behave. In addition,
the objects being managed are now called
Contexts, which also have interfaces that define their general
behavior. It is the intention that with these changes the plugin becomes more stable with less backwards-
compatibility breaking changes being introduced moving forward.
Here are the items authors may need to address when upgrading to this version of the plugin.
- The
rabbitContext.getConnection()
methods now return the RabbitMQ Connection
object instead of
the ConnectionContext
instance. This was done to make the methods more consistent with the
createChannel
methods and to encapsulate the ConnectionContext
objects inside the manager.
The ConnectionContext
objects can now be retrieved from the consumerManager
bean.
- Many of the methods inside of the
ConsumerManager
and ConnectionManager
have been renamed to adhere
to a common interface. This should only affect projects that use these beans directly.
- Some of the methods inside of the
RabbitContext
have been renamed to match the interface used by
the various managers.
Overall, users of the plugin should see no impact if the
rabbitContext
or any of the other beans are
not used in their projects.
If the use of the plugin's beans is limited to the
rabbitContext
, the impact should be minimal,
with some minor changes need to method names and some refactored code if retrieving a
ConnectionContext
.
Users that use the other manager beans will need to account for the changed interface implemented by
those beans.
8.2 From 2.0.X
The upgrade from any version in the 2.0.X range to 3.0.X includes a massive refactoring of the internals of the plugin. If users did not extend or override
the
RabbitContext
,
RabbitMessageBuilder
, or any of the other helper classes, the amount of impact is limited to a couple package name changes.
Below are the changes that were made, at a high level:
RabbitMessageBuilder
is deprecated. The class still exists and its interface is the same, however, the code in the class has been ripped out and now proxies requests to the rabbitMessagePublisher
bean. The builder will be removed at some point in the near future.
RabbitContext
used to contain a significant amount of code related to management of message converters, consumers, and connections. That functionality has been broken out into their own respective classes. The RabbitContext
now serves only as a class to aggregate functionality useful to users of the plugin, and should still be used to simplify interfacing with the plugin rather than using the underlying beans directly.
- Introduced the
rabbitMessagePublisher
as a replacement for the RabbitMessageBuilder
. This bean can be injected into other spring managed beans, such as services, controllers, and rabbit consumers. Its functionality follows the builder closely, and users may send messages based on configurations made through closures or through the new RabbitMessageProperties
object. The short-hand convenience methods are still available as well.
- Introduced the
messageConverterManager
bean to handle all operations pertaining to message converters.
- Introduced the
consumerManager
bean to handle all operations pertaining to message consumers.
- Introduced the
connectionManager
bean to handle all operations pertaining to RabbitMQ connections and channels.
- Introduced the
queueBuilder
bean to handle creating configured exchanges and queues.
- The
MessageContext
class has been moved into a new package: com.budjb.rabbitmq.consumer
.
- The
AutoAck
enum has been moved into a new package: com.budjb.rabbitmq.consumer
.
- The
ConsumerConfiguration
class has been moved into a new package: com.budjb.rabbitmq.consumer
.
- The
MessageConvertMethod
class has been moved into a new package: com.budjb.rabbitmq.consumer
.
- The
ConnectionContext
class has been moved into a new package: com.budjb.rabbitmq.connection
.
- The
ConnectionConfiguration
class has been moved into a new package: com.budjb.rabbitmq.connection
.
- The
MessageConverter
abstract class has been moved into a new package: com.budjb.rabbitmq.converter
.
- All of the bundles message converters have been moved into a new package:
com.budjb.rabbitmq.converter
.
- The
RabbitMessageBuilder
class has been moved into a new package: com.budjb.rabbitmq.publisher
.
- A large amount of unit tests and some integration tests have been added to the project. These tests rely on the Spock mocking framework, but the test files and dependencies are not exported with the plugin so that a new plugin dependency on Spock is not created.
- The closures used for publishing messages have had their resolving strategy changed from
OWNER_FIRST
to DELEGATE_FIRST
. This should not have much of an impact, but in some cases closures may need to explicity qualify some properties in the closures with delegate
.
9 Changelog
Version 3.1.3 - 4/20/2016
- Add status reports that provides information about all connections and their consumers, including load statistics.
- Upgrade project to Grails 2.5.4.
- Remove gpars dependency.
- Make Grails 2.3 the minimum version of Grails this plugin is intended for.
- Fix consumer configuration from application config parsing issue (#73).
- Refactor message conversion for incoming messages so that conversion to a type only happens if an appropriate handler exists.
Version 3.1.2 - 8/6/2015
- Added graceful shutdown support. See
rabbitContext.shutdown()
.
- Added methods to check running state on most managers and contexts.
- Updated rabbitmq Java library to 3.5.4.
- Added the
gpars
plugin as a dependency.
Version 3.1.1 - 5/13/2015
- Refactored the code to load a consumer's configuration from a static variable so that it works correctly when the
consumer is annotated with
@Transactional
.
(#55)
- Add setter methods for the message TTL (
expiration
).
(#56)
- Fix bug where missing connection configuration values do not allow the use of default values.
- Remove checked exception from ConsumerManageImpl that does not exist in its interface.
(#59)
Version 3.1.0 - 3/13/2015
- Update the RabbitMQ Client Java library to 3.5.0.
- Fix an issue that caused unclean shutdowns when redeploying an application using the plugin.
(#54)
- Added the ability to start and stop individual connections.
(#49)
- Added the ability to start and stop individual consumers.
(#49)
- Added the ability to start and stop consumers based on the connection they're tied to.
(#49)
- Moved consumer adapter storage from the connection context to the consumer manager.
- Handle
Throwable
types that were not being handled before in the consumer handling so that channels are not closed
if one of the unhandled errors occurs.
(#47)
- Added travis-ci continuous integration for all commits to the plugin.
Version 3.0.4 - 2/18/2015
- Fix a null pointer exception when a consumer has no configuration.
- Add a unit test to test behavior when a consumer has no configuration.
- Add an integration test to test behavior when sending a message directly to a queue.
Version 3.0.3 - 2/15/2015
- Introduced the
rabbitMessagePublisher
bean to replace the RabbitMessageBuilder
.
- Deprecated the
RabbitMessageBuilder
.
- Massive refactor of the internals of the plugin. See the upgrading page for more detailed information about what has
changed.
- Added the ability to configure consumers centrally in the application's configuration file (thanks Erwan Arzur).
- Updated RabbitMQ library version to 3.4.3.
Version 3.0.2
- Internal release, see 3.0.3.
Version 3.0.1
- Internal release, see 3.0.3.
Version 3.0.0
- Internal Release, see 3.0.3.
Version 2.0.10 - 9/11/2014
- Fix bug with converters that prevented converters later in the processing list from executing if another convert is
unable to marshall data from bytes.
- Add
enabled
flag to the configuration. If false, completely disables the plugin from starting.
Version 2.0.9 - 9/9/2014
- Additional fix for memory leak associated with RPC calls and auto-recovering connections.
Version 2.0.8 - 9/8/2014
- Fix bug introduced by rushing the previous fix. Mark consuming = true.
Version 2.0.7 - 9/8/2014
- Add
basicCancel()
to RabbitMessageBuilder
in an attempt to address a memory leak.
- Improve cleaning up of resources in RPC calls.
Version 2.0.6 - 8/13/2014
- Updated copyright notices.
- Added GString message converter.
- Updated publishing guide docs to make RabbitMessageBuilder usage more clear (thanks marcDeSantis @GitHub).
Version 2.0.5 - 8/01/2014
- Added heartbeat configuration for connections (thanks LuisMuniz @GitHub).
- Refactored Hibernate session support so that Hibernate is no longer a dependency of the plugin, and will now work
with or without Hibernate present.
Version 2.0.4 - 7/28/2014
- Added multi-server support to all aspects of the plugin.
- Added SSL support for connections.
- Added auto-reconnect support for dropped connections.
- Added logic to wrap a Hibernate session around calls to consumers.
- Updated the RabbitMQ library to version 3.3.0.
- Added logging for connection/channel reconnects and channel shutdowns.
- Changed format for connection configurations. The old style is still supported, but will likely be removed at some
point.
Version 1.0.3 - 1/7/2014
- Modified the logic to check for the existence of callbacks in consumers.
Version 1.0.2 - 1/6/2014
- Added a cached thread pool so the user does not need to account for the number of threads consumers require. Set the
default to 0 so that this is the default.
- Added callbacks for messages: onReceive, onSuccess, onFailure, and onComplete.
Version 1.0.1 - 11/28/2013
- Remove the maven group from the plugin definition class.
Version 1.0.0 - 11/27/2013
- Version bump for general release.
Version 0.2.1 - 11/27/2013
- Fixed a bug with the message handler discovery method that caused generically-typed handlers to get called
incorrectly.
Version 0.2.0 - 11/27/2013
- Refactored queue/exchange configuration. It is now possible configure queue binding without having to also configure
the exchange being bound to.
- Added the
match
property to queue configuration to support headers exchange binding.
This breaks backwards compatibility.
- Renaming the
routingKey
property of the consumer configuration to binding
to match queue configuration.
This breaks backwards compatibility.
Version 0.1.8 - 10/31/2013
- Moved the trigger to start consumers on application launch to the bootstrap.
Version 0.1.7 - 10/30/2013
- Added the
prefetchCount
option to the consumer configuration. Defaults to 1.
- Added the
threads
option to the connection configuration. Defaults to 5.
Version 0.1.6 - 10/29/2013
- Fixed logic to determine if a consumer is valid.
- Added support for short-form handlers that only take a single parameter.
Version 0.1.5 - 10/28/2013
body
parameter to the RabbitMessageBuilder
is no longer required. It now defaults to an empty byte array.
Version 0.1.4 - 10/28/2013
- Fix a class visibility issue in the artefact handlers for this plugin.
Version 0.1.3 - 10/23/2013
- Touch up the consumer template.
Version 0.1.2 - 10/22/2013
- Add the ability to create multiple consumers at the same time with the
create-consumer
script (thanks Aaron Brown!).
- Also create a unit test when creating consumers (thanks Michael Rice!).
Version 0.1.1 - 10/22/2013
- Throw an exception if the connection configuration is missing on application start (thanks Michael Rice!).
- Add the
create-consumer
script (thanks Aaron Brown!).
Version 0.1 - 10/17/2013
- Code complete/experimental release.