Skip to content

Commit 8cd0038

Browse files
committed
Feature: Add RaftLogReader::limited_get_log_entries()
This commit adds the `RaftLogReader::limited_get_log_entries()` method, which enables applications to fetch log entries that are equal to or smaller than a specified range. This functionality is particularly useful for customizing the size of AppendEntries requests at the storage API level. - Applications can now decide the number of log entries to return based on the input range. If the application determines that the requested log entries range is too large for a single RPC, it can opt to return only the first several requested log entries instead of the full range. - The method provides a default implementation that delegates the operation to `RaftLogReader::try_get_log_entries`. This enhancement allows for more flexible and efficient handling of log entries, particularly in scenarios where network constraints or performance considerations require smaller data transfers.
1 parent 291e293 commit 8cd0038

File tree

3 files changed

+51
-10
lines changed

3 files changed

+51
-10
lines changed

openraft/src/replication/mod.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -394,21 +394,23 @@ where
394394
let r = LogIdRange::new(rng.prev, rng.prev);
395395
(vec![], r)
396396
} else {
397-
let logs = self.log_reader.try_get_log_entries(start..end).await?;
398-
debug_assert_eq!(
399-
logs.len(),
400-
(end - start) as usize,
401-
"expect logs {}..{} but got only {} entries, first: {}, last: {}",
397+
// limited_get_log_entries will return logs smaller than the range [start, end).
398+
let logs = self.log_reader.limited_get_log_entries(start, end).await?;
399+
400+
let first = *logs.first().map(|x| x.get_log_id()).unwrap();
401+
let last = *logs.last().map(|x| x.get_log_id()).unwrap();
402+
403+
debug_assert!(
404+
!logs.is_empty() && logs.len() <= (end - start) as usize,
405+
"expect logs ⊆ [{}..{}) but got {} entries, first: {}, last: {}",
402406
start,
403407
end,
404408
logs.len(),
405-
logs.first().map(|ent| ent.get_log_id()).display(),
406-
logs.last().map(|ent| ent.get_log_id()).display()
409+
first,
410+
last
407411
);
408412

409-
let last_log_id = logs.last().map(|ent| *ent.get_log_id());
410-
411-
let r = LogIdRange::new(rng.prev, last_log_id);
413+
let r = LogIdRange::new(rng.prev, Some(last));
412414
(logs, r)
413415
}
414416
};

openraft/src/storage/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,23 @@ where C: RaftTypeConfig
167167
&mut self,
168168
range: RB,
169169
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>;
170+
171+
/// Returns log entries within range `[start, end)`, `end` is exclusive,
172+
/// potentially limited by implementation-defined constraints.
173+
///
174+
/// If the specified range is too large, the implementation may return only the first few log
175+
/// entries to ensure the result is not excessively large.
176+
///
177+
/// It must not return empty result if the input range is not empty.
178+
///
179+
/// The default implementation just returns the full range of log entries.
180+
async fn limited_get_log_entries(
181+
&mut self,
182+
start: u64,
183+
end: u64,
184+
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>> {
185+
self.try_get_log_entries(start..end).await
186+
}
170187
}
171188

172189
/// A trait defining the interface for a Raft state machine snapshot subsystem.

openraft/src/testing/suite.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ where
100100
run_fut(run_test(builder, Self::get_initial_state_re_apply_committed))?;
101101
run_fut(run_test(builder, Self::save_vote))?;
102102
run_fut(run_test(builder, Self::get_log_entries))?;
103+
run_fut(run_test(builder, Self::limited_get_log_entries))?;
103104
run_fut(run_test(builder, Self::try_get_log_entry))?;
104105
run_fut(run_test(builder, Self::initial_logs))?;
105106
run_fut(run_test(builder, Self::get_log_state))?;
@@ -720,6 +721,27 @@ where
720721
Ok(())
721722
}
722723

724+
pub async fn limited_get_log_entries(mut store: LS, mut sm: SM) -> Result<(), StorageError<C::NodeId>> {
725+
Self::feed_10_logs_vote_self(&mut store).await?;
726+
727+
tracing::info!("--- get start == stop");
728+
{
729+
let logs = store.limited_get_log_entries(3, 3).await?;
730+
assert_eq!(logs.len(), 0, "expected no logs to be returned");
731+
}
732+
733+
tracing::info!("--- get start < stop");
734+
{
735+
let logs = store.limited_get_log_entries(5, 7).await?;
736+
737+
assert!(!logs.is_empty());
738+
assert!(logs.len() <= 2);
739+
assert_eq!(*logs[0].get_log_id(), log_id_0(1, 5));
740+
}
741+
742+
Ok(())
743+
}
744+
723745
pub async fn try_get_log_entry(mut store: LS, mut sm: SM) -> Result<(), StorageError<C::NodeId>> {
724746
Self::feed_10_logs_vote_self(&mut store).await?;
725747

0 commit comments

Comments
 (0)