plain mqtt works

This commit is contained in:
shimun 2024-05-10 10:52:22 +02:00
parent 14d8f7f683
commit 8dfbf88db8
Signed by: shimun
GPG Key ID: E0420647856EA39E
4 changed files with 122 additions and 40 deletions

27
Cargo.lock generated
View File

@ -185,6 +185,12 @@ dependencies = [
"inout",
]
[[package]]
name = "const-decoder"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5241cd7938b1b415942e943ea96f615953d500b50347b505b0b507080bad5a6f"
[[package]]
name = "const-oid"
version = "0.9.6"
@ -376,7 +382,7 @@ dependencies = [
"atomic-pool",
"document-features",
"embassy-net-driver",
"embassy-sync",
"embassy-sync 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"embassy-time",
"embedded-io-async",
"embedded-nal-async",
@ -407,6 +413,18 @@ dependencies = [
"heapless 0.8.0",
]
[[package]]
name = "embassy-sync"
version = "0.5.0"
source = "git+https://github.com/embassy-rs/embassy.git?rev=4b4777e#4b4777e6bb813dbde3f8ec05ff81cadbcb41bee0"
dependencies = [
"cfg-if",
"critical-section",
"embedded-io-async",
"futures-util",
"heapless 0.8.0",
]
[[package]]
name = "embassy-time"
version = "0.3.0"
@ -632,7 +650,7 @@ dependencies = [
"critical-section",
"document-features",
"embassy-futures",
"embassy-sync",
"embassy-sync 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"embassy-time-driver",
"embedded-can",
"embedded-dma",
@ -726,7 +744,7 @@ dependencies = [
"critical-section",
"embassy-futures",
"embassy-net-driver",
"embassy-sync",
"embassy-sync 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"embedded-io",
"embedded-io-async",
"enumset",
@ -780,9 +798,10 @@ dependencies = [
name = "esp32c3"
version = "0.1.0"
dependencies = [
"const-decoder",
"embassy-executor",
"embassy-net",
"embassy-sync",
"embassy-sync 0.5.0 (git+https://github.com/embassy-rs/embassy.git?rev=4b4777e)",
"embassy-time",
"embedded-io-async",
"embedded-tls",

View File

@ -7,9 +7,10 @@ authors = [ "Marvin Drescher <m@sparv.in>" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
const-decoder = "0.3.0"
embassy-executor = { version = "0.5.0", features = ["nightly", "integrated-timers", "arch-riscv32", "executor-thread"] }
embassy-net = { version = "0.4.0", features = ["dhcpv4", "dhcpv4-hostname", "dns", "medium-ip", "proto-ipv4", "proto-ipv6", "tcp", "udp"] }
embassy-sync = "0.5.0"
embassy-sync = { git = "https://github.com/embassy-rs/embassy.git", rev = "4b4777e" }
embassy-time = { version = "0.3.0" }
embedded-io-async = "0.6.1"
embedded-tls = { version = "0.17.0", default-features = false, features = ["embedded-io-adapters"] }
@ -24,7 +25,7 @@ log = "0.4.21"
mqttrust = "0.6.0"
rand = { version = "0.8.5", default-features = false, features = ["std_rng"] }
rand_core = "0.6.4"
rust-mqtt = { version = "0.3.0", default-features = false }
rust-mqtt = { version = "0.3.0", default-features = false, features = ["tls"] }
smart-leds = "0.4.0"
static_cell = { version = "2.0.0", features = ["nightly"] }
@ -33,6 +34,9 @@ opt-level = 3
[profile.dev.package.embedded-tls]
opt-level = 3
[features]
tls = []
[profile.release]
opt-level = "s"
lto = true

View File

@ -19,6 +19,7 @@ use embassy_executor::Executor;
use embassy_net::{Config, DhcpConfig, Stack, StackResources};
use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
use embassy_sync::mutex::Mutex;
use embassy_sync::once_lock::OnceLock;
use embassy_time::{Duration, Timer};
use esp_alloc::EspHeap;
use esp_backtrace as _;
@ -154,9 +155,12 @@ async fn moisture(
supply.set_low();
// Vout = Dout * Vmax / Dmax
let milli_volt = adc_voltage(pin_value, Attenuation::Attenuation11dB) as u32;
let moisture = ((milli_volt.checked_sub(submerged_in_water).unwrap_or(0))
<< 8 / (dry - submerged_in_water))
>> 8;
let moisture = {
let delta = milli_volt.checked_sub(submerged_in_water).unwrap_or(0);
let delta_dry = dry - submerged_in_water;
let dryness = (delta << 12 / delta_dry) * 100 >> 12;
dryness
};
info!("moisture: {moisture}%, v_s: {milli_volt}");
{
DATA.lock()
@ -189,7 +193,7 @@ async fn battery_monitor(
.await
.insert(Cow::from("vbat"), v_bat.to_string());
}
Timer::after(Duration::from_secs(15)).await;
Timer::after(Duration::from_secs(1)).await;
}
}
static EXECUTOR: StaticCell<Executor> = StaticCell::new();
@ -258,7 +262,7 @@ fn main() -> ! {
let rmt_buffer = smartLedBuffer!(1);
let mut led = SmartLedsAdapter::new(rmt.channel0, io.pins.gpio7, rmt_buffer, &clocks);
led.set_color(RGB8::new(0, 255, 255));
led.set_color(RGB8::new(0, 0, 0));
let led: Box<dyn MySmartLed> = Box::new(led);
@ -328,6 +332,8 @@ async fn wifi_connection(
}
}
pub static HAS_IP_ADDRESS: OnceLock<()> = OnceLock::new();
#[embassy_executor::task]
async fn ip_task(stack: NetworkStack) {
loop {
@ -340,6 +346,7 @@ async fn ip_task(stack: NetworkStack) {
loop {
if let Some(config) = stack.config_v4() {
info!("Got IP: {}", config.address);
HAS_IP_ADDRESS.get_or_init(|| ());
break;
}
Timer::after(Duration::from_millis(500)).await;

View File

@ -1,6 +1,8 @@
use core::fmt::Debug;
use embassy_net::tcp::TcpSocket;
use embassy_net::{dns::Error as DnsError, tcp::ConnectError};
use embassy_time::{with_timeout, Duration, TimeoutError, Timer};
use embassy_time::{with_timeout, Duration, Instant, TimeoutError, Timer};
use embedded_tls::{Aes128GcmSha256, NoVerify, TlsConfig, TlsConnection, TlsContext, TlsError};
use esp_backtrace as _;
use esp_println::println;
@ -10,9 +12,10 @@ use rand::{CryptoRng, RngCore, SeedableRng};
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use rust_mqtt::tests;
use rust_mqtt::utils::rng_generator::CountingRng;
use crate::{NetworkStack, DATA};
use crate::{NetworkStack, DATA, HAS_IP_ADDRESS};
#[derive(Debug)]
pub enum SendError {
@ -24,6 +27,30 @@ pub enum SendError {
Timeout(TimeoutError),
}
#[derive(Debug)]
pub struct ErrorLocation<E: Debug> {
pub error: E,
pub location: Option<(&'static str, u32)>,
}
impl<E: Debug> ErrorLocation<E> {
pub fn from_result<T>(
result: Result<T, E>,
location: (&'static str, u32),
) -> Result<T, ErrorLocation<E>> {
result.map_err(|error| Self {
error,
location: Some(location),
})
}
}
macro_rules! with_line {
($result:expr) => {
ErrorLocation::from_result($result, (file!(), line!()))
};
}
macro_rules! from_impl {
($err:ty, $var:ident) => {
impl From<$err> for SendError {
@ -41,7 +68,10 @@ from_impl!(ReasonCode, MqttReason);
from_impl!(TimeoutError, Timeout);
const MQTT_SERVER_HOSTNAME: &str = "mqtt.shimun.net";
#[cfg(feature = "tls")]
const MQTT_SERVER_PORT: u16 = 8883;
#[cfg(not(feature = "tls"))]
const MQTT_SERVER_PORT: u16 = 1883;
pub async fn send_message(
stack: NetworkStack,
@ -53,15 +83,18 @@ pub async fn send_message(
mut messages: impl Iterator<Item = (&str, &[u8])>,
mut rng: impl CryptoRng + RngCore,
) -> core::result::Result<(), SendError> {
let begin = Instant::now();
let dns_resp = stack
.dns_query(MQTT_SERVER_HOSTNAME, embassy_net::dns::DnsQueryType::A)
.await
.map_err(SendError::Dns)?;
let after_dns = Instant::now();
const TCP_BUFLEN: usize = 2048 << 2;
let mut rx_buffer = [0; TCP_BUFLEN];
let mut tx_buffer = [0; TCP_BUFLEN];
let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
socket.set_timeout(Some(Duration::from_secs(30)));
socket.set_keep_alive(Some(Duration::from_secs(5)));
let socket_addr = dns_resp
.into_iter()
.map(|addr| (addr, MQTT_SERVER_PORT))
@ -72,7 +105,8 @@ pub async fn send_message(
// establish TCP connection
socket.connect(socket_addr).await?;
trace!("connected");
let after_tcp = Instant::now();
info!("connected to {socket_addr:?}");
// seed mqtt rng from rng
let mut mqtt_config = ClientConfig::<5, _>::new(
@ -82,16 +116,18 @@ pub async fn send_message(
if let (Some(user), Some(pass)) = (option_env!("MQTT_USER"), option_env!("MQTT_PASSWORD")) {
mqtt_config.add_username(user);
mqtt_config.add_username(pass);
mqtt_config.add_password(pass);
info!("{user}:{pass}");
}
// TLS layer
const TLS_BUF_LEN: usize = 4096;
const TLS_BUF_LEN: usize = 1 << 12;
let mut tls_read_record_buffer = [0; TLS_BUF_LEN];
let mut tls_write_record_buffer = [0; TLS_BUF_LEN];
#[cfg(feature = "tls")]
let mut tls = {
let config = TlsConfig::new();
let mut tls = TlsConnection::new(
socket,
&mut tls_read_record_buffer,
@ -100,19 +136,28 @@ pub async fn send_message(
tls.open::<_, NoVerify>(TlsContext::<Aes128GcmSha256, _>::new(&config, &mut rng))
.await?;
tls.flush().await?;
tls
};
debug!("tls handshake succeeded");
#[cfg(feature = "tls")]
let mut mqtt_backend = tls;
#[cfg(not(feature = "tls"))]
let mut mqtt_backend = socket;
let after_tls = Instant::now();
#[cfg(feature = "tls")]
info!(
"tls handshake succeeded: +{}ms",
(after_tls - after_tcp).as_millis()
);
const BUF_LEN: usize = 1024;
mqtt_config.max_packet_size = 100;
const BUF_LEN: usize = 80;
let mut recv_buffer = [0; BUF_LEN];
let mut buffer = [0; BUF_LEN];
mqtt_config.add_client_id("esp32c3");
mqtt_config.max_packet_size = (BUF_LEN - 128) as _;
// MQTT Layer
let mut mqtt_client = MqttClient::new(
tls,
mqtt_backend,
&mut buffer,
BUF_LEN,
&mut recv_buffer,
@ -120,6 +165,7 @@ pub async fn send_message(
mqtt_config,
);
mqtt_client.connect_to_broker().await?;
let after_mqtt = Instant::now();
info!("connected to broker");
for (topic, message) in messages {
mqtt_client
@ -131,33 +177,39 @@ pub async fn send_message(
)
.await?;
}
let after_mqtt_pub = Instant::now();
info!(
"{:?} {:?} {:?} {:?} {:?}",
after_dns - begin,
after_tcp - begin,
after_tls - begin,
after_mqtt - begin,
after_mqtt_pub - begin
);
Ok(())
}
with_timeout(Duration::from_secs(60), inner(stack, messages, rng)).await?
with_timeout(Duration::from_secs(120), inner(stack, messages, rng)).await?
}
#[embassy_executor::task]
pub async fn publish_data(stack: NetworkStack, rng_seed: [u8; 32], interval: Duration) {
let mut rng = StdRng::from_seed(rng_seed);
HAS_IP_ADDRESS.get().await;
loop {
Timer::after(interval / 10).await;
if stack.is_config_up() {
break;
{
let mut data = DATA.lock().await;
let res = send_message(
stack,
data.iter()
.map(|(topic, message)| (topic.as_ref(), message.as_bytes())),
&mut rng,
)
.await;
if let Err(e) = res {
error!("Oh no: {e:?}");
};
data.clear();
}
}
loop {
let mut data = DATA.lock().await;
let res = send_message(
stack,
data.iter()
.map(|(topic, message)| (topic.as_ref(), message.as_bytes())),
&mut rng,
)
.await;
if let Err(e) = res {
error!("Oh no: {e:?}");
};
data.clear();
Timer::after(interval).await;
}
}