esp32c3/src/mqtt.rs

232 lines
7.0 KiB
Rust

use core::fmt::Debug;
use embassy_net::tcp::TcpSocket;
use embassy_net::{dns::Error as DnsError, tcp::ConnectError};
use embassy_time::{with_timeout, Duration, Instant, TimeoutError, Timer};
use embedded_tls::{
Aes128GcmSha256, NoVerify, TlsConfig, TlsConnection, TlsContext, TlsError, UnsecureProvider,
TLS_RECORD_OVERHEAD,
};
use esp_backtrace as _;
use log::{debug, error, info};
use rand::rngs::StdRng;
use rand::{CryptoRng, RngCore, SeedableRng};
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use rust_mqtt::utils::rng_generator::CountingRng;
use crate::{NetworkStack, WifiHandle, DATA};
#[derive(Debug)]
pub enum SendError {
Dns(DnsError),
NXDomain(&'static str),
Tls(TlsError),
Connect(ConnectError),
MqttConnect(ReasonCode),
MqttPublish(ReasonCode),
Timeout(TimeoutError),
}
#[derive(Debug)]
pub struct ErrorLocation<E: Debug> {
pub error: E,
pub location: Option<(&'static str, u32)>,
}
impl<E: Debug> ErrorLocation<E> {
pub fn from_result<T>(
result: Result<T, E>,
location: (&'static str, u32),
) -> Result<T, ErrorLocation<E>> {
result.map_err(|error| Self {
error,
location: Some(location),
})
}
}
macro_rules! with_line {
($result:expr) => {
ErrorLocation::from_result($result, (file!(), line!()))
};
}
macro_rules! from_impl {
($err:ty, $var:ident) => {
impl From<$err> for SendError {
fn from(value: $err) -> Self {
Self::$var(value)
}
}
};
}
from_impl!(DnsError, Dns);
from_impl!(TlsError, Tls);
from_impl!(ConnectError, Connect);
from_impl!(ReasonCode, MqttConnect);
from_impl!(TimeoutError, Timeout);
const MQTT_SERVER_HOSTNAME: &str = "mqtt.shimun.net";
#[cfg(feature = "tls")]
const MQTT_SERVER_PORT: u16 = 8883;
#[cfg(not(feature = "tls"))]
const MQTT_SERVER_PORT: u16 = 1883;
pub async fn send_message(
stack: NetworkStack,
messages: impl Iterator<Item = (&str, &[u8])>,
rng: impl CryptoRng + RngCore,
) -> core::result::Result<(), SendError> {
async fn inner(
stack: NetworkStack,
messages: impl Iterator<Item = (&str, &[u8])>,
mut rng: impl CryptoRng + RngCore,
) -> core::result::Result<(), SendError> {
let begin = Instant::now();
let dns_resp = stack
.dns_query(MQTT_SERVER_HOSTNAME, embassy_net::dns::DnsQueryType::A)
.await?;
let after_dns = Instant::now();
const TCP_BUFLEN: usize = 2048 << 2;
let mut rx_buffer = [0; TCP_BUFLEN];
let mut tx_buffer = [0; TCP_BUFLEN];
let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
socket.set_timeout(Some(Duration::from_secs(30)));
socket.set_keep_alive(Some(Duration::from_secs(5)));
let socket_addr = dns_resp
.into_iter()
.map(|addr| (addr, MQTT_SERVER_PORT))
.next()
.ok_or(SendError::NXDomain(MQTT_SERVER_HOSTNAME))?;
debug!("got address {socket_addr:?}");
// establish TCP connection
socket.connect(socket_addr).await?;
let after_tcp = Instant::now();
debug!("connected to {socket_addr:?}");
// seed mqtt rng from rng
let mut mqtt_config = ClientConfig::<5, _>::new(
rust_mqtt::client::client_config::MqttVersion::MQTTv5,
CountingRng(rng.next_u64()),
);
if let (Some(user), Some(pass)) = (option_env!("MQTT_USER"), option_env!("MQTT_PASSWORD")) {
mqtt_config.add_username(user);
mqtt_config.add_password(pass);
debug!("{user}:{pass}");
}
// TLS layer
const TLS_BUF_LEN: usize = (1 << 11) + TLS_RECORD_OVERHEAD;
let mut tls_read_record_buffer = [0; TLS_BUF_LEN];
let mut tls_write_record_buffer = [0; TLS_BUF_LEN];
#[cfg(feature = "tls")]
let tls = {
let mut config = TlsConfig::new();
#[cfg(feature = "tls-sni")]
config
.with_max_fragment_length(embedded_tls::MaxFragmentLength::Bits11)
.with_server_name(MQTT_SERVER_HOSTNAME);
let mut tls = TlsConnection::new(
socket,
&mut tls_read_record_buffer,
&mut tls_write_record_buffer,
);
tls.open(TlsContext::<UnsecureProvider<Aes128GcmSha256, _>>::new(
&config,
UnsecureProvider::new(&mut rng),
))
.await?;
tls.flush().await?;
tls
};
#[cfg(feature = "tls")]
let mqtt_backend = tls;
#[cfg(not(feature = "tls"))]
let mut mqtt_backend = socket;
let after_tls = Instant::now();
#[cfg(feature = "tls")]
debug!(
"tls handshake succeeded: +{}ms",
(after_tls - after_tcp).as_millis()
);
mqtt_config.max_packet_size = 100;
const BUF_LEN: usize = 80;
let mut recv_buffer = [0; BUF_LEN];
let mut buffer = [0; BUF_LEN];
// MQTT Layer
let mut mqtt_client = MqttClient::new(
mqtt_backend,
&mut buffer,
BUF_LEN,
&mut recv_buffer,
BUF_LEN,
mqtt_config,
);
mqtt_client.connect_to_broker().await?;
let after_mqtt = Instant::now();
debug!("connected to broker");
for (topic, message) in messages {
mqtt_client
.send_message(
topic,
message,
rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1,
true,
)
.await
.map_err(SendError::MqttPublish)?;
}
let after_mqtt_pub = Instant::now();
info!(
"{:?} {:?} {:?} {:?} {:?}",
after_dns - begin,
after_tcp - begin,
after_tls - begin,
after_mqtt - begin,
after_mqtt_pub - begin
);
Ok(())
}
with_timeout(Duration::from_secs(120), inner(stack, messages, rng)).await?
}
#[embassy_executor::task]
pub async fn publish_data(
stack: NetworkStack,
rng_seed: [u8; 32],
interval: Duration,
wifi: WifiHandle<'static>,
) {
let mut rng = StdRng::from_seed(rng_seed);
loop {
{
let wifi_guard = wifi.configured().await;
Timer::after(Duration::from_millis(10)).await;
let mut data = DATA.lock().await;
let res = send_message(
stack,
data.iter()
.map(|(topic, message)| (topic.as_ref(), message.as_bytes())),
&mut rng,
)
.await;
if let Err(e) = res {
error!("Oh no: {e:?}");
};
drop(wifi_guard);
data.clear();
}
Timer::after(interval).await;
}
}