Merge branch 'new_events'
Some checks are pending
continuous-integration/drone/push Build was killed

This commit is contained in:
shimunn
2019-05-23 19:42:34 +02:00
5 changed files with 175 additions and 176 deletions

View File

@@ -26,36 +26,40 @@ pub(crate) fn gen_events(
(Some(prev), Some(cur)) => {
if let (Some(prev_addr), Some(cur_addr)) = (prev.endpoint, cur.endpoint) {
if prev_addr != cur_addr {
listeners.roaming(&cur, prev_addr);
listeners.fire(Event::Roamed(&cur, prev_addr));
}
}
let timedout_now = || {
if let Some(shake) = cur.last_handshake {
if let Ok(el) = shake.elapsed() {
return el > timeout && el + poll_interval < timeout;
}
}
true
};
let timedout_now = || !cur.last_handshake.is_some();
let timedout_prev = || {
if let Some(shake) = prev.last_handshake {
if let Ok(el) = shake.elapsed().map(|el| el - poll_interval) {
return el > timeout && el < timeout + poll_interval;
}
}
true
};
let timedout_prev = || !prev.last_handshake.is_some();
match (timedout_prev(), timedout_now()) {
(false, true) => listeners.disconnected(&cur),
(true, false) => listeners.connected(&cur),
other => (),
match (
timedout_prev(),
timedout_now(),
prev.last_handshake.and_then(|p_shake| {
cur.last_handshake
.and_then(|c_shake| c_shake.duration_since(p_shake).ok())
}),
) {
(false, true, _) => listeners.fire(Event::Disconnected(&cur)),
(true, false, _) => listeners.fire(Event::Connected(&cur)),
other => {
//dbg!(other);
}
}
if prev.traffic != cur.traffic {
if let ((p_tx, p_rx), (c_tx, c_rx)) = (prev.traffic, cur.traffic) {
listeners.fire(Event::Transfered {
peer: &cur,
tx: c_tx - p_tx,
rx: c_rx - p_rx,
});
}
}
}
(None, Some(cur)) => listeners.added(&cur),
(Some(prev), None) => listeners.removed(&prev),
(None, Some(cur)) => listeners.fire(Event::Added(&cur)),
(Some(prev), None) => listeners.fire(Event::Removed(&prev)),
fail => {
println!("{:?}", fail);
unreachable!()

View File

@@ -1,4 +1,5 @@
use crate::Peer;
use number_prefix::{NumberPrefix, Prefixed, Standalone};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::Command;
@@ -6,51 +7,29 @@ use std::thread;
use std::time::SystemTime;
use threadpool::ThreadPool;
//#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum Event<'a> {
Added(&'a Peer),
Removed(&'a Peer),
Connected(&'a Peer),
Disconnected(&'a Peer),
Roamed(&'a Peer, SocketAddr),
Transfered { peer: &'a Peer, tx: u64, rx: u64 },
}
pub trait EventListener {
fn name(&self) -> &'static str;
fn added<'a>(&self, peer: &'a Peer) {
self.connected(peer);
}
fn connected<'a>(&self, peer: &'a Peer);
fn disconnected<'a>(&self, peer: &'a Peer);
fn removed<'a>(&self, peer: &'a Peer) {
self.disconnected(peer)
}
fn roaming<'a>(&self, peer: &'a Peer, previous_addr: SocketAddr);
fn fire<'a>(&self, event: Event<'a>);
}
impl EventListener for Vec<Box<EventListener>> {
fn name(&self) -> &'static str {
"List of Listeners"
}
fn added<'a>(&self, peer: &'a Peer) {
if cfg!(feature = "addrem") || cfg!(test) {
self.iter().for_each(|l| l.added(&peer));
}
}
fn connected<'a>(&self, peer: &'a Peer) {
self.iter().for_each(|l| l.connected(&peer));
}
fn disconnected<'a>(&self, peer: &'a Peer) {
self.iter().for_each(|l| l.disconnected(&peer));
}
fn removed<'a>(&self, peer: &'a Peer) {
if cfg!(feature = "addrem") || cfg!(test) {
self.iter().for_each(|l| l.removed(&peer));
}
}
fn roaming<'a>(&self, peer: &'a Peer, previous_addr: SocketAddr) {
self.iter().for_each(|l| l.roaming(&peer, previous_addr));
fn fire<'a>(&self, event: Event<'a>) {
self.iter().for_each(|l| l.fire(event.clone()));
}
}
@@ -60,29 +39,34 @@ impl EventListener for LogListener {
fn name(&self) -> &'static str {
"Log"
}
fn connected<'a>(&self, peer: &'a Peer) {
println!("{} connected!", peer.key);
}
fn disconnected<'a>(&self, peer: &'a Peer) {
println!("{} disconnected!", peer.key);
}
fn fire<'a>(&self, event: Event<'a>) {
match event {
Event::Connected(peer) => println!("{} connected!", peer.key),
Event::Disconnected(peer) => println!("{} disconnected!", peer.key),
Event::Added(peer) => println!("{} added!", peer.key),
Event::Removed(peer) => println!("{} removed!", peer.key),
Event::Roamed(peer, previous_addr) => println!(
"{} roamed {} -> {}!",
peer.key,
previous_addr,
peer.endpoint.unwrap()
),
Event::Transfered { peer, tx, rx } => {
let prefix = |cnt: u64| match NumberPrefix::binary(cnt as f64) {
Standalone(bytes) => format!("{}B", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
};
println!(
"{} transfered {} up, {} down",
peer.key,
prefix(tx),
prefix(rx)
);
}
fn added<'a>(&self, peer: &'a Peer) {
println!("{} added!", peer.key);
}
fn removed<'a>(&self, peer: &'a Peer) {
println!("{} removed!", peer.key);
}
fn roaming<'a>(&self, peer: &'a Peer, previous_addr: SocketAddr) {
println!(
"{} roamed {} -> {}!",
peer.key,
previous_addr,
peer.endpoint.unwrap()
);
_ => unimplemented!(),
}
}
}
@@ -93,7 +77,7 @@ pub struct ScriptListener {
impl ScriptListener {
pub fn new(script: PathBuf) -> ScriptListener {
let pool = ThreadPool::new(4);
let pool = ThreadPool::new(8);
ScriptListener { script, pool }
}
@@ -149,27 +133,26 @@ impl EventListener for ScriptListener {
fn name(&self) -> &'static str {
"Script"
}
fn connected<'a>(&self, peer: &'a Peer) {
self.call_sub(vec!["connected", &self.peer_props(peer)]);
}
fn disconnected<'a>(&self, peer: &'a Peer) {
self.call_sub(vec!["disconnected", &self.peer_props(peer)]);
}
fn added<'a>(&self, peer: &'a Peer) {
self.call_sub(vec!["added", &self.peer_props(peer)]);
}
fn removed<'a>(&self, peer: &'a Peer) {
self.call_sub(vec!["removed", &self.peer_props(peer)]);
}
fn roaming<'a>(&self, peer: &'a Peer, previous_addr: SocketAddr) {
self.call_sub(vec![
"roaming",
&self.peer_props(peer),
&previous_addr.to_string(),
]);
fn fire<'a>(&self, event: Event<'a>) {
match event {
Event::Connected(peer) => self.call_sub(vec!["connected", &self.peer_props(peer)]),
Event::Disconnected(peer) => {
self.call_sub(vec!["disconnected", &self.peer_props(peer)])
}
Event::Added(peer) => self.call_sub(vec!["added", &self.peer_props(peer)]),
Event::Removed(peer) => self.call_sub(vec!["removed", &self.peer_props(peer)]),
Event::Roamed(peer, previous_addr) => self.call_sub(vec![
"roaming",
&self.peer_props(peer),
&previous_addr.to_string(),
]),
Event::Transfered { peer, tx, rx } => self.call_sub(vec![
"transfered",
&self.peer_props(peer),
&tx.to_string(),
&rx.to_string(),
]),
_ => unimplemented!(),
}
}
}