Skip to content

Commit 3dfae8c

Browse files
committed
Updates per comments
1 parent 5d64f3d commit 3dfae8c

File tree

5 files changed

+30
-140
lines changed

5 files changed

+30
-140
lines changed

streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java

Lines changed: 0 additions & 60 deletions
This file was deleted.

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.kafka.common.utils.Time;
2424
import org.apache.kafka.streams.KeyValue;
2525
import org.apache.kafka.streams.errors.ProcessorStateException;
26-
import org.apache.kafka.streams.internals.metrics.OpenIterators;
2726
import org.apache.kafka.streams.kstream.internals.Change;
2827
import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
2928
import org.apache.kafka.streams.processor.StateStore;
@@ -49,9 +48,14 @@
4948
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
5049

5150
import java.util.ArrayList;
51+
import java.util.Comparator;
5252
import java.util.List;
5353
import java.util.Map;
54+
import java.util.NavigableSet;
55+
import java.util.NoSuchElementException;
5456
import java.util.Objects;
57+
import java.util.concurrent.ConcurrentSkipListSet;
58+
import java.util.concurrent.atomic.LongAdder;
5559
import java.util.function.Function;
5660

5761
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -93,7 +97,9 @@ public class MeteredKeyValueStore<K, V>
9397
private TaskId taskId;
9498
private Sensor restoreSensor;
9599

96-
protected OpenIterators openIterators;
100+
protected LongAdder numOpenIterators = new LongAdder();
101+
protected NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
102+
97103

98104
@SuppressWarnings("rawtypes")
99105
private final Map<Class, QueryHandler> queryHandlers =
@@ -148,10 +154,15 @@ private void registerMetrics() {
148154
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
149155
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
150156
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
151-
(config, now) -> openIterators.sum());
152-
openIterators = new OpenIterators();
157+
(config, now) -> numOpenIterators.sum());
153158
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
154-
(config, now) -> openIterators.oldestStartTimestamp());
159+
(config, now) -> {
160+
try {
161+
return openIterators.isEmpty() ? 0L : openIterators.first().startTimestamp();
162+
} catch (final NoSuchElementException ignore) {
163+
return 0L;
164+
}
165+
});
155166
}
156167

157168
@Override
@@ -442,6 +453,7 @@ private MeteredKeyValueIterator(final KeyValueIterator<Bytes, byte[]> iter,
442453
this.sensor = sensor;
443454
this.startTimestamp = time.milliseconds();
444455
this.startNs = time.nanoseconds();
456+
numOpenIterators.increment();
445457
openIterators.add(this);
446458
}
447459

@@ -471,6 +483,7 @@ public void close() {
471483
final long duration = time.nanoseconds() - startNs;
472484
sensor.record(duration);
473485
iteratorDurationSensor.record(duration);
486+
numOpenIterators.decrement();
474487
openIterators.remove(this);
475488
}
476489
}
@@ -499,6 +512,7 @@ private MeteredKeyValueTimestampedIterator(
499512
this.valueDeserializer = valueDeserializer;
500513
this.startTimestamp = time.milliseconds();
501514
this.startNs = time.nanoseconds();
515+
numOpenIterators.increment();
502516
openIterators.add(this);
503517
}
504518

@@ -528,6 +542,7 @@ public void close() {
528542
final long duration = time.nanoseconds() - startNs;
529543
sensor.record(duration);
530544
iteratorDurationSensor.record(duration);
545+
numOpenIterators.decrement();
531546
openIterators.remove(this);
532547
}
533548
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818

1919
import org.apache.kafka.common.metrics.Sensor;
2020
import org.apache.kafka.common.utils.Time;
21-
import org.apache.kafka.streams.internals.metrics.OpenIterators;
2221
import org.apache.kafka.streams.state.VersionedRecord;
2322
import org.apache.kafka.streams.state.VersionedRecordIterator;
2423

24+
import java.util.Set;
25+
import java.util.concurrent.atomic.LongAdder;
2526
import java.util.function.Function;
2627

2728
class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V>, MeteredIterator {
@@ -32,20 +33,24 @@ class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterato
3233
private final Time time;
3334
private final long startNs;
3435
private final long startTimestampMs;
35-
private final OpenIterators openIterators;
36+
private final Set<MeteredIterator> openIterators;
37+
private final LongAdder numOpenIterators;
3638

3739
public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator<byte[]> iterator,
3840
final Sensor sensor,
3941
final Time time,
4042
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
41-
final OpenIterators openIterators) {
43+
final LongAdder numOpenIterators,
44+
final Set<MeteredIterator> openIterators) {
4245
this.iterator = iterator;
4346
this.deserializeValue = deserializeValue;
47+
this.numOpenIterators = numOpenIterators;
4448
this.openIterators = openIterators;
4549
this.sensor = sensor;
4650
this.time = time;
4751
this.startNs = time.nanoseconds();
4852
this.startTimestampMs = time.milliseconds();
53+
numOpenIterators.increment();
4954
openIterators.add(this);
5055
}
5156

@@ -60,6 +65,7 @@ public void close() {
6065
iterator.close();
6166
} finally {
6267
sensor.record(time.nanoseconds() - startNs);
68+
numOpenIterators.decrement();
6369
openIterators.remove(this);
6470
}
6571
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final
269269
iteratorDurationSensor,
270270
time,
271271
StoreQueryUtils.deserializeValue(plainValueSerdes),
272+
numOpenIterators,
272273
openIterators
273274
);
274275
final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult =

streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)