diff --git a/server/consumer.go b/server/consumer.go index 230cb43536..cdf46e5256 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -437,11 +437,13 @@ type consumer struct { lss *lastSeqSkipList rlimit *rate.Limiter reqSub *subscription + resetSub *subscription ackSub *subscription ackReplyT string ackSubj string nextMsgSubj string nextMsgReqs *ipQueue[*nextMsgReq] + resetSubj string maxp int pblimit int maxpb int @@ -1263,6 +1265,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre) o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre) o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name) + o.resetSubj = fmt.Sprintf(JSApiConsumerResetT, mn, o.name) // Check/update the inactive threshold o.updateInactiveThreshold(&o.cfg) @@ -1547,6 +1550,11 @@ func (o *consumer) setLeader(isLeader bool) { o.deleteWithoutAdvisory() return } + if o.resetSub, err = o.subscribeInternal(o.resetSubj, o.processResetReq); err != nil { + o.mu.Unlock() + o.deleteWithoutAdvisory() + return + } // Check on flow control settings. if o.cfg.FlowControl { @@ -1667,8 +1675,9 @@ func (o *consumer) setLeader(isLeader bool) { // ok if they are nil, we protect inside unsubscribe() o.unsubscribe(o.ackSub) o.unsubscribe(o.reqSub) + o.unsubscribe(o.resetSub) o.unsubscribe(o.fcSub) - o.ackSub, o.reqSub, o.fcSub = nil, nil, nil + o.ackSub, o.reqSub, o.resetSub, o.fcSub = nil, nil, nil, nil if o.infoSub != nil { o.srv.sysUnsubscribe(o.infoSub) o.infoSub = nil @@ -2596,6 +2605,78 @@ func (o *consumer) updateSkipped(seq uint64) { o.propose(b[:]) } +func (o *consumer) resetStartingSeq(seq uint64, reply string) (uint64, bool, error) { + o.mu.Lock() + defer o.mu.Unlock() + + // Reset to a specific sequence, or back to the ack floor. + if seq == 0 { + seq = o.asflr + 1 + } else if o.cfg.DeliverPolicy == DeliverAll { + // Always allowed. + goto VALID + } else if o.cfg.DeliverPolicy == DeliverByStartSequence { + // Only allowed if not going below what's configured. + if seq < o.cfg.OptStartSeq { + return 0, false, errors.New("below start seq") + } + goto VALID + } else if o.cfg.DeliverPolicy == DeliverByStartTime && o.mset != nil { + // Only allowed if not going below what's configured. + nseq := o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime) + if seq < nseq { + return 0, false, errors.New("below start time") + } + goto VALID + } else { + return 0, false, errors.New("not allowed") + } + +VALID: + // Must be a minimum of 1. + if seq <= 0 { + seq = 1 + } + o.resetLocalStartingSeq(seq) + // Clustered mode and R>1. + if o.node != nil { + b := make([]byte, 1+8+len(reply)) + b[0] = byte(resetSeqOp) + var le = binary.LittleEndian + le.PutUint64(b[1:], seq) + copy(b[1+8:], reply) + o.propose(b[:]) + return seq, false, nil + } else if o.store != nil { + o.store.Reset(seq - 1) + // Cleanup messages that lost interest. + if o.retention == InterestPolicy { + if mset := o.mset; mset != nil { + o.mu.Unlock() + ss := mset.state() + o.checkStateForInterestStream(&ss) + o.mu.Lock() + } + } + + // Recalculate pending, and re-trigger message delivery. + o.streamNumPending() + o.signalNewMessages() + return seq, true, nil + } + return seq, false, nil +} + +// Lock should be held. +func (o *consumer) resetLocalStartingSeq(seq uint64) { + o.pending, o.rdc = nil, nil + o.rdq = nil + o.rdqi.Empty() + o.sseq, o.dseq = seq, 1 + o.adflr, o.asflr = o.dseq-1, o.sseq-1 + o.ldt, o.lat = time.Time{}, time.Time{} +} + func (o *consumer) loopAndForwardProposals(qch chan struct{}) { // On exit make sure we nil out pch. defer func() { @@ -4119,6 +4200,48 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg))) } +// processResetReq will reset a consumer to a new starting sequence. +func (o *consumer) processResetReq(_ *subscription, c *client, a *Account, _, reply string, rmsg []byte) { + if reply == _EMPTY_ { + return + } + + s := o.srv + var resp = JSApiConsumerResetResponse{ApiResponse: ApiResponse{Type: JSApiConsumerResetResponseType}} + + hdr, msg := c.msgParts(rmsg) + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp)) + return + } + + // An empty message resets back to the ack floor, otherwise a custom sequence is used. + var req JSApiConsumerResetRequest + if len(msg) > 0 { + if err := json.Unmarshal(msg, &req); err != nil { + resp.Error = NewJSInvalidJSONError(err) + s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp)) + return + } + // Resetting to 0 is invalid. + if req.Seq == 0 { + resp.Error = NewJSInvalidJSONError(errors.New("reset to zero seq")) + s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp)) + return + } + } + resetSeq, canRespond, err := o.resetStartingSeq(req.Seq, reply) + if err != nil { + resp.Error = NewJSConsumerInvalidResetError(err) + s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp)) + } else if canRespond { + resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info()) + resp.ResetSeq = resetSeq + s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp)) + } +} + func (o *consumer) processNextMsgRequest(reply string, msg []byte) { o.mu.Lock() defer o.mu.Unlock() @@ -6060,9 +6183,11 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { o.active = false o.unsubscribe(o.ackSub) o.unsubscribe(o.reqSub) + o.unsubscribe(o.resetSub) o.unsubscribe(o.fcSub) o.ackSub = nil o.reqSub = nil + o.resetSub = nil o.fcSub = nil if o.infoSub != nil { o.srv.sysUnsubscribe(o.infoSub) diff --git a/server/errors.json b/server/errors.json index 410544bdaa..c0e2818980 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1998,5 +1998,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerInvalidResetErr", + "code": 400, + "error_code": 10202, + "description": "invalid reset: {err}", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/filestore.go b/server/filestore.go index e485fea6e6..34f2ea3813 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -11184,6 +11184,7 @@ func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) { func (o *consumerFileStore) SetStarting(sseq uint64) error { o.mu.Lock() o.state.Delivered.Stream = sseq + o.state.AckFloor.Stream = sseq buf, err := o.encodeState() o.mu.Unlock() if err != nil { @@ -11208,6 +11209,14 @@ func (o *consumerFileStore) UpdateStarting(sseq uint64) { o.kickFlusher() } +// Reset all values in the store, and reset the starting sequence. +func (o *consumerFileStore) Reset(sseq uint64) error { + o.mu.Lock() + o.state = ConsumerState{} + o.mu.Unlock() + return o.SetStarting(sseq) +} + // HasState returns if this store has a recorded state. func (o *consumerFileStore) HasState() bool { o.mu.Lock() diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 0fae308e0a..14fb243a36 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -155,6 +155,9 @@ const ( // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s" + // JSApiConsumerResetT is the prefix for resetting a given consumer to a new starting sequence. + JSApiConsumerResetT = "$JS.API.CONSUMER.RESET.%s.%s" + // JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer. JSApiConsumerUnpin = "$JS.API.CONSUMER.UNPIN.*.*" JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s" @@ -757,6 +760,19 @@ type JSApiConsumerGetNextRequest struct { PriorityGroup } +// JSApiConsumerResetRequest is for resetting a consumer to a specific sequence. +type JSApiConsumerResetRequest struct { + Seq uint64 `json:"seq"` +} + +type JSApiConsumerResetResponse struct { + ApiResponse + *ConsumerInfo + ResetSeq uint64 `json:"reset_seq"` +} + +const JSApiConsumerResetResponseType = "io.nats.jetstream.api.v1.consumer_reset_response" + // Structure that holds state for a JetStream API request that is processed // in a separate long-lived go routine. This is to avoid blocking connections. type jsAPIRoutedReq struct { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5f53a6e393..e601a42bb9 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -124,6 +124,8 @@ const ( // Batch stream ops. batchMsgOp batchCommitMsgOp + // Consumer rest to specific starting sequence. + resetSeqOp ) // raftGroups are controlled by the metagroup controller. @@ -5832,6 +5834,39 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea o.store.UpdateStarting(sseq - 1) } o.mu.Unlock() + case resetSeqOp: + o.mu.Lock() + var le = binary.LittleEndian + sseq := le.Uint64(buf[1:9]) + reply := string(buf[9:]) + o.resetLocalStartingSeq(sseq) + if o.store != nil { + o.store.Reset(sseq - 1) + } + // Cleanup messages that lost interest. + if o.retention == InterestPolicy { + if mset := o.mset; mset != nil { + o.mu.Unlock() + ss := mset.state() + o.checkStateForInterestStream(&ss) + o.mu.Lock() + } + } + // Recalculate pending, and re-trigger message delivery. + if !o.isLeader() { + o.mu.Unlock() + } else { + o.streamNumPending() + o.signalNewMessages() + s, a := o.srv, o.acc + o.mu.Unlock() + if reply != _EMPTY_ { + var resp = JSApiConsumerResetResponse{ApiResponse: ApiResponse{Type: JSApiConsumerResetResponseType}} + resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info()) + resp.ResetSeq = sseq + s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp)) + } + } case addPendingRequest: o.mu.Lock() if !o.isLeader() { diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 9cc79db264..329cda195c 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -10424,3 +10424,358 @@ func TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs(t *testing.T) { require_Equal(t, msg.Header.Get("Status"), "404") require_Equal(t, msg.Header.Get("Description"), "No Messages") } + +func TestJetStreamConsumerResetToSequence(t *testing.T) { + test := func(replicas int) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: replicas, + } + _, err := js.AddStream(cfg) + require_NoError(t, err) + + sub, err := js.PullSubscribe(_EMPTY_, "CONSUMER", + nats.BindStream("TEST"), + nats.MaxAckPending(1), + nats.AckWait(time.Second), + nats.ConsumerReplicas(replicas), + ) + require_NoError(t, err) + defer sub.Drain() + + for i := range 4 { + _, err = js.Publish("foo", []byte(fmt.Sprintf("msg%d", i+1))) + require_NoError(t, err) + } + + // Fetch and ack the first message. + msgs, err := sub.Fetch(1, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + require_Equal(t, string(msgs[0].Data), "msg1") + require_NoError(t, msgs[0].AckSync()) + + // Now switch the stream to interest now that the messages and consumer are all available. + cfg.Retention = nats.InterestPolicy + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + sl := c.streamLeader(globalAccountName, "TEST") + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + mset, err := sl.globalAccount().lookupStream("TEST") + if err != nil { + return err + } + state := mset.state() + if state.FirstSeq != 2 { + return fmt.Errorf("expected first seq to be 2, got %v", state) + } + return checkState(t, c, globalAccountName, "TEST") + }) + + s := c.consumerLeader(globalAccountName, "TEST", "CONSUMER") + mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + type Expected struct{ numPending, numAckPending, dseq, adflr, sseq, asflr uint64 } + checkConsumerInfo := func(e Expected) { + t.Helper() + ci, err := js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) + o.mu.RLock() + sseq, dseq, adflr, asflr, pending := o.sseq, o.dseq, o.adflr, o.asflr, len(o.pending) + o.mu.RUnlock() + require_Equal(t, ci.NumPending, e.numPending) + // NumAckPending needs to match both in the store and running state. + require_Equal(t, ci.NumAckPending, int(e.numAckPending)) + require_Equal(t, pending, int(e.numAckPending)) + // Delivered.Consumer needs to match both in the store and running state. + require_Equal(t, ci.Delivered.Consumer, e.dseq) + require_Equal(t, dseq-1, e.dseq) + // AckFloor.Consumer needs to match both in the store and running state. + require_Equal(t, ci.AckFloor.Consumer, e.adflr) + require_Equal(t, adflr, e.adflr) + // Delivered.Stream needs to match both in the store and running state. + require_Equal(t, ci.Delivered.Stream, e.sseq) + require_Equal(t, sseq-1, e.sseq) + // AckFloor.Stream needs to match both in the store and running state. + require_Equal(t, ci.AckFloor.Stream, e.asflr) + require_Equal(t, asflr, e.asflr) + } + // A single message was acked, 3 are ready to be delivered. + checkConsumerInfo(Expected{ + numPending: 3, numAckPending: 0, + dseq: 1, adflr: 1, + sseq: 1, asflr: 1, + }) + + msgs, err = sub.Fetch(1, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + require_Equal(t, string(msgs[0].Data), "msg2") + // Don't ack. + + // A message has been delivered and needs to be acked still. + checkConsumerInfo(Expected{ + numPending: 2, numAckPending: 1, + dseq: 2, adflr: 1, + sseq: 2, asflr: 1, + }) + + // Resetting the consumer with an empty request results in a reset back to the ack floor. + var resp JSApiConsumerResetResponse + msg, err := nc.Request("$JS.API.CONSUMER.RESET.TEST.CONSUMER", nil, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_Equal(t, resp.Type, JSApiConsumerResetResponseType) + require_Equal(t, resp.ResetSeq, 2) + require_Equal(t, resp.Delivered.Stream, 1) + require_Equal(t, resp.Delivered.Last, nil) + require_Equal(t, resp.AckFloor.Stream, 1) + require_Equal(t, resp.AckFloor.Last, nil) + + // Should be back. + require_Equal(t, resp.NumPending, 3) + checkConsumerInfo(Expected{ + numPending: 3, numAckPending: 0, + dseq: 0, adflr: 0, + sseq: 1, asflr: 1, + }) + + // Trying to reset to zero fails. + req := JSApiConsumerResetRequest{Seq: 0} + data, err := json.Marshal(req) + require_NoError(t, err) + resp = JSApiConsumerResetResponse{} + msg, err = nc.Request("$JS.API.CONSUMER.RESET.TEST.CONSUMER", data, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_Equal(t, resp.Type, JSApiConsumerResetResponseType) + require_NotNil(t, resp.Error) + require_Error(t, resp.Error, NewJSInvalidJSONError(errors.New("reset to zero seq"))) + + // Resetting the consumer to the last message's sequence so it can be delivered still. + req = JSApiConsumerResetRequest{Seq: 4} + data, err = json.Marshal(req) + require_NoError(t, err) + resp = JSApiConsumerResetResponse{} + msg, err = nc.Request("$JS.API.CONSUMER.RESET.TEST.CONSUMER", data, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_Equal(t, resp.Type, JSApiConsumerResetResponseType) + require_Equal(t, resp.ResetSeq, 4) + require_Equal(t, resp.Delivered.Stream, 3) + require_Equal(t, resp.Delivered.Last, nil) + require_Equal(t, resp.AckFloor.Stream, 3) + require_Equal(t, resp.AckFloor.Last, nil) + + // One message left as pending, the delivered counts are now zero, + // but the sequence and AckFloor are moved up. + checkConsumerInfo(Expected{ + numPending: 1, numAckPending: 0, + dseq: 0, adflr: 0, + sseq: 3, asflr: 3, + }) + + // As a result of moving the starting sequence up, some messages + // have now lost interest and need to be removed. + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + mset, err := sl.globalAccount().lookupStream("TEST") + if err != nil { + return err + } + state := mset.state() + if state.FirstSeq != 4 { + return fmt.Errorf("expected first seq to be 4, got %v", state) + } + return checkState(t, c, globalAccountName, "TEST") + }) + + // We fetch the last message. + msgs, err = sub.Fetch(1, nats.MaxWait(2*time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + require_Equal(t, string(msgs[0].Data), "msg4") + checkConsumerInfo(Expected{ + numPending: 0, numAckPending: 1, + dseq: 1, adflr: 0, + sseq: 4, asflr: 3, + }) + + // After acking, the delivered count should only show 1, but the AckFloor is higher. + require_NoError(t, msgs[0].AckSync()) + checkConsumerInfo(Expected{ + numPending: 0, numAckPending: 0, + dseq: 1, adflr: 1, + sseq: 4, asflr: 4, + }) + + // The stream should now be empty. + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + mset, err := sl.globalAccount().lookupStream("TEST") + if err != nil { + return err + } + state := mset.state() + if state.Msgs != 0 || state.FirstSeq != 5 { + return fmt.Errorf("stream is not empty: %v", state) + } + return checkState(t, c, globalAccountName, "TEST") + }) + } + + for _, replicas := range []int{1, 3} { + t.Run(fmt.Sprintf("R%d", replicas), func(t *testing.T) { + test(replicas) + }) + } +} + +func TestJetStreamConsumerResetToSequenceConstraintOnStartSeq(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + } + _, err := js.AddStream(cfg) + require_NoError(t, err) + + sub, err := js.PullSubscribe(_EMPTY_, "CONSUMER", + nats.BindStream("TEST"), + nats.StartSequence(3), + ) + require_NoError(t, err) + defer sub.Drain() + + for i := range 4 { + _, err = js.Publish("foo", []byte(fmt.Sprintf("msg%d", i+1))) + require_NoError(t, err) + } + + // Fetch and ack the first message. + msgs, err := sub.Fetch(1, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + require_Equal(t, string(msgs[0].Data), "msg3") + require_NoError(t, msgs[0].AckSync()) + + // Trying to reset below the configured starting sequence fails. + req := JSApiConsumerResetRequest{Seq: 2} + data, err := json.Marshal(req) + require_NoError(t, err) + var resp JSApiConsumerResetResponse + msg, err := nc.Request("$JS.API.CONSUMER.RESET.TEST.CONSUMER", data, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_Equal(t, resp.Type, JSApiConsumerResetResponseType) + require_NotNil(t, resp.Error) + require_Error(t, resp.Error, NewJSConsumerInvalidResetError(errors.New("below start seq"))) + + // Resetting above or equal to the configured starting sequence succeeds. + req = JSApiConsumerResetRequest{Seq: 3} + data, err = json.Marshal(req) + require_NoError(t, err) + resp = JSApiConsumerResetResponse{} + msg, err = nc.Request("$JS.API.CONSUMER.RESET.TEST.CONSUMER", data, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_Equal(t, resp.Type, JSApiConsumerResetResponseType) + require_Equal(t, resp.ResetSeq, 3) + require_Equal(t, resp.Delivered.Stream, 2) + require_Equal(t, resp.Delivered.Last, nil) + require_Equal(t, resp.AckFloor.Stream, 2) + require_Equal(t, resp.AckFloor.Last, nil) + + // Fetching should now get the same message. + msgs, err = sub.Fetch(1, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + require_Equal(t, string(msgs[0].Data), "msg3") + require_NoError(t, msgs[0].AckSync()) +} + +func TestJetStreamConsumerResetToSequenceConstraintOnStartTime(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + } + _, err := js.AddStream(cfg) + require_NoError(t, err) + + for i := range 2 { + _, err = js.Publish("foo", []byte(fmt.Sprintf("msg%d", i+1))) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe(_EMPTY_, "CONSUMER", + nats.BindStream("TEST"), + nats.StartTime(time.Now()), + ) + require_NoError(t, err) + defer sub.Drain() + + for i := range 2 { + _, err = js.Publish("foo", []byte(fmt.Sprintf("msg%d", i+3))) + require_NoError(t, err) + } + + // Fetch and ack the first message. + msgs, err := sub.Fetch(1, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + require_Equal(t, string(msgs[0].Data), "msg3") + require_NoError(t, msgs[0].AckSync()) + + // Trying to reset below the configured starting time fails. + req := JSApiConsumerResetRequest{Seq: 2} + data, err := json.Marshal(req) + require_NoError(t, err) + var resp JSApiConsumerResetResponse + msg, err := nc.Request("$JS.API.CONSUMER.RESET.TEST.CONSUMER", data, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_Equal(t, resp.Type, JSApiConsumerResetResponseType) + require_NotNil(t, resp.Error) + require_Error(t, resp.Error, NewJSConsumerInvalidResetError(errors.New("below start time"))) + + // Resetting to a sequence that has a starting time above or equal to the configured starting time succeeds. + req = JSApiConsumerResetRequest{Seq: 3} + data, err = json.Marshal(req) + require_NoError(t, err) + resp = JSApiConsumerResetResponse{} + msg, err = nc.Request("$JS.API.CONSUMER.RESET.TEST.CONSUMER", data, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_Equal(t, resp.Type, JSApiConsumerResetResponseType) + require_Equal(t, resp.ResetSeq, 3) + require_Equal(t, resp.Delivered.Stream, 2) + require_Equal(t, resp.Delivered.Last, nil) + require_Equal(t, resp.AckFloor.Stream, 2) + require_Equal(t, resp.AckFloor.Last, nil) + + // Fetching should now get the same message. + msgs, err = sub.Fetch(1, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + require_Equal(t, string(msgs[0].Data), "msg3") + require_NoError(t, msgs[0].AckSync()) +} diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index d244ebd7ac..323603fc1f 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -164,6 +164,9 @@ const ( // JSConsumerInvalidPriorityGroupErr Provided priority group does not exist for this consumer JSConsumerInvalidPriorityGroupErr ErrorIdentifier = 10160 + // JSConsumerInvalidResetErr invalid reset: {err} + JSConsumerInvalidResetErr ErrorIdentifier = 10202 + // JSConsumerInvalidSamplingErrF failed to parse consumer sampling configuration: {err} JSConsumerInvalidSamplingErrF ErrorIdentifier = 10095 @@ -661,6 +664,7 @@ var ( JSConsumerInvalidGroupNameErr: {Code: 400, ErrCode: 10162, Description: "Valid priority group name must match A-Z, a-z, 0-9, -_/=)+ and may not exceed 16 characters"}, JSConsumerInvalidPolicyErrF: {Code: 400, ErrCode: 10094, Description: "{err}"}, JSConsumerInvalidPriorityGroupErr: {Code: 400, ErrCode: 10160, Description: "Provided priority group does not exist for this consumer"}, + JSConsumerInvalidResetErr: {Code: 400, ErrCode: 10202, Description: "invalid reset: {err}"}, JSConsumerInvalidSamplingErrF: {Code: 400, ErrCode: 10095, Description: "failed to parse consumer sampling configuration: {err}"}, JSConsumerMaxDeliverBackoffErr: {Code: 400, ErrCode: 10116, Description: "max deliver is required to be > length of backoff values"}, JSConsumerMaxPendingAckExcessErrF: {Code: 400, ErrCode: 10121, Description: "consumer max ack pending exceeds system limit of {limit}"}, @@ -1405,6 +1409,22 @@ func NewJSConsumerInvalidPriorityGroupError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerInvalidPriorityGroupErr] } +// NewJSConsumerInvalidResetError creates a new JSConsumerInvalidResetErr error: "invalid reset: {err}" +func NewJSConsumerInvalidResetError(err error, opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + e := ApiErrors[JSConsumerInvalidResetErr] + args := e.toReplacerArgs([]interface{}{"{err}", err}) + return &ApiError{ + Code: e.Code, + ErrCode: e.ErrCode, + Description: strings.NewReplacer(args...).Replace(e.Description), + } +} + // NewJSConsumerInvalidSamplingError creates a new JSConsumerInvalidSamplingErrF error: "failed to parse consumer sampling configuration: {err}" func NewJSConsumerInvalidSamplingError(err error, opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/memstore.go b/server/memstore.go index 2f771b88c2..091561b4ee 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -2308,6 +2308,7 @@ func (o *consumerMemStore) Update(state *ConsumerState) error { func (o *consumerMemStore) SetStarting(sseq uint64) error { o.mu.Lock() o.state.Delivered.Stream = sseq + o.state.AckFloor.Stream = sseq o.mu.Unlock() return nil } @@ -2326,6 +2327,14 @@ func (o *consumerMemStore) UpdateStarting(sseq uint64) { } } +// Reset all values in the store, and reset the starting sequence. +func (o *consumerMemStore) Reset(sseq uint64) error { + o.mu.Lock() + o.state = ConsumerState{} + o.mu.Unlock() + return o.SetStarting(sseq) +} + // HasState returns if this store has a recorded state. func (o *consumerMemStore) HasState() bool { o.mu.Lock() diff --git a/server/store.go b/server/store.go index a11cb14901..7d7d6291ce 100644 --- a/server/store.go +++ b/server/store.go @@ -358,6 +358,7 @@ func (dbs DeleteBlocks) NumDeleted() (total uint64) { type ConsumerStore interface { SetStarting(sseq uint64) error UpdateStarting(sseq uint64) + Reset(sseq uint64) error HasState() bool UpdateDelivered(dseq, sseq, dc uint64, ts int64) error UpdateAcks(dseq, sseq uint64) error