Skip to content

Commit 9c45d48

Browse files
committed
[Feature][Zeta engine] Added the metrics information of table statistics in multi-table mode #6959
1 parent b990930 commit 9c45d48

File tree

7 files changed

+48
-20
lines changed

7 files changed

+48
-20
lines changed

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,18 @@ public void multiTableMetrics() {
9797
.body("jobStatus", equalTo("FINISHED"))
9898
.body("metrics.SourceReceivedCount", equalTo("50"))
9999
.body("metrics.SinkWriteCount", equalTo("50"))
100-
.body("metrics.TableSourceReceivedCount.fake1", equalTo("20"))
101-
.body("metrics.TableSourceReceivedCount.fake2", equalTo("30"))
102-
.body("metrics.TableSinkWriteCount.fake1", equalTo("20"))
103-
.body("metrics.TableSinkWriteCount.fake2", equalTo("30"));
100+
.body(
101+
"metrics.TableSourceReceivedCount.'fake.table1'",
102+
equalTo("20"))
103+
.body(
104+
"metrics.TableSourceReceivedCount.'fake.public.table2'",
105+
equalTo("30"))
106+
.body(
107+
"metrics.TableSinkWriteCount.'fake.table1'",
108+
equalTo("20"))
109+
.body(
110+
"metrics.TableSinkWriteCount.'fake.public.table2'",
111+
equalTo("30"));
104112
});
105113
}
106114

seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -587,10 +587,11 @@ public void testGetMultiTableJobMetrics() {
587587

588588
String jobMetrics = jobClient.getJobMetrics(jobId);
589589

590-
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake1"));
591-
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake2"));
592-
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake1"));
593-
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake2"));
590+
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.table1"));
591+
Assertions.assertTrue(
592+
jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.public.table2"));
593+
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.table1"));
594+
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.public.table2"));
594595

595596
log.info("jobMetrics : {}", jobMetrics);
596597
JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);

seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ source {
2929
result_table_name = "fake1"
3030
row.num = 20
3131
schema = {
32+
table = "fake.table1"
3233
fields {
3334
name = "string"
3435
age = "int"
@@ -41,6 +42,7 @@ source {
4142
result_table_name = "fake2"
4243
row.num = 30
4344
schema = {
45+
table = "fake.public.table2"
4446
fields {
4547
name = "string"
4648
age = "int"

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ private Map<String, Object> getJobMetrics(String jobMetrics) {
382382
.forEach(
383383
metricName -> {
384384
String tableName =
385-
TablePath.of(metricName.split("#")[1]).getTableName();
385+
TablePath.of(metricName.split("#")[1]).getFullName();
386386
if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
387387
tableSourceReceivedCountMap.put(
388388
tableName, jobMetricsStr.get(metricName));

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
package org.apache.seatunnel.engine.server.task;
1919

20-
import lombok.extern.slf4j.Slf4j;
2120
import org.apache.seatunnel.api.common.metrics.Counter;
2221
import org.apache.seatunnel.api.common.metrics.Meter;
2322
import org.apache.seatunnel.api.common.metrics.MetricsContext;
2423
import org.apache.seatunnel.api.source.Collector;
24+
import org.apache.seatunnel.api.table.catalog.TablePath;
2525
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
2626
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
2727
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
@@ -35,10 +35,17 @@
3535
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
3636
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
3737

38+
import org.apache.commons.collections4.CollectionUtils;
39+
import org.apache.commons.lang3.StringUtils;
40+
41+
import lombok.extern.slf4j.Slf4j;
42+
3843
import java.io.IOException;
3944
import java.util.HashMap;
4045
import java.util.List;
4146
import java.util.Map;
47+
import java.util.Objects;
48+
import java.util.concurrent.ConcurrentHashMap;
4249
import java.util.concurrent.atomic.AtomicBoolean;
4350

4451
import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
@@ -124,8 +131,7 @@ public void collect(T row) {
124131
sourceReceivedBytesPerSeconds.markEvent(size);
125132
flowControlGate.audit((SeaTunnelRow) row);
126133
if (StringUtils.isNotEmpty(tableId)) {
127-
String tableName =
128-
Optional.of(TablePath.of(tableId).getFullName()).orElse("null");
134+
String tableName = getFullName(TablePath.of(tableId));
129135
Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName);
130136
if (Objects.nonNull(sourceTableCounter)) {
131137
sourceTableCounter.inc();
@@ -226,4 +232,12 @@ public void sendRecordToNext(Record<?> record) throws IOException {
226232
}
227233
}
228234
}
235+
236+
private String getFullName(TablePath tablePath) {
237+
if (StringUtils.isBlank(tablePath.getTableName())) {
238+
tablePath =
239+
TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default");
240+
}
241+
return tablePath.getFullName();
242+
}
229243
}

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.source.SourceSplit;
2323
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2424
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
25+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
2526
import org.apache.seatunnel.api.table.catalog.TablePath;
2627
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2728
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
@@ -87,12 +88,7 @@ public void init() throws Exception {
8788
tablePaths =
8889
producedCatalogTables.stream()
8990
.map(CatalogTable::getTableId)
90-
.map(
91-
tableIdentifier ->
92-
TablePath.of(
93-
tableIdentifier.getDatabaseName(),
94-
tableIdentifier.getSchemaName(),
95-
tableIdentifier.getTableName()))
91+
.map(TableIdentifier::toTablePath)
9692
.collect(Collectors.toList());
9793
} catch (UnsupportedOperationException e) {
9894
// TODO remove it when all connector use `getProducedCatalogTables`

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.seatunnel.engine.server.task.flow;
1919

20-
import com.hazelcast.cluster.Address;
21-
import lombok.extern.slf4j.Slf4j;
2220
import org.apache.seatunnel.api.common.metrics.Counter;
2321
import org.apache.seatunnel.api.common.metrics.Meter;
2422
import org.apache.seatunnel.api.common.metrics.MetricsContext;
@@ -28,6 +26,8 @@
2826
import org.apache.seatunnel.api.sink.SinkCommitter;
2927
import org.apache.seatunnel.api.sink.SinkWriter;
3028
import org.apache.seatunnel.api.sink.SupportResourceShare;
29+
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
30+
import org.apache.seatunnel.api.table.catalog.TablePath;
3131
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
3232
import org.apache.seatunnel.api.table.type.Record;
3333
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -45,15 +45,22 @@
4545
import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
4646
import org.apache.seatunnel.engine.server.task.record.Barrier;
4747

48+
import org.apache.commons.lang3.StringUtils;
49+
50+
import com.hazelcast.cluster.Address;
51+
import lombok.extern.slf4j.Slf4j;
52+
4853
import java.io.IOException;
4954
import java.io.Serializable;
5055
import java.util.ArrayList;
5156
import java.util.Collection;
5257
import java.util.Collections;
5358
import java.util.List;
59+
import java.util.Map;
5460
import java.util.Objects;
5561
import java.util.Optional;
5662
import java.util.concurrent.CompletableFuture;
63+
import java.util.concurrent.ConcurrentHashMap;
5764
import java.util.concurrent.ExecutionException;
5865
import java.util.stream.Collectors;
5966

0 commit comments

Comments
 (0)