Kafka
Target Configuration
For all actions, a target should be defined and have a name
and a valid url
.
Publish🔗
!!! info "Browse implementation{: target="_blank"}"
Use this action to publish a message to a Kafka topic.
Required | Name | Type | Description |
---|---|---|---|
* | target |
String | Kafka target name |
* | topic |
String | Topic where the message will be published |
* | headers |
Map <String, String> | Headers to be sent with the request |
* | payload |
String | Message to be published |
properties |
Map <String, String> | Configurations for Kafka producer. Override target properties. | |
key |
String | The key that will be included in the record |
Name | Type | Description |
---|---|---|
payload |
String | Sent message |
headers |
String | Headers sent with the request |
Example🔗
Consume🔗
!!! info "Browse implementation{: target="_blank"}"
Use this action to consume a message from a Kafka topic.
Required | Name | Type | Default | Description |
---|---|---|---|---|
* | target |
String | Kafka target name | |
* | topic |
String | Topic from where the message will be consumed | |
* | group |
String | Group id of the consumer (override target or properties configuration) | |
properties |
Map <String, String> | Configurations for Kafka consumer. Override target properties. | ||
header-selector |
String | Consume only messages whose headers match this selector. Selector must be a json paths. The root node is message's headers | ||
selector |
String | Consume only messages whose headers or payload match this selector for json mime type, selector must be json paths with the root node as the whole message. Consume only messages whose payload match this selector for xml mime type, selector must be xml paths with the root node as the payload. Ignored for bytes array pauload. Otherwise, check that payload contains the given selector. | ||
nb-messages |
Integer | 1 | How many messages to be consumed | |
content-type |
String | application/json |
To be consumed message's content type | |
timeout |
Duration (String) | 60 sec |
Listening time on the topic | |
ackMode |
AckMode | target's ackMode, else BATCH |
The offset commit behavior | |
reset-offset |
Boolean | false | Consume from the first offset for each partitions |
Name | Type | Description |
---|---|---|
body |
List<Map<String,Object>> | Consumed messages |
payloads |
List<String> | Consumed messages payloads |
headers |
List<String> | Consumed messages headers |
keys |
List<Object> | Consumed messages keys |