@@ -3459,8 +3459,8 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
34593459 n .resetElectionTimeout ()
34603460 }
34613461
3462- // Just return if closed or we had previous write error.
3463- if n .State () == Closed || n .werr != nil {
3462+ // Just return if closed or we had previous write error, or invalid sub
3463+ if n .State () == Closed || n .werr != nil || sub == nil {
34643464 return nil
34653465 }
34663466
@@ -3515,7 +3515,7 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35153515 catchingUp := n .catchup != nil
35163516 // Is this a new entry? New entries will be delivered on the append entry
35173517 // sub, rather than a catch-up sub.
3518- isNew := sub != nil && sub == n .aesub
3518+ isNew := sub == n .aesub
35193519
35203520 // Track leader directly
35213521 if isNew && ae .leader != noLeader {
@@ -3529,7 +3529,7 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35293529 // If we are/were catching up ignore old catchup subs, but only if catching up from an older server
35303530 // that doesn't send the leader term when catching up. We can reject old catchups from newer subs
35313531 // later, just by checking the append entry is on the correct term.
3532- if ! isNew && sub != nil && ae .lterm == 0 && (! catchingUp || sub != n .catchup .sub ) {
3532+ if ! isNew && ae .lterm == 0 && (! catchingUp || sub != n .catchup .sub ) {
35333533 n .debug ("AppendEntry ignoring old entry from previous catchup" )
35343534 return nil
35353535 }
@@ -3545,9 +3545,8 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
35453545 n .debug ("Term higher than ours and we are not a follower: %v, stepping down to %q" , n .State (), ae .leader )
35463546 n .stepdownLocked (ae .leader )
35473547 }
3548- } else if lterm < n .term && sub != nil && (isNew || ae .lterm != 0 ) {
3548+ } else if lterm < n .term && (isNew || ae .lterm != 0 ) {
35493549 // Anything that's below our expected highest term needs to be rejected.
3550- // Unless we're replaying (sub=nil), in which case we'll always continue.
35513550 // For backward-compatibility we shouldn't reject if we're being caught up by an old server.
35523551 if isNew {
35533552 n .debug ("Rejected AppendEntry from a leader (%s) with term %d which is less than ours" , ae .leader , lterm )
@@ -3705,35 +3704,28 @@ func (n *raft) processAppendEntryLocked(ae *appendEntry, sub *subscription) *app
37053704CONTINUE:
37063705 // Save to our WAL if we have entries.
37073706 if ae .shouldStore () {
3708- // Only store if an original which will have sub != nil
3709- if sub != nil {
3710- if err := n .storeToWAL (ae ); err != nil {
3711- if err != ErrStoreClosed {
3712- n .warn ("Error storing entry to WAL: %v" , err )
3713- }
3714- return nil
3707+ if err := n .storeToWAL (ae ); err != nil {
3708+ if err != ErrStoreClosed {
3709+ n .warn ("Error storing entry to WAL: %v" , err )
37153710 }
3716- // Save in memory for faster processing during applyCommit.
3717- // Only save so many however to avoid memory bloat.
3718- if l := len (n .pae ); l <= paeDropThreshold {
3719- n .pae [n .pindex ], l = ae , l + 1
3720- if l > paeWarnThreshold && l % paeWarnModulo == 0 {
3721- n .warn ("%d append entries pending" , len (n .pae ))
3722- }
3723- } else {
3724- // Invalidate cache entry at this index, we might have
3725- // stored it previously with a different value.
3726- delete (n .pae , n .pindex )
3727- if l % paeWarnModulo == 0 {
3728- n .debug ("Not saving to append entries pending" )
3729- }
3711+ return nil
3712+ }
3713+ // Save in memory for faster processing during applyCommit.
3714+ // Only save so many however to avoid memory bloat.
3715+ if l := len (n .pae ); l <= paeDropThreshold {
3716+ n .pae [n .pindex ], l = ae , l + 1
3717+ if l > paeWarnThreshold && l % paeWarnModulo == 0 {
3718+ n .warn ("%d append entries pending" , len (n .pae ))
37303719 }
3731- n .resetInitializing ()
37323720 } else {
3733- // This is a replay on startup so just take the appendEntry version.
3734- n .pterm = ae .term
3735- n .pindex = ae .pindex + 1
3721+ // Invalidate cache entry at this index, we might have
3722+ // stored it previously with a different value.
3723+ delete (n .pae , n .pindex )
3724+ if l % paeWarnModulo == 0 {
3725+ n .debug ("Not saving to append entries pending" )
3726+ }
37363727 }
3728+ n .resetInitializing ()
37373729 }
37383730
37393731 // ae should no longer be used after this call as
@@ -3743,7 +3735,7 @@ CONTINUE:
37433735 // Only ever respond to new entries.
37443736 // Never respond to catchup messages, because providing quorum based on this is unsafe.
37453737 // The only way for the leader to receive "success" MUST be through this path.
3746- if sub != nil && isNew {
3738+ if isNew {
37473739 // Success. Send our response.
37483740 return newAppendEntryResponse (n .pterm , n .pindex , n .id , true )
37493741 }
0 commit comments