Contributor

@chncaesar commented Jul 19, 2025

Purpose of this pull request

This PR closes #9592. It adds two options to the source, sub_table and field_names, and one option to the sink, field_names.

Does this PR introduce any user-facing change?

Yes. New TDengine connector options are added. Users can specify sub_table and field_names when retrieving data from TDengine, and field_names when inserting into TDengine. All of these options are optional, so existing TDengine source and sink pipelines are not affected by the change and still work as expected.

The documentation is updated in this PR.

How to use the new options to retrieve data

The following pipeline reads three sub-tables of the super table signal_data and ignores the rest. It also selects only the specified fields.
Note that tag columns must always be the last elements of field_names; in this example the tag is signal_id.

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  TDengine {
    url : "jdbc:TAOS-RS://192.168.1.1:6041/"
    username : "root"
    password : "taosdata"
    database : "signal"
    stable : "signal_data"
    lower_bound : "2024-07-15 00:00:00.000"
    upper_bound : "2025-07-11 00:00:00.000"
    sub_table: "signal_data_342,signal_data_358,signal_data_349"
    field_names = "ts,gateway_id,signal_name,return_value,signal_id"
  }
}

sink {
  LocalFile {
    path = "/mnt/sdc/data/taos/"
    file_format_type = "parquet"
    compress_codec = "snappy"
  }
}
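
To make the effect of the two source options concrete, here is a minimal Java sketch of how a sub-table filter and a column list could be turned into one query per sub-table. It is an illustration only, not the connector's code: the class `SubTableQuerySketch`, its method names, the assumption that the timestamp column is called `ts`, and the half-open time range are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; this is NOT the connector's actual code.
final class SubTableQuerySketch {

    /** Keeps only the configured sub-tables; an empty or missing filter keeps all of them. */
    static List<String> selectSubTables(List<String> allSubTables, List<String> configured) {
        if (configured == null || configured.isEmpty()) {
            return new ArrayList<>(allSubTables);
        }
        List<String> selected = new ArrayList<>();
        for (String name : allSubTables) {
            if (configured.contains(name)) {
                selected.add(name);
            }
        }
        return selected;
    }

    /**
     * Builds a SELECT for one sub-table. An empty or missing column list falls back to "*".
     * Assumes the timestamp column is named "ts"; identifiers are not escaped in this sketch.
     */
    static String buildQuery(String database, String subTable, List<String> columns,
                             String lowerBound, String upperBound) {
        String projection =
                (columns == null || columns.isEmpty()) ? "*" : String.join(",", columns);
        return "SELECT " + projection + " FROM " + database + "." + subTable
                + " WHERE ts >= '" + lowerBound + "' AND ts < '" + upperBound + "'";
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("signal_data_342", "signal_data_343", "signal_data_358");
        List<String> wanted = Arrays.asList("signal_data_342", "signal_data_358");
        List<String> columns = Arrays.asList("ts", "gateway_id", "signal_name", "return_value", "signal_id");
        for (String table : selectSubTables(all, wanted)) {
            System.out.println(buildQuery("signal", table, columns,
                    "2024-07-15 00:00:00.000", "2025-07-11 00:00:00.000"));
        }
    }
}
```

Treating an empty filter as "keep everything" mirrors the statement that the options are optional and that existing pipelines keep their behavior.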

How to use new options on the sink side

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
  LocalFile {
    path = "/work/data/taos_mfrs_signal_data_current/"
    file_format_type = "parquet"
    compress_codec = "snappy"
  }
}


sink {
  TDengine {
    url : "jdbc:TAOS-RS://192.168.1.1:6041/"
    username : "root"
    password : "taosdata"
    database : "signal"
    stable : "signal_data"
    field_names = "ts,gateway_id,signal_name,return_value"
  }
}

When specifying field_names on the sink side, omit the tag column (signal_id in this example). The TDengine sink automatically puts the tag values into the insert statement. Here's an example.

insert into signal.signal_data_342
using signal.signal_data
tags (1)
(ts, gateway_id, signal_name, return_value)
values ("2025-01-01 00:00:00", 1, "name", 0.1)
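
The statement shape above can be sketched in Java as follows. This is a hedged illustration of assembling an `INSERT INTO ... USING ... TAGS (...) (...) VALUES (...)` statement from a column list, not the sink writer's real implementation: `InsertStatementSketch` and `buildInsert` are hypothetical names, and the tag and value arguments are assumed to be already rendered as SQL literals.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; this is NOT the connector's actual code.
final class InsertStatementSketch {

    /**
     * Assembles an auto-create-table INSERT for one row.
     * tagLiterals and valueLiterals must already be valid SQL literals (quoted where needed).
     * When writeColumns is empty or null, the column list is omitted entirely.
     */
    static String buildInsert(String database, String subTable, String superTable,
                              List<String> tagLiterals, List<String> writeColumns,
                              List<String> valueLiterals) {
        StringBuilder sql = new StringBuilder();
        sql.append("INSERT INTO ").append(database).append('.').append(subTable)
                .append(" USING ").append(database).append('.').append(superTable)
                .append(" TAGS (").append(String.join(",", tagLiterals)).append(")");
        if (writeColumns != null && !writeColumns.isEmpty()) {
            // Only non-tag columns belong here; the tag values travel in the TAGS clause.
            sql.append(" (").append(String.join(",", writeColumns)).append(")");
        }
        sql.append(" VALUES (").append(String.join(",", valueLiterals)).append(")");
        return sql.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("signal", "signal_data_342", "signal_data",
                Arrays.asList("1"),
                Arrays.asList("ts", "gateway_id", "signal_name", "return_value"),
                Arrays.asList("'2025-01-01 00:00:00'", "1", "'name'", "0.1")));
    }
}
```

Production code would normally bind values through prepared statements rather than concatenating literals; the concatenation here only keeps the sketch short.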

How was this patch tested?

  • Added a unit test to cover sub_table and field_names on the source side. Please see TDengineSourceReaderTest.java
  • Added an e2e test to cover sub_table and field_names on the source side, and field_names on the sink side. Please see TDengineIT.java


public static final Option<String> FIELD_NAMES =
Options.key("field_names")
.stringType()
Member

Please use a list type; it's easier to write and read.

Contributor Author

fixed.

.withDescription("The TDengine super table name");

public static final Option<String> FIELD_NAMES =
Options.key("field_names")
Member

Suggested change
- Options.key("field_names")
+ Options.key("read_columns")

Contributor Author

fixed.


public static final Option<String> SUB_TABLE =
Options.key("sub_table")
.stringType()
Member

Ditto: please use a list type here as well.

Contributor Author

Fixed, thanks for the effort.

.withDescription("The upper bound for data query range");

public static final Option<String> SUB_TABLE =
Options.key("sub_table")
Member

Suggested change
- Options.key("sub_table")
+ Options.key("sub_tables")

Contributor Author

done

@Hisoka-X changed the title from "Support subtable and fieldNames in tdengine source. support fieldName…" to "[Feature][connector-tdengine] Support subtable and fieldNames in tdengine source" on Jul 21, 2025
@chncaesar requested review from Hisoka-X and Copilot July 24, 2025 02:08

@chncaesar force-pushed the taos_zjc branch 2 times, most recently from 51c58dc to f1816eb, July 24, 2025 02:27
@chncaesar requested a review from Copilot July 24, 2025 02:29

@chncaesar requested a review from Copilot July 24, 2025 02:52

database : "power2"
stable : "meters2"
timezone: UTC
write_columns: "ts, voltage, current, power"
Member

Hi @chncaesar. I think we should use a list type here, the same as read_columns.
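
One reason a list type is less fragile than a comma-separated string is visible with the reviewed value `"ts, voltage, current, power"`. The Java sketch below is an assumption-laden illustration, not a description of how the connector parsed the option at any point: `ColumnListSketch`, `naiveSplit`, and `trimmedSplit` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; this is NOT the connector's actual code.
final class ColumnListSketch {

    /** Naive parsing of a string-typed option: surrounding whitespace leaks into the names. */
    static List<String> naiveSplit(String raw) {
        return Arrays.asList(raw.split(","));
    }

    /** Defensive parsing: trims every name and drops empty entries. */
    static List<String> trimmedSplit(String raw) {
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String raw = "ts, voltage, current, power";
        // The second element of the naive result keeps its leading space.
        System.out.println("[" + naiveSplit(raw).get(1) + "]");
        System.out.println(trimmedSplit(raw));
    }
}
```

With a list-typed option such as `["ts", "voltage", "current", "power"]`, the config parser delivers the individual names directly, so neither splitting nor trimming is needed.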

@chncaesar chncaesar requested a review from Hisoka-X July 27, 2025 14:13
Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @chncaesar

Contributor

Copilot AI left a comment

Pull Request Overview

This PR adds support for filtering data in TDengine source and sink connectors by introducing new configuration options for subtable selection and column filtering. This addresses issue #9592 by allowing users to specify which subtables to read from and which columns to include/exclude.

  • Adds sub_tables and read_columns options to TDengine source for filtering data
  • Adds write_columns option to TDengine sink for column specification
  • Updates documentation for both source and sink connectors with the new options

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated no comments.

File | Description
TDengineSourceOptions.java | Adds new configuration options for subtable and column filtering
TDengineSourceConfig.java | Updates config class to handle new filtering options
TDengineSinkOptions.java | Adds write_columns option for sink
TDengineSinkConfig.java | Updates sink config to handle write_columns
TDengineSource.java | Implements filtering logic for subtables and columns
TDengineSinkWriter.java | Updates SQL generation to include column specification
TDengineSourceReaderTest.java | Adds unit tests for new filtering functionality
TDengineIT.java | Adds e2e test for new filtering features
Documentation files | Updates English and Chinese docs with new options

Comments suppressed due to low confidence (4)

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf:35

  • The configuration key 'sub_tables' is inconsistent with the source code which uses 'SUB_TABLES'. This should be 'sub_tables' to match the option key defined in TDengineSourceOptions.
      sub_tables: ["d1001","d1002"]

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf:36

  • The configuration key 'read_columns' is inconsistent with the source code which uses 'READ_COLUMNS'. This should be 'read_columns' to match the option key defined in TDengineSourceOptions.
      read_columns: ["ts","current","voltage","phase","off","nc","location","groupid"]

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/resources/tdengine/tdengine_source_to_sink_filter_by_fieldNames.conf:51

  • The configuration key 'write_columns' is inconsistent with the source code which uses 'WRITE_COLUMNS'. This should be 'write_columns' to match the option key defined in TDengineSinkOptions.
    write_columns: ["ts","current","voltage","phase","off","nc"]

@chncaesar
Contributor Author

@Hisoka-X Could you invite another committer to review the PR? Thank you.

@Hisoka-X requested review from corgy-w and liunaijie July 28, 2025 06:53
@corgy-w merged commit b136a0d into apache:dev Jul 28, 2025
7 checks passed
Development

Successfully merging this pull request may close these issues.

[Feature][connector-tdengine] Enhance the tdengine connector to retrieve specified sub-table and field-name and sink specified field-name data.
