Skip to content

Kafka

Browse implementations

Target Configuration

For all actions, a target should be defined and have a name and a valid url.

Kafka target example
1
2
3
4
{
  "name": "my_kafka_target"
  "url": "tcp://localhost:60962"
}
Kafka target example
{
    "name":"KAFKA",
    "url":"tcp://kafka.server.fr:9095",
    "properties":[
        {
        "key":"ssl.keystore.location",
        "value":"/keystores/keys.jks"
        },
        {
        "key":"security.protocol",
        "value":"SSL"
        },
        {
        "key":"ssl.keystore.password",
        "value":"password"
        },
        {
        "key":"ssl.truststore.password",
        "value":"password"
        },
        {
        "key":"ssl.truststore.location",
        "value":"/truststores/trust.jks"
        },
        {
        "key":"auto.offset.reset",
        "value":"earliest"
        },
        {
        "key":"enable.auto.commit",
        "value":"true"
        }
    ]
}

Publish🔗

Use this action to publish a message to a Kafka topic.

Required Name Type Description
* target String Kafka target name
* topic String Topic where the message will be published
* headers Map <String, String> Headers to be sent with the request
* payload String Message to be published
properties Map <String, String> Configurations for Kafka producer.
Name Type Description
payload String Sent message
headers String Headers to be sent with the request

Example🔗

KafkaBasicPublishTask(
    target = "my_kafka_target",
    topic = "my.queue",
    headers = mapOf(
        "contentType" to "application/json",
        "season" to "1"
    ),
    payload = """
                {
                  "title": "Castle in the Sky",
                  "director": "Hayao Miyazaki",
                  "rating": 78
                }
               """.trimIndent()
)

Consume🔗

Use this action to consume a message from a Kafka topic.

Required Name Type Default Description
* target String Kafka target name
* topic String Topic from where the message will be consumed
* group String Group id of the consumer
properties Map <String, String> Configurations for Kafka producer.
header-selector String Consume only messages whose headers match this selector. Selector must be a json paths. The root node is message's headers
selector String Consume only messages whose headers or payload match this selector. Selector must be json paths or xml paths. The root node is the whole message.
nb-messages Integer 1 How many messages to be consumed
content-type String application/json To be consumed message's content type
timeout Duration 60 sec Listening time on the topic
ackMode AckMode target's ackMode, else BATCH The offset commit behavior
Name Type Description
body List<Map<String,Object>> Consumed messages
payloads List<String> Consumed messages payloads
headers List<String> Consumed messages headers

Example🔗

1
2
3
4
5
6
7
8
9
KafkaBasicConsumeAction(
    target = "my_kafka_target",
    topic = "my.queue",
    group= "my.group",
    timeout= "10 sec",
    selector= "\$..[?(\$.payload.title==\"Castle in the Sky\")]",
    headerSelector= "\$..[?(\$.season=='1')]",
    contentType= "application/json"
)