esp32c3/src/mqtt.rs
2024-05-05 22:13:18 +02:00

141 lines
4.0 KiB
Rust

use embassy_net::tcp::TcpSocket;
use embassy_net::{dns::Error as DnsError, tcp::ConnectError};
use embassy_time::{Duration, Timer};
use embedded_tls::{Aes128GcmSha256, NoVerify, TlsConfig, TlsConnection, TlsContext, TlsError};
use esp_println::println;
use rand::rngs::StdRng;
use rand::{CryptoRng, RngCore, SeedableRng};
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::ClientConfig;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use rust_mqtt::utils::rng_generator::CountingRng;
use crate::{NetworkStack, DATA};
/// Errors that can occur while publishing messages to the MQTT broker.
#[derive(Debug)]
pub enum SendError {
/// The DNS query for the broker hostname failed.
Dns(DnsError),
/// The DNS query returned no address; carries the hostname that was queried.
NXDomain(&'static str),
/// The TLS layer reported an error (e.g. while opening the session).
Tls(TlsError),
/// The TCP connection to the broker could not be established.
Connect(ConnectError),
/// The MQTT client returned a failure reason code.
MqttReason(ReasonCode),
}
macro_rules! from_impl {
($err:ty, $var:ident) => {
impl From<$err> for SendError {
fn from(value: $err) -> Self {
Self::$var(value)
}
}
};
}
from_impl!(DnsError, Dns);
from_impl!(TlsError, Tls);
from_impl!(ConnectError, Connect);
from_impl!(ReasonCode, MqttReason);
/// Hostname of the MQTT broker; used for the DNS lookup and as the TLS server name.
const MQTT_SERVER_HOSTNAME: &str = "mqtt.shimun.net";
/// TCP port of the MQTT broker (8883 is the conventional MQTT-over-TLS port).
const MQTT_SERVER_PORT: u16 = 8883;
/// Publishes every `(topic, payload)` pair from `messages` to the MQTT broker.
///
/// The function performs the whole connection sequence on each call:
///
/// 1. resolves [`MQTT_SERVER_HOSTNAME`] with a DNS `A` query,
/// 2. opens a TCP socket (10 s timeout) to the first returned address on
///    [`MQTT_SERVER_PORT`],
/// 3. opens a TLS session on top of it (`NoVerify` is used, so the server
///    certificate is not validated),
/// 4. connects an MQTT v5 client and publishes each message with QoS 1 and the
///    retain flag set.
///
/// Credentials are taken from the compile-time environment variables
/// `MQTT_USER` and `MQTT_PASSWORD`; they are only applied when both are set.
///
/// # Parameters
///
/// * `stack` - network stack used for DNS and the TCP socket.
/// * `messages` - iterator of `(topic, payload)` pairs to publish, in order.
/// * `rng` - random source; seeds the MQTT client's RNG and drives the TLS
///   handshake.
///
/// # Errors
///
/// * [`SendError::Dns`] - the DNS query failed.
/// * [`SendError::NXDomain`] - the DNS response contained no address.
/// * [`SendError::Connect`] - the TCP connection failed.
/// * [`SendError::Tls`] - the TLS session could not be opened.
/// * [`SendError::MqttReason`] - connecting to the broker or publishing a
///   message failed. Publishing stops at the first failing message.
pub async fn send_message(
    stack: NetworkStack,
    messages: impl Iterator<Item = (&str, &[u8])>,
    mut rng: impl CryptoRng + RngCore,
) -> core::result::Result<(), SendError> {
    let dns_resp = stack
        .dns_query(MQTT_SERVER_HOSTNAME, embassy_net::dns::DnsQueryType::A)
        .await
        .map_err(SendError::Dns)?;

    let mut rx_buffer = [0; 4096];
    let mut tx_buffer = [0; 4096];
    let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
    socket.set_timeout(Some(Duration::from_secs(10)));

    // Only the first resolved address is tried.
    let socket_addr = dns_resp
        .into_iter()
        .map(|addr| (addr, MQTT_SERVER_PORT))
        .next()
        .ok_or(SendError::NXDomain(MQTT_SERVER_HOSTNAME))?;

    // establish TCP connection
    socket.connect(socket_addr).await?;

    // seed mqtt rng from rng
    let mut mqtt_config = ClientConfig::<5, _>::new(
        rust_mqtt::client::client_config::MqttVersion::MQTTv5,
        CountingRng(rng.next_u64()),
    );
    if let (Some(user), Some(pass)) = (option_env!("MQTT_USER"), option_env!("MQTT_PASSWORD")) {
        mqtt_config.add_username(user);
        // The password must go through `add_password`; calling `add_username`
        // a second time would overwrite the username and send no password.
        mqtt_config.add_password(pass);
    }

    // TLS layer
    const TLS_BUF_LEN: usize = 1 << 12;
    let mut tls_read_record_buffer = [0; TLS_BUF_LEN];
    let mut tls_write_record_buffer = [0; TLS_BUF_LEN];
    let mut tls = {
        let config = TlsConfig::new()
            .with_server_name(MQTT_SERVER_HOSTNAME)
            .with_max_fragment_length(embedded_tls::MaxFragmentLength::Bits12);
        let mut tls = TlsConnection::new(
            socket,
            &mut tls_read_record_buffer,
            &mut tls_write_record_buffer,
        );
        tls.open::<_, NoVerify>(TlsContext::<Aes128GcmSha256, _>::new(&config, &mut rng))
            .await?;
        tls
    };

    const BUF_LEN: usize = 1 << 13;
    let mut recv_buffer = [0; BUF_LEN];
    let mut buffer = [0; BUF_LEN];

    // MQTT Layer
    let mut mqtt_client = MqttClient::new(
        tls,
        &mut buffer,
        BUF_LEN,
        &mut recv_buffer,
        BUF_LEN,
        mqtt_config,
    );
    mqtt_client.connect_to_broker().await?;

    for (topic, message) in messages {
        mqtt_client
            .send_message(
                topic,
                message,
                rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1,
                // retain
                true,
            )
            .await?;
    }
    Ok(())
}
/// Background task that periodically publishes the contents of `DATA`.
///
/// Each cycle waits for `interval`, locks `DATA`, hands every
/// `(topic, message)` entry to [`send_message`], logs a failure if one
/// occurs, and then clears `DATA`. The lock is held for the whole send, and
/// the entries are cleared whether or not the send succeeded.
///
/// `rng_seed` seeds the `StdRng` that is reused across all cycles.
#[embassy_executor::task]
pub async fn publish_data(stack: NetworkStack, rng_seed: [u8; 32], interval: Duration) {
    let mut rng = StdRng::from_seed(rng_seed);

    loop {
        Timer::after(interval).await;

        let mut pending = DATA.lock().await;
        let outcome = send_message(
            stack,
            pending
                .iter()
                .map(|(topic, message)| (topic.as_ref(), message.as_bytes())),
            &mut rng,
        )
        .await;

        match outcome {
            Ok(()) => {}
            Err(e) => {
                println!("Oh no: {e:?}");
            }
        }

        // Entries are dropped even after a failed send.
        pending.clear();
    }
}