@@ -36,7 +36,7 @@ use crate::ln::types::ChannelId;
3636use crate :: sign:: { ecdsa:: EcdsaChannelSigner , EntropySource , SignerProvider } ;
3737use crate :: sync:: Mutex ;
3838use crate :: util:: async_poll:: {
39- dummy_waker, MaybeSend , MaybeSync , MultiResultFuturePoller , ResultFuture ,
39+ dummy_waker, MaybeSend , MaybeSync , MultiResultFuturePoller , ResultFuture , TwoFutureJoiner ,
4040} ;
4141use crate :: util:: logger:: Logger ;
4242use crate :: util:: native_async:: FutureSpawner ;
@@ -576,15 +576,6 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
576576/// list channel monitors themselves and load channels individually using
577577/// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
578578///
579- /// ## EXTREMELY IMPORTANT
580- ///
581- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
582- /// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in
583- /// that circumstance (not when there is really a permissions error, for example). This is because
584- /// neither channel monitor reading function lists updates. Instead, either reads the monitor, and
585- /// using its stored `update_id`, synthesizes update storage keys, and tries them in sequence until
586- /// one is not found. All _other_ errors will be bubbled up in the function's [`Result`].
587- ///
588579/// # Pruning stale channel updates
589580///
590581/// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
@@ -658,10 +649,6 @@ where
658649 }
659650
660651 /// Reads all stored channel monitors, along with any stored updates for them.
661- ///
662- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
663- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
664- /// documentation for [`MonitorUpdatingPersister`].
665652 pub fn read_all_channel_monitors_with_updates (
666653 & self ,
667654 ) -> Result <
@@ -673,10 +660,6 @@ where
673660
674661 /// Read a single channel monitor, along with any stored updates for it.
675662 ///
676- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
677- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
678- /// documentation for [`MonitorUpdatingPersister`].
679- ///
680663 /// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
681664 /// underscore `_` between txid and index for v1 channels. For example, given:
682665 ///
@@ -873,10 +856,6 @@ where
873856 /// While the reads themselves are performend in parallel, deserializing the
874857 /// [`ChannelMonitor`]s is not. For large [`ChannelMonitor`]s actively used for forwarding,
875858 /// this may substantially limit the parallelism of this method.
876- ///
877- /// It is extremely important that your [`KVStore::read`] implementation uses the
878- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
879- /// documentation for [`MonitorUpdatingPersister`].
880859 pub async fn read_all_channel_monitors_with_updates (
881860 & self ,
882861 ) -> Result <
@@ -911,10 +890,6 @@ where
911890 /// Because [`FutureSpawner`] requires that the spawned future be `'static` (matching `tokio`
912891 /// and other multi-threaded runtime requirements), this method requires that `self` be an
913892 /// `Arc` that can live for `'static` and be sent and accessed across threads.
914- ///
915- /// It is extremely important that your [`KVStore::read`] implementation uses the
916- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
917- /// documentation for [`MonitorUpdatingPersister`].
918893 pub async fn read_all_channel_monitors_with_updates_parallel (
919894 self : & Arc < Self > ,
920895 ) -> Result <
@@ -955,10 +930,6 @@ where
955930
956931 /// Read a single channel monitor, along with any stored updates for it.
957932 ///
958- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
959- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
960- /// documentation for [`MonitorUpdatingPersister`].
961- ///
962933 /// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
963934 /// underscore `_` between txid and index for v1 channels. For example, given:
964935 ///
@@ -1117,40 +1088,37 @@ where
11171088 io:: Error ,
11181089 > {
11191090 let monitor_name = MonitorName :: from_str ( monitor_key) ?;
1120- let read_res = self . maybe_read_monitor ( & monitor_name, monitor_key) . await ?;
1121- let ( block_hash, monitor) = match read_res {
1091+ let read_future = pin ! ( self . maybe_read_monitor( & monitor_name, monitor_key) ) ;
1092+ let list_future = pin ! ( self
1093+ . kv_store
1094+ . list( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , monitor_key) ) ;
1095+ let ( read_res, list_res) = TwoFutureJoiner :: new ( read_future, list_future) . await ;
1096+ let ( block_hash, monitor) = match read_res? {
11221097 Some ( res) => res,
11231098 None => return Ok ( None ) ,
11241099 } ;
11251100 let mut current_update_id = monitor. get_latest_update_id ( ) ;
1126- // TODO: Parallelize this loop by speculatively reading a batch of updates
1127- loop {
1128- current_update_id = match current_update_id. checked_add ( 1 ) {
1129- Some ( next_update_id) => next_update_id,
1130- None => break ,
1131- } ;
1132- let update_name = UpdateName :: from ( current_update_id) ;
1133- let update = match self . read_monitor_update ( monitor_key, & update_name) . await {
1134- Ok ( update) => update,
1135- Err ( err) if err. kind ( ) == io:: ErrorKind :: NotFound => {
1136- // We can't find any more updates, so we are done.
1137- break ;
1138- } ,
1139- Err ( err) => return Err ( err) ,
1140- } ;
1141-
1142- monitor
1143- . update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
1144- . map_err ( |e| {
1145- log_error ! (
1146- self . logger,
1147- "Monitor update failed. monitor: {} update: {} reason: {:?}" ,
1148- monitor_key,
1149- update_name. as_str( ) ,
1150- e
1151- ) ;
1152- io:: Error :: new ( io:: ErrorKind :: Other , "Monitor update failed" )
1153- } ) ?;
1101+ let updates: Result < Vec < _ > , _ > =
1102+ list_res?. into_iter ( ) . map ( |name| UpdateName :: new ( name) ) . collect ( ) ;
1103+ let mut updates = updates?;
1104+ updates. sort_unstable ( ) ;
1105+ // TODO: Parallelize this loop
1106+ for update_name in updates {
1107+ if update_name. 0 > current_update_id {
1108+ let update = self . read_monitor_update ( monitor_key, & update_name) . await ?;
1109+ monitor
1110+ . update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
1111+ . map_err ( |e| {
1112+ log_error ! (
1113+ self . logger,
1114+ "Monitor update failed. monitor: {} update: {} reason: {:?}" ,
1115+ monitor_key,
1116+ update_name. as_str( ) ,
1117+ e
1118+ ) ;
1119+ io:: Error :: new ( io:: ErrorKind :: Other , "Monitor update failed" )
1120+ } ) ?;
1121+ }
11541122 }
11551123 Ok ( Some ( ( block_hash, monitor) ) )
11561124 }
@@ -1525,7 +1493,7 @@ impl core::fmt::Display for MonitorName {
15251493/// let monitor_name = "some_monitor_name";
15261494/// let storage_key = format!("channel_monitor_updates/{}/{}", monitor_name, update_name.as_str());
15271495/// ```
1528- #[ derive( Debug ) ]
1496+ #[ derive( Debug , PartialEq , Eq , PartialOrd , Ord ) ]
15291497pub struct UpdateName ( pub u64 , String ) ;
15301498
15311499impl UpdateName {
0 commit comments