The RabbitMQ Native plugin for Grails provides RabbitMQ integration using the Java libraries provided by the RabbitMQ project. It provides easy configuration of exchanges and queues via a project’s configuration, simple configuration of consumers, and a convenience class to easily publish messages.
This plugin strives to provide an easy convention for quickly consuming messages, while still giving access to the underlying layer so that users can do whatever they need outside of those conventions. It also provides a powerful message body converter system that lets users quickly write and plug in converters for custom data types.
This guide details the configuration of the plugin, and may not fully explain the use and conventions of the RabbitMQ services or its Java library. More information can be found in their Java API guide and JavaDoc.
1. Quickstart
This quickstart walks through the minimum needed to begin using the plugin. The plugin is highly configurable, but this is only a very basic demonstration of its usage.
1.1. Create The Application
Create a project named RabbitExample. You can do this by entering:
grails create-app RabbitExample
1.2. Add The Plugin
In the application’s build.gradle file, under the dependencies section, add:
dependencies {
    // ...
    compile "org.grails.plugins:rabbitmq-native:3.5.1"
    // ...
}
1.3. Configuring
In grails-app/conf/application.yml, add to the bottom:
rabbitmq:
  connections:
    - name: main
      host: changeme
      username: changeme
      password: changeme
  queues:
    - name: testqueue
Be sure to replace the placeholder values with those for your RabbitMQ server and user credentials.
1.4. Consumer
Create a consumer by using the following command:
grails create-consumer com.example.Test
Update the consumer to reflect the following:
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class TestConsumer {
    /**
     * Consumer configuration.
     */
    static rabbitConfig = [
        queue: "testqueue"
    ]
    /**
     * Handle an incoming RabbitMQ message.
     *
     * @param body    The converted body of the incoming message.
     * @param context Properties of the incoming message.
     * @return The reply sent back to an RPC caller.
     */
    def handleMessage(def body, MessageContext context) {
        println body
        return "Hello to you, too!"
    }
}
1.5. Controller
Create a controller by using the following command:
grails create-controller com.example.Test
Update the controller to reflect the following:
package com.example
import com.budjb.rabbitmq.publisher.RabbitMessagePublisher
class TestController {
    RabbitMessagePublisher rabbitMessagePublisher
    def index() {
        render rabbitMessagePublisher.rpc {
            routingKey = "testqueue"
            body = "Hello!"
        }
    }
}
1.6. Run it!
Run the Grails application:
./gradlew bootRun
You can see the application in action by hitting the test controller. If you’re running this on your localhost, your URL may be similar to http://localhost:8080/test/index. You should see the message "Hello!" printed to the application’s output console, and your web browser should display the message "Hello to you, too!".
2. Configuration
Configuration of the connection to the RabbitMQ server is done in your project’s grails-app/conf/application.groovy or grails-app/conf/application.yml file.
Below is the list of general configuration properties.
| Configuration Property | Required | Description | Type | Default |
| | | If false, will register the plugin’s beans, but prevent the plugin connecting to the RabbitMQ broker(s) and consuming messages. | | |
| rabbitmq.enableSerializableConverter | | If true, enables the use of a converter that handles Java serialization. | | false |
2.1. Server Connection
The plugin expects the connection configuration to the RabbitMQ server to be defined. A bare minimum configuration example looks like:
application.groovy:
rabbitmq {
    connections = [
        [
            name: "defaultConnection",
            host: "example.com",
            username: "foo",
            password: "bar"
        ]
    ]
}
application.yml:
rabbitmq:
  connections:
    - name: defaultConnection
      host: example.com
      username: foo
      password: bar
The connections section should contain a list of maps, where each entry in the list represents an individual connection.
Connections to many different RabbitMQ servers can be configured. A multi-server configuration looks like:
application.groovy:
rabbitmq {
    connections = [
        [
            name: "connection1",
            host: "rabbit1.example.com",
            username: "foo",
            password: "bar"
        ],
        [
            name: "connection2",
            host: "rabbit2.example.com",
            username: "foo",
            password: "bar"
        ]
    ]
}
application.yml:
rabbitmq:
  connections:
    - name: connection1
      host: rabbit1.example.com
      username: foo
      password: bar
    - name: connection2
      host: rabbit2.example.com
      username: foo
      password: bar
The following table enumerates all the configuration options available to the connection configuration:
| Configuration Property | Required | Description | Type | Default |
| name | | Name of the connection, which can be used to tie queues and exchanges to a particular connection. | | none |
| host | | Hostname or IP address of the RabbitMQ server to connect to. | | none |
| username | | Username to log into the RabbitMQ server with. | | none |
| password | | Password to log into the RabbitMQ server with. | | none |
| | | A connection with this set to true will be the server messages are sent to if no specific connection is specified when sending the message. | | |
| | | Port to connect to the RabbitMQ server with. | | 5672 |
| | | Name of the virtual host to connect to the RabbitMQ server with. | | none |
| | | Whether to use SSL when connecting to a RabbitMQ server. | | |
| | | Threadpool size, if greater than 0, determines how many concurrent messages can be processed at any given time. If set to 0, consumers can consume as many messages as they are configured to. | | |
| | | If true, will cause the application to automatically reconnect to a server when its connection is dropped. | | |
| | | Heartbeat interval, in seconds. A value of | | |
| | | If true, will create a metric registry and associate it for the connection via DropWizard. | | |
2.2. Defining Queues
The plugin allows authors to define exchanges and queues programmatically inside the configuration. This allows the application to create its own exchanges and queues, so that no one has to create them manually before running the application.
Queue configuration is also done in the rabbitmq block, much as the server connection is configured. Usage is best illustrated with an example:
application.groovy:
rabbitmq {
    connections = [
        // ...
    ]
    queues = [
        [
            name: "example.queue",
            durable: true,
            exchange: "example.exchange"
        ]
    ]
}
application.yml:
rabbitmq:
  connections:
    # ...
  queues:
    - name: example.queue
      durable: true
      exchange: example.exchange
The above code will define a queue named example.queue, and its durable flag will be set.
If using the multi-server feature of the plugin, it is important to consider which server the queue should be defined in. The connection property specifies which server to create the queue in, as illustrated below:
application.groovy:
rabbitmq {
    queues = [
        // Assume there is a connection with the name "server1"...
        [
            name: "example.queue",
            connection: "server1",
            durable: true
        ]
    ]
}
application.yml:
rabbitmq:
  queues:
    # Assume there is a connection with the name "server1"...
    - name: example.queue
      connection: server1
      durable: true
Below is a table of all of the options available when defining queues:
| Property | Required | Description | Type | Default |
| name | | Name of the queue. | | none |
| | | Extra arguments used to create the queue. See the RabbitMQ documentation for more information. | | none |
| | | Whether to automatically delete the queue once there are no more consumers listening to it. | | |
| binding | | Used in conjunction with exchanges. See the section below for more information. | Mixed | none |
| durable | | Whether messages should be persisted to the disk on the RabbitMQ server to survive server restarts. | | |
| exchange | | Binds a queue to an exchange in conjunction with the binding property. | | none |
| match | | Required when binding to a headers exchange. Either "any" or "all". | | none |
| connection | | Name of the connection to create the queue with. Uses the default connection if omitted. | | none |
2.3. Defining Exchanges
Defining exchanges is very similar to defining queues. The following code illustrates how to define an exchange:
application.groovy:
rabbitmq {
    exchanges = [
        [
            name: "example.exchange",
            type: "topic"
        ]
    ]
}
application.yml:
rabbitmq:
  exchanges:
    - name: example.exchange
      type: topic
The above example will create an exchange with the name example.exchange and of the type topic. Below is a list of all the options available when creating exchanges:
| Property | Required | Description | Type | Default |
| name | | Name of the exchange. | | none |
| | | Extra arguments used to create the exchange. See the RabbitMQ documentation for more information. | | none |
| | | Whether to automatically delete the exchange once there are no longer any queues bound to it. | | |
| durable | | Whether messages should be persisted to the disk on the RabbitMQ server to survive server restarts. | | |
| type | | One of "fanout", "topic", "direct", or "headers". | | none |
| connection | | Name of the connection to create the exchange with. Uses the default connection if omitted. | | none |
2.4. Binding Queues To Exchanges
Queues can be bound to an exchange by setting the exchange and binding properties in the queue’s configuration. The value of the binding depends on the type of exchange the queue is being bound to. Each exchange type is explained below.
This basic example will create a topic exchange named example.exchange, as well as a queue named example.queue. The queue will be bound to the exchange with the topic, or routing key, of sample.binding.#.
application.groovy:
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange",
            binding: "sample.binding.#"
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "topic"
        ]
    ]
}
application.yml:
rabbitmq:
  queues:
    - name: example.queue
      exchange: example.exchange
      binding: 'sample.binding.#'
  exchanges:
    - name: example.exchange
      type: topic
The # character is used frequently with RabbitMQ bindings. The character is also special in YAML, where it starts a comment, so it is important that strings using the # character are quoted so that the YAML engine will treat it as plain text.
Queues need to have their binding defined specifically for the type of exchange they are bound to.
2.4.1. Fanout Exchanges
Fanout exchanges are the easiest to configure bindings for, since they require none. A fanout exchange simply sends every message it receives to every queue bound to it.
application.groovy:
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange"
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "fanout"
        ]
    ]
}
application.yml:
rabbitmq:
  queues:
    - name: example.queue
      exchange: example.exchange
  exchanges:
    - name: example.exchange
      type: fanout
2.4.2. Topic Exchanges
Topic exchanges require queues to define a topic. Topics can be an exact match, but their strength is in their partial matching ability. See the RabbitMQ documentation for details about this kind of exchange.
application.groovy:
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange",
            binding: "example.binding.#"
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "topic"
        ]
    ]
}
application.yml:
rabbitmq:
  queues:
    - name: example.queue
      exchange: example.exchange
      binding: 'example.binding.#'
  exchanges:
    - name: example.exchange
      type: topic
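To make the partial matching concrete, here is a small, self-contained Groovy sketch of how a topic binding is compared against a routing key. It only illustrates the AMQP wildcard rules, where * matches exactly one dot-separated word and # matches zero or more words. It is not code from the plugin or from RabbitMQ, and the function names topicMatches and matchWords are made up for this example.

```groovy
// Illustration only: mimics AMQP topic matching, it is not plugin or broker code.
// '*' matches exactly one dot-separated word; '#' matches zero or more words.
boolean topicMatches(String pattern, String routingKey) {
    return matchWords(pattern.tokenize('.'), routingKey.tokenize('.'))
}

boolean matchWords(List<String> pattern, List<String> key) {
    if (pattern.isEmpty()) {
        return key.isEmpty()
    }
    String head = pattern[0]
    List<String> rest = pattern.drop(1)
    if (head == '#') {
        // '#' either matches nothing, or swallows one word and stays active.
        return matchWords(rest, key) || (!key.isEmpty() && matchWords(pattern, key.drop(1)))
    }
    if (key.isEmpty()) {
        return false
    }
    return (head == '*' || head == key[0]) && matchWords(rest, key.drop(1))
}

assert topicMatches('example.binding.#', 'example.binding')
assert topicMatches('example.binding.#', 'example.binding.created.v2')
assert topicMatches('example.*', 'example.binding')
assert !topicMatches('example.*', 'example.binding.created')
assert !topicMatches('example.binding.#', 'example.other')
```

With the binding example.binding.# from the configuration above, a queue would therefore receive messages published with routing keys such as example.binding or example.binding.created.v2, but not example.other.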
2.4.3. Direct Exchanges
Direct exchanges are similar to topic exchanges, except that their "topics" only function with direct name matching. The appropriate name for the binding in this case is "routing key". Queues must define a routing key when binding to this type of exchange.
application.groovy:
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange",
            binding: "exampleRoutingKey"
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "direct"
        ]
    ]
}
application.yml:
rabbitmq:
  queues:
    - name: example.queue
      exchange: example.exchange
      binding: exampleRoutingKey
  exchanges:
    - name: example.exchange
      type: direct
2.4.4. Header Exchanges
Header exchanges are like topic exchanges, but with the ability to define multiple match keywords. The binding for queues allows the queue to match on all or one of multiple header values. The queue must also set the match property for this exchange type, and the value must be one of "any" or "all".
application.groovy:
rabbitmq {
    queues = [
        [
            name: "example.queue",
            exchange: "example.exchange",
            match: "any",
            binding: [
                "header1": "header-value-1",
                "header2": "header-value-2"
            ]
        ]
    ]
    exchanges = [
        [
            name: "example.exchange",
            type: "headers"
        ]
    ]
}
application.yml:
rabbitmq:
  queues:
    - name: example.queue
      exchange: example.exchange
      match: any
      binding:
        header1: header-value-1
        header2: header-value-2
  exchanges:
    - name: example.exchange
      type: headers
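The difference between the "any" and "all" match modes can be sketched with a few lines of plain Groovy. This is only an illustration of the matching rule described above, not how the broker or the plugin implements it; the helper name headersMatch and its parameters are hypothetical.

```groovy
// Illustration only: the helper name and signature are made up for this sketch.
boolean headersMatch(Map<String, String> bindingHeaders, Map<String, String> messageHeaders, String match) {
    // Collect the binding entries that the message satisfies exactly.
    Map<String, String> hits = bindingHeaders.findAll { name, value -> messageHeaders[name] == value }
    return match == 'all' ? hits.size() == bindingHeaders.size() : !hits.isEmpty()
}

def criteria = [header1: 'header-value-1', header2: 'header-value-2']

// "any": one matching header is enough.
assert headersMatch(criteria, [header1: 'header-value-1'], 'any')
// "all": every header in the binding must be present with the same value.
assert !headersMatch(criteria, [header1: 'header-value-1'], 'all')
assert headersMatch(criteria, [header1: 'header-value-1', header2: 'header-value-2', other: 'x'], 'all')
```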
2.5. Binding Exchanges to Exchanges
Exchanges can also be bound to other exchanges to create richer routing topologies. More information can be found in the RabbitMQ documentation on exchange-to-exchange bindings.
This basic example will create two exchanges, with a binding that will forward messages from source-exchange to target-exchange when messages match the routing key foo.bar.#.
application.groovy:
rabbitmq {
    exchanges = [
        [
            name: "source-exchange",
            type: "topic",
            exchangeBindings: [
                [ exchange: "target-exchange", binding: "foo.bar.#" ]
            ]
        ],
        [
            name: "target-exchange",
            type: "direct"
        ]
    ]
}
application.yml:
rabbitmq:
  exchanges:
    - name: source-exchange
      type: topic
      exchangeBindings:
        - exchange: target-exchange
          binding: 'foo.bar.#'
    - name: target-exchange
      type: direct
3. Consuming Messages
Consuming messages is accomplished by creating a message consumer class. Consumer classes are placed into the grails-app/rabbit-consumers directory, and their file names must end with Consumer.groovy. These classes are Spring beans, meaning that other Spring beans, such as services or the grailsApplication instance, can be injected into them. Each consumer class must be configured in a certain way. If there is a misconfiguration in a consumer, the plugin will log an error stating so and will not register the consumer to any queues.
3.1. Message Handlers
One of the requirements for a consumer to be registered to the RabbitMQ server is that a message handler be declared in the consumer class. The message handler is the mechanism by which messages are consumed.
3.1.1. Basic Usage
In its most basic form, a message handler method takes in the body of the received message, and a MessageContext object that contains the message parameters received from the RabbitMQ server, along with the consumer’s channel that the handler should publish messages through.
This is the most generic form of a message handler:
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class ExampleConsumer {
    // ...
    def handleMessage(def body, MessageContext context) {
        // Do work
    }
}
3.1.2. Typed Message Handlers
The logic surrounding the message consumer classes will by default attempt to intelligently convert the body of the received message from a byte array to a converted type, such as a String. Before routing the message to the consumer and handler, the plugin will run through a list of Message Converters, limited by the class types of the handlers defined in the message consumer, and attempt to find a conversion that works.
For example, consider this JSON blob:
{"foo":"bar","hi":"there"}
If the above message is received, the converter for the Map class type will convert the byte array to a Map of the JSON data. If a valid handler for the Map type is defined, the handler will receive the converted JSON.
The following handlers would accept the converted map:
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class ExampleConsumer {
    // ...
    def handleMessage(Map body, MessageContext context) {
        // Do work
    }
    def handleMessage(def body, MessageContext context) {
        // Since def is a generic type (Object)
    }
}
If no converter is able to convert the message body, the plugin will fall back to passing the handler the raw byte array received from the RabbitMQ server.
More information about message converters can be found in the Message Converters section, including a list of built-in message converters and details of how to create custom converters.
3.1.3. Short-Form Usage
In addition to the 2-parameter handler method signature, there are 2 shortcut versions available for use. One form only takes the converted message body, and the other only takes the MessageContext object.
The short-form handlers are shown below:
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class ExampleConsumer {
    // ...
    def handleMessage(String message) {
        // Do work
    }
    def handleMessage(MessageContext context) {
        // Do work
    }
}
The MessageContext-only handler will only be called if there is no other handler defined that can possibly handle any conversion of the message body.
3.1.4. MessageContext Object
The message context is just an object that encapsulates the data relevant to the received message. Below is a list of properties of the class.
| Property | Description |
| | Incoming message in its raw byte array form. |
| | The RabbitMQ channel the message handler should use to publish messages. This is especially important when using transactions. |
| | Consumer tag |
| | Properties of the message’s delivery (see RabbitMQ’s documentation) |
| properties | Properties of the message (see RabbitMQ’s documentation) |
3.1.5. RPC-Style Messages
When a client publishes a message and waits for a return reply, this is considered an RPC-style operation. Typically, the server-side of the operation (in this case, a message consumer/handler) must respond to the client on a queue that the client requested manually. This plugin provides a very convenient method to respond to the client without having to manually construct a response message.
The client will provide a response queue to the server to reply to. This queue name is stored in the MessageContext.properties.replyTo variable. If a client sets this variable, and the handler returns some data, the plugin will convert the data returned from the message handler and build a response message for you.
The following example illustrates responding via a handler’s returned data.
import com.budjb.rabbitmq.consumer.MessageContext
class RpcExampleConsumer {
    def handleMessage(String message, MessageContext messageContext) {
        println "received ${message}"
        return "response" // this message will be sent back to the client via its replyTo queue
    }
}
Alternatively, the rabbitMessagePublisher can be used to respond.
import com.budjb.rabbitmq.consumer.MessageContext
class RpcExampleConsumer {
    def rabbitMessagePublisher
    def handleMessage(String message, MessageContext messageContext) {
        println "received ${message}"
        // Publisher properties are assigned with "=", as in the other publisher examples.
        rabbitMessagePublisher.send {
            routingKey = messageContext.properties.replyTo
            body = "response"
        }
    }
}
Allowing the plugin to build a reply message only converts the data returned from the message handler and publishes it to the reply queue. If you need to set any of the other message properties, like headers or content type, you must manually build the response message using the rabbitMessagePublisher, and refrain from returning data from the message handler.
3.1.6. Handling Unsupported Messages
As long as there is a converter that can convert an incoming message and a handler defined that accepts the converted object, messages can be delivered. In the case where an incoming message can not be converted and there is no possible way to provide the message to a handler, an error is logged by default and the message is rejected.
Message consumers that need to conduct some logic when this occurs may implement the UnsupportedMessageHandler interface, which requires the handleUnsupportedMessage method. When no handler and converter exist for a message, this method is called and the MessageContext is passed in, giving the consumer some way to gracefully handle input failures. Objects may even be returned, much like regular handlers, as a response to an RPC call.
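A minimal sketch of such a consumer is shown below. It assumes that UnsupportedMessageHandler lives in the same package as MessageContext and that handleUnsupportedMessage takes only the MessageContext, as described above. The queue name and the reply text are placeholders.

```groovy
package com.example

import com.budjb.rabbitmq.consumer.MessageContext
// Assumption: the interface is in the same package as MessageContext.
import com.budjb.rabbitmq.consumer.UnsupportedMessageHandler

class ExampleConsumer implements UnsupportedMessageHandler {
    static rabbitConfig = [
        queue: "test.queue"
    ]

    // Normal path: only bodies that convert to a Map are delivered here.
    def handleMessage(Map body, MessageContext context) {
        // Do work
    }

    // Fallback path: called when no converter and handler pair fits the message.
    def handleUnsupportedMessage(MessageContext context) {
        println "Unsupported message with content type ${context.properties.contentType}"
        // The returned object is used as the reply if the message was an RPC call.
        return "unsupported message"
    }
}
```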
3.2. Subscribing To Queues
Message consumers can subscribe to either queues or exchanges. When a consumer is registered to a queue, the consumer will receive messages from the queue as the RabbitMQ server determines that it’s the consumer’s turn to receive a message, since there may be multiple listeners on the same queue.
Each message consumer class must have a configuration defined. There are 2 methods to specify the configuration:
- The consumer class can have a Map assigned to a static variable named rabbitConfig. To subscribe to queues, the only required configuration option is the queue variable, which is the name of the queue to subscribe to.
- The application’s configuration file can contain the configuration with the path rabbitmq.consumers.<ClassName>. See the section below for more details.
Here is a simple example of a consumer subscribing to a queue.
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class ExampleConsumer {
    static rabbitConfig = [
        "queue": "test.queue"
    ]
    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}
There are many options available to influence how the consumer works, which can be found in the reference.
3.3. Subscribing to Exchanges
Subscribing to exchanges is different from subscribing to queues, as there are different types of exchanges with different behavior. RabbitMQ’s libraries do not provide a direct way to subscribe to an exchange; however, the plugin provides a way to do so by creating a temporary queue that is bound to the exchange. The binding requirements differ between the different types of exchanges.
Subscribing to an exchange works by creating a temporary queue at runtime that is bound to the exchange and consumed from by the message consumer. It is important to note that these queues are randomly named, exclusive, and auto-deleted, so this feature is not suitable if messages left on the queue must persist when the application shuts down.
3.3.1. Fanout Exchanges
A fanout exchange will forward a received message to every queue bound to it. There are no binding criteria for this kind of exchange. This is the simplest type of exchange, and is also the easiest to configure.
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "fanout.exchange"
    ]
    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}
3.3.2. Topic Exchanges
A topic exchange will forward messages to queues based on the binding criteria the queue used to register to the exchange. In RabbitMQ terms, this is called a routing key. The routing key can be either a direct match, or utilize wildcards to do a partial topic match. If a routing key is omitted, the queue will receive no messages. Use "#" to receive all messages from an exchange.
More information can be found in the RabbitMQ documentation.
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "topic.exchange",
        "binding": "foo.bar.#"
    ]
    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}
3.3.3. Direct Exchanges
A direct exchange will forward messages to queues based on binding criteria configured similarly to topic exchanges. The difference in this case is that direct routing does not utilize wildcards in its routing keys.
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "direct.exchange",
        "binding": "example"
    ]
    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}
3.3.4. Headers Exchanges
Header exchanges work similarly to topic exchanges. A headers exchange will forward messages to queues based on header values contained in messages. Additionally, a queue can be bound on multiple header values, along with an option to require one or all of the headers to match.
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class ExampleConsumer {
    static rabbitConfig = [
        "exchange": "headers.exchange",
        "binding": [
            "foo": "bar",
            "hi": "there"
        ],
        "match": "any"
    ]
    def handleMessage(def body, MessageContext context) {
        // Process message
    }
}
3.4. Multi-Server
When using a multi-server setup, it is important to consider which server a consumer should listen on. Use the connection property in the rabbitConfig to specify which server the consumer should be bound to. If the connection is omitted, the consumer will be bound to the default connection.
package com.example
import com.budjb.rabbitmq.consumer.MessageContext
class ExampleConsumer {
    static rabbitConfig = [
        "queue": "test.queue",
        "connection": "server1" // Where "server1" is a connection configured with that name.
    ]
    def handleMessage(def body, MessageContext context) {
        // Do work
    }
}
3.5. Central Configuration
It is also possible to define the consumer’s configuration outside of the consumer in the application’s configuration file. All of the configuration options described above are valid for this type of configuration. This functionality is valuable when a consumer’s configuration needs to be determined at runtime, instead of being hardcoded in the consumer class itself.
To configure a consumer in the Grails application’s configuration file, an entry matching the consumer’s class name should be present under the rabbitmq.consumers key, as follows:
application.groovy:
rabbitmq {
    consumers {
        ExampleConsumer {
            queue = "test.queue"
        }
    }
}
application.yml:
rabbitmq:
  consumers:
    ExampleConsumer:
      queue: test.queue
3.6. Consumer Event Handlers
It may be useful to execute some logic at certain times during the message delivery lifecycle. To enable this, the MessageConsumerEventHandler trait exists that provides several hooks or event handlers, which are called at various times during the message delivery.
The trait provides empty bodies for all of its methods and so implementations of the trait need only override the specific event handlers that are required.
| Method | Description |
| | Called when a message is initially received by the underlying RabbitMQ system and before handed to the message consumer class. |
| | Called when a message has been successfully delivered and processed by the message consumer class. |
| | Called when some unhandled exception occurred during the process of delivering or processing the message. This event handler differs from the rest in that the unhandled exception is provided. |
| | Called when the delivery process is complete, whether it was successful or failed. This event handler will be called even when |
4. Publishing Messages
Publishing messages through the plugin is achieved by using the rabbitMessagePublisher bean. This Spring bean uses a closure-based configuration method both to send messages without waiting for a response and to send RPC-style messages. There are many options available to the rabbitMessagePublisher, which are documented in the reference, but this guide will only demonstrate basic usage.
In a multi-server setup, it is important to consider which server to send a message to. As with configuration and consumers, the connection property is used to route the message to the proper server connection.
4.1. Sending Messages
Sending a message means publishing a message to an exchange or queue and not waiting for a response (fire and forget). The only required parameters to the publisher are a queue or exchange to publish the message to, and the body of the message.
import com.budjb.rabbitmq.publisher.RabbitMessagePublisher
class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher
    def sendSomeMessage() {
        rabbitMessagePublisher.send {
            exchange = "some.exchange"
            routingKey = "some.routingKey"
            body = "hi!"
        }
    }
}
RabbitMQ expects the body of a message to be a byte array. Message converters will also work when publishing messages, so if an object type other than byte[] is encountered, a suitable message converter will be found and run against the message body, if one exists.
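In a multi-server setup, the connection property mentioned at the start of this chapter can be set alongside the other publisher properties. The sketch below assumes a connection named connection2 has been configured, as in the multi-server configuration example. The method name and queue name are placeholders.

```groovy
import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher

    def sendToSecondServer() {
        rabbitMessagePublisher.send {
            // Name of a configured connection; assumed to exist for this example.
            connection = "connection2"
            routingKey = "example.queue"
            body = "hi!"
        }
    }
}
```

If the connection property is omitted, the message goes to the default connection.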
4.2. RPC Messages
Publishing an RPC message is as easy as sending messages, except that the reply message is returned from the method.
import com.budjb.rabbitmq.publisher.RabbitMessagePublisher
class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher
    def sendSomeMessage() {
        def result = rabbitMessagePublisher.rpc {
            exchange = "some.exchange"
            routingKey = "some.routingKey"
            body = "hi!"
            timeout = 5000
        }
    }
}
The timeout option is especially important for RPC-style messages. The timeout property is the amount of time (in milliseconds) the client will wait for the server to respond. If the timeout is reached, a TimeoutException will be thrown. If the timeout is set to 0, the client will wait indefinitely for a response. The default value of the timeout, if not passed, is 5 seconds.
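Because a missing reply surfaces as an exception, callers usually want to handle it explicitly. The following sketch shows one way to do that. It assumes the exception is java.util.concurrent.TimeoutException, which this guide does not state explicitly, and the fallback value is a placeholder.

```groovy
import com.budjb.rabbitmq.publisher.RabbitMessagePublisher

// Assumption: rpc throws the JDK's TimeoutException.
import java.util.concurrent.TimeoutException

class ExampleService {
    RabbitMessagePublisher rabbitMessagePublisher

    def askWithFallback() {
        try {
            return rabbitMessagePublisher.rpc {
                routingKey = "testqueue"
                body = "Hello!"
                timeout = 2000 // wait at most 2 seconds for the reply
            }
        }
        catch (TimeoutException e) {
            // No reply arrived in time; fall back to a default value.
            return "no reply"
        }
    }
}
```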
4.3. Bulk Publishing
By default, the publisher opens a new channel for each message it publishes. This is acceptable when sending individual messages, but when bulk publishing is required, it is much more efficient to open a single channel and use it for batches of messages.
As an example, during testing it took about 65 milliseconds to send one message. If 1000 messages are sent, the total time to publish is about 65 seconds! Using a single channel for the same operation takes about 1 millisecond per message, cutting the time down to 1 second for all 1000 messages. These times are just an example based on testing, but they will differ based on network latency and server load.
While a channel may be manually created by authors using the rabbitContext, the publisher provides an easy way to send many messages with a single channel. See the example below.
rabbitMessagePublisher.withChannel { channel ->
    1000.times { i ->
        send {
            routingKey = "foobar"
            body = "Bulk message $i"
        }
    }
}
There is also a withChannel method that takes in a connection name for multi-server setups.
rabbitMessagePublisher.withChannel("connection1") { channel ->
    1000.times { i ->
        send {
            routingKey = "foobar"
            body = "Bulk message $i on connection1"
        }
    }
}
All of the send and rpc methods available with the publisher can be used with withChannel.
4.4. Publisher Confirms
The publisher allows authors to enable publisher confirms for a batch of messages using a mechanism similar to withChannel. These methods use RabbitMQ’s waitForConfirms and waitForConfirmsOrDie methods. See RabbitMQ’s documentation for more information about how publisher confirms work.
The reference details all of the various methods that the publisher contains to support confirms, but below are a couple of basic examples.
The withConfirms methods utilize the waitForConfirms methods from the RabbitMQ Java library. withConfirms will block until all messages sent in the provided closure have been acked or nacked by the server. If a timeout is specified, it will only wait for the amount of time specified before throwing a TimeoutException.
rabbitMessagePublisher.withConfirms { channel ->
    send {
        routingKey = "foobar"
        body = "I am a test message"
    }
    send {
        routingKey = "barbaz"
        body = "I am also a test message"
    }
}
The withConfirmsOrDie methods utilize the waitForConfirmsOrDie methods from the RabbitMQ Java library. withConfirmsOrDie will block until all messages sent in the provided closure have been acked or nacked by the server. The difference with this method is that if any nack is received, an exception is thrown. As with withConfirms, a timeout can be used.
rabbitMessagePublisher.withConfirmsOrDie { channel ->
    send {
        routingKey = "foobar"
        body = "I am a test message"
    }
    send {
        routingKey = "barbaz"
        body = "I am also a test message"
    }
}
There are versions of both of these types of methods that take a connection name and/or a timeout.
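As a rough sketch of such a variant, the call below passes both a connection name and a timeout. The argument order (connection name first, then the timeout) and the timeout unit (milliseconds, as with the underlying waitForConfirms method) are assumptions made for this example. Check the reference for the exact signatures.

```groovy
// Assumed overload: withConfirms(String connection, long timeout, Closure closure).
// "connection1" must match a configured connection name.
rabbitMessagePublisher.withConfirms("connection1", 5000) { channel ->
    send {
        routingKey = "foobar"
        body = "I am a test message"
    }
}
```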
5. Message Converters
Message converters are classes that are responsible for converting objects to and from byte arrays.
5.1. Built-in Message Converters
The plugin provides converters for the following types:
| Type | Incoming | Outgoing |
These converters allow message handlers to consume and return data without having to convert that data themselves. They are also used when publishing messages with the RabbitMessagePublisher.
INFO: The message converter for Serializable classes will always be attempted first.
INFO: The Serializable converter is off by default. Set rabbitmq.enableSerializableConverter to true to enable this feature.
5.2. Custom Message Converters
The plugin provides a way for authors to create their own message converters. A custom message converter must be placed
in the grails-app/rabbit-converters
path, and must end with Converter.groovy
.
A message converter must implement the appropriate interface for the type of conversion it supports. The ByteToObjectConverter interface is used when a converter supports incoming messages from RabbitMQ, while the ObjectToByteConverter interface is used when a converter supports outgoing messages to RabbitMQ. Classes that support both should implement both interfaces.
Message converters advertise which object types and MIME types they support. The conversion system will typically first attempt to match an incoming message with a converter that supports its MIME type, if one was provided. Converters that declare the MIME types they support give the conversion system more accurate type detection. Additionally, converters may apply a MIME type to outgoing messages if one has not already been defined.
Below is an example converter for the String object type. Custom converters should follow the same format.
package com.budjb.rabbitmq.converter
import groovy.transform.CompileStatic
import org.springframework.util.MimeType
/**
* A converter that supports conversion to and from a {@link String}.
*/
@CompileStatic
class StringMessageConverter implements ByteToObjectConverter, ObjectToByteConverter {
/**
* Mime type.
*/
private static final MimeType mimeType = MimeType.valueOf('text/plain')
/**
* {@inheritDoc}
*/
@Override
boolean supports(Class<?> type) {
return String.isAssignableFrom(type)
}
/**
* {@inheritDoc}
*/
@Override
boolean supports(MimeType mimeType) {
return mimeType.isCompatibleWith(this.mimeType)
}
/**
* {@inheritDoc}
*/
@Override
ByteToObjectResult convert(ByteToObjectInput input) {
return new ByteToObjectResult(new String(input.getBytes(), input.getCharset()))
}
/**
* {@inheritDoc}
*/
@Override
ObjectToByteResult convert(ObjectToByteInput input) {
return new ObjectToByteResult(
((String) input.getObject()).getBytes(input.getCharset()),
new MimeType(mimeType, input.getCharset())
)
}
}
6. Advanced Usage
While the plugin effectively wraps the functionality of the RabbitMQ library, the end user has direct access to all of the underlying library objects and connection instances.
6.1. Spring Beans
There are several beans defined by the plugin to perform its various operations.
Bean Name |
Purpose |
rabbitContext |
A front-end class to the plugin that aggregates useful functionality for users of the plugin. Besides the rabbitMessagePublisher, this is the bean users will most likely interact with. |
connectionManager |
Manages the lifecycle of connection instances, including loading, starting, stopping, unloading, and retrieval. |
messageConverterManager |
Handles loading message converters and acts as the entry point when message conversion is required. |
consumerManager |
Manages the lifecycle of consumer instances, including loading, starting, stopping, unloading, and retrieval. |
queueBuilder |
Responsible for creating exchanges and queues defined in the application’s configuration. |
rabbitMessagePublisher |
Used to send messages to a RabbitMQ broker. |
6.2. Spring Application Events
The plugin provides several application events that applications using the plugin may be interested in. They are particularly useful when some logic needs to execute during the startup and shutdown lifecycle of the plugin.
Event Type |
Description |
|
Published before the |
|
Published after the |
|
Published before the |
|
Published after the |
|
Published before all consumers are started. |
|
Published after all consumers are started. |
|
Published before all consumers are stopped. |
|
Published after all consumers are stopped. |
|
Published before a specific consumer is started. |
|
Published after a specific consumer is started. |
|
Published before a specific consumer is stopped. |
|
Published after a specific consumer is stopped. |
|
Published before all connections are started. |
|
Published after all connections are started. |
|
Published before all connections are stopped. |
|
Published after all connections are stopped. |
|
Published before a specific connection is started. |
|
Published after a specific connection is started. |
|
Published before a specific connection is stopped. |
|
Published after a specific connection is stopped. |
Events can be consumed by registering a bean that implements the ApplicationListener interface. The interface is generic and takes the type of event it wants to listen for. Below is an example of a listener that’s interested in the RabbitContextStartingEvent event.
package com.example
import com.example.MyRabbitContextStartingEventListener
import grails.boot.GrailsApp
import grails.boot.config.GrailsAutoConfiguration
import org.springframework.context.annotation.Bean
class Application extends GrailsAutoConfiguration {
static void main(String[] args) {
GrailsApp.run(Application, args)
}
@Bean
MyRabbitContextStartingEventListener myRabbitContextStartingEventListener() {
return new MyRabbitContextStartingEventListener()
}
}
package com.example
import com.budjb.rabbitmq.event.RabbitContextStartingEvent
import org.springframework.context.ApplicationListener
class MyRabbitContextStartingEventListener implements ApplicationListener<RabbitContextStartingEvent> {
@Override
void onApplicationEvent(RabbitContextStartingEvent event) {
println "received a RabbitContextStartingEvent event"
}
}
6.3. Rabbit Context
Besides the rabbitMessagePublisher, the rabbitContext is the bean users will most likely interact with. While you may never need to use this bean, it can be useful. As with any Spring bean, the rabbitContext can be injected into any Spring managed bean, such as services, controllers, and rabbit consumers.
The rabbitContext is intended to be used as a front-end to all of the other beans defined by the plugin to hide some of the complexity of interacting with the system. As such, in most cases the rabbitContext proxies requests to the appropriate manager bean to accomplish the requested task.
In some cases, interactions with multiple managers are necessary to safely carry out the action. Therefore, unless the rabbitContext does not provide the required functionality, it should be considered best practice to use the rabbitContext instead of the other beans directly. If you find that you are frequently resorting to using one of the other beans, I encourage you to post an issue on the GitHub project. |
The following subsections describe some of the more useful functionality the rabbitContext provides.
6.3.1. Native Objects
The main goal of the plugin is to effectively wrap the RabbitMQ library to hide the complexity of its usage and make using it more in line with the conventions of Grails applications, but also allow users to gain access to the underlying RabbitMQ objects if needed. The rabbitContext provides several methods to gain direct access to the RabbitMQ Java library Connection and Channel objects.
To create a new Channel, use the createChannel methods.
// This creates a new channel with the default connection
Channel channel = rabbitContext.createChannel()
// The same using a different connection based on its name
Channel channel = rabbitContext.createChannel("connection1")
If the createChannel methods are used, it is important that these channels are closed. The plugin handles opening and closing channels that it manages as part of publishing or consuming messages, but channels created with the createChannel methods are not managed by the plugin. It is the author’s responsibility to close them, or connection leaks may occur and memory leaks most likely will. |
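The close-on-exit pattern the note calls for can be sketched as a small helper. This is only an illustration: the `withClosing` helper and the `FakeChannel` class are hypothetical names that exist so the snippet runs without a broker. In an application the resource would be the Channel returned by `rabbitContext.createChannel()`.

```groovy
// Hypothetical helper: runs the work closure and always closes the resource afterwards.
def withClosing(resource, Closure work) {
    try {
        return work(resource)
    }
    finally {
        resource.close()
    }
}

// Stand-in for a RabbitMQ Channel so the sketch runs without a broker.
class FakeChannel {
    boolean open = true

    void close() {
        open = false
    }
}

def channel = new FakeChannel()
def result = withClosing(channel) { ch -> "published" }
assert result == "published"
assert !channel.open

// The channel is also closed when the work closure throws.
def failing = new FakeChannel()
try {
    withClosing(failing) { ch -> throw new IllegalStateException("boom") }
}
catch (IllegalStateException ignored) {
    // expected in this sketch
}
assert !failing.open
```

Placing the close call in a `finally` block means the channel is released whether the work succeeds or fails, which is the property the note above is asking for.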
Use the getConnection methods to gain access to the Connection objects.
// To retrieve the Connection from the default connection
Connection connection = rabbitContext.getConnection()
// To retrieve the Connection from a specific connection based on its name
Connection connection = rabbitContext.getConnection("connection1")
6.3.2. Starting and Stopping Connections
The plugin handles starting and stopping connections automatically when the application is started or shut down, but sometimes applications may need to manually stop connections based on certain conditions or business logic. The rabbitContext contains several methods to manage the life cycle of connections.
Method |
Description |
|
Starts all registered connections. If some connections are already started, the remainder will also be started. |
|
Stops all connections and all consumers. |
|
Starts a connection based on its name. |
|
Stops a connection based on its name, and stops any consumers on the connection. |
6.3.3. Starting and Stopping Consumers
Much like connections, the rabbitContext provides several methods to start and stop consumers if necessary.
Method |
Description |
|
Starts all registered consumers. If some consumers are already started, the remainder will also be started. |
|
Stops all consumers. |
|
Starts a consumer based on its class name. |
|
Stops a consumer based on its class name. |
|
Starts all consumers on a specific connection, based on the connection name. |
|
Stops all consumers on a specific connection, based on the connection name. |
6.3.4. Status Report
The RabbitContext contains a method getStatusReport(), which will build an object structure containing information about all RabbitMQ connections and all consumers. This information is useful to monitor the status of the RabbitMQ application system, as it contains running states and statistics including how many concurrent threads a consumer is configured for, how many are actually active, and how many are actively processing messages. Below is an example of its output gathered from one of the plugin’s tests, serialized as JSON.
[
{
"consumers": [
{
"fullName": "com.budjb.rabbitmq.test.AllTopicConsumer",
"load": 0,
"name": "AllTopicConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "topic-queue-all",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.ReportingConsumer",
"load": 0,
"name": "ReportingConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "reporting",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.SleepingConsumer",
"load": 0,
"name": "SleepingConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "sleeping",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.SpecificTopicConsumer",
"load": 0,
"name": "SpecificTopicConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "topic-queue-specific",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.StringConsumer",
"load": 0,
"name": "StringConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "string-test",
"runningState": "RUNNING"
},
{
"fullName": "com.budjb.rabbitmq.test.SubsetTopicConsumer",
"load": 0,
"name": "SubsetTopicConsumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "topic-queue-subset",
"runningState": "RUNNING"
}
],
"host": "localhost",
"name": "connection1",
"port": 5672,
"runningState": "RUNNING",
"virtualHost": "test1.rabbitmq.budjb.com"
},
{
"consumers": [
{
"fullName": "com.budjb.rabbitmq.test.Connection2Consumer",
"load": 0,
"name": "Connection2Consumer",
"numConfigured": 1,
"numConsuming": 1,
"numProcessing": 0,
"queue": "connection2-queue",
"runningState": "RUNNING"
}
],
"host": "localhost",
"name": "connection2",
"port": 5672,
"runningState": "RUNNING",
"virtualHost": "test2.rabbitmq.budjb.com"
}
]
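As a rough sketch of how a report in this shape might be used for monitoring, the snippet below parses a trimmed JSON sample and lists consumers that have fewer active consumer threads than configured. The sample data is altered for illustration (the second consumer is made to look degraded) and is not real plugin output. Working on the JSON form rather than the objects returned by getStatusReport() is an assumption made so the example is self-contained.

```groovy
import groovy.json.JsonSlurper

// Trimmed sample in the shape of the report; values are illustrative only.
String json = '''
[
  {
    "name": "connection1",
    "runningState": "RUNNING",
    "consumers": [
      {"name": "StringConsumer", "numConfigured": 1, "numConsuming": 1, "numProcessing": 0, "runningState": "RUNNING"},
      {"name": "SleepingConsumer", "numConfigured": 2, "numConsuming": 1, "numProcessing": 1, "runningState": "RUNNING"}
    ]
  }
]
'''

def report = new JsonSlurper().parseText(json)

// Collect "connection/consumer" names where fewer threads are consuming than configured.
List<String> degraded = report.collectMany { connection ->
    connection.consumers.findAll { it.numConsuming < it.numConfigured }.collect { consumer ->
        "${connection.name}/${consumer.name}".toString()
    }
}

assert degraded == ["connection1/SleepingConsumer"]
```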
6.4. Transactions
The plugin provides a bit of automation around channel transactions. When a consumer is defined with the transacted property set to true, a transaction is automatically started on the channel passed to the message handler. When the message handler completes successfully, the transaction is automatically committed. If an unhandled exception is thrown from the message handler, the transaction is automatically rolled back.
It is especially important that any messages published from a message handler use the Channel instance passed in the MessageContext for this functionality to work.
Since the Channel is passed in the MessageContext, the author has full control over committing and rolling back transactions.
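The commit and rollback behavior described above can be illustrated with a small simulation. This is a sketch, not the plugin's implementation: `RecordingChannel` and `runTransacted` are hypothetical names, and the fake channel only borrows the txSelect, txCommit, and txRollback method names from the RabbitMQ Java library's Channel so that the flow can be shown without a broker.

```groovy
// Stand-in for a RabbitMQ Channel that records transaction calls.
class RecordingChannel {
    List<String> calls = []

    void txSelect() { calls << "txSelect" }

    void txCommit() { calls << "txCommit" }

    void txRollback() { calls << "txRollback" }
}

// Hypothetical wrapper mirroring the behavior described for transacted consumers.
void runTransacted(channel, Closure handler) {
    channel.txSelect()
    try {
        handler(channel)
        channel.txCommit()
    }
    catch (Exception e) {
        channel.txRollback()
    }
}

// A handler that completes normally results in a commit.
def ok = new RecordingChannel()
runTransacted(ok) { ch -> "handled" }
assert ok.calls == ["txSelect", "txCommit"]

// A handler that throws results in a rollback.
def failed = new RecordingChannel()
runTransacted(failed) { ch -> throw new RuntimeException("handler failed") }
assert failed.calls == ["txSelect", "txRollback"]
```

The handler receives the same channel the transaction was started on, which is why messages published from a handler need to go through the Channel in the MessageContext to take part in the transaction.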
7. Upgrading
When an update to the plugin introduces incompatibilities, this section helps identify what needs to be addressed when upgrading to the new version.
7.1. From 3.0.X
The upgrade from 3.0.X to 3.1.X includes a refactor of how the consumer and connection managers work. The storage of consumer handlers was moved from the connection classes into the consumer manager in order to support the new start/stop functionality, and it made sense for consumer handlers to reside with the appropriately named manager.
Both of the managers also now have underlying interfaces that define how a manager should behave. In addition, the objects being managed are now called Contexts, which also have interfaces that define their general behavior. The intention is that, with these changes, the plugin becomes more stable, with fewer backwards-incompatible changes introduced moving forward.
Here are the items authors may need to address when upgrading to this version of the plugin.
- The rabbitContext.getConnection() methods now return the RabbitMQ Connection object instead of the ConnectionContext instance. This was done to make the methods more consistent with the createChannel methods and to encapsulate the ConnectionContext objects inside the manager. The ConnectionContext objects can now be retrieved from the connectionManager bean.
- Many of the methods inside of the ConsumerManager and ConnectionManager have been renamed to adhere to a common interface. This should only affect projects that use these beans directly.
- Some of the methods inside of the RabbitContext have been renamed to match the interface used by the various managers.
Overall, users of the plugin should see no impact if the rabbitContext or any of the other beans are not used in their projects.
If the use of the plugin’s beans is limited to the rabbitContext, the impact should be minimal, with some minor changes needed to method names and some refactored code if retrieving a ConnectionContext.
Users that use the other manager beans will need to account for the changed interface implemented by those beans.
7.2. From 2.0.X
The upgrade from any version in the 2.0.X range to 3.0.X includes a massive refactoring of the internals of the plugin. If users did not extend or override the RabbitContext, RabbitMessageBuilder, or any of the other helper classes, the amount of impact is limited to a couple of package name changes.
Below are the changes that were made, at a high level:
- RabbitMessageBuilder is deprecated. The class still exists and its interface is the same; however, the code in the class has been removed and it now proxies requests to the rabbitMessagePublisher bean. The builder will be removed at some point in the near future.
- RabbitContext used to contain a significant amount of code related to management of message converters, consumers, and connections. That functionality has been broken out into separate classes. The RabbitContext now serves only as a class to aggregate functionality useful to users of the plugin, and should still be used to simplify interfacing with the plugin rather than using the underlying beans directly.
- Introduced the rabbitMessagePublisher as a replacement for the RabbitMessageBuilder. This bean can be injected into other Spring managed beans, such as services, controllers, and rabbit consumers. Its functionality follows the builder closely, and users may send messages based on configurations made through closures or through the new RabbitMessageProperties object. The short-hand convenience methods are still available as well.
- Introduced the messageConverterManager bean to handle all operations pertaining to message converters.
- Introduced the consumerManager bean to handle all operations pertaining to message consumers.
- Introduced the connectionManager bean to handle all operations pertaining to RabbitMQ connections and channels.
- Introduced the queueBuilder bean to handle creating configured exchanges and queues.
- The MessageContext class has been moved into a new package: com.budjb.rabbitmq.consumer.
- The AutoAck enum has been moved into a new package: com.budjb.rabbitmq.consumer.
- The ConsumerConfiguration class has been moved into a new package: com.budjb.rabbitmq.consumer.
- The MessageConvertMethod class has been moved into a new package: com.budjb.rabbitmq.consumer.
- The ConnectionContext class has been moved into a new package: com.budjb.rabbitmq.connection.
- The ConnectionConfiguration class has been moved into a new package: com.budjb.rabbitmq.connection.
- The MessageConverter abstract class has been moved into a new package: com.budjb.rabbitmq.converter.
- All of the bundled message converters have been moved into a new package: com.budjb.rabbitmq.converter.
- The RabbitMessageBuilder class has been moved into a new package: com.budjb.rabbitmq.publisher.
- A large number of unit tests and some integration tests have been added to the project. These tests rely on the Spock mocking framework, but the test files and dependencies are not exported with the plugin so that a new plugin dependency on Spock is not created.
- The closures used for publishing messages have had their resolving strategy changed from OWNER_FIRST to DELEGATE_FIRST. This should not have much of an impact, but in some cases closures may need to explicitly qualify some properties in the closures with delegate.
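To illustrate what the change of resolve strategy means in practice, the sketch below uses plain Groovy closures with no plugin classes involved. `MessageSettings` and `Publisher` are hypothetical stand-ins: the first plays the role of the object a publishing closure configures (the delegate), the second is an enclosing class (the owner) that happens to declare a property with the same name.

```groovy
// Hypothetical stand-in for the object a publishing closure configures.
class MessageSettings {
    String routingKey
    String body
}

// The owner declares a property that shares a name with one on the delegate.
class Publisher {
    String routingKey = "owner.key"

    MessageSettings configure(int strategy) {
        MessageSettings settings = new MessageSettings()
        Closure config = {
            routingKey = "closure.key"
            body = "test message"
        }
        config.delegate = settings
        config.resolveStrategy = strategy
        config.call()
        return settings
    }
}

// OWNER_FIRST: the routingKey assignment lands on the enclosing Publisher.
Publisher first = new Publisher()
MessageSettings ownerFirst = first.configure(Closure.OWNER_FIRST)
assert first.routingKey == "closure.key"
assert ownerFirst.routingKey == null
assert ownerFirst.body == "test message"

// DELEGATE_FIRST: the routingKey assignment lands on the delegate.
Publisher second = new Publisher()
MessageSettings delegateFirst = second.configure(Closure.DELEGATE_FIRST)
assert second.routingKey == "owner.key"
assert delegateFirst.routingKey == "closure.key"
```

Only names that exist on both the owner and the delegate are affected, which is why most publishing closures behave the same under either strategy.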
8. Reference
8.1. Command Line
8.1.1. create-consumer
The create-consumer command creates a new consumer or consumers.
grails create-consumer com.example.Test
This command is useful to quickly create new consumers, in much the same way as Grails' built-in commands to create new controllers, domains, etc. The generated file is a template that is ready to be configured and used, and includes a generic message handler.
A powerful feature of this script is the ability to create as many consumers as are entered on the command line. For example:
grails create-consumer com.example.First com.example.Second
The above will create two consumers: FirstConsumer and SecondConsumer.
8.2. Consumer Configuration
8.2.1. autoAck
Sets whether incoming messages should be automatically acknowledged.
static rabbitConfig = [
queue: "example.queue",
autoAck: AutoAck.POST
]
There are 3 auto-acknowledgement modes:
Enum Value |
Effect |
AutoAck.MANUAL |
The message is never automatically acknowledged. The message handler is responsible for acknowledging the message. |
AutoAck.ALWAYS |
The message will be automatically acknowledged before the message is delivered to the message handler. |
AutoAck.POST |
The message will be automatically acknowledged after the message handler successfully completes. If an unhandled exception escapes the handler, the message is rejected. This is the default mode. |
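The ordering of acknowledgement relative to the handler in each mode can be sketched as a small simulation. Everything here is hypothetical and only mirrors the table above: `AckMode` stands in for the plugin's AutoAck enum, and `dispatch` returns the sequence of actions a dispatcher would take rather than talking to a broker. What happens when a handler fails in the MANUAL or ALWAYS modes is not specified above, so the sketch simply records the failure.

```groovy
// Hypothetical enum mirroring the three modes; the plugin's own type is AutoAck.
enum AckMode { MANUAL, ALWAYS, POST }

// Returns the ordered list of actions taken around the handler for a given mode.
List<String> dispatch(AckMode mode, Closure handler) {
    List<String> actions = []
    if (mode == AckMode.ALWAYS) {
        actions << "ack"
    }
    try {
        handler.call()
        actions << "handled"
        if (mode == AckMode.POST) {
            actions << "ack"
        }
    }
    catch (Exception e) {
        actions << "failed"
        if (mode == AckMode.POST) {
            actions << "reject"
        }
    }
    return actions
}

Closure succeeds = { "done" }
Closure fails = { throw new IllegalStateException("boom") }

assert dispatch(AckMode.ALWAYS, succeeds) == ["ack", "handled"]
assert dispatch(AckMode.POST, succeeds) == ["handled", "ack"]
assert dispatch(AckMode.POST, fails) == ["failed", "reject"]
assert dispatch(AckMode.MANUAL, succeeds) == ["handled"]
```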
8.2.2. binding
Set the binding criteria to use when subscribing to an exchange.
static rabbitConfig = [
exchange: "example.exchange",
binding: "example.routing.key"
]
The binding criteria may be necessary depending on the type of exchange being subscribed to. This property is ignored when consuming from a queue. See the guide for more information on this property’s usage.
8.2.3. connection
Sets which connection should be used to consume messages from.
static rabbitConfig = [
queue: "foobar",
connection: "server1"
]
The connection property should be used in multi-server configurations to specify which connection should be used to consume messages from. If the connection property is omitted, the default connection will be used.
8.2.4. consumers
Set the number of concurrent consumers the message consumer should start.
static rabbitConfig = [
queue: "example.queue",
consumers: 10
]
By default, a message consumer class will only start one consumer. This means the consumer can only handle one message at a time. Authors can increase the number of concurrent consumers running by increasing the value of this property.
8.2.5. convert
Sets the automatic message body conversion mode.
static rabbitConfig = [
queue: "example.queue",
convert: MessageConvertMethod.ALWAYS
]
There are 3 convert modes:
Enum Value |
Effect |
|
The message is never automatically converted. The message handler always receives a byte array. |
|
The message will only be automatically converted if the incoming message has the content-type property set and a matching converter is found based on that content-type. |
|
The message will be automatically converted as long as a suitable message converter and message handler are found. This is the default mode. |
8.2.6. exchange
Set the exchange to subscribe to.
static rabbitConfig = [
exchange: "example.exchange"
]
Tells the plugin that the consumer should subscribe to an exchange. The exchange must already exist for the consumer to begin listening to it. Exchanges can be created externally from the application, or via the application’s RabbitMQ configuration. Note that a routing key might be necessary depending on the type of exchange specified.
8.2.7. match
Set the match criteria to use when subscribing to a headers exchange.
static rabbitConfig = [
exchange: "example.exchange",
binding: ["foo": "bar"],
match: "any"
]
The match property determines whether any one header must match or whether all headers must match for the queue’s binding. This property is only used when binding to a headers exchange.
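The difference between the two match modes can be sketched with a small helper that mimics how a headers exchange evaluates a binding. The `bindingMatches` function is hypothetical and is not part of the plugin or the RabbitMQ library; the actual matching is performed by the broker.

```groovy
// Hypothetical helper that mimics how a headers exchange evaluates a binding.
boolean bindingMatches(Map bindingHeaders, Map messageHeaders, String match) {
    if (match == "any") {
        // At least one binding header must be present with the same value.
        return bindingHeaders.any { key, value -> messageHeaders[key] == value }
    }
    // "all": every binding header must be present with the same value.
    return bindingHeaders.every { key, value -> messageHeaders[key] == value }
}

Map bindingHeaders = [foo: "bar", baz: "qux"]

assert bindingMatches(bindingHeaders, [foo: "bar"], "any")
assert !bindingMatches(bindingHeaders, [foo: "bar"], "all")
assert bindingMatches(bindingHeaders, [foo: "bar", baz: "qux", extra: "1"], "all")
assert !bindingMatches(bindingHeaders, [foo: "nope"], "any")
```

Extra headers on the message do not prevent a match in either mode; only the headers named in the binding are compared.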
8.2.8. prefetchCount
Sets the number of messages the consumer will prefetch.
static rabbitConfig = [
queue: "example.queue",
prefetchCount: 1
]
Sets the QOS prefetch count property of the consumer’s channel. By default, the consumer will fetch one message at a time. Increasing this value will increase the number of messages the consumer will queue up before it processes them.
8.2.9. queue
Set the queue to consume from.
static rabbitConfig = [
queue: "example.queue"
]
Tells the plugin that the consumer should consume a specific queue. The queue must already exist for the consumer to begin listening to it. Queues can be created externally from the application, or via the application’s RabbitMQ configuration.
8.2.10. retry
Sets whether a rejected message should be redelivered.
static rabbitConfig = [
queue: "example.queue",
retry: false
]
If a message is rejected, this property is used to determine whether the message should be marked for redelivery.
If a message is rejected for a reason that will repeat every time the message is consumed, such as an unhandled exception, the message will be retried indefinitely if this property is enabled. Be careful when using this feature. |
8.2.11. transacted
Sets whether automatic transactions should be enabled on the consumer.
static rabbitConfig = [
queue: "example.queue",
transacted: true
]
See the Transactions documentation for more information.
8.3. Rabbit Message Publisher
8.3.1. appId
Sets the appId property of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
appId = "example"
body = "test message"
}
8.3.2. autoConvert
Toggles whether the message will be converted via the message converters when a response is received from an RPC message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
body = "test message"
autoConvert = true
}
If this value is true, the same logic to convert messages in the message consumers is used to convert the received message from a byte array to a converted object type, if one exists.
If this is false, the raw byte array is returned from the RPC response.
8.3.3. body
Body of the message to send to the RabbitMQ server.
rabbitMessagePublisher.send {
routingKey = "example.queue"
body = "test message"
}
The body is a required part of the message to transmit. The RabbitMQ server expects the message to be in a byte array format, but the plugin attempts to handle the conversion of objects for you via the Message Converters objects. You may assign any type of object to this property as long as there is a message converter available to convert the object to a byte array.
8.3.4. connection
Sets which connection should be used to send the message to.
rabbitMessagePublisher.send {
exchange = "example.exchange"
routingKey = "example.topic"
body = "message"
connection = "server1"
}
The connection property is used to specify which server to send a message with in a multi-server configuration. If the connection is omitted, the default connection will be used.
8.3.5. contentEncoding
Sets the content encoding property of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
contentEncoding = "base64"
body = "test message"
}
8.3.6. contentType
Sets the content-type property of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
contentType = "text/plain"
body = "test message"
}
8.3.7. correlationId
Sets the application correlation ID of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
correlationId = "1234"
body = "test message"
}
8.3.8. deliveryMode
Sets the delivery mode of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
deliveryMode = 1
body = "test message"
}
Values are either non-persistent (1) or persistent (2).
8.3.9. exchange
Define the exchange to publish a message to.
rabbitMessagePublisher.send {
exchange = "example.exchange"
body = "message"
}
Setting this property lets the rabbitMessagePublisher know to publish the message to an exchange. Depending on the type of exchange, the use of the routingKey property may be necessary. See the RabbitMQ documentation for more information on exchanges.
A publish operation with the rabbitMessagePublisher may only be done with either an exchange or a queue, but not both. Attempting to use both will result in an error. |
8.3.10. expiration
Sets the expiration property of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
expiration = "60000" // RabbitMQ reads this as a time to live in milliseconds
body = "test message"
}
8.3.11. headers
Sets the headers to send with the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
headers = [
"header1": "foo",
"header2": "bar"
]
body = "test message"
}
The headers property is simply a map with key/value pairs for the header names and their values.
8.3.12. messageId
Sets the messageId property of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
messageId = "1234"
body = "test message"
}
8.3.13. priority
Sets the priority of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
priority = 5
body = "test message"
}
Message priority is an integer with values from 0 to 9.
8.3.14. replyTo
Sets the replyTo queue name.
rabbitMessagePublisher.send {
routingKey = "example.queue"
replyTo = "reply.queue"
body = "test message"
}
Setting the replyTo property of the message lets the consumer know that the publisher expects a response to the message. The value of this property should be an existing queue the client is consuming on.
This property is automatically set when the RabbitMessagePublisher.rpc() method is used, and the consumer has access to this property to reply to the message. |
8.3.15. routingKey
The routing key is used in conjunction with an exchange to publish a message.
rabbitMessagePublisher.send {
exchange = "example.exchange"
routingKey = "example.topic"
body = "message"
}
The routing key is useful when publishing messages to topic or direct exchanges. When used on direct exchanges, the routing key must match the binding a queue used to bind to an exchange, or the message will become unroutable (and possibly lost). See the RabbitMQ documentation for more information on routing keys and exchanges.
The exchange can be omitted, in which case the routingKey can be used to send a message directly to a queue.
8.3.16. timeout
Sets the amount of time, in milliseconds, an RPC message will wait before giving up.
rabbitMessagePublisher.send {
routingKey = "example.queue"
timeout = 20 * 1000 // 20 seconds
body = "message"
}
The timeout is important when sending RPC messages. A server may fail and not reply to the message, and the client must not wait until eternity for the message to come back. The default timeout, if not specified, is 5 seconds. If the client does wish to wait indefinitely, setting this value to 0 will cause the RPC call to wait until a reply is received.
8.3.17. timestamp
Sets the timestamp property of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
timestamp = OffsetDateTime.now()
body = "test message"
}
8.3.18. type
Sets the type property of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
type = "type"
body = "test message"
}
8.3.19. userId
Sets the userId property of the message.
rabbitMessagePublisher.send {
routingKey = "example.queue"
userId = "foobar"
body = "test message"
}
9. Changelog
9.1. 3.5.x
9.1.1. Version 3.5.1
- Fix another instance of getCharSet.
9.1.2. Version 3.5.0
- Update to work with Grails 3.3 (or better).
- Add Order and Ordered support to message converters.
9.2. 3.4.x
9.2.1. Version
9.2.2. Version 3.4.6
- Add some error handling around flushing and destroying the persistence context interceptor. Was unable to reproduce the error, but this is an attempt to fix a zombie Hibernate session bug.
9.2.3. Version 3.4.5
- Added rabbitmq prefix, to avoid conflicts with environment variables. (#130)
9.2.4. Version 3.4.4
- Move the TypeConvertertingMapConverter to a higher converting priority than JsonConverter.
9.2.5. Version 3.4.3
- Add top-level exception handling for the internal message handler implementation. This fixes an issue where Hibernate exceptions closed RabbitMQ channels.
- Use a different MimeType constructor with built-in message converters to maintain Grails 3.1.x support.
9.2.6. Version 3.4.2
- Add configuration option rabbitmq.enableSerializableConverter, which defaults to false. This breaks behavior with the previous 3.4.x releases but brings it back in line with the behavior of previous versions. This was done due to unexpected serialization issues with embedded lazy maps returned from the JsonSlurper.
9.2.7. Version 3.4.1
- Change from using Spring’s MimeType.getCharset() to MimeType.getCharSet(). The latter is deprecated but using it allows Grails 3.1.x to function correctly.
9.2.8. Version 3.4.0
- Remove support for legacy connection configuration (rabbitmq.connection).
- Update RabbitMQ Client library to 4.2.0.
- Add support for exporting RabbitMQ metrics.
- Major refactor of the message converter system. These changes are not backwards compatible.
- Major refactor of the message consumer backend system. Existing consumers are compatible.
- Migrate documentation to AsciiDoc.
- Introduce new interfaces for message consumers (UnsupportedMessageHandler and MessageConsumerEventHandler).
- Added Spring application events for RabbitMQ plugin system start and stop events.
9.3. 3.3.x
9.3.1. Version 3.3.3
- Upgrade codebase to Grails 3.2.
- Add ability to bind exchanges to exchanges.
- Migrate from Calendar to OffsetDateTime for message timestamps.
- Update amqp-client to version 3.6.6. (#83)
9.3.2. Version 3.3.2
- Fix message converter/consumer handler issue where message handler methods with primitive types (such as int) would not accept the equivalent boxed type (such as Integer).
9.3.3. Version 3.3.1
- Reintroduce support for consumer handler methods taking only the MessageContext as a parameter.
9.3.4. Version 3.3.0
9.3.5. Version
This version includes changes to the configuration format introduced in 3.2.0. There were issues with YAML configuration and using the keys as the names of connections, queues, or exchanges where those names had periods in them. There was no way to escape them, and it broke the configuration parsing code. Due to this, the format had to change. This decision was not made lightly and I intend on this being the last change to the configuration format.
- Changed the configuration format introduced in 3.2.0. This breaks backward compatibility with that version.
9.4. 3.2.x
9.4.1. Version 3.2.0
A huge thanks to Ollie Freeman for his work on this release. |
-
First non-beta release of the plugin for Grails 3.
-
Deprecate closure-based queue/exchange configuration.
-
Add new map-based configuration that works well with YAML.
-
Removed RabbitMessageBuilder.
9.5. 3.1.x
All versions below refer to the Grails 2.x plugin. The Grails 3.x plugin forked from the Grails 2.x version after release 3.1.3.
9.5.1. Version 3.1.3
-
Add status reports that provide information about all connections and their consumers, including load statistics.
-
Upgrade project to Grails 2.5.4.
-
Remove gpars dependency.
-
Make Grails 2.3 the minimum version of Grails this plugin is intended for.
-
Fix consumer configuration from application config parsing issue (#73).
-
Refactor message conversion for incoming messages so that conversion to a type only happens if an appropriate handler exists.
9.5.2. Version 3.1.2-beta
-
Experimental upgrade to Grails 3.
9.5.3. Version 3.1.2
-
Added graceful shutdown support. See rabbitContext.shutdown().
-
Added methods to check running state on most managers and contexts.
-
Updated the RabbitMQ Java library to 3.5.4.
-
Added the gpars plugin as a dependency.
9.5.4. Version 3.1.1
-
Refactored the code to load a consumer’s configuration from a static variable so that it works correctly when the consumer is annotated with @Transactional. (#55)
-
Add setter methods for the message TTL (expiration). (#56)
-
Fix bug where missing connection configuration values do not allow the use of default values.
-
Remove checked exception from ConsumerManagerImpl that does not exist in its interface. (#59)
9.5.5. Version 3.1.0
-
Update the RabbitMQ Client Java library to 3.5.0.
-
Fix an issue that caused unclean shutdowns when redeploying an application using the plugin. (#54)
-
Added the ability to start and stop individual connections. (#49)
-
Added the ability to start and stop individual consumers. (#49)
-
Added the ability to start and stop consumers based on the connection they’re tied to. (#49)
-
Moved consumer adapter storage from the connection context to the consumer manager.
-
Handle Throwable types that were previously unhandled in consumer handling, so that channels are not closed if one of these errors occurs. (#47)
-
Added Travis CI continuous integration for all commits to the plugin.
9.6. 3.0.x
9.6.1. Version 3.0.4
-
Fix a null pointer exception when a consumer has no configuration.
-
Add a unit test to test behavior when a consumer has no configuration.
-
Add an integration test to test behavior when sending a message directly to a queue.
9.6.2. Version 3.0.3
-
Introduced the rabbitMessagePublisher bean to replace the RabbitMessageBuilder.
-
Deprecated the RabbitMessageBuilder.
-
Massive refactor of the internals of the plugin. See the upgrading page for more detailed information about what has changed.
-
Added the ability to configure consumers centrally in the application’s configuration file (thanks Erwan Arzur).
-
Updated RabbitMQ library version to 3.4.3.
9.6.3. Version 3.0.2
-
Internal release, see 3.0.3.
9.6.4. Version 3.0.1
-
Internal release, see 3.0.3.
9.6.5. Version 3.0.0
-
Internal release, see 3.0.3.
9.7. 2.0.x
9.7.1. Version 2.0.10
-
Fix bug with converters that prevented converters later in the processing list from executing if another converter is unable to marshal data from bytes.
-
Add the enabled flag to the configuration. If false, the plugin does not start at all.
9.7.2. Version 2.0.9
-
Additional fix for memory leak associated with RPC calls and auto-recovering connections.
9.7.3. Version 2.0.8
-
Fix bug introduced by rushing the previous fix. Mark consuming = true.
9.7.4. Version 2.0.7
-
Add basicCancel() to RabbitMessageBuilder in an attempt to address a memory leak.
-
Improve cleaning up of resources in RPC calls.
9.7.5. Version 2.0.6
-
Updated copyright notices.
-
Added GString message converter.
-
Updated publishing guide docs to make RabbitMessageBuilder usage clearer (thanks marcDeSantis @GitHub).
9.7.6. Version 2.0.5
-
Added heartbeat configuration for connections (thanks LuisMuniz @GitHub).
-
Refactored Hibernate session support so that Hibernate is no longer a dependency of the plugin, and will now work with or without Hibernate present.
9.7.7. Version 2.0.4
-
Added multi-server support to all aspects of the plugin.
-
Added SSL support for connections.
-
Added auto-reconnect support for dropped connections.
-
Added logic to wrap a Hibernate session around calls to consumers.
-
Updated the RabbitMQ library to version 3.3.0.
-
Added logging for connection/channel reconnects and channel shutdowns.
-
Changed format for connection configurations. The old style is still supported, but will likely be removed at some point.
9.8. 1.0.x
9.8.1. Version 1.0.3
-
Modified the logic to check for the existence of callbacks in consumers.
9.8.2. Version 1.0.2
-
Added a cached thread pool so the user does not need to account for the number of threads consumers require. The default is set to 0 so that the cached thread pool is used by default.
-
Added callbacks for messages: onReceive, onSuccess, onFailure, and onComplete.
9.8.3. Version 1.0.1
-
Remove the maven group from the plugin definition class.
9.8.4. Version 1.0.0
-
Version bump for general release.
9.9. 0.2.x
9.9.1. Version 0.2.1
-
Fixed a bug with the message handler discovery method that caused generically-typed handlers to get called incorrectly.
9.9.2. Version 0.2.0
-
Refactored queue/exchange configuration. It is now possible to configure queue binding without also having to configure the exchange being bound to.
-
Added the match property to queue configuration to support headers exchange binding. This breaks backwards compatibility.
-
Renamed the routingKey property of the consumer configuration to binding to match queue configuration. This breaks backwards compatibility.
9.10. 0.1.x
9.10.1. Version 0.1.8
-
Moved the trigger to start consumers on application launch to the bootstrap.
9.10.2. Version 0.1.7
-
Added the prefetchCount option to the consumer configuration. Defaults to 1.
-
Added the threads option to the connection configuration. Defaults to 5.
9.10.3. Version 0.1.6
-
Fixed logic to determine if a consumer is valid.
-
Added support for short-form handlers that only take a single parameter.
9.10.4. Version 0.1.5
-
The body parameter to the RabbitMessageBuilder is no longer required. It now defaults to an empty byte array.
9.10.5. Version 0.1.4
-
Fix a class visibility issue in the artefact handlers for this plugin.
9.10.6. Version 0.1.3
-
Touch up the consumer template.
9.10.7. Version 0.1.2
-
Add the ability to create multiple consumers at the same time with the create-consumer script (thanks Aaron Brown!).
-
Also create a unit test when creating consumers (thanks Michael Rice!).
9.10.8. Version 0.1.1
-
Throw an exception if the connection configuration is missing on application start (thanks Michael Rice!).
-
Add the create-consumer script (thanks Aaron Brown!).
9.10.9. Version 0.1
-
Code complete/experimental release.