Skip to content

Commit d607625

Browse files
committed
Parallelize ChannelMonitorUpdate loading
When reading `ChannelMonitor`s from a `MonitorUpdatingPersister` on startup, we have to make sure to load any `ChannelMonitorUpdate`s and re-apply them as well. Now that we know which `ChannelMonitorUpdate`s to load from `list`ing the entries from the `KVStore` we can parallelize the reads themselves, which we do here. Now, loading all `ChannelMonitor`s from an async `KVStore` requires only three full RTTs - one to list the set of `ChannelMonitor`s, one to both fetch the `ChanelMonitor` and list the set of `ChannelMonitorUpdate`s, and one to fetch all the `ChannelMonitorUpdate`s (with the last one skipped when there are no `ChannelMonitorUpdate`s to read).
1 parent 5232ea8 commit d607625

File tree

1 file changed

+22
-18
lines changed

1 file changed

+22
-18
lines changed

lightning/src/util/persist.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,28 +1097,32 @@ where
10971097
Some(res) => res,
10981098
None => return Ok(None),
10991099
};
1100-
let mut current_update_id = monitor.get_latest_update_id();
1100+
let current_update_id = monitor.get_latest_update_id();
11011101
let updates: Result<Vec<_>, _> =
11021102
list_res?.into_iter().map(|name| UpdateName::new(name)).collect();
11031103
let mut updates = updates?;
11041104
updates.sort_unstable();
1105-
// TODO: Parallelize this loop
1106-
for update_name in updates {
1107-
if update_name.0 > current_update_id {
1108-
let update = self.read_monitor_update(monitor_key, &update_name).await?;
1109-
monitor
1110-
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1111-
.map_err(|e| {
1112-
log_error!(
1113-
self.logger,
1114-
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1115-
monitor_key,
1116-
update_name.as_str(),
1117-
e
1118-
);
1119-
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1120-
})?;
1121-
}
1105+
let updates_to_load = updates.iter().filter(|update| update.0 > current_update_id);
1106+
let mut update_futures = Vec::with_capacity(updates_to_load.clone().count());
1107+
for update_name in updates_to_load {
1108+
update_futures.push(ResultFuture::Pending(Box::pin(async move {
1109+
(update_name, self.read_monitor_update(monitor_key, update_name).await)
1110+
})));
1111+
}
1112+
for (update_name, update_res) in MultiResultFuturePoller::new(update_futures).await {
1113+
let update = update_res?;
1114+
monitor
1115+
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1116+
.map_err(|e| {
1117+
log_error!(
1118+
self.logger,
1119+
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1120+
monitor_key,
1121+
update_name.as_str(),
1122+
e
1123+
);
1124+
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1125+
})?;
11221126
}
11231127
Ok(Some((block_hash, monitor)))
11241128
}

0 commit comments

Comments
 (0)