Skip to content

Commit e8459d3

Browse files
authored
KAFKA-19875 Duplicated topic config prevents broker start (#20844)
Ignore duplicated values in a config of List type instead of failing for backward compatibility. Reviewers: Jun Rao <[email protected]>
1 parent d04f171 commit e8459d3

File tree

9 files changed

+101
-45
lines changed

9 files changed

+101
-45
lines changed

clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
import org.apache.kafka.common.config.types.Password;
2020
import org.apache.kafka.common.utils.Utils;
2121

22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
2225
import java.util.ArrayList;
2326
import java.util.Arrays;
2427
import java.util.Collections;
@@ -81,7 +84,7 @@
8184
public class ConfigDef {
8285

8386
private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
84-
87+
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDef.class);
8588
/**
8689
* A unique Java object which represents the lack of a default value.
8790
*/
@@ -536,6 +539,14 @@ Object parseValue(ConfigKey key, Object value, boolean isSet) {
536539
// otherwise assign setting its default value
537540
parsedValue = key.defaultValue;
538541
}
542+
if (key.validator instanceof ValidList && parsedValue instanceof List) {
543+
List<?> originalListValue = (List<?>) parsedValue;
544+
parsedValue = originalListValue.stream().distinct().collect(Collectors.toList());
545+
if (originalListValue.size() != ((List<?>) parsedValue).size()) {
546+
LOGGER.warn("Configuration key \"{}\" contains duplicate values. Duplicates will be removed. The original value " +
547+
"is: {}, the updated value is: {}", key.name, originalListValue, parsedValue);
548+
}
549+
}
539550
if (key.validator != null) {
540551
key.validator.ensureValid(key.name, parsedValue);
541552
}
@@ -1070,8 +1081,7 @@ private void validateIndividualValues(String name, List<Object> values) {
10701081
}
10711082

10721083
public String toString() {
1073-
return validString + (isEmptyAllowed ? " (empty config allowed)" : " (empty not allowed)") +
1074-
(isNullAllowed ? " (null config allowed)" : " (null not allowed)");
1084+
return !validString.validStrings.isEmpty() ? validString.toString() : "";
10751085
}
10761086
}
10771087

clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,14 @@ public class BrokerSecurityConfigs {
169169
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, MEDIUM, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
170170
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, STRING, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
171171
.define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, STRING, null, LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC)
172-
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, LIST, Collections.emptyList(), MEDIUM, SslConfigs.SSL_CIPHER_SUITES_DOC)
172+
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, LIST, List.of(), ConfigDef.ValidList.anyNonDuplicateValues(true, false), MEDIUM, SslConfigs.SSL_CIPHER_SUITES_DOC)
173173
.define(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, CLASS, null, LOW, SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC)
174174

175175
// Sasl Configuration
176176
.define(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, MEDIUM, BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_DOC)
177-
.define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, LIST, BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, MEDIUM, BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC)
177+
.define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, LIST, BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, ConfigDef.ValidList.anyNonDuplicateValues(true, false), MEDIUM, BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC)
178178
.define(BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG, CLASS, null, MEDIUM, BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_DOC)
179-
.define(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG, LIST, BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, MEDIUM, BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
179+
.define(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG, LIST, BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.ValidList.anyNonDuplicateValues(true, false), MEDIUM, BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
180180
.define(SaslConfigs.SASL_JAAS_CONFIG, PASSWORD, null, MEDIUM, SaslConfigs.SASL_JAAS_CONFIG_DOC)
181181
.define(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, CLASS, null, MEDIUM, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS_DOC)
182182
.define(SaslConfigs.SASL_LOGIN_CLASS, CLASS, null, MEDIUM, SaslConfigs.SASL_LOGIN_CLASS_DOC)
@@ -218,6 +218,6 @@ public class BrokerSecurityConfigs {
218218
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LONG, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LOW, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC)
219219
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LONG, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LOW, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC)
220220
.define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, INT, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, LOW, SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
221-
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST, List.of(), LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
221+
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST, List.of(), ConfigDef.ValidList.anyNonDuplicateValues(true, false), LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
222222
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, STRING, null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
223223
}

clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -760,34 +760,55 @@ public void testListSizeValidatorToString() {
760760

761761
@Test
762762
public void testListValidatorAnyNonDuplicateValues() {
763-
ConfigDef.ValidList allowAnyNonDuplicateValues = ConfigDef.ValidList.anyNonDuplicateValues(true, true);
764-
assertDoesNotThrow(() -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "b", "c")));
765-
assertDoesNotThrow(() -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of()));
766-
assertDoesNotThrow(() -> allowAnyNonDuplicateValues.ensureValid("test.config", null));
767-
ConfigException exception1 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "a")));
763+
ConfigDef.ValidList allowAnyNonDuplicateValuesAndEmptyListAndNull = ConfigDef.ValidList.anyNonDuplicateValues(true, true);
764+
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of("a", "b", "c")));
765+
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of()));
766+
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", null));
767+
ConfigException exception1 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of("a", "a")));
768768
assertEquals("Configuration 'test.config' values must not be duplicated.", exception1.getMessage());
769-
ConfigException exception2 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("")));
769+
ConfigException exception2 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of("")));
770770
assertEquals("Configuration 'test.config' values must not be empty.", exception2.getMessage());
771-
771+
ConfigException exception3 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of("a", "", "b")));
772+
assertEquals("Configuration 'test.config' values must not be empty.", exception3.getMessage());
773+
772774
ConfigDef.ValidList allowAnyNonDuplicateValuesAndNull = ConfigDef.ValidList.anyNonDuplicateValues(false, true);
773775
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "b", "c")));
774776
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", null));
775-
ConfigException exception3 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of()));
776-
assertEquals("Configuration 'test.config' must not be empty. Valid values include: any non-empty value", exception3.getMessage());
777-
ConfigException exception4 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "a")));
778-
assertEquals("Configuration 'test.config' values must not be duplicated.", exception4.getMessage());
779-
ConfigException exception5 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("")));
780-
assertEquals("Configuration 'test.config' values must not be empty.", exception5.getMessage());
777+
ConfigException exception4 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of()));
778+
assertEquals("Configuration 'test.config' must not be empty. Valid values include: any non-empty value", exception4.getMessage());
779+
ConfigException exception5 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "a")));
780+
assertEquals("Configuration 'test.config' values must not be duplicated.", exception5.getMessage());
781+
ConfigException exception6 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("")));
782+
assertEquals("Configuration 'test.config' values must not be empty.", exception6.getMessage());
783+
ConfigException exception7 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "", "b")));
784+
assertEquals("Configuration 'test.config' values must not be empty.", exception7.getMessage());
785+
781786

782787
ConfigDef.ValidList allowAnyNonDuplicateValuesAndEmptyList = ConfigDef.ValidList.anyNonDuplicateValues(true, false);
783788
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", "b", "c")));
784789
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of()));
785-
ConfigException exception6 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", null));
786-
assertEquals("Configuration 'test.config' values must not be null.", exception6.getMessage());
787-
ConfigException exception7 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", "a")));
788-
assertEquals("Configuration 'test.config' values must not be duplicated.", exception7.getMessage());
789-
ConfigException exception8 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("")));
790-
assertEquals("Configuration 'test.config' values must not be empty.", exception8.getMessage());
790+
ConfigException exception8 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", null));
791+
assertEquals("Configuration 'test.config' values must not be null.", exception8.getMessage());
792+
ConfigException exception9 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", "a")));
793+
assertEquals("Configuration 'test.config' values must not be duplicated.", exception9.getMessage());
794+
ConfigException exception10 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("")));
795+
assertEquals("Configuration 'test.config' values must not be empty.", exception10.getMessage());
796+
ConfigException exception11 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", "", "b")));
797+
assertEquals("Configuration 'test.config' values must not be empty.", exception11.getMessage());
798+
799+
800+
ConfigDef.ValidList allowAnyNonDuplicateValues = ConfigDef.ValidList.anyNonDuplicateValues(false, false);
801+
assertDoesNotThrow(() -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "b", "c")));
802+
ConfigException exception12 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", null));
803+
assertEquals("Configuration 'test.config' values must not be null.", exception12.getMessage());
804+
ConfigException exception13 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of()));
805+
assertEquals("Configuration 'test.config' must not be empty. Valid values include: any non-empty value", exception13.getMessage());
806+
ConfigException exception14 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "", "b")));
807+
assertEquals("Configuration 'test.config' values must not be empty.", exception14.getMessage());
808+
ConfigException exception15 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("")));
809+
assertEquals("Configuration 'test.config' values must not be empty.", exception15.getMessage());
810+
ConfigException exception16 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "", "b")));
811+
assertEquals("Configuration 'test.config' values must not be empty.", exception16.getMessage());
791812
}
792813

793814
@Test
@@ -813,4 +834,24 @@ public void testListValidatorIn() {
813834
ConfigException exception7 = assertThrows(ConfigException.class, () -> notAllowEmptyValidator.ensureValid("test.config", List.of("a", "a")));
814835
assertEquals("Configuration 'test.config' values must not be duplicated.", exception7.getMessage());
815836
}
837+
838+
@Test
839+
public void testParsedValueWillRemoveDuplicatesInValidList() {
840+
ConfigDef def = new ConfigDef()
841+
.define(
842+
"list",
843+
Type.LIST,
844+
List.of(),
845+
ConfigDef.ValidList.anyNonDuplicateValues(true, true),
846+
Importance.HIGH,
847+
"list doc"
848+
);
849+
850+
Map<String, String> props = new HashMap<>();
851+
props.put("list", "a,b,c,a,b");
852+
853+
Map<String, Object> parsed = def.parse(props);
854+
List<String> expectedList = List.of("a", "b", "c");
855+
assertEquals(expectedList, parsed.get("list"));
856+
}
816857
}

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
/** Filters excluded property names or regexes. */
2828
public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
29-
29+
3030
public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
3131
public static final String USE_DEFAULTS_FROM = "use.defaults.from";
3232
private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's defaults (source or target) to use "
@@ -71,6 +71,7 @@ static class ConfigPropertyFilterConfig extends AbstractConfig {
7171
.define(CONFIG_PROPERTIES_EXCLUDE_CONFIG,
7272
Type.LIST,
7373
CONFIG_PROPERTIES_EXCLUDE_DEFAULT,
74+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
7475
Importance.HIGH,
7576
CONFIG_PROPERTIES_EXCLUDE_DOC)
7677
.define(USE_DEFAULTS_FROM,

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
/** Uses an include and exclude pattern. */
2828
public class DefaultTopicFilter implements TopicFilter {
29-
29+
3030
public static final String TOPICS_INCLUDE_CONFIG = "topics";
3131
private static final String TOPICS_INCLUDE_DOC = "List of topics and/or regexes to replicate.";
3232
public static final String TOPICS_INCLUDE_DEFAULT = ".*";
@@ -64,11 +64,13 @@ static class TopicFilterConfig extends AbstractConfig {
6464
.define(TOPICS_INCLUDE_CONFIG,
6565
Type.LIST,
6666
TOPICS_INCLUDE_DEFAULT,
67+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
6768
Importance.HIGH,
6869
TOPICS_INCLUDE_DOC)
6970
.define(TOPICS_EXCLUDE_CONFIG,
7071
Type.LIST,
7172
TOPICS_EXCLUDE_DEFAULT,
73+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
7274
Importance.HIGH,
7375
TOPICS_EXCLUDE_DOC);
7476

connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AllowlistConnectorClientConfigOverridePolicy.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,14 @@ public class AllowlistConnectorClientConfigOverridePolicy extends AbstractConnec
4040
private static final String ALLOWLIST_CONFIG_DOC = "List of client configurations that can be overridden by " +
4141
"connectors. If empty, connectors can't override any client configurations.";
4242
private static final ConfigDef CONFIG_DEF = new ConfigDef()
43-
.define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, ALLOWLIST_CONFIG_DOC);
43+
.define(
44+
ALLOWLIST_CONFIG,
45+
ConfigDef.Type.LIST,
46+
ALLOWLIST_CONFIG_DEFAULT,
47+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
48+
ConfigDef.Importance.MEDIUM,
49+
ALLOWLIST_CONFIG_DOC
50+
);
4451

4552
private List<String> allowlist = ALLOWLIST_CONFIG_DEFAULT;
4653

core/src/test/scala/unit/kafka/log/LogConfigTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ class LogConfigTest {
294294
assertThrows(classOf[ConfigException], () => validateCleanupPolicy())
295295
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
296296
assertThrows(classOf[ConfigException], () => validateCleanupPolicy())
297-
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete")
297+
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,delete,delete")
298298
validateCleanupPolicy()
299299
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "")
300300
validateCleanupPolicy()

0 commit comments

Comments
 (0)