AMQP

Target Configuration

For all actions, an AMQP target must be defined with a name and a valid URL.

RabbitMQ target
{
    "name":"RABBITMQ_TARGET",
    "url":"amqp://localhost:5672",
    "properties": [
        {
            "key":"password",
            "value":"admin"
        },
        {
            "key":"username",
            "value":"admin"
        }
    ]
}

Basic publish

Use this action to publish a message to an exchange.

Inputs

| Required | Name | Type | Description |
|---|---|---|---|
| * | exchange-name | String | The exchange to publish the message to. It must already exist. |
| | routing-key | String | The routing key. See this example for more details. |
| | headers | Map<String, Object> | Message headers. |
| | properties | Map<String, String> | Other message properties. Currently, only the content_type property is handled. |
| * | payload | String | Message content. |

Outputs

| Name | Type | Description |
|---|---|---|
| payload | String | Message content |
| headers | String | Message headers |

Example

    AmqpBasicPublishAction(
        target = "RABBITMQ_TARGET",
        exchangeName = "my.exchange",
        routingKey = "children.fiction",
        headers = mapOf(
            "season" to "1",
        ),
        properties = mapOf(
            "content_type" to "application/json",
        ),
        payload = """
                {
                "title": "Castle in the Sky",
                "director": "Hayao Miyazaki",
                "rating": 78,
                "category": "fiction"
                }
            """.trimIndent(),
    )

Basic consume

Use this action to consume messages from a queue.

Parallel consume

  • Only one consumer per queue can be started at a given time.
  • To start a queue consumer, Chutney:
    • checks whether another consumer has already been started on the queue:
      • if so, it re-checks the queue availability every 500 ms without exceeding the timeout duration. At every iteration, the remaining timeout is reduced by 500 ms.
      • otherwise, it marks the queue as locked and starts the current consumer.
    • consumes messages until nb-messages is reached or the remaining timeout (timeout - n * 500 ms, where n is the number of re-checks) expires.
    • stops the consumer and unlocks the queue.

Avoid long timeouts. In parallel executions, a long timeout slows execution down and can cause other executions to fail. Prefer a retry strategy with a short timeout so that different executions can access the queue. For example, instead of a 5 min timeout at action level, add a RetryTimeOutStrategy("5 min", "1 s") (timeout first, then delay) to the step.
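The locking behaviour described above can be sketched in plain Kotlin. This is only an illustration of the documented steps, not Chutney's actual code: `QueueLocks`, `acquire`, `POLL_INTERVAL_MS`, and the injectable `sleep` parameter are hypothetical names introduced for the example.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch of the documented behaviour; not Chutney's implementation.
const val POLL_INTERVAL_MS = 500L

// Tracks which queues currently have a running consumer.
class QueueLocks {
    private val locked = ConcurrentHashMap.newKeySet<String>()

    // Returns true if the queue was free and is now marked as locked.
    fun tryLock(queue: String): Boolean = locked.add(queue)

    fun unlock(queue: String) {
        locked.remove(queue)
    }
}

// Waits for the queue to become free, re-checking every 500 ms.
// Returns the timeout left for consuming (in ms), or null if the
// timeout ran out before the queue could be locked.
fun acquire(
    locks: QueueLocks,
    queue: String,
    timeoutMs: Long,
    sleep: (Long) -> Unit = { Thread.sleep(it) }
): Long? {
    var remaining = timeoutMs
    while (!locks.tryLock(queue)) {
        if (remaining <= 0) return null
        val wait = minOf(POLL_INTERVAL_MS, remaining)
        sleep(wait)
        remaining -= wait // each re-check reduces the remaining timeout
    }
    return remaining
}
```

With a 10 s timeout and a queue that is released after three re-checks, `acquire` returns 8500 ms for the actual consumption. This shows why a long action-level timeout is costly in parallel runs: every waiting execution keeps polling until its own timeout is exhausted.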

Inputs

| Required | Name | Type | Default | Description |
|---|---|---|---|---|
| * | queue-name | string | | Queue name. |
| | nb-messages | integer | 1 | Number of messages to consume. An error is thrown if fewer than nb-messages messages are received. |
| | selector | string | | |
| | timeout | duration | "10 sec" | Maximum time allowed to establish the consumer connection and read the messages. |
| | ack | boolean | true | If true, Basic.ack acknowledgement mode is used. |

Outputs

| Name | Type | Description |
|---|---|---|
| body | String | Response as Map |
| payloads | String | Response payloads |
| headers | String | Response headers |

Example

AmqpBasicConsumeAction(
    target = "RABBITMQ_TARGET",
    queueName = "my.queue",
    nbMessages = 1,
    selector = "\$..[?(\$.headers.season=='1')]",
    timeout = "5 sec",
    ack = true
)

The same consume action, wrapped in a step with a retry strategy instead of a long action timeout:

Step("Long basic consume", RetryTimeOutStrategy("5 min", "1 s")) {
    AmqpBasicConsumeAction(
        target = "RABBITMQ_TARGET",
        queueName = "my.queue",
        nbMessages = 1,
        selector = "\$..[?(\$.headers.season=='1')]"
    )
}

Basic get

Use this action to directly access the messages available in a queue.

Inputs

| Required | Name | Type | Description |
|---|---|---|---|
| * | queue-name | String | Queue name. |

Outputs

| Name | Type | Description |
|---|---|---|
| message | String | Response as Map |
| body | String | Response body |
| headers | String | Response headers |

Example

AmqpBasicGetAction(
    target = "RABBITMQ_TARGET",
    queueName = "my.queue"
)

Clean queues

Use this action to purge the content of the given queues. For example, it can be used at the beginning of a scenario to ensure that the queues it uses are empty.

Warning

Be careful when cleaning a queue which is shared between many scenarios.

| Required | Name | Type | Description |
|---|---|---|---|
| * | queue-names | List<String> | Names of the queues to purge. |

No output

Example

AmqpCleanQueuesAction(
    target = "RABBITMQ_TARGET",
    queueNames = listOf(
        "my.queue",
        "my.other.queue"
    )
)

Create and bind temporary queue

Use this action to create a temporary queue and bind it to an existing exchange using a routing key.

Inputs

| Required | Name | Type | Default | Description |
|---|---|---|---|---|
| * | exchange-name | String | | Exchange name |
| | routing-key | String | "queue-name" | The routing key to use for the binding. See this example for more details. |
| * | queue-name | String | | Queue name |

Outputs

| Name | Type | Description |
|---|---|---|
| queueName | String | Created queue name |

Example

AmqpCreateBoundTemporaryQueueAction(
    target = "RABBITMQ_TARGET",
    exchangeName = "my.exchange",
    queueName = "my.queue",
    routingKey = "children.*"
)

Info

At the end of the scenario execution, the created binding and queue are automatically deleted by the amqp-unbind-queue and amqp-delete-queue final actions, respectively.
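The `children.*` routing key in the example above only acts as a pattern if `my.exchange` is a topic exchange, which this page does not state and is assumed here. In AMQP 0-9-1 topic exchanges, `*` matches exactly one dot-separated word and `#` matches zero or more words. The following is a minimal sketch of that matching rule; `topicMatches` is a hypothetical helper written for illustration and is not part of Chutney or any AMQP client.

```kotlin
// Hypothetical helper illustrating AMQP topic-exchange matching rules:
//   "*" matches exactly one word, "#" matches zero or more words.
fun topicMatches(bindingKey: String, routingKey: String): Boolean {
    val pattern = bindingKey.split(".")
    val words = routingKey.split(".")

    // p indexes the binding pattern, w indexes the routing key words.
    fun go(p: Int, w: Int): Boolean = when {
        p == pattern.size -> w == words.size
        // "#" either matches nothing, or swallows one word and stays active.
        pattern[p] == "#" -> go(p + 1, w) || (w < words.size && go(p, w + 1))
        w == words.size -> false
        pattern[p] == "*" || pattern[p] == words[w] -> go(p + 1, w + 1)
        else -> false
    }
    return go(0, 0)
}
```

Under these rules, a queue bound with `children.*` receives a message published with the routing key `children.fiction` (as in the Basic publish example) but not one published with `children.fiction.short`.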

Unbind queue

Use this action to delete a binding between an exchange and a queue.

| Required | Name | Type | Description |
|---|---|---|---|
| * | queue-name | String | Queue name |
| | exchange-name | String | Exchange name |
| | routing-key | String | The routing key used for the binding. |

Example

AmqpUnbindQueueAction(
    target = "RABBITMQ_TARGET",
    queueName = "my.queue",
    exchangeName = "my.exchange",
    routingKey = "children.*"
)

Delete queue

Use this action to delete an existing queue without regard for whether it is in use or has messages on it.

| Required | Name | Type | Description |
|---|---|---|---|
| * | queue-name | String | Queue name |

No output

Example

AmqpDeleteQueueAction(
    target = "RABBITMQ_TARGET",
    queueName = "my.queue"
)