62 changes: 61 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions crates/cli/Cargo.toml
@@ -20,3 +20,5 @@ console-subscriber = { workspace = true, optional = true }
tracing.workspace = true
tracing-subscriber.workspace = true
flume.workspace = true
crossterm = "0.27"
tokio-util = "0.7"
214 changes: 187 additions & 27 deletions crates/cli/src/main.rs
@@ -1,17 +1,33 @@
use crossterm::{
event::{self, Event, KeyCode, KeyModifiers},
terminal::{self},
};
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use std::{
collections::HashMap,
fmt::Write,
io::SeekFrom,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use tokio::{
fs::{create_dir_all, File},
io::{AsyncSeekExt, AsyncWriteExt},
signal,
};
use tokio_util::sync::CancellationToken;
use tracing::trace;

use bit_rev::{session::Session, utils};
use bit_rev::{
session::{DownloadState, PieceResult, Session},
utils,
};

fn graceful_shutdown() {
let _ = terminal::disable_raw_mode();
println!("\n\nShutting down gracefully...");
std::process::exit(0);
}

#[tokio::main]
async fn main() {
@@ -25,12 +41,14 @@ async fn main() {
let output = std::env::args().nth(2);

if let Err(err) = download_file(&filename, output).await {
let _ = terminal::disable_raw_mode();
eprintln!("Error: {:?}", err);
}
}

pub async fn download_file(filename: &str, out_file: Option<String>) -> anyhow::Result<()> {
let session = Session::new();
let session = Arc::new(Session::new());
let shutdown_token = CancellationToken::new();

let add_torrent_result = session.add_torrent(filename.into()).await?;
let torrent = add_torrent_result.torrent.clone();
@@ -85,56 +103,198 @@ pub async fn download_file(filename: &str, out_file: Option<String>) -> anyhow::

let total_downloaded = Arc::new(AtomicU64::new(0));
let total_downloaded_clone = total_downloaded.clone();
let session_clone = session.clone();

// Spawn progress update task
tokio::spawn(async move {
loop {
let new = total_downloaded_clone.load(std::sync::atomic::Ordering::Relaxed);
pb.set_position(new);
pb.set_message("Downloading");
let status = match session_clone.get_download_state() {
DownloadState::Init => "Initializing",
DownloadState::Downloading => "Downloading",
DownloadState::Paused => "Paused",
};
pb.set_message(status);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
});

let mut hashset = std::collections::HashSet::new();
while hashset.len() < torrent.piece_hashes.len() {
let pr = add_torrent_result.pr_rx.recv_async().await?;
// Enable raw mode for single keypress detection
terminal::enable_raw_mode().expect("Failed to enable raw mode");

hashset.insert(pr.index);
// Set up Ctrl+C signal handler
let _shutdown_token_signal = shutdown_token.clone();
tokio::spawn(async move {
let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
.expect("Failed to install SIGINT handler");

// Map piece to files and write data accordingly
let file_mappings = utils::map_piece_to_files(&torrent, pr.index as usize);
let mut piece_offset = 0;
sigint.recv().await;
graceful_shutdown();
});

for mapping in file_mappings {
let file = file_handles.get_mut(&mapping.file_index).ok_or_else(|| {
anyhow::anyhow!("File handle not found for index {}", mapping.file_index)
})?;
// Spawn keyboard input handler
let session_input = session.clone();
let shutdown_token_input = shutdown_token.clone();
tokio::spawn(async move {
loop {
// Check for cancellation
if shutdown_token_input.is_cancelled() {
break;
}

if event::poll(Duration::from_millis(100)).unwrap_or(false) {
if let Ok(Event::Key(key_event)) = event::read() {
match key_event.code {
KeyCode::Char('p') | KeyCode::Char('P') => {
match session_input.get_download_state() {
DownloadState::Paused => {
session_input.resume();
}
DownloadState::Downloading => {
session_input.pause();
}
DownloadState::Init => {
println!("\r\nCannot pause during initialization");
}
}
}
KeyCode::Char('q') | KeyCode::Char('Q') => {
graceful_shutdown();
}
KeyCode::Char('c')
if key_event.modifiers.contains(KeyModifiers::CONTROL) =>
{
graceful_shutdown();
}
_ => {}
}
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
});

// Seek to correct position in file
file.seek(SeekFrom::Start(mapping.file_offset as u64))
let mut hashset = std::collections::HashSet::new();
let mut pending_pieces: Vec<_> = Vec::new(); // Queue for pieces received while paused

while hashset.len() < torrent.piece_hashes.len() {
// Check for shutdown signal
if shutdown_token.is_cancelled() {
break;
}
// Process any pending pieces first if we're now downloading
if session.get_download_state() == DownloadState::Downloading && !pending_pieces.is_empty()
{
let pieces_to_process = std::mem::take(&mut pending_pieces);
for pr in pieces_to_process {
process_piece(
&pr,
&torrent,
&mut file_handles,
&mut hashset,
&total_downloaded,
)
.await?;
}
}

// Write the portion of the piece that belongs to this file
let piece_data = &pr.buf[piece_offset..piece_offset + mapping.length];
file.write_all(piece_data).await?;
// Use a timeout to periodically check if we should process pending pieces
let pr_result = tokio::time::timeout(
Duration::from_millis(100),
add_torrent_result.pr_rx.recv_async(),
)
.await;

piece_offset += mapping.length;
match pr_result {
Ok(Ok(pr)) => {
// If paused, queue the piece but don't process it yet
if session.get_download_state() != DownloadState::Downloading {
pending_pieces.push(pr);
continue;
}

trace!(
"Wrote {} bytes to file {} at offset {}",
mapping.length,
mapping.file_index,
mapping.file_offset
);
// Process piece immediately if downloading
process_piece(
&pr,
&torrent,
&mut file_handles,
&mut hashset,
&total_downloaded,
)
.await?;
}
Ok(Err(_)) => {
// Channel closed
break;
}
Err(_) => {
// Timeout - continue loop to check pending pieces
continue;
}
}
}

total_downloaded.fetch_add(pr.length as u64, std::sync::atomic::Ordering::Relaxed);
// Process any remaining pending pieces at the end
for pr in pending_pieces {
process_piece(
&pr,
&torrent,
&mut file_handles,
&mut hashset,
&total_downloaded,
)
.await?;
}

// Sync all files
for (_, file) in file_handles {
file.sync_all().await?;
}

// Restore terminal on completion
let _ = terminal::disable_raw_mode();
println!("\nDownload completed!");

Ok(())
}

async fn process_piece(
pr: &PieceResult,
torrent: &bit_rev::torrent::Torrent,
file_handles: &mut HashMap<usize, File>,
hashset: &mut std::collections::HashSet<u32>,
total_downloaded: &Arc<AtomicU64>,
) -> anyhow::Result<bool> {
hashset.insert(pr.index);

// Map piece to files and write data accordingly
let file_mappings = utils::map_piece_to_files(torrent, pr.index as usize);
let mut piece_offset = 0;

for mapping in file_mappings {
let file = file_handles.get_mut(&mapping.file_index).ok_or_else(|| {
anyhow::anyhow!("File handle not found for index {}", mapping.file_index)
})?;

// Seek to correct position in file
file.seek(SeekFrom::Start(mapping.file_offset as u64))
.await?;

// Write the portion of the piece that belongs to this file
let piece_data = &pr.buf[piece_offset..piece_offset + mapping.length];
file.write_all(piece_data).await?;

piece_offset += mapping.length;

trace!(
"Wrote {} bytes to file {} at offset {}",
mapping.length,
mapping.file_index,
mapping.file_offset
);
}

total_downloaded.fetch_add(pr.length as u64, std::sync::atomic::Ordering::Relaxed);
Ok(true)
}
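
For context on what process_piece consumes: utils::map_piece_to_files is expected to split one piece into per-file slices, each carrying a file_index, a file_offset, and a length, which the loop above turns into a seek plus a write. A standalone sketch of that mapping for a flat multi-file layout (an illustration of the idea only, not bit_rev's actual utils implementation; everything beyond those three field names is assumed):

// Illustrative sketch only; not bit_rev's utils::map_piece_to_files.
struct FileMapping {
    file_index: usize,  // which output file the slice lands in
    file_offset: usize, // byte offset inside that file
    length: usize,      // number of bytes taken from the piece
}

/// Split the piece at `piece_index` across files, given each file's length,
/// a fixed piece length, and the torrent's total length (the last piece may be short).
fn map_piece_to_files(
    file_lengths: &[usize],
    piece_length: usize,
    piece_index: usize,
    total_length: usize,
) -> Vec<FileMapping> {
    let piece_start = piece_index * piece_length;
    let piece_end = (piece_start + piece_length).min(total_length);

    let mut mappings = Vec::new();
    let mut file_start = 0usize; // absolute offset where the current file begins

    for (file_index, &len) in file_lengths.iter().enumerate() {
        let file_end = file_start + len;
        // Overlap between [piece_start, piece_end) and [file_start, file_end).
        let start = piece_start.max(file_start);
        let end = piece_end.min(file_end);
        if start < end {
            mappings.push(FileMapping {
                file_index,
                file_offset: start - file_start,
                length: end - start,
            });
        }
        file_start = file_end;
    }
    mappings
}

fn main() {
    // Two files of 600 and 1000 bytes with 512-byte pieces: piece 1 straddles the boundary.
    for m in map_piece_to_files(&[600, 1000], 512, 1, 1600) {
        println!("file {} at offset {} for {} bytes", m.file_index, m.file_offset, m.length);
    }
}

Here piece 1 (absolute bytes 512..1024) maps to the last 88 bytes of file 0 and the first 424 bytes of file 1; process_piece then seeks to each file_offset, writes length bytes, and advances piece_offset by that length.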