Skip to content

Commit 1ad1dcb

Browse files
corgy-wdybyte
authored andcommitted
[Fix][Connector-V2] Update catalog table schema of debezium json (apache#9525)
1 parent 588f0d0 commit 1ad1dcb

File tree

2 files changed

+126
-1
lines changed

2 files changed

+126
-1
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,10 @@ public List<CatalogTable> getProducedCatalogTables() {
128128
readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
129129
return Collections.singletonList(
130130
CatalogTableUtil.getCatalogTable(
131-
"default.default",
131+
"schema",
132+
"default",
133+
"default",
134+
"default",
132135
CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE));
133136
}
134137
return catalogTables;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.debezium.format;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23+
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
24+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
25+
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
26+
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
27+
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
28+
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
29+
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
30+
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
31+
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
32+
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
33+
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
34+
35+
import org.junit.jupiter.api.Assertions;
36+
import org.junit.jupiter.api.Test;
37+
38+
import java.util.Arrays;
39+
import java.util.Collections;
40+
import java.util.List;
41+
42+
class DebeziumJsonFormatTest {
43+
44+
public static final SingleChoiceOption STARTUP_MODE =
45+
Options.key(SourceOptions.STARTUP_MODE_KEY)
46+
.singleChoice(
47+
StartupMode.class,
48+
Arrays.asList(
49+
StartupMode.INITIAL,
50+
StartupMode.EARLIEST,
51+
StartupMode.LATEST,
52+
StartupMode.SPECIFIC))
53+
.defaultValue(StartupMode.INITIAL)
54+
.withDescription(
55+
"Optional startup mode for CDC source, valid enumerations are "
56+
+ "\"initial\", \"earliest\", \"latest\" or \"specific\"");
57+
58+
public static final SingleChoiceOption STOP_MODE =
59+
Options.key(SourceOptions.STOP_MODE_KEY)
60+
.singleChoice(
61+
StopMode.class,
62+
Arrays.asList(StopMode.LATEST, StopMode.SPECIFIC, StopMode.NEVER))
63+
.defaultValue(StopMode.NEVER)
64+
.withDescription(
65+
"Optional stop mode for CDC source, valid enumerations are "
66+
+ "\"never\", \"latest\" or \"specific\"");
67+
68+
static class TestIncrementalSource extends IncrementalSource<Object, SourceConfig> {
69+
public TestIncrementalSource(ReadonlyConfig options, List<CatalogTable> catalogTables) {
70+
super(options, catalogTables);
71+
}
72+
73+
@Override
74+
public Option<StartupMode> getStartupModeOption() {
75+
return STARTUP_MODE;
76+
}
77+
78+
@Override
79+
public Option<StopMode> getStopModeOption() {
80+
return STOP_MODE;
81+
}
82+
83+
@Override
84+
public SourceConfig.Factory<SourceConfig> createSourceConfigFactory(ReadonlyConfig config) {
85+
return null;
86+
}
87+
88+
@Override
89+
public DebeziumDeserializationSchema<Object> createDebeziumDeserializationSchema(
90+
ReadonlyConfig config) {
91+
return null;
92+
}
93+
94+
@Override
95+
public DataSourceDialect<SourceConfig> createDataSourceDialect(ReadonlyConfig config) {
96+
return null;
97+
}
98+
99+
@Override
100+
public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
101+
return null;
102+
}
103+
104+
@Override
105+
public String getPluginName() {
106+
return "";
107+
}
108+
}
109+
110+
@Test
111+
void testGetProducedCatalogTablesWithCompatibleDebeziumJson() {
112+
ReadonlyConfig config =
113+
ReadonlyConfig.fromMap(
114+
Collections.singletonMap(
115+
JdbcSourceOptions.FORMAT.key(), "compatible_debezium_json"));
116+
TestIncrementalSource source = new TestIncrementalSource(config, Collections.emptyList());
117+
List<CatalogTable> tables = source.getProducedCatalogTables();
118+
Assertions.assertEquals(1, tables.size());
119+
Assertions.assertEquals(
120+
"default.default.default", tables.get(0).getTableId().toTablePath().getFullName());
121+
}
122+
}

0 commit comments

Comments
 (0)