ThingsBoard Client SDK 0.16.0
Client SDK to connect with ThingsBoard IoT Platform from IoT devices (Arduino, Espressif, etc.)
Loading...
Searching...
No Matches
ThingsBoard.h
Go to the documentation of this file.
1#ifndef ThingsBoard_h
2#define ThingsBoard_h
3
4// Local includes.
5#include "Constants.h"
7#include "IMQTT_Client.h"
8#include "DefaultLogger.h"
9#include "Telemetry.h"
10
11// Library includes.
12#if THINGSBOARD_ENABLE_STREAM_UTILS
13#include <StreamUtils.h>
14#endif // THINGSBOARD_ENABLE_STREAM_UTILS
15
16
17uint16_t constexpr DEFAULT_MQTT_PORT = 1883U;
18char constexpr PROV_ACCESS_TOKEN[] = "provision";
19// Log messages.
20char constexpr UNABLE_TO_DE_SERIALIZE_JSON[] = "Unable to de-serialize received json data with error (DeserializationError::%s)";
21char constexpr INVALID_BUFFER_SIZE[] = "Send buffer size (%u) to small for the given payloads size (%u), increase with Set_Buffer_Size accordingly or install the StreamUtils library";
22char constexpr UNABLE_TO_ALLOCATE_BUFFER[] = "Allocating memory for the internal MQTT buffer failed";
23char constexpr MAX_ENDPOINTS_AMOUNT_TEMPLATE_NAME[] = "MaxEndpointsAmount";
24#if THINGSBOARD_ENABLE_DYNAMIC
25char constexpr MAXIMUM_RESPONSE_EXCEEDED[] = "Prevented allocation on the heap (%u) for JsonDocument. Discarding message that is bigger than maximum response size (%u)";
26char constexpr HEAP_ALLOCATION_FAILED[] = "Failed allocating required size (%u) for JsonDocument. Ensure there is enough heap memory left";
27#else
28char constexpr API_SUBSCRIPTIONS[] = "API implementation";
29#endif // THINGSBOARD_ENABLE_DYNAMIC
30#if THINGSBOARD_ENABLE_DEBUG
31char constexpr RECEIVE_MESSAGE[] = "Received (%u) bytes of data from server over topic (%s)";
32char constexpr ALLOCATING_JSON[] = "Allocated internal JsonDocument for MQTT server response with size (%u)";
33char constexpr SEND_MESSAGE[] = "Sending data to server over topic (%s) with data (%s)";
34char constexpr SEND_SERIALIZED[] = "Hidden, because json data is bigger than buffer, therefore showing in console is skipped";
35#endif // THINGSBOARD_ENABLE_DEBUG
36// Claim topics.
37char constexpr CLAIM_TOPIC[] = "v1/devices/me/claim";
38// Claim data keys.
39char constexpr SECRET_KEY[] = "secretKey";
40char constexpr DURATION_KEY[] = "durationMs";
41
42
43#if THINGSBOARD_ENABLE_DYNAMIC
52template <typename Logger = DefaultLogger>
53#else
64template<size_t MaxResponse = DEFAULT_RESPONSE_AMOUNT, size_t MaxEndpointsAmount = DEFAULT_ENDPOINT_AMOUNT, typename Logger = DefaultLogger>
65#endif // THINGSBOARD_ENABLE_DYNAMIC
67 public:
113 template<typename... Args>
114#if THINGSBOARD_ENABLE_DYNAMIC
115#if THINGSBOARD_ENABLE_STREAM_UTILS
116 ThingsBoardSized(IMQTT_Client & client, uint16_t receive_buffer_size = DEFAULT_PAYLOAD_SIZE, uint16_t send_buffer_size = DEFAULT_PAYLOAD_SIZE, size_t const & max_stack_size = DEFAULT_MAX_STACK_SIZE, size_t const & buffering_size = DEFAULT_BUFFERING_SIZE, size_t const & max_response_size = DEFAULT_MAX_RESPONSE_SIZE, Args const &... args)
117#else
118 ThingsBoardSized(IMQTT_Client & client, uint16_t receive_buffer_size = DEFAULT_PAYLOAD_SIZE, uint16_t send_buffer_size = DEFAULT_PAYLOAD_SIZE, size_t const & max_stack_size = DEFAULT_MAX_STACK_SIZE, size_t const & max_response_size = DEFAULT_MAX_RESPONSE_SIZE, Args const &... args)
119#endif // THINGSBOARD_ENABLE_STREAM_UTILS
120#else
121#if THINGSBOARD_ENABLE_STREAM_UTILS
122 ThingsBoardSized(IMQTT_Client & client, uint16_t receive_buffer_size = DEFAULT_PAYLOAD_SIZE, uint16_t send_buffer_size = DEFAULT_PAYLOAD_SIZE, size_t const & max_stack_size = DEFAULT_MAX_STACK_SIZE, size_t const & buffering_size = DEFAULT_BUFFERING_SIZE, Args const &... args)
123#else
124 ThingsBoardSized(IMQTT_Client & client, uint16_t receive_buffer_size = DEFAULT_PAYLOAD_SIZE, uint16_t send_buffer_size = DEFAULT_PAYLOAD_SIZE, size_t const & max_stack_size = DEFAULT_MAX_STACK_SIZE, Args const &... args)
125#endif // THINGSBOARD_ENABLE_STREAM_UTILS
126#endif // THINGSBOARD_ENABLE_DYNAMIC
127 : m_client(client)
128 , m_max_stack(max_stack_size)
129#if THINGSBOARD_ENABLE_STREAM_UTILS
130 , m_buffering_size(buffering_size)
131#endif // THINGSBOARD_ENABLE_STREAM_UTILS
132#if THINGSBOARD_ENABLE_DYNAMIC
133 , m_max_response_size(max_response_size)
134#endif // THINGSBOARD_ENABLE_DYNAMIC
135 , m_api_implementations(args...)
136 {
137 for (auto & api : m_api_implementations) {
138 if (api == nullptr) {
139 continue;
140 }
141#if THINGSBOARD_ENABLE_STL
142 api->Set_Client_Callbacks(std::bind(&ThingsBoardSized::Subscribe_API_Implementation, this, std::placeholders::_1), std::bind(&ThingsBoardSized::Send_Json, this, std::placeholders::_1, std::placeholders::_2), std::bind(&ThingsBoardSized::Send_Json_String, this, std::placeholders::_1, std::placeholders::_2), std::bind(&ThingsBoardSized::Subscribe_Topic, this, std::placeholders::_1), std::bind(&ThingsBoardSized::Unsubscribe_Topic, this, std::placeholders::_1), std::bind(&ThingsBoardSized::Get_Receive_Buffer_Size, this), std::bind(&ThingsBoardSized::Get_Send_Buffer_Size, this), std::bind(&ThingsBoardSized::Set_Buffer_Size, this, std::placeholders::_1, std::placeholders::_2), std::bind(&ThingsBoardSized::Get_Last_Request_ID, this));
143#else
144 api->Set_Client_Callbacks(ThingsBoardSized::Static_Subscribe_Implementation, ThingsBoardSized::Static_Send_Json, ThingsBoardSized::Static_Send_Json_String, ThingsBoardSized::Static_Subscribe_Topic, ThingsBoardSized::Static_Unsubscribe_Topic, ThingsBoardSized::Static_Get_Receive_Buffer_Size, ThingsBoardSized::Static_Get_Send_Buffer_Size, ThingsBoardSized::Static_Set_Buffer_Size, ThingsBoardSized::Static_Get_Last_Request_ID);
145#endif // THINGSBOARD_ENABLE_STL
146 api->Initialize();
147 }
148 (void)Set_Buffer_Size(receive_buffer_size, send_buffer_size);
149 // Initialize callback.
150#if THINGSBOARD_ENABLE_STL
151 m_client.set_data_callback(std::bind(&ThingsBoardSized::On_MQTT_Message, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
152 m_client.set_connect_callback(std::bind(&ThingsBoardSized::Resubscribe_Permanent_Subscriptionss, this));
153#else
154 m_client.set_data_callback(ThingsBoardSized::On_Static_MQTT_Message);
155 m_client.set_connect_callback(ThingsBoardSized::Static_MQTT_Connect);
156 m_subscribedInstance = this;
157#endif // THINGSBOARD_ENABLE_STL
158 }
159
166 return m_client;
167 }
168
176 void Set_Maximum_Stack_Size(size_t const & max_stack_size) {
177 m_max_stack = max_stack_size;
178 }
179
182 size_t const & Get_Maximum_Stack_Size() const {
183 return m_max_stack;
184 }
185
186#if THINGSBOARD_ENABLE_STREAM_UTILS
190 size_t const & Get_Buffering_Size() const {
191 return m_buffering_size;
192 }
193
200 void Set_Buffering_Size(size_t const & buffering_size) {
201 m_buffering_size = buffering_size;
202 }
203#endif // THINGSBOARD_ENABLE_STREAM_UTILS
204
205#if THINGSBOARD_ENABLE_DYNAMIC
213 void Set_Max_Response_Size(size_t const & max_response_size) {
214 m_max_response_size = max_response_size;
215 }
216
224 size_t const & Get_Max_Response_Size() {
225 return m_max_response_size;
226 }
227#endif // THINGSBOARD_ENABLE_DYNAMIC
228
230 bool Set_Buffer_Size(uint16_t receive_buffer_size, uint16_t send_buffer_size) {
231 bool const result = m_client.set_buffer_size(receive_buffer_size, send_buffer_size);
232 if (!result) {
233 Logger::printfln(UNABLE_TO_ALLOCATE_BUFFER);
234 }
235 return result;
236 }
237
240 return m_client.get_receive_buffer_size();
241 }
242
245 return m_client.get_send_buffer_size();
246 }
247
256 for (auto & api : m_api_implementations) {
257 if (api == nullptr) {
258 continue;
259 }
260 // Results are ignored, because the important part of clearing internal data structures always succeeds
261 (void)api->Unsubscribe();
262 }
263 }
264
287 bool connect(char const * host, char const * access_token = PROV_ACCESS_TOKEN, uint16_t port = DEFAULT_MQTT_PORT, char const * client_id = nullptr, char const * password = nullptr) {
288 if (host == nullptr) {
289 return false;
290 }
291 m_client.set_server(host, port);
292 return Connect_To_Host(access_token, Helper::String_IsNull_Or_Empty(client_id) ? access_token : client_id, Helper::String_IsNull_Or_Empty(password) ? nullptr : password);
293 }
294
296 void disconnect() {
297 m_client.disconnect();
298 }
299
301 bool connected() {
302 return m_client.connected();
303 }
304
307 return m_client.get_connection_state();
308 }
309
312 return m_client.get_last_connection_error();
313 }
314
318 }
319
321 bool loop() {
322#if !THINGSBOARD_USE_ESP_TIMER
323 for (auto & api : m_api_implementations) {
324 if (api == nullptr) {
325 continue;
326 }
327 api->loop();
328 }
329#endif // !THINGSBOARD_USE_ESP_TIMER
330 return m_client.loop();
331 }
332
342 bool Send_Json(char const * topic, JsonDocument const & source) {
343 // Check if allocating needed memory failed when trying to create the JsonDocument,
344 // if it did the isNull() method will return true. See https://arduinojson.org/v6/api/jsonvariant/isnull/ for more information
345 if (source.isNull()) {
346 Logger::printfln(UNABLE_TO_ALLOCATE_JSON);
347 return false;
348 }
349 // Check if inserting any of the internal values failed because the JsonDocument was too small,
350 // if it did the overflowed() method will return true. See https://arduinojson.org/v6/api/jsondocument/overflowed/ for more information
351 if (source.overflowed()) {
352 Logger::printfln(JSON_SIZE_TO_SMALL);
353 return false;
354 }
355 bool result = false;
356
357 size_t const json_size = Helper::Measure_Json(source);
358#if THINGSBOARD_ENABLE_STREAM_UTILS
359 // Check if the size of the given message would be too big for the actual client,
360 // if it is utilize the serialize json work around, so that the internal client buffer can be circumvented
361 if (json_size > m_client.get_send_buffer_size()) {
362#if THINGSBOARD_ENABLE_DEBUG
363 Logger::printfln(SEND_MESSAGE, topic, SEND_SERIALIZED);
364#endif // THINGSBOARD_ENABLE_DEBUG
365 result = Serialize_Json(topic, source, json_size - 1);
366 }
367 else
368#endif // THINGSBOARD_ENABLE_STREAM_UTILS
369 if (json_size > Get_Maximum_Stack_Size()) {
370 char* json = new char[json_size]();
371 if (serializeJson(source, json, json_size) < json_size - 1) {
372 Logger::printfln(UNABLE_TO_SERIALIZE_JSON);
373 }
374 else {
375 result = Send_Json_String(topic, json);
376 }
377 delete[] json;
378 json = nullptr;
379 }
380 else {
381 char json[json_size] = {};
382 if (serializeJson(source, json, json_size) < json_size - 1) {
383 Logger::printfln(UNABLE_TO_SERIALIZE_JSON);
384 return result;
385 }
386 result = Send_Json_String(topic, json);
387 }
388
389 return result;
390 }
391
398 bool Send_Json_String(char const * topic, char const * json) {
399 if (json == nullptr) {
400 return false;
401 }
402
403 uint16_t current_send_buffer_size = m_client.get_send_buffer_size();
404 auto const json_size = strlen(json);
405
406 if (current_send_buffer_size < json_size) {
407 Logger::printfln(INVALID_BUFFER_SIZE, current_send_buffer_size, json_size);
408 return false;
409 }
410
411#if THINGSBOARD_ENABLE_DEBUG
412 Logger::printfln(SEND_MESSAGE, topic, json);
413#endif // THINGSBOARD_ENABLE_DEBUG
414 return m_client.publish(topic, reinterpret_cast<uint8_t const *>(json), json_size);
415 }
416
422#if !THINGSBOARD_ENABLE_DYNAMIC
423 if (m_api_implementations.size() + 1 > m_api_implementations.capacity()) {
425 return;
426 }
427#endif // !THINGSBOARD_ENABLE_DYNAMIC
428#if THINGSBOARD_ENABLE_STL
429 api.Set_Client_Callbacks(std::bind(&ThingsBoardSized::Subscribe_API_Implementation, this, std::placeholders::_1), std::bind(&ThingsBoardSized::Send_Json, this, std::placeholders::_1, std::placeholders::_2), std::bind(&ThingsBoardSized::Send_Json_String, this, std::placeholders::_1, std::placeholders::_2), std::bind(&ThingsBoardSized::Subscribe_Topic, this, std::placeholders::_1), std::bind(&ThingsBoardSized::Unsubscribe_Topic, this, std::placeholders::_1), std::bind(&ThingsBoardSized::Get_Receive_Buffer_Size, this), std::bind(&ThingsBoardSized::Get_Send_Buffer_Size, this), std::bind(&ThingsBoardSized::Set_Buffer_Size, this, std::placeholders::_1, std::placeholders::_2), std::bind(&ThingsBoardSized::Get_Last_Request_ID, this));
430#else
431 api.Set_Client_Callbacks(ThingsBoardSized::Static_Subscribe_Implementation, ThingsBoardSized::Static_Send_Json, ThingsBoardSized::Static_Send_Json_String, ThingsBoardSized::Static_Subscribe_Topic, ThingsBoardSized::Static_Unsubscribe_Topic, ThingsBoardSized::Static_Get_Receive_Buffer_Size, ThingsBoardSized::Static_Get_Send_Buffer_Size, ThingsBoardSized::Static_Set_Buffer_Size, ThingsBoardSized::Static_Get_Last_Request_ID);
432#endif // THINGSBOARD_ENABLE_STL
433 api.Initialize();
434 m_api_implementations.push_back(&api);
435 }
436
446 template <typename InputIterator>
447 void Subscribe_API_Implementations(InputIterator const & first, InputIterator const & last) {
448#if !THINGSBOARD_ENABLE_DYNAMIC
449 size_t const size = Helper::distance(first, last);
450 if (m_api_implementations.size() + size > m_api_implementations.capacity()) {
452 return;
453 }
454#endif // !THINGSBOARD_ENABLE_DYNAMIC
455 for (auto it = first; it != last; ++it) {
456 auto & api = *it;
457 if (api == nullptr) {
458 continue;
459 }
460#if THINGSBOARD_ENABLE_STL
461 api->Set_Client_Callbacks(std::bind(&ThingsBoardSized::Subscribe_API_Implementation, this, std::placeholders::_1), std::bind(&ThingsBoardSized::Send_Json, this, std::placeholders::_1, std::placeholders::_2), std::bind(&ThingsBoardSized::Send_Json_String, this, std::placeholders::_1, std::placeholders::_2), std::bind(&ThingsBoardSized::Subscribe_Topic, this, std::placeholders::_1), std::bind(&ThingsBoardSized::Unsubscribe_Topic, this, std::placeholders::_1), std::bind(&ThingsBoardSized::Get_Receive_Buffer_Size, this), std::bind(&ThingsBoardSized::Get_Send_Buffer_Size, this), std::bind(&ThingsBoardSized::Set_Buffer_Size, this, std::placeholders::_1, std::placeholders::_2), std::bind(&ThingsBoardSized::Get_Last_Request_ID, this));
462#else
463 api->Set_Client_Callbacks(ThingsBoardSized::Static_Subscribe_Implementation, ThingsBoardSized::Static_Send_Json, ThingsBoardSized::Static_Send_Json_String, ThingsBoardSized::Static_Subscribe_Topic, ThingsBoardSized::Static_Unsubscribe_Topic, ThingsBoardSized::Static_Get_Receive_Buffer_Size, ThingsBoardSized::Static_Get_Send_Buffer_Size, ThingsBoardSized::Static_Set_Buffer_Size, ThingsBoardSized::Static_Get_Last_Request_ID);
464#endif // THINGSBOARD_ENABLE_STL
465 api->Initialize();
466 }
467 m_api_implementations.insert(m_api_implementations.end(), first, last);
468 }
469
470 //----------------------------------------------------------------------------
471 // Claiming API
472
485 bool Claim_Request(size_t const & duration_ms, char const * secret_key = nullptr) {
486 StaticJsonDocument<JSON_OBJECT_SIZE(2)> request_buffer;
487
488 if (!Helper::String_IsNull_Or_Empty(secret_key)) {
489 request_buffer[SECRET_KEY] = secret_key;
490 }
491 request_buffer[DURATION_KEY] = duration_ms;
492 return Send_Json(CLAIM_TOPIC, request_buffer);
493 }
494
495 //----------------------------------------------------------------------------
496 // Telemetry API
497
505 template<typename T>
506 bool Send_Telemetry_Data(char const * key, T const & value) {
507 return Send_Key_Value_Pair(key, value);
508 }
509
521#if THINGSBOARD_ENABLE_DYNAMIC
522 template<typename InputIterator>
523#else
524 template<size_t MaxKeyValuePairAmount, typename InputIterator>
525#endif // THINGSBOARD_ENABLE_DYNAMIC
526 bool Send_Telemetry(InputIterator const & first, InputIterator const & last) {
527#if THINGSBOARD_ENABLE_DYNAMIC
528 return Send_Data_Array(first, last, true);
529#else
530 return Send_Data_Array<MaxKeyValuePairAmount>(first, last, true);
531#endif // THINGSBOARD_ENABLE_DYNAMIC
532 }
533
539 bool Send_Telemetry_String(char const * json) {
540 return Send_Json_String(TELEMETRY_TOPIC, json);
541 }
542
548 bool Send_Telemetry_Json(JsonDocument const & source) {
549 return Send_Json(TELEMETRY_TOPIC, source);
550 }
551
552 //----------------------------------------------------------------------------
553 // Attribute API
554
562 template<typename T>
563 bool Send_Attribute_Data(char const * key, T const & value) {
564 return Send_Key_Value_Pair(key, value, false);
565 }
566
578#if THINGSBOARD_ENABLE_DYNAMIC
579 template<typename InputIterator>
580#else
581 template<size_t MaxKeyValuePairAmount, typename InputIterator>
582#endif // THINGSBOARD_ENABLE_DYNAMIC
583 bool Send_Attributes(InputIterator const & first, InputIterator const & last) {
584#if THINGSBOARD_ENABLE_DYNAMIC
585 return Send_Data_Array(first, last, false);
586#else
587 return Send_Data_Array<MaxKeyValuePairAmount>(first, last, false);
588#endif // THINGSBOARD_ENABLE_DYNAMIC
589 }
590
596 bool Send_Attribute_String(char const * json) {
597 return Send_Json_String(ATTRIBUTE_TOPIC, json);
598 }
599
605 bool Send_Attribute_Json(JsonDocument const & source) {
606 return Send_Json(ATTRIBUTE_TOPIC, source);
607 }
608
609 private:
610#if THINGSBOARD_ENABLE_DYNAMIC
611 using IAPI_Container = Container<IAPI_Implementation *>;
612#else
614#endif // THINGSBOARD_ENABLE_DYNAMIC
615
616#if THINGSBOARD_ENABLE_STREAM_UTILS
625 bool Serialize_Json(char const * topic, JsonDocument const & source) {
626 size_t const json_size = Helper::Measure_Json(source);
627 if (!m_client.begin_publish(topic, json_size - 1)) {
628 Logger::printfln(UNABLE_TO_SERIALIZE_JSON);
629 return false;
630 }
631 BufferingPrint buffered_print(m_client, Get_Buffering_Size());
632 auto const bytes_serialized = serializeJson(source, buffered_print);
633 if (bytes_serialized < json_size - 1) {
634 Logger::printfln(UNABLE_TO_SERIALIZE_JSON);
635 return false;
636 }
637 buffered_print.flush();
638 return m_client.end_publish();
639 }
640#endif // THINGSBOARD_ENABLE_STREAM_UTILS
641
643 bool Subscribe_Topic(char const * topic) {
644 return m_client.subscribe(topic);
645 }
646
648 bool Unsubscribe_Topic(char const * topic) {
649 return m_client.unsubscribe(topic);
650 }
651
656 size_t * Get_Last_Request_ID() {
657 return &m_request_id;
658 }
659
669 bool Connect_To_Host(char const * access_token, char const * client_id, char const * password) {
670 bool const connection_result = m_client.connect(client_id, access_token, password);
671 if (!connection_result) {
672 Logger::printfln(CONNECT_FAILED);
673 }
674 return connection_result;
675 }
676
683 void Resubscribe_Permanent_Subscriptionss() {
684 for (auto & api : m_api_implementations) {
685 if (api == nullptr) {
686 continue;
687 }
688 // Results are ignored, because the important part of clearing internal data structures always succeeds
689 (void)api->Resubscribe_Permanent_Subscriptions();
690 }
691 }
692
699 template<typename T>
700 bool Send_Key_Value_Pair(char const * key, T const & value, bool telemetry = true) {
701 const Telemetry t(key, value);
702 if (t.IsEmpty()) {
703 return false;
704 }
705
706 StaticJsonDocument<JSON_OBJECT_SIZE(1)> json_buffer;
707 if (!t.SerializeKeyValue(json_buffer)) {
708 Logger::printfln(UNABLE_TO_SERIALIZE);
709 return false;
710 }
711 return telemetry ? Send_Telemetry_Json(json_buffer) : Send_Attribute_Json(json_buffer);
712 }
713
725#if THINGSBOARD_ENABLE_DYNAMIC
726 template<typename InputIterator>
727#else
728 template<size_t MaxKeyValuePairAmount, typename InputIterator>
729#endif // THINGSBOARD_ENABLE_DYNAMIC
730 bool Send_Data_Array(InputIterator const & first, InputIterator const & last, bool telemetry) {
731 auto const size = Helper::distance(first, last);
732#if THINGSBOARD_ENABLE_DYNAMIC
733 // char const * are stored as only a pointer inside the JsonDocument --> zero copy, meaning the size for the strings is 0 bytes.
734 // Data structure size, therefore only depends on the amount of key value pairs passed.
735 // See https://arduinojson.org/v6/assistant/ for more information on the needed size for the JsonDocument
736 TBJsonDocument json_buffer(JSON_OBJECT_SIZE(size));
737#else
738 if (size > MaxKeyValuePairAmount) {
739 Logger::printfln(TOO_MANY_JSON_FIELDS, size, "MaxKeyValuePairAmount", MaxKeyValuePairAmount);
740 return false;
741 }
742 StaticJsonDocument<JSON_OBJECT_SIZE(MaxKeyValuePairAmount)> json_buffer;
743#endif // THINGSBOARD_ENABLE_DYNAMIC
744
745#if THINGSBOARD_ENABLE_STL
746 if (std::any_of(first, last, [&json_buffer](Telemetry const & data) { return !data.SerializeKeyValue(json_buffer); })) {
747 Logger::printfln(UNABLE_TO_SERIALIZE);
748 return false;
749 }
750#else
751 for (auto it = first; it != last; ++it) {
752 auto const & data = *it;
753 if (!data.SerializeKeyValue(json_buffer)) {
754 Logger::printfln(UNABLE_TO_SERIALIZE);
755 return false;
756 }
757 }
758#endif // THINGSBOARD_ENABLE_STL
759 return telemetry ? Send_Telemetry_Json(json_buffer) : Send_Attribute_Json(json_buffer);
760 }
761
774 void On_MQTT_Message(char * topic, uint8_t * payload, unsigned int length) {
775#if THINGSBOARD_ENABLE_DEBUG
776 Logger::printfln(RECEIVE_MESSAGE, length, topic);
777#endif // THINGSBOARD_ENABLE_DEBUG
778
779#if THINGSBOARD_ENABLE_STL
780#if THINGSBOARD_ENABLE_CXX20
781 auto filtered_raw_api_implementations = m_api_implementations | std::views::filter([&topic](IAPI_Implementation const * api) {
782#else
783 IAPI_Container filtered_raw_api_implementations = {};
784 std::copy_if(m_api_implementations.begin(), m_api_implementations.end(), std::back_inserter(filtered_raw_api_implementations), [&topic](IAPI_Implementation const * api) {
785#endif // THINGSBOARD_ENABLE_CXX20
786 return (api != nullptr && api->Get_Process_Type() == API_Process_Type::RAW && api->Is_Response_Topic_Matching(topic));
787 });
788
789 for (auto & api : filtered_raw_api_implementations) {
790 api->Process_Response(topic, payload, length);
791 }
792
793 // If the filtered api implementations was not emtpy it means the response was processed as its raw bytes representation atleast once,
794 // and because we interpreted it as raw bytes instead of json, we skip the further processing of those raw bytes as json.
795 // We do that because the received response is in that case not even valid json in the first place and would therefore simply fail deserialization
796 if (!filtered_raw_api_implementations.empty()) {
797 return;
798 }
799#else
800 bool processed_response_as_raw = false;
801 for (auto & api : m_api_implementations) {
802 if (api == nullptr || api->Get_Process_Type() != API_Process_Type::RAW || !api->Is_Response_Topic_Matching(topic)) {
803 continue;
804 }
805 api->Process_Response(topic, payload, length);
806 processed_response_as_raw = true;
807 }
808
809 if (processed_response_as_raw) {
810 return;
811 }
812#endif // THINGSBOARD_ENABLE_STL
813
814 // Calculate size with the total amount of commas, always denotes the end of a key-value pair besides for the last element in an array or in an object where the comma is not permitted,
815 // therfore we have to add the space for another key-value pair for all the occurences of thoose symbols as well
816 auto const size = Helper::Calculate_Symbol_Occurences(payload, ',', length) + Helper::Calculate_Symbol_Occurences(payload, '{', length) + Helper::Calculate_Symbol_Occurences(payload, '[', length);
817#if THINGSBOARD_ENABLE_DYNAMIC
818 // Buffer that we deserialize is writeable and not read only and therefore stored as a pointer inside the JsonDocument --> zero copy, meaning the size for the received payload is 0 bytes.
819 // Data structure size, therefore only depends on the amount of key value pairs received.
820 // See https://arduinojson.org/v6/assistant/ for more information on the needed size for the JsonDocument
821 auto const document_size = JSON_OBJECT_SIZE(size);
822 auto const & max_response_size = Get_Max_Response_Size();
823 if (max_response_size != 0U && document_size > max_response_size) {
824 Logger::printfln(MAXIMUM_RESPONSE_EXCEEDED, document_size, max_response_size);
825 return;
826 }
827 TBJsonDocument json_buffer(document_size);
828 // Because we calcualte the allocation dynamically fromt he payload, which is user input, it could theoretically be malicious ({ "malicious" : "{{{{{{{{{..."}) and contain a lot of the symbols used to calculate the size.
829 // But if that is the case adn the allocation still succeeds we delete the allocated memory relatively fast again so it shouldn't be a problem and if the allocation fails we simply return at this point with an appropriate error message
830 if (json_buffer.capacity() != document_size) {
831 Logger::printfln(HEAP_ALLOCATION_FAILED, document_size);
832 return;
833 }
834#else
835 if (size > MaxResponse) {
836 Logger::printfln(TOO_MANY_JSON_FIELDS, size, "MaxResponse", MaxResponse);
837 return;
838 }
839 auto constexpr document_size = JSON_OBJECT_SIZE(MaxResponse);
840 StaticJsonDocument<document_size> json_buffer;
841#endif // THINGSBOARD_ENABLE_DYNAMIC
842#if THINGSBOARD_ENABLE_DEBUG
843 Logger::printfln(ALLOCATING_JSON, document_size);
844#endif // THINGSBOARD_ENABLE_DEBUG
845
846 // The deserializeJson method we use, can use the zero copy mode because a writeable input was passed,
847 // if that were not the case the needed allocated memory would drastically increase, because the keys would need to be copied as well.
848 // See https://arduinojson.org/v6/doc/deserialization/ for more info on ArduinoJson deserialization
849 DeserializationError const error = deserializeJson(json_buffer, payload, length);
850 if (error) {
851 Logger::printfln(UNABLE_TO_DE_SERIALIZE_JSON, error.c_str());
852 return;
853 }
854
855#if THINGSBOARD_ENABLE_STL
856#if THINGSBOARD_ENABLE_CXX20
857 auto filtered_json_api_implementations = m_api_implementations | std::views::filter([&topic](IAPI_Implementation const * api) {
858#else
859 IAPI_Container filtered_json_api_implementations = {};
860 std::copy_if(m_api_implementations.begin(), m_api_implementations.end(), std::back_inserter(filtered_json_api_implementations), [&topic](IAPI_Implementation const * api) {
861#endif // THINGSBOARD_ENABLE_CXX20
862 return (api != nullptr && api->Get_Process_Type() == API_Process_Type::JSON && api->Is_Response_Topic_Matching(topic));
863 });
864
865 for (auto & api : filtered_json_api_implementations) {
866 api->Process_Json_Response(topic, json_buffer);
867 }
868#else
869 for (auto & api : m_api_implementations) {
870 if (api == nullptr || api->Get_Process_Type() != API_Process_Type::JSON || !api->Is_Response_Topic_Matching(topic)) {
871 continue;
872 }
873 api->Process_Json_Response(topic, json_buffer);
874 }
875#endif // THINGSBOARD_ENABLE_STL
876 }
877
878#if !THINGSBOARD_ENABLE_STL
879 static void On_Static_MQTT_Message(char * topic, uint8_t * payload, unsigned int length) {
880 if (m_subscribedInstance == nullptr) {
881 return;
882 }
883 m_subscribedInstance->On_MQTT_Message(topic, payload, length);
884 }
885
886 static void Static_MQTT_Connect() {
887 if (m_subscribedInstance == nullptr) {
888 return;
889 }
890 m_subscribedInstance->Resubscribe_Permanent_Subscriptionss();
891 }
892
893 static void Static_Subscribe_Implementation(IAPI_Implementation & api) {
894 if (m_subscribedInstance == nullptr) {
895 return;
896 }
897 m_subscribedInstance->Subscribe_API_Implementation(api);
898 }
899
900 static bool Static_Send_Json(char const * topic, JsonDocument const & source) {
901 if (m_subscribedInstance == nullptr) {
902 return false;
903 }
904 return m_subscribedInstance->Send_Json(topic, source);
905 }
906
907 static bool Static_Send_Json_String(char const * topic, char const * json) {
908 if (m_subscribedInstance == nullptr) {
909 return false;
910 }
911 return m_subscribedInstance->Send_Json_String(topic, json);
912 }
913
914 static bool Static_Subscribe_Topic(char const * topic) {
915 if (m_subscribedInstance == nullptr) {
916 return false;
917 }
918 return m_subscribedInstance->Subscribe_Topic(topic);
919 }
920
921 static bool Static_Unsubscribe_Topic(char const * topic) {
922 if (m_subscribedInstance == nullptr) {
923 return false;
924 }
925 return m_subscribedInstance->Unsubscribe_Topic(topic);
926 }
927
928 static size_t * Static_Get_Last_Request_ID() {
929 if (m_subscribedInstance == nullptr) {
930 return nullptr;
931 }
932 return m_subscribedInstance->Get_Last_Request_ID();
933 }
934
935 static uint16_t Static_Get_Receive_Buffer_Size() {
936 if (m_subscribedInstance == nullptr) {
937 return 0U;
938 }
939 return m_subscribedInstance->Get_Receive_Buffer_Size();
940 }
941
942 static uint16_t Static_Get_Send_Buffer_Size() {
943 if (m_subscribedInstance == nullptr) {
944 return 0U;
945 }
946 return m_subscribedInstance->Get_Send_Buffer_Size();
947 }
948
949 static bool Static_Set_Buffer_Size(uint16_t receive_buffer_size, uint16_t send_buffer_size) {
950 if (m_subscribedInstance == nullptr) {
951 return false;
952 }
953 return m_subscribedInstance->Set_Buffer_Size(receive_buffer_size, send_buffer_size);
954 }
955
956 static ThingsBoardSized *m_subscribedInstance;
957#endif // !THINGSBOARD_ENABLE_STL
958
959 IMQTT_Client& m_client = {}; // MQTT client instance.
960 size_t m_max_stack = {}; // Maximum stack size we allocate at once.
961 size_t m_request_id = {}; // Internal id used to differentiate which request should receive which response for certain API calls. Can send 4'294'967'296 requests before wrapping back to 0
962#if THINGSBOARD_ENABLE_STREAM_UTILS
963 size_t m_buffering_size = {}; // Buffering size used to serialize directly into client.
964#endif // THINGSBOARD_ENABLE_STREAM_UTILS
965#if THINGSBOARD_ENABLE_DYNAMIC
966 size_t m_max_response_size = {}; // Maximum size allocated on the heap to hold the Json data structure for received cloud response payload, prevents possible malicious payload allocaitng a lot of memory
967#endif // THINGSBOARD_ENABLE_DYNAMIC
968 IAPI_Container m_api_implementations = {}; // Can hold a pointer to all possible API implementations (Server side RPC, Client side RPC, Shared attribute update, Client-side or shared attribute request, Provision)
969};
970
971#if !THINGSBOARD_ENABLE_STL
972#if !THINGSBOARD_ENABLE_DYNAMIC
973template<size_t MaxResponse, size_t MaxEndpointsAmount, typename Logger>
975#else
976template<typename Logger>
978#endif // !THINGSBOARD_ENABLE_DYNAMIC
979#endif // !THINGSBOARD_ENABLE_STL
980
982
983#endif // ThingsBoard_h
char constexpr TOO_MANY_JSON_FIELDS[]
Definition: Constants.h:32
char constexpr JSON_SIZE_TO_SMALL[]
Definition: Constants.h:38
char constexpr UNABLE_TO_ALLOCATE_JSON[]
Definition: Constants.h:37
char constexpr UNABLE_TO_SERIALIZE_JSON[]
Definition: Constants.h:36
char constexpr UNABLE_TO_SERIALIZE[]
Definition: Constants.h:34
uint8_t constexpr DEFAULT_PAYLOAD_SIZE
Definition: Constants.h:20
uint16_t constexpr DEFAULT_MAX_STACK_SIZE
Definition: Constants.h:21
char constexpr CONNECT_FAILED[]
Definition: Constants.h:35
char constexpr MAX_SUBSCRIPTIONS_EXCEEDED[]
Definition: IAPI_Implementation.h:20
char constexpr ATTRIBUTE_TOPIC[]
Definition: IAPI_Implementation.h:29
char constexpr TELEMETRY_TOPIC[]
Definition: IAPI_Implementation.h:33
MQTT_Connection_Error
Possible error states the MQTT broker connection can be in, if connecting the device to the MQTT brok...
Definition: MQTT_Connection_Error.h:9
MQTT_Connection_State
Possible states the MQTT broker connection can be in.
Definition: MQTT_Connection_State.h:11
uint16_t constexpr DEFAULT_MQTT_PORT
Definition: ThingsBoard.h:17
char constexpr INVALID_BUFFER_SIZE[]
Definition: ThingsBoard.h:21
char constexpr PROV_ACCESS_TOKEN[]
Definition: ThingsBoard.h:18
char constexpr UNABLE_TO_DE_SERIALIZE_JSON[]
Definition: ThingsBoard.h:20
char constexpr UNABLE_TO_ALLOCATE_BUFFER[]
Definition: ThingsBoard.h:22
char constexpr DURATION_KEY[]
Definition: ThingsBoard.h:40
char constexpr SECRET_KEY[]
Definition: ThingsBoard.h:39
char constexpr MAX_ENDPOINTS_AMOUNT_TEMPLATE_NAME[]
Definition: ThingsBoard.h:23
char constexpr CLAIM_TOPIC[]
Definition: ThingsBoard.h:37
char constexpr API_SUBSCRIPTIONS[]
Definition: ThingsBoard.h:28
std::function< return_type(argument_types... arguments)> function
Callback signature.
Definition: Callback.h:34
Custom std::array or std::vector implementation that contains a partial vector-like interface impleme...
Definition: Container.h:29
size_type size() const
Gets the current amount of elements in the underlying data container.
Definition: Container.h:147
void insert(iterator position, InputIterator const &first, InputIterator const &last)
Copies all elements from the given start to exclusively the given end iterator into the underlying da...
Definition: Container.h:257
size_type constexpr capacity() const
Gets the maximum amount of elements that can be stored in the underlying data container.
Definition: Container.h:157
void push_back(const_reference element)
Appends the given element at the end of the underlying data container.
Definition: Container.h:230
iterator end()
Returns a iterator to one-past-the-last element of the underlying data container.
Definition: Container.h:209
static size_t distance(InputIterator const &first, InputIterator const &last)
Calculates the distance between two iterators.
Definition: Helper.h:85
static bool String_IsNull_Or_Empty(char const *str)
Returns wheter the given string is either a nullptr or is an empty string, meaning it only contains a...
Definition: Helper.cpp:27
static size_t Measure_Json(TSource const &source)
Calculates the total size of the string the serializeJson method would produce including the null end...
Definition: Helper.h:73
static size_t Calculate_Symbol_Occurences(uint8_t const *bytes, char symbol, uint32_t length)
Returns the amount of occurences of the given smybol in the given byte payload.
Definition: Helper.cpp:10
Base functionality required by all API implementation.
Definition: IAPI_Implementation.h:37
virtual void Initialize()=0
Method that allows to construct internal objects, after the required callback member methods have bee...
virtual void Process_Json_Response(char const *topic, JsonDocument const &data)=0
Process callback that will be called upon response arrival.
virtual void Set_Client_Callbacks(Callback< void, IAPI_Implementation & >::function subscribe_api_callback, Callback< bool, char const *const, JsonDocument const & >::function send_json_callback, Callback< bool, char const *const, char const *const >::function send_json_string_callback, Callback< bool, char const *const >::function subscribe_topic_callback, Callback< bool, char const *const >::function unsubscribe_topic_callback, Callback< uint16_t >::function get_receive_size_callback, Callback< uint16_t >::function get_send_size_callback, Callback< bool, uint16_t, uint16_t >::function set_buffer_size_callback, Callback< size_t * >::function get_request_id_callback)=0
Sets the underlying callbacks that are required for the different API Implementation to communicate w...
virtual API_Process_Type Get_Process_Type() const =0
Returns the way the server response should be processed.
virtual bool Is_Response_Topic_Matching(char const *topic) const =0
Compares received response topic and the topic this api implementation handles responses on,...
virtual void Process_Response(char const *topic, uint8_t *payload, uint32_t length)=0
Process callback that will be called upon response arrival.
MQTT Client interface that contains the method that a class that can be used to send and receive data...
Definition: IMQTT_Client.h:36
virtual void disconnect()=0
Force disconnects from the previously connected server and should release all used resources.
virtual bool loop()=0
Receives / sends any outstanding messages from and to the MQTT broker.
virtual bool connect(char const *client_id, char const *user_name, char const *password)=0
Connects to the previously with set_server configured server instance and port with the given credent...
virtual void subscribe_connection_state_changed_callback(Callback< void, MQTT_Connection_State, MQTT_Connection_Error >::function callback)=0
Sets the callback that is called, whenever the underlying state of our connection with the MQTT broke...
virtual MQTT_Connection_Error get_last_connection_error() const =0
Allows to deciper the reason for a failure, while attempting to establish a connection to the MQTT br...
virtual uint16_t get_receive_buffer_size()=0
Gets the previously set size of the internal buffer size meant for incoming MQTT data.
virtual void set_data_callback(Callback< void, char *, uint8_t *, unsigned int >::function callback)=0
Sets the callback that is called, if any message is received by the MQTT broker.
virtual bool unsubscribe(char const *topic)=0
Unsubscribes to previously subscribed MQTT messages of the given topic.
virtual bool set_buffer_size(uint16_t receive_buffer_size, uint16_t send_buffer_size)=0
Changes the size of the buffer for sent and received MQTT messages.
virtual MQTT_Connection_State get_connection_state() const =0
Get the current connection state to the server includes possible intermediate states between connecti...
virtual void set_server(char const *domain, uint16_t port)=0
Configures the server and port that the client should connect with over MQTT.
virtual bool publish(char const *topic, uint8_t const *payload, size_t const &length)=0
Sends the given payload over the previously established connection.
virtual uint16_t get_send_buffer_size()=0
Gets the previously set size of the internal buffer size meant for outgoing MQTT data.
virtual bool connected()=0
Returns our current connection status to MQTT, true meaning we are connected, false meaning we have b...
virtual bool subscribe(char const *topic)=0
Subscribes to MQTT message on the given topic, which will cause an internal callback to be called for...
virtual void set_connect_callback(Callback< void >::function callback)=0
Sets the callback that is called, if we have successfully established a connection with the MQTT brok...
Telemetry record class, allows to store different data using a common interface.
Definition: Telemetry.h:16
bool SerializeKeyValue(TSource &source) const
Serializes a key-value pair or only a value, depending on the constructor used.
Definition: Telemetry.h:82
Wrapper around any arbitrary MQTT Client implementing the IMQTT_Client interface, to allow connecting...
Definition: ThingsBoard.h:66
size_t const & Get_Maximum_Stack_Size() const
Returns the maximum amount of bytes that we want to allocate on the stack, before the memory is alloc...
Definition: ThingsBoard.h:182
bool Send_Telemetry_String(char const *json)
Send string containing json as telemetry data. See https://thingsboard.io/docs/user-guide/telemetry/ ...
Definition: ThingsBoard.h:539
bool Send_Attributes(InputIterator const &first, InputIterator const &last)
Send aggregated key-value pair as attribute data.
Definition: ThingsBoard.h:583
void Set_Maximum_Stack_Size(size_t const &max_stack_size)
Sets the maximum amount of bytes that we want to allocate on the stack, before the memory is allocate...
Definition: ThingsBoard.h:176
bool Send_Json_String(char const *topic, char const *json)
Sends key-value pairs from the given json string over the given topic.
Definition: ThingsBoard.h:398
bool Send_Json(char const *topic, JsonDocument const &source)
Sends key-value pairs from the given JsonDocument over the given topic.
Definition: ThingsBoard.h:342
bool connect(char const *host, char const *access_token=PROV_ACCESS_TOKEN, uint16_t port=DEFAULT_MQTT_PORT, char const *client_id=nullptr, char const *password=nullptr)
Connects to the given server instance and port with the given credentials.
Definition: ThingsBoard.h:287
bool Send_Attribute_Json(JsonDocument const &source)
Send key-value pairs as attribute data. See https://thingsboard.io/docs/user-guide/attribute/ for mor...
Definition: ThingsBoard.h:605
bool Claim_Request(size_t const &duration_ms, char const *secret_key=nullptr)
Send a claiming request for this device.
Definition: ThingsBoard.h:485
void Subscribe_Connection_State_Changed_Callback(Callback< void, MQTT_Connection_State, MQTT_Connection_Error >::function callback)
Sets the callback that is called, whenever the underlying state of our connection with the MQTT broke...
Definition: ThingsBoard.h:316
MQTT_Connection_Error Get_Last_Connection_Error()
Allows to deciper the reason for a failure, while attempting to establish a connection to the MQTT br...
Definition: ThingsBoard.h:311
bool Set_Buffer_Size(uint16_t receive_buffer_size, uint16_t send_buffer_size)
Changes the size of the buffer for sent and received MQTT messages.
Definition: ThingsBoard.h:230
ThingsBoardSized(IMQTT_Client &client, uint16_t receive_buffer_size=DEFAULT_PAYLOAD_SIZE, uint16_t send_buffer_size=DEFAULT_PAYLOAD_SIZE, size_t const &max_stack_size=DEFAULT_MAX_STACK_SIZE, Args const &... args)
Constructs a instance with the given network client that should be used to establish the connection t...
Definition: ThingsBoard.h:124
uint16_t Get_Receive_Buffer_Size()
Gets the previously set size of the internal buffer size meant for incoming MQTT data.
Definition: ThingsBoard.h:239
bool loop()
Receives / sends any outstanding messages from and to the MQTT broker.
Definition: ThingsBoard.h:321
bool Send_Attribute_Data(char const *key, T const &value)
Sends the given key-value pair as attribute data. See https://thingsboard.io/docs/user-guide/attribut...
Definition: ThingsBoard.h:563
IMQTT_Client & Get_Client()
Gets the registered underlying MQTT Client implementation.
Definition: ThingsBoard.h:165
MQTT_Connection_State Get_Connection_State()
Get the current connection state to the server includes possible intermediate states between connecti...
Definition: ThingsBoard.h:306
bool Send_Telemetry(InputIterator const &first, InputIterator const &last)
Send aggregated key-value pair as telemetry data.
Definition: ThingsBoard.h:526
void Subscribe_API_Implementations(InputIterator const &first, InputIterator const &last)
Subscribes the given API implementation.
Definition: ThingsBoard.h:447
uint16_t Get_Send_Buffer_Size()
Gets the previously set size of the internal buffer size meant for outgoing MQTT data.
Definition: ThingsBoard.h:244
void Subscribe_API_Implementation(IAPI_Implementation &api)
Subscribes the given API implementation.
Definition: ThingsBoard.h:421
void disconnect()
Force disconnects from the previously connected server and should release all used resources.
Definition: ThingsBoard.h:296
bool connected()
Returns our current connection status to MQTT, true meaning we are connected, false meaning we have b...
Definition: ThingsBoard.h:301
bool Send_Telemetry_Data(char const *key, T const &value)
Sends the given key-value pair as telemetry data. See https://thingsboard.io/docs/user-guide/telemetr...
Definition: ThingsBoard.h:506
bool Send_Attribute_String(char const *json)
Send string containing json as attribute data. See https://thingsboard.io/docs/user-guide/attribute/ ...
Definition: ThingsBoard.h:596
bool Send_Telemetry_Json(JsonDocument const &source)
Send key-value pairs as telemetry data. See https://thingsboard.io/docs/user-guide/telemetry/ for mor...
Definition: ThingsBoard.h:548
void Cleanup_Subscriptions()
Clears all currently subscribed callbacks and unsubscribed from all currently subscribed MQTT topics.
Definition: ThingsBoard.h:255