5 Commits

Author SHA1 Message Date
shimunn
0301a53ac9 implemented listeners for transfered event [CI SKIP] 2019-04-04 21:18:21 +02:00
shimunn
ceb3d7b77d Merge branch 'timeout' into new_events 2019-04-04 20:39:00 +02:00
shimunn
f3eeac404c added transfered event [CI SKIP] 2019-04-04 20:34:54 +02:00
shimunn
a2cda16977 handle timeouts based on the existence of last_handshake 2019-04-04 18:59:17 +02:00
shimunn
ba247655bd new event system [CI SKIP] 2019-04-03 17:54:53 +02:00
5 changed files with 100 additions and 102 deletions

View File

@@ -24,10 +24,13 @@ case "$EVENT" in
PREV_ENDPOINT=$8 PREV_ENDPOINT=$8
;; ;;
"added") "added")
;; ;;
"removed") "removed")
;; ;;
"transfered")
*) *)
exit 1 exit 1
;; ;;

View File

@@ -132,6 +132,11 @@ name = "libc"
version = "0.2.47" version = "0.2.47"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "number_prefix"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "0.4.27" version = "0.4.27"
@@ -251,6 +256,7 @@ dependencies = [
"base64 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "base64 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_builder 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "derive_builder 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"number_prefix 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"structopt 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", "structopt 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
"structopt-derive 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", "structopt-derive 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -292,6 +298,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" "checksum hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77"
"checksum ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" "checksum ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
"checksum libc 0.2.47 (registry+https://github.com/rust-lang/crates.io-index)" = "48450664a984b25d5b479554c29cc04e3150c97aa4c01da5604a2d4ed9151476" "checksum libc 0.2.47 (registry+https://github.com/rust-lang/crates.io-index)" = "48450664a984b25d5b479554c29cc04e3150c97aa4c01da5604a2d4ed9151476"
"checksum number_prefix 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a"
"checksum proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)" = "4d317f9caece796be1980837fd5cb3dfec5613ebdb04ad0956deea83ce168915" "checksum proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)" = "4d317f9caece796be1980837fd5cb3dfec5613ebdb04ad0956deea83ce168915"
"checksum quote 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "53fa22a1994bd0f9372d7a816207d8a2677ad0325b073f5c5332760f0fb62b5c" "checksum quote 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "53fa22a1994bd0f9372d7a816207d8a2677ad0325b073f5c5332760f0fb62b5c"
"checksum redox_syscall 0.1.50 (registry+https://github.com/rust-lang/crates.io-index)" = "52ee9a534dc1301776eff45b4fa92d2c39b1d8c3d3357e6eb593e0d795506fc2" "checksum redox_syscall 0.1.50 (registry+https://github.com/rust-lang/crates.io-index)" = "52ee9a534dc1301776eff45b4fa92d2c39b1d8c3d3357e6eb593e0d795506fc2"

View File

@@ -11,6 +11,7 @@ time = "0.1.42"
structopt = "0.2.14" structopt = "0.2.14"
structopt-derive = "0.2.14" structopt-derive = "0.2.14"
derive_builder = "0.7.1" derive_builder = "0.7.1"
number_prefix = "0.3.0"
[profile.release] [profile.release]
lto = true lto = true

View File

@@ -26,36 +26,40 @@ pub(crate) fn gen_events(
(Some(prev), Some(cur)) => { (Some(prev), Some(cur)) => {
if let (Some(prev_addr), Some(cur_addr)) = (prev.endpoint, cur.endpoint) { if let (Some(prev_addr), Some(cur_addr)) = (prev.endpoint, cur.endpoint) {
if prev_addr != cur_addr { if prev_addr != cur_addr {
listeners.roaming(&cur, prev_addr); listeners.fire(Event::Roamed(&cur, prev_addr));
} }
} }
let timedout_now = || { let timedout_now = || !cur.last_handshake.is_some();
if let Some(shake) = cur.last_handshake {
if let Ok(el) = shake.elapsed() {
return el > timeout && el + poll_interval < timeout;
}
}
true
};
let timedout_prev = || { let timedout_prev = || !prev.last_handshake.is_some();
if let Some(shake) = prev.last_handshake {
if let Ok(el) = shake.elapsed().map(|el| el - poll_interval) {
return el > timeout && el < timeout + poll_interval;
}
}
true
};
match (timedout_prev(), timedout_now()) { match (
(false, true) => listeners.disconnected(&cur), timedout_prev(),
(true, false) => listeners.connected(&cur), timedout_now(),
other => (), prev.last_handshake.and_then(|p_shake| {
cur.last_handshake
.and_then(|c_shake| c_shake.duration_since(p_shake).ok())
}),
) {
(false, true, _) => listeners.fire(Event::Disconnected(&cur)),
(true, false, _) => listeners.fire(Event::Connected(&cur)),
other => {
//dbg!(other);
} }
} }
(None, Some(cur)) => listeners.added(&cur), if prev.traffic != cur.traffic {
(Some(prev), None) => listeners.removed(&prev), if let ((p_tx, p_rx), (c_tx, c_rx)) = (prev.traffic, cur.traffic) {
listeners.fire(Event::Transfered {
peer: &cur,
tx: c_tx - p_tx,
rx: c_rx - p_rx,
});
}
}
}
(None, Some(cur)) => listeners.fire(Event::Added(&cur)),
(Some(prev), None) => listeners.fire(Event::Removed(&prev)),
fail => { fail => {
println!("{:?}", fail); println!("{:?}", fail);
unreachable!() unreachable!()

View File

@@ -1,55 +1,34 @@
use crate::Peer; use crate::Peer;
use number_prefix::{NumberPrefix, Prefixed, Standalone};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Command; use std::process::Command;
use std::thread; use std::thread;
use std::time::SystemTime; use std::time::SystemTime;
//#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum Event<'a> {
Added(&'a Peer),
Removed(&'a Peer),
Connected(&'a Peer),
Disconnected(&'a Peer),
Roamed(&'a Peer, SocketAddr),
Transfered { peer: &'a Peer, tx: u64, rx: u64 },
}
pub trait EventListener { pub trait EventListener {
fn name(&self) -> &'static str; fn name(&self) -> &'static str;
fn added<'a>(&self, peer: &'a Peer) { fn fire<'a>(&self, event: Event<'a>);
self.connected(peer);
}
fn connected<'a>(&self, peer: &'a Peer);
fn disconnected<'a>(&self, peer: &'a Peer);
fn removed<'a>(&self, peer: &'a Peer) {
self.disconnected(peer)
}
fn roaming<'a>(&self, peer: &'a Peer, previous_addr: SocketAddr);
} }
impl EventListener for Vec<Box<EventListener>> { impl EventListener for Vec<Box<EventListener>> {
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
"List of Listeners" "List of Listeners"
} }
fn fire<'a>(&self, event: Event<'a>) {
fn added<'a>(&self, peer: &'a Peer) { self.iter().for_each(|l| l.fire(event.clone()));
if cfg!(feature = "addrem") || cfg!(test) {
self.iter().for_each(|l| l.added(&peer));
}
}
fn connected<'a>(&self, peer: &'a Peer) {
self.iter().for_each(|l| l.connected(&peer));
}
fn disconnected<'a>(&self, peer: &'a Peer) {
self.iter().for_each(|l| l.disconnected(&peer));
}
fn removed<'a>(&self, peer: &'a Peer) {
if cfg!(feature = "addrem") || cfg!(test) {
self.iter().for_each(|l| l.removed(&peer));
}
}
fn roaming<'a>(&self, peer: &'a Peer, previous_addr: SocketAddr) {
self.iter().for_each(|l| l.roaming(&peer, previous_addr));
} }
} }
@@ -59,30 +38,35 @@ impl EventListener for LogListener {
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
"Log" "Log"
} }
fn connected<'a>(&self, peer: &'a Peer) {
println!("{} connected!", peer.key);
}
fn disconnected<'a>(&self, peer: &'a Peer) { fn fire<'a>(&self, event: Event<'a>) {
println!("{} disconnected!", peer.key); match event {
} Event::Connected(peer) => println!("{} connected!", peer.key),
Event::Disconnected(peer) => println!("{} disconnected!", peer.key),
fn added<'a>(&self, peer: &'a Peer) { Event::Added(peer) => println!("{} added!", peer.key),
println!("{} added!", peer.key); Event::Removed(peer) => println!("{} removed!", peer.key),
} Event::Roamed(peer, previous_addr) => println!(
fn removed<'a>(&self, peer: &'a Peer) {
println!("{} removed!", peer.key);
}
fn roaming<'a>(&self, peer: &'a Peer, previous_addr: SocketAddr) {
println!(
"{} roamed {} -> {}!", "{} roamed {} -> {}!",
peer.key, peer.key,
previous_addr, previous_addr,
peer.endpoint.unwrap() peer.endpoint.unwrap()
),
Event::Transfered { peer, tx, rx } => {
let prefix = |cnt: u64| match NumberPrefix::binary(cnt as f64) {
Standalone(bytes) => format!("{}B", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
};
println!(
"{} transfered {} up, {} down",
peer.key,
prefix(tx),
prefix(rx)
); );
} }
_ => unimplemented!(),
}
}
} }
pub struct ScriptListener { pub struct ScriptListener {
@@ -146,27 +130,26 @@ impl EventListener for ScriptListener {
fn name(&self) -> &'static str { fn name(&self) -> &'static str {
"Script" "Script"
} }
fn connected<'a>(&self, peer: &'a Peer) { fn fire<'a>(&self, event: Event<'a>) {
self.call_sub(vec!["connected", &self.peer_props(peer)]); match event {
Event::Connected(peer) => self.call_sub(vec!["connected", &self.peer_props(peer)]),
Event::Disconnected(peer) => {
self.call_sub(vec!["disconnected", &self.peer_props(peer)])
} }
Event::Added(peer) => self.call_sub(vec!["added", &self.peer_props(peer)]),
fn disconnected<'a>(&self, peer: &'a Peer) { Event::Removed(peer) => self.call_sub(vec!["removed", &self.peer_props(peer)]),
self.call_sub(vec!["disconnected", &self.peer_props(peer)]); Event::Roamed(peer, previous_addr) => self.call_sub(vec![
}
fn added<'a>(&self, peer: &'a Peer) {
self.call_sub(vec!["added", &self.peer_props(peer)]);
}
fn removed<'a>(&self, peer: &'a Peer) {
self.call_sub(vec!["removed", &self.peer_props(peer)]);
}
fn roaming<'a>(&self, peer: &'a Peer, previous_addr: SocketAddr) {
self.call_sub(vec![
"roaming", "roaming",
&self.peer_props(peer), &self.peer_props(peer),
&previous_addr.to_string(), &previous_addr.to_string(),
]); ]),
Event::Transfered { peer, tx, rx } => self.call_sub(vec![
"transfered",
&self.peer_props(peer),
&tx.to_string(),
&rx.to_string(),
]),
_ => unimplemented!(),
}
} }
} }