Skip to content

Commit 07384a2

Browse files
committed
feat: state management and pause option
1 parent 3e4052f commit 07384a2

File tree

6 files changed

+411
-52
lines changed

6 files changed

+411
-52
lines changed

Cargo.lock

Lines changed: 61 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/bit_rev/src/peer_connection.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{
2020
peer::PeerAddr,
2121
peer_state::{PeerState, PeerStates},
2222
protocol::{Protocol, ProtocolError},
23-
session::PieceWork,
23+
session::{DownloadState, PieceWork},
2424
utils,
2525
};
2626

@@ -226,6 +226,7 @@ pub struct PeerHandler {
226226
requests_sem: Semaphore,
227227
peer: PeerAddr,
228228
torrent_downloaded_state: Arc<TorrentDownloadedState>,
229+
download_state: Arc<Mutex<DownloadState>>,
229230
}
230231

231232
impl PeerHandler {
@@ -237,6 +238,7 @@ impl PeerHandler {
237238
peers_state: Arc<PeerStates>,
238239
//pieces: Vec<PieceWork>,
239240
torrent_downloaded_state: Arc<TorrentDownloadedState>,
241+
download_state: Arc<Mutex<DownloadState>>,
240242
) -> Self {
241243
Self {
242244
unchoke_notify: unchoked_notify,
@@ -249,6 +251,7 @@ impl PeerHandler {
249251
peer_writer_tx,
250252
peer,
251253
torrent_downloaded_state,
254+
download_state,
252255
//torrent_downloaded_state: Arc::new(TorrentDownloadedState {
253256
//
254257
// semaphore: Semaphore::new(1),
@@ -278,6 +281,14 @@ impl PeerHandler {
278281
}
279282
}
280283

284+
pub fn get_download_state(&self) -> DownloadState {
285+
*self.download_state.lock().unwrap()
286+
}
287+
288+
pub fn is_downloading(&self) -> bool {
289+
self.get_download_state() == DownloadState::Downloading
290+
}
291+
281292
// The job of this is to request chunks and also to keep peer alive.
282293
// The moment this ends, the peer is disconnected.
283294
pub async fn task_peer_chunk_requester(&self) -> Result<(), anyhow::Error> {
@@ -307,6 +318,11 @@ impl PeerHandler {
307318
};
308319

309320
loop {
321+
// Wait while not downloading
322+
while !self.is_downloading() {
323+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
324+
}
325+
310326
update_interest(self, true)?;
311327

312328
trace!("waiting for unchoke");
@@ -335,6 +351,14 @@ impl PeerHandler {
335351

336352
let mut offset: u32 = 0;
337353
while offset < piece.length {
354+
// Check download state before requesting each block
355+
if !self.is_downloading() {
356+
// Wait while not downloading
357+
while !self.is_downloading() {
358+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
359+
}
360+
}
361+
338362
loop {
339363
match (tokio::time::timeout(
340364
Duration::from_secs(5),

crates/bit_rev/src/session.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::sync::{Arc, Mutex};
22

33
use crate::file::{self, TorrentMeta};
44
use crate::peer_state::PeerStates;
@@ -8,6 +8,13 @@ use crate::utils;
88
use dashmap::DashMap;
99
use flume::Receiver;
1010

11+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12+
pub enum DownloadState {
13+
Init,
14+
Downloading,
15+
Paused,
16+
}
17+
1118
#[derive(Debug, Clone, Copy)]
1219
pub struct PieceWork {
1320
pub index: u32,
@@ -31,6 +38,7 @@ pub struct State {
3138

3239
pub struct Session {
3340
pub streams: DashMap<[u8; 20], TrackerPeers>,
41+
pub download_state: Arc<Mutex<DownloadState>>,
3442
}
3543

3644
pub struct AddTorrentOptions {
@@ -70,9 +78,56 @@ impl Session {
7078
pub fn new() -> Self {
7179
Self {
7280
streams: DashMap::new(),
81+
download_state: Arc::new(Mutex::new(DownloadState::Init)),
82+
}
83+
}
84+
85+
pub fn start_downloading(&self) {
86+
{
87+
let mut state = self.download_state.lock().unwrap();
88+
*state = DownloadState::Downloading;
89+
}
90+
for entry in self.streams.iter() {
91+
entry.value().set_download_state(DownloadState::Downloading);
7392
}
7493
}
7594

95+
pub fn pause(&self) {
96+
{
97+
let mut state = self.download_state.lock().unwrap();
98+
*state = DownloadState::Paused;
99+
}
100+
for entry in self.streams.iter() {
101+
entry.value().set_download_state(DownloadState::Paused);
102+
}
103+
}
104+
105+
pub fn resume(&self) {
106+
{
107+
let mut state = self.download_state.lock().unwrap();
108+
*state = DownloadState::Downloading;
109+
}
110+
for entry in self.streams.iter() {
111+
entry.value().set_download_state(DownloadState::Downloading);
112+
}
113+
}
114+
115+
pub fn get_download_state(&self) -> DownloadState {
116+
*self.download_state.lock().unwrap()
117+
}
118+
119+
pub fn is_paused(&self) -> bool {
120+
self.get_download_state() == DownloadState::Paused
121+
}
122+
123+
pub fn is_downloading(&self) -> bool {
124+
self.get_download_state() == DownloadState::Downloading
125+
}
126+
127+
pub fn is_init(&self) -> bool {
128+
self.get_download_state() == DownloadState::Init
129+
}
130+
76131
pub async fn add_torrent(
77132
&self,
78133
add_torrent: AddTorrentOptions,
@@ -91,6 +146,7 @@ impl Session {
91146
peer_states,
92147
have_broadcast.clone(),
93148
pr_rx.clone(),
149+
self.download_state.clone(),
94150
);
95151

96152
let pieces_of_work = (0..(torrent.piece_hashes.len()) as u64)
@@ -128,6 +184,9 @@ impl Session {
128184
self.streams
129185
.insert(torrent.info_hash, tracker_stream.clone());
130186

187+
// Start downloading
188+
self.start_downloading();
189+
131190
Ok(AddTorrentResult {
132191
torrent,
133192
torrent_meta,

0 commit comments

Comments
 (0)