Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 126 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,13 @@ type consumer struct {
lss *lastSeqSkipList
rlimit *rate.Limiter
reqSub *subscription
resetSub *subscription
ackSub *subscription
ackReplyT string
ackSubj string
nextMsgSubj string
nextMsgReqs *ipQueue[*nextMsgReq]
resetSubj string
maxp int
pblimit int
maxpb int
Expand Down Expand Up @@ -1263,6 +1265,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre)
o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name)
o.resetSubj = fmt.Sprintf(JSApiConsumerResetT, mn, o.name)

// Check/update the inactive threshold
o.updateInactiveThreshold(&o.cfg)
Expand Down Expand Up @@ -1547,6 +1550,11 @@ func (o *consumer) setLeader(isLeader bool) {
o.deleteWithoutAdvisory()
return
}
if o.resetSub, err = o.subscribeInternal(o.resetSubj, o.processResetReq); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
}

// Check on flow control settings.
if o.cfg.FlowControl {
Expand Down Expand Up @@ -1667,8 +1675,9 @@ func (o *consumer) setLeader(isLeader bool) {
// ok if they are nil, we protect inside unsubscribe()
o.unsubscribe(o.ackSub)
o.unsubscribe(o.reqSub)
o.unsubscribe(o.resetSub)
o.unsubscribe(o.fcSub)
o.ackSub, o.reqSub, o.fcSub = nil, nil, nil
o.ackSub, o.reqSub, o.resetSub, o.fcSub = nil, nil, nil, nil
if o.infoSub != nil {
o.srv.sysUnsubscribe(o.infoSub)
o.infoSub = nil
Expand Down Expand Up @@ -2596,6 +2605,78 @@ func (o *consumer) updateSkipped(seq uint64) {
o.propose(b[:])
}

func (o *consumer) resetStartingSeq(seq uint64, reply string) (uint64, bool, error) {
o.mu.Lock()
defer o.mu.Unlock()

// Reset to a specific sequence, or back to the ack floor.
if seq == 0 {
seq = o.asflr + 1
} else if o.cfg.DeliverPolicy == DeliverAll {
// Always allowed.
goto VALID
} else if o.cfg.DeliverPolicy == DeliverByStartSequence {
// Only allowed if not going below what's configured.
if seq < o.cfg.OptStartSeq {
return 0, false, errors.New("below start seq")
}
goto VALID
} else if o.cfg.DeliverPolicy == DeliverByStartTime && o.mset != nil {
// Only allowed if not going below what's configured.
nseq := o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
if seq < nseq {
return 0, false, errors.New("below start time")
}
goto VALID
} else {
return 0, false, errors.New("not allowed")
}

VALID:
// Must be a minimum of 1.
if seq <= 0 {
seq = 1
}
o.resetLocalStartingSeq(seq)
// Clustered mode and R>1.
if o.node != nil {
b := make([]byte, 1+8+len(reply))
b[0] = byte(resetSeqOp)
var le = binary.LittleEndian
le.PutUint64(b[1:], seq)
copy(b[1+8:], reply)
o.propose(b[:])
return seq, false, nil
} else if o.store != nil {
o.store.Reset(seq - 1)
// Cleanup messages that lost interest.
if o.retention == InterestPolicy {
if mset := o.mset; mset != nil {
o.mu.Unlock()
ss := mset.state()
o.checkStateForInterestStream(&ss)
o.mu.Lock()
}
}

// Recalculate pending, and re-trigger message delivery.
o.streamNumPending()
o.signalNewMessages()
return seq, true, nil
}
return seq, false, nil
}

// Lock should be held.
func (o *consumer) resetLocalStartingSeq(seq uint64) {
o.pending, o.rdc = nil, nil
o.rdq = nil
o.rdqi.Empty()
o.sseq, o.dseq = seq, 1
o.adflr, o.asflr = o.dseq-1, o.sseq-1
o.ldt, o.lat = time.Time{}, time.Time{}
}

func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
// On exit make sure we nil out pch.
defer func() {
Expand Down Expand Up @@ -4119,6 +4200,48 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg)))
}

// processResetReq will reset a consumer to a new starting sequence.
func (o *consumer) processResetReq(_ *subscription, c *client, a *Account, _, reply string, rmsg []byte) {
if reply == _EMPTY_ {
return
}

s := o.srv
var resp = JSApiConsumerResetResponse{ApiResponse: ApiResponse{Type: JSApiConsumerResetResponseType}}

hdr, msg := c.msgParts(rmsg)
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
return
}

// An empty message resets back to the ack floor, otherwise a custom sequence is used.
var req JSApiConsumerResetRequest
if len(msg) > 0 {
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError(err)
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
return
}
// Resetting to 0 is invalid.
if req.Seq == 0 {
resp.Error = NewJSInvalidJSONError(errors.New("reset to zero seq"))
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
return
}
}
resetSeq, canRespond, err := o.resetStartingSeq(req.Seq, reply)
if err != nil {
resp.Error = NewJSConsumerInvalidResetError(err)
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
} else if canRespond {
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
resp.ResetSeq = resetSeq
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
}
}

func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
o.mu.Lock()
defer o.mu.Unlock()
Expand Down Expand Up @@ -6060,9 +6183,11 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
o.active = false
o.unsubscribe(o.ackSub)
o.unsubscribe(o.reqSub)
o.unsubscribe(o.resetSub)
o.unsubscribe(o.fcSub)
o.ackSub = nil
o.reqSub = nil
o.resetSub = nil
o.fcSub = nil
if o.infoSub != nil {
o.srv.sysUnsubscribe(o.infoSub)
Expand Down
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1998,5 +1998,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerInvalidResetErr",
"code": 400,
"error_code": 10202,
"description": "invalid reset: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
9 changes: 9 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11184,6 +11184,7 @@ func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) {
func (o *consumerFileStore) SetStarting(sseq uint64) error {
o.mu.Lock()
o.state.Delivered.Stream = sseq
o.state.AckFloor.Stream = sseq
buf, err := o.encodeState()
o.mu.Unlock()
if err != nil {
Expand All @@ -11208,6 +11209,14 @@ func (o *consumerFileStore) UpdateStarting(sseq uint64) {
o.kickFlusher()
}

// Reset all values in the store, and reset the starting sequence.
func (o *consumerFileStore) Reset(sseq uint64) error {
o.mu.Lock()
o.state = ConsumerState{}
o.mu.Unlock()
return o.SetStarting(sseq)
}

// HasState returns if this store has a recorded state.
func (o *consumerFileStore) HasState() bool {
o.mu.Lock()
Expand Down
16 changes: 16 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ const (
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"

// JSApiConsumerResetT is the prefix for resetting a given consumer to a new starting sequence.
JSApiConsumerResetT = "$JS.API.CONSUMER.RESET.%s.%s"

// JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
JSApiConsumerUnpin = "$JS.API.CONSUMER.UNPIN.*.*"
JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
Expand Down Expand Up @@ -757,6 +760,19 @@ type JSApiConsumerGetNextRequest struct {
PriorityGroup
}

// JSApiConsumerResetRequest is for resetting a consumer to a specific sequence.
type JSApiConsumerResetRequest struct {
Seq uint64 `json:"seq"`
}

type JSApiConsumerResetResponse struct {
ApiResponse
*ConsumerInfo
ResetSeq uint64 `json:"reset_seq"`
}

const JSApiConsumerResetResponseType = "io.nats.jetstream.api.v1.consumer_reset_response"

// Structure that holds state for a JetStream API request that is processed
// in a separate long-lived go routine. This is to avoid blocking connections.
type jsAPIRoutedReq struct {
Expand Down
35 changes: 35 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ const (
// Batch stream ops.
batchMsgOp
batchCommitMsgOp
// Consumer rest to specific starting sequence.
resetSeqOp
)

// raftGroups are controlled by the metagroup controller.
Expand Down Expand Up @@ -5832,6 +5834,39 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
o.store.UpdateStarting(sseq - 1)
}
o.mu.Unlock()
case resetSeqOp:
o.mu.Lock()
var le = binary.LittleEndian
sseq := le.Uint64(buf[1:9])
reply := string(buf[9:])
o.resetLocalStartingSeq(sseq)
if o.store != nil {
o.store.Reset(sseq - 1)
}
// Cleanup messages that lost interest.
if o.retention == InterestPolicy {
if mset := o.mset; mset != nil {
o.mu.Unlock()
ss := mset.state()
o.checkStateForInterestStream(&ss)
o.mu.Lock()
}
}
// Recalculate pending, and re-trigger message delivery.
if !o.isLeader() {
o.mu.Unlock()
} else {
o.streamNumPending()
o.signalNewMessages()
s, a := o.srv, o.acc
o.mu.Unlock()
if reply != _EMPTY_ {
var resp = JSApiConsumerResetResponse{ApiResponse: ApiResponse{Type: JSApiConsumerResetResponseType}}
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
resp.ResetSeq = sseq
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
}
}
case addPendingRequest:
o.mu.Lock()
if !o.isLeader() {
Expand Down
Loading
Loading