MySensors Library & Examples  2.3.2-62-ge298769
MQTTTranslate.h
1 
2 /* MQTTTranslate uses the ReconnectingMqttClient
3  https://github.com/fredilarsen/ReconnectingMqttClient
4  library to deliver PJON packets over TCP on local network (LAN) as a MQTT
5  protocol client.
6 
7  This strategy works in one of four modes.
8  The first two modes are for allowing a PJON bus via MQTT, the first mode is
9  "closed" and the second is "open" to use by non-PJON programs.
10  The last two modes are for behaving like MQTT devices normally do.
11 
12  * "Raw bus mode" will send the binary JSON packets delivered to a topic like
13  pjon/device45 (where 45 is a receiver device id). Each device
14  will subscribe to a topic with its own name and will receive packets like
15  from any other PJON strategy. This strategy requires that all senders and
16  receivers are linked with PJON for encoding/decoding, so other systems
17  are not easily connected.
18 
19  * "JSON bus mode" will send JSON packets with to, from and data, delivered
20  to a topic like pjon/device45 (where 45 is a receiver device id). Each
21  device will subscribe to a topic with its own name and will receive
22  packets like
23  {to:45,from:44,data:"message text sent from device 44 to device 45"}.
24 
25  * "Device mirror, translating" mode will not use JSON encapsulation of
26  values, and will publish to its own topic, not the receiver's. It will
27  publish to a "output" folder and subscribe to an "input" folder. An
28  outgoing packet with payload "P=44.1,T=22.0" would result in the topics
29  pjon/device44/output/temperature, with a value "44.1"
30  pjon/device44/output/pressure, with a value "22.0"
31  Likewise, a receiving an update of:
32  pjon/device44/input/setpoint, with a value "45"
33  would result in a packet with payload "S=45".
34  This mode supports a translation table to allow short names to be used
35  in packets while topic names are longer. For example "T" -> "temperature".
36  If no translation table is populated, the same names will be used in
37  the packets and the topics.
38 
39  * "Device mirror, direct" works like the first device mirror mode, but will
40  just pass the payload on without any translation, leaving the formatting
41  to the user. It will not split packets into separate topics but transfer
42  the packets as-is to one output topic and from one input topic:
43  pjon/device44/output
44  pjon/device44/input
45  The user sketch will have control of the format used, which can be
46  plain text like "P=44.1,T=22.0" or a JSON text.
47 
48  The "Translate" in the strategy name is because a translation table can be
49  used to translate PJON packet contents to MQTT topics and back. This is to
50  enable PJON packets to remain small ("t=44.3") between devices with limited
51  memory, while the MQTT packets are made more explicit ("temperature") to
52  support longer name syntax in external systems.
53 
54  The preprocessor define MQTTT_USE_MAC can be set when using one of the
55  MIRROR modes, to change the topic name from i.e. /pjon/device44/output to
56  pjon/AE7804FEA7D0/output. This can be useful to avoid setting a device id
57  and instead just use the MAC address as a unique subject identifier.
58 
59  Compliant with the PJON protocol layer specification v4.0
60  _____________________________________________________________________________
61 
62  MQTTTranslate strategy proposed and developed by Fred Larsen 07/12/2019
63 
64  Licensed under the Apache License, Version 2.0 (the "License");
65  you may not use this file except in compliance with the License.
66  You may obtain a copy of the License at
67 
68  http://www.apache.org/licenses/LICENSE-2.0
69 
70  Unless required by applicable law or agreed to in writing, software
71  distributed under the License is distributed on an "AS IS" BASIS,
72  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
73  See the License for the specific language governing permissions and
74  limitations under the License. */
75 
76 #pragma once
77 #include <PJONDefines.h>
78 #include <ReconnectingMqttClient.h>
79 #ifndef ARDUINO
80 #include <stdlib.h>
81 #endif
82 
83 #define MQTTT_DEFAULT_PORT 1883
84 #ifndef MQTTT_RESPONSE_TIMEOUT
85 #define MQTTT_RESPONSE_TIMEOUT (uint32_t) 10000
86 #endif
87 
88 // This is the maximum size of MQTT packets after translation
89 #ifndef MQTTT_BUFFER_SIZE
90 #define MQTTT_BUFFER_SIZE (uint32_t) PJON_PACKET_MAX_LENGTH
91 #endif
92 
93 // Max size of key and value in MQTTT_MODE_MIRROR_TRANSLATE mode
94 #ifndef MQTTT_KEY_SIZE
95 #define MQTTT_KEY_SIZE 15
96 #endif
97 #ifndef MQTTT_VALUE_SIZE
98 #define MQTTT_VALUE_SIZE 15
99 #endif
100 
101 #define MQTTT_MODE_BUS_RAW 0
102 #define MQTTT_MODE_BUS_JSON 1
103 #define MQTTT_MODE_MIRROR_TRANSLATE 2
104 #define MQTTT_MODE_MIRROR_DIRECT 3
105 
106 // Select which mode to use
107 #ifndef MQTTT_MODE
108 #define MQTTT_MODE MQTTT_MODE_BUS_RAW
109 #endif
110 
111 // The maximum number of keys to be translated in MIRROR_TRANSLATE mode
112 #ifndef MQTTT_TRANSLATION_TABLE_SIZE
113 #define MQTTT_TRANSLATION_TABLE_SIZE 5
114 #endif
115 
116 // Recommended receive time for this strategy, in microseconds
117 #ifndef MQTTT_RECEIVE_TIME
118 #define MQTTT_RECEIVE_TIME 0
119 #endif
120 
121 #if defined(MQTTT_USE_MAC) && ((MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE) || (MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT))
122 #define MQTTT_MAC
123 #endif
124 
126 {
127  bool last_send_success = false;
128 
129  uint16_t incoming_packet_size = 0;
130  // TODO: Eliminate extra buffer -- is the on in the MqttClient not enough?
131  uint8_t packet_buffer[MQTTT_BUFFER_SIZE];
132  PJON_Packet_Info _packet_info; // Used for parsing incoming and outgoing packets
133 
134 #ifdef MQTTT_MAC
135  uint8_t mac[6];
136 
137  char *add_mac(char *p)
138  {
139  sprintf(p, "/%2X%2X%2X%2X%2X%2X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
140  p += strlen(p);
141  return p;
142  }
143 #endif
144 
145 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
146  char key[MQTTT_KEY_SIZE];
147  char value[MQTTT_VALUE_SIZE];
148  // Translation table
149  uint8_t translation_count = 0;
150  char pjon_keys[MQTTT_TRANSLATION_TABLE_SIZE][MQTTT_KEY_SIZE];
151  char mqtt_keys[MQTTT_TRANSLATION_TABLE_SIZE][MQTTT_KEY_SIZE];
152 
153  bool translate(char *key, uint8_t len, bool to_mqtt)
154  {
155  for (uint8_t i=0; i<translation_count; i++) {
156  if (strcmp(key, to_mqtt ? pjon_keys[i] : mqtt_keys[i]) == 0) {
157  strncpy(key, to_mqtt ? mqtt_keys[i] : pjon_keys[i], min(len, MQTTT_KEY_SIZE));
158  key[len-1] = 0;
159  return true;
160  }
161  }
162  return false;
163  }
164 #endif
165 
166  static void static_receiver(const char *topic, const uint8_t *payload, uint16_t len,
167  void *callback_object)
168  {
169  if (callback_object) {
170  ((MQTTTranslate*)callback_object)->receiver(topic, payload, len);
171  }
172  }
173 
174 #if (MQTTT_MODE == MQTTT_MODE_BUS_JSON)
175  bool find_next_json_key(const char **p, const char *last)
176  {
177  while (**p && *p < last && **p != '{' && **p != ',') {
178  (*p)++; // Find start brace or comma
179  }
180  if (!**p || *p >= last) {
181  return false;
182  }
183  (*p)++; // Skip start brace or comma {"to":44,"from":"45","data":"sgfsdf"}
184  while (**p && *p < last && **p != '\"') {
185  (*p)++; // Skip until double quote
186  }
187  if (**p != '\"' || *p >= last) {
188  return false;
189  }
190  (*p)++; // Point to first char of key
191  return true;
192  }
193 
194  bool find_next_json_value(const char **p, const char *last)
195  {
196  while (**p && *p < last && **p != ':') {
197  (*p)++; // Find colon
198  }
199  if (!**p || *p >= last) {
200  return false;
201  }
202  (*p)++; // Skip colon {"to":44,"from":"45","data":"sgfsdf"}
203  while (**p && *p < last && (**p == ' ' || **p == '\t')) {
204  (*p)++; // Skip potential whitespace
205  }
206  if (!**p || *p >= last) {
207  return false;
208  }
209  if (**p == '\"' && *p < last) {
210  (*p)++; // Skip leading quote if present
211  }
212  return true;
213  }
214 #endif
215 
216  void receiver(const char *topic, const uint8_t *payload, uint16_t len)
217  {
218 #if (MQTTT_MODE == MQTTT_MODE_BUS_RAW)
219  if(len <= MQTTT_BUFFER_SIZE) {
220  memcpy(packet_buffer, payload, len);
221  incoming_packet_size = len;
222  }
223 #endif
224 #if (MQTTT_MODE == MQTTT_MODE_BUS_JSON)
225  // Must assume that payload is text, unless UUencoding/base64encoding
226  // {"to": to_id, "from": from id, "data": "payload"}
227  uint8_t sender_id = 0, receiver_id = 0;
228  const char *p = (const char*)payload, *last = p+len-1;
229  bool found = false;
230  while (find_next_json_key(&p, last)) { // to, from or data
231  if (strncmp(p, "to\"", 3)==0 && find_next_json_value(&p, last)) {
232  receiver_id = atoi(p);
233  } else if (strncmp(p, "from\"", 5)==0 && find_next_json_value(&p, last)) {
234  sender_id = atoi(p);
235  } else if (strncmp(p, "data\"", 5)==0 && find_next_json_value(&p, last)) {
236  const char *p2 = p;
237  while (p2-(const char*)payload+1 < len && *p2 && *p2 != '\"') {
238  p2++;
239  }
240  if (*p2 == '\"') {
241  found = true;
242  payload = (uint8_t*)p;
243  len = p2 - p;
244  }
245  }
246  }
247  if (receiver_id == 0 || !found) {
248  return;
249  }
250  // Package the data message into a PJON packet
251  uint8_t h = header;
252  if (sender_id != 0) {
253  header |= PJON_TX_INFO_BIT;
254  }
255  incoming_packet_size = PJONTools::compose_packet(sender_id, bus_id, receiver_id,
256  bus_id, packet_buffer, payload, len, h);
257 #endif
258 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE || MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
259  uint8_t receiver_id = my_id;
260 #ifdef MQTTT_MAC
261  // Parse topic to get source device MAC
262  const char *device_start = strstr(topic, "/");
263 #else
264  // Parse topic to get source device id
265  const char *device_start = strstr(topic, "/device");
266  if (device_start) {
267  receiver_id = (uint8_t) atoi(&device_start[7]);
268  }
269 #endif
270 #endif
271 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
272  // Split into multiple topics (must assume a specific payload format to parse):
273  // "T=44.1,P=1.1" ->
274  // pjon/device44/output/temperature 44.1
275  // pjon/device44/output/pressure 1.1
276  if (device_start) {
277  // Find start of /input/
278  const char *start = (const char*)memchr(device_start+1, '/',
279  (const char*)payload-device_start+len-2);
280  // Find end of /input/
281  if (start) {
282  start = (const char*)memchr(start+1, '/', (const char*)payload-start+len-2);
283  }
284  if (start) { // Get variable name
285  uint8_t l = min(start - device_start + len -1, sizeof key -1);
286  strncpy(key, start+1, l);
287  key[l] = 0; // Null terminate
288  translate(key, sizeof key, false);
289  l = min(len, sizeof value-1);
290  strncpy(value, (const char*)payload, l);
291  value[l] = 0; // Null terminate
292  String s = key;
293  s += "=";
294  s += value;
295  // Package the key=value into a PJON packet
296  incoming_packet_size = PJONTools::compose_packet(receiver_id, bus_id, receiver_id,
297  bus_id, packet_buffer, s.c_str(), s.length()+1, header);
298  }
299  }
300  return;
301 #endif
302 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
303  // Package the payload as it is, into a PJON packet
304  incoming_packet_size = PJONTools::compose_packet(receiver_id, bus_id, receiver_id,
305  bus_id, packet_buffer, payload, len, header);
306 #endif
307  }
308 
309 public:
310  ReconnectingMqttClient mqttclient;
311  bool retain = false; // Leave message in broker for clients to receive at connect
312  uint8_t qos = 0; // Set this to 1 to have guaranteed delivery
313  String topic = "pjon"; // e.g. "pjon" without trailing slash
314  uint8_t my_id = PJON_NOT_ASSIGNED;
315  uint8_t bus_id[4] = {0,0,0,0};
316  uint8_t header = 0;
317  bool lowercase_topics = true;
318  bool subscribe_all = false; // Read input from all devices or just my own topic?
319 
320  void set_config(uint8_t id, const uint8_t bus_id[4], uint8_t header)
321  {
322  my_id = id;
323  if (bus_id != NULL) {
324  memcpy(this->bus_id, bus_id, 4);
325  }
326  this->header = header;
327  }
328  void set_qos(uint8_t qos)
329  {
330  this->qos = qos;
331  }
332  void set_retain(bool retain)
333  {
334  this->retain = retain;
335  }
336  void set_topic(const char *topic)
337  {
338  this->topic = topic;
339  }
340 
341 
342  /* Subscribe to input from all devices, not only this device?
343  This is needed to use router mode in PJON.
344  */
345 
346  void set_subscribe_all(bool yes)
347  {
348  subscribe_all = yes;
349  }
350 
351 
352  /* Set the broker's ip, the port used and the topic */
353 
354  void set_address(
355  const uint8_t server_ip[4],
356  const uint16_t server_port,
357  const char *client_id
358  )
359  {
360  mqttclient.set_address(server_ip, server_port, client_id);
361  }
362 
363 
364  /* Returns the suggested delay related to the attempts passed as parameter: */
365 
366  uint32_t back_off(uint8_t attempts)
367  {
368  return 10000;
369  };
370 
371 
372  /* Begin method, to be called on initialization */
373 
374  bool begin(uint8_t device_id = 0)
375  {
376 #ifdef MQTTT_MAC
377  PJON_GET_MAC(mac);
378 #endif
379  my_id = device_id;
380  mqttclient.set_receive_callback(static_receiver, this);
381  char *p = (char*)packet_buffer;
382  strcpy(p, topic.c_str());
383  p += strlen(p);
384 #ifdef MQTTT_MAC
385  // In this mode (MIRROR modes), I are only interested in my own input
386  p = add_mac(p); // Adds slash and MAC
387 #else
388  if (subscribe_all) {
389  strcpy(p, "/+"); // Pick up for all devices
390  p += 2;
391  } else { // For one single device id
392  strcpy(p, "/device");
393  p += strlen(p);
394  p += mqttclient.uint8toa(device_id, p); // Now like pjon/device44
395  }
396 #endif
397 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
398  strcat(p, "/input/+");
399 #endif
400 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
401  strcat(p, "/input"); // Only one input topic
402 #endif
403  mqttclient.subscribe((const char*)packet_buffer, qos);
404  return mqttclient.connect();
405  };
406 
407 
408  /* Check if the channel is free for transmission */
409 
410  bool can_start()
411  {
412  return mqttclient.connect();
413  };
414 
415 
416  /* Returns the maximum number of attempts for each transmission: */
417 
418  static uint8_t get_max_attempts()
419  {
420  return 0;
421  };
422 
423 
424  /* Returns the recommended receive time for this strategy: */
425 
426  static uint16_t get_receive_time()
427  {
428  return MQTTT_RECEIVE_TIME;
429  };
430 
431 
432  /* Handle a collision (empty because handled on Ethernet level): */
433 
434  void handle_collision() { };
435 
436 
437  /* Receive a frame: */
438 
439  uint16_t receive_frame(uint8_t *data, uint16_t max_length)
440  {
441  if (incoming_packet_size == 0) {
442  mqttclient.update();
443  }
444  if (incoming_packet_size > 0 && incoming_packet_size <= max_length) {
445  memcpy(data, packet_buffer, incoming_packet_size);
446  uint16_t len = incoming_packet_size;
447  incoming_packet_size = 0; // Flag as handled
448  return len;
449  }
450  return PJON_FAIL;
451  }
452 
453 
454  /* Receive byte response */
455 
456  uint16_t receive_response()
457  {
458  return last_send_success ? PJON_ACK : PJON_FAIL;
459  };
460 
461 
462  /* Send byte response to package transmitter */
463 
464  void send_response(uint8_t response) // Empty
465  {
466  };
467 
468 
469  /* Send a frame: */
470 
471  void send_frame(uint8_t *data, uint16_t length)
472  {
473  // Extract some info from the packet header
474  PJONTools::parse_header(data, _packet_info);
475 
476  // Compose topic
477  uint8_t len = strlen(topic.c_str());
478  if (len >= SMCTOPICSIZE) {
479  return;
480  }
481  strcpy(mqttclient.topic_buf(), topic.c_str());
482  char *p = &mqttclient.topic_buf()[len];
483 #ifdef MQTTT_MAC
484  if (p-mqttclient.topic_buf()+7+7 >= SMCTOPICSIZE) {
485  return;
486  }
487  p = add_mac(p);
488  strcpy(p, "/output");
489  p += strlen(p); // End of /output
490 #else
491  if (p-mqttclient.topic_buf()+7+3+7 >= SMCTOPICSIZE) {
492  return;
493  }
494  strcpy(p, "/device");
495  p += 7;
496 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE || MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
497  p += mqttclient.uint8toa(_packet_info.tx.id, p);
498  strcat(p, "/output"); // Like pjon/device44/output
499  p = &p[strlen(p)]; // End of /output
500 #else // One of the bus modes, publish to receiver device
501  mqttclient.uint8toa(_packet_info.rx.id, p);
502 #endif
503 #endif
504 #if (MQTTT_MODE != MQTTT_MODE_BUS_RAW)
505  uint8_t overhead = PJONTools::packet_overhead(_packet_info.header);
506  uint8_t crc_size = PJONTools::crc_overhead(_packet_info.header);
507 #endif
508 
509  // Re-compose packet in other modes than RAW
510 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
511  // Split into multiple topics (must assume a specific payload format to parse):
512  // "T=44.1,P=1.1" ->
513  // pjon/device44/output/temperature 44.1
514  // pjon/device44/output/pressure 1.1
515  uint8_t send_cnt = 0;
516  const char *d = (const char*)&data[overhead - crc_size], *v = d, *c, *e;
517  uint16_t plen = length - overhead;
518  while (v && (c = find_value_separator(v, d-v+plen))) {
519  if (e = (const char *)memchr(v, '=', (uint16_t)(c-v))) {
520  uint8_t l = min(e-v, sizeof key-1);
521  strncpy(key, v, l); // Complete topic like /pjon/device44/output/temperature
522  key[l] = 0;
523  l = min(c-e-1, sizeof value-1);
524  strncpy(value, e+1, l);
525  value[l] = 0;
526  if (!translate(key, sizeof key, true))
527  if (lowercase_topics) for (char *k=key; *k!=0; k++) {
528  *k = tolower(*k);
529  }
530  if (p-mqttclient.topic_buf()+1+strlen(key) >= SMCTOPICSIZE) {
531  return;
532  }
533  *p = '/';
534  strcpy(p+1, key);
535  send_cnt += mqttclient.publish(mqttclient.topic_buf(), (uint8_t*)value, strlen(value), retain, qos);
536  v = c-d >= plen ? NULL : c+1;
537  }
538  }
539  last_send_success = send_cnt > 0;
540  return; // We have sent multiple smaller packets, just return
541 #endif
542 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_DIRECT)
543  // Post just the payload as it is, to the output topic
544  data = &data[overhead - crc_size];
545  length -= overhead;
546 #endif
547 #if (MQTTT_MODE == MQTTT_MODE_BUS_JSON)
548  // Must assume that payload is text, unless UUencoding/base64encoding
549  // {"to": to_id, "from": from id, "data": "payload"}
550  p = (char *) packet_buffer;
551  if (6+3+8+3+9+payload_len+2 >= MQTTT_BUFFER_SIZE) {
552  return;
553  }
554  strcpy(p, "{\"to\":");
555  p += 6;
556  p += mqttclient.uint8toa(_packet_info.rx.id, p);
557  strcpy(p, ",\"from\":");
558  p+= 8;
559  p += mqttclient.uint8toa(_packet_info.tx.id, p);
560  strcpy(p, ",\"data\":\"");
561  p+= 9;
562  uint8_t payload_len = length - overhead;
563  strncpy(p, (const char*)&data[overhead - crc_size], payload_len);
564  p[payload_len] = 0;
565  p += strlen(p);
566  strcpy(p, "\"}");
567  p += 2;
568  data = packet_buffer;
569  length = ((uint8_t*)p - packet_buffer);
570 #endif
571 
572  // Publish
573  last_send_success = mqttclient.publish(mqttclient.topic_buf(), data, length, retain, qos);
574  };
575 
576  const char *find_value_separator(const char *value, uint16_t len)
577  {
578  // This does the job of a strchr but accepting that null-terminator may be missing
579  const char *p = value;
580  while (p != NULL && (p-value < len) && *p != ',' && *p != 0) {
581  p++;
582  }
583  return p;
584  }
585 
586 #if (MQTTT_MODE == MQTTT_MODE_MIRROR_TRANSLATE)
587  bool add_translation(const char *pjon_key, const char *mqtt_key)
588  {
589  if (translation_count >= MQTTT_TRANSLATION_TABLE_SIZE) {
590  return false;
591  }
592  strncpy(pjon_keys[translation_count], pjon_key, MQTTT_KEY_SIZE);
593  pjon_keys[translation_count][MQTTT_KEY_SIZE-1] = 0;
594  strncpy(mqtt_keys[translation_count], mqtt_key, MQTTT_KEY_SIZE);
595  mqtt_keys[translation_count][MQTTT_KEY_SIZE-1] = 0;
596  for (char *p=mqtt_keys[translation_count]; *p!=0; p++) {
597  *p = tolower(*p);
598  }
599  translation_count++;
600  return true;
601  }
602 #endif
603 };
data
char data[MAX_PAYLOAD_SIZE+1]
Buffer for raw payload data.
Definition: MyMessage.h:654
PJON_Packet_Info
Definition: PJONDefines.h:207
last
uint8_t last
8 bit - Id of last node this message passed
Definition: MyMessage.h:334
min
#define min(a, b)
min
Definition: MySensors.h:100
MQTTTranslate
Definition: MQTTTranslate.h:125