use crate::listener::*; use crate::*; use std::collections::{HashMap, HashSet}; use std::time; pub(crate) fn gen_events( state: &HashMap, prev: &HashMap, listeners: &Vec>, timeout: time::Duration, poll_interval: time::Duration, ) { let side_by_side = { state .keys() .chain(prev.keys()) .collect::>() .iter() .map(|p| (*p, (prev.get(*p), state.get(*p)))) .collect::, Option<&Peer>)>>() }; let d_zero = Duration::from_secs(0); let h_ms = Duration::from_millis(100); for (_id, (prev, cur)) in side_by_side { match (prev, cur) { (Some(prev), Some(cur)) => { if let (Some(prev_addr), Some(cur_addr)) = (prev.endpoint, cur.endpoint) { if prev_addr != cur_addr { listeners.roaming(&cur, prev_addr); } } let timedout_now = || { if let Some(shake) = cur.last_handshake { if let Ok(el) = shake.elapsed() { return el > timeout && el + poll_interval < timeout; } } true }; let timedout_prev = || { if let Some(shake) = prev.last_handshake { if let Ok(el) = shake.elapsed().map(|el| el - poll_interval) { return el > timeout && el < timeout + poll_interval; } } true }; match (timedout_prev(), timedout_now()) { (false, true) => listeners.disconnected(&cur), (true, false) => listeners.connected(&cur), other => (), } } (None, Some(cur)) => listeners.added(&cur), (Some(prev), None) => listeners.removed(&prev), fail => { println!("{:?}", fail); unreachable!() } } } } /* #[cfg(test)] mod test { use super::*; use crate::listener::*; use crate::*; use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::env; use std::fmt; use std::io::prelude::*; use std::io::{BufRead, BufReader, Error, ErrorKind, Result}; use std::net::SocketAddr; use std::os::unix::net::UnixStream; use std::path::PathBuf; use std::{thread, time}; struct TestListener { calls: Rc>>, } impl TestListener { fn new() -> TestListener { Self::from(Rc::new(RefCell::new(vec![]))) } fn from(calls: Rc>>) -> TestListener { TestListener { calls: calls } } } impl EventListener for TestListener { fn added<'a>(&self, peer: &'a Peer) { self.calls .borrow_mut() .push(format!("add {}", peer.key)); } fn connected<'a>(&self, peer: &'a Peer) { self.calls .borrow_mut() .push(format!("con {}", peer.key)); } fn disconnected<'a>(&self, peer: &'a Peer) { self.calls .borrow_mut() .push(format!("dis {}", peer.key)); } fn removed<'a>(&self, peer: &'a Peer) { self.calls .borrow_mut() .push(format!("rem {}", peer.key)); } fn roaming<'a>(&self, peer: &'a Peer, _previous_addr: SocketAddr) { self.calls .borrow_mut() .push(format!("rom {}", peer.key)); } } fn listeners() -> (Vec>, Rc>>) { let l = TestListener::new(); let calls = l.calls.clone(); (vec![Box::new(l)], calls) } #[test] fn test_setup() { let (listeners, calls) = listeners(); let peer = peer(); listeners.connected(&peer); assert_eq!( vec![["con", &peer.key].join(" ")], calls.borrow().clone() ); } fn b2h(b: &str) -> String { hex::encode(base64::decode(b).unwrap()) } fn peer() -> Peer { let bkey = "HhRgEL2xsnEIqThSTUKLGaTXusorM1MFdjSSYvzBynY="; let key = b2h(bkey); Peer::from_kv(&vec![ ("key".to_string(), key.clone()), /*( "last_handshake_time_nsec".to_string(), (1000 * 1000 * 1).to_string(), ),*/ ("endpoint".to_string(), "1.1.1.1:22222".to_string()), ]) .unwrap() } #[test] fn connected() { let peer = peer(); let mut peer_cur = peer.clone(); let mut prev: HashMap = HashMap::new(); let mut cur: HashMap = HashMap::new(); cur.insert(peer_cur.key.clone(), peer_cur.clone()); let (listener, calls) = listeners(); let interval = time::Duration::from_secs(3); gen_events( &cur, &prev, &listener, time::Duration::from_secs(3), interval, ); assert_eq!( vec![["add", &peer_cur.key].join(" ")], calls.borrow().clone() ); gen_events( &cur, &cur, &listener, time::Duration::from_secs(3), interval, ); //Shouldn't gen any new events assert!(calls.borrow().len() == 1); let (listener, calls) = listeners(); gen_events( &prev, &cur, &listener, time::Duration::from_secs(10), interval, ); assert_eq!( vec![["rem", &peer.key].join(" ")], calls.borrow().clone() ); calls.borrow_mut().clear(); let mut peer_prev = peer.clone(); peer_prev.endpoint = Some("2.2.2.2:33333".parse::().unwrap()); peer_prev.last_handshake = Some(time::Duration::from_secs(1000)); prev.insert(peer_prev.key.clone(), peer_prev.clone()); gen_events( &prev, &cur, &listener, time::Duration::from_secs(10), interval, ); assert!(calls .borrow() .clone() .contains(&["rom", &peer.key].join(" "))); calls.borrow_mut().clear(); let peer_prev = peer.clone(); peer_cur.last_handshake = Some(time::Duration::from_secs(5)); cur.insert(peer_cur.key.clone(), peer_cur.clone()); prev.insert(peer_prev.key.clone(), peer_prev.clone()); gen_events( &cur, &prev, &listener, time::Duration::from_secs(10), interval, ); assert_eq!( vec![["con", &peer.key].join(" ")], calls.borrow().clone() ); calls.borrow_mut().clear(); //Other way around should be a disconnect gen_events( &prev, &cur, &listener, time::Duration::from_secs(3), interval, ); assert_eq!( vec![["dis", &peer.key].join(" ")], calls.borrow().clone() ); } }*/