@@ -16,17 +16,13 @@ extern crate lightning;
1616extern crate bitcoin;
1717extern crate libc;
1818
19- use bitcoin:: hash_types:: { BlockHash , Txid } ;
20- use bitcoin:: hashes:: hex:: FromHex ;
21- use lightning:: chain:: channelmonitor:: ChannelMonitor ;
22- use lightning:: sign:: { EntropySource , SignerProvider } ;
23- use lightning:: util:: ser:: { ReadableArgs , Writeable } ;
19+ use lightning:: util:: ser:: Writeable ;
2420use lightning:: util:: persist:: KVStorePersister ;
2521use std:: fs;
26- use std:: io:: Cursor ;
27- use std:: ops:: Deref ;
2822use std:: path:: { Path , PathBuf } ;
2923
24+ const TEMP_FILE_EXT : & str = "tmp" ;
25+
3026/// FilesystemPersister persists channel data on disk, where each channel's
3127/// data is stored in a file named after its funding outpoint.
3228///
@@ -56,96 +52,68 @@ impl FilesystemPersister {
5652 pub fn get_data_dir ( & self ) -> String {
5753 self . path_to_channel_data . clone ( )
5854 }
59-
60- /// Read `ChannelMonitor`s from disk.
61- pub fn read_channelmonitors < ES : Deref , SP : Deref > (
62- & self , entropy_source : ES , signer_provider : SP
63- ) -> std:: io:: Result < Vec < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: Signer > ) > >
64- where
65- ES :: Target : EntropySource + Sized ,
66- SP :: Target : SignerProvider + Sized
67- {
68- let mut path = PathBuf :: from ( & self . path_to_channel_data ) ;
69- path. push ( "monitors" ) ;
70- if !Path :: new ( & path) . exists ( ) {
71- return Ok ( Vec :: new ( ) ) ;
72- }
73- let mut res = Vec :: new ( ) ;
74- for file_option in fs:: read_dir ( path) ? {
75- let file = file_option. unwrap ( ) ;
76- let owned_file_name = file. file_name ( ) ;
77- let filename = owned_file_name. to_str ( )
78- . ok_or_else ( || std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidData ,
79- "File name is not a valid utf8 string" ) ) ?;
80- if !filename. is_ascii ( ) || filename. len ( ) < 65 {
81- return Err ( std:: io:: Error :: new (
82- std:: io:: ErrorKind :: InvalidData ,
83- "Invalid ChannelMonitor file name" ,
84- ) ) ;
85- }
86- if filename. ends_with ( ".tmp" ) {
87- // If we were in the middle of committing an new update and crashed, it should be
88- // safe to ignore the update - we should never have returned to the caller and
89- // irrevocably committed to the new state in any way.
90- continue ;
91- }
92-
93- let txid: Txid = Txid :: from_hex ( filename. split_at ( 64 ) . 0 )
94- . map_err ( |_| std:: io:: Error :: new (
95- std:: io:: ErrorKind :: InvalidData ,
96- "Invalid tx ID in filename" ,
97- ) ) ?;
98-
99- let index: u16 = filename. split_at ( 65 ) . 1 . parse ( )
100- . map_err ( |_| std:: io:: Error :: new (
101- std:: io:: ErrorKind :: InvalidData ,
102- "Invalid tx index in filename" ,
103- ) ) ?;
104-
105- let contents = fs:: read ( & file. path ( ) ) ?;
106- let mut buffer = Cursor :: new ( & contents) ;
107- match <( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: Signer > ) >:: read ( & mut buffer, ( & * entropy_source, & * signer_provider) ) {
108- Ok ( ( blockhash, channel_monitor) ) => {
109- if channel_monitor. get_funding_txo ( ) . 0 . txid != txid || channel_monitor. get_funding_txo ( ) . 0 . index != index {
110- return Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidData ,
111- "ChannelMonitor was stored in the wrong file" ) ) ;
112- }
113- res. push ( ( blockhash, channel_monitor) ) ;
114- }
115- Err ( e) => return Err ( std:: io:: Error :: new (
116- std:: io:: ErrorKind :: InvalidData ,
117- format ! ( "Failed to deserialize ChannelMonitor: {}" , e) ,
118- ) )
119- }
120- }
121- Ok ( res)
122- }
12355}
12456
12557impl KVStorePersister for FilesystemPersister {
12658 fn persist < W : Writeable > ( & self , key : & str , object : & W ) -> std:: io:: Result < ( ) > {
127- let mut dest_file = PathBuf :: from ( self . path_to_channel_data . clone ( ) ) ;
59+ let mut dest_file = PathBuf :: from ( self . get_data_dir ( ) ) ;
12860 dest_file. push ( key) ;
12961 util:: write_to_file ( dest_file, object)
13062 }
63+ /// Read an object from storage.
64+ fn get < K : AsRef < Path > > ( & self , key : & K ) -> std:: io:: Result < Vec < u8 > > {
65+ let dest_file = PathBuf :: from ( self . get_data_dir ( ) ) . join ( key) ;
66+ fs:: read ( dest_file)
67+ }
68+ /// Read an object from storage.
69+ fn delete < K : AsRef < Path > > ( & self , key : & K ) -> std:: io:: Result < ( ) > {
70+ let dest_file = PathBuf :: from ( self . get_data_dir ( ) ) . join ( key) ;
71+ fs:: remove_file ( dest_file)
72+ }
73+ /// Return a sorted list of names in a key path. Paths (directories) are omitted, and the listing
74+ /// is not recursive. If path does not exist, the list will be empty.
75+ fn list_names < P : AsRef < Path > > ( & self , path : & P ) -> std:: io:: Result < Vec < String > > {
76+ let path = PathBuf :: from ( self . get_data_dir ( ) ) . join ( path) ;
77+ if !path. exists ( ) {
78+ return Ok ( vec ! [ ] ) ;
79+ } ;
80+ let mut files: Vec < String > = fs:: read_dir ( & path) ?
81+ . flatten ( )
82+ . filter_map ( |entry| {
83+ if entry. path ( ) . is_dir ( ) {
84+ return None ;
85+ }
86+ entry. file_name ( ) . into_string ( ) . ok ( )
87+ } )
88+ // Exclude temp files. These are an artifact of how we persist, and shouldn't be
89+ // surfaced to complicate the logic for other functions.
90+ . filter ( |f| !f. ends_with ( TEMP_FILE_EXT ) )
91+ . collect ( ) ;
92+ files. sort ( ) ;
93+ Ok ( files)
94+ }
13195}
13296
97+
13398#[ cfg( test) ]
13499mod tests {
135100 extern crate lightning;
136101 extern crate bitcoin;
137102 use crate :: FilesystemPersister ;
103+ use crate :: lightning:: util:: persist:: KVStoreChannelMonitorReader ;
138104 use bitcoin:: hashes:: hex:: FromHex ;
139105 use bitcoin:: Txid ;
140106 use lightning:: chain:: ChannelMonitorUpdateStatus ;
141107 use lightning:: chain:: chainmonitor:: Persist ;
142108 use lightning:: chain:: channelmonitor:: CLOSED_CHANNEL_UPDATE_ID ;
143109 use lightning:: chain:: transaction:: OutPoint ;
110+ use lightning:: util:: logger:: { Logger , Record } ;
144111 use lightning:: { check_closed_broadcast, check_closed_event, check_added_monitors} ;
145112 use lightning:: events:: { ClosureReason , MessageSendEventsProvider } ;
146113 use lightning:: ln:: functional_test_utils:: * ;
147114 use lightning:: util:: test_utils;
148115 use std:: fs;
116+ use std:: rc:: Rc ;
149117 #[ cfg( target_os = "windows" ) ]
150118 use {
151119 lightning:: get_event_msg,
@@ -163,6 +131,22 @@ mod tests {
163131 }
164132 }
165133
134+ pub struct StdoutLogger { }
135+
136+ impl Logger for StdoutLogger {
137+ fn log ( & self , record : & Record ) {
138+ let raw_log = record. args . to_string ( ) ;
139+ let log = format ! (
140+ "{:<5} [{}:{}] {}\n " ,
141+ record. level. to_string( ) ,
142+ record. module_path,
143+ record. line,
144+ raw_log
145+ ) ;
146+ print ! ( "{}" , log) ;
147+ }
148+ }
149+
166150 #[ test]
167151 fn test_if_monitors_is_not_dir ( ) {
168152 let persister = FilesystemPersister :: new ( "test_monitors_is_not_dir" . to_string ( ) ) ;
@@ -178,11 +162,20 @@ mod tests {
178162 node_cfgs[ 0 ] . chain_monitor = chain_mon_0;
179163 let node_chanmgrs = create_node_chanmgrs ( 1 , & node_cfgs, & [ None ] ) ;
180164 let nodes = create_network ( 1 , & node_cfgs, & node_chanmgrs) ;
181-
165+ let logger = Rc :: new ( StdoutLogger { } ) ;
166+
182167 // Check that read_channelmonitors() returns error if monitors/ is not a
183168 // directory.
184- assert ! ( persister. read_channelmonitors( nodes[ 0 ] . keys_manager, nodes[ 0 ] . keys_manager) . is_err( ) ) ;
185- }
169+ assert ! ( persister
170+ . read_channelmonitors(
171+ nodes[ 0 ] . keys_manager,
172+ nodes[ 0 ] . keys_manager,
173+ &&chanmon_cfgs[ 0 ] . tx_broadcaster,
174+ & chanmon_cfgs[ 0 ] . fee_estimator,
175+ & logger
176+ )
177+ . is_err( ) ) ;
178+ }
186179
187180 // Integration-test the FilesystemPersister. Test relaying a few payments
188181 // and check that the persisted data is updated the appropriate number of
@@ -192,31 +185,47 @@ mod tests {
192185 // Create the nodes, giving them FilesystemPersisters for data persisters.
193186 let persister_0 = FilesystemPersister :: new ( "test_filesystem_persister_0" . to_string ( ) ) ;
194187 let persister_1 = FilesystemPersister :: new ( "test_filesystem_persister_1" . to_string ( ) ) ;
195- let chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
188+ let chanmon_cfgs = create_chanmon_cfgs ( 4 ) ;
196189 let mut node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
197190 let chain_mon_0 = test_utils:: TestChainMonitor :: new ( Some ( & chanmon_cfgs[ 0 ] . chain_source ) , & chanmon_cfgs[ 0 ] . tx_broadcaster , & chanmon_cfgs[ 0 ] . logger , & chanmon_cfgs[ 0 ] . fee_estimator , & persister_0, node_cfgs[ 0 ] . keys_manager ) ;
198191 let chain_mon_1 = test_utils:: TestChainMonitor :: new ( Some ( & chanmon_cfgs[ 1 ] . chain_source ) , & chanmon_cfgs[ 1 ] . tx_broadcaster , & chanmon_cfgs[ 1 ] . logger , & chanmon_cfgs[ 1 ] . fee_estimator , & persister_1, node_cfgs[ 1 ] . keys_manager ) ;
199192 node_cfgs[ 0 ] . chain_monitor = chain_mon_0;
200193 node_cfgs[ 1 ] . chain_monitor = chain_mon_1;
201194 let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
202195 let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
196+ let logger = Rc :: new ( StdoutLogger { } ) ;
197+ // sharing the broadcasters with TestChainMonitor seems to deadlock
198+ let broadcaster_0 = & chanmon_cfgs[ 2 ] . tx_broadcaster ;
199+ let broadcaster_1 = & chanmon_cfgs[ 3 ] . tx_broadcaster ;
203200
204201 // Check that the persisted channel data is empty before any channels are
205202 // open.
206- let mut persisted_chan_data_0 = persister_0. read_channelmonitors ( nodes[ 0 ] . keys_manager , nodes[ 0 ] . keys_manager ) . unwrap ( ) ;
203+ let mut persisted_chan_data_0 = persister_0. read_channelmonitors ( nodes[ 0 ] . keys_manager , nodes[ 0 ] . keys_manager ,
204+ & broadcaster_0,
205+ & chanmon_cfgs[ 0 ] . fee_estimator ,
206+ & logger) . unwrap ( ) ;
207207 assert_eq ! ( persisted_chan_data_0. len( ) , 0 ) ;
208- let mut persisted_chan_data_1 = persister_1. read_channelmonitors ( nodes[ 1 ] . keys_manager , nodes[ 1 ] . keys_manager ) . unwrap ( ) ;
208+ let mut persisted_chan_data_1 = persister_1. read_channelmonitors ( nodes[ 1 ] . keys_manager , nodes[ 1 ] . keys_manager ,
209+ & broadcaster_1,
210+ & chanmon_cfgs[ 1 ] . fee_estimator ,
211+ & logger) . unwrap ( ) ;
209212 assert_eq ! ( persisted_chan_data_1. len( ) , 0 ) ;
210213
211214 // Helper to make sure the channel is on the expected update ID.
212215 macro_rules! check_persisted_data {
213216 ( $expected_update_id: expr) => {
214- persisted_chan_data_0 = persister_0. read_channelmonitors( nodes[ 0 ] . keys_manager, nodes[ 0 ] . keys_manager) . unwrap( ) ;
217+ persisted_chan_data_0 = persister_0. read_channelmonitors( nodes[ 0 ] . keys_manager, nodes[ 0 ] . keys_manager,
218+ & broadcaster_0,
219+ & chanmon_cfgs[ 0 ] . fee_estimator,
220+ & logger) . unwrap( ) ;
215221 assert_eq!( persisted_chan_data_0. len( ) , 1 ) ;
216222 for ( _, mon) in persisted_chan_data_0. iter( ) {
217223 assert_eq!( mon. get_latest_update_id( ) , $expected_update_id) ;
218224 }
219- persisted_chan_data_1 = persister_1. read_channelmonitors( nodes[ 1 ] . keys_manager, nodes[ 1 ] . keys_manager) . unwrap( ) ;
225+ persisted_chan_data_1 = persister_1. read_channelmonitors( nodes[ 1 ] . keys_manager, nodes[ 1 ] . keys_manager,
226+ & broadcaster_1,
227+ & chanmon_cfgs[ 1 ] . fee_estimator,
228+ & logger) . unwrap( ) ;
220229 assert_eq!( persisted_chan_data_1. len( ) , 1 ) ;
221230 for ( _, mon) in persisted_chan_data_1. iter( ) {
222231 assert_eq!( mon. get_latest_update_id( ) , $expected_update_id) ;
0 commit comments