From a03e8903490479152da03411f68a852ae0b94668 Mon Sep 17 00:00:00 2001 From: shimun Date: Sun, 5 May 2024 22:13:18 +0200 Subject: [PATCH] mqtt --- Cargo.lock | 28 +++++++++++ Cargo.toml | 2 + flake.nix | 2 +- src/main.rs | 36 ++++++++++---- src/mqtt.rs | 135 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 189 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 250edad..1e10fe9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -793,6 +793,8 @@ dependencies = [ "esp-wifi", "heapless 0.8.0", "mqttrust", + "rand", + "rand_core", "rust-mqtt", "smart-leds", "static_cell", @@ -1328,6 +1330,12 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "primeorder" version = "0.13.6" @@ -1394,6 +1402,26 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd7a31eed1591dcbc95d92ad7161908e72f4677f8fabf2a32ca49b4237cbf211" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + [[package]] name = "rand_core" version = "0.6.4" diff --git a/Cargo.toml b/Cargo.toml index d596f8a..5d370ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,8 @@ esp-println = { version = "0.9.1", features = ["esp32c3", "uart"] } esp-wifi = { version = "0.4.0", features = ["embassy-net", "esp32c3", "wifi"] } heapless = { version = "0.8.0", features = ["portable-atomic", "portable-atomic-unsafe-assume-single-core"] } mqttrust = "0.6.0" +rand = { version = "0.8.5", default-features = false, features = ["std_rng"] } +rand_core = "0.6.4" rust-mqtt = { version = "0.3.0", default-features = false } smart-leds = "0.4.0" static_cell = { version = "2.0.0", features = ["nightly"] } diff --git a/flake.nix b/flake.nix index c2b5417..4e74338 100644 --- a/flake.nix +++ b/flake.nix @@ -52,7 +52,7 @@ devShell = with pkgs; mkShell ({ RUST_SRC_PATH = "${fenix'.complete.rust-src}/lib/rustlib/src/rust/library"; - nativeBuildInputs = [ toolchain cargo-espmonitor cargo-espflash ] ++ defaultPackage.depsBuildBuild; + nativeBuildInputs = [ toolchain cargo-espmonitor cargo-espflash mosquitto ] ++ defaultPackage.depsBuildBuild; } // (lib.filterAttrs (name: _: lib.hasPrefix "CARGO_" name || lib.hasPrefix "RUST" name) defaultPackage)); } ); diff --git a/src/main.rs b/src/main.rs index 4cdbf49..75b102a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,11 +11,13 @@ extern crate alloc; use core::mem::{self, MaybeUninit}; +use alloc::borrow::Cow; use alloc::boxed::Box; +use alloc::collections::BTreeMap; +use alloc::string::{String, ToString}; use embassy_executor::Executor; -use embassy_net::{Config, DhcpConfig, Ipv4Address, Stack, StackResources}; -use embassy_sync::blocking_mutex::raw::NoopRawMutex; -use embassy_sync::blocking_mutex::NoopMutex; +use embassy_net::{Config, DhcpConfig, Stack, StackResources}; +use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; use embassy_sync::mutex::Mutex; use embassy_time::{Duration, Timer}; use esp_alloc::EspHeap; @@ -38,6 +40,8 @@ use esp_wifi::{initialize as initialize_wifi, EspWifiInitFor}; use smart_leds::{SmartLedsWrite, RGB8}; use static_cell::{make_static, StaticCell}; +use crate::mqtt::publish_data; + mod mqtt; #[global_allocator] @@ -70,6 +74,9 @@ impl MySmartLed } } +pub static DATA: Mutex, String>> = + Mutex::new(BTreeMap::new()); + #[embassy_executor::task] async fn blink(mut led: Box) { loop { @@ -98,7 +105,7 @@ async fn async_read( interval: Duration, ) -> Result { loop { - match (fun()) { + match fun() { Err(nb::Error::WouldBlock) => Timer::after(interval).await, Err(nb::Error::Other(e)) => return Err(e), Ok(val) => return Ok(val), @@ -142,6 +149,11 @@ async fn moisture( << 8 / (dry - submerged_in_water)) >> 8; esp_println::println!("moisture: {moisture}%, v_s: {milli_volt}"); + { + DATA.lock() + .await + .insert(Cow::from("moisture"), moisture.to_string()); + } Timer::after(Duration::from_secs(10) - warmup).await; } } @@ -163,7 +175,11 @@ async fn battery_monitor( // account for 50:50 voltage divider let v_bat = v_out * 2; println!("V_bat: {}", v_bat); - + { + DATA.lock() + .await + .insert(Cow::from("vbat"), v_bat.to_string()); + } Timer::after(Duration::from_secs(15)).await; } } @@ -175,7 +191,7 @@ fn main() -> ! { init_heap(); let peripherals = Peripherals::take(); - let mut system = peripherals.SYSTEM.split(); + let system = peripherals.SYSTEM.split(); let clocks = ClockControl::configure(system.clock_control, CpuClock::Clock160MHz).freeze(); let mut rtc = Rtc::new(peripherals.LPWR); @@ -195,7 +211,7 @@ fn main() -> ! { let mut rng = Rng::new(peripherals.RNG); - let mut seed = [0u8; 8]; // very random, very secure seed + let mut seed = [0u8; 32]; // very random, very secure seed rng.read(&mut seed).expect("random seed"); let wifi_init = initialize_wifi( @@ -222,7 +238,7 @@ fn main() -> ! { wifi_interface, config, make_static!(StackResources::<3>::new()), - u64::from_le_bytes(seed) + u64::from_le_bytes(seed[..8].try_into().unwrap()) )); embassy::init(&clocks, timer_group0); @@ -239,8 +255,7 @@ fn main() -> ! { let mut adc1_config = AdcConfig::new(); let pin = adc1_config.enable_pin(io.pins.gpio4.into_analog(), Attenuation::Attenuation11dB); - let mut vbat_in = - adc1_config.enable_pin(io.pins.gpio3.into_analog(), Attenuation::Attenuation11dB); + let vbat_in = adc1_config.enable_pin(io.pins.gpio3.into_analog(), Attenuation::Attenuation11dB); let moisture_sensor_suppy_pin = io.pins.gpio5.into_push_pull_output().degrade(); let adc1: &'static Mutex> = @@ -260,6 +275,7 @@ fn main() -> ! { .spawn(moisture(pin, adc1, moisture_sensor_suppy_pin.into())) .unwrap(); spawner.spawn(battery_monitor(vbat_in, adc1)).unwrap(); + spawner.spawn(publish_data(&stack, seed, Duration::from_secs(60))); }) } diff --git a/src/mqtt.rs b/src/mqtt.rs index d6bbb0a..8b6b018 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -1,11 +1,140 @@ use embassy_net::tcp::TcpSocket; -use embassy_time::Duration; +use embassy_net::{dns::Error as DnsError, tcp::ConnectError}; +use embassy_time::{Duration, Timer}; +use embedded_tls::{Aes128GcmSha256, NoVerify, TlsConfig, TlsConnection, TlsContext, TlsError}; +use esp_println::println; +use rand::rngs::StdRng; +use rand::{CryptoRng, RngCore, SeedableRng}; +use rust_mqtt::client::client::MqttClient; +use rust_mqtt::client::client_config::ClientConfig; +use rust_mqtt::packet::v5::reason_codes::ReasonCode; +use rust_mqtt::utils::rng_generator::CountingRng; -use crate::NetworkStack; +use crate::{NetworkStack, DATA}; -async fn send_message(stack: NetworkStack, message: &[u8]) { +#[derive(Debug)] +pub enum SendError { + Dns(DnsError), + NXDomain(&'static str), + Tls(TlsError), + Connect(ConnectError), + MqttReason(ReasonCode), +} + +macro_rules! from_impl { + ($err:ty, $var:ident) => { + impl From<$err> for SendError { + fn from(value: $err) -> Self { + Self::$var(value) + } + } + }; +} + +from_impl!(DnsError, Dns); +from_impl!(TlsError, Tls); +from_impl!(ConnectError, Connect); +from_impl!(ReasonCode, MqttReason); + +const MQTT_SERVER_HOSTNAME: &str = "mqtt.shimun.net"; +const MQTT_SERVER_PORT: u16 = 8883; + +pub async fn send_message( + stack: NetworkStack, + mut messages: impl Iterator, + mut rng: impl CryptoRng + RngCore, +) -> core::result::Result<(), SendError> { + let dns_resp = stack + .dns_query(MQTT_SERVER_HOSTNAME, embassy_net::dns::DnsQueryType::A) + .await + .map_err(SendError::Dns)?; let mut rx_buffer = [0; 4096]; let mut tx_buffer = [0; 4096]; let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); socket.set_timeout(Some(Duration::from_secs(10))); + let socket_addr = dns_resp + .into_iter() + .map(|addr| (addr, MQTT_SERVER_PORT)) + .next() + .ok_or(SendError::NXDomain(MQTT_SERVER_HOSTNAME))?; + + // establish TCP connection + socket.connect(socket_addr).await?; + + // seed mqtt rng from rng + let mut mqtt_config = ClientConfig::<5, _>::new( + rust_mqtt::client::client_config::MqttVersion::MQTTv5, + CountingRng(rng.next_u64()), + ); + + if let (Some(user), Some(pass)) = (option_env!("MQTT_USER"), option_env!("MQTT_PASSWORD")) { + mqtt_config.add_username(user); + mqtt_config.add_username(pass); + } + + // TLS layer + const TLS_BUF_LEN: usize = 1 << 12; + let mut tls_read_record_buffer = [0; TLS_BUF_LEN]; + let mut tls_write_record_buffer = [0; TLS_BUF_LEN]; + let mut tls = { + let config = TlsConfig::new() + .with_server_name(MQTT_SERVER_HOSTNAME) + .with_max_fragment_length(embedded_tls::MaxFragmentLength::Bits12); + + let mut tls = TlsConnection::new( + socket, + &mut tls_read_record_buffer, + &mut tls_write_record_buffer, + ); + + tls.open::<_, NoVerify>(TlsContext::::new(&config, &mut rng)) + .await?; + tls + }; + + const BUF_LEN: usize = 1 << 13; + let mut recv_buffer = [0; BUF_LEN]; + let mut buffer = [0; BUF_LEN]; + + // MQTT Layer + let mut mqtt_client = MqttClient::new( + tls, + &mut buffer, + BUF_LEN, + &mut recv_buffer, + BUF_LEN, + mqtt_config, + ); + mqtt_client.connect_to_broker().await?; + for (topic, message) in messages { + mqtt_client + .send_message( + topic, + message, + rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1, + true, + ) + .await?; + } + Ok(()) +} + +#[embassy_executor::task] +pub async fn publish_data(stack: NetworkStack, rng_seed: [u8; 32], interval: Duration) { + let mut rng = StdRng::from_seed(rng_seed); + loop { + Timer::after(interval).await; + let mut data = DATA.lock().await; + let res = send_message( + stack, + data.iter() + .map(|(topic, message)| (topic.as_ref(), message.as_bytes())), + &mut rng, + ) + .await; + if let Err(e) = res { + println!("Oh no: {e:?}"); + }; + data.clear(); + } }