1#ifndef Espressif_MQTT_Client_h
2#define Espressif_MQTT_Client_h
7#if THINGSBOARD_USE_ESP_MQTT
13#include <mqtt_client.h>
14#include <esp_crt_bundle.h>
// Message id threshold used by the publish / subscribe / unsubscribe wrappers in this file,
// they report success only for message ids strictly greater than this value.
20constexpr int MQTT_FAILURE_MESSAGE_ID = -1;
// Log format printed by the MQTT data event handling when data_len differs from total_data_len.
// Arguments: total amount of data of the message, current receive buffer size.
21constexpr char MQTT_DATA_EXCEEDS_BUFFER[] =
"Received amount of data (%u) is bigger than current buffer size (%u), increase accordingly";
// Start of a section that is only compiled with THINGSBOARD_ENABLE_DEBUG (the matching #endif is not visible in this listing).
22#if THINGSBOARD_ENABLE_DEBUG
// Log format printed for every handled MQTT event, the argument is the name of the event.
23constexpr char RECEIVED_MQTT_EVENT[] =
"Handling received mqtt event: (%s)";
// Log format printed after the stored configuration has been applied to an already created MQTT client,
// the argument is the textual name of the esp_err_t returned by that call.
constexpr char UPDATING_CONFIGURATION[] = "Updated configuration after initial connection with response: (%s)";
// Log format printed after a custom certificate bundle has been set,
// the argument is the textual name of the resulting esp_err_t.
25constexpr char OVERRIDING_DEFAULT_CRT_BUNDLE[] =
"Overriding default CRT bundle with response: (%s)";
// Template header of the client, Logger defaults to DefaultLogger and is used through its static printfln() method.
// NOTE(review): the class declaration line that follows the template header is not present in this listing.
34template <
typename Logger = DefaultLogger>
// Default constructor, the members keep the values of their in-class initialisers.
38 Espressif_MQTT_Client() =
default;
// Destructor, hands the client handle to esp_mqtt_client_destroy() and deliberately discards the returned status.
// NOTE(review): as far as this listing shows the handle is only assigned in start_or_reconnect_mqtt_client(),
// so it can still be null here -- confirm that the destroy call tolerates a null handle.
40 ~Espressif_MQTT_Client()
override {
41 (void)esp_mqtt_client_destroy(m_mqtt_client);
// Copy construction is disabled.
47 Espressif_MQTT_Client(Espressif_MQTT_Client
const & other) =
delete;
// Copy assignment is disabled.
52 void operator=(Espressif_MQTT_Client
const & other) =
delete;
// Stores the given PEM server certificate pointer in the MQTT configuration and applies the configuration
// through update_configuration(). Only the pointer is stored by this method, the text is not copied here.
// @param server_certificate_pem Pointer to the PEM formatted server certificate
// @return Result of update_configuration()
63 bool set_server_certificate(
char const * server_certificate_pem) {
// Below ESP-IDF major version 5 the field is `cert_pem`, the second assignment targets `broker.verification.certificate`
// (presumably the #else branch, the #else / #endif lines are not visible in this listing).
66#if ESP_IDF_VERSION_MAJOR < 5
67 m_mqtt_configuration.cert_pem = server_certificate_pem;
69 m_mqtt_configuration.broker.verification.certificate = server_certificate_pem;
71 return update_configuration();
// Enables server verification through a certificate bundle.
// Stores a bundle attach function in the MQTT configuration, optionally replaces the default bundle
// with the caller supplied one and returns the result of update_configuration().
// @param x509_bundle Optional pointer to a custom bundle, when nullptr (the default) no bundle is set here
// @param bundle_size Size forwarded to esp_crt_bundle_set() on the code path that accepts a size, default 0
// @return Result of update_configuration()
84 bool set_server_crt_bundle(
uint8_t const * x509_bundle =
nullptr,
size_t const & bundle_size = 0U) {
85 esp_err_t (*crt_bundle_attach)(
void * conf) =
nullptr;
// NOTE(review): the preprocessor condition choosing between the next two assignments is not visible in this listing,
// presumably the arduino_ prefixed function is used for Arduino builds -- confirm against the full file.
87 crt_bundle_attach = arduino_esp_crt_bundle_attach;
89 crt_bundle_attach = esp_crt_bundle_attach;
// The configuration field holding the attach function differs between ESP-IDF major versions below 5 and 5 or newer.
92#if ESP_IDF_VERSION_MAJOR < 5
93 m_mqtt_configuration.crt_bundle_attach = crt_bundle_attach;
95 m_mqtt_configuration.broker.verification.crt_bundle_attach = crt_bundle_attach;
// Only override the default bundle if the caller actually passed one.
98 if (x509_bundle !=
nullptr) {
99 esp_err_t error = ESP_OK;
// The arduino_ variant is called without storing a result, so `error` keeps ESP_OK on that path.
101 arduino_esp_crt_bundle_set(x509_bundle);
// Under the condition below esp_crt_bundle_set() is called with the bundle only, otherwise the size is passed as well.
// NOTE(review): the condition requires PATCH < 2 for every 4.x minor version, so for example 4.3.2 and newer 4.3.x
// releases take the two argument call -- confirm that this matches the ESP-IDF releases that have the size parameter.
103#if (ESP_IDF_VERSION_MAJOR == 4 && ESP_IDF_VERSION_MINOR <= 4 && ESP_IDF_VERSION_PATCH < 2) || ESP_IDF_VERSION_MAJOR < 4
104 error = esp_crt_bundle_set(x509_bundle);
106 error = esp_crt_bundle_set(x509_bundle, bundle_size);
// The result of setting the bundle is only logged in debug builds, it does not influence the return value.
109#if THINGSBOARD_ENABLE_DEBUG
110 Logger::printfln(OVERRIDING_DEFAULT_CRT_BUNDLE, esp_err_to_name(error));
113 return update_configuration();
// Stores the keep alive timeout in the MQTT configuration and applies it through update_configuration().
// @param keep_alive_timeout_seconds Keep alive timeout in seconds
// @return Result of update_configuration()
121 bool set_keep_alive_timeout(uint16_t keep_alive_timeout_seconds) {
// Field is `keepalive` below ESP-IDF major version 5 and `session.keepalive` otherwise (#else / #endif not visible in this listing).
122#if ESP_IDF_VERSION_MAJOR < 5
123 m_mqtt_configuration.keepalive = keep_alive_timeout_seconds;
125 m_mqtt_configuration.session.keepalive = keep_alive_timeout_seconds;
127 return update_configuration();
// Stores the flag that disables the keep alive mechanism in the MQTT configuration and applies it through update_configuration().
// @param disable_keep_alive Value written to the disable_keepalive configuration field
// @return Result of update_configuration()
136 bool set_disable_keep_alive(
bool disable_keep_alive) {
// Field is `disable_keepalive` below ESP-IDF major version 5 and `session.disable_keepalive` otherwise (#else / #endif not visible in this listing).
137#if ESP_IDF_VERSION_MAJOR < 5
138 m_mqtt_configuration.disable_keepalive = disable_keep_alive;
140 m_mqtt_configuration.session.disable_keepalive = disable_keep_alive;
142 return update_configuration();
// Stores the flag that disables automatic reconnecting in the MQTT configuration and applies it through update_configuration().
// @param disable_auto_reconnect Value written to the disable_auto_reconnect configuration field
// @return Result of update_configuration()
149 bool set_disable_auto_reconnect(
bool disable_auto_reconnect) {
// Field is `disable_auto_reconnect` below ESP-IDF major version 5 and `network.disable_auto_reconnect` otherwise (#else / #endif not visible in this listing).
150#if ESP_IDF_VERSION_MAJOR < 5
151 m_mqtt_configuration.disable_auto_reconnect = disable_auto_reconnect;
153 m_mqtt_configuration.network.disable_auto_reconnect = disable_auto_reconnect;
155 return update_configuration();
// Stores priority and stack size of the MQTT task in the MQTT configuration and applies them through update_configuration().
// @param priority Value written to the task priority configuration field
// @param stack_size Value written to the task stack size configuration field
// @return Result of update_configuration()
164 bool set_mqtt_task_configuration(
uint8_t priority, uint16_t stack_size) {
// Fields are `task_prio` / `task_stack` below ESP-IDF major version 5 and `task.priority` / `task.stack_size` otherwise
// (#else / #endif not visible in this listing).
165#if ESP_IDF_VERSION_MAJOR < 5
166 m_mqtt_configuration.task_prio = priority;
167 m_mqtt_configuration.task_stack = stack_size;
169 m_mqtt_configuration.task.priority = priority;
170 m_mqtt_configuration.task.stack_size = stack_size;
172 return update_configuration();
// Stores the reconnect timeout in the MQTT configuration and applies it through update_configuration().
// @param reconnect_timeout_milliseconds Reconnect timeout in milliseconds
// @return Result of update_configuration()
179 bool set_reconnect_timeout(uint16_t reconnect_timeout_milliseconds) {
// Field is `reconnect_timeout_ms` below ESP-IDF major version 5 and `network.reconnect_timeout_ms` otherwise (#else / #endif not visible in this listing).
180#if ESP_IDF_VERSION_MAJOR < 5
181 m_mqtt_configuration.reconnect_timeout_ms = reconnect_timeout_milliseconds;
183 m_mqtt_configuration.network.reconnect_timeout_ms = reconnect_timeout_milliseconds;
185 return update_configuration();
// Stores the network timeout in the MQTT configuration and applies it through update_configuration().
// @param network_timeout_milliseconds Network timeout in milliseconds
// @return Result of update_configuration()
193 bool set_network_timeout(uint16_t network_timeout_milliseconds) {
// Field is `network_timeout_ms` below ESP-IDF major version 5 and `network.timeout_ms` otherwise (#else / #endif not visible in this listing).
194#if ESP_IDF_VERSION_MAJOR < 5
195 m_mqtt_configuration.network_timeout_ms = network_timeout_milliseconds;
197 m_mqtt_configuration.network.timeout_ms = network_timeout_milliseconds;
199 return update_configuration();
// Stores whether publish() should hand messages to esp_mqtt_client_enqueue() (true)
// or to esp_mqtt_client_publish() (false). No configuration update is triggered by this setter.
// @param enqueue_messages New value of the m_enqueue_messages flag
208 void set_enqueue_messages(
bool enqueue_messages) {
209 m_enqueue_messages = enqueue_messages;
// Forwards `callback` to the wrapper that mqtt_event_handler() invokes for received data.
// NOTE(review): the signature of the enclosing method is not present in this listing.
213 m_received_data_callback.Set_Callback(callback);
// Forwards `callback` to the wrapper that mqtt_event_handler() invokes on MQTT_EVENT_CONNECTED.
// NOTE(review): the signature of the enclosing method is not present in this listing.
217 m_connected_callback.Set_Callback(callback);
// Stores the receive and send buffer sizes in the MQTT configuration and applies them through update_configuration().
// @param receive_buffer_size Value written to the receive buffer size configuration field
// @param send_buffer_size Value written to the send (out) buffer size configuration field
// @return Result of update_configuration()
220 bool set_buffer_size(uint16_t receive_buffer_size, uint16_t send_buffer_size)
override {
// Fields are `buffer_size` / `out_buffer_size` below ESP-IDF major version 5 and `buffer.size` / `buffer.out_size` otherwise
// (#else / #endif not visible in this listing).
221#if ESP_IDF_VERSION_MAJOR < 5
222 m_mqtt_configuration.buffer_size = receive_buffer_size;
223 m_mqtt_configuration.out_buffer_size = send_buffer_size;
225 m_mqtt_configuration.buffer.size = receive_buffer_size;
226 m_mqtt_configuration.buffer.out_size = send_buffer_size;
228 return update_configuration();
// Returns the receive buffer size currently stored in the MQTT configuration, converted to uint16_t.
// Reads `buffer_size` below ESP-IDF major version 5 and `buffer.size` otherwise (#else / #endif not visible in this listing).
231 uint16_t get_receive_buffer_size()
override {
232#if ESP_IDF_VERSION_MAJOR < 5
233 return m_mqtt_configuration.buffer_size;
235 return m_mqtt_configuration.buffer.size;
// Returns the send buffer size currently stored in the MQTT configuration, converted to uint16_t.
// Reads `out_buffer_size` below ESP-IDF major version 5 and `buffer.out_size` otherwise (#else / #endif not visible in this listing).
239 uint16_t get_send_buffer_size()
override {
240#if ESP_IDF_VERSION_MAJOR < 5
241 return m_mqtt_configuration.out_buffer_size;
243 return m_mqtt_configuration.buffer.out_size;
// Stores host name and port of the MQTT broker in the configuration and selects the transport.
// SSL transport is chosen when a server certificate or a certificate bundle attach function has already been configured,
// plain TCP otherwise. This method does not call update_configuration() in the lines visible in this listing.
// @param domain Host name of the broker, only the pointer is stored here
// @param port Port of the broker
247 void set_server(
char const * domain, uint16_t port)
override {
// Field names differ between ESP-IDF major versions below 5 and 5 or newer (#else / #endif not visible in this listing).
248#if ESP_IDF_VERSION_MAJOR < 5
249 m_mqtt_configuration.host = domain;
250 m_mqtt_configuration.port = port;
// True if either verification source is set, the decision is therefore based on the configuration at the time of this call.
253 bool const transport_over_sll = m_mqtt_configuration.cert_pem !=
nullptr || m_mqtt_configuration.crt_bundle_attach !=
nullptr;
255 m_mqtt_configuration.broker.address.hostname = domain;
256 m_mqtt_configuration.broker.address.port = port;
259 bool const transport_over_sll = m_mqtt_configuration.broker.verification.certificate !=
nullptr || m_mqtt_configuration.broker.verification.crt_bundle_attach !=
nullptr;
// Map the decision above onto the transport enumeration of the ESP MQTT client.
262 esp_mqtt_transport_t
const transport = (transport_over_sll ? esp_mqtt_transport_t::MQTT_TRANSPORT_OVER_SSL : esp_mqtt_transport_t::MQTT_TRANSPORT_OVER_TCP);
263#if ESP_IDF_VERSION_MAJOR < 5
264 m_mqtt_configuration.transport = transport;
266 m_mqtt_configuration.broker.address.transport = transport;
// Starts the MQTT client or requests a reconnect through start_or_reconnect_mqtt_client().
// On ESP_OK the connection state is switched to CONNECTING, the CONNECTED state is only set later
// by mqtt_event_handler() when MQTT_EVENT_CONNECTED is received.
// @param client_id Client id forwarded to start_or_reconnect_mqtt_client()
// @param user_name User name forwarded to start_or_reconnect_mqtt_client()
// @param password Password forwarded to start_or_reconnect_mqtt_client()
// @return Whether start_or_reconnect_mqtt_client() returned ESP_OK
270 bool connect(
char const * client_id,
char const * user_name,
char const * password)
override {
271 esp_err_t
const error = start_or_reconnect_mqtt_client(client_id, user_name, password);
272 if (error == ESP_OK) {
273 update_connection_state(MQTT_Connection_State::CONNECTING);
275 return error == ESP_OK;
// Requests a disconnect from the ESP MQTT client, discarding the returned status,
// and afterwards switches the connection state to DISCONNECTING.
278 void disconnect()
override {
279 (void)esp_mqtt_client_disconnect(m_mqtt_client);
280 update_connection_state(MQTT_Connection_State::DISCONNECTING);
// Override of loop().
// NOTE(review): the body of this method is not present in this listing, so its return value cannot be documented from here.
283 bool loop()
override {
// Sends the payload to the given topic, either through esp_mqtt_client_enqueue() or through esp_mqtt_client_publish(),
// depending on the m_enqueue_messages flag.
// @param topic Topic the payload is sent to
// @param payload Bytes to send, reinterpreted as char pointer for the ESP MQTT API
// @param length Amount of bytes in the payload
// @return Whether the returned message id is greater than MQTT_FAILURE_MESSAGE_ID
289 bool publish(
char const * topic,
uint8_t const * payload,
size_t const & length)
override {
290 int message_id = MQTT_FAILURE_MESSAGE_ID;
// Enqueue path, the trailing arguments are 0U, 0U and true
// (presumably qos, retain and store of the esp-mqtt API -- confirm against the ESP-IDF documentation).
292 if (m_enqueue_messages) {
293 message_id = esp_mqtt_client_enqueue(m_mqtt_client, topic,
reinterpret_cast<const char*
>(payload), length, 0U, 0U,
true);
294 return message_id > MQTT_FAILURE_MESSAGE_ID;
// Direct publish path, the trailing arguments are 0U and 0U (presumably qos and retain -- confirm).
303 message_id = esp_mqtt_client_publish(m_mqtt_client, topic,
reinterpret_cast<const char*
>(payload), length, 0U, 0U);
304 return message_id > MQTT_FAILURE_MESSAGE_ID;
// Subscribes to the given topic through esp_mqtt_client_subscribe(), passing 0U as last argument
// (presumably the qos level -- confirm against the ESP-IDF documentation).
// @param topic Topic to subscribe to
// @return Whether the returned message id is greater than MQTT_FAILURE_MESSAGE_ID
307 bool subscribe(
char const * topic)
override {
313 int const message_id = esp_mqtt_client_subscribe(m_mqtt_client, topic, 0U);
314 return message_id > MQTT_FAILURE_MESSAGE_ID;
// Unsubscribes from the given topic through esp_mqtt_client_unsubscribe().
// @param topic Topic to unsubscribe from
// @return Whether the returned message id is greater than MQTT_FAILURE_MESSAGE_ID
317 bool unsubscribe(
char const * topic)
override {
323 int const message_id = esp_mqtt_client_unsubscribe(m_mqtt_client, topic);
324 return message_id > MQTT_FAILURE_MESSAGE_ID;
// Returns whether the tracked connection state is CONNECTED.
// The value is based on m_connection_state, which is maintained by update_connection_state(), no call into the ESP MQTT client is made here.
327 bool connected()
override {
328 return m_connection_state == MQTT_Connection_State::CONNECTED;
// Returns the tracked connection state.
// NOTE(review): the signature of the enclosing method is not present in this listing.
332 return m_connection_state;
// Returns the stored last connection error.
// NOTE(review): the signature of the enclosing method is not present in this listing.
336 return m_last_connection_error;
// Forwards `callback` to the wrapper that is invoked whenever the tracked connection state is updated.
// NOTE(review): the signature of the enclosing method is not present in this listing.
340 m_connection_state_changed_callback.Set_Callback(callback);
// Applies the stored configuration to the ESP MQTT client through esp_mqtt_set_config().
// @return Whether esp_mqtt_set_config() returned ESP_OK on the path visible in this listing
348 bool update_configuration() {
// NOTE(review): the body of this null check is not present in this listing, presumably the method returns early
// while no client has been created yet -- confirm against the full file.
351 if (m_mqtt_client ==
nullptr) {
355 esp_err_t
const error = esp_mqtt_set_config(m_mqtt_client, &m_mqtt_configuration);
// The result is only logged in debug builds.
356#if THINGSBOARD_ENABLE_DEBUG
357 Logger::printfln(UPDATING_CONFIGURATION, esp_err_to_name(error));
359 return error == ESP_OK;
// Writes the credentials into the MQTT configuration and then either reconnects an existing client
// or creates, registers and starts a new one.
// @param client_id Client id stored in the configuration, only the pointer is stored here
// @param user_name User name stored in the configuration, only the pointer is stored here
// @param password Password stored in the configuration, only the pointer is stored here
// @return Result of esp_mqtt_client_reconnect() for an existing client; for a new client the result of the event
// registration or, when that succeeded, of esp_mqtt_client_start() is assigned to `error`
// (the final return statement is not visible in this listing)
362 esp_err_t start_or_reconnect_mqtt_client(
char const * client_id,
char const * user_name,
char const * password) {
// Field names differ between ESP-IDF major versions below 5 and 5 or newer (#else / #endif not visible in this listing).
363#if ESP_IDF_VERSION_MAJOR < 5
364 m_mqtt_configuration.client_id = client_id;
365 m_mqtt_configuration.username = user_name;
366 m_mqtt_configuration.password = password;
368 m_mqtt_configuration.credentials.client_id = client_id;
369 m_mqtt_configuration.credentials.username = user_name;
370 m_mqtt_configuration.credentials.authentication.password = password;
// Apply the changed credentials, the returned result is not evaluated here.
374 update_configuration();
// A client handle already exists, so only a reconnect is requested.
378 if (m_mqtt_client !=
nullptr) {
379 return esp_mqtt_client_reconnect(m_mqtt_client);
// First connection: create the client from the stored configuration and register the static handler for all events,
// passing `this` as handler argument so the static handler can forward to this instance.
// The handle returned by esp_mqtt_client_init() is not checked for null in the visible lines.
384 m_mqtt_client = esp_mqtt_client_init(&m_mqtt_configuration);
385 esp_err_t error = esp_mqtt_client_register_event(m_mqtt_client, esp_mqtt_event_id_t::MQTT_EVENT_ANY, Espressif_MQTT_Client::static_mqtt_event_handler,
this);
// The client is only started if registering the event handler succeeded.
387 if (error == ESP_OK) {
388 error = esp_mqtt_client_start(m_mqtt_client);
// Stores the new connection state and then notifies the state changed callback with the current state and the last connection error.
// NOTE(review): the signature of the enclosing method is not present in this listing
// (the other methods call it as update_connection_state).
396 m_connection_state = new_state;
397 m_connection_state_changed_callback.Call_Callback(get_connection_state(), get_last_connection_error());
// Debug only helper that maps an MQTT event id onto the name of the enumerator as string literal.
// NOTE(review): the switch statement and any default handling are not present in this listing.
// @param event_id Event id to translate
// @return String literal with the name of the matching enumerator for the cases listed below
400#if THINGSBOARD_ENABLE_DEBUG
401 const char * esp_event_id_to_name(
const esp_mqtt_event_id_t& event_id)
const {
403 case esp_mqtt_event_id_t::MQTT_EVENT_CONNECTED:
404 return "MQTT_EVENT_CONNECTED";
405 case esp_mqtt_event_id_t::MQTT_EVENT_DISCONNECTED:
406 return "MQTT_EVENT_DISCONNECTED";
407 case esp_mqtt_event_id_t::MQTT_EVENT_SUBSCRIBED:
408 return "MQTT_EVENT_SUBSCRIBED";
409 case esp_mqtt_event_id_t::MQTT_EVENT_UNSUBSCRIBED:
410 return "MQTT_EVENT_UNSUBSCRIBED";
411 case esp_mqtt_event_id_t::MQTT_EVENT_PUBLISHED:
412 return "MQTT_EVENT_PUBLISHED";
413 case esp_mqtt_event_id_t::MQTT_EVENT_DATA:
414 return "MQTT_EVENT_DATA";
415 case esp_mqtt_event_id_t::MQTT_EVENT_ERROR:
416 return "MQTT_EVENT_ERROR";
417 case esp_mqtt_event_id_t::MQTT_EVENT_BEFORE_CONNECT:
418 return "MQTT_EVENT_BEFORE_CONNECT";
419 case esp_mqtt_event_id_t::MQTT_EVENT_DELETED:
420 return "MQTT_EVENT_DELETED";
421 case esp_mqtt_event_id_t::MQTT_EVENT_ANY:
422 return "MQTT_EVENT_ANY";
// Instance level handler for events of the ESP MQTT client, called by static_mqtt_event_handler().
// NOTE(review): the switch statement, the break statements, some statements of the error case and the closing braces
// are not present in this listing.
// @param base Event base, not used in the lines visible here
// @param event_id Id of the event that occurred
// @param event_data Event payload, interpreted as esp_mqtt_event_handle_t
433 void mqtt_event_handler(esp_event_base_t base, esp_mqtt_event_id_t
const & event_id,
void * event_data) {
434 esp_mqtt_event_handle_t
const event =
static_cast<esp_mqtt_event_handle_t
>(event_data);
// Every handled event is logged by name in debug builds.
436#if THINGSBOARD_ENABLE_DEBUG
437 Logger::printfln(RECEIVED_MQTT_EVENT, esp_event_id_to_name(event_id));
// Connection established: the connected callback runs first, the state is switched to CONNECTED afterwards,
// so connected() still returns false while the connected callback executes.
440 case esp_mqtt_event_id_t::MQTT_EVENT_CONNECTED:
441 m_connected_callback.Call_Callback();
442 update_connection_state(MQTT_Connection_State::CONNECTED);
444 case esp_mqtt_event_id_t::MQTT_EVENT_DISCONNECTED:
445 update_connection_state(MQTT_Connection_State::DISCONNECTED);
447 case esp_mqtt_event_id_t::MQTT_EVENT_DATA: {
// A difference between data_len and total_data_len is reported as the message exceeding the receive buffer
// (what happens after the log call is not visible in this listing).
450 if (event->data_len != event->total_data_len) {
451 Logger::printfln(MQTT_DATA_EXCEEDS_BUFFER, event->total_data_len, get_receive_buffer_size());
// The topic is copied into a zero initialised local buffer with one extra byte,
// so the string handed to the callback is null-terminated. The buffer size depends on the runtime topic length.
456 char topic[
event->topic_len + 1] = {};
457 strncpy(topic, event->topic, event->topic_len);
458 m_received_data_callback.Call_Callback(topic,
reinterpret_cast<uint8_t*
>(event->data), event->data_len);
// Error event: the error handle is read and compared against nullptr before the state is switched to ERROR
// (the statements between the check and the state update are not visible in this listing).
461 case esp_mqtt_event_id_t::MQTT_EVENT_ERROR: {
462 esp_mqtt_error_codes_t
const * error =
event->error_handle;
463 if (error ==
nullptr) {
467 update_connection_state(MQTT_Connection_State::ERROR);
// Static entry point registered with esp_mqtt_client_register_event(), forwards the event to the instance
// that was passed as handler argument during registration.
// @param handler_args Pointer to the Espressif_MQTT_Client instance that registered the handler
// @param base Event base, forwarded unchanged
// @param event_id Raw event id, cast to esp_mqtt_event_id_t before forwarding
// @param event_data Event payload, forwarded unchanged
476 static void static_mqtt_event_handler(
void * handler_args, esp_event_base_t base, int32_t event_id,
void * event_data) {
// NOTE(review): the body of this null check is not present in this listing, presumably the handler returns early -- confirm.
477 if (handler_args ==
nullptr) {
480 auto instance =
static_cast<Espressif_MQTT_Client *
>(handler_args);
481 instance->mqtt_event_handler(base,
static_cast<esp_mqtt_event_id_t
>(event_id), event_data);
// Whether publish() uses esp_mqtt_client_enqueue() instead of esp_mqtt_client_publish(), value-initialised to false.
489 bool m_enqueue_messages = {};
// Configuration that the setters modify and that is passed to esp_mqtt_client_init() / esp_mqtt_set_config(), value-initialised.
490 esp_mqtt_client_config_t m_mqtt_configuration = {};
// Handle of the ESP MQTT client, value-initialised and assigned in start_or_reconnect_mqtt_client().
491 esp_mqtt_client_handle_t m_mqtt_client = {};
MQTT_Connection_Error
Possible error states the MQTT broker connection can be in, if connecting the device to the MQTT brok...
Definition: MQTT_Connection_Error.h:9
MQTT_Connection_State
Possible states the MQTT broker connection can be in.
Definition: MQTT_Connection_State.h:11
General purpose safe callback wrapper. Expects either c-style or c++ style function pointer,...
Definition: Callback.h:30
std::function< return_type(argument_types... arguments)> function
Callback signature.
Definition: Callback.h:34
MQTT Client interface that contains the method that a class that can be used to send and receive data...
Definition: IMQTT_Client.h:36