Skip to content

Commit 5f5d7e9

Browse files
committed
Feature: Add TypeConfigExt to simplify RaftTypeConfig Access
This commit introduces a new trait, `TypeConfigExt`, which extends `RaftTypeConfig`. The purpose of this trait is to simplify the access to various functionalities provided by the `RaftTypeConfig` trait, enhancing code readability and reducing complexity. **Methods Added to `TypeConfigExt`:** - `now()` - `sleep()` - `sleep_until()` - `timeout()` - `timeout_at()` - `oneshot()` - `spawn()` **Usage Improvement:** - Instead of using the `<<C as RaftTypeConfig>::AsyncRuntime as AsyncRuntime>::Instant::now()`, you can now simply call `C::now()`.
1 parent 0b1293f commit 5f5d7e9

File tree

21 files changed

+154
-97
lines changed

21 files changed

+154
-97
lines changed

openraft/src/core/raft_core.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ use crate::storage::LogFlushed;
8888
use crate::storage::RaftLogReaderExt;
8989
use crate::storage::RaftLogStorage;
9090
use crate::storage::RaftStateMachine;
91-
use crate::type_config::alias::AsyncRuntimeOf;
9291
use crate::type_config::alias::InstantOf;
9392
use crate::type_config::alias::ResponderOf;
93+
use crate::type_config::TypeConfigExt;
9494
use crate::AsyncRuntime;
9595
use crate::ChangeMembers;
9696
use crate::Instant;
@@ -148,7 +148,7 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
148148
impl<C: RaftTypeConfig> LeaderData<C> {
149149
pub(crate) fn new() -> Self {
150150
Self {
151-
next_heartbeat: InstantOf::<C>::now(),
151+
next_heartbeat: C::now(),
152152
}
153153
}
154154
}
@@ -509,19 +509,12 @@ where
509509
/// Currently heartbeat is a blank log
510510
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
511511
pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
512-
tracing::debug!(
513-
now = debug(<C::AsyncRuntime as AsyncRuntime>::Instant::now()),
514-
"send_heartbeat"
515-
);
512+
tracing::debug!(now = debug(C::now()), "send_heartbeat");
516513

517514
let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) {
518515
lh
519516
} else {
520-
tracing::debug!(
521-
now = debug(<C::AsyncRuntime as AsyncRuntime>::Instant::now()),
522-
"{} failed to send heartbeat",
523-
emitter
524-
);
517+
tracing::debug!(now = debug(C::now()), "{} failed to send heartbeat", emitter);
525518
return false;
526519
};
527520

@@ -1150,7 +1143,7 @@ where
11501143
self.handle_append_entries_request(rpc, tx);
11511144
}
11521145
RaftMsg::RequestVote { rpc, tx } => {
1153-
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
1146+
let now = C::now();
11541147
tracing::info!(
11551148
now = display(now.display()),
11561149
vote_request = display(&rpc),
@@ -1237,7 +1230,7 @@ where
12371230
resp,
12381231
sender_vote,
12391232
} => {
1240-
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
1233+
let now = C::now();
12411234

12421235
tracing::info!(
12431236
now = display(now.display()),
@@ -1273,7 +1266,7 @@ where
12731266
Notify::Tick { i } => {
12741267
// check every timer
12751268

1276-
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
1269+
let now = C::now();
12771270
tracing::debug!("received tick: {}, now: {:?}", i, now);
12781271

12791272
self.handle_tick_election();
@@ -1291,8 +1284,7 @@ where
12911284

12921285
// Install next heartbeat
12931286
if let Some(l) = &mut self.leader_data {
1294-
l.next_heartbeat = <C::AsyncRuntime as AsyncRuntime>::Instant::now()
1295-
+ Duration::from_millis(self.config.heartbeat_interval);
1287+
l.next_heartbeat = C::now() + Duration::from_millis(self.config.heartbeat_interval);
12961288
}
12971289
}
12981290
}
@@ -1431,7 +1423,7 @@ where
14311423

14321424
#[tracing::instrument(level = "debug", skip_all)]
14331425
fn handle_tick_election(&mut self) {
1434-
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
1426+
let now = C::now();
14351427

14361428
tracing::debug!("try to trigger election by tick, now: {:?}", now);
14371429

@@ -1660,7 +1652,7 @@ where
16601652

16611653
// False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
16621654
#[allow(clippy::let_underscore_future)]
1663-
let _ = AsyncRuntimeOf::<C>::spawn(async move {
1655+
let _ = C::spawn(async move {
16641656
for (log_index, tx) in removed.into_iter() {
16651657
tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
16661658
leader_id,

openraft/src/core/sm/handle.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
use tokio::sync::mpsc;
44

55
use crate::core::sm;
6-
use crate::type_config::alias::AsyncRuntimeOf;
76
use crate::type_config::alias::JoinHandleOf;
8-
use crate::AsyncRuntime;
7+
use crate::type_config::TypeConfigExt;
98
use crate::RaftTypeConfig;
109
use crate::Snapshot;
1110

@@ -55,7 +54,7 @@ where C: RaftTypeConfig
5554
/// If the state machine worker has shutdown, it will return an error.
5655
/// If there is not snapshot available, it will return `Ok(None)`.
5756
pub(crate) async fn get_snapshot(&self) -> Result<Option<Snapshot<C>>, &'static str> {
58-
let (tx, rx) = AsyncRuntimeOf::<C>::oneshot();
57+
let (tx, rx) = C::oneshot();
5958

6059
let cmd = sm::Command::get_snapshot(tx);
6160
tracing::debug!("SnapshotReader sending command to sm::Worker: {:?}", cmd);

openraft/src/core/tick.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@ use tracing::Level;
1414
use tracing::Span;
1515

1616
use crate::core::notify::Notify;
17-
use crate::type_config::alias::AsyncRuntimeOf;
18-
use crate::type_config::alias::InstantOf;
1917
use crate::type_config::alias::JoinHandleOf;
20-
use crate::AsyncRuntime;
21-
use crate::Instant;
18+
use crate::type_config::TypeConfigExt;
2219
use crate::RaftTypeConfig;
2320

2421
/// Emit RaftMsg::Tick event at regular `interval`.
@@ -68,7 +65,7 @@ where C: RaftTypeConfig
6865

6966
let shutdown = Mutex::new(Some(shutdown));
7067

71-
let join_handle = AsyncRuntimeOf::<C>::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!(
68+
let join_handle = C::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!(
7269
parent: &Span::current(),
7370
Level::DEBUG,
7471
"tick"
@@ -87,8 +84,8 @@ where C: RaftTypeConfig
8784
let mut cancel = std::pin::pin!(cancel_rx);
8885

8986
loop {
90-
let at = InstantOf::<C>::now() + self.interval;
91-
let mut sleep_fut = AsyncRuntimeOf::<C>::sleep_until(at);
87+
let at = C::now() + self.interval;
88+
let mut sleep_fut = C::sleep_until(at);
9289
let sleep_fut = std::pin::pin!(sleep_fut);
9390
let cancel_fut = cancel.as_mut();
9491

@@ -159,8 +156,7 @@ mod tests {
159156
use tokio::time::Duration;
160157

161158
use crate::core::Tick;
162-
use crate::type_config::alias::AsyncRuntimeOf;
163-
use crate::AsyncRuntime;
159+
use crate::type_config::TypeConfigExt;
164160
use crate::RaftTypeConfig;
165161
use crate::TokioRuntime;
166162

@@ -187,9 +183,9 @@ mod tests {
187183
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
188184
let th = Tick::<TickUTConfig>::spawn(Duration::from_millis(100), tx, true);
189185

190-
AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;
186+
TickUTConfig::sleep(Duration::from_millis(500)).await;
191187
let _ = th.shutdown().unwrap().await;
192-
AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;
188+
TickUTConfig::sleep(Duration::from_millis(500)).await;
193189

194190
let mut received = vec![];
195191
while let Some(x) = rx.recv().await {

openraft/src/display_ext.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! Implement [`fmt::Display`] for types such as `Option<T>` and slice `&[T]`.
1+
//! Implement [`std::fmt::Display`] for types such as `Option<T>` and slice `&[T]`.
22
33
pub(crate) mod display_instant;
44
pub(crate) mod display_option;

openraft/src/engine/engine_impl.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,10 @@ use crate::raft::VoteResponse;
4242
use crate::raft_state::LogStateReader;
4343
use crate::raft_state::RaftState;
4444
use crate::summary::MessageSummary;
45-
use crate::type_config::alias::InstantOf;
4645
use crate::type_config::alias::ResponderOf;
4746
use crate::type_config::alias::SnapshotDataOf;
47+
use crate::type_config::TypeConfigExt;
4848
use crate::AsyncRuntime;
49-
use crate::Instant;
5049
use crate::LogId;
5150
use crate::LogIdOptionExt;
5251
use crate::Membership;
@@ -128,7 +127,7 @@ where C: RaftTypeConfig
128127
/// The candidate `last_log_id` is initialized with the attributes of Acceptor part:
129128
/// [`RaftState`]
130129
pub(crate) fn new_candidate(&mut self, vote: Vote<C::NodeId>) -> &mut Candidate<C, LeaderQuorumSet<C::NodeId>> {
131-
let now = InstantOf::<C>::now();
130+
let now = C::now();
132131
let last_log_id = self.state.last_log_id().copied();
133132

134133
let membership = self.state.membership_state.effective().membership();
@@ -290,7 +289,7 @@ where C: RaftTypeConfig
290289

291290
#[tracing::instrument(level = "debug", skip_all)]
292291
pub(crate) fn handle_vote_req(&mut self, req: VoteRequest<C::NodeId>) -> VoteResponse<C::NodeId> {
293-
let now = <C::AsyncRuntime as AsyncRuntime>::Instant::now();
292+
let now = C::now();
294293
let lease = self.config.timer_config.leader_lease;
295294
let vote = self.state.vote_ref();
296295

openraft/src/engine/handler/vote_handler/accept_vote_test.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@ use crate::engine::Respond;
1111
use crate::error::Infallible;
1212
use crate::raft::VoteResponse;
1313
use crate::testing::log_id;
14-
use crate::type_config::alias::AsyncRuntimeOf;
14+
use crate::type_config::TypeConfigExt;
1515
use crate::utime::UTime;
16-
use crate::AsyncRuntime;
1716
use crate::EffectiveMembership;
1817
use crate::Membership;
1918
use crate::TokioInstant;
@@ -48,12 +47,12 @@ fn test_accept_vote_reject_smaller_vote() -> anyhow::Result<()> {
4847
let mut eng = eng();
4948
eng.output.take_commands();
5049

51-
let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
50+
let (tx, _rx) = UTConfig::oneshot();
5251
let resp = eng.vote_handler().accept_vote(&Vote::new(1, 2), tx, |_state, _err| mk_res());
5352

5453
assert!(resp.is_none());
5554

56-
let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
55+
let (tx, _rx) = UTConfig::oneshot();
5756
assert_eq!(
5857
vec![
5958
//
@@ -74,7 +73,7 @@ fn test_accept_vote_granted_greater_vote() -> anyhow::Result<()> {
7473
let mut eng = eng();
7574
eng.output.take_commands();
7675

77-
let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
76+
let (tx, _rx) = UTConfig::oneshot();
7877
let resp = eng.vote_handler().accept_vote(&Vote::new(3, 3), tx, |_state, _err| mk_res());
7978

8079
assert!(resp.is_some());

openraft/src/engine/handler/vote_handler/mod.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use crate::proposer::CandidateState;
1515
use crate::proposer::LeaderState;
1616
use crate::raft_state::LogStateReader;
1717
use crate::type_config::alias::InstantOf;
18+
use crate::type_config::TypeConfigExt;
1819
use crate::AsyncRuntime;
19-
use crate::Instant;
2020
use crate::LogId;
2121
use crate::OptionalSend;
2222
use crate::RaftState;
@@ -134,19 +134,15 @@ where C: RaftTypeConfig
134134
if vote > self.state.vote_ref() {
135135
tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote);
136136

137-
self.state.vote.update(<C::AsyncRuntime as AsyncRuntime>::Instant::now(), *vote);
137+
self.state.vote.update(C::now(), *vote);
138138
self.output.push_command(Command::SaveVote { vote: *vote });
139139
} else {
140-
self.state.vote.touch(<C::AsyncRuntime as AsyncRuntime>::Instant::now());
140+
self.state.vote.touch(C::now());
141141
}
142142

143143
// Update vote related timer and lease.
144144

145-
tracing::debug!(
146-
now = debug(<C::AsyncRuntime as AsyncRuntime>::Instant::now()),
147-
"{}",
148-
func_name!()
149-
);
145+
tracing::debug!(now = debug(C::now()), "{}", func_name!());
150146

151147
self.update_internal_server_state();
152148

openraft/src/engine/tests/install_full_snapshot_test.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ use crate::engine::LogIdList;
1212
use crate::engine::Respond;
1313
use crate::raft::SnapshotResponse;
1414
use crate::testing::log_id;
15-
use crate::type_config::alias::AsyncRuntimeOf;
16-
use crate::AsyncRuntime;
15+
use crate::type_config::TypeConfigExt;
1716
use crate::Membership;
1817
use crate::Snapshot;
1918
use crate::SnapshotMeta;
@@ -63,7 +62,7 @@ fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
6362

6463
let curr_vote = *eng.state.vote_ref();
6564

66-
let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
65+
let (tx, _rx) = UTConfig::oneshot();
6766

6867
eng.handle_install_full_snapshot(
6968
curr_vote,
@@ -87,7 +86,7 @@ fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
8786
eng.state.snapshot_meta
8887
);
8988

90-
let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
89+
let (dummy_tx, _rx) = UTConfig::oneshot();
9190
assert_eq!(
9291
vec![
9392
//
@@ -111,7 +110,7 @@ fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> {
111110

112111
let curr_vote = *eng.state.vote_ref();
113112

114-
let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
113+
let (tx, _rx) = UTConfig::oneshot();
115114

116115
eng.handle_install_full_snapshot(
117116
curr_vote,
@@ -135,7 +134,7 @@ fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> {
135134
eng.state.snapshot_meta
136135
);
137136

138-
let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
137+
let (dummy_tx, _rx) = UTConfig::oneshot();
139138
assert_eq!(
140139
vec![
141140
//

openraft/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ pub(crate) mod log_id_range;
4646
pub(crate) mod proposer;
4747
pub(crate) mod raft_state;
4848
pub(crate) mod timer;
49-
pub(crate) mod type_config;
5049
pub(crate) mod utime;
5150

5251
pub mod async_runtime;
@@ -63,6 +62,7 @@ pub mod network;
6362
pub mod raft;
6463
pub mod storage;
6564
pub mod testing;
65+
pub mod type_config;
6666

6767
#[cfg(test)]
6868
mod feature_serde_test;

openraft/src/network/snapshot_transport.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ use crate::error::StreamingError;
2020
use crate::network::RPCOption;
2121
use crate::raft::InstallSnapshotRequest;
2222
use crate::raft::SnapshotResponse;
23-
use crate::type_config::alias::AsyncRuntimeOf;
24-
use crate::AsyncRuntime;
23+
use crate::type_config::TypeConfigExt;
2524
use crate::ErrorSubject;
2625
use crate::ErrorVerb;
2726
use crate::OptionalSend;
@@ -124,7 +123,7 @@ where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io:
124123
// Sleep a short time otherwise in test environment it is a dead-loop that never
125124
// yields.
126125
// Because network implementation does not yield.
127-
AsyncRuntimeOf::<C>::sleep(Duration::from_millis(1)).await;
126+
C::sleep(Duration::from_millis(1)).await;
128127

129128
snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(subject_verb)?;
130129

@@ -160,7 +159,7 @@ where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io:
160159
);
161160

162161
#[allow(deprecated)]
163-
let res = AsyncRuntimeOf::<C>::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await;
162+
let res = C::timeout(option.hard_ttl(), net.install_snapshot(req, option.clone())).await;
164163

165164
let resp = match res {
166165
Ok(outer_res) => match outer_res {

0 commit comments

Comments
 (0)