Skip to content

Commit 4143e8f

Browse files
Hisoka-Xdybyte
authored andcommitted
[Feature][Zeta] Add metrics for task intermediate queue size (apache#9550)
1 parent 4608762 commit 4143e8f

File tree

14 files changed

+82
-32
lines changed

14 files changed

+82
-32
lines changed

docs/en/seatunnel-engine/rest-api-v2.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ Please refer [security](security.md)
157157
"pipelineEdges": {}
158158
},
159159
"metrics": {
160+
"IntermediateQueueSize": "",
160161
"SourceReceivedCount": "",
161162
"SourceReceivedQPS": "",
162163
"SourceReceivedBytes": "",
@@ -237,6 +238,7 @@ This API has been deprecated, please use /job-info/:jobId instead
237238
"pipelineEdges": {}
238239
},
239240
"metrics": {
241+
"IntermediateQueueSize": "",
240242
"SourceReceivedCount": "",
241243
"SourceReceivedQPS": "",
242244
"SourceReceivedBytes": "",

docs/zh/seatunnel-engine/rest-api-v2.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ seatunnel:
153153
"pipelineEdges": {}
154154
},
155155
"metrics": {
156+
"IntermediateQueueSize": "",
156157
"SourceReceivedCount": "",
157158
"SourceReceivedQPS": "",
158159
"SourceReceivedBytes": "",

seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,6 @@ private MetricNames() {}
3333
public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
3434
public static final String SINK_WRITE_QPS = "SinkWriteQPS";
3535
public static final String SINK_WRITE_BYTES_PER_SECONDS = "SinkWriteBytesPerSeconds";
36+
37+
public static final String INTERMEDIATE_QUEUE_SIZE = "IntermediateQueueSize";
3638
}

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,7 @@ public void testGetJobInfoByJobId() {
641641
equalTo("5"))
642642
.body("metrics.SinkWriteCount", equalTo("5"))
643643
.body("metrics.SourceReceivedCount", equalTo("5"))
644+
.body("metrics.IntermediateQueueSize", equalTo("0"))
644645
.body(
645646
"jobDag.envOptions.'job.mode'",
646647
equalTo("BATCH"))
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,29 +39,29 @@
3939
import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
4040
import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
4141

42-
public class TaskMetricsCalcContext {
42+
public class ConnectorMetricsCalcContext {
4343

4444
private final MetricsContext metricsContext;
4545

4646
private final PluginType type;
4747

4848
private Counter count;
4949

50-
private Map<String, Counter> countPerTable = new ConcurrentHashMap<>();
50+
private final Map<String, Counter> countPerTable = new ConcurrentHashMap<>();
5151

5252
private Meter QPS;
5353

54-
private Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();
54+
private final Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();
5555

5656
private Counter bytes;
5757

58-
private Map<String, Counter> bytesPerTable = new ConcurrentHashMap<>();
58+
private final Map<String, Counter> bytesPerTable = new ConcurrentHashMap<>();
5959

6060
private Meter bytesPerSeconds;
6161

62-
private Map<String, Meter> bytesPerSecondsPerTable = new ConcurrentHashMap<>();
62+
private final Map<String, Meter> bytesPerSecondsPerTable = new ConcurrentHashMap<>();
6363

64-
public TaskMetricsCalcContext(
64+
public ConnectorMetricsCalcContext(
6565
MetricsContext metricsContext,
6666
PluginType type,
6767
boolean isMulti,

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.util.stream.Stream;
7979
import java.util.stream.StreamSupport;
8080

81+
import static org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
8182
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
8283
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
8384
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -245,7 +246,11 @@ private Map<String, Object> getJobMetrics(String jobMetrics) {
245246
Map<String, Object> metricsMap = new HashMap<>();
246247
// To add metrics, populate the corresponding array,
247248
String[] countMetricsNames = {
248-
SOURCE_RECEIVED_COUNT, SINK_WRITE_COUNT, SOURCE_RECEIVED_BYTES, SINK_WRITE_BYTES
249+
SOURCE_RECEIVED_COUNT,
250+
SINK_WRITE_COUNT,
251+
SOURCE_RECEIVED_BYTES,
252+
SINK_WRITE_BYTES,
253+
INTERMEDIATE_QUEUE_SIZE
249254
};
250255
String[] rateMetricsNames = {
251256
SOURCE_RECEIVED_QPS,

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
3333
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
3434
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
35-
import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
35+
import org.apache.seatunnel.engine.server.metrics.ConnectorMetricsCalcContext;
3636
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
3737

3838
import org.apache.commons.collections4.CollectionUtils;
@@ -52,9 +52,7 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
5252

5353
private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
5454

55-
private final MetricsContext metricsContext;
56-
57-
private final TaskMetricsCalcContext taskMetricsCalcContext;
55+
private final ConnectorMetricsCalcContext connectorMetricsCalcContext;
5856

5957
private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new AtomicBoolean(false);
6058

@@ -77,14 +75,13 @@ public SeaTunnelSourceCollector(
7775
this.checkpointLock = checkpointLock;
7876
this.outputs = outputs;
7977
this.rowType = rowType;
80-
this.metricsContext = metricsContext;
8178
if (rowType instanceof MultipleRowType) {
8279
((MultipleRowType) rowType)
8380
.iterator()
8481
.forEachRemaining(type -> this.rowTypeMap.put(type.getKey(), type.getValue()));
8582
}
86-
this.taskMetricsCalcContext =
87-
new TaskMetricsCalcContext(
83+
this.connectorMetricsCalcContext =
84+
new ConnectorMetricsCalcContext(
8885
metricsContext,
8986
PluginType.SOURCE,
9087
CollectionUtils.isNotEmpty(tablePaths),
@@ -97,6 +94,8 @@ public void collect(T row) {
9794
try {
9895
if (row instanceof SeaTunnelRow) {
9996
String tableId = ((SeaTunnelRow) row).getTableId();
97+
// init the size of row early with rowType, this way is faster than init the size
98+
// without rowType
10099
int size;
101100
if (rowType instanceof SeaTunnelRowType) {
102101
size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType);
@@ -107,7 +106,7 @@ public void collect(T row) {
107106
"Unsupported row type: " + rowType.getClass().getName());
108107
}
109108
flowControlGate.audit((SeaTunnelRow) row);
110-
taskMetricsCalcContext.updateMetrics(row, tableId);
109+
connectorMetricsCalcContext.updateMetrics(row, tableId);
111110
}
112111
sendRecordToNext(new Record<>(row));
113112
emptyThisPollNext = false;

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Ex
245245
this,
246246
completableFuture,
247247
((AbstractTaskGroupWithIntermediateQueue) taskBelongGroup)
248-
.getQueueCache(config.getQueueID()));
248+
.getQueueCache(config.getQueueID(), this.getMetricsContext()));
249249
outputs = flowLifeCycles;
250250
} else {
251251
throw new UnknownFlowException(flow);

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
4040
import org.apache.seatunnel.engine.server.event.JobEventListener;
4141
import org.apache.seatunnel.engine.server.execution.TaskLocation;
42-
import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
42+
import org.apache.seatunnel.engine.server.metrics.ConnectorMetricsCalcContext;
4343
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
4444
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
4545
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -91,15 +91,15 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
9191

9292
private Optional<CommitInfoT> lastCommitInfo;
9393

94-
private MetricsContext metricsContext;
94+
private final MetricsContext metricsContext;
9595

96-
private TaskMetricsCalcContext taskMetricsCalcContext;
96+
private final ConnectorMetricsCalcContext connectorMetricsCalcContext;
9797

9898
private final boolean containAggCommitter;
9999

100-
private EventListener eventListener;
100+
private final EventListener eventListener;
101101

102-
/** Mapping relationship between upstream tablepath and downstream tablepath. */
102+
/** Mapping relationship between upstream TablePath and downstream TablePath. */
103103
private final Map<TablePath, TablePath> tablesMaps = new HashMap<>();
104104

105105
public SinkFlowLifeCycle(
@@ -139,8 +139,9 @@ public SinkFlowLifeCycle(
139139
sinkTables.add(TablePath.DEFAULT);
140140
}
141141
}
142-
this.taskMetricsCalcContext =
143-
new TaskMetricsCalcContext(metricsContext, PluginType.SINK, isMulti, sinkTables);
142+
this.connectorMetricsCalcContext =
143+
new ConnectorMetricsCalcContext(
144+
metricsContext, PluginType.SINK, isMulti, sinkTables);
144145
}
145146

146147
@Override
@@ -264,7 +265,7 @@ public void received(Record<?> record) {
264265
if (prepareClose) {
265266
return;
266267
}
267-
String tableId = "";
268+
String tableId;
268269
writer.write((T) record.getData());
269270
if (record.getData() instanceof SeaTunnelRow) {
270271
if (this.sinkAction.getSink() instanceof MultiTableSink) {
@@ -295,7 +296,7 @@ public void received(Record<?> record) {
295296
.orElseGet(TablePath.DEFAULT::getFullName);
296297
}
297298

298-
taskMetricsCalcContext.updateMetrics(record.getData(), tableId);
299+
connectorMetricsCalcContext.updateMetrics(record.getData(), tableId);
299300
}
300301
}
301302
} catch (Exception e) {

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.engine.server.task.group;
1919

20+
import org.apache.seatunnel.api.common.metrics.MetricsContext;
2021
import org.apache.seatunnel.engine.server.execution.Task;
2122
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
2223
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
@@ -30,5 +31,6 @@ public AbstractTaskGroupWithIntermediateQueue(
3031
super(taskGroupLocation, taskGroupName, tasks);
3132
}
3233

33-
public abstract AbstractIntermediateQueue<?> getQueueCache(long id);
34+
public abstract AbstractIntermediateQueue<?> getQueueCache(
35+
long id, MetricsContext metricsContext);
3436
}

0 commit comments

Comments
 (0)