1717
1818package org .apache .seatunnel .connectors .seatunnel .iotdb .config ;
1919
20- import org .apache .seatunnel .shade .com .typesafe .config .Config ;
21-
22- import org .apache .seatunnel .api .configuration .Option ;
23- import org .apache .seatunnel .api .configuration .Options ;
20+ import org .apache .seatunnel .api .configuration .ReadonlyConfig ;
2421
2522import lombok .Getter ;
2623import lombok .NonNull ;
3027import java .time .ZoneId ;
3128import java .util .List ;
3229
33- import static org .apache .seatunnel .shade .com .google .common .base .Preconditions .checkArgument ;
3430import static org .apache .seatunnel .shade .com .google .common .base .Preconditions .checkNotNull ;
3531
3632@ Setter
3733@ Getter
3834@ ToString
3935public class SinkConfig extends CommonConfig {
4036
41- private static final int DEFAULT_BATCH_SIZE = 1024 ;
42-
43- public static final Option <String > KEY_TIMESTAMP =
44- Options .key ("key_timestamp" )
45- .stringType ()
46- .noDefaultValue ()
47- .withDescription ("key timestamp" );
48- public static final Option <String > KEY_DEVICE =
49- Options .key ("key_device" ).stringType ().noDefaultValue ().withDescription ("key device" );
50- public static final Option <List <String >> KEY_MEASUREMENT_FIELDS =
51- Options .key ("key_measurement_fields" )
52- .listType ()
53- .noDefaultValue ()
54- .withDescription ("key measurement fields" );
55- public static final Option <String > STORAGE_GROUP =
56- Options .key ("storage_group" )
57- .stringType ()
58- .noDefaultValue ()
59- .withDescription ("store group" );
60- public static final Option <Integer > BATCH_SIZE =
61- Options .key ("batch_size" )
62- .intType ()
63- .defaultValue (DEFAULT_BATCH_SIZE )
64- .withDescription ("batch size" );
65- public static final Option <Integer > MAX_RETRIES =
66- Options .key ("max_retries" ).intType ().noDefaultValue ().withDescription ("max retries" );
67- public static final Option <Integer > RETRY_BACKOFF_MULTIPLIER_MS =
68- Options .key ("retry_backoff_multiplier_ms" )
69- .intType ()
70- .noDefaultValue ()
71- .withDescription ("retry backoff multiplier ms " );
72- public static final Option <Integer > MAX_RETRY_BACKOFF_MS =
73- Options .key ("max_retry_backoff_ms" )
74- .intType ()
75- .noDefaultValue ()
76- .withDescription ("max retry backoff ms " );
77- public static final Option <Integer > DEFAULT_THRIFT_BUFFER_SIZE =
78- Options .key ("default_thrift_buffer_size" )
79- .intType ()
80- .noDefaultValue ()
81- .withDescription ("default thrift buffer size" );
82- public static final Option <Integer > MAX_THRIFT_FRAME_SIZE =
83- Options .key ("max_thrift_frame_size" )
84- .intType ()
85- .noDefaultValue ()
86- .withDescription ("max thrift frame size" );
87- public static final Option <String > ZONE_ID =
88- Options .key ("zone_id" ).stringType ().noDefaultValue ().withDescription ("zone id" );
89- public static final Option <Boolean > ENABLE_RPC_COMPRESSION =
90- Options .key ("enable_rpc_compression" )
91- .booleanType ()
92- .noDefaultValue ()
93- .withDescription ("enable rpc comm" );
94- public static final Option <Integer > CONNECTION_TIMEOUT_IN_MS =
95- Options .key ("connection_timeout_in_ms" )
96- .intType ()
97- .noDefaultValue ()
98- .withDescription ("connection timeout ms" );
99-
10037 private String keyTimestamp ;
10138 private String keyDevice ;
10239 private List <String > keyMeasurementFields ;
10340 private String storageGroup ;
104- private int batchSize = BATCH_SIZE . defaultValue () ;
41+ private int batchSize ;
10542 private int maxRetries ;
10643 private int retryBackoffMultiplierMs ;
10744 private int maxRetryBackoffMs ;
@@ -116,70 +53,50 @@ public SinkConfig(
11653 super (nodeUrls , username , password );
11754 }
11855
119- public static SinkConfig loadConfig (Config pluginConfig ) {
56+ public static SinkConfig loadConfig (ReadonlyConfig pluginConfig ) {
12057 SinkConfig sinkConfig =
12158 new SinkConfig (
122- pluginConfig .getStringList (NODE_URLS .key ()),
123- pluginConfig .getString ( USERNAME . key () ),
124- pluginConfig .getString ( PASSWORD . key () ));
59+ pluginConfig .toConfig (). getStringList (IoTDBSinkOptions . NODE_URLS .key ()),
60+ pluginConfig .get ( IoTDBSinkOptions . USERNAME ),
61+ pluginConfig .get ( IoTDBSinkOptions . PASSWORD ));
12562
126- sinkConfig .setKeyDevice (pluginConfig .getString (KEY_DEVICE .key ()));
127- if (pluginConfig .hasPath (KEY_TIMESTAMP .key ())) {
128- sinkConfig .setKeyTimestamp (pluginConfig .getString (KEY_TIMESTAMP .key ()));
129- }
130- if (pluginConfig .hasPath (KEY_MEASUREMENT_FIELDS .key ())) {
131- sinkConfig .setKeyMeasurementFields (
132- pluginConfig .getStringList (KEY_MEASUREMENT_FIELDS .key ()));
133- }
134- if (pluginConfig .hasPath (STORAGE_GROUP .key ())) {
135- sinkConfig .setStorageGroup (pluginConfig .getString (STORAGE_GROUP .key ()));
63+ sinkConfig .setKeyDevice (pluginConfig .get (IoTDBSinkOptions .KEY_DEVICE ));
64+ sinkConfig .setKeyTimestamp (pluginConfig .get (IoTDBSinkOptions .KEY_TIMESTAMP ));
65+ sinkConfig .setKeyMeasurementFields (
66+ pluginConfig .get (IoTDBSinkOptions .KEY_MEASUREMENT_FIELDS ));
67+ sinkConfig .setStorageGroup (pluginConfig .get (IoTDBSinkOptions .STORAGE_GROUP ));
68+ if (pluginConfig .getOptional (IoTDBSinkOptions .BATCH_SIZE ).isPresent ()) {
69+ sinkConfig .setBatchSize (pluginConfig .get (IoTDBSinkOptions .BATCH_SIZE ));
13670 }
137- if (pluginConfig .hasPath (BATCH_SIZE .key ())) {
138- int batchSize = checkIntArgument (pluginConfig .getInt (BATCH_SIZE .key ()));
139- sinkConfig .setBatchSize (batchSize );
71+ if (pluginConfig .getOptional (IoTDBSinkOptions .MAX_RETRIES ).isPresent ()) {
72+ sinkConfig .setMaxRetries (pluginConfig .get (IoTDBSinkOptions .MAX_RETRIES ));
14073 }
141- if (pluginConfig .hasPath ( MAX_RETRIES . key () )) {
142- int maxRetries = checkIntArgument ( pluginConfig . getInt ( MAX_RETRIES . key ()));
143- sinkConfig . setMaxRetries ( maxRetries );
74+ if (pluginConfig .getOptional ( IoTDBSinkOptions . RETRY_BACKOFF_MULTIPLIER_MS ). isPresent ( )) {
75+ sinkConfig . setRetryBackoffMultiplierMs (
76+ pluginConfig . get ( IoTDBSinkOptions . RETRY_BACKOFF_MULTIPLIER_MS ) );
14477 }
145- if (pluginConfig .hasPath (RETRY_BACKOFF_MULTIPLIER_MS .key ())) {
146- int retryBackoffMultiplierMs =
147- checkIntArgument (pluginConfig .getInt (RETRY_BACKOFF_MULTIPLIER_MS .key ()));
148- sinkConfig .setRetryBackoffMultiplierMs (retryBackoffMultiplierMs );
78+ if (pluginConfig .getOptional (IoTDBSinkOptions .MAX_RETRY_BACKOFF_MS ).isPresent ()) {
79+ sinkConfig .setMaxRetryBackoffMs (
80+ pluginConfig .get (IoTDBSinkOptions .MAX_RETRY_BACKOFF_MS ));
14981 }
150- if (pluginConfig .hasPath (MAX_RETRY_BACKOFF_MS .key ())) {
151- int maxRetryBackoffMs =
152- checkIntArgument (pluginConfig .getInt (MAX_RETRY_BACKOFF_MS .key ()));
153- sinkConfig .setMaxRetryBackoffMs (maxRetryBackoffMs );
82+ if (pluginConfig .getOptional (IoTDBSinkOptions .DEFAULT_THRIFT_BUFFER_SIZE ).isPresent ()) {
83+ sinkConfig .setThriftDefaultBufferSize (
84+ pluginConfig .get (IoTDBSinkOptions .DEFAULT_THRIFT_BUFFER_SIZE ));
15485 }
155- if (pluginConfig .hasPath (DEFAULT_THRIFT_BUFFER_SIZE .key ())) {
156- int thriftDefaultBufferSize =
157- checkIntArgument (pluginConfig .getInt (DEFAULT_THRIFT_BUFFER_SIZE .key ()));
158- sinkConfig .setThriftDefaultBufferSize (thriftDefaultBufferSize );
86+ if (pluginConfig .getOptional (IoTDBSinkOptions .MAX_THRIFT_FRAME_SIZE ).isPresent ()) {
87+ sinkConfig .setThriftMaxFrameSize (
88+ pluginConfig .get (IoTDBSinkOptions .MAX_THRIFT_FRAME_SIZE ));
15989 }
160- if (pluginConfig .hasPath (MAX_THRIFT_FRAME_SIZE .key ())) {
161- int thriftMaxFrameSize =
162- checkIntArgument (pluginConfig .getInt (MAX_THRIFT_FRAME_SIZE .key ()));
163- sinkConfig .setThriftMaxFrameSize (thriftMaxFrameSize );
90+ if (pluginConfig .getOptional (IoTDBSinkOptions .ZONE_ID ).isPresent ()) {
91+ sinkConfig .setZoneId (ZoneId .of (pluginConfig .get (IoTDBSinkOptions .ZONE_ID )));
16492 }
165- if (pluginConfig .hasPath (ZONE_ID .key ())) {
166- sinkConfig .setZoneId (ZoneId .of (pluginConfig .getString (ZONE_ID .key ())));
167- }
168- if (pluginConfig .hasPath (ENABLE_RPC_COMPRESSION .key ())) {
169- sinkConfig .setEnableRPCCompression (
170- pluginConfig .getBoolean (ENABLE_RPC_COMPRESSION .key ()));
171- }
172- if (pluginConfig .hasPath (CONNECTION_TIMEOUT_IN_MS .key ())) {
173- int connectionTimeoutInMs =
174- checkIntArgument (pluginConfig .getInt (CONNECTION_TIMEOUT_IN_MS .key ()));
93+ sinkConfig .setEnableRPCCompression (
94+ pluginConfig .get (IoTDBSinkOptions .ENABLE_RPC_COMPRESSION ));
95+ if (pluginConfig .getOptional (IoTDBSinkOptions .CONNECTION_TIMEOUT_IN_MS ).isPresent ()) {
17596 checkNotNull (sinkConfig .getEnableRPCCompression ());
176- sinkConfig .setConnectionTimeoutInMs (connectionTimeoutInMs );
97+ sinkConfig .setConnectionTimeoutInMs (
98+ pluginConfig .get (IoTDBSinkOptions .CONNECTION_TIMEOUT_IN_MS ));
17799 }
178100 return sinkConfig ;
179101 }
180-
181- private static int checkIntArgument (int args ) {
182- checkArgument (args > 0 );
183- return args ;
184- }
185102}
0 commit comments