Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/en/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ Please refer [security](security.md)
"pipelineEdges": {}
},
"metrics": {
"IntermediateQueueSize": "",
"SourceReceivedCount": "",
"SourceReceivedQPS": "",
"SourceReceivedBytes": "",
Expand Down Expand Up @@ -237,6 +238,7 @@ This API has been deprecated, please use /job-info/:jobId instead
"pipelineEdges": {}
},
"metrics": {
"IntermediateQueueSize": "",
"SourceReceivedCount": "",
"SourceReceivedQPS": "",
"SourceReceivedBytes": "",
Expand Down
1 change: 1 addition & 0 deletions docs/zh/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ seatunnel:
"pipelineEdges": {}
},
"metrics": {
"IntermediateQueueSize": "",
"SourceReceivedCount": "",
"SourceReceivedQPS": "",
"SourceReceivedBytes": "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ private MetricNames() {}
public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
public static final String SINK_WRITE_QPS = "SinkWriteQPS";
public static final String SINK_WRITE_BYTES_PER_SECONDS = "SinkWriteBytesPerSeconds";

public static final String INTERMEDIATE_QUEUE_SIZE = "IntermediateQueueSize";
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ public void testGetJobInfoByJobId() {
equalTo("5"))
.body("metrics.SinkWriteCount", equalTo("5"))
.body("metrics.SourceReceivedCount", equalTo("5"))
.body("metrics.IntermediateQueueSize", equalTo("0"))
.body(
"jobDag.envOptions.'job.mode'",
equalTo("BATCH"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,29 @@
import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;

public class TaskMetricsCalcContext {
public class ConnectorMetricsCalcContext {

private final MetricsContext metricsContext;

private final PluginType type;

private Counter count;

private Map<String, Counter> countPerTable = new ConcurrentHashMap<>();
private final Map<String, Counter> countPerTable = new ConcurrentHashMap<>();

private Meter QPS;

private Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();
private final Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();

private Counter bytes;

private Map<String, Counter> bytesPerTable = new ConcurrentHashMap<>();
private final Map<String, Counter> bytesPerTable = new ConcurrentHashMap<>();

private Meter bytesPerSeconds;

private Map<String, Meter> bytesPerSecondsPerTable = new ConcurrentHashMap<>();
private final Map<String, Meter> bytesPerSecondsPerTable = new ConcurrentHashMap<>();

public TaskMetricsCalcContext(
public ConnectorMetricsCalcContext(
MetricsContext metricsContext,
PluginType type,
boolean isMulti,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
Expand Down Expand Up @@ -245,7 +246,11 @@ private Map<String, Object> getJobMetrics(String jobMetrics) {
Map<String, Object> metricsMap = new HashMap<>();
// To add metrics, populate the corresponding array,
String[] countMetricsNames = {
SOURCE_RECEIVED_COUNT, SINK_WRITE_COUNT, SOURCE_RECEIVED_BYTES, SINK_WRITE_BYTES
SOURCE_RECEIVED_COUNT,
SINK_WRITE_COUNT,
SOURCE_RECEIVED_BYTES,
SINK_WRITE_BYTES,
INTERMEDIATE_QUEUE_SIZE
};
String[] rateMetricsNames = {
SOURCE_RECEIVED_QPS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
import org.apache.seatunnel.engine.server.metrics.ConnectorMetricsCalcContext;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;

import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -52,9 +52,7 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {

private final List<OneInputFlowLifeCycle<Record<?>>> outputs;

private final MetricsContext metricsContext;

private final TaskMetricsCalcContext taskMetricsCalcContext;
private final ConnectorMetricsCalcContext connectorMetricsCalcContext;

private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new AtomicBoolean(false);

Expand All @@ -77,14 +75,13 @@ public SeaTunnelSourceCollector(
this.checkpointLock = checkpointLock;
this.outputs = outputs;
this.rowType = rowType;
this.metricsContext = metricsContext;
if (rowType instanceof MultipleRowType) {
((MultipleRowType) rowType)
.iterator()
.forEachRemaining(type -> this.rowTypeMap.put(type.getKey(), type.getValue()));
}
this.taskMetricsCalcContext =
new TaskMetricsCalcContext(
this.connectorMetricsCalcContext =
new ConnectorMetricsCalcContext(
metricsContext,
PluginType.SOURCE,
CollectionUtils.isNotEmpty(tablePaths),
Expand All @@ -97,6 +94,8 @@ public void collect(T row) {
try {
if (row instanceof SeaTunnelRow) {
String tableId = ((SeaTunnelRow) row).getTableId();
// init the size of row early with rowType, this way is faster than init the size
// without rowType
int size;
if (rowType instanceof SeaTunnelRowType) {
size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType);
Expand All @@ -107,7 +106,7 @@ public void collect(T row) {
"Unsupported row type: " + rowType.getClass().getName());
}
flowControlGate.audit((SeaTunnelRow) row);
taskMetricsCalcContext.updateMetrics(row, tableId);
connectorMetricsCalcContext.updateMetrics(row, tableId);
}
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Ex
this,
completableFuture,
((AbstractTaskGroupWithIntermediateQueue) taskBelongGroup)
.getQueueCache(config.getQueueID()));
.getQueueCache(config.getQueueID(), this.getMetricsContext()));
outputs = flowLifeCycles;
} else {
throw new UnknownFlowException(flow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.event.JobEventListener;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
import org.apache.seatunnel.engine.server.metrics.ConnectorMetricsCalcContext;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
Expand Down Expand Up @@ -91,15 +91,15 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo

private Optional<CommitInfoT> lastCommitInfo;

private MetricsContext metricsContext;
private final MetricsContext metricsContext;

private TaskMetricsCalcContext taskMetricsCalcContext;
private final ConnectorMetricsCalcContext connectorMetricsCalcContext;

private final boolean containAggCommitter;

private EventListener eventListener;
private final EventListener eventListener;

/** Mapping relationship between upstream tablepath and downstream tablepath. */
/** Mapping relationship between upstream TablePath and downstream TablePath. */
private final Map<TablePath, TablePath> tablesMaps = new HashMap<>();

public SinkFlowLifeCycle(
Expand Down Expand Up @@ -139,8 +139,9 @@ public SinkFlowLifeCycle(
sinkTables.add(TablePath.DEFAULT);
}
}
this.taskMetricsCalcContext =
new TaskMetricsCalcContext(metricsContext, PluginType.SINK, isMulti, sinkTables);
this.connectorMetricsCalcContext =
new ConnectorMetricsCalcContext(
metricsContext, PluginType.SINK, isMulti, sinkTables);
}

@Override
Expand Down Expand Up @@ -264,7 +265,7 @@ public void received(Record<?> record) {
if (prepareClose) {
return;
}
String tableId = "";
String tableId;
writer.write((T) record.getData());
if (record.getData() instanceof SeaTunnelRow) {
if (this.sinkAction.getSink() instanceof MultiTableSink) {
Expand Down Expand Up @@ -295,7 +296,7 @@ public void received(Record<?> record) {
.orElseGet(TablePath.DEFAULT::getFullName);
}

taskMetricsCalcContext.updateMetrics(record.getData(), tableId);
connectorMetricsCalcContext.updateMetrics(record.getData(), tableId);
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.engine.server.task.group;

import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
Expand All @@ -30,5 +31,6 @@ public AbstractTaskGroupWithIntermediateQueue(
super(taskGroupLocation, taskGroupName, tasks);
}

public abstract AbstractIntermediateQueue<?> getQueueCache(long id);
public abstract AbstractIntermediateQueue<?> getQueueCache(
long id, MetricsContext metricsContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.engine.server.task.group;

import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
Expand All @@ -25,12 +27,16 @@
import org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
import org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue;

import org.apache.commons.lang3.tuple.Pair;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;

public class TaskGroupWithIntermediateBlockingQueue extends AbstractTaskGroupWithIntermediateQueue {

public static final int QUEUE_SIZE = 2048;
Expand All @@ -40,7 +46,7 @@ public TaskGroupWithIntermediateBlockingQueue(
super(taskGroupLocation, taskGroupName, tasks);
}

private Map<Long, BlockingQueue<Record<?>>> blockingQueueCache = null;
private Map<Long, Pair<BlockingQueue<Record<?>>, Counter>> blockingQueueCache = null;

@Override
public void init() {
Expand All @@ -52,9 +58,15 @@ public void init() {
}

@Override
public AbstractIntermediateQueue<?> getQueueCache(long id) {
blockingQueueCache.computeIfAbsent(id, i -> new ArrayBlockingQueue<>(QUEUE_SIZE));
return new IntermediateBlockingQueue(blockingQueueCache.get(id));
public AbstractIntermediateQueue<?> getQueueCache(long id, MetricsContext metricsContext) {
blockingQueueCache.computeIfAbsent(
id,
i ->
Pair.of(
new ArrayBlockingQueue<>(QUEUE_SIZE),
metricsContext.counter(INTERMEDIATE_QUEUE_SIZE)));
Pair<BlockingQueue<Record<?>>, Counter> cache = blockingQueueCache.get(id);
return new IntermediateBlockingQueue(cache.getLeft(), cache.getRight());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.engine.server.task.group;

import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupType;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void init() {
}

@Override
public AbstractIntermediateQueue<?> getQueueCache(long id) {
public AbstractIntermediateQueue<?> getQueueCache(long id, MetricsContext metricsContext) {
EventFactory<RecordEvent> eventFactory = new RecordEventFactory();
Disruptor<RecordEvent> disruptor =
new Disruptor<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.engine.server.task.group.queue;

import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
Expand All @@ -29,14 +30,19 @@

public class IntermediateBlockingQueue extends AbstractIntermediateQueue<BlockingQueue<Record<?>>> {

public IntermediateBlockingQueue(BlockingQueue<Record<?>> queue) {
private final Counter intermediateQueueSize;

public IntermediateBlockingQueue(
BlockingQueue<Record<?>> queue, Counter intermediateQueueSize) {
super(queue);
this.intermediateQueueSize = intermediateQueueSize;
}

@Override
public void received(Record<?> record) {
try {
handleRecord(record, getIntermediateQueue()::put);
intermediateQueueSize.inc();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -48,6 +54,7 @@ public void collect(Collector<Record<?>> collector) throws Exception {
Record<?> record = getIntermediateQueue().poll(100, TimeUnit.MILLISECONDS);
if (record != null) {
handleRecord(record, collector::collect);
intermediateQueueSize.dec();
} else {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
Expand Down Expand Up @@ -89,6 +90,22 @@ public void testGetJobMetrics() throws Exception {
assertEquals(30, (Long) jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
assertTrue((Double) jobMetrics.get(SOURCE_RECEIVED_QPS).get(0).value() > 0);
assertTrue((Double) jobMetrics.get(SINK_WRITE_QPS).get(0).value() > 0);
assertEquals(0, (Long) jobMetrics.get(INTERMEDIATE_QUEUE_SIZE).get(0).value());
}

@Test
public void testMetricsWhenJobFailed() {
long jobId = System.currentTimeMillis();
startJob(jobId, "stream_fake_to_inmemory_with_error.conf", false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FAILED,
server.getCoordinatorService().getJobStatus(jobId)));

JobMetrics jobMetrics = server.getCoordinatorService().getJobMetrics(jobId);
assertTrue((Long) jobMetrics.get(INTERMEDIATE_QUEUE_SIZE).get(0).value() > 0);
}

@Test
Expand Down