RocketMQ source connector
Source connector for Apache RocketMQ.
name | type | required | default value |
---|---|---|---|
topics | String | yes | - |
name.srv.addr | String | yes | - |
acl.enabled | Boolean | no | false |
access.key | String | no | |
secret.key | String | no | |
batch.size | int | no | 100 |
consumer.group | String | no | SeaTunnel-Consumer-Group |
commit.on.checkpoint | Boolean | no | true |
schema | no | - | |
format | String | no | json |
field.delimiter | String | no | , |
start.mode | String | no | CONSUME_FROM_GROUP_OFFSETS |
start.mode.offsets | no | ||
start.mode.timestamp | Long | no | |
partition.discovery.interval.millis | long | no | -1 |
common-options | config | no | - |
RocketMQ topic
name. If there are multiple topics
, use ,
to split, for example: "tpc1,tpc2"
.
RocketMQ
name server cluster address.
RocketMQ consumer group id
, used to distinguish different consumer groups.
If true, access control is enabled, and access key and secret key need to be configured.
When ACL_ENABLED is true, access key cannot be empty.
When ACL_ENABLED is true, secret key cannot be empty.
RocketMQ
consumer pull batch size
If true the consumer's offset will be periodically committed in the background.
The interval for dynamically discovering topics and partitions.
The structure of the data, including field names and field types.
Data format. The default format is json. Optional text format. The default field separator is ", ". If you customize the delimiter, add the "field.delimiter" option.
Customize the field delimiter for data format.
The initial consumption pattern of consumers,there are several types: [CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP] ,[CONSUME_FROM_SPECIFIC_OFFSETS]
The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP".
The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".
for example:
start.mode.offsets = {
topic1-0 = 70
topic1-1 = 10
topic1-2 = 10
}
Source plugin common parameters, please refer to Source Common Options for details.
source {
Rocketmq {
name.srv.addr = "localhost:9876"
topics = "test-topic-002"
consumer.group = "consumer-group"
parallelism = 2
batch.size = 20
schema = {
fields {
age = int
name = string
}
}
start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
start.mode.offsets = {
test-topic-002-0 = 20
}
}
}