Skip to content

Commit 3ca2240

Browse files
committed
[Improve][Starrocks] Catch lable already exception
1 parent e6413c3 commit 3ca2240

File tree

2 files changed

+103
-1
lines changed

2 files changed

+103
-1
lines changed

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.shade.com.google.common.base.Strings;
2121

2222
import org.apache.seatunnel.api.table.catalog.TableSchema;
23+
import org.apache.seatunnel.common.utils.ExceptionUtils;
2324
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
2425
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
2526
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
@@ -45,9 +46,16 @@ public class StarRocksSinkManager {
4546
private long batchBytesSize = 0;
4647

4748
public StarRocksSinkManager(SinkConfig sinkConfig, TableSchema tableSchema) {
49+
this(sinkConfig, tableSchema, new StarRocksStreamLoadVisitor(sinkConfig, tableSchema));
50+
}
51+
52+
StarRocksSinkManager(
53+
SinkConfig sinkConfig,
54+
TableSchema tableSchema,
55+
StarRocksStreamLoadVisitor streamLoadVisitor) {
4856
this.sinkConfig = sinkConfig;
4957
this.batchList = new ArrayList<>();
50-
starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, tableSchema);
58+
starrocksStreamLoadVisitor = streamLoadVisitor;
5159
}
5260

5361
private void tryInit() throws IOException {
@@ -90,6 +98,13 @@ public synchronized void flush() throws IOException {
9098
}
9199
} catch (Exception e) {
92100
log.warn("Writing records to StarRocks failed, retry times = {}", i, e);
101+
102+
String labelAlreadyMessage =
103+
String.format("Label [%s] has already been used", label);
104+
if (ExceptionUtils.getMessage(e).contains(labelAlreadyMessage)) {
105+
log.warn("Label [{}] has already been used, Skipping this batch", label);
106+
break;
107+
}
93108
if (i >= sinkConfig.getMaxRetries()) {
94109
throw new StarRocksConnectorException(
95110
StarRocksConnectorErrorCode.WRITE_RECORDS_FAILED,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
22+
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Test;
25+
26+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
27+
import static org.junit.jupiter.api.Assertions.assertThrows;
28+
import static org.mockito.Mockito.any;
29+
import static org.mockito.Mockito.doThrow;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.times;
32+
import static org.mockito.Mockito.verify;
33+
import static org.mockito.Mockito.when;
34+
35+
public class StarRocksSinkManagerTest {
36+
37+
private SinkConfig mockSinkConfig;
38+
private StarRocksStreamLoadVisitor mockStreamLoadVisitor;
39+
private StarRocksSinkManager sinkManager;
40+
41+
@BeforeEach
42+
void setUp() {
43+
mockSinkConfig = mock(SinkConfig.class);
44+
mockStreamLoadVisitor = mock(StarRocksStreamLoadVisitor.class);
45+
when(mockSinkConfig.getBatchMaxSize()).thenReturn(10);
46+
when(mockSinkConfig.getMaxRetries()).thenReturn(3);
47+
when(mockSinkConfig.getRetryBackoffMultiplierMs()).thenReturn(100);
48+
when(mockSinkConfig.getMaxRetryBackoffMs()).thenReturn(1000);
49+
this.sinkManager =
50+
new StarRocksSinkManager(mockSinkConfig, null, mockStreamLoadVisitor) {
51+
public String createBatchLabel() {
52+
return "test-label";
53+
}
54+
};
55+
}
56+
57+
@Test
58+
void testLabelAlreadyMessageHandledCorrectly() throws Exception {
59+
// Mock behavior for label already used
60+
doThrow(new RuntimeException("Label [test-label] has already been used"))
61+
.when(mockStreamLoadVisitor)
62+
.doStreamLoad(any());
63+
64+
// Add a record to trigger flush
65+
sinkManager.write("test-record");
66+
67+
// Verify that the exception is caught and the batch is skipped
68+
assertDoesNotThrow(() -> sinkManager.flush());
69+
verify(mockStreamLoadVisitor, times(1)).doStreamLoad(any());
70+
}
71+
72+
@Test
73+
void testLabelAlreadyMessageNotHandled() throws Exception {
74+
// Mock behavior for a different exception
75+
doThrow(new RuntimeException("Some other error"))
76+
.when(mockStreamLoadVisitor)
77+
.doStreamLoad(any());
78+
79+
// Add a record to trigger flush
80+
sinkManager.write("test-record");
81+
82+
// Verify that the exception is propagated after retries
83+
assertThrows(StarRocksConnectorException.class, () -> sinkManager.flush());
84+
verify(mockStreamLoadVisitor, times(4))
85+
.doStreamLoad(any()); // 3 retries + 1 initial attempt
86+
}
87+
}

0 commit comments

Comments
 (0)