use crate::listener::*; use crate::*; use std::collections::{HashMap, HashSet}; use std::env; use std::fmt; use std::io::prelude::*; use std::io::{BufRead, BufReader, Error, ErrorKind, Result}; use std::net::SocketAddr; use std::os::unix::net::UnixStream; use std::path::PathBuf; use std::rc::Rc; use std::{thread, time}; pub(crate) fn gen_events( state: &HashMap, prev: &HashMap, listeners: &Vec>, timeout: time::Duration, poll_interval: time::Duration, ) { let side_by_side = { state .keys() .map(String::as_ref) .chain(prev.keys().map(String::as_ref)) .collect::>() .iter() .map(|p| (p.to_owned(), (prev.get(*p), state.get(*p)))) .collect::, Option<&Peer>)>>() }; for (_id, (prev, cur)) in side_by_side { match (prev, cur) { (Some(prev), Some(cur)) => { let timedout = |peer: &Peer| match peer.last_handshake_rel() { Some(shake) if shake > timeout && shake + poll_interval < timeout => true, Some(_) => false, _ => true, }; if !timedout(&prev) && timedout(&cur) { listeners.disconnected(&cur); continue; } if timedout(&prev) && !timedout(&cur) { listeners.connected(&cur); } if prev.endpoint != cur.endpoint { if let (Some(prev_addr), Some(_)) = (prev.endpoint, cur.endpoint) { listeners.roaming(&cur, prev_addr); } } } (None, Some(cur)) => listeners.added(&cur), (Some(prev), None) => listeners.removed(&prev), (None, Some(_cur)) => (), (Some(_prev), None) => (), fail => { println!("{:?}", fail); unreachable!() } } } } #[cfg(test)] mod test { use super::*; use crate::listener::*; use crate::*; use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::env; use std::fmt; use std::io::prelude::*; use std::io::{BufRead, BufReader, Error, ErrorKind, Result}; use std::net::SocketAddr; use std::os::unix::net::UnixStream; use std::path::PathBuf; use std::{thread, time}; struct TestListener { calls: Rc>>, } impl TestListener { fn new() -> TestListener { Self::from(Rc::new(RefCell::new(vec![]))) } fn from(calls: Rc>>) -> TestListener { TestListener { calls: calls } } } impl EventListener for TestListener { fn added<'a>(&self, peer: &'a Peer) { self.calls .borrow_mut() .push(format!("add {}", peer.public_key)); } fn connected<'a>(&self, peer: &'a Peer) { self.calls .borrow_mut() .push(format!("con {}", peer.public_key)); } fn disconnected<'a>(&self, peer: &'a Peer) { self.calls .borrow_mut() .push(format!("dis {}", peer.public_key)); } fn removed<'a>(&self, peer: &'a Peer) { self.calls .borrow_mut() .push(format!("rem {}", peer.public_key)); } fn roaming<'a>(&self, peer: &'a Peer, previous_addr: SocketAddr) { self.calls .borrow_mut() .push(format!("rom {}", peer.public_key)); } } fn listeners() -> (Vec>, Rc>>) { let l = TestListener::new(); let calls = l.calls.clone(); (vec![Box::new(l)], calls) } #[test] fn test_setup() { let (listeners, calls) = listeners(); let peer = peer(); listeners.connected(&peer); assert_eq!( vec![["con", &peer.public_key].join(" ")], calls.borrow().clone() ); } fn b2h(b: &str) -> String { hex::encode(base64::decode(b).unwrap()) } fn peer() -> Peer { let bkey = "HhRgEL2xsnEIqThSTUKLGaTXusorM1MFdjSSYvzBynY="; let key = b2h(bkey); Peer::from_kv(&vec![ ("public_key".to_string(), key.clone()), /*( "last_handshake_time_nsec".to_string(), (1000 * 1000 * 1).to_string(), ),*/ ("endpoint".to_string(), "1.1.1.1:22222".to_string()), ]) .unwrap() } #[test] fn connected() { let peer = peer(); let mut peer_cur = peer.clone(); let mut prev: HashMap = HashMap::new(); let mut cur: HashMap = HashMap::new(); cur.insert(peer_cur.public_key.clone(), peer_cur.clone()); let (listener, calls) = listeners(); let interval = time::Duration::from_secs(3); gen_events( &cur, &prev, &listener, time::Duration::from_secs(3), interval, ); assert_eq!( vec![["add", &peer_cur.public_key].join(" ")], calls.borrow().clone() ); gen_events( &cur, &cur, &listener, time::Duration::from_secs(3), interval, ); //Shouldn't gen any new events assert!(calls.borrow().len() == 1); let (listener, calls) = listeners(); gen_events( &prev, &cur, &listener, time::Duration::from_secs(10), interval, ); assert_eq!( vec![["rem", &peer.public_key].join(" ")], calls.borrow().clone() ); calls.borrow_mut().clear(); let mut peer_prev = peer.clone(); peer_prev.endpoint = Some("2.2.2.2:33333".parse::().unwrap()); peer_prev.last_handshake = Some(time::Duration::from_secs(1000)); prev.insert(peer_prev.public_key.clone(), peer_prev.clone()); gen_events( &prev, &cur, &listener, time::Duration::from_secs(10), interval, ); assert!(calls .borrow() .clone() .contains(&["rom", &peer.public_key].join(" "))); calls.borrow_mut().clear(); let mut peer_prev = peer.clone(); peer_cur.last_handshake = Some(time::Duration::from_secs(5)); cur.insert(peer_cur.public_key.clone(), peer_cur.clone()); prev.insert(peer_prev.public_key.clone(), peer_prev.clone()); gen_events( &cur, &prev, &listener, time::Duration::from_secs(10), interval, ); assert_eq!( vec![["con", &peer.public_key].join(" ")], calls.borrow().clone() ); calls.borrow_mut().clear(); //Other way around should be a disconnect gen_events( &prev, &cur, &listener, time::Duration::from_secs(3), interval, ); assert_eq!( vec![["dis", &peer.public_key].join(" ")], calls.borrow().clone() ); } }