Merged
52 changes: 49 additions & 3 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -30,11 +30,13 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.FileLockFactory;
@@ -64,6 +66,10 @@ public final class Queue implements Closeable {

protected volatile long unreadCount;

// the readDemand is a record of the currently-waiting-reader's demand and expiry
// it *MUST ONLY* be accessed when `lock.isHeldByCurrentThread() == true`
private ReadDemand readDemand;

private final CheckpointIO checkpointIO;
private final int pageCapacity;
private final long maxBytes;
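
Reviewer note: the locking invariant called out in the comment above is what the new helper methods later assert with `lock.isHeldByCurrentThread()`. A minimal standalone illustration of that access pattern (class and method names are illustrative, not Logstash code):

import java.util.concurrent.locks.ReentrantLock;

// Sketch of a field that, like Queue#readDemand, must only be touched while
// its owning ReentrantLock is held by the current thread.
final class GuardedDemand {
    private final ReentrantLock lock = new ReentrantLock();
    private Object readDemand; // stands in for Queue.ReadDemand

    void updateDemand(Object demand) {
        lock.lock();
        try {
            setDemand(demand); // safe: the lock is held for the whole call
        } finally {
            lock.unlock();
        }
    }

    private void setDemand(Object demand) {
        assert lock.isHeldByCurrentThread();
        this.readDemand = demand;
    }
}
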
@@ -428,6 +434,10 @@ public long write(Queueable element) throws IOException {
throw new QueueRuntimeException(QueueExceptionMessages.BIGGER_DATA_THAN_PAGE_SIZE);
}

// since a reader's batch cannot span multiple pages,
// we flag a force-flush when changing the head page.
boolean needsForceFlush = false;

// create a new head page if the current does not have sufficient space left for data to be written
if (!this.headPage.hasSpace(data.length)) {

@@ -446,13 +456,14 @@ public long write(Queueable element) throws IOException {

// create new head page
newCheckpointedHeadpage(newHeadPageNum);
needsForceFlush = true;
}

long seqNum = this.seqNum += 1;
this.headPage.write(data, seqNum, this.checkpointMaxWrites);
this.unreadCount++;

notEmpty.signal();
maybeSignalReadDemand(needsForceFlush);

// now check if we reached a queue full state and block here until it is not full
// for the next write or the queue was closed.
@@ -647,7 +658,7 @@ private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) thr
boolean elapsed;
// a head page is fully read but can be written to so let's wait for more data
try {
elapsed = !notEmpty.await(timeout, TimeUnit.MILLISECONDS);
elapsed = !awaitReadDemand(timeout, left);
} catch (InterruptedException e) {
// set back the interrupted flag
Thread.currentThread().interrupt();
@@ -917,4 +928,39 @@ private Batch deserialize() {
return new Batch(elements, firstSeqNum, Queue.this);
}
}

private boolean awaitReadDemand(final long timeoutMillis, final int elementsNeeded) throws InterruptedException {
assert this.lock.isHeldByCurrentThread();

final long deadlineMillis = Math.addExact(System.currentTimeMillis(), timeoutMillis);
this.readDemand = new ReadDemand(deadlineMillis, elementsNeeded);

boolean unElapsed = this.notEmpty.awaitUntil(new Date(deadlineMillis));
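        // Condition#awaitUntil returns false only if the deadline has elapsed upon return,
        // so a true result means the waiter was signalled (or woke spuriously) before the
        // deadline, hence `unElapsed`.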
this.readDemand = null;
return unElapsed;
}

private void maybeSignalReadDemand(boolean forceSignal) {
assert this.lock.isHeldByCurrentThread();

// if we're not forcing, and if the current read demand has
// neither been met nor expired, this method becomes a no-op.
if (!forceSignal && Objects.nonNull(readDemand)) {
if (unreadCount < readDemand.elementsNeeded && System.currentTimeMillis() < readDemand.deadlineMillis) {
return;
}
}

this.notEmpty.signal();
}

private static class ReadDemand {
final long deadlineMillis;
final int elementsNeeded;

ReadDemand(long deadlineMillis, int elementsNeeded) {
this.deadlineMillis = deadlineMillis;
this.elementsNeeded = elementsNeeded;
}
}
}
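
Taken together, the change replaces the old per-write `notEmpty.signal()` with a demand-aware wakeup: the reader registers how many elements it still needs and until when it is willing to wait, and the writer only signals once that demand is met, the deadline has passed, or a head-page rollover forces an early flush. The standalone sketch below (illustrative class and method names, not Logstash code) condenses that producer/consumer pattern; like the patch, it performs a single `awaitUntil` per read and returns whatever is available once the wait ends.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative demand-aware queue: the writer wakes the reader only when the
// reader's registered demand is met, its deadline has passed, or a wake-up is forced.
final class DemandAwareQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final ArrayDeque<T> elements = new ArrayDeque<>();

    // Only accessed while `lock` is held, mirroring the invariant on Queue#readDemand.
    private Demand demand;

    void write(T element, boolean forceSignal) {
        lock.lock();
        try {
            elements.addLast(element);
            // Not forcing, a reader is waiting, and its demand is neither met nor expired:
            // stay quiet and let further writes (or the deadline) wake it up.
            if (!forceSignal && demand != null
                    && elements.size() < demand.elementsNeeded
                    && System.currentTimeMillis() < demand.deadlineMillis) {
                return;
            }
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Reads up to `count` elements, waiting at most `timeoutMillis` for them to arrive.
    List<T> read(int count, long timeoutMillis) throws InterruptedException {
        lock.lock();
        try {
            if (elements.size() < count) {
                final long deadlineMillis = Math.addExact(System.currentTimeMillis(), timeoutMillis);
                demand = new Demand(deadlineMillis, count);
                try {
                    // Returns false if the deadline elapsed before a signal arrived.
                    notEmpty.awaitUntil(new Date(deadlineMillis));
                } finally {
                    demand = null;
                }
            }
            final List<T> batch = new ArrayList<>(Math.min(count, elements.size()));
            while (!elements.isEmpty() && batch.size() < count) {
                batch.add(elements.pollFirst());
            }
            return batch;
        } finally {
            lock.unlock();
        }
    }

    private static final class Demand {
        final long deadlineMillis;
        final int elementsNeeded;

        Demand(long deadlineMillis, int elementsNeeded) {
            this.deadlineMillis = deadlineMillis;
            this.elementsNeeded = elementsNeeded;
        }
    }
}

Compared with signalling on every write, this avoids waking a batch-oriented reader once per element; the forced signal on head-page rollover preserves the rule stated in the patch that a reader's batch never spans pages.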