Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 78 additions & 11 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ const (
AckAll
// AckExplicit requires ack or nack for all messages.
AckExplicit
// AckFlowControl functions like AckAll, but acks based on responses to flow control.
AckFlowControl
)

func (a AckPolicy) String() string {
Expand All @@ -345,6 +347,8 @@ func (a AckPolicy) String() string {
return "none"
case AckAll:
return "all"
case AckFlowControl:
return "flow_control"
default:
return "explicit"
}
Expand Down Expand Up @@ -652,7 +656,7 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
config.InactiveThreshold = streamCfg.ConsumerLimits.InactiveThreshold
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
if config.MaxAckPending == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll || config.AckPolicy == AckFlowControl) {
ackPending := JsDefaultMaxAckPending
if lim.MaxAckPending > 0 && lim.MaxAckPending < ackPending {
ackPending = lim.MaxAckPending
Expand All @@ -674,6 +678,11 @@ func setConsumerConfigDefaults(config *ConsumerConfig, streamCfg *StreamConfig,
if config.PriorityPolicy == PriorityPinnedClient && config.PinnedTTL == 0 {
config.PinnedTTL = JsDefaultPinnedTTL
}

// Set default values for flow control policy.
if config.AckPolicy == AckFlowControl && !pedantic {
config.FlowControl = true
}
return nil
}

Expand Down Expand Up @@ -723,6 +732,25 @@ func checkConsumerCfg(
return NewJSConsumerAckWaitNegativeError()
}

// Ack Flow Control policy requires push-based flow-controlled consumer.
if config.AckPolicy == AckFlowControl {
if config.DeliverSubject == _EMPTY_ {
return NewJSConsumerAckFCRequiresPushError()
}
if !config.FlowControl {
return NewJSConsumerAckFCRequiresFCError()
}
if config.MaxAckPending <= 0 {
return NewJSConsumerAckFCRequiresMaxAckPendingError()
}
if config.AckWait != 0 || len(config.BackOff) > 0 {
return NewJSConsumerAckFCRequiresNoAckWaitError()
}
if config.MaxDeliver > 0 {
return NewJSConsumerAckFCRequiresNoMaxDeliverError()
}
}

// Check if we have a BackOff defined that MaxDeliver is within range etc.
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver != -1 && lbo > config.MaxDeliver {
return NewJSConsumerMaxDeliverBackoffError()
Expand Down Expand Up @@ -1075,7 +1103,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Check on stream type conflicts with WorkQueues.
if cfg.Retention == WorkQueuePolicy && !config.Direct {
// Force explicit acks here.
if config.AckPolicy != AckExplicit {
if config.AckPolicy != AckExplicit && config.AckPolicy != AckFlowControl {
mset.mu.Unlock()
return nil, NewJSConsumerWQRequiresExplicitAckError()
}
Expand Down Expand Up @@ -1535,7 +1563,7 @@ func (o *consumer) setLeader(isLeader bool) {
}

var err error
if o.cfg.AckPolicy != AckNone {
if o.cfg.AckPolicy != AckNone && o.cfg.AckPolicy != AckFlowControl {
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
Expand Down Expand Up @@ -3371,7 +3399,9 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
}

// Check if this ack is above the current pointer to our next to deliver.
if sseq >= o.sseq {
// Ignore if it's a flow-controlled consumer, its state could end up further ahead
// since its state is not replicated before delivery.
if sseq >= o.sseq && !o.cfg.FlowControl {
// Let's make sure this is valid.
// This is only received on the consumer leader, so should never be higher
// than the last stream sequence. But could happen if we've just become
Expand Down Expand Up @@ -3429,7 +3459,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
}
delete(o.rdc, sseq)
o.removeFromRedeliverQueue(sseq)
case AckAll:
case AckAll, AckFlowControl:
// no-op
if dseq <= o.adflr || sseq <= o.asflr {
o.mu.Unlock()
Expand Down Expand Up @@ -3589,7 +3619,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
}

switch o.cfg.AckPolicy {
case AckNone, AckAll:
case AckNone, AckAll, AckFlowControl:
needAck = sseq > asflr
case AckExplicit:
if sseq > asflr {
Expand Down Expand Up @@ -5181,7 +5211,15 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if o.isActive() {
o.mu.RLock()
o.sendIdleHeartbeat(odsubj)
flowControl := o.cfg.AckPolicy == AckFlowControl && len(o.pending) > 0
o.mu.RUnlock()

// Send flow control on EOS if it's used for acknowledgements.
if flowControl {
o.mu.Lock()
o.sendFlowControl()
o.mu.Unlock()
}
}
// Reset our idle heartbeat timer.
hb.Reset(hbd)
Expand Down Expand Up @@ -5349,7 +5387,7 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
// Update delivered first.
o.updateDelivered(dseq, seq, dc, ts)

if ap == AckExplicit || ap == AckAll {
if ap == AckExplicit || ap == AckAll || ap == AckFlowControl {
o.trackPending(seq, dseq)
} else if ap == AckNone {
o.adflr = dseq
Expand Down Expand Up @@ -5401,19 +5439,23 @@ func (o *consumer) needFlowControl(sz int) bool {
if o.fcid == _EMPTY_ && o.pbytes > o.maxpb/2 {
return true
}
// Or, when acking based on flow control, we need to send it if we've hit the max pending limit earlier.
if o.fcid == _EMPTY_ && o.cfg.AckPolicy == AckFlowControl && o.maxp > 0 && len(o.pending) >= o.maxp {
return true
}
// If we have an existing outstanding FC, check to see if we need to expand the o.fcsz
if o.fcid != _EMPTY_ && (o.pbytes-o.fcsz) >= o.maxpb {
o.fcsz += sz
}
return false
}

func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, subj, _ string, _ []byte) {
func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, subj, _ string, rmsg []byte) {
o.mu.Lock()
defer o.mu.Unlock()

// Ignore if not the latest we have sent out.
if subj != o.fcid {
o.mu.Unlock()
return
}

Expand All @@ -5433,6 +5475,21 @@ func (o *consumer) processFlowControl(_ *subscription, c *client, _ *Account, su
o.fcid, o.fcsz = _EMPTY_, 0

o.signalNewMessages()
o.mu.Unlock()

hdr, _ := c.msgParts(rmsg)
if len(hdr) > 0 {
ldseq := parseInt64(sliceHeader(JSLastConsumerSeq, hdr))
lsseq := parseInt64(sliceHeader(JSLastStreamSeq, hdr))
if lsseq > 0 {
// Delivered sequence is allowed to be zero as a response
// to flow control without any deliveries.
if ldseq <= 0 {
ldseq = 0
}
o.processAckMsg(uint64(lsseq), uint64(ldseq), 1, _EMPTY_, false)
}
}
}

// Lock should be held.
Expand Down Expand Up @@ -5615,8 +5672,9 @@ func (o *consumer) checkPending() {
defer o.mu.Unlock()

mset := o.mset
ttl := int64(o.cfg.AckWait)
// On stop, mset and timer will be nil.
if o.closed || mset == nil || o.ptmr == nil {
if o.closed || mset == nil || o.ptmr == nil || ttl == 0 {
o.stopAndClearPtmr()
return
}
Expand All @@ -5627,7 +5685,6 @@ func (o *consumer) checkPending() {
fseq := state.FirstSeq

now := time.Now().UnixNano()
ttl := int64(o.cfg.AckWait)
next := int64(o.ackWait(0))
// However, if there is backoff, initializes with the largest backoff.
// It will be adjusted as needed.
Expand Down Expand Up @@ -6638,6 +6695,12 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
}

func (o *consumer) resetPtmr(delay time.Duration) {
// A delay of zero means it should be stopped.
if delay == 0 {
o.stopAndClearPtmr()
return
}

if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
Expand All @@ -6647,6 +6710,10 @@ func (o *consumer) resetPtmr(delay time.Duration) {
}

func (o *consumer) stopAndClearPtmr() {
// If the end time is unset, short-circuit since the timer will already be stopped.
if o.ptmrEnd.IsZero() {
return
}
stopAndClearTimer(&o.ptmr)
o.ptmrEnd = time.Time{}
}
Expand Down
70 changes: 70 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -2008,5 +2008,75 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorConsumerRequiresAckFCErr",
"code": 400,
"error_code": 10203,
"description": "stream mirror consumer requires flow control ack policy",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSSourceConsumerRequiresAckFCErr",
"code": 400,
"error_code": 10204,
"description": "stream source consumer requires flow control ack policy",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerAckFCRequiresPushErr",
"code": 400,
"error_code": 10205,
"description": "flow control ack policy requires a push based consumer",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerAckFCRequiresFCErr",
"code": 400,
"error_code": 10206,
"description": "flow control ack policy requires flow control",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerAckFCRequiresMaxAckPendingErr",
"code": 400,
"error_code": 10207,
"description": "flow control ack policy requires max ack pending",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerAckFCRequiresNoAckWaitErr",
"code": 400,
"error_code": 10208,
"description": "flow control ack policy requires unset ack wait",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerAckFCRequiresNoMaxDeliverErr",
"code": 400,
"error_code": 10209,
"description": "flow control ack policy requires unset max deliver",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
2 changes: 1 addition & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11319,7 +11319,7 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
}

// Check for AckAll here.
if o.cfg.AckPolicy == AckAll {
if o.cfg.AckPolicy == AckAll || o.cfg.AckPolicy == AckFlowControl {
sgap := sseq - o.state.AckFloor.Stream
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
Expand Down
1 change: 1 addition & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ type JSApiConsumerResetRequest struct {
Seq uint64 `json:"seq"`
}

// JSApiConsumerResetResponse is a superset of JSApiConsumerCreateResponse, but including an explicit ResetSeq.
type JSApiConsumerResetResponse struct {
ApiResponse
*ConsumerInfo
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3653,7 +3653,7 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
if needLock {
mset.mu.RLock()
}
mset.sendFlowControlReply(reply)
mset.sendFlowControlReply(reply, hdr)
if needLock {
mset.mu.RUnlock()
}
Expand Down Expand Up @@ -5900,7 +5900,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {
o.lat = time.Now()

var sagap uint64
if o.cfg.AckPolicy == AckAll {
if o.cfg.AckPolicy == AckAll || o.cfg.AckPolicy == AckFlowControl {
// Always use the store state, as o.asflr is skipped ahead already.
// Capture before updating store.
state, err := o.store.BorrowState()
Expand Down Expand Up @@ -8408,7 +8408,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// Check if we are work queue policy.
// We will do pre-checks here to avoid thrashing meta layer.
if sa.Config.Retention == WorkQueuePolicy && !cfg.Direct {
if cfg.AckPolicy != AckExplicit {
if cfg.AckPolicy != AckExplicit && cfg.AckPolicy != AckFlowControl {
resp.Error = NewJSConsumerWQRequiresExplicitAckError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
Expand Down
Loading