Overview
The Message Queue Task Plus is an SSIS Control Flow task for interaction with message queues/message brokers (currently Kafka Connection Manager, RabbitMQ Connection Manager and AMQP 1.0 Connection Manager are supported).
Quick Start
In this section we will show you how to set up a Message Queue Task Plus to receive a message.
- Before you begin, configure a connection manager compatible with the Message Queue Task Plus (e.g. a Kafka Connection Manager) and a connection manager for the local file.
- Drag the Message Queue Task Plus from the SSIS Toolbox to the Control Flow canvas.
- Double-click on the task on the canvas to open the task editor.
- Once the task editor opens, select the Action you wish to perform (Send message, Receive message). In this example, we will choose Receive message.
- For the Destination Type parameter, choose File and for the Target Message File parameter choose either a Flat File or a File connection manager. In this example we’re using a File connection manager named IncomingMessageFile which points at a local path (C:....\Diane's Files\IncomingMessageFile.dat).
- For the Connection parameter, choose the Kafka connection manager you already set up.
- For the Consumer Group ID parameter, specify the consumer group for the client. If a consumer group with such a name does not currently exist on the server, it will be created automatically.
- For the Topic(s) parameter, specify the topic(s) from which the message will be read. In this example, the topics set will be “mytopic1,mytopic2”.
- For the Overwrite Existing Message File parameter, choose "True".
- Click OK to close the task editor.
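The Topic(s) value used in the steps above is a single comma-separated string. As a rough illustration of how such a value breaks down into individual topic names, the Python sketch below splits it. The helper name split_topics is hypothetical and is not part of the task; trimming whitespace and dropping empty entries are assumptions made for the sketch, not documented behaviour.

```python
def split_topics(topics: str) -> list[str]:
    """Split a comma-separated Topic(s) value into individual topic names."""
    # Assumption: surrounding whitespace is ignored and empty entries are dropped.
    return [name.strip() for name in topics.split(",") if name.strip()]


topics = split_topics("mytopic1,mytopic2")
print(topics)
```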
In this section we will show you how to set up a Message Queue Task Plus to read all messages from a specific set of topics.
- Before you begin, configure a Message Queue Task Plus for the Receive message action as shown in Configure Message Queue Task Plus for Receive Message action.
- In the Variables window, click Add Variable and create a new Boolean variable called IsMessageReceived. The task sets this variable to True when a new message is read.
- Drag a For Loop Container from the SSIS Toolbox to the Control Flow canvas.
- Drag the Message Queue Task Plus from the Control Flow canvas into the For Loop Container.
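The idea behind placing the task in a For Loop Container is to keep running it for as long as IsMessageReceived comes back True. The Python sketch below models that control flow only. The function receive_message is a hypothetical stand-in for one execution of the task, the deque stands in for the topic, and the loop condition is an assumption, since the steps above do not spell out the container's expressions.

```python
from collections import deque


def receive_message(pending: deque, received: list) -> bool:
    """Hypothetical stand-in for one run of the task.

    Returns the value the task would write to IsMessageReceived.
    """
    if not pending:
        return False  # nothing new was read
    received.append(pending.popleft())
    return True


def drain(pending: deque) -> list:
    """Model of the loop: repeat the task until no message is read."""
    received = []
    is_message_received = True  # initial value so the body runs at least once
    while is_message_received:
        is_message_received = receive_message(pending, received)
    return received


messages = drain(deque(["m1", "m2", "m3"]))
print(messages)
```

The loop always performs one final run that reads nothing; that run is what flips the variable to False and ends the loop.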
Parameters
General
Use the General page of the Message Queue Task Plus Editor dialog to configure the task's common parameters.
Specify the task name.
Specify the task description.
Message Queue
Use the Message Queue page of the Message Queue Task Plus Editor dialog to configure the parameters needed to communicate with a server.
Select an existing Message Queue compatible connection manager, or click <New connection...> to create a new connection manager.
Related Topics:
Kafka Connection Manager
RabbitMQ Connection Manager
AMQP 1.0 Connection Manager
Specify the task action. This parameter has the options listed in the following table.
Option | Description
Send message | Sends a message. Selecting this action displays the dynamic parameters:
- Queue auto-create (for RABBITMQ connection only)
- Is queue durable (for RABBITMQ connection only)
- Is queue exclusive (for RABBITMQ connection only)
- Exchange auto-create (for RABBITMQ connection only)
- Is exchange durable (for RABBITMQ connection only)
- Exchange Type (for RABBITMQ connection only)
- Exchange (for RABBITMQ connection only)
- Routing Key (for RABBITMQ connection only)
- Message Source Type
- Key Source Type in Source category (for KAFKA connection only)
- Header Source Type in Source category (for KAFKA connection only)
Receive message | Receives a message. Selecting this action displays the dynamic parameters:
- Consumer Group ID
- Auto offset reset policy
- Mark read messages as consumed
- Message Availability Variable in Destination category
Confirm delivery | Explicitly sends to the broker a confirmation of all unconfirmed messages from previous tasks of type Message Queue Task Plus that share the same connection. This mode can be used for delayed confirmation after successful post-reading message processing.
The time (in seconds) that the task will try to send/receive a message before aborting the operation.
Specifies whether the task sends a confirmation of a successfully read message to the broker. With the Kafka Connection Manager, this means updating the offset in the specified topics. With the RabbitMQ Connection Manager, this means deleting the message from the queue.
This property is visible when Receive Message is selected from the Action dropdown menu.
Option | Description
Peek | Messages are received from the queue without deleting them or modifying the queue in any way. Messages read with this option can subsequently be confirmed by a separate Message Queue Task Plus that uses the Confirm delivery action and has the same connection selected.
Receive Messages And Confirm | The received message is marked as consumed (KAFKA) or deleted from the queue (RABBITMQ).
Specifies the content type of the message. This parameter has the options listed in the following table.
Option | Description
Binary | The content of the message is represented as a byte array.
Text | The content of the message is represented as text. Selecting this option displays the dynamic parameter Message Encoding.
Specifies the code page of the message. This parameter is visible when Text is selected from the Message Content Type dropdown menu.
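Binary and Text content differ only in whether the payload bytes are decoded with the chosen code page. The minimal Python sketch below shows that distinction, using UTF-8 and Windows-1252 purely as example encodings; the list of encodings the task actually offers is not shown here, and the variable names are illustrative.

```python
payload = "Grüße".encode("utf-8")  # the raw message as a byte array

# Binary: the content stays a byte array.
binary_content = payload

# Text: the same bytes are decoded with the selected encoding.
text_utf8 = payload.decode("utf-8")

# Decoding with a different code page yields different characters,
# which is why the encoding has to match the one used by the sender.
text_cp1252 = payload.decode("cp1252")

print(len(binary_content), len(text_utf8), text_utf8 == text_cp1252)
```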
Kafka
Use the Kafka page of the Message Queue Task Plus Editor dialog to configure the Kafka-specific parameters needed to communicate with a server.
The properties described in this section will be visible when a KAFKA-type connection is selected.
Specifies the name of the consumer group a Kafka consumer belongs to. When the Kafka consumer is constructed and Consumer Group ID does not exist yet (i.e. there are no existing consumers that are part of the group), the consumer group will be created automatically.
This property is visible when Receive Message is selected from Action dropdown menu.
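To make the relationship between consumer groups and message confirmation more concrete, here is a toy in-memory model. It is not the Kafka client API: the class ToyTopic and its methods are hypothetical names, and the model assumes a single topic with a single partition. A group is created on first use, peeking leaves the group's offset untouched, and confirming advances it.

```python
class ToyTopic:
    """In-memory stand-in for a single-partition topic (illustration only)."""

    def __init__(self, records):
        self.records = list(records)
        self.offsets = {}  # consumer group ID -> next offset to read

    def peek(self, group_id):
        """Return the next record without moving the group's offset."""
        # An unknown group is created automatically. Starting at offset 0 is
        # an assumption; the real start depends on the auto offset reset policy.
        offset = self.offsets.setdefault(group_id, 0)
        if offset >= len(self.records):
            return None
        return self.records[offset]

    def confirm(self, group_id):
        """Mark the group's current record as consumed by advancing its offset."""
        offset = self.offsets.setdefault(group_id, 0)
        if offset < len(self.records):
            self.offsets[group_id] = offset + 1


topic = ToyTopic(["a", "b"])
first = topic.peek("group1")
again = topic.peek("group1")          # same record: the offset did not move
topic.confirm("group1")
after_confirm = topic.peek("group1")  # next record after confirmation
other_group = topic.peek("group2")    # each group keeps its own offset
print(first, again, after_confirm, other_group)
```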
Specifies the topic(s) for the message (comma-separated). A topic is a category or feed name under which records are stored and published. All Kafka records are organized into topics. A Message Queue Task Plus configured in Send Message mode writes data to topics, and consumer applications read from topics.
Additional settings for the message queue client. Valid syntax is "name1=value1[&name2=value2]...[&nameN=valueN]".
See more details at Kafka Configuration Reference.
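As an illustration of the additional-settings syntax, the following Python sketch splits such a string into name/value pairs, assuming the pairs are separated by & and each pair has the form name=value. The function parse_settings is a hypothetical helper, not part of the task, and the two property names in the usage line are only sample Kafka client settings.

```python
def parse_settings(settings: str) -> dict[str, str]:
    """Parse 'name1=value1&name2=value2' into a dictionary."""
    result = {}
    for pair in settings.split("&"):
        pair = pair.strip()
        if not pair:
            continue  # tolerate an empty string or a trailing '&'
        name, separator, value = pair.partition("=")
        if not separator:
            raise ValueError(f"missing '=' in setting: {pair!r}")
        result[name.strip()] = value.strip()
    return result


parsed = parse_settings("fetch.min.bytes=1&session.timeout.ms=10000")
print(parsed)
```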
RabbitMQ
Use the RabbitMQ page of the Message Queue Task Plus Editor dialog to configure the RabbitMQ-specific parameters needed to communicate with a server.
The properties described in this section will be visible when a RABBITMQ-type connection is selected.
A queue is a category or feed name under which records are stored and published. A Message Queue Task Plus configured in Send Message mode writes data to the queue, and consumer applications read from the queue.
Specifies whether the queue with the specified name should be created if it does not currently exist.
This property is visible when Send Message is selected from Action dropdown menu.
Durable queues will be recovered on node boot, including messages in them published as persistent. Messages published as transient will be discarded during recovery, even if they were stored in durable queues.
This property is visible when Queue auto-create is set to True.
Specifies if the queue can only be used by its declaring connection. An exclusive queue will be deleted when its declaring connection is closed.
This property is visible when Queue auto-create is set to True.
Specifies whether the exchange with the specified name should be created if it does not currently exist.
Durable exchanges survive server restarts and last until they are explicitly deleted.
Specifies the type of the exchange for the message. Exchanges are AMQP 0-9-1 entities where messages are sent. Exchanges take a message and route it into zero or more queues. The routing algorithm used depends on the exchange type and rules called bindings. AMQP 0-9-1 brokers provide four exchange types:
Option | Description
Direct | A direct exchange delivers messages to queues based on the message routing key. A direct exchange is ideal for the unicast routing of messages (although it can be used for multicast routing as well).
Fanout | A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored. If N queues are bound to a fanout exchange, when a new message is published to that exchange a copy of the message is delivered to all N queues. Fanout exchanges are ideal for the broadcast routing of messages.
Topic | Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange. The topic exchange type is often used to implement various publish/subscribe pattern variations. Topic exchanges are commonly used for the multicast routing of messages.
Headers | A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding.
This property is visible when Send Message is selected from the Action dropdown menu.
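The four exchange types can be compared with a small routing simulation. The Python sketch below is a simplified model and not RabbitMQ client code: the functions topic_matches and route and the binding layout are hypothetical, and the headers case assumes that every bound header must match. Topic patterns follow the AMQP 0-9-1 convention in which * stands for exactly one dot-separated word and # for zero or more words.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Topic matching: '*' is exactly one word, '#' is zero or more words."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' absorbs zero words, or one word while staying active.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))


def route(exchange_type, bindings, routing_key, headers=None):
    """Return the queues that would receive a message.

    bindings is a list of (queue_name, binding) tuples, where binding is a
    key or pattern string, or a dict of header values for a headers exchange.
    """
    headers = headers or {}
    queues = []
    for queue, binding in bindings:
        if exchange_type == "fanout":
            matched = True  # the routing key is ignored
        elif exchange_type == "direct":
            matched = binding == routing_key
        elif exchange_type == "topic":
            matched = topic_matches(binding, routing_key)
        elif exchange_type == "headers":
            # Simplification: all bound headers must equal the message headers.
            matched = all(headers.get(k) == v for k, v in binding.items())
        else:
            raise ValueError(f"unknown exchange type: {exchange_type}")
        if matched:
            queues.append(queue)
    return queues


bindings = [("q1", "orders.created"), ("q2", "orders.*"), ("q3", "#")]
print(route("direct", bindings, "orders.created"))
print(route("fanout", bindings, "anything"))
print(route("topic", bindings, "orders.eu.created"))
print(route("headers", [("q4", {"format": "pdf"})], "", {"format": "pdf"}))
```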
Specifies the name of the exchange for the message.
This property is visible when Send Message is selected from Action dropdown menu.
Specifies the routing key which the exchange looks at when deciding how to route the message to queues (depending on exchange type).
This property is visible when Send Message is selected from Action dropdown menu.
AMQP 1.0
Use the AMQP 1.0 page of the Message Queue Task Plus Editor dialog to configure the AMQP 1.0-specific parameters needed to communicate with a server.
The properties described in this section will be visible when an AMQP 1.0-type connection is selected.
A queue is a category or feed name under which records are stored and published. A Message Queue Task Plus configured in Send Message mode writes data to the queue, and consumer applications read from the queue.
Destination
Use the Destination page of the Message Queue Task Plus Editor dialog to configure incoming message destination type and location.
This category is visible when Receive Message is selected from Action dropdown menu.
Specifies where to store the incoming message. This parameter has the options listed in the following table.
Option | Description
Variable | Store the incoming binary message in a variable. Selecting this option displays the dynamic parameter Target Message Variable.
File | Store the incoming binary message in a file. Selecting this option displays the dynamic parameters Target Message File and Overwrite Existing Message File.
Specifies where to store the incoming key. This parameter has the options listed in the following table.
Option | Description
None | Do not store the key.
Variable | Store the incoming binary key in a variable. Selecting this option displays the dynamic parameter Target Key Variable.
File | Store the incoming binary key in a file. Selecting this option displays the dynamic parameters Target Key File and Overwrite Existing Key File.
Specifies where to store the incoming header. This parameter has the options listed in the following table.
Option | Description
None | Do not store the header.
Variable | Store the incoming header in a variable. Selecting this option displays the dynamic parameter Target Header Variable.
File | Store the incoming header in a file as JSON. Selecting this option displays the dynamic parameters Target Header File and Overwrite Existing Header File.
Variable handling by type
String
As a header is a set of key-value pairs, use a variable of type string when all header values are strings (i.e., not other binary data). In this case, the variable value will be populated like {"Header1": "val1", "Header2": "val2"}, which can later be processed with the Script JSON to Variables Task to populate the individual variables User::Header1 and User::Header2.
Object
If some header values are not strings, use a variable of type object. It can then be processed with the Script Data Record To Variable Task to populate the corresponding variables User::Header1 and User::Header2. Values of type byte[] will usually require further processing with custom scripting.
Specifies whether the message target file is overwritten.
Specifies whether the key target file is overwritten.
Specifies whether the header target file is overwritten.
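To show what a string-typed header variable contains, the Python sketch below parses the sample value from the String case above and maps each header to a variable name. It only illustrates the shape of the data: inside a package this step is done by the Script JSON to Variables Task, and the dictionary here is merely a stand-in for SSIS variables.

```python
import json

# Sample header value in the format described for string variables.
header_value = '{"Header1": "val1", "Header2": "val2"}'

headers = json.loads(header_value)

# Stand-in for populating User::Header1 and User::Header2.
variables = {f"User::{name}": value for name, value in headers.items()}
print(variables)
```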
What's New
- New: Support for Kafka's headers.
- New: Introduced task.
Related documentation
COZYROC SSIS+ Components Suite is free for testing in your development environment.
A licensed version can be deployed on-premises, on Azure-SSIS IR and on COZYROC Cloud.