Skip to content

Commit da8d2e7

Browse files
committed
feat: q to quit and p to pause on CLI
1 parent 8cedf41 commit da8d2e7

File tree

3 files changed

+251
-28
lines changed

3 files changed

+251
-28
lines changed

Cargo.lock

Lines changed: 61 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,5 @@ console-subscriber = { workspace = true, optional = true }
2020
tracing.workspace = true
2121
tracing-subscriber.workspace = true
2222
flume.workspace = true
23+
crossterm = "0.27"
24+
tokio-util = "0.7"

crates/cli/src/main.rs

Lines changed: 188 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,33 @@
1+
use crossterm::{
2+
event::{self, Event, KeyCode, KeyModifiers},
3+
terminal::{self},
4+
};
15
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
26
use std::{
37
collections::HashMap,
48
fmt::Write,
59
io::SeekFrom,
610
sync::{atomic::AtomicU64, Arc},
11+
time::Duration,
712
};
813
use tokio::{
914
fs::{create_dir_all, File},
1015
io::{AsyncSeekExt, AsyncWriteExt},
16+
signal,
1117
};
18+
use tokio_util::sync::CancellationToken;
1219
use tracing::trace;
1320

14-
use bit_rev::{session::Session, utils};
21+
use bit_rev::{
22+
session::{DownloadState, PieceResult, Session},
23+
utils,
24+
};
25+
26+
fn graceful_shutdown() {
27+
let _ = terminal::disable_raw_mode();
28+
println!("\n\nShutting down gracefully...");
29+
std::process::exit(0);
30+
}
1531

1632
#[tokio::main]
1733
async fn main() {
@@ -25,12 +41,14 @@ async fn main() {
2541
let output = std::env::args().nth(2);
2642

2743
if let Err(err) = download_file(&filename, output).await {
44+
let _ = terminal::disable_raw_mode();
2845
eprintln!("Error: {:?}", err);
2946
}
3047
}
3148

3249
pub async fn download_file(filename: &str, out_file: Option<String>) -> anyhow::Result<()> {
33-
let session = Session::new();
50+
let session = Arc::new(Session::new());
51+
let shutdown_token = CancellationToken::new();
3452

3553
let add_torrent_result = session.add_torrent(filename.into()).await?;
3654
let torrent = add_torrent_result.torrent.clone();
@@ -85,56 +103,199 @@ pub async fn download_file(filename: &str, out_file: Option<String>) -> anyhow::
85103

86104
let total_downloaded = Arc::new(AtomicU64::new(0));
87105
let total_downloaded_clone = total_downloaded.clone();
106+
let session_clone = session.clone();
88107

108+
// Spawn progress update task
89109
tokio::spawn(async move {
90110
loop {
91111
let new = total_downloaded_clone.load(std::sync::atomic::Ordering::Relaxed);
92112
pb.set_position(new);
93-
pb.set_message("Downloading");
113+
let status = match session_clone.get_download_state() {
114+
DownloadState::Init => "Initializing",
115+
DownloadState::Downloading => "Downloading",
116+
DownloadState::Paused => "Paused",
117+
};
118+
pb.set_message(status);
94119
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
95120
}
96121
});
97122

98-
let mut hashset = std::collections::HashSet::new();
99-
while hashset.len() < torrent.piece_hashes.len() {
100-
let pr = add_torrent_result.pr_rx.recv_async().await?;
123+
// Enable raw mode for single keypress detection
124+
terminal::enable_raw_mode().expect("Failed to enable raw mode");
101125

102-
hashset.insert(pr.index);
126+
// Set up Ctrl+C signal handler
127+
let _shutdown_token_signal = shutdown_token.clone();
128+
tokio::spawn(async move {
129+
let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
130+
.expect("Failed to install SIGINT handler");
103131

104-
// Map piece to files and write data accordingly
105-
let file_mappings = utils::map_piece_to_files(&torrent, pr.index as usize);
106-
let mut piece_offset = 0;
132+
sigint.recv().await;
133+
graceful_shutdown();
134+
});
107135

108-
for mapping in file_mappings {
109-
let file = file_handles.get_mut(&mapping.file_index).ok_or_else(|| {
110-
anyhow::anyhow!("File handle not found for index {}", mapping.file_index)
111-
})?;
136+
// Spawn keyboard input handler
137+
// println!("Press 'p' to pause/resume, 'q' to quit (no Enter needed)");
138+
let session_input = session.clone();
139+
let shutdown_token_input = shutdown_token.clone();
140+
tokio::spawn(async move {
141+
loop {
142+
// Check for cancellation
143+
if shutdown_token_input.is_cancelled() {
144+
break;
145+
}
146+
147+
if event::poll(Duration::from_millis(100)).unwrap_or(false) {
148+
if let Ok(Event::Key(key_event)) = event::read() {
149+
match key_event.code {
150+
KeyCode::Char('p') | KeyCode::Char('P') => {
151+
match session_input.get_download_state() {
152+
DownloadState::Paused => {
153+
session_input.resume();
154+
}
155+
DownloadState::Downloading => {
156+
session_input.pause();
157+
}
158+
DownloadState::Init => {
159+
println!("\r\nCannot pause during initialization");
160+
}
161+
}
162+
}
163+
KeyCode::Char('q') | KeyCode::Char('Q') => {
164+
graceful_shutdown();
165+
}
166+
KeyCode::Char('c')
167+
if key_event.modifiers.contains(KeyModifiers::CONTROL) =>
168+
{
169+
graceful_shutdown();
170+
}
171+
_ => {}
172+
}
173+
}
174+
}
175+
tokio::time::sleep(Duration::from_millis(50)).await;
176+
}
177+
});
112178

113-
// Seek to correct position in file
114-
file.seek(SeekFrom::Start(mapping.file_offset as u64))
179+
let mut hashset = std::collections::HashSet::new();
180+
let mut pending_pieces: Vec<_> = Vec::new(); // Queue for pieces received while paused
181+
182+
while hashset.len() < torrent.piece_hashes.len() {
183+
// Check for shutdown signal
184+
if shutdown_token.is_cancelled() {
185+
break;
186+
}
187+
// Process any pending pieces first if we're now downloading
188+
if session.get_download_state() == DownloadState::Downloading && !pending_pieces.is_empty()
189+
{
190+
let pieces_to_process = std::mem::take(&mut pending_pieces);
191+
for pr in pieces_to_process {
192+
process_piece(
193+
&pr,
194+
&torrent,
195+
&mut file_handles,
196+
&mut hashset,
197+
&total_downloaded,
198+
)
115199
.await?;
200+
}
201+
}
116202

117-
// Write the portion of the piece that belongs to this file
118-
let piece_data = &pr.buf[piece_offset..piece_offset + mapping.length];
119-
file.write_all(piece_data).await?;
203+
// Use a timeout to periodically check if we should process pending pieces
204+
let pr_result = tokio::time::timeout(
205+
Duration::from_millis(100),
206+
add_torrent_result.pr_rx.recv_async(),
207+
)
208+
.await;
120209

121-
piece_offset += mapping.length;
210+
match pr_result {
211+
Ok(Ok(pr)) => {
212+
// If paused, queue the piece but don't process it yet
213+
if session.get_download_state() != DownloadState::Downloading {
214+
pending_pieces.push(pr);
215+
continue;
216+
}
122217

123-
trace!(
124-
"Wrote {} bytes to file {} at offset {}",
125-
mapping.length,
126-
mapping.file_index,
127-
mapping.file_offset
128-
);
218+
// Process piece immediately if downloading
219+
process_piece(
220+
&pr,
221+
&torrent,
222+
&mut file_handles,
223+
&mut hashset,
224+
&total_downloaded,
225+
)
226+
.await?;
227+
}
228+
Ok(Err(_)) => {
229+
// Channel closed
230+
break;
231+
}
232+
Err(_) => {
233+
// Timeout - continue loop to check pending pieces
234+
continue;
235+
}
129236
}
237+
}
130238

131-
total_downloaded.fetch_add(pr.length as u64, std::sync::atomic::Ordering::Relaxed);
239+
// Process any remaining pending pieces at the end
240+
for pr in pending_pieces {
241+
process_piece(
242+
&pr,
243+
&torrent,
244+
&mut file_handles,
245+
&mut hashset,
246+
&total_downloaded,
247+
)
248+
.await?;
132249
}
133250

134251
// Sync all files
135252
for (_, file) in file_handles {
136253
file.sync_all().await?;
137254
}
138255

256+
// Restore terminal on completion
257+
let _ = terminal::disable_raw_mode();
258+
println!("\nDownload completed!");
259+
139260
Ok(())
140261
}
262+
263+
async fn process_piece(
264+
pr: &PieceResult,
265+
torrent: &bit_rev::torrent::Torrent,
266+
file_handles: &mut HashMap<usize, File>,
267+
hashset: &mut std::collections::HashSet<u32>,
268+
total_downloaded: &Arc<AtomicU64>,
269+
) -> anyhow::Result<bool> {
270+
hashset.insert(pr.index);
271+
272+
// Map piece to files and write data accordingly
273+
let file_mappings = utils::map_piece_to_files(torrent, pr.index as usize);
274+
let mut piece_offset = 0;
275+
276+
for mapping in file_mappings {
277+
let file = file_handles.get_mut(&mapping.file_index).ok_or_else(|| {
278+
anyhow::anyhow!("File handle not found for index {}", mapping.file_index)
279+
})?;
280+
281+
// Seek to correct position in file
282+
file.seek(SeekFrom::Start(mapping.file_offset as u64))
283+
.await?;
284+
285+
// Write the portion of the piece that belongs to this file
286+
let piece_data = &pr.buf[piece_offset..piece_offset + mapping.length];
287+
file.write_all(piece_data).await?;
288+
289+
piece_offset += mapping.length;
290+
291+
trace!(
292+
"Wrote {} bytes to file {} at offset {}",
293+
mapping.length,
294+
mapping.file_index,
295+
mapping.file_offset
296+
);
297+
}
298+
299+
total_downloaded.fetch_add(pr.length as u64, std::sync::atomic::Ordering::Relaxed);
300+
Ok(true)
301+
}

0 commit comments

Comments
 (0)