Skip to content

Conversation

@hawk9821
Copy link
Contributor

@hawk9821 hawk9821 commented Jan 6, 2025

Purpose of this pull request

When format_error_handle_way is skip , the kafka offset is not updated, causing kafka to read data repeatedly

try {
            if (deserializationSchema instanceof CompatibleKafkaConnectDeserializationSchema) {
                ((CompatibleKafkaConnectDeserializationSchema) deserializationSchema)
                        .deserialize(consumerRecord, outputCollector);
            } else {
                deserializationSchema.deserialize(consumerRecord.value(), outputCollector);
            }
            // consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset
            // for the next run
            splitState.setCurrentOffset(consumerRecord.offset() + 1);
        } catch (Exception e) {
            if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
                logger.warn(
                        "Deserialize message failed, skip this message, message: {}",
                        new String(consumerRecord.value()));
            } else {
                throw e;
            }
        }

CI log :

90d3aab509febe06be7301028726563

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@Hisoka-X
Copy link
Member

Hisoka-X commented Jan 6, 2025

cc @hailin0

@hailin0 hailin0 merged commit f67f272 into apache:dev Jan 6, 2025
5 checks passed
@hawk9821 hawk9821 deleted the kafka_error_handle_skip branch January 10, 2025 07:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants