Skip to content

Kafka

Browse implementations

Target Configuration

For all actions, a target should be defined and have a name and a valid url.

Kafka target example
1
2
3
4
{
  "name": "my_kafka_target"
  "url": "tcp://localhost:60962"
}
Kafka target example
{
    "name":"KAFKA",
    "url":"tcp://kafka.server.fr:9095",
    "properties":[
        {
        "key":"ssl.keystore.location",
        "value":"/keystores/keys.jks"
        },
        {
        "key":"security.protocol",
        "value":"SSL"
        },
        {
        "key":"ssl.keystore.password",
        "value":"password"
        },
        {
        "key":"ssl.truststore.password",
        "value":"password"
        },
        {
        "key":"ssl.truststore.location",
        "value":"/truststores/trust.jks"
        },
        {
        "key":"auto.offset.reset",
        "value":"earliest"
        },
        {
        "key":"enable.auto.commit",
        "value":"true"
        }
    ]
}

Publish🔗

Use this action to publish a message to a Kafka topic.

Required Name Type Description
* target String Kafka target name
* topic String Topic where the message will be published
* headers Map <String, String> Headers to be sent with the request
* payload String Message to be published
properties Map <String, String> Configurations for Kafka producer
key String The key that will be included in the record
Name Type Description
payload String Sent message
headers String Headers sent with the request

Example🔗

KafkaBasicPublishTask(
    target = "my_kafka_target",
    topic = "my.queue",
    headers = mapOf(
        "contentType" to "application/json",
        "season" to "1"
    ),
    payload = """
                {
                  "title": "Castle in the Sky",
                  "director": "Hayao Miyazaki",
                  "rating": 78
                }
               """.trimIndent()
)

Consume🔗

Use this action to consume a message from a Kafka topic.

Required Name Type Default Description
* target String Kafka target name
* topic String Topic from where the message will be consumed
* group String Group id of the consumer
properties Map <String, String> Configurations for Kafka producer.
header-selector String Consume only messages whose headers match this selector. Selector must be a json paths. The root node is message's headers
selector String Consume only messages whose headers or payload match this selector. Selector must be json paths or xml paths. The root node is the whole message.
nb-messages Integer 1 How many messages to be consumed
content-type String application/json To be consumed message's content type
timeout Duration (String) 60 sec Listening time on the topic
ackMode AckMode target's ackMode, else BATCH The offset commit behavior
reset-offset Boolean false Consume from the first offset for each partitions
Name Type Description
body List<Map<String,Object>> Consumed messages
payloads List<String> Consumed messages payloads
headers List<String> Consumed messages headers
keys List<Object> Consumed messages keys

Example🔗

1
2
3
4
5
6
7
8
9
KafkaBasicConsumeAction(
    target = "my_kafka_target",
    topic = "my.queue",
    group= "my.group",
    timeout= "10 sec",
    selector= "\$..[?(\$.payload.title==\"Castle in the Sky\")]",
    headerSelector= "\$..[?(\$.season=='1')]",
    contentType= "application/json"
)