mqtt: use conn/easy meta hash

Remove mqtt structs from the unions at connectdata and
easy handle requests. Use meta hash at easy/connnection.

Make mqtt structs private to mqtt.c

Closes #17221
This commit is contained in:
Stefan Eissing 2025-04-29 10:49:46 +02:00 committed by Daniel Stenberg
parent e383ba53eb
commit 47b2300192
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
4 changed files with 132 additions and 74 deletions

View File

@ -61,6 +61,44 @@
#define MQTT_SUBACK_LEN 3
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
/* meta key for storing protocol meta at easy handle */
#define CURL_META_MQTT_EASY "meta:proto:mqtt:easy"
/* meta key for storing protocol meta at connection */
#define CURL_META_MQTT_CONN "meta:proto:mqtt:conn"
enum mqttstate {
MQTT_FIRST, /* 0 */
MQTT_REMAINING_LENGTH, /* 1 */
MQTT_CONNACK, /* 2 */
MQTT_SUBACK, /* 3 */
MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */
MQTT_PUBWAIT, /* 5 - wait for publish */
MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */
MQTT_NOSTATE /* 7 - never used an actual state */
};
struct mqtt_conn {
enum mqttstate state;
enum mqttstate nextstate; /* switch to this after remaining length is
done */
unsigned int packetid;
};
/* protocol-specific transfer-related data */
struct MQTT {
struct dynbuf sendbuf;
/* when receiving */
struct dynbuf recvbuf;
size_t npacket; /* byte counter */
size_t remaining_length;
unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
struct curltime lastTime; /* last time we sent or received data */
unsigned char firstbyte;
BIT(pingsent); /* 1 while we wait for ping response */
};
/*
* Forward declarations.
*/
@ -103,30 +141,56 @@ const struct Curl_handler Curl_handler_mqtt = {
PROTOPT_NONE /* flags */
};
static void mqtt_easy_dtor(void *key, size_t klen, void *entry)
{
struct MQTT *mq = entry;
(void)key;
(void)klen;
Curl_dyn_free(&mq->sendbuf);
Curl_dyn_free(&mq->recvbuf);
free(mq);
}
static void mqtt_conn_dtor(void *key, size_t klen, void *entry)
{
(void)key;
(void)klen;
free(entry);
}
static CURLcode mqtt_setup_conn(struct Curl_easy *data,
struct connectdata *conn)
{
/* allocate the HTTP-specific struct for the Curl_easy, only to survive
during this request */
/* setup MQTT specific meta data at easy handle and connection */
struct mqtt_conn *mqtt;
struct MQTT *mq;
(void)conn;
DEBUGASSERT(data->req.p.mqtt == NULL);
mqtt = calloc(1, sizeof(*mqtt));
if(!mqtt ||
Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor))
return CURLE_OUT_OF_MEMORY;
mq = calloc(1, sizeof(struct MQTT));
if(!mq)
return CURLE_OUT_OF_MEMORY;
Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
Curl_dyn_init(&mq->sendbuf, DYN_MQTT_SEND);
data->req.p.mqtt = mq;
if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor))
return CURLE_OUT_OF_MEMORY;
return CURLE_OK;
}
static CURLcode mqtt_send(struct Curl_easy *data,
const char *buf, size_t len)
{
struct MQTT *mq = data->req.p.mqtt;
size_t n;
CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n);
CURLcode result;
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
if(!mq)
return CURLE_FAILED_INIT;
result = Curl_xfer_send(data, buf, len, FALSE, &n);
if(result)
return result;
mq->lastTime = Curl_now();
@ -349,20 +413,19 @@ end:
static CURLcode mqtt_disconnect(struct Curl_easy *data)
{
CURLcode result = CURLE_OK;
struct MQTT *mq = data->req.p.mqtt;
result = mqtt_send(data, "\xe0\x00", 2);
Curl_dyn_free(&mq->sendbuf);
Curl_dyn_free(&mq->recvbuf);
return result;
return mqtt_send(data, "\xe0\x00", 2);
}
static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
{
struct MQTT *mq = data->req.p.mqtt;
size_t rlen = Curl_dyn_len(&mq->recvbuf);
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
size_t rlen;
CURLcode result;
if(!mq)
return CURLE_FAILED_INIT;
rlen = Curl_dyn_len(&mq->recvbuf);
if(rlen < nbytes) {
unsigned char readbuf[1024];
ssize_t nread;
@ -381,20 +444,27 @@ static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
{
struct MQTT *mq = data->req.p.mqtt;
size_t rlen = Curl_dyn_len(&mq->recvbuf);
if(rlen <= nbytes)
Curl_dyn_reset(&mq->recvbuf);
else
Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
DEBUGASSERT(mq);
if(mq) {
size_t rlen = Curl_dyn_len(&mq->recvbuf);
if(rlen <= nbytes)
Curl_dyn_reset(&mq->recvbuf);
else
Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
}
}
static CURLcode mqtt_verify_connack(struct Curl_easy *data)
{
struct MQTT *mq = data->req.p.mqtt;
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result;
char *ptr;
DEBUGASSERT(mq);
if(!mq)
return CURLE_FAILED_INIT;
result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
if(result)
goto fail;
@ -444,12 +514,16 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data)
char encodedsize[4];
size_t n;
struct connectdata *conn = data->conn;
struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
if(!mqtt)
return CURLE_FAILED_INIT;
result = mqtt_get_topic(data, &topic, &topiclen);
if(result)
goto fail;
conn->proto.mqtt.packetid++;
mqtt->packetid++;
packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
+ 2 bytes topic length + QoS byte */
@ -464,8 +538,8 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data)
packet[0] = MQTT_MSG_SUBSCRIBE;
memcpy(&packet[1], encodedsize, n);
packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
packet[1 + n] = (mqtt->packetid >> 8) & 0xff;
packet[2 + n] = mqtt->packetid & 0xff;
packet[3 + n] = (topiclen >> 8) & 0xff;
packet[4 + n ] = topiclen & 0xff;
memcpy(&packet[5 + n], topic, topiclen);
@ -484,12 +558,15 @@ fail:
*/
static CURLcode mqtt_verify_suback(struct Curl_easy *data)
{
struct MQTT *mq = data->req.p.mqtt;
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
struct connectdata *conn = data->conn;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
CURLcode result;
char *ptr;
if(!mqtt || !mq)
return CURLE_FAILED_INIT;
result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
if(result)
goto fail;
@ -606,7 +683,10 @@ static void mqstate(struct Curl_easy *data,
enum mqttstate nextstate) /* used if state == FIRST */
{
struct connectdata *conn = data->conn;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
DEBUGASSERT(mqtt);
if(!mqtt)
return;
#ifdef DEBUGBUILD
infof(data, "%s (from %s) (next is %s)",
statenames[state],
@ -625,10 +705,14 @@ static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
struct connectdata *conn = data->conn;
ssize_t nread;
size_t remlen;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct MQTT *mq = data->req.p.mqtt;
struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
unsigned char packet;
DEBUGASSERT(mqtt);
if(!mqtt || !mq)
return CURLE_FAILED_INIT;
switch(mqtt->state) {
MQTT_SUBACK_COMING:
case MQTT_SUBACK_COMING:
@ -717,10 +801,12 @@ end:
static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
{
struct MQTT *mq = data->req.p.mqtt;
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result = CURLE_OK;
*done = FALSE; /* unconditionally */
if(!mq)
return CURLE_FAILED_INIT;
mq->lastTime = Curl_now();
mq->pingsent = FALSE;
@ -736,21 +822,26 @@ static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
static CURLcode mqtt_done(struct Curl_easy *data,
CURLcode status, bool premature)
{
struct MQTT *mq = data->req.p.mqtt;
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
(void)status;
(void)premature;
Curl_dyn_free(&mq->sendbuf);
Curl_dyn_free(&mq->recvbuf);
if(mq) {
Curl_dyn_free(&mq->sendbuf);
Curl_dyn_free(&mq->recvbuf);
}
return CURLE_OK;
}
/* we ping regularly to avoid being disconnected by the server */
static CURLcode mqtt_ping(struct Curl_easy *data)
{
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result = CURLE_OK;
struct connectdata *conn = data->conn;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct MQTT *mq = data->req.p.mqtt;
struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
if(!mqtt || !mq)
return CURLE_FAILED_INIT;
if(mqtt->state == MQTT_FIRST &&
!mq->pingsent &&
@ -775,11 +866,14 @@ static CURLcode mqtt_ping(struct Curl_easy *data)
static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
{
struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result = CURLE_OK;
struct mqtt_conn *mqtt = &data->conn->proto.mqtt;
struct MQTT *mq = data->req.p.mqtt;
ssize_t nread;
unsigned char recvbyte;
struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN);
if(!mqtt || !mq)
return CURLE_FAILED_INIT;
*done = FALSE;

View File

@ -28,36 +28,4 @@
extern const struct Curl_handler Curl_handler_mqtt;
#endif
enum mqttstate {
MQTT_FIRST, /* 0 */
MQTT_REMAINING_LENGTH, /* 1 */
MQTT_CONNACK, /* 2 */
MQTT_SUBACK, /* 3 */
MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */
MQTT_PUBWAIT, /* 5 - wait for publish */
MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */
MQTT_NOSTATE /* 7 - never used an actual state */
};
struct mqtt_conn {
enum mqttstate state;
enum mqttstate nextstate; /* switch to this after remaining length is
done */
unsigned int packetid;
};
/* protocol-specific transfer-related data */
struct MQTT {
struct dynbuf sendbuf;
/* when receiving */
struct dynbuf recvbuf;
size_t npacket; /* byte counter */
size_t remaining_length;
unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
struct curltime lastTime; /* last time we sent or received data */
unsigned char firstbyte;
BIT(pingsent); /* 1 while we wait for ping response */
};
#endif /* HEADER_CURL_MQTT_H */

View File

@ -106,7 +106,6 @@ struct SingleRequest {
struct FTP *ftp;
struct IMAP *imap;
struct ldapreqinfo *ldap;
struct MQTT *mqtt;
struct POP3 *pop3;
struct RTSP *rtsp;
struct smb_request *smb;

View File

@ -894,9 +894,6 @@ struct connectdata {
#endif
#ifdef USE_OPENLDAP
struct ldapconninfo *ldapc;
#endif
#ifndef CURL_DISABLE_MQTT
struct mqtt_conn mqtt;
#endif
unsigned int unused:1; /* avoids empty union */
} proto;