This connector reads messages from a RabbitMQ queue or topic and writes them to a Kafka topic. A minimal configuration:
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
# Set these required values
rabbitmq.queue=
kafka.topic=
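
When the worker runs in distributed mode, the same settings are submitted as JSON to the Kafka Connect REST API (`POST /connectors`) instead of being read from a properties file. The sketch below only builds that request body. The helper name `connector_request`, the queue name `orders`, and the topic name `rabbitmq.orders` are placeholders chosen for illustration, not values from this document.

```python
import json


def connector_request(name, queue, topic, tasks_max=1):
    """Build the JSON body for Kafka Connect's POST /connectors endpoint."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
            # Connector settings are passed as strings, as in a properties file.
            "tasks.max": str(tasks_max),
            "rabbitmq.queue": queue,
            "kafka.topic": topic,
        },
    }


# Placeholder queue and topic names, used only for illustration.
body = connector_request("connector1", "orders", "rabbitmq.orders")
print(json.dumps(body, indent=2))
```

The printed document can be sent to a worker with any HTTP client; the worker URL depends on the deployment and is not covered here.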
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
kafka.topic | Kafka topic to write the messages to. | string | | | high |
rabbitmq.queue | The RabbitMQ queue(s) to read messages from. | list | | | high |
rabbitmq.host | The RabbitMQ host to connect to. See ConnectionFactory.setHost(java.lang.String) | string | localhost | | high |
rabbitmq.password | The password to authenticate to RabbitMQ with. See ConnectionFactory.setPassword(java.lang.String) | string | guest | | high |
rabbitmq.username | The username to authenticate to RabbitMQ with. See ConnectionFactory.setUsername(java.lang.String) | string | guest | | high |
rabbitmq.virtual.host | The virtual host to use when connecting to the broker. See ConnectionFactory.setVirtualHost(java.lang.String) | string | / | | high |
rabbitmq.port | The RabbitMQ port to connect to. See ConnectionFactory.setPort(int) | int | 5672 | | medium |
rabbitmq.prefetch.count | Maximum number of messages that the server will deliver; 0 for unlimited. See Channel.basicQos(int, boolean) | int | 0 | | medium |
rabbitmq.prefetch.global | True if the settings should be applied to the entire channel rather than each consumer. See Channel.basicQos(int, boolean) | boolean | false | | medium |
rabbitmq.automatic.recovery.enabled | Enables or disables automatic connection recovery. See ConnectionFactory.setAutomaticRecoveryEnabled(boolean) | boolean | true | | low |
rabbitmq.connection.timeout.ms | TCP connection establishment timeout in milliseconds. Zero for infinite. See ConnectionFactory.setConnectionTimeout(int) | int | 60000 | | low |
rabbitmq.handshake.timeout.ms | The AMQP 0-9-1 protocol handshake timeout, in milliseconds. See ConnectionFactory.setHandshakeTimeout(int) | int | 10000 | | low |
rabbitmq.network.recovery.interval.ms | How long automatic recovery waits before attempting to reconnect, in milliseconds. See ConnectionFactory.setNetworkRecoveryInterval(long) | int | 10000 | | low |
rabbitmq.requested.channel.max | Initially requested maximum channel number. Zero for unlimited. See ConnectionFactory.setRequestedChannelMax(int) | int | 0 | | low |
rabbitmq.requested.frame.max | Initially requested maximum frame size, in octets. Zero for unlimited. See ConnectionFactory.setRequestedFrameMax(int) | int | 0 | | low |
rabbitmq.requested.heartbeat.seconds | The requested heartbeat timeout, in seconds. Heartbeat frames are sent at about half the timeout interval. If the server heartbeat timeout is configured to a non-zero value, this setting can only lower the value; otherwise any value provided by the client is used. See ConnectionFactory.setRequestedHeartbeat(int) | int | 60 | | low |
rabbitmq.shutdown.timeout.ms | The shutdown timeout, in milliseconds. This is the amount of time that Consumer implementations have to continue working through deliveries (and other Consumer callbacks) after the connection has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout, any remaining queued deliveries (and other Consumer callbacks, including the Consumer's handleShutdownSignal() invocation) are lost. See ConnectionFactory.setShutdownTimeout(int) | int | 10000 | | low |
rabbitmq.topology.recovery.enabled | Enables or disables topology recovery. See ConnectionFactory.setTopologyRecoveryEnabled(boolean) | boolean | true | | low |
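
Only `kafka.topic` and `rabbitmq.queue` have no default, so a deployment script can check for them before submitting a configuration. The following is a minimal sketch of such a pre-flight check, not the connector's own validation. The names `DEFAULTS`, `REQUIRED`, and `effective_config` are hypothetical, and only a subset of the defaults listed above is reproduced.

```python
# A subset of the documented defaults, kept as strings as in a properties file.
DEFAULTS = {
    "rabbitmq.host": "localhost",
    "rabbitmq.port": "5672",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "rabbitmq.virtual.host": "/",
    "rabbitmq.prefetch.count": "0",
    "rabbitmq.prefetch.global": "false",
}

# Settings documented without a default value.
REQUIRED = ("kafka.topic", "rabbitmq.queue")


def effective_config(overrides):
    """Merge user settings over the defaults, rejecting missing required keys."""
    missing = [key for key in REQUIRED if not overrides.get(key)]
    if missing:
        raise ValueError("missing required settings: " + ", ".join(missing))
    return {**DEFAULTS, **overrides}
```

Calling `effective_config` with only `kafka.topic` set raises a `ValueError` that names `rabbitmq.queue`; supplying both returns the merged settings.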
Used to store the value of a header. The type field stores the type of the data and names the corresponding field to read the data from.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
type | false | String | | Used to define the type for the HeaderValue. This defines the corresponding field, which contains the value in its original type. |
timestamp | true | Timestamp | | Storage for when the type field is set to timestamp. Null otherwise. |
int8 | true | Int8 | | Storage for when the type field is set to int8. Null otherwise. |
int16 | true | Int16 | | Storage for when the type field is set to int16. Null otherwise. |
int32 | true | Int32 | | Storage for when the type field is set to int32. Null otherwise. |
int64 | true | Int64 | | Storage for when the type field is set to int64. Null otherwise. |
float32 | true | Float32 | | Storage for when the type field is set to float32. Null otherwise. |
float64 | true | Float64 | | Storage for when the type field is set to float64. Null otherwise. |
boolean | true | Boolean | | Storage for when the type field is set to boolean. Null otherwise. |
string | true | String | | Storage for when the type field is set to string. Null otherwise. |
bytes | true | Bytes | | Storage for when the type field is set to bytes. Null otherwise. |
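
Because the type field names the field that holds the value, a consumer can read any header with a single lookup. The sketch below assumes the header has already been deserialized into a Python dict; the helper `header_value` and the sample header are hypothetical.

```python
def header_value(header):
    """Return the value stored in the field named by the header's type."""
    return header[header["type"]]


# Hypothetical header after deserialization; unused storage fields are null.
retry_count = {"type": "int32", "int32": 3, "string": None, "boolean": None}
print(header_value(retry_count))
```

Every field other than the one named by type is null, so there is no need to probe each storage field in turn.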
Encapsulates a group of parameters used for AMQP's Basic methods. See Envelope.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
deliveryTag | false | Int64 | | The delivery tag included in this parameter envelope. See Envelope.getDeliveryTag() |
isRedeliver | false | Boolean | | The redelivery flag included in this parameter envelope. See Envelope.isRedeliver() |
exchange | true | String | | The name of the exchange included in this parameter envelope. See Envelope.getExchange() |
routingKey | true | String | | The routing key included in this parameter envelope. See Envelope.getRoutingKey() |
Corresponds to the RabbitMQ BasicProperties.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
contentType | true | String | | The value in the contentType field. See BasicProperties.getContentType() |
contentEncoding | true | String | | The value in the contentEncoding field. See BasicProperties.getContentEncoding() |
headers | false | Map of <String, com.github.jcustenborder.kafka.connect.rabbitmq.BasicProperties.HeaderValue> | | The value in the headers field. See BasicProperties.getHeaders() |
deliveryMode | true | Int32 | | The value in the deliveryMode field. See BasicProperties.getDeliveryMode() |
priority | true | Int32 | | The value in the priority field. See BasicProperties.getPriority() |
correlationId | true | String | | The value in the correlationId field. See BasicProperties.getCorrelationId() |
replyTo | true | String | | The value in the replyTo field. See BasicProperties.getReplyTo() |
expiration | true | String | | The value in the expiration field. See BasicProperties.getExpiration() |
messageId | true | String | | The value in the messageId field. See BasicProperties.getMessageId() |
timestamp | true | Timestamp | | The value in the timestamp field. See BasicProperties.getTimestamp() |
type | true | String | | The value in the type field. See BasicProperties.getType() |
userId | true | String | | The value in the userId field. See BasicProperties.getUserId() |
appId | true | String | | The value in the appId field. See BasicProperties.getAppId() |
Message as it is delivered to the RabbitMQ Consumer.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
consumerTag | false | String | | The consumer tag associated with the consumer. |
envelope | false | com.github.jcustenborder.kafka.connect.rabbitmq.Envelope | | Encapsulates a group of parameters used for AMQP's Basic methods. See Envelope. |
basicProperties | false | com.github.jcustenborder.kafka.connect.rabbitmq.BasicProperties | | Corresponds to the RabbitMQ BasicProperties. |
body | false | Bytes | | The message body (opaque, client-specific byte array). |
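
The body field is an opaque byte array, so downstream consumers have to decode it themselves. The sketch below assumes the record value was written to Kafka as schemaless JSON with the bytes field base64-encoded, which is how Kafka Connect's JsonConverter represents bytes; other converters (for example Avro) deliver the bytes differently. The sample record and the helper `decode_body` are hypothetical.

```python
import base64
import json

# Hypothetical record value; field names follow the schema described above.
raw = json.dumps({
    "consumerTag": "amq.ctag-example",
    "envelope": {"deliveryTag": 1, "isRedeliver": False,
                 "exchange": "", "routingKey": "orders"},
    "basicProperties": {"contentType": "text/plain", "headers": {}},
    "body": base64.b64encode(b"hello").decode("ascii"),
})


def decode_body(value_json):
    """Parse a JSON record value and return the message body as bytes."""
    message = json.loads(value_json)
    return base64.b64decode(message["body"])


payload = decode_body(raw)
print(payload)
```

How the resulting bytes should be interpreted is up to the producing application; the contentType and contentEncoding properties, when set, are the usual hint.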
Key used for partition assignment in Kafka.
Name | Optional | Schema | Default Value | Documentation |
---|---|---|---|---|
messageId | true | String | | The value in the messageId field. See BasicProperties.getMessageId() |