ThingsBoard Client SDK 0.16.0
Client SDK to connect with ThingsBoard IoT Platform from IoT devices (Arduino, Espressif, etc.)
Loading...
Searching...
No Matches
Espressif_MQTT_Client.h
Go to the documentation of this file.
1#ifndef Espressif_MQTT_Client_h
2#define Espressif_MQTT_Client_h
3
4// Local include.
5#include "Configuration.h"
6
7#if THINGSBOARD_USE_ESP_MQTT
8
9// Local includes.
10#include "IMQTT_Client.h"
11
12// Library includes.
13#include <mqtt_client.h>
14#include <esp_crt_bundle.h>
15
16// The error integer -1 means a general failure while handling the mqtt client,
17// whereas -2 means that the outbox is full and the message can therefore not be sent.
18// Therefore we have to check if the value is smaller or equal to the MQTT_FAILURE_MESSAGE_ID,
19// to ensure other (more negative) errors are identified as failures as well.
// In practice a call counts as successful only if the returned message id is greater than this constant.
20constexpr int MQTT_FAILURE_MESSAGE_ID = -1;
// Format string logged when a received message is discarded because it was delivered in multiple chunks
// (data_len differs from total_data_len), arguments are the total message size and the current receive buffer size.
21constexpr char MQTT_DATA_EXCEEDS_BUFFER[] = "Received amount of data (%u) is bigger than current buffer size (%u), increase accordingly";
// The following format strings are only compiled in when debug logging is enabled.
22#if THINGSBOARD_ENABLE_DEBUG
// Logged for every handled mqtt event, argument is the textual name of the event id.
23constexpr char RECEIVED_MQTT_EVENT[] = "Handling received mqtt event: (%s)";
// Logged after the configuration of an already initialized client was updated, argument is the textual name of the returned esp_err_t.
24constexpr char UPDATING_CONFIGURATION[] = "Updated configuration after inital connection with response: (%s)";
// Logged after a custom certificate bundle was passed, argument is the textual name of the resulting esp_err_t.
25constexpr char OVERRIDING_DEFAULT_CRT_BUNDLE[] = "Overriding default CRT bundle with response: (%s)";
26#endif // THINGSBOARD_ENABLE_DEBUG
27
28
34template <typename Logger = DefaultLogger>
35class Espressif_MQTT_Client : public IMQTT_Client {
36 public:
38 Espressif_MQTT_Client() = default;
39
40 ~Espressif_MQTT_Client() override {
41 (void)esp_mqtt_client_destroy(m_mqtt_client);
42 }
43
47 Espressif_MQTT_Client(Espressif_MQTT_Client const & other) = delete;
48
52 void operator=(Espressif_MQTT_Client const & other) = delete;
53
63 bool set_server_certificate(char const * server_certificate_pem) {
64 // Because PEM format is expected for the server certificate we do not need to set the certificate_len,
65 // as the PEM format expects a null-terminated string
66#if ESP_IDF_VERSION_MAJOR < 5
67 m_mqtt_configuration.cert_pem = server_certificate_pem;
68#else
69 m_mqtt_configuration.broker.verification.certificate = server_certificate_pem;
70#endif // ESP_IDF_VERSION_MAJOR < 5
71 return update_configuration();
72 }
73
84 bool set_server_crt_bundle(uint8_t const * x509_bundle = nullptr, size_t const & bundle_size = 0U) {
85 esp_err_t (*crt_bundle_attach)(void * conf) = nullptr;
86#ifdef ARDUINO
87 crt_bundle_attach = arduino_esp_crt_bundle_attach;
88#else
89 crt_bundle_attach = esp_crt_bundle_attach;
90#endif // ARDUINO
91
92#if ESP_IDF_VERSION_MAJOR < 5
93 m_mqtt_configuration.crt_bundle_attach = crt_bundle_attach;
94#else
95 m_mqtt_configuration.broker.verification.crt_bundle_attach = crt_bundle_attach;
96#endif // ESP_IDF_VERSION_MAJOR < 5
97
98 if (x509_bundle != nullptr) {
99 esp_err_t error = ESP_OK;
100#ifdef ARDUINO
101 arduino_esp_crt_bundle_set(x509_bundle);
102#else
103#if (ESP_IDF_VERSION_MAJOR == 4 && ESP_IDF_VERSION_MINOR <= 4 && ESP_IDF_VERSION_PATCH < 2) || ESP_IDF_VERSION_MAJOR < 4
104 error = esp_crt_bundle_set(x509_bundle);
105#else
106 error = esp_crt_bundle_set(x509_bundle, bundle_size);
107#endif // (ESP_IDF_VERSION_MAJOR == 4 && ESP_IDF_VERSION_MINOR <= 4 ESP_IDF_VERSION_MINO < 2) || ESP_IDF_VERSION_MAJOR < 4
108#endif // ARDUINO
109#if THINGSBOARD_ENABLE_DEBUG
110 Logger::printfln(OVERRIDING_DEFAULT_CRT_BUNDLE, esp_err_to_name(error));
111#endif // THINGSBOARD_ENABLE_DEBUG
112 }
113 return update_configuration();
114 }
115
121 bool set_keep_alive_timeout(uint16_t keep_alive_timeout_seconds) {
122#if ESP_IDF_VERSION_MAJOR < 5
123 m_mqtt_configuration.keepalive = keep_alive_timeout_seconds;
124#else
125 m_mqtt_configuration.session.keepalive = keep_alive_timeout_seconds;
126#endif // ESP_IDF_VERSION_MAJOR < 5
127 return update_configuration();
128 }
129
136 bool set_disable_keep_alive(bool disable_keep_alive) {
137#if ESP_IDF_VERSION_MAJOR < 5
138 m_mqtt_configuration.disable_keepalive = disable_keep_alive;
139#else
140 m_mqtt_configuration.session.disable_keepalive = disable_keep_alive;
141#endif // ESP_IDF_VERSION_MAJOR < 5
142 return update_configuration();
143 }
144
149 bool set_disable_auto_reconnect(bool disable_auto_reconnect) {
150#if ESP_IDF_VERSION_MAJOR < 5
151 m_mqtt_configuration.disable_auto_reconnect = disable_auto_reconnect;
152#else
153 m_mqtt_configuration.network.disable_auto_reconnect = disable_auto_reconnect;
154#endif // ESP_IDF_VERSION_MAJOR < 5
155 return update_configuration();
156 }
157
164 bool set_mqtt_task_configuration(uint8_t priority, uint16_t stack_size) {
165#if ESP_IDF_VERSION_MAJOR < 5
166 m_mqtt_configuration.task_prio = priority;
167 m_mqtt_configuration.task_stack = stack_size;
168#else
169 m_mqtt_configuration.task.priority = priority;
170 m_mqtt_configuration.task.stack_size = stack_size;
171#endif // ESP_IDF_VERSION_MAJOR < 5
172 return update_configuration();
173 }
174
179 bool set_reconnect_timeout(uint16_t reconnect_timeout_milliseconds) {
180#if ESP_IDF_VERSION_MAJOR < 5
181 m_mqtt_configuration.reconnect_timeout_ms = reconnect_timeout_milliseconds;
182#else
183 m_mqtt_configuration.network.reconnect_timeout_ms = reconnect_timeout_milliseconds;
184#endif // ESP_IDF_VERSION_MAJOR < 5
185 return update_configuration();
186 }
187
193 bool set_network_timeout(uint16_t network_timeout_milliseconds) {
194#if ESP_IDF_VERSION_MAJOR < 5
195 m_mqtt_configuration.network_timeout_ms = network_timeout_milliseconds;
196#else
197 m_mqtt_configuration.network.timeout_ms = network_timeout_milliseconds;
198#endif // ESP_IDF_VERSION_MAJOR < 5
199 return update_configuration();
200 }
201
208 void set_enqueue_messages(bool enqueue_messages) {
209 m_enqueue_messages = enqueue_messages;
210 }
211
212 void set_data_callback(Callback<void, char *, uint8_t *, unsigned int>::function callback) override {
213 m_received_data_callback.Set_Callback(callback);
214 }
215
216 void set_connect_callback(Callback<void>::function callback) override {
217 m_connected_callback.Set_Callback(callback);
218 }
219
220 bool set_buffer_size(uint16_t receive_buffer_size, uint16_t send_buffer_size) override {
221#if ESP_IDF_VERSION_MAJOR < 5
222 m_mqtt_configuration.buffer_size = receive_buffer_size;
223 m_mqtt_configuration.out_buffer_size = send_buffer_size;
224#else
225 m_mqtt_configuration.buffer.size = receive_buffer_size;
226 m_mqtt_configuration.buffer.out_size = send_buffer_size;
227#endif // ESP_IDF_VERSION_MAJOR < 5
228 return update_configuration();
229 }
230
231 uint16_t get_receive_buffer_size() override {
232#if ESP_IDF_VERSION_MAJOR < 5
233 return m_mqtt_configuration.buffer_size;
234#else
235 return m_mqtt_configuration.buffer.size;
236#endif // ESP_IDF_VERSION_MAJOR < 5
237 }
238
239 uint16_t get_send_buffer_size() override {
240#if ESP_IDF_VERSION_MAJOR < 5
241 return m_mqtt_configuration.out_buffer_size;
242#else
243 return m_mqtt_configuration.buffer.out_size;
244#endif // ESP_IDF_VERSION_MAJOR < 5
245 }
246
247 void set_server(char const * domain, uint16_t port) override {
248#if ESP_IDF_VERSION_MAJOR < 5
249 m_mqtt_configuration.host = domain;
250 m_mqtt_configuration.port = port;
251 // Decide transport depending on if a certificate was passed, because the set_server() method is called in the connect method meaning if the certificate has not been set yet,
252 // it is to late as we attempt to establish the connection in the connect() method which is called directly after this one.
253 bool const transport_over_sll = m_mqtt_configuration.cert_pem != nullptr || m_mqtt_configuration.crt_bundle_attach != nullptr;
254#else
255 m_mqtt_configuration.broker.address.hostname = domain;
256 m_mqtt_configuration.broker.address.port = port;
257 // Decide transport depending on if a certificate was passed, because the set_server() method is called in the connect method meaning if the certificate has not been set yet,
258 // it is to late as we attempt to establish the connection in the connect() method which is called directly after this one.
259 bool const transport_over_sll = m_mqtt_configuration.broker.verification.certificate != nullptr || m_mqtt_configuration.broker.verification.crt_bundle_attach != nullptr;
260#endif // ESP_IDF_VERSION_MAJOR < 5
261
262 esp_mqtt_transport_t const transport = (transport_over_sll ? esp_mqtt_transport_t::MQTT_TRANSPORT_OVER_SSL : esp_mqtt_transport_t::MQTT_TRANSPORT_OVER_TCP);
263#if ESP_IDF_VERSION_MAJOR < 5
264 m_mqtt_configuration.transport = transport;
265#else
266 m_mqtt_configuration.broker.address.transport = transport;
267#endif // ESP_IDF_VERSION_MAJOR < 5
268 }
269
270 bool connect(char const * client_id, char const * user_name, char const * password) override {
271 esp_err_t const error = start_or_reconnect_mqtt_client(client_id, user_name, password);
272 if (error == ESP_OK) {
273 update_connection_state(MQTT_Connection_State::CONNECTING);
274 }
275 return error == ESP_OK;
276 }
277
278 void disconnect() override {
279 (void)esp_mqtt_client_disconnect(m_mqtt_client);
280 update_connection_state(MQTT_Connection_State::DISCONNECTING);
281 }
282
283 bool loop() override {
284 // Unused because the esp mqtt client uses its own task to handle receiving and sending of data, therefore we do not need to do anything in the loop method.
285 // Because the loop method is meant for clients that do not have their own process method but instead rely on the upper level code calling a loop method to provide processsing time.
286 return connected();
287 }
288
289 bool publish(char const * topic, uint8_t const * payload, size_t const & length) override {
290 int message_id = MQTT_FAILURE_MESSAGE_ID;
291
292 if (m_enqueue_messages) {
293 message_id = esp_mqtt_client_enqueue(m_mqtt_client, topic, reinterpret_cast<const char*>(payload), length, 0U, 0U, true);
294 return message_id > MQTT_FAILURE_MESSAGE_ID;
295 }
296
297 // The blocking version esp_mqtt_client_publish() is sent directly from the users task context.
298 // This way to send messages to the cloud, has the advantage that no internal buffer has to be used to store the message until it should be sent,
299 // because all messages are sent with QoS level 0. If this is not wanted esp_mqtt_client_enqueue() could be used with store = true,
300 // to ensure the sending is done in the mqtt event context instead of the users task context.
301 // Allows to use the publish method without having to worry about any CPU overhead, so it can even be used in callbacks or high priority tasks, without starving other tasks,
302 // but compared to the other method esp_mqtt_client_enqueue() requires to save the message in the outbox, which increases the memory requirements for the internal buffer size
303 message_id = esp_mqtt_client_publish(m_mqtt_client, topic, reinterpret_cast<const char*>(payload), length, 0U, 0U);
304 return message_id > MQTT_FAILURE_MESSAGE_ID;
305 }
306
307 bool subscribe(char const * topic) override {
308 // The esp_mqtt_client_subscribe method does not return false, if we send a subscribe request while not being connected to a broker,
309 // so we have to check for that case to ensure the end user is informed that their subscribe request could not be sent and has been ignored.
310 if (!connected()) {
311 return false;
312 }
313 int const message_id = esp_mqtt_client_subscribe(m_mqtt_client, topic, 0U);
314 return message_id > MQTT_FAILURE_MESSAGE_ID;
315 }
316
317 bool unsubscribe(char const * topic) override {
318 // The esp_mqtt_client_unsubscribe method does not return false, if we send a unsubscribe request while not being connected to a broker,
319 // so we have to check for that case to ensure the end user is informed that their unsubscribe request could not be sent and has been ignored.
320 if (!connected()) {
321 return false;
322 }
323 int const message_id = esp_mqtt_client_unsubscribe(m_mqtt_client, topic);
324 return message_id > MQTT_FAILURE_MESSAGE_ID;
325 }
326
327 bool connected() override {
328 return m_connection_state == MQTT_Connection_State::CONNECTED;
329 }
330
331 MQTT_Connection_State get_connection_state() const override {
332 return m_connection_state;
333 }
334
335 MQTT_Connection_Error get_last_connection_error() const override {
336 return m_last_connection_error;
337 }
338
339 void subscribe_connection_state_changed_callback(Callback<void, MQTT_Connection_State, MQTT_Connection_Error>::function callback) override {
340 m_connection_state_changed_callback.Set_Callback(callback);
341 }
342
343private:
348 bool update_configuration() {
349 // Check if the client has been initalized, because if it did not the value should still be nullptr
350 // and updating the config makes no sense because the changed settings will be applied anyway when the client is first intialized
351 if (m_mqtt_client == nullptr) {
352 return true;
353 }
354
355 esp_err_t const error = esp_mqtt_set_config(m_mqtt_client, &m_mqtt_configuration);
356#if THINGSBOARD_ENABLE_DEBUG
357 Logger::printfln(UPDATING_CONFIGURATION, esp_err_to_name(error));
358#endif // THINGSBOARD_ENABLE_DEBUG
359 return error == ESP_OK;
360 }
361
362 esp_err_t start_or_reconnect_mqtt_client(char const * client_id, char const * user_name, char const * password) {
363#if ESP_IDF_VERSION_MAJOR < 5
364 m_mqtt_configuration.client_id = client_id;
365 m_mqtt_configuration.username = user_name;
366 m_mqtt_configuration.password = password;
367#else
368 m_mqtt_configuration.credentials.client_id = client_id;
369 m_mqtt_configuration.credentials.username = user_name;
370 m_mqtt_configuration.credentials.authentication.password = password;
371#endif // ESP_IDF_VERSION_MAJOR < 5
372 // Update configuration is called to ensure that if we connected previously and call connect again with other credentials,
373 // then we also update the client_id, username and password we connect with. Especially important for the provisioning workflow to work correctly
374 update_configuration();
375
376 // Check wheter the client has been initalzed before already, if it has we do not want to reinitalize,
377 // but simply force reconnection with the client because it has lost that connection
378 if (m_mqtt_client != nullptr) {
379 return esp_mqtt_client_reconnect(m_mqtt_client);
380 }
381
382 // The client is first initalized once the connect has actually been called, this is done because the passed setting are required for the client inizialitation structure,
383 // additionally before we attempt to connect with the client we have to ensure has been fully configued by then.
384 m_mqtt_client = esp_mqtt_client_init(&m_mqtt_configuration);
385 esp_err_t error = esp_mqtt_client_register_event(m_mqtt_client, esp_mqtt_event_id_t::MQTT_EVENT_ANY, Espressif_MQTT_Client::static_mqtt_event_handler, this);
386
387 if (error == ESP_OK) {
388 error = esp_mqtt_client_start(m_mqtt_client);
389 }
390 return error;
391 }
392
395 void update_connection_state(MQTT_Connection_State new_state) {
396 m_connection_state = new_state;
397 m_connection_state_changed_callback.Call_Callback(get_connection_state(), get_last_connection_error());
398 }
399
400#if THINGSBOARD_ENABLE_DEBUG
401 const char * esp_event_id_to_name(const esp_mqtt_event_id_t& event_id) const {
402 switch (event_id) {
403 case esp_mqtt_event_id_t::MQTT_EVENT_CONNECTED:
404 return "MQTT_EVENT_CONNECTED";
405 case esp_mqtt_event_id_t::MQTT_EVENT_DISCONNECTED:
406 return "MQTT_EVENT_DISCONNECTED";
407 case esp_mqtt_event_id_t::MQTT_EVENT_SUBSCRIBED:
408 return "MQTT_EVENT_SUBSCRIBED";
409 case esp_mqtt_event_id_t::MQTT_EVENT_UNSUBSCRIBED:
410 return "MQTT_EVENT_UNSUBSCRIBED";
411 case esp_mqtt_event_id_t::MQTT_EVENT_PUBLISHED:
412 return "MQTT_EVENT_PUBLISHED";
413 case esp_mqtt_event_id_t::MQTT_EVENT_DATA:
414 return "MQTT_EVENT_DATA";
415 case esp_mqtt_event_id_t::MQTT_EVENT_ERROR:
416 return "MQTT_EVENT_ERROR";
417 case esp_mqtt_event_id_t::MQTT_EVENT_BEFORE_CONNECT:
418 return "MQTT_EVENT_BEFORE_CONNECT";
419 case esp_mqtt_event_id_t::MQTT_EVENT_DELETED:
420 return "MQTT_EVENT_DELETED";
421 case esp_mqtt_event_id_t::MQTT_EVENT_ANY:
422 return "MQTT_EVENT_ANY";
423 default:
424 return "UNKNOWN";
425 }
426 }
427#endif // THINGSBOARD_ENABLE_DEBUG
428
433 void mqtt_event_handler(esp_event_base_t base, esp_mqtt_event_id_t const & event_id, void * event_data) {
434 esp_mqtt_event_handle_t const event = static_cast<esp_mqtt_event_handle_t>(event_data);
435
436#if THINGSBOARD_ENABLE_DEBUG
437 Logger::printfln(RECEIVED_MQTT_EVENT, esp_event_id_to_name(event_id));
438#endif // THINGSBOARD_ENABLE_DEBUG
439 switch (event_id) {
440 case esp_mqtt_event_id_t::MQTT_EVENT_CONNECTED:
441 m_connected_callback.Call_Callback();
442 update_connection_state(MQTT_Connection_State::CONNECTED);
443 break;
444 case esp_mqtt_event_id_t::MQTT_EVENT_DISCONNECTED:
445 update_connection_state(MQTT_Connection_State::DISCONNECTED);
446 break;
447 case esp_mqtt_event_id_t::MQTT_EVENT_DATA: {
448 // Check wheter the given message has not bee received completly, but instead would be received in multiple chunks,
449 // if it were we discard the message because receiving a message over multiple chunks is currently not supported
450 if (event->data_len != event->total_data_len) {
451 Logger::printfln(MQTT_DATA_EXCEEDS_BUFFER, event->total_data_len, get_receive_buffer_size());
452 break;
453 }
454 // Topic is not null terminated, to fix this issue we copy the topic string.
455 // This overhead is acceptable, because we nearly always copy only a few bytes (around 20), meaning the overhead is insignificant.
456 char topic[event->topic_len + 1] = {};
457 strncpy(topic, event->topic, event->topic_len);
458 m_received_data_callback.Call_Callback(topic, reinterpret_cast<uint8_t*>(event->data), event->data_len);
459 break;
460 }
461 case esp_mqtt_event_id_t::MQTT_EVENT_ERROR: {
462 esp_mqtt_error_codes_t const * error = event->error_handle;
463 if (error == nullptr) {
464 break;
465 }
466 m_last_connection_error = static_cast<MQTT_Connection_Error>(error->connect_return_code);
467 update_connection_state(MQTT_Connection_State::ERROR);
468 break;
469 }
470 default:
471 // Nothing to do
472 break;
473 }
474 }
475
476 static void static_mqtt_event_handler(void * handler_args, esp_event_base_t base, int32_t event_id, void * event_data) {
477 if (handler_args == nullptr) {
478 return;
479 }
480 auto instance = static_cast<Espressif_MQTT_Client *>(handler_args);
481 instance->mqtt_event_handler(base, static_cast<esp_mqtt_event_id_t>(event_id), event_data);
482 }
483
484 Callback<void, char *, uint8_t *, unsigned int> m_received_data_callback = {}; // Callback that will be called as soon as the mqtt client receives any data
485 Callback<void> m_connected_callback = {}; // Callback that will be called as soon as the mqtt client has connected
486 Callback<void, MQTT_Connection_State, MQTT_Connection_Error> m_connection_state_changed_callback = {}; // Callback that will be called as soon as the mqtt client connection changes
487 MQTT_Connection_State m_connection_state = {}; // Current connection state to the MQTT broker
488 MQTT_Connection_Error m_last_connection_error = {}; // Last error that occured while trying to establish a connection to the MQTT broker
489 bool m_enqueue_messages = {}; // Whether we enqueue messages making nearly all ThingsBoard calls non blocking or wheter we publish instead
490 esp_mqtt_client_config_t m_mqtt_configuration = {}; // Configuration of the underlying mqtt client, saved as a private variable to allow changes after inital configuration with the same options for all non changed settings
491 esp_mqtt_client_handle_t m_mqtt_client = {}; // Handle to the underlying mqtt client, used to establish the communication
492};
493
494#endif // THINGSBOARD_USE_ESP_MQTT
495
496#endif // Espressif_MQTT_Client_h
MQTT_Connection_Error
Possible error states the MQTT broker connection can be in, if connecting the device to the MQTT brok...
Definition: MQTT_Connection_Error.h:9
MQTT_Connection_State
Possible states the MQTT broker connection can be in.
Definition: MQTT_Connection_State.h:11
General purpose safe callback wrapper. Expects either c-style or c++ style function pointer,...
Definition: Callback.h:30
std::function< return_type(argument_types... arguments)> function
Callback signature.
Definition: Callback.h:34
MQTT Client interface that contains the method that a class that can be used to send and receive data...
Definition: IMQTT_Client.h:36