Skip to content

Commit 53b2274

Browse files
committed
Support subtable and fieldNames in tdengine source. support fieldNames in sink.
1 parent adf2132 commit 53b2274

File tree

11 files changed

+243
-26
lines changed

11 files changed

+243
-26
lines changed

docs/en/connector-v2/sink/TDengine.md

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ Used to write data to TDengine. You need to create stable before running seatunn
1515

1616
## Options
1717

18-
| name | type | required | default value |
19-
|----------|--------|----------|---------------|
20-
| url | string | yes | - |
21-
| username | string | yes | - |
22-
| password | string | yes | - |
23-
| database | string | yes | |
24-
| stable | string | yes | - |
25-
| timezone | string | no | UTC |
18+
| name | type | required | default value |
19+
|-------------|--------|----------|---------------|
20+
| url | string | yes | - |
21+
| username | string | yes | - |
22+
| password | string | yes | - |
23+
| database | string | yes | |
24+
| stable | string | yes | - |
25+
| timezone | string | no | UTC |
26+
| field_names | string | no | - |
2627

2728
### url [string]
2829

@@ -54,6 +55,9 @@ the stable of the TDengine when you select
5455

5556
the timeznoe of the TDengine sever, it's important to the ts field
5657

58+
### field_names [string]
59+
the field names of the TDengine when you insert, if not set, all fields will be written
60+
5761
## Example
5862

5963
### sink
@@ -67,6 +71,7 @@ sink {
6771
database : "power2"
6872
stable : "meters2"
6973
timezone: UTC
74+
field_names: "ts, voltage, current, power"
7075
}
7176
}
7277
```

docs/en/connector-v2/source/TDengine.md

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@ supports query SQL and can achieve projection effect.
2222

2323
## Options
2424

25-
| name | type | required | default value |
26-
|-------------|--------|----------|---------------|
27-
| url | string | yes | - |
28-
| username | string | yes | - |
29-
| password | string | yes | - |
30-
| database | string | yes | |
31-
| stable | string | yes | - |
32-
| lower_bound | long | yes | - |
33-
| upper_bound | long | yes | - |
25+
| name | type | required | default value |
26+
|---------------|--------|----------|---------------|
27+
| url | string | yes | - |
28+
| username | string | yes | - |
29+
| password | string | yes | - |
30+
| database | string | yes | |
31+
| stable | string | yes | - |
32+
| sub_table | string | no | - |
33+
| lower_bound | long | yes | - |
34+
| upper_bound | long | yes | - |
35+
| field_names | string | no | - |
3436

3537
### url [string]
3638

@@ -58,6 +60,10 @@ the database of the TDengine when you select
5860

5961
the stable of the TDengine when you select
6062

63+
### sub_table [string]
64+
the sub_table of the TDengine, separated by commas. If not specified, all sub-tables will be selected.
65+
If specified, only the specified sub-tables will be selected.
66+
6167
### lower_bound [long]
6268

6369
the lower_bound of the migration period
@@ -66,6 +72,10 @@ the lower_bound of the migration period
6672

6773
the upper_bound of the migration period
6874

75+
### field_names [string]
76+
The field_names of the TDengine when you select, separated by commas. If not specified, all fields will be selected.
77+
If specified, only the specified fields will be selected.
78+
6979
## Example
7080

7181
### source
@@ -78,9 +88,11 @@ source {
7888
password : "taosdata"
7989
database : "power"
8090
stable : "meters"
91+
sub_table : "meter_1,meter_2"
8192
lower_bound : "2018-10-03 14:38:05.000"
8293
upper_bound : "2018-10-03 14:38:16.800"
8394
plugin_output = "tdengine_result"
95+
field_names : "ts,voltage,current,power"
8496
}
8597
}
8698
```

seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineCommonOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,13 @@ public abstract class TDengineCommonOptions {
5454
.stringType()
5555
.noDefaultValue()
5656
.withDescription("The TDengine super table name");
57+
58+
public static final Option<String> FIELD_NAMES =
59+
Options.key("field_names")
60+
.stringType()
61+
.noDefaultValue()
62+
.withDescription(
63+
"The field names to be written to TDengine, separated by commas. "
64+
+ "If not specified, all fields will be written. "
65+
+ "This option is useful when the source schema does not match the TDengine table schema.");
5766
}

seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSinkConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class TDengineSinkConfig implements Serializable {
3737
private String database;
3838
private String stable;
3939
private String timezone;
40+
private String fieldNames;
4041

4142
public static TDengineSinkConfig of(ReadonlyConfig config) {
4243
Builder builder = TDengineSinkConfig.builder();
@@ -50,6 +51,8 @@ public static TDengineSinkConfig of(ReadonlyConfig config) {
5051
Optional<String> optionalTimezone = config.getOptional(TDengineSinkOptions.TIMEZONE);
5152

5253
builder.timezone(optionalTimezone.orElseGet(TDengineSinkOptions.TIMEZONE::defaultValue));
54+
Optional<String> optionalFieldNames = config.getOptional(TDengineCommonOptions.FIELD_NAMES);
55+
builder.fieldNames(optionalFieldNames.orElse(""));
5356

5457
return builder.build();
5558
}

seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import lombok.Data;
2323

2424
import java.io.Serializable;
25+
import java.util.Arrays;
2526
import java.util.List;
27+
import java.util.Set;
28+
import java.util.stream.Collectors;
2629

2730
@Data
2831
public class TDengineSourceConfig implements Serializable {
@@ -36,8 +39,9 @@ public class TDengineSourceConfig implements Serializable {
3639
private String stable;
3740
private String lowerBound;
3841
private String upperBound;
39-
private List<String> fields;
4042
private List<String> tags;
43+
private Set<String> subTables;
44+
private Set<String> fieldNames;
4145

4246
public static TDengineSourceConfig buildSourceConfig(ReadonlyConfig pluginConfig) {
4347
TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
@@ -48,6 +52,19 @@ public static TDengineSourceConfig buildSourceConfig(ReadonlyConfig pluginConfig
4852
tdengineSourceConfig.setPassword(pluginConfig.get(TDengineSourceOptions.PASSWORD));
4953
tdengineSourceConfig.setUpperBound(pluginConfig.get(TDengineSourceOptions.UPPER_BOUND));
5054
tdengineSourceConfig.setLowerBound(pluginConfig.get(TDengineSourceOptions.LOWER_BOUND));
55+
if (pluginConfig.getOptional(TDengineSourceOptions.SUB_TABLE).isPresent()) {
56+
String plugin = pluginConfig.get(TDengineSourceOptions.SUB_TABLE);
57+
String[] subTableArray = plugin.split(",");
58+
tdengineSourceConfig.setSubTables(
59+
Arrays.stream(subTableArray).collect(Collectors.toSet()));
60+
}
61+
if (pluginConfig.getOptional(TDengineCommonOptions.FIELD_NAMES).isPresent()) {
62+
String fieldNames = pluginConfig.get(TDengineCommonOptions.FIELD_NAMES);
63+
tdengineSourceConfig.setFieldNames(
64+
Arrays.stream(fieldNames.split(",")).collect(Collectors.toSet()));
65+
} else {
66+
tdengineSourceConfig.setFieldNames(null);
67+
}
5168
return tdengineSourceConfig;
5269
}
5370
}

seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,12 @@ public class TDengineSourceOptions extends TDengineCommonOptions {
3333
.stringType()
3434
.noDefaultValue()
3535
.withDescription("The upper bound for data query range");
36+
37+
public static final Option<String> SUB_TABLE =
38+
Options.key("sub_table")
39+
.stringType()
40+
.noDefaultValue()
41+
.withDescription(
42+
"The sub table names to query data from, separated by comma , "
43+
+ "if not specified, all sub tables will be queried");
3644
}

seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public TDengineSinkWriter(TDengineSinkConfig config, SeaTunnelRowType seaTunnelR
9090
@SneakyThrows
9191
@Override
9292
public void write(SeaTunnelRow element) {
93+
9394
final ArrayList<Object> tags = Lists.newArrayList();
9495
for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) {
9596
tags.add(element.getField(i));
@@ -103,11 +104,16 @@ public void write(SeaTunnelRow element) {
103104
conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
104105
String sql =
105106
String.format(
106-
"INSERT INTO %s using %s tags ( %s ) VALUES ( %s );",
107+
"INSERT INTO %s using %s tags ( %s ) %s VALUES ( %s );",
107108
element.getField(0),
108109
config.getStable(),
109110
tagValues,
111+
config.getFieldNames().isEmpty()
112+
? ""
113+
: "( " + config.getFieldNames() + " )",
110114
StringUtils.join(convertDataType(metrics), ","));
115+
statement.addBatch(sql);
116+
statement.executeBatch();
111117
final int rowCount = statement.executeUpdate(sql);
112118
if (rowCount == 0) {
113119
Throwables.propagateIfPossible(
@@ -140,6 +146,7 @@ Object[] convertDataType(Object[] objects) {
140146
if (object == null) {
141147
return null;
142148
}
149+
143150
if (LocalDateTime.class.equals(object.getClass())) {
144151
// transform timezone according to the config
145152
return "'"

seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.commons.lang3.ArrayUtils;
3737

3838
import com.taosdata.jdbc.TSDBDriver;
39+
import lombok.Getter;
3940
import lombok.SneakyThrows;
4041

4142
import java.sql.Connection;
@@ -59,8 +60,7 @@
5960
*/
6061
public class TDengineSource
6162
implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit, TDengineSourceState> {
62-
63-
private final StableMetadata stableMetadata;
63+
@Getter private final StableMetadata stableMetadata;
6464
private final TDengineSourceConfig tdengineSourceConfig;
6565
private final CatalogTable catalogTable;
6666

@@ -137,12 +137,22 @@ private StableMetadata getStableMetadata(TDengineSourceConfig config) throws SQL
137137
if (timestampFieldName == null) {
138138
timestampFieldName = metaResultSet.getString(1);
139139
}
140+
if (config.getFieldNames() != null
141+
&& !config.getFieldNames().isEmpty()
142+
&& !config.getFieldNames().contains(metaResultSet.getString(1))) {
143+
continue;
144+
}
140145
fieldNames.add(metaResultSet.getString(1));
141146
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
142147
}
143148

144149
while (subTableNameResultSet.next()) {
145150
String subTableName = subTableNameResultSet.getString(1);
151+
if (config.getSubTables() != null
152+
&& !config.getSubTables().isEmpty()
153+
&& !config.getSubTables().contains(subTableName)) {
154+
continue;
155+
}
146156
subTableNames.add(subTableName);
147157
}
148158
}

seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,42 @@
1818

1919
package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
2020

21+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2122
import org.apache.seatunnel.api.source.Collector;
2223
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2324
import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
2425

26+
import org.apache.commons.lang3.StringUtils;
27+
28+
import org.junit.jupiter.api.Assertions;
2529
import org.junit.jupiter.api.BeforeEach;
2630
import org.junit.jupiter.api.Test;
31+
import org.mockito.MockedStatic;
2732

33+
import java.sql.Connection;
34+
import java.sql.DriverManager;
35+
import java.sql.ResultSet;
36+
import java.sql.SQLException;
37+
import java.sql.Statement;
2838
import java.util.ArrayList;
2939
import java.util.Collections;
40+
import java.util.HashMap;
3041
import java.util.List;
42+
import java.util.Map;
43+
import java.util.Properties;
3144
import java.util.Random;
3245
import java.util.concurrent.LinkedBlockingQueue;
3346
import java.util.concurrent.ThreadPoolExecutor;
3447
import java.util.concurrent.TimeUnit;
3548
import java.util.logging.Logger;
3649

50+
import static org.mockito.ArgumentMatchers.any;
51+
import static org.mockito.ArgumentMatchers.anyString;
52+
import static org.mockito.ArgumentMatchers.argThat;
53+
import static org.mockito.Mockito.mock;
54+
import static org.mockito.Mockito.mockStatic;
55+
import static org.mockito.Mockito.when;
56+
3757
class TDengineSourceReaderTest {
3858
Logger logger;
3959
TDengineSourceReader tDengineSourceReader;
@@ -103,6 +123,62 @@ void testPoll() throws InterruptedException {
103123
pool.awaitTermination(3, TimeUnit.SECONDS);
104124
}
105125

126+
@Test
127+
public void testGetStableMetadata() throws SQLException {
128+
129+
try (MockedStatic<DriverManager> dm = mockStatic(DriverManager.class)) {
130+
131+
Connection mockConn = mock(Connection.class);
132+
Statement mockStatement = mock(Statement.class);
133+
ResultSet metadataResultSet = mock(ResultSet.class);
134+
ResultSet tableResultSet = mock(ResultSet.class);
135+
136+
dm.when(() -> DriverManager.getConnection(anyString(), any(Properties.class)))
137+
.thenReturn(mockConn);
138+
139+
when(mockConn.createStatement()).thenReturn(mockStatement);
140+
141+
when(mockStatement.executeQuery(
142+
argThat(
143+
sql ->
144+
StringUtils.isNotEmpty(sql)
145+
&& sql.trim()
146+
.toLowerCase()
147+
.startsWith("desc"))))
148+
.thenReturn(metadataResultSet);
149+
when(metadataResultSet.next()).thenReturn(true, true, false);
150+
when(metadataResultSet.getString(1)).thenReturn("ts", "col1", "col1", "col2");
151+
when(metadataResultSet.getString(2)).thenReturn("INT", "VARCHAR(20)");
152+
153+
when(mockStatement.executeQuery(
154+
argThat(
155+
sql ->
156+
sql.trim()
157+
.toLowerCase()
158+
.startsWith(
159+
"select table_name from information_schema.ins_tables"))))
160+
.thenReturn(tableResultSet);
161+
when(tableResultSet.next()).thenReturn(true, true, false);
162+
when(tableResultSet.getString(1)).thenReturn("sub_table_1", "sub_table_2");
163+
Map<String, Object> map = new HashMap<>();
164+
map.put("url", "jdbc:TAOS-RS://localhost:6041/");
165+
map.put("database", "test_db");
166+
map.put("username", "root");
167+
map.put("password", "taosdata");
168+
map.put("stable", "stable");
169+
map.put("sub_table", "sub_table_1");
170+
map.put("field_names", "col1");
171+
172+
ReadonlyConfig config = ReadonlyConfig.fromMap(map);
173+
TDengineSource source = new TDengineSource(config);
174+
StableMetadata stableMetadata = source.getStableMetadata();
175+
Assertions.assertEquals(1, stableMetadata.getSubTableNames().size());
176+
Assertions.assertEquals("sub_table_1", stableMetadata.getSubTableNames().get(0));
177+
Assertions.assertEquals(2, stableMetadata.getRowType().getFieldNames().length);
178+
Assertions.assertEquals("col1", stableMetadata.getRowType().getFieldNames()[1]);
179+
}
180+
}
181+
106182
private static class TestCollector implements Collector<SeaTunnelRow> {
107183

108184
private final List<SeaTunnelRow> rows = new ArrayList<>();

0 commit comments

Comments
 (0)