diff --git a/libraries/PubSubClient/CHANGES.txt b/libraries/PubSubClient/CHANGES.txt index 8688011d9d1..5202ddab496 100755 --- a/libraries/PubSubClient/CHANGES.txt +++ b/libraries/PubSubClient/CHANGES.txt @@ -1,7 +1,24 @@ +2.3 + * Add publish(topic,payload,retained) function + +2.2 + * Change code layout to match Arduino Library reqs + +2.1 + * Add MAX_TRANSFER_SIZE def to chunk messages if needed + * Reject topic/payloads that exceed MQTT_MAX_PACKET_SIZE + +2.0 + * Add (and default to) MQTT 3.1.1 support + * Fix PROGMEM handling for Intel Galileo/ESP8266 + * Add overloaded constructors for convenience + * Add chainable setters for server/callback/client/stream + * Add state function to return connack return code + 1.9 * Do not split MQTT packets over multiple calls to _client->write() * API change: All constructors now require an instance of Client - to be passed in. + to be passed in. * Fixed example to match 1.8 api changes - dpslwk * Added username/password support - WilHall * Added publish_P - publishes messages from PROGMEM - jobytaffey @@ -10,7 +27,7 @@ * KeepAlive interval is configurable in PubSubClient.h * Maximum packet size is configurable in PubSubClient.h * API change: Return boolean rather than int from various functions - * API change: Length parameter in message callback changed + * API change: Length parameter in message callback changed from int to unsigned int * Various internal tidy-ups around types 1.7 @@ -44,4 +61,3 @@ * The maximum message size, including header, is 128 bytes * The keepalive interval is set to 30 seconds * No support for Will messages - diff --git a/libraries/PubSubClient/LICENSE.txt b/libraries/PubSubClient/LICENSE.txt index f512cba28b9..217df35ccab 100755 --- a/libraries/PubSubClient/LICENSE.txt +++ b/libraries/PubSubClient/LICENSE.txt @@ -1,4 +1,4 @@ -Copyright (c) 2008-2012 Nicholas O'Leary +Copyright (c) 2008-2015 Nicholas O'Leary Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/libraries/PubSubClient/PubSubClient.cpp b/libraries/PubSubClient/PubSubClient.cpp index fbbbf065d1c..5288cfed941 100755 --- a/libraries/PubSubClient/PubSubClient.cpp +++ b/libraries/PubSubClient/PubSubClient.cpp @@ -1,338 +1,571 @@ /* - PubSubClient.cpp - A simple client for MQTT. - Nicholas O'Leary + PubSubClient.cpp - A simple client for MQTT. + Nick O'Leary http://knolleary.net */ #include "PubSubClient.h" -#include +#include "Arduino.h" + +PubSubClient::PubSubClient() { + this->_state = MQTT_DISCONNECTED; + this->_client = NULL; + this->stream = NULL; + setCallback(NULL); +} PubSubClient::PubSubClient(Client& client) { - this->_client = &client; + this->_state = MQTT_DISCONNECTED; + setClient(client); + this->stream = NULL; } -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { - this->_client = &client; - this->callback = callback; - this->ip = ip; - this->port = port; +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setClient(client); + setStream(stream); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setCallback(callback); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setCallback(callback); + setClient(client); + setStream(stream); } -PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { - this->_client = &client; - this->callback = callback; - this->domain = domain; - this->port = port; +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setClient(client); + setStream(stream); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setCallback(callback); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setCallback(callback); + setClient(client); + setStream(stream); } -boolean PubSubClient::connect(char *id) { - return connect(id,NULL,NULL,0,0,0,0); +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + setStream(stream); +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + setStream(stream); } -boolean PubSubClient::connect(char *id, char *user, char *pass) { - return connect(id,user,pass,0,0,0,0); +boolean PubSubClient::connect(const char *id) { + return connect(id,NULL,NULL,0,0,0,0); } -boolean PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) -{ - return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { + return connect(id,user,pass,0,0,0,0); } -boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { - if (!connected()) { - int result = 0; - - if (domain != NULL) { - result = _client->connect(this->domain, this->port); - } else { - result = _client->connect(this->ip, this->port); - } - if (result) { - nextMsgId = 1; - uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; - // Leave room in the buffer for header and variable length field - uint16_t length = 5; - unsigned int j; - for (j = 0;j<9;j++) { - buffer[length++] = d[j]; - } +boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + if (!connected()) { + int result = 0; + + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } + if (result) { + nextMsgId = 1; + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + unsigned int j; + +#if MQTT_VERSION == MQTT_VERSION_3_1 + uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 9 +#elif MQTT_VERSION == MQTT_VERSION_3_1_1 + uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 7 +#endif + for (j = 0;j>1); + } + } - uint8_t v; - if (willTopic) { - v = 0x06|(willQos<<3)|(willRetain<<5); - } else { - v = 0x02; - } + buffer[length++] = v; - if(user != NULL) { - v = v|0x80; + buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + length = writeString(id,buffer,length); + if (willTopic) { + length = writeString(willTopic,buffer,length); + length = writeString(willMessage,buffer,length); + } - if(pass != NULL) { - v = v|(0x80>>1); + if(user != NULL) { + length = writeString(user,buffer,length); + if(pass != NULL) { + length = writeString(pass,buffer,length); + } } - } - - buffer[length++] = v; - - buffer[length++] = ((MQTT_KEEPALIVE) >> 8); - buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); - length = writeString(id,buffer,length); - if (willTopic) { - length = writeString(willTopic,buffer,length); - length = writeString(willMessage,buffer,length); - } - - if(user != NULL) { - length = writeString(user,buffer,length); - if(pass != NULL) { - length = writeString(pass,buffer,length); + + write(MQTTCONNECT,buffer,length-5); + + lastInActivity = lastOutActivity = millis(); + + while (!_client->available()) { + unsigned long t = millis(); + if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { + _state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } } - } - - write(MQTTCONNECT,buffer,length-5); - - lastInActivity = lastOutActivity = millis(); - - while (!_client->available()) { - unsigned long t = millis(); - if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { - _client->stop(); - return false; + uint8_t llen; + uint16_t len = readPacket(&llen); + + if (len == 4) { + if (buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + _state = MQTT_CONNECTED; + return true; + } else { + _state = buffer[3]; + } } - } - uint16_t len = readPacket(); - - if (len == 4 && buffer[3] == 0) { - lastInActivity = millis(); - pingOutstanding = false; - return true; - } - } - _client->stop(); - } - return false; + _client->stop(); + } else { + _state = MQTT_CONNECT_FAILED; + } + return false; + } + return true; } uint8_t PubSubClient::readByte() { - while(!_client->available()) { - Serial.println("."); - } - return _client->read(); -} - -uint16_t PubSubClient::readPacket() { - uint16_t len = 0; - buffer[len++] = readByte(); - uint8_t multiplier = 1; - uint16_t length = 0; - uint8_t digit = 0; - do { - digit = readByte(); - buffer[len++] = digit; - length += (digit & 127) * multiplier; - multiplier *= 128; - } while ((digit & 128) != 0); - - for (uint16_t i = 0;i MQTT_KEEPALIVE * 1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE * 1000UL)) { - if (pingOutstanding) { - _client->stop(); - return false; - } else { - buffer[0] = MQTTPINGREQ; - buffer[1] = 0; - _client->write(buffer,2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; - } - } - if (_client->available()) { - uint16_t len = readPacket(); - if (len > 0) { - lastInActivity = t; - uint8_t type = buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (buffer[2]<<8)+buffer[3]; - char topic[tl+1]; - for (uint16_t i=0;iwrite(buffer,2); - } else if (type == MQTTPINGRESP) { - pingOutstanding = false; + while(!_client->available()) {} + return _client->read(); +} + +uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { + uint16_t len = 0; + buffer[len++] = readByte(); + bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; + uint32_t multiplier = 1; + uint16_t length = 0; + uint8_t digit = 0; + uint16_t skip = 0; + uint8_t start = 0; + + do { + digit = readByte(); + buffer[len++] = digit; + length += (digit & 127) * multiplier; + multiplier *= 128; + } while ((digit & 128) != 0); + *lengthLength = len-1; + + if (isPublish) { + // Read in topic length to calculate bytes to skip over for Stream writing + buffer[len++] = readByte(); + buffer[len++] = readByte(); + skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; + start = 2; + if (buffer[0]&MQTTQOS1) { + // skip message id + skip += 2; + } + } + + for (uint16_t i = start;istream) { + if (isPublish && len-*lengthLength-2>skip) { + this->stream->write(digit); + } + } + if (len < MQTT_MAX_PACKET_SIZE) { + buffer[len] = digit; + } + len++; + } + + if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { + len = 0; // This will cause the packet to be ignored. + } + + return len; +} + +boolean PubSubClient::loop() { + if (connected()) { + unsigned long t = millis(); + if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + this->_state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client->write(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + if (_client->available()) { + uint8_t llen; + uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; + if (len > 0) { + lastInActivity = t; + uint8_t type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; + char topic[tl+1]; + for (uint16_t i=0;i0 + if ((buffer[0]&0x06) == MQTTQOS1) { + msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; + payload = buffer+llen+3+tl+2; + callback(topic,payload,len-llen-3-tl-2); + + buffer[0] = MQTTPUBACK; + buffer[1] = 2; + buffer[2] = (msgId >> 8); + buffer[3] = (msgId & 0xFF); + _client->write(buffer,4); + lastOutActivity = t; + + } else { + payload = buffer+llen+3+tl; + callback(topic,payload,len-llen-3-tl); + } + } + } else if (type == MQTTPINGREQ) { + buffer[0] = MQTTPINGRESP; + buffer[1] = 0; + _client->write(buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } } - } - } - return true; - } - return false; -} - -boolean PubSubClient::publish(char* topic, char* payload) { - return publish(topic,(uint8_t*)payload,strlen(payload),false); -} - -boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength) { - return publish(topic, payload, plength, false); -} - -boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength, boolean retained) { - if (connected()) { - // Leave room in the buffer for header and variable length field - uint16_t length = 5; - length = writeString(topic,buffer,length); - uint16_t i; - for (i=0;i 0) { - digit |= 0x80; - } - buffer[pos++] = digit; - llen++; - } while(len>0); - - pos = writeString(topic,buffer,pos); - - rc += _client->write(buffer,pos); - - for (i=0;iwrite(*payload + i); - } - - lastOutActivity = millis(); - return rc == len + 1 + plength; + } + return true; + } + return false; +} + +boolean PubSubClient::publish(const char* topic, const char* payload) { + return publish(topic,(const uint8_t*)payload,strlen(payload),false); +} + +boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { + return publish(topic,(const uint8_t*)payload,strlen(payload),retained); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { + return publish(topic, payload, plength, false); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { + if (connected()) { + if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) { + // Too long + return false; + } + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + length = writeString(topic,buffer,length); + uint16_t i; + for (i=0;i 0) { + digit |= 0x80; + } + buffer[pos++] = digit; + llen++; + } while(len>0); + + pos = writeString(topic,buffer,pos); + + rc += _client->write(buffer,pos); + + for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); + } + + lastOutActivity = millis(); + + return rc == tlen + 4 + plength; } boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { - uint8_t lenBuf[4]; - uint8_t llen = 0; - uint8_t digit; - uint8_t pos = 0; - uint8_t rc; - uint8_t len = length; - do { - digit = len % 128; - len = len / 128; - if (len > 0) { - digit |= 0x80; - } - lenBuf[pos++] = digit; - llen++; - } while(len>0); - - buf[4-llen] = header; - for (int i=0;iwrite(buf+(4-llen),length+1+llen); - - lastOutActivity = millis(); - return (rc == 1+llen+length); -} - - -boolean PubSubClient::subscribe(char* topic) { - if (connected()) { - // Leave room in the buffer for header and variable length field - uint16_t length = 7; - nextMsgId++; - if (nextMsgId == 0) { - nextMsgId = 1; - } - buffer[0] = (nextMsgId >> 8); - buffer[1] = (nextMsgId & 0xFF); - length = writeString(topic, buffer,length); - buffer[length++] = 0; // Only do QoS 0 subs - return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); - } - return false; + uint8_t lenBuf[4]; + uint8_t llen = 0; + uint8_t digit; + uint8_t pos = 0; + uint8_t rc; + uint16_t len = length; + do { + digit = len % 128; + len = len / 128; + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0;i 0) && result) { + bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining; + rc = _client->write(writeBuf,bytesToWrite); + result = (rc == bytesToWrite); + bytesRemaining -= rc; + writeBuf += rc; + } + return result; +#else + rc = _client->write(buf+(4-llen),length+1+llen); + lastOutActivity = millis(); + return (rc == 1+llen+length); +#endif +} + +boolean PubSubClient::subscribe(const char* topic) { + return subscribe(topic, 0); +} + +boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { + if (qos < 0 || qos > 1) { + return false; + } + if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { + // Too long + return false; + } + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*)topic, buffer,length); + buffer[length++] = qos; + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +boolean PubSubClient::unsubscribe(const char* topic) { + if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { + // Too long + return false; + } + if (connected()) { + uint16_t length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; } void PubSubClient::disconnect() { - buffer[0] = MQTTDISCONNECT; - buffer[1] = 0; - _client->write(buffer,2); - _client->stop(); - lastInActivity = lastOutActivity = millis(); + buffer[0] = MQTTDISCONNECT; + buffer[1] = 0; + _client->write(buffer,2); + _state = MQTT_DISCONNECTED; + _client->stop(); + lastInActivity = lastOutActivity = millis(); } -uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) { - char* idp = string; - uint16_t i = 0; - pos += 2; - while (*idp) { - buf[pos++] = *idp++; - i++; - } - buf[pos-i-2] = (i >> 8); - buf[pos-i-1] = (i & 0xFF); - return pos; +uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { + const char* idp = string; + uint16_t i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; } boolean PubSubClient::connected() { - int rc = (int)_client->connected(); - if (!rc);// _client->stop(); - return rc; + boolean rc; + if (_client == NULL ) { + rc = false; + } else { + rc = (int)_client->connected(); + if (!rc) { + if (this->_state == MQTT_CONNECTED) { + this->_state = MQTT_CONNECTION_LOST; + _client->flush(); + _client->stop(); + } + } + } + return rc; +} + +PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) { + IPAddress addr(ip[0],ip[1],ip[2],ip[3]); + return setServer(addr,port); +} + +PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) { + this->ip = ip; + this->port = port; + this->domain = NULL; + return *this; +} + +PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) { + this->domain = domain; + this->port = port; + return *this; +} + +PubSubClient& PubSubClient::setCallback(void(*callback)(char*,uint8_t*,unsigned int)){ + this->callback = callback; + return *this; } +PubSubClient& PubSubClient::setClient(Client& client){ + this->_client = &client; + return *this; +} + +PubSubClient& PubSubClient::setStream(Stream& stream){ + this->stream = &stream; + return *this; +} + +int PubSubClient::state() { + return this->_state; +} diff --git a/libraries/PubSubClient/PubSubClient.h b/libraries/PubSubClient/PubSubClient.h index 159e79c06f1..ece2d4befa7 100755 --- a/libraries/PubSubClient/PubSubClient.h +++ b/libraries/PubSubClient/PubSubClient.h @@ -1,6 +1,6 @@ /* PubSubClient.h - A simple client for MQTT. - Nicholas O'Leary + Nick O'Leary http://knolleary.net */ @@ -8,7 +8,16 @@ #define PubSubClient_h #include +#include "IPAddress.h" #include "Client.h" +#include "Stream.h" + +#define MQTT_VERSION_3_1 3 +#define MQTT_VERSION_3_1_1 4 + +// MQTT_VERSION : Pick the version +//#define MQTT_VERSION MQTT_VERSION_3_1 +#define MQTT_VERSION MQTT_VERSION_3_1_1 // MQTT_MAX_PACKET_SIZE : Maximum packet size #define MQTT_MAX_PACKET_SIZE 128 @@ -16,7 +25,23 @@ // MQTT_KEEPALIVE : keepAlive interval in Seconds #define MQTT_KEEPALIVE 15 -#define MQTTPROTOCOLVERSION 3 +// MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client +// in each write call. Needed for the Arduino Wifi Shield. Leave undefined to +// pass the entire MQTT packet in each write call. +//#define MQTT_MAX_TRANSFER_SIZE 80 + +// Possible values for client.state() +#define MQTT_CONNECTION_TIMEOUT -4 +#define MQTT_CONNECTION_LOST -3 +#define MQTT_CONNECT_FAILED -2 +#define MQTT_DISCONNECTED -1 +#define MQTT_CONNECTED 0 +#define MQTT_CONNECT_BAD_PROTOCOL 1 +#define MQTT_CONNECT_BAD_CLIENT_ID 2 +#define MQTT_CONNECT_UNAVAILABLE 3 +#define MQTT_CONNECT_BAD_CREDENTIALS 4 +#define MQTT_CONNECT_UNAUTHORIZED 5 + #define MQTTCONNECT 1 << 4 // Client request to connect to Server #define MQTTCONNACK 2 << 4 // Connect Acknowledgment #define MQTTPUBLISH 3 << 4 // Publish message @@ -37,6 +62,8 @@ #define MQTTQOS1 (1 << 1) #define MQTTQOS2 (2 << 1) +#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*,uint8_t*,unsigned int) + class PubSubClient { private: Client* _client; @@ -45,30 +72,55 @@ class PubSubClient { unsigned long lastOutActivity; unsigned long lastInActivity; bool pingOutstanding; - void (*callback)(char*,uint8_t*,unsigned int); - uint16_t readPacket(); + MQTT_CALLBACK_SIGNATURE; + uint16_t readPacket(uint8_t*); uint8_t readByte(); boolean write(uint8_t header, uint8_t* buf, uint16_t length); - uint16_t writeString(char* string, uint8_t* buf, uint16_t pos); - uint8_t *ip; - char* domain; + uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); + IPAddress ip; + const char* domain; uint16_t port; + Stream* stream; + int _state; public: + PubSubClient(); PubSubClient(Client& client); - PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); - PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); - boolean connect(char *); - boolean connect(char *, char *, char *); - boolean connect(char *, char *, uint8_t, uint8_t, char *); - boolean connect(char *, char *, char *, char *, uint8_t, uint8_t, char*); + PubSubClient(IPAddress, uint16_t, Client& client); + PubSubClient(IPAddress, uint16_t, Client& client, Stream&); + PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + PubSubClient(uint8_t *, uint16_t, Client& client); + PubSubClient(uint8_t *, uint16_t, Client& client, Stream&); + PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + PubSubClient(const char*, uint16_t, Client& client); + PubSubClient(const char*, uint16_t, Client& client, Stream&); + PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + + PubSubClient& setServer(IPAddress ip, uint16_t port); + PubSubClient& setServer(uint8_t * ip, uint16_t port); + PubSubClient& setServer(const char * domain, uint16_t port); + PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE); + PubSubClient& setClient(Client& client); + PubSubClient& setStream(Stream& stream); + + boolean connect(const char* id); + boolean connect(const char* id, const char* user, const char* pass); + boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); + boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); void disconnect(); - boolean publish(char *, char *); - boolean publish(char *, uint8_t *, unsigned int); - boolean publish(char *, uint8_t *, unsigned int, boolean); - boolean publish_P(char *, uint8_t *, unsigned int, boolean); - boolean subscribe(char *); - boolean poll(); + boolean publish(const char* topic, const char* payload); + boolean publish(const char* topic, const char* payload, boolean retained); + boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); + boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean subscribe(const char* topic); + boolean subscribe(const char* topic, uint8_t qos); + boolean unsubscribe(const char* topic); + boolean loop(); boolean connected(); + int state(); }; diff --git a/libraries/PubSubClient/examples/MQTTAutWiFi/MQTTAutWiFi.ino b/libraries/PubSubClient/examples/MQTTAutWiFi/MQTTAutWiFi.ino deleted file mode 100644 index fab28d4ef83..00000000000 --- a/libraries/PubSubClient/examples/MQTTAutWiFi/MQTTAutWiFi.ino +++ /dev/null @@ -1,108 +0,0 @@ -/* - Basic MQTT example - - - connects to an MQTT server - - publishes "hello world" to the topic "outTopic" - - subscribes to the topic "inTopic" -*/ - -#include -#include -#include - -// your network name also called SSID -char ssid[] = "energia1"; -// your network password -char password[] = "launch pad"; -// MQTTServer to use -char server[] = "iot.eclipse.org"; - -void callback(char* topic, byte* payload, unsigned int length) { - Serial.print("Received message for topic "); - Serial.print(topic); - Serial.print("with length "); - Serial.println(length); - Serial.println("Message:"); - Serial.write(payload, length); - Serial.println(); -} - -WiFiClient wifiClient; -PubSubClient client(server, 1883, callback, wifiClient); - -void setup() -{ - Serial.begin(115200); - - // Start Ethernet with the build in MAC Address - // attempt to connect to Wifi network: - Serial.print("Attempting to connect to Network named: "); - // print the network name (SSID); - Serial.println(ssid); - // Connect to WPA/WPA2 network. Change this line if using open or WEP network: - WiFi.begin(ssid, password); - while ( WiFi.status() != WL_CONNECTED) { - // print dots while we wait to connect - Serial.print("."); - delay(300); - } - - Serial.println("\nYou're connected to the network"); - Serial.println("Waiting for an ip address"); - - while (WiFi.localIP() == INADDR_NONE) { - // print dots while we wait for an ip addresss - Serial.print("."); - delay(300); - } - - Serial.println("\nIP Address obtained"); - // We are connected and have an IP address. - // Print the WiFi status. - printWifiStatus(); -} - -void loop() -{ - // Reconnect if the connection was lost - if (!client.connected()) { - Serial.println("Disconnected. Reconnecting...."); - - if(!client.connect("energiaClient", "testuser", "testpass")) { - Serial.println("Connection failed"); - } else { - Serial.println("Connection success"); - if(client.subscribe("inTopic")) { - Serial.println("Subscription successfull"); - } - } - } - - if(client.publish("outTopic","hello world")) { - Serial.println("Publish success"); - } else { - Serial.println("Publish failed"); - } - - // Check if any message were received - // on the topic we subsrcived to - client.poll(); - delay(1000); -} - -void printWifiStatus() { - // print the SSID of the network you're attached to: - Serial.print("SSID: "); - Serial.println(WiFi.SSID()); - - // print your WiFi IP address: - IPAddress ip = WiFi.localIP(); - Serial.print("IP Address: "); - Serial.println(ip); - - // print the received signal strength: - long rssi = WiFi.RSSI(); - Serial.print("signal strength (RSSI):"); - Serial.print(rssi); - Serial.println(" dBm"); -} diff --git a/libraries/PubSubClient/examples/MQTTAuthEthernet/MQTTAuthEthernet.ino b/libraries/PubSubClient/examples/MQTTAuthEthernet/MQTTAuthEthernet.ino deleted file mode 100644 index 65c8f6ced93..00000000000 --- a/libraries/PubSubClient/examples/MQTTAuthEthernet/MQTTAuthEthernet.ino +++ /dev/null @@ -1,67 +0,0 @@ -/* - Basic MQTT example - - - connects to an MQTT server - - publishes "hello world" to the topic "outTopic" - - subscribes to the topic "inTopic" -*/ - -#include -#include -#include - -// MQTTServer to use -char server[] = "iot.eclipse.org"; - -void callback(char* topic, byte* payload, unsigned int length) { - Serial.print("Received message for topic "); - Serial.print(topic); - Serial.print("with length "); - Serial.println(length); - Serial.println("Message:"); - Serial.write(payload, length); - Serial.println(); -} - -EthernetClient ethClient; -PubSubClient client(server, 1883, callback, ethClient); - -void setup() -{ - Serial.begin(115200); - - // Start Ethernet with the build in MAC Address - Ethernet.begin(0); - - Serial.print("My IP address: "); - Serial.println(Ethernet.localIP()); -} - -void loop() -{ - // Reconnect if the connection was lost - if (!client.connected()) { - Serial.println("Disconnected. Reconnecting...."); - - if(!client.connect("energiaClient", "testuser", "testpass")) { - Serial.println("Connection failed"); - } else { - Serial.println("Connection success"); - if(client.subscribe("inTopic")) { - Serial.println("Subscription successfull"); - } - } - } - - if(client.publish("outTopic","hello world")) { - Serial.println("Publish success"); - } else { - Serial.println("Publish failed"); - } - - // Check if any message were received - // on the topic we subsrcived to - client.poll(); - delay(1000); -} - diff --git a/libraries/PubSubClient/examples/MQTTBasicEthernet/MQTTBasicEthernet.ino b/libraries/PubSubClient/examples/MQTTBasicEthernet/MQTTBasicEthernet.ino deleted file mode 100644 index 8a39a827931..00000000000 --- a/libraries/PubSubClient/examples/MQTTBasicEthernet/MQTTBasicEthernet.ino +++ /dev/null @@ -1,67 +0,0 @@ -/* - Basic MQTT example - - - connects to an MQTT server - - publishes "hello world" to the topic "outTopic" - - subscribes to the topic "inTopic" -*/ - -#include -#include -#include - -// MQTTServer to use -char server[] = "iot.eclipse.org"; - -void callback(char* topic, byte* payload, unsigned int length) { - Serial.print("Received message for topic "); - Serial.print(topic); - Serial.print("with length "); - Serial.println(length); - Serial.println("Message:"); - Serial.write(payload, length); - Serial.println(); -} - -EthernetClient ethClient; -PubSubClient client(server, 1883, callback, ethClient); - -void setup() -{ - Serial.begin(115200); - - // Start Ethernet with the build in MAC Address - Ethernet.begin(0); - - Serial.print("My IP address: "); - Serial.println(Ethernet.localIP()); -} - -void loop() -{ - // Reconnect if the connection was lost - if (!client.connected()) { - Serial.println("Disconnected. Reconnecting...."); - - if(!client.connect("energiaClient")) { - Serial.println("Connection failed"); - } else { - Serial.println("Connection success"); - if(client.subscribe("inTopic")) { - Serial.println("Subscription successfull"); - } - } - } - - if(client.publish("outTopic","hello world")) { - Serial.println("Publish success"); - } else { - Serial.println("Publish failed"); - } - - // Check if any message were received - // on the topic we subsrcived to - client.poll(); - delay(1000); -} - diff --git a/libraries/PubSubClient/examples/MQTTBasicWiFi/MQTTBasicWiFi.ino b/libraries/PubSubClient/examples/MQTTBasicWiFi/MQTTBasicWiFi.ino deleted file mode 100644 index e797ee6daf7..00000000000 --- a/libraries/PubSubClient/examples/MQTTBasicWiFi/MQTTBasicWiFi.ino +++ /dev/null @@ -1,108 +0,0 @@ -/* - Basic MQTT example - - - connects to an MQTT server - - publishes "hello world" to the topic "outTopic" - - subscribes to the topic "inTopic" -*/ - -#include -#include -#include - -// your network name also called SSID -char ssid[] = "energia1"; -// your network password -char password[] = "launch pad"; -// MQTTServer to use -char server[] = "iot.eclipse.org"; - -void callback(char* topic, byte* payload, unsigned int length) { - Serial.print("Received message for topic "); - Serial.print(topic); - Serial.print("with length "); - Serial.println(length); - Serial.println("Message:"); - Serial.write(payload, length); - Serial.println(); -} - -WiFiClient wifiClient; -PubSubClient client(server, 1883, callback, wifiClient); - -void setup() -{ - Serial.begin(115200); - - // Start Ethernet with the build in MAC Address - // attempt to connect to Wifi network: - Serial.print("Attempting to connect to Network named: "); - // print the network name (SSID); - Serial.println(ssid); - // Connect to WPA/WPA2 network. Change this line if using open or WEP network: - WiFi.begin(ssid, password); - while ( WiFi.status() != WL_CONNECTED) { - // print dots while we wait to connect - Serial.print("."); - delay(300); - } - - Serial.println("\nYou're connected to the network"); - Serial.println("Waiting for an ip address"); - - while (WiFi.localIP() == INADDR_NONE) { - // print dots while we wait for an ip addresss - Serial.print("."); - delay(300); - } - - Serial.println("\nIP Address obtained"); - // We are connected and have an IP address. - // Print the WiFi status. - printWifiStatus(); -} - -void loop() -{ - // Reconnect if the connection was lost - if (!client.connected()) { - Serial.println("Disconnected. Reconnecting...."); - - if(!client.connect("energiaClient")) { - Serial.println("Connection failed"); - } else { - Serial.println("Connection success"); - if(client.subscribe("inTopic")) { - Serial.println("Subscription successfull"); - } - } - } - - if(client.publish("outTopic","hello world")) { - Serial.println("Publish success"); - } else { - Serial.println("Publish failed"); - } - - // Check if any message were received - // on the topic we subsrcived to - client.poll(); - delay(1000); -} - -void printWifiStatus() { - // print the SSID of the network you're attached to: - Serial.print("SSID: "); - Serial.println(WiFi.SSID()); - - // print your WiFi IP address: - IPAddress ip = WiFi.localIP(); - Serial.print("IP Address: "); - Serial.println(ip); - - // print the received signal strength: - long rssi = WiFi.RSSI(); - Serial.print("signal strength (RSSI):"); - Serial.print(rssi); - Serial.println(" dBm"); -} diff --git a/libraries/PubSubClient/examples/MQTTPublishInCallbackEthernet/MQTTPublishInCallbackEthernet.ino b/libraries/PubSubClient/examples/MQTTPublishInCallbackEthernet/MQTTPublishInCallbackEthernet.ino deleted file mode 100644 index 19d1152efad..00000000000 --- a/libraries/PubSubClient/examples/MQTTPublishInCallbackEthernet/MQTTPublishInCallbackEthernet.ino +++ /dev/null @@ -1,62 +0,0 @@ -/* - Basic MQTT example - - - connects to an MQTT server - - publishes "hello world" to the topic "outTopic" - - subscribes to the topic "inTopic" -*/ - -#include -#include -#include - -char server[] = "iot.eclipse.org"; - -byte buffer[128]; - -EthernetClient ethClient; -PubSubClient client(server, 1883, callback, ethClient); - -void callback(char* topic, byte* payload, unsigned int length) { - Serial.println("Got a message, publishing it"); - // If length is larger than 128, - // then make sure we do not write more then our buffer can handle - if(length > 128) length = 128; - - memcpy(buffer, payload, length); - client.publish("outTopic", buffer, length); -} - -void setup() -{ - Serial.begin(115200); - - // Start Ethernet with the build in MAC Address - Ethernet.begin(0); - - IPAddress ip = Ethernet.localIP(); - Serial.print("IP Address: "); - Serial.println(ip); - -} - -void loop() -{ - // Reconnect if the connection was lost - if (!client.connected()) { - Serial.println("Disconnected. Reconnecting...."); - - if(!client.connect("energiaClient")) { - Serial.println("Connection failed"); - } else { - Serial.println("Connection success"); - if(client.subscribe("inTopic")) { - Serial.println("Subscription successfull"); - } - } - } - - // Check if any message were received - // on the topic we subsrcived to - client.poll(); -} diff --git a/libraries/PubSubClient/examples/MQTTPublishInCallbackWiFi/MQTTPublishInCallbackWiFi.ino b/libraries/PubSubClient/examples/MQTTPublishInCallbackWiFi/MQTTPublishInCallbackWiFi.ino deleted file mode 100644 index e063911ea46..00000000000 --- a/libraries/PubSubClient/examples/MQTTPublishInCallbackWiFi/MQTTPublishInCallbackWiFi.ino +++ /dev/null @@ -1,102 +0,0 @@ -/* - Basic MQTT example - - - connects to an MQTT server - - publishes "hello world" to the topic "outTopic" - - subscribes to the topic "inTopic" -*/ - -#include -#include -#include - -// your network name also called SSID -char ssid[] = "energia1"; -// your network password -char password[] = "launchpad"; -// MQTTServer to use -char server[] = "iot.eclipse.org"; - -byte buffer[128]; - -WiFiClient wifiClient; -PubSubClient client(server, 1883, callback, wifiClient); - -void callback(char* topic, byte* payload, unsigned int length) { - Serial.println("Got a message, publishing it"); - // If length is larger than 128, - // then make sure we do not write more then our buffer can handle - if(length > 128) length = 128; - - memcpy(buffer, payload, length); - client.publish("outTopic", buffer, length); -} - -void setup() -{ - Serial.begin(115200); - - // attempt to connect to Wifi network: - Serial.print("Attempting to connect to Network named: "); - // print the network name (SSID); - Serial.println(ssid); - // Connect to WPA/WPA2 network. Change this line if using open or WEP network: - WiFi.begin(ssid, password); - while ( WiFi.status() != WL_CONNECTED) { - // print dots while we wait to connect - Serial.print("."); - delay(300); - } - - Serial.println("\nYou're connected to the network"); - Serial.println("Waiting for an ip address"); - - while (WiFi.localIP() == INADDR_NONE) { - // print dots while we wait for an ip addresss - Serial.print("."); - delay(300); - } - - Serial.println("\nIP Address obtained"); - // We are connected and have an IP address. - // Print the WiFi status. - printWifiStatus(); -} - -void loop() -{ - // Reconnect if the connection was lost - if (!client.connected()) { - Serial.println("Disconnected. Reconnecting...."); - - if(!client.connect("energiaClient")) { - Serial.println("Connection failed"); - } else { - Serial.println("Connection success"); - if(client.subscribe("inTopic")) { - Serial.println("Subscription successfull"); - } - } - } - - // Check if any message were received - // on the topic we subsrcived to - client.poll(); -} - -void printWifiStatus() { - // print the SSID of the network you're attached to: - Serial.print("SSID: "); - Serial.println(WiFi.SSID()); - - // print your WiFi IP address: - IPAddress ip = WiFi.localIP(); - Serial.print("IP Address: "); - Serial.println(ip); - - // print the received signal strength: - long rssi = WiFi.RSSI(); - Serial.print("signal strength (RSSI):"); - Serial.print(rssi); - Serial.println(" dBm"); -} diff --git a/libraries/PubSubClient/examples/mqtt_auth/mqtt_auth.ino b/libraries/PubSubClient/examples/mqtt_auth/mqtt_auth.ino new file mode 100755 index 00000000000..e9f7b180f36 --- /dev/null +++ b/libraries/PubSubClient/examples/mqtt_auth/mqtt_auth.ino @@ -0,0 +1,43 @@ +/* + Basic MQTT example with Authentication + + - connects to an MQTT server, providing username + and password + - publishes "hello world" to the topic "outTopic" + - subscribes to the topic "inTopic" +*/ + +#include +#include +#include + +// Update these with values suitable for your network. +byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED }; +IPAddress ip(172, 16, 0, 100); +IPAddress server(172, 16, 0, 2); + +void callback(char* topic, byte* payload, unsigned int length) { + // handle message arrived +} + +EthernetClient ethClient; +PubSubClient client(server, 1883, callback, ethClient); + +void setup() +{ + Ethernet.begin(mac, ip); + // Note - the default maximum packet size is 128 bytes. If the + // combined length of clientId, username and password exceed this, + // you will need to increase the value of MQTT_MAX_PACKET_SIZE in + // PubSubClient.h + + if (client.connect("arduinoClient", "testuser", "testpass")) { + client.publish("outTopic","hello world"); + client.subscribe("inTopic"); + } +} + +void loop() +{ + client.loop(); +} diff --git a/libraries/PubSubClient/examples/mqtt_basic/mqtt_basic.ino b/libraries/PubSubClient/examples/mqtt_basic/mqtt_basic.ino new file mode 100755 index 00000000000..f545adef82b --- /dev/null +++ b/libraries/PubSubClient/examples/mqtt_basic/mqtt_basic.ino @@ -0,0 +1,77 @@ +/* + Basic MQTT example + + This sketch demonstrates the basic capabilities of the library. + It connects to an MQTT server then: + - publishes "hello world" to the topic "outTopic" + - subscribes to the topic "inTopic", printing out any messages + it receives. NB - it assumes the received payloads are strings not binary + + It will reconnect to the server if the connection is lost using a blocking + reconnect function. See the 'mqtt_reconnect_nonblocking' example for how to + achieve the same result without blocking the main loop. + +*/ + +#include +#include +#include + +// Update these with values suitable for your network. +byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED }; +IPAddress ip(172, 16, 0, 100); +IPAddress server(172, 16, 0, 2); + +void callback(char* topic, byte* payload, unsigned int length) { + Serial.print("Message arrived ["); + Serial.print(topic); + Serial.print("] "); + for (int i=0;i Preferences -> Additional Boards Manager URLs": + http://arduino.esp8266.com/stable/package_esp8266com_index.json + - Open the "Tools -> Board -> Board Manager" and click install for the ESP8266" + - Select your ESP8266 in "Tools -> Board" + +*/ + +#include +#include + +// Update these with values suitable for your network. + +const char* ssid = "........"; +const char* password = "........"; +const char* mqtt_server = "broker.mqtt-dashboard.com"; + +WiFiClient espClient; +PubSubClient client(espClient); +long lastMsg = 0; +char msg[50]; +int value = 0; + +void setup() { + pinMode(BUILTIN_LED, OUTPUT); // Initialize the BUILTIN_LED pin as an output + Serial.begin(115200); + setup_wifi(); + client.setServer(mqtt_server, 1883); + client.setCallback(callback); +} + +void setup_wifi() { + + delay(10); + // We start by connecting to a WiFi network + Serial.println(); + Serial.print("Connecting to "); + Serial.println(ssid); + + WiFi.begin(ssid, password); + + while (WiFi.status() != WL_CONNECTED) { + delay(500); + Serial.print("."); + } + + Serial.println(""); + Serial.println("WiFi connected"); + Serial.println("IP address: "); + Serial.println(WiFi.localIP()); +} + +void callback(char* topic, byte* payload, unsigned int length) { + Serial.print("Message arrived ["); + Serial.print(topic); + Serial.print("] "); + for (int i = 0; i < length; i++) { + Serial.print((char)payload[i]); + } + Serial.println(); + + // Switch on the LED if an 1 was received as first character + if ((char)payload[0] == '1') { + digitalWrite(BUILTIN_LED, LOW); // Turn the LED on (Note that LOW is the voltage level + // but actually the LED is on; this is because + // it is acive low on the ESP-01) + } else { + digitalWrite(BUILTIN_LED, HIGH); // Turn the LED off by making the voltage HIGH + } + +} + +void reconnect() { + // Loop until we're reconnected + while (!client.connected()) { + Serial.print("Attempting MQTT connection..."); + // Attempt to connect + if (client.connect("ESP8266Client")) { + Serial.println("connected"); + // Once connected, publish an announcement... + client.publish("outTopic", "hello world"); + // ... and resubscribe + client.subscribe("inTopic"); + } else { + Serial.print("failed, rc="); + Serial.print(client.state()); + Serial.println(" try again in 5 seconds"); + // Wait 5 seconds before retrying + delay(5000); + } + } +} +void loop() { + + if (!client.connected()) { + reconnect(); + } + client.loop(); + + long now = millis(); + if (now - lastMsg > 2000) { + lastMsg = now; + ++value; + snprintf (msg, 75, "hello world #%ld", value); + Serial.print("Publish message: "); + Serial.println(msg); + client.publish("outTopic", msg); + } +} diff --git a/libraries/PubSubClient/examples/mqtt_publish_in_callback/mqtt_publish_in_callback.ino b/libraries/PubSubClient/examples/mqtt_publish_in_callback/mqtt_publish_in_callback.ino new file mode 100644 index 00000000000..42afb2a3a49 --- /dev/null +++ b/libraries/PubSubClient/examples/mqtt_publish_in_callback/mqtt_publish_in_callback.ino @@ -0,0 +1,60 @@ +/* + Publishing in the callback + + - connects to an MQTT server + - subscribes to the topic "inTopic" + - when a message is received, republishes it to "outTopic" + + This example shows how to publish messages within the + callback function. The callback function header needs to + be declared before the PubSubClient constructor and the + actual callback defined afterwards. + This ensures the client reference in the callback function + is valid. + +*/ + +#include +#include +#include + +// Update these with values suitable for your network. +byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED }; +IPAddress ip(172, 16, 0, 100); +IPAddress server(172, 16, 0, 2); + +// Callback function header +void callback(char* topic, byte* payload, unsigned int length); + +EthernetClient ethClient; +PubSubClient client(server, 1883, callback, ethClient); + +// Callback function +void callback(char* topic, byte* payload, unsigned int length) { + // In order to republish this payload, a copy must be made + // as the orignal payload buffer will be overwritten whilst + // constructing the PUBLISH packet. + + // Allocate the correct amount of memory for the payload copy + byte* p = (byte*)malloc(length); + // Copy the payload to the new buffer + memcpy(p,payload,length); + client.publish("outTopic", p, length); + // Free the memory + free(p); +} + +void setup() +{ + + Ethernet.begin(mac, ip); + if (client.connect("arduinoClient")) { + client.publish("outTopic","hello world"); + client.subscribe("inTopic"); + } +} + +void loop() +{ + client.loop(); +} diff --git a/libraries/PubSubClient/examples/mqtt_reconnect_nonblocking/mqtt_reconnect_nonblocking.ino b/libraries/PubSubClient/examples/mqtt_reconnect_nonblocking/mqtt_reconnect_nonblocking.ino new file mode 100644 index 00000000000..080b7391c6a --- /dev/null +++ b/libraries/PubSubClient/examples/mqtt_reconnect_nonblocking/mqtt_reconnect_nonblocking.ino @@ -0,0 +1,67 @@ +/* + Reconnecting MQTT example - non-blocking + + This sketch demonstrates how to keep the client connected + using a non-blocking reconnect function. If the client loses + its connection, it attempts to reconnect every 5 seconds + without blocking the main loop. + +*/ + +#include +#include +#include + +// Update these with values suitable for your hardware/network. +byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED }; +IPAddress ip(172, 16, 0, 100); +IPAddress server(172, 16, 0, 2); + +void callback(char* topic, byte* payload, unsigned int length) { + // handle message arrived +} + +EthernetClient ethClient; +PubSubClient client(ethClient); + +long lastReconnectAttempt = 0; + +boolean reconnect() { + if (client.connect("arduinoClient")) { + // Once connected, publish an announcement... + client.publish("outTopic","hello world"); + // ... and resubscribe + client.subscribe("inTopic"); + } + return client.connected(); +} + +void setup() +{ + client.setServer(server, 1883); + client.setCallback(callback); + + Ethernet.begin(mac, ip); + delay(1500); + lastReconnectAttempt = 0; +} + + +void loop() +{ + if (!client.connected()) { + long now = millis(); + if (now - lastReconnectAttempt > 5000) { + lastReconnectAttempt = now; + // Attempt to reconnect + if (reconnect()) { + lastReconnectAttempt = 0; + } + } + } else { + // Client connected + + client.loop(); + } + +} diff --git a/libraries/PubSubClient/examples/mqtt_stream/mqtt_stream.ino b/libraries/PubSubClient/examples/mqtt_stream/mqtt_stream.ino new file mode 100644 index 00000000000..67c22872cf7 --- /dev/null +++ b/libraries/PubSubClient/examples/mqtt_stream/mqtt_stream.ino @@ -0,0 +1,57 @@ +/* + Example of using a Stream object to store the message payload + + Uses SRAM library: https://github.com/ennui2342/arduino-sram + but could use any Stream based class such as SD + + - connects to an MQTT server + - publishes "hello world" to the topic "outTopic" + - subscribes to the topic "inTopic" +*/ + +#include +#include +#include +#include + +// Update these with values suitable for your network. +byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED }; +IPAddress ip(172, 16, 0, 100); +IPAddress server(172, 16, 0, 2); + +SRAM sram(4, SRAM_1024); + +void callback(char* topic, byte* payload, unsigned int length) { + sram.seek(1); + + // do something with the message + for(uint8_t i=0; i