From 263b7f30051136790dcb33f57552f5e7353a54c5 Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Tue, 23 Sep 2025 09:10:14 +0200 Subject: [PATCH 1/5] Baseline: Write out batch sizes to the log. An easy way to collect batch sizes. For performance testing only. Will be removed. --- server/raft.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/raft.go b/server/raft.go index 69f09f9c3ad..44485f13575 100644 --- a/server/raft.go +++ b/server/raft.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "hash" + "log" "math" "math/rand" "net" @@ -2669,6 +2670,7 @@ func (n *raft) runAsLeader() { if sz < maxBatch && len(entries) < maxEntries { continue } + log.Println("Batch:", len(entries), "entries", sz, "bytes") n.sendAppendEntry(entries) // Reset our sz and entries. // We need to re-create `entries` because there is a reference @@ -2676,6 +2678,7 @@ func (n *raft) runAsLeader() { sz, entries = 0, nil } if len(entries) > 0 { + log.Println("Batch:", len(entries), "entries", sz, "bytes") n.sendAppendEntry(entries) } // Respond to any proposals waiting for a confirmation. From 494350d7cccf3001b500ee1e88b608515dc6afd9 Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Tue, 23 Sep 2025 09:13:26 +0200 Subject: [PATCH 2/5] Raft batching: Make stream writes async with sync_interval = always This is the baseline for performance testing Raft's batching capabilities. The behavior of Raft's batching mechanism is easier to observe if disk writes are synchronous. I.e., we want to write() + fsync() the Raft log, so that producers can easily keep the proposal queue busy. To do so one can set "sync_interval = always". However, that results in disastrous performance: when the leader receives acks for a "big" batch of log entries, the upper layer will write() and fsync() all entries in the batch, individually. So this commit disables "sync always" on stream writes. This *should* work in principle because the data is already in the raft log.
Alternatively, one could implement "group commit" for streams, i.e. fsync() only one time after processing a batch of entries. For performance testing only at this point. --- server/stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/stream.go b/server/stream.go index 7dbc7d2b4e3..759182a20e3 100644 --- a/server/stream.go +++ b/server/stream.go @@ -888,6 +888,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt fsCfg.Compression = config.Compression // Async flushing is only allowed if the stream has a sync log backing it. fsCfg.AsyncFlush = !fsCfg.SyncAlways && config.Replicas > 1 + fsCfg.SyncAlways = false // Async persist mode opts in to async flushing, // sync always would also be disabled if it was configured. From 186ea50cb6e8254ae1d72e2d4325648b52143efa Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Fri, 19 Sep 2025 11:50:46 +0200 Subject: [PATCH 3/5] Raft batching: Avoid sending leftovers right away This commit removes a "pathological" case from the current Raft batching mechanism: if the proposal queue contains more entries than one batch can fit, then raft will send a full batch, followed by a small batch containing the leftovers. However, it was observed that it's quite possible that while the first batch was being stored and sent, clients may already have pushed more stuff into the proposal queue in the meantime. With this fix the server will compose and send a full batch, then the leftovers are handled as follows: if more proposals were pushed into the proposal queue, then we carry over the leftovers to the next iteration, so that the leftovers are batched together with the proposals that were pushed in the meantime. If there are no more proposals, then we send the leftovers right away. For performance testing only at this point.
--- server/raft.go | 105 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 31 deletions(-) diff --git a/server/raft.go b/server/raft.go index 44485f13575..41c1a26ab2b 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2635,6 +2635,8 @@ func (n *raft) runAsLeader() { n.sendPeerState() n.Unlock() + var propBatch []*proposedEntry + hb := time.NewTicker(hbInterval) defer hb.Stop() @@ -2654,41 +2656,43 @@ func (n *raft) runAsLeader() { } n.resp.recycle(&ars) case <-n.prop.ch: - const maxBatch = 256 * 1024 - const maxEntries = 512 - var entries []*Entry - - es, sz := n.prop.pop(), 0 - for _, b := range es { - if b.Type == EntryRemovePeer { - n.doRemovePeerAsLeader(string(b.Data)) + // Drain the channel and combine with any leftovers. + newProposals := n.prop.pop() + propBatch = append(propBatch, newProposals...) + + // Loop until all proposals are batched and sent. + for len(propBatch) > 0 { + batchEntries, newLeftovers, sz := n.composeBatch(propBatch) + + // Send our batch if we have one. + if len(batchEntries) > 0 { + log.Println("Batch:", len(batchEntries), "entries", sz, "bytes") + n.sendAppendEntry(batchEntries) } - entries = append(entries, b.Entry) - // Increment size. - sz += len(b.Data) + 1 - // If below thresholds go ahead and send. - if sz < maxBatch && len(entries) < maxEntries { - continue + + // Only handle replies for proposals that were consumed. + numConsumed := len(propBatch) - len(newLeftovers) + consumedProposals := propBatch[:numConsumed] + for _, pe := range consumedProposals { + if pe.reply != _EMPTY_ { + n.sendReply(pe.reply, nil) + } + pe.returnToPool() } - log.Println("Batch:", len(entries), "entries", sz, "bytes") - n.sendAppendEntry(entries) - // Reset our sz and entries. - // We need to re-create `entries` because there is a reference - // to it in the node's pae map. 
- sz, entries = 0, nil - } - if len(entries) > 0 { - log.Println("Batch:", len(entries), "entries", sz, "bytes") - n.sendAppendEntry(entries) - } - // Respond to any proposals waiting for a confirmation. - for _, pe := range es { - if pe.reply != _EMPTY_ { - n.sendReply(pe.reply, nil) + + // The new leftovers become the batch for the next iteration. + propBatch = newLeftovers + + // If we have leftovers and the proposal channel is empty, + // loop again to send them immediately. Otherwise, break + // to allow the select to pull more from the channel. + if len(propBatch) > 0 && n.prop.len() == 0 { + continue } - pe.returnToPool() + break } - n.prop.recycle(&es) + // Recycle the container for the new proposals that were popped. + n.prop.recycle(&newProposals) case <-hb.C: if n.notActive() { @@ -2721,6 +2725,45 @@ func (n *raft) runAsLeader() { } } +// composeBatch will compose a batch from a set of proposals. +// It will return a batch of entries to be sent and any new leftovers. +func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*proposedEntry, int) { + const maxBatch = 256 * 1024 + const maxEntries = 512 + + if len(allProposals) == 0 { + return nil, nil, 0 + } + + var sz int + end := 0 + for end < len(allProposals) { + p := allProposals[end] + sz += len(p.Data) + 1 + end++ + if sz < maxBatch && end < maxEntries { + continue + } + break + } + + // The batch to send is from the start up to `end`. + batchProposals := allProposals[:end] + // The new leftovers are from `end` to the end. + newLeftovers := allProposals[end:] + + // Create the entries to be sent. + entries := make([]*Entry, len(batchProposals)) + for i, p := range batchProposals { + if p.Type == EntryRemovePeer { + n.doRemovePeerAsLeader(string(p.Data)) + } + entries[i] = p.Entry + } + + return entries, newLeftovers, sz +} + // Quorum reports the quorum status. Will be called on former leaders. 
func (n *raft) Quorum() bool { n.RLock() From 8d376d6462df4319d614221e2b19c913324da16d Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Thu, 25 Sep 2025 11:23:26 +0200 Subject: [PATCH 4/5] Raft batching: Reduce lock contention on raft mutex This is an attempt to reduce contention between Propose() and sendAppendEntry(). Change Propose() to acquire a read lock on Raft, and avoid locking Raft during storeToWAL() (which potentially does IO and may take a long time). This works as long as sendAppendEntry() is called from Raft's goroutine only, unless the entry does not need to be stored in the Raft log. So the rest of the changes are for enforcing the above requirement: * Change EntryLeaderTransfer so that it is not stored in the Raft log. * Push EntryPeerState and EntrySnapshot entries to the proposal queue. * Make sure EntrySnapshot entries skip the leader check; to that end, make sure they are not batched together with other entries. For performance testing only at this point. --- server/filestore.go | 4 + server/jetstream_cluster_1_test.go | 2 +- server/jetstream_cluster_4_test.go | 2 +- server/memstore.go | 4 + server/raft.go | 186 ++++++++++++++++++----------- server/raft_test.go | 11 +- server/store.go | 1 + 7 files changed, 134 insertions(+), 76 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 572e4e33dd7..4e7dc557c65 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -8143,6 +8143,10 @@ func fileStoreMsgSizeEstimate(slen, maxPayload int) uint64 { return uint64(emptyRecordLen + slen + 4 + maxPayload) } +func (fs *fileStore) MsgSize(msg []byte) uint64 { + return fileStoreMsgSizeRaw(0, 0, len(msg)) +} + // ResetState resets any state that's temporary. For example when changing leaders.
func (fs *fileStore) ResetState() { fs.mu.Lock() diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index c75b697cee3..99fb0df1f09 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6973,7 +6973,7 @@ func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) { n := mset.node.(*raft) n.Lock() ae := n.buildAppendEntry(nil) - err = n.storeToWAL(ae) + _, _, err = n.storeToWAL(ae) n.Unlock() index, commit, applied := n.Progress() require_NoError(t, err) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 516ae2459f5..bc7520a1894 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4269,7 +4269,7 @@ func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) { rn.Lock() entries := []*Entry{{EntryNormal, updateDeliveredBuffer()}, {EntryNormal, updateAcksBuffer()}} ae := encode(t, rn.buildAppendEntry(entries)) - err = rn.storeToWAL(ae) + _, _, err = rn.storeToWAL(ae) minPindex := rn.pindex rn.Unlock() require_NoError(t, err) diff --git a/server/memstore.go b/server/memstore.go index f75146c6ab3..5cc9379b795 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -2033,6 +2033,10 @@ func memStoreMsgSize(subj string, hdr, msg []byte) uint64 { return memStoreMsgSizeRaw(len(subj), len(hdr), len(msg)) } +func (ms *memStore) MsgSize(msg []byte) uint64 { + return memStoreMsgSizeRaw(0, 0, len(msg)) +} + // ResetState resets any state that's temporary. For example when changing leaders. 
func (ms *memStore) ResetState() { ms.mu.Lock() diff --git a/server/raft.go b/server/raft.go index 41c1a26ab2b..b72af241c41 100644 --- a/server/raft.go +++ b/server/raft.go @@ -91,6 +91,7 @@ type RaftNode interface { type WAL interface { Type() StorageType StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, int64, error) + MsgSize(msg []byte) uint64 LoadMsg(index uint64, sm *StoreMsg) (*StoreMsg, error) RemoveMsg(index uint64) (bool, error) Compact(index uint64) (uint64, error) @@ -867,19 +868,22 @@ func (s *Server) transferRaftLeaders() bool { // Propose will propose a new entry to the group. // This should only be called on the leader. func (n *raft) Propose(data []byte) error { - n.Lock() - defer n.Unlock() - // Check state under lock, we might not be leader anymore. - if state := n.State(); state != Leader { + n.RLock() + state := n.State() + writeError := n.werr + prop := n.prop + n.RUnlock() + + if state != Leader { n.debug("Proposal ignored, not leader (state: %v)", state) return errNotLeader } - // Error if we had a previous write error. - if werr := n.werr; werr != nil { - return werr + if writeError != nil { + return writeError } - n.prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_)) + + prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_)) return nil } @@ -1014,10 +1018,9 @@ func (n *raft) AdjustBootClusterSize(csz int) error { // Must be the leader. func (n *raft) AdjustClusterSize(csz int) error { n.Lock() - defer n.Unlock() - // Check state under lock, we might not be leader anymore. if n.State() != Leader { + n.Unlock() return errNotLeader } // Same floor as bootstrap. @@ -1029,8 +1032,10 @@ func (n *raft) AdjustClusterSize(csz int) error { // a quorum. 
n.csz = csz n.qn = n.csz/2 + 1 + n.Unlock() - n.sendPeerState() + n.prop.push(newProposedEntry( + newEntry(EntryPeerState, encodePeerState(n.currentPeerState())), _EMPTY_)) return nil } @@ -1213,10 +1218,8 @@ func (n *raft) encodeSnapshot(snap *snapshot) []byte { // Should only be used when the upper layers know this is most recent. // Used when restoring streams, moving a stream from R1 to R>1, etc. func (n *raft) SendSnapshot(data []byte) error { - n.Lock() - defer n.Unlock() - // Don't check if we're leader before sending and storing, this is used on scaleup. - n.sendAppendEntryLocked([]*Entry{{EntrySnapshot, data}}, false) + // TODO Need to copy data? + n.prop.push(newProposedEntry(newEntry(EntrySnapshot, data), _EMPTY_)) return nil } @@ -1714,6 +1717,8 @@ func (n *raft) StepDown(preferred ...string) error { // Send the append entry directly rather than via the proposals queue, // as we will switch to follower state immediately and will blow away // the contents of the proposal queue in the process. + // Also, we won't store the entry in the Raft log, so it is OK ot call + // into sendAppendEntry() directly from here. if maybeLeader != noLeader { n.debug("Selected %q for new leader, stepping down due to leadership transfer", maybeLeader) ae := newEntry(EntryLeaderTransfer, []byte(maybeLeader)) @@ -1829,13 +1834,16 @@ func (n *raft) Peers() []*Peer { // Update and propose our known set of peers. func (n *raft) ProposeKnownPeers(knownPeers []string) { n.Lock() - defer n.Unlock() // If we are the leader update and send this update out. if n.State() != Leader { + n.Unlock() return } n.updateKnownPeersLocked(knownPeers) - n.sendPeerState() + n.Unlock() + + n.prop.push(newProposedEntry( + newEntry(EntryPeerState, encodePeerState(n.currentPeerState())), _EMPTY_)) } // Update our known set of peers. @@ -2630,12 +2638,11 @@ func (n *raft) runAsLeader() { n.unsubscribe(rpsub) n.Unlock() }() - - // To send out our initial peer state. 
- n.sendPeerState() n.Unlock() var propBatch []*proposedEntry + n.sendAppendEntry( + []*Entry{{EntryPeerState, encodePeerState(n.currentPeerState())}}) hb := time.NewTicker(hbInterval) defer hb.Stop() @@ -2739,6 +2746,14 @@ func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*propose end := 0 for end < len(allProposals) { p := allProposals[end] + // If we have a snapshot do not batch with anything else. + if p.Type == EntrySnapshot { + if end == 0 { + sz = len(p.Data) + 1 + end = 1 + } + break + } sz += len(p.Data) + 1 end++ if sz < maxBatch && end < maxEntries { @@ -3812,13 +3827,24 @@ CONTINUE: if ae.shouldStore() { // Only store if an original which will have sub != nil if sub != nil { - if err := n.storeToWAL(ae); err != nil { + n.Unlock() + size, seq, err := n.storeToWAL(ae) + n.Lock() + if err != nil { if err != ErrStoreClosed { n.warn("Error storing entry to WAL: %v", err) } + if err == errEntryStoreFailed { + n.resetWAL() + n.cancelCatchup() + } n.Unlock() return } + n.bytes += size + n.pterm = ae.term + n.pindex = seq + n.active = time.Now() n.cachePendingEntry(ae) n.resetInitializing() } else { @@ -3979,50 +4005,54 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry { return newAppendEntry(n.id, n.term, n.commit, n.pterm, n.pindex, entries) } -// Determine if we should store an entry. This stops us from storing -// heartbeat messages. +// Determine if we should store an entry. +// This stops us from storing heartbeat and leader transfer messages. func (ae *appendEntry) shouldStore() bool { - return ae != nil && len(ae.entries) > 0 + if ae == nil { + return false + } + l := len(ae.entries) + if l == 0 { + return false + } + if l == 1 { + return ae.entries[0].Type != EntryLeaderTransfer + } + return true +} + +func (ae *appendEntry) shouldCheckLeader() bool { + if ae != nil && len(ae.entries) == 1 && + ae.entries[0].Type == EntrySnapshot { + return true + } + return false } // Store our append entry to our WAL. 
-// lock should be held. -func (n *raft) storeToWAL(ae *appendEntry) error { +// Returns the number of bytes written and the sequence number +// assigned to the message. +func (n *raft) storeToWAL(ae *appendEntry) (uint64, uint64, error) { if ae == nil { - return fmt.Errorf("raft: Missing append entry for storage") + return 0, 0, fmt.Errorf("raft: Missing append entry for storage") } + if n.werr != nil { - return n.werr + return 0, 0, n.werr } seq, _, err := n.wal.StoreMsg(_EMPTY_, nil, ae.buf, 0) if err != nil { - n.setWriteErrLocked(err) - return err + return 0, 0, err } - // Sanity checking for now. if index := ae.pindex + 1; index != seq { n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex) - if n.State() == Leader { - n.stepdownLocked(n.selectNextLeader()) - } - // Reset and cancel any catchup. - n.resetWAL() - n.cancelCatchup() - return errEntryStoreFailed + return 0, 0, errEntryStoreFailed } - var sz uint64 - if n.wtype == FileStorage { - sz = fileStoreMsgSize(_EMPTY_, nil, ae.buf) - } else { - sz = memStoreMsgSize(_EMPTY_, nil, ae.buf) - } - n.bytes += sz - n.pterm = ae.term - n.pindex = seq - return nil + sz := n.wal.MsgSize(ae.buf) + return sz, seq, nil } const ( @@ -4031,19 +4061,25 @@ const ( paeWarnModulo = 5_000 ) +// sendAppendEntry builds a appendEntry and stores it to the WAL, +// before sending it to the followers. +// It is expected for this method to be called from Raft's main +// goroutine, unless the appendEntry does not need to be stored +// to the WAL (heartbeat or EntryLeaderTransfer) func (n *raft) sendAppendEntry(entries []*Entry) { - n.Lock() - defer n.Unlock() - n.sendAppendEntryLocked(entries, true) -} -func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) { - // Safeguard against sending an append entry right after a stepdown from a different goroutine. - // Specifically done while holding the lock to not race. 
- if checkLeader && n.State() != Leader { + // Safeguard against sending an append entry right after a stepdown + // from a different goroutine. Specifically done while holding the + // lock to not race. + n.RLock() + state := n.State() + ae := n.buildAppendEntry(entries) + n.RUnlock() + + if ae.shouldCheckLeader() && state != Leader { n.debug("Not sending append entry, not leader") + ae.returnToPool() return } - ae := n.buildAppendEntry(entries) var err error var scratch [1024]byte @@ -4055,12 +4091,30 @@ func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) { // If we have entries store this in our wal. shouldStore := ae.shouldStore() if shouldStore { - if err := n.storeToWAL(ae); err != nil { + size, seq, err := n.storeToWAL(ae) + n.Lock() + if err != nil { + n.setWriteErrLocked(err) + if err == errEntryStoreFailed { + if n.State() == Leader { + n.stepdownLocked(n.selectNextLeader()) + } + // are we sure we want this? + n.resetWAL() + n.cancelCatchup() + } + n.Unlock() return } + + n.bytes += size + n.pterm = ae.term + n.pindex = seq n.active = time.Now() n.cachePendingEntry(ae) + n.Unlock() } + n.sendRPC(n.asubj, n.areply, ae.buf) if !shouldStore { ae.returnToPool() @@ -4149,23 +4203,15 @@ func (n *raft) peerNames() []string { func (n *raft) currentPeerState() *peerState { n.RLock() - ps := n.currentPeerStateLocked() - n.RUnlock() - return ps -} - -func (n *raft) currentPeerStateLocked() *peerState { + defer n.RUnlock() return &peerState{n.peerNames(), n.csz, n.extSt} } -// sendPeerState will send our current peer state to the cluster. -// Lock should be held. -func (n *raft) sendPeerState() { - n.sendAppendEntryLocked([]*Entry{{EntryPeerState, encodePeerState(n.currentPeerStateLocked())}}, true) -} - // Send a heartbeat. func (n *raft) sendHeartbeat() { + // OK to call sendAppendEntry() directly here. + // No need to push heardbeats into prop queue + // because we don't store those into the log. 
n.sendAppendEntry(nil) } diff --git a/server/raft_test.go b/server/raft_test.go index ab704357e5c..dd70c859c4e 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1096,7 +1096,7 @@ func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) { ae := n.buildAppendEntry(entries) ae.buf, err = ae.encode(scratch[:]) require_NoError(t, err) - err = n.storeToWAL(ae) + _, _, err = n.storeToWAL(ae) n.Unlock() require_NoError(t, err) @@ -1620,7 +1620,8 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { require_Equal(t, n.wal.State().Msgs, 0) // Store a third message, it stays uncommitted. - require_NoError(t, n.storeToWAL(aeMsg3)) + _, _, err = n.storeToWAL(aeMsg3) + require_NoError(t, err) require_Equal(t, n.commit, 2) require_Equal(t, n.wal.State().Msgs, 1) entry, err = n.loadEntry(3) @@ -3070,13 +3071,15 @@ func TestNRGSizeAndApplied(t *testing.T) { require_Equal(t, bytes, 0) // Store the first append entry. - require_NoError(t, n.storeToWAL(aeMsg1)) + _, _, err := n.storeToWAL(aeMsg1) + require_NoError(t, err) entries, bytes = n.Size() require_Equal(t, entries, 1) require_Equal(t, bytes, 105) // Store the second append entry. 
- require_NoError(t, n.storeToWAL(aeMsg2)) + _, _, err = n.storeToWAL(aeMsg2) + require_NoError(t, err) entries, bytes = n.Size() require_Equal(t, entries, 2) require_Equal(t, bytes, 210) diff --git a/server/store.go b/server/store.go index 358d7ec2900..4d4bfae0d72 100644 --- a/server/store.go +++ b/server/store.go @@ -91,6 +91,7 @@ type ProcessJetStreamMsgHandler func(*inMsg) type StreamStore interface { StoreMsg(subject string, hdr, msg []byte, ttl int64) (uint64, int64, error) StoreRawMsg(subject string, hdr, msg []byte, seq uint64, ts int64, ttl int64) error + MsgSize(msg []byte) uint64 SkipMsg() uint64 SkipMsgs(seq uint64, num uint64) error FlushAllPending() From 83995aaf4b2f5a9a03d046a6450590f62487c7b5 Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Mon, 20 Oct 2025 17:26:54 +0200 Subject: [PATCH 5/5] Raft Batching: Maximize batch size Limit batch size based on the configured max_payload. --- server/raft.go | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/server/raft.go b/server/raft.go index b72af241c41..61cbd13c58e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2673,7 +2673,7 @@ func (n *raft) runAsLeader() { // Send our batch if we have one. if len(batchEntries) > 0 { - log.Println("Batch:", len(batchEntries), "entries", sz, "bytes") + log.Println("Batch:", len(batchEntries), "entries", sz, "bytes", "maxBatch", n.maxBatchSize()) n.sendAppendEntry(batchEntries) } @@ -2732,31 +2732,48 @@ func (n *raft) runAsLeader() { } } +// Returns the maximum number of bytes we can safely +// send in a single message. +func (n *raft) maxBatchSize() int { + max_payload := MAX_PAYLOAD_SIZE + if n.s.info.MaxPayload > 0 { + max_payload = int(n.s.info.MaxPayload) + } + if n.acc.mpay > 0 { + max_payload = int(n.acc.mpay) + } + return max_payload - MAX_CONTROL_LINE_SIZE +} + // composeBatch will compose a batch from a set of proposals. // It will return a batch of entries to be sent and any new leftovers. 
func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*proposedEntry, int) { - const maxBatch = 256 * 1024 - const maxEntries = 512 + const maxEntries = math.MaxUint16 + maxBatchSize := n.maxBatchSize() if len(allProposals) == 0 { return nil, nil, 0 } - var sz int end := 0 + batchSize := int(appendEntryBaseLen) + for end < len(allProposals) { p := allProposals[end] + msgSize := len(p.Data) + 1 + 4 // to encode type and size + // If we have a snapshot do not batch with anything else. if p.Type == EntrySnapshot { if end == 0 { - sz = len(p.Data) + 1 + batchSize += msgSize end = 1 } break } - sz += len(p.Data) + 1 - end++ - if sz < maxBatch && end < maxEntries { + + if end == 0 || (batchSize+msgSize) < maxBatchSize && end < maxEntries { + batchSize += msgSize + end++ continue } break @@ -2776,7 +2793,7 @@ func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*propose entries[i] = p.Entry } - return entries, newLeftovers, sz + return entries, newLeftovers, batchSize } // Quorum reports the quorum status. Will be called on former leaders.