@@ -36,7 +36,9 @@ use crate::core::sm;
3636use crate :: core:: sm:: handle;
3737use crate :: core:: sm:: CommandSeq ;
3838use crate :: core:: ServerState ;
39+ use crate :: display_ext:: DisplayInstantExt ;
3940use crate :: display_ext:: DisplayOption ;
41+ use crate :: display_ext:: DisplayOptionExt ;
4042use crate :: display_ext:: DisplaySlice ;
4143use crate :: engine:: Command ;
4244use crate :: engine:: Condition ;
@@ -71,6 +73,7 @@ use crate::raft::AppendEntriesRequest;
7173use crate :: raft:: AppendEntriesResponse ;
7274use crate :: raft:: ClientWriteResponse ;
7375use crate :: raft:: VoteRequest ;
76+ use crate :: raft:: VoteResponse ;
7477use crate :: raft_state:: LogIOId ;
7578use crate :: raft_state:: LogStateReader ;
7679use crate :: replication;
@@ -138,20 +141,14 @@ impl<C: RaftTypeConfig> Debug for ApplyResult<C> {
138141///
139142/// It is created when RaftCore enters leader state, and will be dropped when it quits leader state.
140143pub ( crate ) struct LeaderData < C : RaftTypeConfig > {
141- /// A mapping of node IDs the replication state of the target node.
142- // TODO(xp): make it a field of RaftCore. it does not have to belong to leader.
143- // It requires the Engine to emit correct add/remove replication commands
144- pub ( super ) replications : BTreeMap < C :: NodeId , ReplicationHandle < C > > ,
145-
146144 /// The time to send next heartbeat.
147145 pub ( crate ) next_heartbeat : <C :: AsyncRuntime as AsyncRuntime >:: Instant ,
148146}
149147
150148impl < C : RaftTypeConfig > LeaderData < C > {
151149 pub ( crate ) fn new ( ) -> Self {
152150 Self {
153- replications : BTreeMap :: new ( ) ,
154- next_heartbeat : <C :: AsyncRuntime as AsyncRuntime >:: Instant :: now ( ) ,
151+ next_heartbeat : InstantOf :: < C > :: now ( ) ,
155152 }
156153 }
157154}
@@ -187,6 +184,9 @@ where
187184 /// Channels to send result back to client when logs are applied.
188185 pub ( crate ) client_resp_channels : BTreeMap < u64 , ResponderOf < C > > ,
189186
187+ /// A mapping of node IDs the replication state of the target node.
188+ pub ( crate ) replications : BTreeMap < C :: NodeId , ReplicationHandle < C > > ,
189+
190190 pub ( crate ) leader_data : Option < LeaderData < C > > ,
191191
192192 #[ allow( dead_code) ]
@@ -320,7 +320,7 @@ where
320320 let mut pending = FuturesUnordered :: new ( ) ;
321321
322322 let voter_progresses = {
323- let l = & self . engine . internal_server_state . leading ( ) . unwrap ( ) ;
323+ let l = & self . engine . leader . as_ref ( ) . unwrap ( ) ;
324324 l. progress . iter ( ) . filter ( |( id, _v) | l. progress . is_voter ( id) == Some ( true ) )
325325 } ;
326326
@@ -533,7 +533,7 @@ where
533533
534534 #[ tracing:: instrument( level = "debug" , skip_all) ]
535535 pub fn flush_metrics ( & mut self ) {
536- let leader_metrics = if let Some ( leader) = self . engine . internal_server_state . leading ( ) {
536+ let leader_metrics = if let Some ( leader) = self . engine . leader . as_ref ( ) {
537537 let prog = & leader. progress ;
538538 Some ( prog. iter ( ) . map ( |( id, p) | ( * id, * p. borrow ( ) ) ) . collect ( ) )
539539 } else {
@@ -702,7 +702,7 @@ where
702702 /// from a quorum of followers, indicating its leadership is current and recognized.
703703 /// If the node is not a leader or no acknowledgment has been received, `None` is returned.
704704 fn last_quorum_acked_time ( & mut self ) -> Option < InstantOf < C > > {
705- let leading = self . engine . internal_server_state . leading_mut ( ) ;
705+ let leading = self . engine . leader . as_mut ( ) ;
706706 leading. and_then ( |l| l. last_quorum_acked_time ( ) )
707707 }
708708
@@ -831,7 +831,9 @@ where
831831 let network = self . network . new_client ( target, target_node) . await ;
832832 let snapshot_network = self . network . new_client ( target, target_node) . await ;
833833
834- let session_id = ReplicationSessionId :: new ( * self . engine . state . vote_ref ( ) , * membership_log_id) ;
834+ let leader = self . engine . leader . as_ref ( ) . unwrap ( ) ;
835+
836+ let session_id = ReplicationSessionId :: new ( leader. vote , * membership_log_id) ;
835837
836838 ReplicationCore :: < C , N , LS > :: spawn (
837839 target,
@@ -853,27 +855,23 @@ where
853855 pub async fn remove_all_replication ( & mut self ) {
854856 tracing:: info!( "remove all replication" ) ;
855857
856- if let Some ( l) = & mut self . leader_data {
857- let nodes = std:: mem:: take ( & mut l. replications ) ;
858+ let nodes = std:: mem:: take ( & mut self . replications ) ;
858859
859- tracing:: debug!(
860- targets = debug( nodes. iter( ) . map( |x| * x. 0 ) . collect:: <Vec <_>>( ) ) ,
861- "remove all targets from replication_metrics"
862- ) ;
860+ tracing:: debug!(
861+ targets = debug( nodes. iter( ) . map( |x| * x. 0 ) . collect:: <Vec <_>>( ) ) ,
862+ "remove all targets from replication_metrics"
863+ ) ;
863864
864- for ( target, s) in nodes {
865- let handle = s. join_handle ;
865+ for ( target, s) in nodes {
866+ let handle = s. join_handle ;
866867
867- // Drop sender to notify the task to shutdown
868- drop ( s. tx_repl ) ;
868+ // Drop sender to notify the task to shutdown
869+ drop ( s. tx_repl ) ;
869870
870- tracing:: debug!( "joining removed replication: {}" , target) ;
871- let _x = handle. await ;
872- tracing:: info!( "Done joining removed replication : {}" , target) ;
873- }
874- } else {
875- unreachable ! ( "it has to be a leader!!!" ) ;
876- } ;
871+ tracing:: debug!( "joining removed replication: {}" , target) ;
872+ let _x = handle. await ;
873+ tracing:: info!( "Done joining removed replication : {}" , target) ;
874+ }
877875 }
878876
879877 /// Run as many commands as possible.
@@ -1154,8 +1152,8 @@ where
11541152 RaftMsg :: RequestVote { rpc, tx } => {
11551153 let now = <C :: AsyncRuntime as AsyncRuntime >:: Instant :: now ( ) ;
11561154 tracing:: info!(
1157- now = debug ( now) ,
1158- vote_request = display( rpc. summary ( ) ) ,
1155+ now = display ( now. display ( ) ) ,
1156+ vote_request = display( & rpc) ,
11591157 "received RaftMsg::RequestVote: {}" ,
11601158 func_name!( )
11611159 ) ;
@@ -1237,36 +1235,36 @@ where
12371235 Notify :: VoteResponse {
12381236 target,
12391237 resp,
1240- sender_vote : vote ,
1238+ sender_vote,
12411239 } => {
12421240 let now = <C :: AsyncRuntime as AsyncRuntime >:: Instant :: now ( ) ;
12431241
12441242 tracing:: info!(
1245- now = debug ( now) ,
1246- resp = display( resp. summary ( ) ) ,
1243+ now = display ( now. display ( ) ) ,
1244+ resp = display( & resp) ,
12471245 "received Notify::VoteResponse: {}" ,
12481246 func_name!( )
12491247 ) ;
12501248
1251- if self . does_vote_match ( & vote , "VoteResponse" ) {
1249+ if self . does_vote_match ( & sender_vote , "VoteResponse" ) {
12521250 self . engine . handle_vote_resp ( target, resp) ;
12531251 }
12541252 }
12551253
12561254 Notify :: HigherVote {
12571255 target,
12581256 higher,
1259- sender_vote : vote ,
1257+ sender_vote,
12601258 } => {
12611259 tracing:: info!(
12621260 target = display( target) ,
12631261 higher_vote = display( & higher) ,
1264- sending_vote = display( & vote ) ,
1262+ sending_vote = display( & sender_vote ) ,
12651263 "received Notify::HigherVote: {}" ,
12661264 func_name!( )
12671265 ) ;
12681266
1269- if self . does_vote_match ( & vote , "HigherVote" ) {
1267+ if self . does_vote_match ( & sender_vote , "HigherVote" ) {
12701268 // Rejected vote change is ok.
12711269 let _ = self . engine . vote_handler ( ) . update_vote ( & higher) ;
12721270 }
@@ -1282,6 +1280,7 @@ where
12821280
12831281 // TODO: test: fixture: make isolated_nodes a single-way isolating.
12841282
1283+ // TODO: check if it is Leader with Engine
12851284 // Leader send heartbeat
12861285 let heartbeat_at = self . leader_data . as_ref ( ) . map ( |x| x. next_heartbeat ) ;
12871286 if let Some ( t) = heartbeat_at {
@@ -1347,16 +1346,20 @@ where
13471346 return Err ( Fatal :: from ( error) ) ;
13481347 }
13491348
1350- replication:: Response :: HigherVote { target, higher, vote } => {
1349+ replication:: Response :: HigherVote {
1350+ target,
1351+ higher,
1352+ sender_vote,
1353+ } => {
13511354 tracing:: info!(
13521355 target = display( target) ,
13531356 higher_vote = display( & higher) ,
1354- sending_vote = display( & vote ) ,
1357+ sender_vote = display( & sender_vote ) ,
13551358 "received Notify::HigherVote: {}" ,
13561359 func_name!( )
13571360 ) ;
13581361
1359- if self . does_vote_match ( & vote , "HigherVote" ) {
1362+ if self . does_vote_match ( & sender_vote , "HigherVote" ) {
13601363 // Rejected vote change is ok.
13611364 let _ = self . engine . vote_handler ( ) . update_vote ( & higher) ;
13621365 }
@@ -1506,30 +1509,40 @@ where
15061509 "handle_replication_progress"
15071510 ) ;
15081511
1512+ #[ allow( clippy:: collapsible_if) ]
15091513 if tracing:: enabled!( Level :: DEBUG ) {
1510- if let Some ( l) = & self . leader_data {
1511- if !l. replications . contains_key ( & target) {
1512- tracing:: warn!( "leader has removed target: {}" , target) ;
1513- } ;
1514- } else {
1515- // TODO: A leader may have stepped down.
1516- }
1514+ if !self . replications . contains_key ( & target) {
1515+ tracing:: warn!( "leader has removed target: {}" , target) ;
1516+ } ;
15171517 }
15181518
15191519 // A leader may have stepped down.
1520- if self . engine . internal_server_state . is_leading ( ) {
1520+ if self . engine . leader . is_some ( ) {
15211521 self . engine . replication_handler ( ) . update_progress ( target, request_id, result) ;
15221522 }
15231523 }
15241524
15251525 /// If a message is sent by a previous server state but is received by current server state,
15261526 /// it is a stale message and should be just ignored.
1527- fn does_vote_match ( & self , vote : & Vote < C :: NodeId > , msg : impl Display ) -> bool {
1528- if vote != self . engine . state . vote_ref ( ) {
1527+ fn does_vote_match ( & self , sender_vote : & Vote < C :: NodeId > , msg : impl Display ) -> bool {
1528+ // Get the current leading vote:
1529+ // - If input `sender_vote` is committed, it is sent by a Leader. Therefore we check against current
1530+ // Leader's vote.
1531+ // - Otherwise, it is sent by a Candidate, we check against the current in progress voting state.
1532+ let my_vote = if sender_vote. is_committed ( ) {
1533+ let l = self . engine . leader . as_ref ( ) ;
1534+ l. map ( |x| x. vote )
1535+ } else {
1536+ // If it finished voting, Candidate's vote is None.
1537+ let candidate = self . engine . candidate_ref ( ) ;
1538+ candidate. map ( |x| * x. vote_ref ( ) )
1539+ } ;
1540+
1541+ if Some ( * sender_vote) != my_vote {
15291542 tracing:: warn!(
1530- "vote changed: msg sent by: {:? }; curr : {}; ignore when ({})" ,
1531- vote ,
1532- self . engine . state . vote_ref ( ) ,
1543+ "A message will be ignored because vote changed: msg sent by vote : {}; current my vote : {}; when ({})" ,
1544+ sender_vote ,
1545+ my_vote . display ( ) ,
15331546 msg
15341547 ) ;
15351548 false
@@ -1544,7 +1557,7 @@ where
15441557 session_id : & ReplicationSessionId < C :: NodeId > ,
15451558 msg : impl Display + Copy ,
15461559 ) -> bool {
1547- if !self . does_vote_match ( & session_id. vote , msg) {
1560+ if !self . does_vote_match ( session_id. vote_ref ( ) , msg) {
15481561 return false ;
15491562 }
15501563
@@ -1624,6 +1637,13 @@ where
16241637 Command :: SaveVote { vote } => {
16251638 self . log_store . save_vote ( & vote) . await ?;
16261639 self . engine . state . io_state_mut ( ) . update_vote ( vote) ;
1640+
1641+ let _ = self . tx_notify . send ( Notify :: VoteResponse {
1642+ target : self . id ,
1643+ // last_log_id is not used when sending VoteRequest to local node
1644+ resp : VoteResponse :: new ( vote, None ) ,
1645+ sender_vote : vote,
1646+ } ) ;
16271647 }
16281648 Command :: PurgeLog { upto } => {
16291649 self . log_store . purge ( upto) . await ?;
@@ -1656,12 +1676,8 @@ where
16561676 self . spawn_parallel_vote_requests ( & vote_req) . await ;
16571677 }
16581678 Command :: ReplicateCommitted { committed } => {
1659- if let Some ( l) = & self . leader_data {
1660- for node in l. replications . values ( ) {
1661- let _ = node. tx_repl . send ( Replicate :: Committed ( committed) ) ;
1662- }
1663- } else {
1664- unreachable ! ( "it has to be a leader!!!" ) ;
1679+ for node in self . replications . values ( ) {
1680+ let _ = node. tx_repl . send ( Replicate :: Committed ( committed) ) ;
16651681 }
16661682 }
16671683 Command :: Commit {
@@ -1673,38 +1689,29 @@ where
16731689 self . apply_to_state_machine ( seq, already_committed. next_index ( ) , upto. index ) . await ?;
16741690 }
16751691 Command :: Replicate { req, target } => {
1676- if let Some ( l) = & self . leader_data {
1677- let node = l. replications . get ( & target) . expect ( "replication to target node exists" ) ;
1692+ let node = self . replications . get ( & target) . expect ( "replication to target node exists" ) ;
16781693
1679- match req {
1680- Inflight :: None => {
1681- let _ = node. tx_repl . send ( Replicate :: Heartbeat ) ;
1682- }
1683- Inflight :: Logs { id, log_id_range } => {
1684- let _ = node. tx_repl . send ( Replicate :: logs ( RequestId :: new_append_entries ( id) , log_id_range) ) ;
1685- }
1686- Inflight :: Snapshot { id, last_log_id } => {
1687- // unwrap: The replication channel must not be dropped or it is a bug.
1688- node. tx_repl . send ( Replicate :: snapshot ( RequestId :: new_snapshot ( id) , last_log_id) ) . map_err (
1689- |_e| StorageIOError :: read_snapshot ( None , AnyError :: error ( "replication channel closed" ) ) ,
1690- ) ?;
1691- }
1694+ match req {
1695+ Inflight :: None => {
1696+ let _ = node. tx_repl . send ( Replicate :: Heartbeat ) ;
1697+ }
1698+ Inflight :: Logs { id, log_id_range } => {
1699+ let _ = node. tx_repl . send ( Replicate :: logs ( RequestId :: new_append_entries ( id) , log_id_range) ) ;
1700+ }
1701+ Inflight :: Snapshot { id, last_log_id } => {
1702+ // unwrap: The replication channel must not be dropped or it is a bug.
1703+ node. tx_repl . send ( Replicate :: snapshot ( RequestId :: new_snapshot ( id) , last_log_id) ) . map_err (
1704+ |_e| StorageIOError :: read_snapshot ( None , AnyError :: error ( "replication channel closed" ) ) ,
1705+ ) ?;
16921706 }
1693- } else {
1694- unreachable ! ( "it has to be a leader!!!" ) ;
16951707 }
16961708 }
16971709 Command :: RebuildReplicationStreams { targets } => {
16981710 self . remove_all_replication ( ) . await ;
16991711
17001712 for ( target, matching) in targets. iter ( ) {
17011713 let handle = self . spawn_replication_stream ( * target, * matching) . await ;
1702-
1703- if let Some ( l) = & mut self . leader_data {
1704- l. replications . insert ( * target, handle) ;
1705- } else {
1706- unreachable ! ( "it has to be a leader!!!" ) ;
1707- }
1714+ self . replications . insert ( * target, handle) ;
17081715 }
17091716 }
17101717 Command :: StateMachine { command } => {
0 commit comments