From 8dfbf88db8fe15b2bd136df3de2283328efc05d1 Mon Sep 17 00:00:00 2001 From: shimun Date: Fri, 10 May 2024 10:52:22 +0200 Subject: [PATCH] plain mqtt works --- Cargo.lock | 27 +++++++++++-- Cargo.toml | 8 +++- src/main.rs | 17 +++++--- src/mqtt.rs | 110 ++++++++++++++++++++++++++++++++++++++-------------- 4 files changed, 122 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8f73de9..e4e9750 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,6 +185,12 @@ dependencies = [ "inout", ] +[[package]] +name = "const-decoder" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5241cd7938b1b415942e943ea96f615953d500b50347b505b0b507080bad5a6f" + [[package]] name = "const-oid" version = "0.9.6" @@ -376,7 +382,7 @@ dependencies = [ "atomic-pool", "document-features", "embassy-net-driver", - "embassy-sync", + "embassy-sync 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "embassy-time", "embedded-io-async", "embedded-nal-async", @@ -407,6 +413,18 @@ dependencies = [ "heapless 0.8.0", ] +[[package]] +name = "embassy-sync" +version = "0.5.0" +source = "git+https://github.com/embassy-rs/embassy.git?rev=4b4777e#4b4777e6bb813dbde3f8ec05ff81cadbcb41bee0" +dependencies = [ + "cfg-if", + "critical-section", + "embedded-io-async", + "futures-util", + "heapless 0.8.0", +] + [[package]] name = "embassy-time" version = "0.3.0" @@ -632,7 +650,7 @@ dependencies = [ "critical-section", "document-features", "embassy-futures", - "embassy-sync", + "embassy-sync 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "embassy-time-driver", "embedded-can", "embedded-dma", @@ -726,7 +744,7 @@ dependencies = [ "critical-section", "embassy-futures", "embassy-net-driver", - "embassy-sync", + "embassy-sync 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "embedded-io", "embedded-io-async", "enumset", @@ -780,9 +798,10 @@ dependencies = [ name = "esp32c3" version = "0.1.0" dependencies = [ + "const-decoder", "embassy-executor", "embassy-net", - "embassy-sync", + "embassy-sync 0.5.0 (git+https://github.com/embassy-rs/embassy.git?rev=4b4777e)", "embassy-time", "embedded-io-async", "embedded-tls", diff --git a/Cargo.toml b/Cargo.toml index 7fa71b8..c43692e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,10 @@ authors = [ "Marvin Drescher " ] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +const-decoder = "0.3.0" embassy-executor = { version = "0.5.0", features = ["nightly", "integrated-timers", "arch-riscv32", "executor-thread"] } embassy-net = { version = "0.4.0", features = ["dhcpv4", "dhcpv4-hostname", "dns", "medium-ip", "proto-ipv4", "proto-ipv6", "tcp", "udp"] } -embassy-sync = "0.5.0" +embassy-sync = { git = "https://github.com/embassy-rs/embassy.git", rev = "4b4777e" } embassy-time = { version = "0.3.0" } embedded-io-async = "0.6.1" embedded-tls = { version = "0.17.0", default-features = false, features = ["embedded-io-adapters"] } @@ -24,7 +25,7 @@ log = "0.4.21" mqttrust = "0.6.0" rand = { version = "0.8.5", default-features = false, features = ["std_rng"] } rand_core = "0.6.4" -rust-mqtt = { version = "0.3.0", default-features = false } +rust-mqtt = { version = "0.3.0", default-features = false, features = ["tls"] } smart-leds = "0.4.0" static_cell = { version = "2.0.0", features = ["nightly"] } @@ -33,6 +34,9 @@ opt-level = 3 [profile.dev.package.embedded-tls] opt-level = 3 +[features] +tls = [] + [profile.release] opt-level = "s" lto = true diff --git a/src/main.rs b/src/main.rs index dee6f8f..2468c2b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,6 +19,7 @@ use embassy_executor::Executor; use embassy_net::{Config, DhcpConfig, Stack, StackResources}; use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; use embassy_sync::mutex::Mutex; +use embassy_sync::once_lock::OnceLock; use embassy_time::{Duration, Timer}; use esp_alloc::EspHeap; use esp_backtrace as _; @@ -154,9 +155,12 @@ async fn moisture( supply.set_low(); // Vout = Dout * Vmax / Dmax let milli_volt = adc_voltage(pin_value, Attenuation::Attenuation11dB) as u32; - let moisture = ((milli_volt.checked_sub(submerged_in_water).unwrap_or(0)) - << 8 / (dry - submerged_in_water)) - >> 8; + let moisture = { + let delta = milli_volt.checked_sub(submerged_in_water).unwrap_or(0); + let delta_dry = dry - submerged_in_water; + let dryness = (delta << 12 / delta_dry) * 100 >> 12; + dryness + }; info!("moisture: {moisture}%, v_s: {milli_volt}"); { DATA.lock() @@ -189,7 +193,7 @@ async fn battery_monitor( .await .insert(Cow::from("vbat"), v_bat.to_string()); } - Timer::after(Duration::from_secs(15)).await; + Timer::after(Duration::from_secs(1)).await; } } static EXECUTOR: StaticCell = StaticCell::new(); @@ -258,7 +262,7 @@ fn main() -> ! { let rmt_buffer = smartLedBuffer!(1); let mut led = SmartLedsAdapter::new(rmt.channel0, io.pins.gpio7, rmt_buffer, &clocks); - led.set_color(RGB8::new(0, 255, 255)); + led.set_color(RGB8::new(0, 0, 0)); let led: Box = Box::new(led); @@ -328,6 +332,8 @@ async fn wifi_connection( } } +pub static HAS_IP_ADDRESS: OnceLock<()> = OnceLock::new(); + #[embassy_executor::task] async fn ip_task(stack: NetworkStack) { loop { @@ -340,6 +346,7 @@ async fn ip_task(stack: NetworkStack) { loop { if let Some(config) = stack.config_v4() { info!("Got IP: {}", config.address); + HAS_IP_ADDRESS.get_or_init(|| ()); break; } Timer::after(Duration::from_millis(500)).await; diff --git a/src/mqtt.rs b/src/mqtt.rs index 9802695..61e0ab5 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -1,6 +1,8 @@ +use core::fmt::Debug; + use embassy_net::tcp::TcpSocket; use embassy_net::{dns::Error as DnsError, tcp::ConnectError}; -use embassy_time::{with_timeout, Duration, TimeoutError, Timer}; +use embassy_time::{with_timeout, Duration, Instant, TimeoutError, Timer}; use embedded_tls::{Aes128GcmSha256, NoVerify, TlsConfig, TlsConnection, TlsContext, TlsError}; use esp_backtrace as _; use esp_println::println; @@ -10,9 +12,10 @@ use rand::{CryptoRng, RngCore, SeedableRng}; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::ClientConfig; use rust_mqtt::packet::v5::reason_codes::ReasonCode; +use rust_mqtt::tests; use rust_mqtt::utils::rng_generator::CountingRng; -use crate::{NetworkStack, DATA}; +use crate::{NetworkStack, DATA, HAS_IP_ADDRESS}; #[derive(Debug)] pub enum SendError { @@ -24,6 +27,30 @@ pub enum SendError { Timeout(TimeoutError), } +#[derive(Debug)] +pub struct ErrorLocation { + pub error: E, + pub location: Option<(&'static str, u32)>, +} + +impl ErrorLocation { + pub fn from_result( + result: Result, + location: (&'static str, u32), + ) -> Result> { + result.map_err(|error| Self { + error, + location: Some(location), + }) + } +} + +macro_rules! with_line { + ($result:expr) => { + ErrorLocation::from_result($result, (file!(), line!())) + }; +} + macro_rules! from_impl { ($err:ty, $var:ident) => { impl From<$err> for SendError { @@ -41,7 +68,10 @@ from_impl!(ReasonCode, MqttReason); from_impl!(TimeoutError, Timeout); const MQTT_SERVER_HOSTNAME: &str = "mqtt.shimun.net"; +#[cfg(feature = "tls")] const MQTT_SERVER_PORT: u16 = 8883; +#[cfg(not(feature = "tls"))] +const MQTT_SERVER_PORT: u16 = 1883; pub async fn send_message( stack: NetworkStack, @@ -53,15 +83,18 @@ pub async fn send_message( mut messages: impl Iterator, mut rng: impl CryptoRng + RngCore, ) -> core::result::Result<(), SendError> { + let begin = Instant::now(); let dns_resp = stack .dns_query(MQTT_SERVER_HOSTNAME, embassy_net::dns::DnsQueryType::A) .await .map_err(SendError::Dns)?; + let after_dns = Instant::now(); const TCP_BUFLEN: usize = 2048 << 2; let mut rx_buffer = [0; TCP_BUFLEN]; let mut tx_buffer = [0; TCP_BUFLEN]; let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); socket.set_timeout(Some(Duration::from_secs(30))); + socket.set_keep_alive(Some(Duration::from_secs(5))); let socket_addr = dns_resp .into_iter() .map(|addr| (addr, MQTT_SERVER_PORT)) @@ -72,7 +105,8 @@ pub async fn send_message( // establish TCP connection socket.connect(socket_addr).await?; - trace!("connected"); + let after_tcp = Instant::now(); + info!("connected to {socket_addr:?}"); // seed mqtt rng from rng let mut mqtt_config = ClientConfig::<5, _>::new( @@ -82,16 +116,18 @@ pub async fn send_message( if let (Some(user), Some(pass)) = (option_env!("MQTT_USER"), option_env!("MQTT_PASSWORD")) { mqtt_config.add_username(user); - mqtt_config.add_username(pass); + mqtt_config.add_password(pass); + info!("{user}:{pass}"); } // TLS layer - const TLS_BUF_LEN: usize = 4096; + const TLS_BUF_LEN: usize = 1 << 12; let mut tls_read_record_buffer = [0; TLS_BUF_LEN]; let mut tls_write_record_buffer = [0; TLS_BUF_LEN]; + + #[cfg(feature = "tls")] let mut tls = { let config = TlsConfig::new(); - let mut tls = TlsConnection::new( socket, &mut tls_read_record_buffer, @@ -100,19 +136,28 @@ pub async fn send_message( tls.open::<_, NoVerify>(TlsContext::::new(&config, &mut rng)) .await?; + tls.flush().await?; tls }; - debug!("tls handshake succeeded"); + #[cfg(feature = "tls")] + let mut mqtt_backend = tls; + #[cfg(not(feature = "tls"))] + let mut mqtt_backend = socket; + let after_tls = Instant::now(); + #[cfg(feature = "tls")] + info!( + "tls handshake succeeded: +{}ms", + (after_tls - after_tcp).as_millis() + ); - const BUF_LEN: usize = 1024; + mqtt_config.max_packet_size = 100; + const BUF_LEN: usize = 80; let mut recv_buffer = [0; BUF_LEN]; let mut buffer = [0; BUF_LEN]; - mqtt_config.add_client_id("esp32c3"); - mqtt_config.max_packet_size = (BUF_LEN - 128) as _; // MQTT Layer let mut mqtt_client = MqttClient::new( - tls, + mqtt_backend, &mut buffer, BUF_LEN, &mut recv_buffer, @@ -120,6 +165,7 @@ pub async fn send_message( mqtt_config, ); mqtt_client.connect_to_broker().await?; + let after_mqtt = Instant::now(); info!("connected to broker"); for (topic, message) in messages { mqtt_client @@ -131,33 +177,39 @@ pub async fn send_message( ) .await?; } + let after_mqtt_pub = Instant::now(); + info!( + "{:?} {:?} {:?} {:?} {:?}", + after_dns - begin, + after_tcp - begin, + after_tls - begin, + after_mqtt - begin, + after_mqtt_pub - begin + ); Ok(()) } - with_timeout(Duration::from_secs(60), inner(stack, messages, rng)).await? + with_timeout(Duration::from_secs(120), inner(stack, messages, rng)).await? } #[embassy_executor::task] pub async fn publish_data(stack: NetworkStack, rng_seed: [u8; 32], interval: Duration) { let mut rng = StdRng::from_seed(rng_seed); + HAS_IP_ADDRESS.get().await; loop { - Timer::after(interval / 10).await; - if stack.is_config_up() { - break; + { + let mut data = DATA.lock().await; + let res = send_message( + stack, + data.iter() + .map(|(topic, message)| (topic.as_ref(), message.as_bytes())), + &mut rng, + ) + .await; + if let Err(e) = res { + error!("Oh no: {e:?}"); + }; + data.clear(); } - } - loop { - let mut data = DATA.lock().await; - let res = send_message( - stack, - data.iter() - .map(|(topic, message)| (topic.as_ref(), message.as_bytes())), - &mut rng, - ) - .await; - if let Err(e) = res { - error!("Oh no: {e:?}"); - }; - data.clear(); Timer::after(interval).await; } }