Skip to content

Commit f0cefbe

Browse files
authored
[Feature] [Activemq] Added activemq sink (apache#7251)
1 parent bb2c912 commit f0cefbe

File tree

21 files changed

+1419
-0
lines changed

21 files changed

+1419
-0
lines changed

.github/workflows/labeler/label-scope-conf.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,11 @@ Milvus:
252252
- changed-files:
253253
- any-glob-to-any-file: seatunnel-connectors-v2/connector-milvus/**
254254
- all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(milvus)/**'
255+
activemq:
256+
- all:
257+
- changed-files:
258+
- any-glob-to-any-file: seatunnel-connectors-v2/connector-activemq/**
259+
- all-globs-to-all-files: '!seatunnel-connectors-v2/connector-!(activemq)/**'
255260
Zeta Rest API:
256261
- changed-files:
257262
- any-glob-to-any-file: seatunnel-engine/**/server/rest/**

config/plugin_config

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,5 @@ connector-rocketmq
8686
connector-tdengine
8787
connector-web3j
8888
connector-milvus
89+
connector-activemq
8990
--end--
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# Activemq
2+
3+
> Activemq sink connector
4+
5+
## Description
6+
7+
Used to write data to Activemq.
8+
9+
## Key features
10+
11+
- [ ] [exactly-once](../../concept/connector-v2-features.md)
12+
13+
## Options
14+
15+
| name | type | required | default value |
16+
|-------------------------------------|---------|----------|---------------|
17+
| host | string | no | - |
18+
| port | int | no | - |
19+
| virtual_host | string | no | - |
20+
| username | string | no | - |
21+
| password | string | no | - |
22+
| queue_name | string | yes | - |
23+
| uri | string | yes | - |
24+
| check_for_duplicate | boolean | no | - |
25+
| client_id | boolean | no | - |
26+
| copy_message_on_send | boolean | no | - |
27+
| disable_timeStamps_by_default | boolean | no | - |
28+
| use_compression | boolean | no | - |
29+
| always_session_async | boolean | no | - |
30+
| dispatch_async | boolean | no | - |
31+
| nested_map_and_list_enabled | boolean | no | - |
32+
| warnAboutUnstartedConnectionTimeout | boolean | no | - |
33+
| closeTimeout | int | no | - |
34+
35+
### host [string]
36+
37+
the default host to use for connections
38+
39+
### port [int]
40+
41+
the default port to use for connections
42+
43+
### username [string]
44+
45+
the AMQP user name to use when connecting to the broker
46+
47+
### password [string]
48+
49+
the password to use when connecting to the broker
50+
51+
### uri [string]
52+
53+
convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host
54+
55+
### queue_name [string]
56+
57+
the queue to write the message to
58+
59+
### check_for_duplicate [boolean]
60+
61+
will check for duplucate messages
62+
63+
### client_id [string]
64+
65+
client id
66+
67+
### copy_message_on_send [boolean]
68+
69+
if true, enables new JMS Message object as part of the send method
70+
71+
### disable_timeStamps_by_default [boolean]
72+
73+
disables timestamp for slight performance boost
74+
75+
### use_compression [boolean]
76+
77+
Enables the use of compression on the message’s body.
78+
79+
### always_session_async [boolean]
80+
81+
When true a separate thread is used for dispatching messages for each Session in the Connection.
82+
83+
### always_sync_send [boolean]
84+
85+
When true a MessageProducer will always use Sync sends when sending a Message
86+
87+
### close_timeout [boolean]
88+
89+
Sets the timeout, in milliseconds, before a close is considered complete.
90+
91+
### dispatch_async [boolean]
92+
93+
Should the broker dispatch messages asynchronously to the consumer
94+
95+
### nested_map_and_list_enabled [boolean]
96+
97+
Controls whether Structured Message Properties and MapMessages are supported
98+
99+
### warn_about_unstarted_connection_timeout [int]
100+
101+
The timeout, in milliseconds, from the time of connection creation to when a warning is generated
102+
103+
## Example
104+
105+
simple:
106+
107+
```hocon
108+
sink {
109+
ActiveMQ {
110+
uri="tcp://localhost:61616"
111+
username = "admin"
112+
password = "admin"
113+
queue_name = "test1"
114+
}
115+
}
116+
```
117+
118+
## Changelog
119+
120+
### next version
121+
122+
- Add Activemq Source Connector
123+

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,4 @@ seatunnel.source.ObsFile = connector-file-obs
129129
seatunnel.sink.ObsFile = connector-file-obs
130130
seatunnel.source.Milvus = connector-milvus
131131
seatunnel.sink.Milvus = connector-milvus
132+
seatunnel.sink.ActiveMQ = connector-activemq
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22+
<modelVersion>4.0.0</modelVersion>
23+
<parent>
24+
<groupId>org.apache.seatunnel</groupId>
25+
<artifactId>seatunnel-connectors-v2</artifactId>
26+
<version>${revision}</version>
27+
</parent>
28+
29+
<artifactId>connector-activemq</artifactId>
30+
<name>SeaTunnel : Connectors V2 : Activemq</name>
31+
32+
<properties>
33+
<activemq.version>5.14.5</activemq.version>
34+
</properties>
35+
<dependencies>
36+
<dependency>
37+
<groupId>org.apache.seatunnel</groupId>
38+
<artifactId>connector-common</artifactId>
39+
<version>${project.version}</version>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.apache.activemq</groupId>
43+
<artifactId>activemq-client</artifactId>
44+
<version>${activemq.version}</version>
45+
</dependency>
46+
47+
<dependency>
48+
<groupId>org.apache.seatunnel</groupId>
49+
<artifactId>seatunnel-format-json</artifactId>
50+
<version>${project.version}</version>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>org.apache.seatunnel</groupId>
55+
<artifactId>seatunnel-format-json</artifactId>
56+
<version>${project.version}</version>
57+
</dependency>
58+
<dependency>
59+
<groupId>org.apache.seatunnel</groupId>
60+
<artifactId>seatunnel-format-text</artifactId>
61+
<version>${project.version}</version>
62+
</dependency>
63+
64+
</dependencies>
65+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.activemq.client;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConnectorErrorCode;
22+
import org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConnectorException;
23+
24+
import org.apache.activemq.ActiveMQConnectionFactory;
25+
26+
import lombok.AllArgsConstructor;
27+
import lombok.extern.slf4j.Slf4j;
28+
29+
import javax.jms.Connection;
30+
import javax.jms.Destination;
31+
import javax.jms.JMSException;
32+
import javax.jms.MessageProducer;
33+
import javax.jms.Session;
34+
import javax.jms.TextMessage;
35+
36+
import java.nio.charset.StandardCharsets;
37+
38+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SESSION_ASYNC;
39+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.ALWAYS_SYNC_SEND;
40+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CHECK_FOR_DUPLICATE;
41+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLIENT_ID;
42+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CLOSE_TIMEOUT;
43+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.CONSUMER_EXPIRY_CHECK_ENABLED;
44+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.DISPATCH_ASYNC;
45+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.NESTED_MAP_AND_LIST_ENABLED;
46+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.PASSWORD;
47+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.QUEUE_NAME;
48+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.URI;
49+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.USERNAME;
50+
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqConfig.WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT;
51+
52+
@Slf4j
53+
@AllArgsConstructor
54+
public class ActivemqClient {
55+
private final ReadonlyConfig config;
56+
private final ActiveMQConnectionFactory connectionFactory;
57+
private final Connection connection;
58+
59+
public ActivemqClient(ReadonlyConfig config) {
60+
this.config = config;
61+
try {
62+
this.connectionFactory = getConnectionFactory();
63+
log.info("connection factory created");
64+
this.connection = createConnection(config);
65+
log.info("connection created");
66+
67+
} catch (Exception e) {
68+
e.printStackTrace();
69+
throw new ActivemqConnectorException(
70+
ActivemqConnectorErrorCode.CREATE_ACTIVEMQ_CLIENT_FAILED,
71+
"Error while create AMQ client ");
72+
}
73+
}
74+
75+
public ActiveMQConnectionFactory getConnectionFactory() {
76+
log.info("broker url : " + config.get(URI));
77+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(config.get(URI));
78+
79+
if (config.get(ALWAYS_SESSION_ASYNC) != null) {
80+
factory.setAlwaysSessionAsync(config.get(ALWAYS_SESSION_ASYNC));
81+
}
82+
83+
if (config.get(CLIENT_ID) != null) {
84+
factory.setClientID(config.get(CLIENT_ID));
85+
}
86+
87+
if (config.get(ALWAYS_SYNC_SEND) != null) {
88+
factory.setAlwaysSyncSend(config.get(ALWAYS_SYNC_SEND));
89+
}
90+
91+
if (config.get(CHECK_FOR_DUPLICATE) != null) {
92+
factory.setCheckForDuplicates(config.get(CHECK_FOR_DUPLICATE));
93+
}
94+
95+
if (config.get(CLOSE_TIMEOUT) != null) {
96+
factory.setCloseTimeout(config.get(CLOSE_TIMEOUT));
97+
}
98+
99+
if (config.get(CONSUMER_EXPIRY_CHECK_ENABLED) != null) {
100+
factory.setConsumerExpiryCheckEnabled(config.get(CONSUMER_EXPIRY_CHECK_ENABLED));
101+
}
102+
if (config.get(DISPATCH_ASYNC) != null) {
103+
factory.setDispatchAsync(config.get(DISPATCH_ASYNC));
104+
}
105+
106+
if (config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT) != null) {
107+
factory.setWarnAboutUnstartedConnectionTimeout(
108+
config.get(WARN_ABOUT_UNSTARTED_CONNECTION_TIMEOUT));
109+
}
110+
111+
if (config.get(NESTED_MAP_AND_LIST_ENABLED) != null) {
112+
factory.setNestedMapAndListEnabled(config.get(NESTED_MAP_AND_LIST_ENABLED));
113+
}
114+
return factory;
115+
}
116+
117+
public void write(byte[] msg) {
118+
try {
119+
this.connection.start();
120+
Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
121+
Destination destination = session.createQueue(config.get(QUEUE_NAME));
122+
MessageProducer producer = session.createProducer(destination);
123+
String messageBody = new String(msg, StandardCharsets.UTF_8);
124+
TextMessage objectMessage = session.createTextMessage(messageBody);
125+
producer.send(objectMessage);
126+
127+
} catch (JMSException e) {
128+
throw new ActivemqConnectorException(
129+
ActivemqConnectorErrorCode.SEND_MESSAGE_FAILED,
130+
String.format(
131+
"Cannot send AMQ message %s at %s",
132+
config.get(QUEUE_NAME), config.get(CLIENT_ID)),
133+
e);
134+
}
135+
}
136+
137+
public void close() {
138+
try {
139+
if (connection != null) {
140+
connection.close();
141+
}
142+
} catch (JMSException e) {
143+
throw new ActivemqConnectorException(
144+
ActivemqConnectorErrorCode.CLOSE_CONNECTION_FAILED,
145+
String.format(
146+
"Error while closing AMQ connection with %s", config.get(QUEUE_NAME)));
147+
}
148+
}
149+
150+
private Connection createConnection(ReadonlyConfig config) throws JMSException {
151+
if (config.get(USERNAME) != null && config.get(PASSWORD) != null) {
152+
return connectionFactory.createConnection(config.get(USERNAME), config.get(PASSWORD));
153+
}
154+
return connectionFactory.createConnection();
155+
}
156+
}

0 commit comments

Comments
 (0)