http3: improvements across backends

- ngtcp2: using bufq for recv stream data
- internal stream_ctx instead of `struct HTTP` members
  for quiche, ngtcp2 and msh3
- no more QUIC related members in `struct HTTP`
- experimental use of recvmmsg(), disabled by default
  - testing on my old debian box shows no throughput improvements.
  - leaving it in, but disabled, for future revisit
- vquic: common UDP receive code for ngtcp2 and quiche
- vquic: common UDP send code for ngtcp2 and quiche
- added pytest skips for known msh3 failures
- fix unit2601 to survive torture testing
- quiche: using latest `master` from quiche and enabling large download
  tests, now that key change is supported
- fixing test_07_21 where retry handling of starting a stream
  was faulty
- msh3: use bufq for recv buffering headers and data
- msh3: replace fprintf debug logging with LOG_CF where possible
- msh3: force QUIC expire timers on recv/send to have more than
  1 request per second served

Closes #10772
This commit is contained in:
Stefan Eissing 2023-03-30 13:00:51 +02:00 committed by Daniel Stenberg
parent a094ec1a85
commit 544abeea83
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
14 changed files with 2008 additions and 1449 deletions

View File

@ -84,12 +84,12 @@ static size_t chunk_read(struct buf_chunk *chunk,
return n; return n;
} }
static ssize_t chunk_slurp(struct buf_chunk *chunk, static ssize_t chunk_slurpn(struct buf_chunk *chunk, size_t max_len,
Curl_bufq_reader *reader, Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err) void *reader_ctx, CURLcode *err)
{ {
unsigned char *p = &chunk->x.data[chunk->w_offset]; unsigned char *p = &chunk->x.data[chunk->w_offset];
size_t n = chunk->dlen - chunk->w_offset; size_t n = chunk->dlen - chunk->w_offset; /* free amount */
ssize_t nread; ssize_t nread;
DEBUGASSERT(chunk->dlen >= chunk->w_offset); DEBUGASSERT(chunk->dlen >= chunk->w_offset);
@ -97,6 +97,8 @@ static ssize_t chunk_slurp(struct buf_chunk *chunk,
*err = CURLE_AGAIN; *err = CURLE_AGAIN;
return -1; return -1;
} }
if(max_len && n > max_len)
n = max_len;
nread = reader(reader_ctx, p, n, err); nread = reader(reader_ctx, p, n, err);
if(nread > 0) { if(nread > 0) {
DEBUGASSERT((size_t)nread <= n); DEBUGASSERT((size_t)nread <= n);
@ -374,6 +376,7 @@ ssize_t Curl_bufq_write(struct bufq *q,
ssize_t nwritten = 0; ssize_t nwritten = 0;
size_t n; size_t n;
DEBUGASSERT(q->max_chunks > 0);
while(len) { while(len) {
tail = get_non_full_tail(q); tail = get_non_full_tail(q);
if(!tail) { if(!tail) {
@ -536,48 +539,75 @@ out:
return nwritten; return nwritten;
} }
ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader, ssize_t Curl_bufq_sipn(struct bufq *q, size_t max_len,
void *reader_ctx, CURLcode *err) Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err)
{ {
struct buf_chunk *tail = NULL; struct buf_chunk *tail = NULL;
ssize_t nread = 0, chunk_nread; ssize_t nread;
*err = CURLE_AGAIN;
tail = get_non_full_tail(q);
if(!tail) {
if(q->chunk_count < q->max_chunks) {
*err = CURLE_OUT_OF_MEMORY;
return -1;
}
/* full, blocked */
*err = CURLE_AGAIN;
return -1;
}
nread = chunk_slurpn(tail, max_len, reader, reader_ctx, err);
if(nread < 0) {
return -1;
}
else if(nread == 0) {
/* eof */
*err = CURLE_OK;
}
return nread;
}
ssize_t Curl_bufq_slurpn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err)
{
ssize_t nread = 0, n;
*err = CURLE_AGAIN; *err = CURLE_AGAIN;
while(1) { while(1) {
tail = get_non_full_tail(q);
if(!tail) {
if(q->chunk_count < q->max_chunks) {
*err = CURLE_OUT_OF_MEMORY;
return -1;
}
else if(nread) {
/* full, return what we read */
return nread;
}
else {
/* full, blocked */
*err = CURLE_AGAIN;
return -1;
}
}
chunk_nread = chunk_slurp(tail, reader, reader_ctx, err); n = Curl_bufq_sipn(q, max_len, reader, reader_ctx, err);
if(chunk_nread < 0) { if(n < 0) {
if(!nread || *err != CURLE_AGAIN) { if(!nread || *err != CURLE_AGAIN) {
/* blocked on first read or real error, fail */ /* blocked on first read or real error, fail */
nread = -1; nread = -1;
} }
break; break;
} }
else if(chunk_nread == 0) { else if(n == 0) {
/* eof */ /* eof */
*err = CURLE_OK; *err = CURLE_OK;
break; break;
} }
nread += chunk_nread; nread += (size_t)n;
if(max_len) {
DEBUGASSERT((size_t)n <= max_len);
max_len -= (size_t)n;
if(!max_len)
break;
}
/* give up slurping when we get less bytes than we asked for */ /* give up slurping when we get less bytes than we asked for */
if(!chunk_is_full(tail)) if(q->tail && !chunk_is_full(q->tail))
break; break;
} }
return nread; return nread;
} }
ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err)
{
return Curl_bufq_slurpn(q, 0, reader, reader_ctx, err);
}

View File

@ -245,6 +245,28 @@ typedef ssize_t Curl_bufq_reader(void *reader_ctx,
ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader, ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err); void *reader_ctx, CURLcode *err);
/**
* Read up to `max_len` bytes and append it to the end of the buffer queue.
* if `max_len` is 0, no limit is imposed and the call behaves exactly
* the same as `Curl_bufq_slurp()`.
* Returns the total amount of buf read (may be 0) or -1 on other
* reader errors.
* Note that even in case of a -1 chunks may have been read and
* the buffer queue will have different length than before.
*/
ssize_t Curl_bufq_slurpn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err);
/**
* Read *once* up to `max_len` bytes and append it to the buffer.
* if `max_len` is 0, no limit is imposed besides the chunk space.
* Returns the total amount of buf read (may be 0) or -1 on other
* reader errors.
*/
ssize_t Curl_bufq_sipn(struct bufq *q, size_t max_len,
Curl_bufq_reader *reader, void *reader_ctx,
CURLcode *err);
/** /**
* Write buf to the end of the buffer queue. * Write buf to the end of the buffer queue.

View File

@ -185,10 +185,6 @@ CURLcode Curl_http_auth_act(struct Curl_easy *data);
#endif /* CURL_DISABLE_HTTP */ #endif /* CURL_DISABLE_HTTP */
#ifdef USE_NGHTTP3
struct h3out; /* see ngtcp2 */
#endif
/**************************************************************************** /****************************************************************************
* HTTP unique setup * HTTP unique setup
***************************************************************************/ ***************************************************************************/
@ -216,6 +212,8 @@ struct HTTP {
HTTPSEND_BODY /* sending body */ HTTPSEND_BODY /* sending body */
} sending; } sending;
void *impl_ctx; /* context for actual HTTP implementation */
#ifdef USE_WEBSOCKETS #ifdef USE_WEBSOCKETS
struct websocket ws; struct websocket ws;
#endif #endif
@ -240,15 +238,11 @@ struct HTTP {
size_t push_headers_used; /* number of entries filled in */ size_t push_headers_used; /* number of entries filled in */
size_t push_headers_alloc; /* number of entries allocated */ size_t push_headers_alloc; /* number of entries allocated */
uint32_t error; /* HTTP/2 stream error code */ uint32_t error; /* HTTP/2 stream error code */
#endif
#if defined(USE_NGHTTP2) || defined(USE_NGHTTP3)
bool bodystarted; bool bodystarted;
int status_code; /* HTTP status code */ int status_code; /* HTTP status code */
char *mem; /* points to a buffer in memory to store received data */ char *mem; /* points to a buffer in memory to store received data */
size_t len; /* size of the buffer 'mem' points to */ size_t len; /* size of the buffer 'mem' points to */
size_t memlen; /* size of data copied to mem */ size_t memlen; /* size of data copied to mem */
#endif
#if defined(USE_NGHTTP2) || defined(ENABLE_QUIC)
/* fields used by both HTTP/2 and HTTP/3 */ /* fields used by both HTTP/2 and HTTP/3 */
const uint8_t *upload_mem; /* points to a buffer to read from */ const uint8_t *upload_mem; /* points to a buffer to read from */
size_t upload_len; /* size of the buffer 'upload_mem' points to */ size_t upload_len; /* size of the buffer 'upload_mem' points to */
@ -256,49 +250,6 @@ struct HTTP {
bool closed; /* TRUE on stream close */ bool closed; /* TRUE on stream close */
bool reset; /* TRUE on stream reset */ bool reset; /* TRUE on stream reset */
#endif #endif
#ifdef ENABLE_QUIC
#ifndef USE_MSH3
/*********** for HTTP/3 we store stream-local data here *************/
int64_t stream3_id; /* stream we are interested in */
uint64_t error3; /* HTTP/3 stream error code */
bool firstheader; /* FALSE until headers arrive */
bool firstbody; /* FALSE until body arrives */
bool h3req; /* FALSE until request is issued */
#endif /* !USE_MSH3 */
bool upload_done;
#endif /* ENABLE_QUIC */
#ifdef USE_NGHTTP3
size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
struct h3out *h3out; /* per-stream buffers for upload */
struct dynbuf overflow; /* excess data received during a single Curl_read */
#endif /* USE_NGHTTP3 */
#ifdef USE_MSH3
struct MSH3_REQUEST *req;
#ifdef _WIN32
CRITICAL_SECTION recv_lock;
#else /* !_WIN32 */
pthread_mutex_t recv_lock;
#endif /* _WIN32 */
/* Receive Buffer (Headers and Data) */
uint8_t* recv_buf;
size_t recv_buf_alloc;
size_t recv_buf_max;
/* Receive Headers */
size_t recv_header_len;
bool recv_header_complete;
/* Receive Data */
size_t recv_data_len;
bool recv_data_complete;
/* General Receive Error */
CURLcode recv_error;
#endif /* USE_MSH3 */
#ifdef USE_QUICHE
bool h3_got_header; /* TRUE when h3 stream has recvd some HEADER */
bool h3_recving_data; /* TRUE when h3 stream is reading DATA */
bool h3_body_pending; /* TRUE when h3 stream may have more body DATA */
struct h3_event_node *pending;
#endif /* USE_QUICHE */
}; };
CURLcode Curl_http_size(struct Curl_easy *data); CURLcode Curl_http_size(struct Curl_easy *data);

View File

@ -45,16 +45,10 @@
#include "curl_memory.h" #include "curl_memory.h"
#include "memdebug.h" #include "memdebug.h"
#define DEBUG_CF 1 #define H3_STREAM_WINDOW_SIZE (128 * 1024)
#define H3_STREAM_CHUNK_SIZE (16 * 1024)
#if DEBUG_CF && defined(DEBUGBUILD) #define H3_STREAM_RECV_CHUNKS \
#define CF_DEBUGF(x) x (H3_STREAM_WINDOW_SIZE / H3_STREAM_CHUNK_SIZE)
#else
#define CF_DEBUGF(x) do { } while(0)
#endif
#define MSH3_REQ_INIT_BUF_LEN 16384
#define MSH3_REQ_MAX_BUF_LEN 0x100000
#ifdef _WIN32 #ifdef _WIN32
#define msh3_lock CRITICAL_SECTION #define msh3_lock CRITICAL_SECTION
@ -116,6 +110,7 @@ struct cf_msh3_ctx {
curl_socket_t sock[2]; /* fake socket pair until we get support in msh3 */ curl_socket_t sock[2]; /* fake socket pair until we get support in msh3 */
char l_ip[MAX_IPADR_LEN]; /* local IP as string */ char l_ip[MAX_IPADR_LEN]; /* local IP as string */
int l_port; /* local port number */ int l_port; /* local port number */
struct cf_call_data call_data;
struct curltime connect_started; /* time the current attempt started */ struct curltime connect_started; /* time the current attempt started */
struct curltime handshake_at; /* time connect handshake finished */ struct curltime handshake_at; /* time connect handshake finished */
/* Flags written by msh3/msquic thread */ /* Flags written by msh3/msquic thread */
@ -127,6 +122,83 @@ struct cf_msh3_ctx {
BIT(active); BIT(active);
}; };
/* How to access `call_data` from a cf_msh3 filter */
#define CF_CTX_CALL_DATA(cf) \
((struct cf_msh3_ctx *)(cf)->ctx)->call_data
/**
* All about the H3 internals of a stream
*/
struct stream_ctx {
struct MSH3_REQUEST *req;
struct bufq recvbuf; /* h3 response */
#ifdef _WIN32
CRITICAL_SECTION recv_lock;
#else /* !_WIN32 */
pthread_mutex_t recv_lock;
#endif /* _WIN32 */
uint64_t error3; /* HTTP/3 stream error code */
int status_code; /* HTTP status code */
CURLcode recv_error;
bool closed;
bool reset;
bool upload_done;
bool firstheader; /* FALSE until headers arrive */
bool recv_header_complete;
};
#define H3_STREAM_CTX(d) ((struct stream_ctx *)(((d) && (d)->req.p.http)? \
((struct HTTP *)(d)->req.p.http)->impl_ctx \
: NULL))
#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->impl_ctx
#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \
H3_STREAM_CTX(d)->id : -2)
static CURLcode h3_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
if(stream)
return CURLE_OK;
stream = calloc(1, sizeof(*stream));
if(!stream)
return CURLE_OUT_OF_MEMORY;
H3_STREAM_LCTX(data) = stream;
stream->req = ZERO_NULL;
msh3_lock_initialize(&stream->recv_lock);
Curl_bufq_init2(&stream->recvbuf, H3_STREAM_CHUNK_SIZE,
H3_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
DEBUGF(LOG_CF(data, cf, "data setup (easy %p)", (void *)data));
return CURLE_OK;
}
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
(void)cf;
if(stream) {
DEBUGF(LOG_CF(data, cf, "easy handle is done"));
Curl_bufq_free(&stream->recvbuf);
free(stream);
H3_STREAM_LCTX(data) = NULL;
}
}
static void notify_drain(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
(void)cf;
if(!data->state.drain) {
data->state.drain = 1;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
static const MSH3_CONNECTION_IF msh3_conn_if = { static const MSH3_CONNECTION_IF msh3_conn_if = {
msh3_conn_connected, msh3_conn_connected,
msh3_conn_shutdown_complete, msh3_conn_shutdown_complete,
@ -136,10 +208,12 @@ static const MSH3_CONNECTION_IF msh3_conn_if = {
static void MSH3_CALL msh3_conn_connected(MSH3_CONNECTION *Connection, static void MSH3_CALL msh3_conn_connected(MSH3_CONNECTION *Connection,
void *IfContext) void *IfContext)
{ {
struct cf_msh3_ctx *ctx = IfContext; struct Curl_cfilter *cf = IfContext;
struct cf_msh3_ctx *ctx = cf->ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
(void)Connection; (void)Connection;
if(ctx->verbose)
CF_DEBUGF(fprintf(stderr, "* [MSH3] evt: connected\n")); DEBUGF(LOG_CF(data, cf, "[MSH3] connected"));
ctx->handshake_succeeded = true; ctx->handshake_succeeded = true;
ctx->connected = true; ctx->connected = true;
ctx->handshake_complete = true; ctx->handshake_complete = true;
@ -148,10 +222,12 @@ static void MSH3_CALL msh3_conn_connected(MSH3_CONNECTION *Connection,
static void MSH3_CALL msh3_conn_shutdown_complete(MSH3_CONNECTION *Connection, static void MSH3_CALL msh3_conn_shutdown_complete(MSH3_CONNECTION *Connection,
void *IfContext) void *IfContext)
{ {
struct cf_msh3_ctx *ctx = IfContext; struct Curl_cfilter *cf = IfContext;
struct cf_msh3_ctx *ctx = cf->ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
(void)Connection; (void)Connection;
if(ctx->verbose) DEBUGF(LOG_CF(data, cf, "[MSH3] shutdown complete"));
CF_DEBUGF(fprintf(stderr, "* [MSH3] evt: shutdown complete\n"));
ctx->connected = false; ctx->connected = false;
ctx->handshake_complete = true; ctx->handshake_complete = true;
} }
@ -173,173 +249,159 @@ static const MSH3_REQUEST_IF msh3_request_if = {
msh3_data_sent msh3_data_sent
}; };
static CURLcode msh3_data_setup(struct Curl_cfilter *cf, /* Decode HTTP status code. Returns -1 if no valid status code was
struct Curl_easy *data) decoded. (duplicate from http2.c) */
static int decode_status_code(const char *value, size_t len)
{ {
struct HTTP *stream = data->req.p.http; int i;
(void)cf; int res;
DEBUGASSERT(stream); if(len != 3) {
if(!stream->recv_buf) { return -1;
DEBUGF(LOG_CF(data, cf, "req: setup"));
stream->recv_buf = malloc(MSH3_REQ_INIT_BUF_LEN);
if(!stream->recv_buf) {
return CURLE_OUT_OF_MEMORY;
}
stream->req = ZERO_NULL;
msh3_lock_initialize(&stream->recv_lock);
stream->recv_buf_alloc = MSH3_REQ_INIT_BUF_LEN;
stream->recv_buf_max = MSH3_REQ_MAX_BUF_LEN;
stream->recv_header_len = 0;
stream->recv_header_complete = false;
stream->recv_data_len = 0;
stream->recv_data_complete = false;
stream->recv_error = CURLE_OK;
} }
return CURLE_OK;
res = 0;
for(i = 0; i < 3; ++i) {
char c = value[i];
if(c < '0' || c > '9') {
return -1;
}
res *= 10;
res += c - '0';
}
return res;
} }
/* Requires stream->recv_lock to be held */ /*
static bool msh3request_ensure_room(struct HTTP *stream, size_t len) * write_resp_raw() copies response data in raw format to the `data`'s
* receive buffer. If not enough space is available, it appends to the
* `data`'s overflow buffer.
*/
static CURLcode write_resp_raw(struct Curl_easy *data,
const void *mem, size_t memlen)
{ {
uint8_t *new_recv_buf; struct stream_ctx *stream = H3_STREAM_CTX(data);
const size_t cur_recv_len = stream->recv_header_len + stream->recv_data_len; CURLcode result = CURLE_OK;
ssize_t nwritten;
if(cur_recv_len + len > stream->recv_buf_alloc) { nwritten = Curl_bufq_write(&stream->recvbuf, mem, memlen, &result);
size_t new_recv_buf_alloc_len = stream->recv_buf_alloc; if(nwritten < 0) {
do { return result;
new_recv_buf_alloc_len <<= 1; /* TODO - handle overflow */
} while(cur_recv_len + len > new_recv_buf_alloc_len);
CF_DEBUGF(fprintf(stderr, "* enlarging buffer to %zu\n",
new_recv_buf_alloc_len));
new_recv_buf = malloc(new_recv_buf_alloc_len);
if(!new_recv_buf) {
CF_DEBUGF(fprintf(stderr, "* FAILED: enlarging buffer to %zu\n",
new_recv_buf_alloc_len));
return false;
}
if(cur_recv_len) {
memcpy(new_recv_buf, stream->recv_buf, cur_recv_len);
}
stream->recv_buf_alloc = new_recv_buf_alloc_len;
free(stream->recv_buf);
stream->recv_buf = new_recv_buf;
} }
return true;
if((size_t)nwritten < memlen) {
/* This MUST not happen. Our recbuf is dimensioned to hold the
* full max_stream_window and then some for this very reason. */
DEBUGASSERT(0);
return CURLE_RECV_ERROR;
}
return result;
} }
static void MSH3_CALL msh3_header_received(MSH3_REQUEST *Request, static void MSH3_CALL msh3_header_received(MSH3_REQUEST *Request,
void *IfContext, void *userp,
const MSH3_HEADER *Header) const MSH3_HEADER *hd)
{ {
struct Curl_easy *data = IfContext; struct Curl_easy *data = userp;
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
size_t total_len; CURLcode result;
(void)Request; (void)Request;
if(stream->recv_header_complete) { if(stream->recv_header_complete) {
CF_DEBUGF(fprintf(stderr, "* ignoring header after data\n"));
return; return;
} }
msh3_lock_acquire(&stream->recv_lock); msh3_lock_acquire(&stream->recv_lock);
if((Header->NameLength == 7) && if((hd->NameLength == 7) &&
!strncmp(H2H3_PSEUDO_STATUS, (char *)Header->Name, 7)) { !strncmp(H2H3_PSEUDO_STATUS, (char *)hd->Name, 7)) {
total_len = 10 + Header->ValueLength; char line[14]; /* status line is always 13 characters long */
if(!msh3request_ensure_room(stream, total_len)) { size_t ncopy;
CF_DEBUGF(fprintf(stderr, "* ERROR: unable to buffer: %.*s\n",
(int)Header->NameLength, Header->Name)); DEBUGASSERT(!stream->firstheader);
stream->recv_error = CURLE_OUT_OF_MEMORY; stream->status_code = decode_status_code(hd->Value, hd->ValueLength);
goto release_lock; DEBUGASSERT(stream->status_code != -1);
} ncopy = msnprintf(line, sizeof(line), "HTTP/3 %03d \r\n",
msnprintf((char *)stream->recv_buf + stream->recv_header_len, stream->status_code);
stream->recv_buf_alloc - stream->recv_header_len, result = write_resp_raw(data, line, ncopy);
"HTTP/3 %.*s \r\n", (int)Header->ValueLength, Header->Value); if(result)
stream->recv_error = result;
stream->firstheader = TRUE;
} }
else { else {
total_len = 4 + Header->NameLength + Header->ValueLength; /* store as an HTTP1-style header */
if(!msh3request_ensure_room(stream, total_len)) { DEBUGASSERT(stream->firstheader);
CF_DEBUGF(fprintf(stderr, "* ERROR: unable to buffer: %.*s\n", result = write_resp_raw(data, hd->Name, hd->NameLength);
(int)Header->NameLength, Header->Name)); if(!result)
stream->recv_error = CURLE_OUT_OF_MEMORY; result = write_resp_raw(data, ": ", 2);
goto release_lock; if(!result)
result = write_resp_raw(data, hd->Value, hd->ValueLength);
if(!result)
result = write_resp_raw(data, "\r\n", 2);
if(result) {
stream->recv_error = result;
} }
msnprintf((char *)stream->recv_buf + stream->recv_header_len,
stream->recv_buf_alloc - stream->recv_header_len,
"%.*s: %.*s\r\n",
(int)Header->NameLength, Header->Name,
(int)Header->ValueLength, Header->Value);
} }
stream->recv_header_len += total_len;
data->state.drain = 1; data->state.drain = 1;
release_lock:
msh3_lock_release(&stream->recv_lock); msh3_lock_release(&stream->recv_lock);
} }
static bool MSH3_CALL msh3_data_received(MSH3_REQUEST *Request, static bool MSH3_CALL msh3_data_received(MSH3_REQUEST *Request,
void *IfContext, uint32_t *Length, void *IfContext, uint32_t *buflen,
const uint8_t *Data) const uint8_t *buf)
{ {
struct Curl_easy *data = IfContext; struct Curl_easy *data = IfContext;
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
size_t cur_recv_len = stream->recv_header_len + stream->recv_data_len; CURLcode result;
bool rv = FALSE;
/* TODO: we would like to limit the amount of data we are buffer here.
* There seems to be no mechanism in msh3 to adjust flow control and
* it is undocumented what happens if we return FALSE here or less
* length (buflen is an inout parameter).
*/
(void)Request; (void)Request;
if(data && data->set.verbose)
CF_DEBUGF(fprintf(stderr, "* [MSH3] req: evt: received %u. %zu buffered, "
"%zu allocated\n",
*Length, cur_recv_len, stream->recv_buf_alloc));
/* TODO - Update this code to limit data bufferring by `stream->recv_buf_max`
and return `false` when we reach that limit. Then, when curl drains some
of the buffer, making room, call MsH3RequestSetReceiveEnabled to enable
receive callbacks again. */
msh3_lock_acquire(&stream->recv_lock); msh3_lock_acquire(&stream->recv_lock);
if(!stream->recv_header_complete) { if(!stream->recv_header_complete) {
if(data && data->set.verbose) result = write_resp_raw(data, "\r\n", 2);
CF_DEBUGF(fprintf(stderr, "* [MSH3] req: Headers complete!\n")); if(result) {
if(!msh3request_ensure_room(stream, 2)) { stream->recv_error = result;
stream->recv_error = CURLE_OUT_OF_MEMORY; goto out;
goto release_lock;
} }
stream->recv_buf[stream->recv_header_len++] = '\r';
stream->recv_buf[stream->recv_header_len++] = '\n';
stream->recv_header_complete = true; stream->recv_header_complete = true;
cur_recv_len += 2;
} }
if(!msh3request_ensure_room(stream, *Length)) {
stream->recv_error = CURLE_OUT_OF_MEMORY;
goto release_lock;
}
memcpy(stream->recv_buf + cur_recv_len, Data, *Length);
stream->recv_data_len += (size_t)*Length;
data->state.drain = 1;
release_lock: result = write_resp_raw(data, buf, *buflen);
if(result) {
stream->recv_error = result;
}
rv = TRUE;
out:
msh3_lock_release(&stream->recv_lock); msh3_lock_release(&stream->recv_lock);
return true; return rv;
} }
static void MSH3_CALL msh3_complete(MSH3_REQUEST *Request, void *IfContext, static void MSH3_CALL msh3_complete(MSH3_REQUEST *Request, void *IfContext,
bool Aborted, uint64_t AbortError) bool aborted, uint64_t error)
{ {
struct Curl_easy *data = IfContext; struct Curl_easy *data = IfContext;
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
(void)Request; (void)Request;
(void)AbortError;
if(data && data->set.verbose)
CF_DEBUGF(fprintf(stderr, "* [MSH3] req: evt: complete, aborted=%s\n",
Aborted ? "true" : "false"));
msh3_lock_acquire(&stream->recv_lock); msh3_lock_acquire(&stream->recv_lock);
if(Aborted) { stream->closed = TRUE;
stream->recv_error = CURLE_HTTP3; /* TODO - how do we pass AbortError? */
}
stream->recv_header_complete = true; stream->recv_header_complete = true;
stream->recv_data_complete = true; if(error)
stream->error3 = error;
if(aborted)
stream->reset = TRUE;
msh3_lock_release(&stream->recv_lock); msh3_lock_release(&stream->recv_lock);
} }
@ -347,7 +409,7 @@ static void MSH3_CALL msh3_shutdown_complete(MSH3_REQUEST *Request,
void *IfContext) void *IfContext)
{ {
struct Curl_easy *data = IfContext; struct Curl_easy *data = IfContext;
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
(void)Request; (void)Request;
(void)stream; (void)stream;
} }
@ -356,82 +418,121 @@ static void MSH3_CALL msh3_data_sent(MSH3_REQUEST *Request,
void *IfContext, void *SendContext) void *IfContext, void *SendContext)
{ {
struct Curl_easy *data = IfContext; struct Curl_easy *data = IfContext;
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
(void)Request; (void)Request;
(void)stream; (void)stream;
(void)SendContext; (void)SendContext;
} }
static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
struct Curl_easy *data,
CURLcode *err)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
ssize_t nread = -1;
(void)cf;
if(stream->reset) {
failf(data, "HTTP/3 stream reset by server");
*err = CURLE_PARTIAL_FILE;
DEBUGF(LOG_CF(data, cf, "cf_recv, was reset -> %d", *err));
goto out;
}
else if(stream->error3) {
failf(data, "HTTP/3 stream was not closed cleanly: (error %zd)",
(ssize_t)stream->error3);
*err = CURLE_HTTP3;
DEBUGF(LOG_CF(data, cf, "cf_recv, closed uncleanly -> %d", *err));
goto out;
}
else {
DEBUGF(LOG_CF(data, cf, "cf_recv, closed ok -> %d", *err));
}
*err = CURLE_OK;
nread = 0;
out:
data->state.drain = 0;
return nread;
}
static void set_quic_expire(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
/* we have no indication from msh3 when it would be a good time
* to juggle the connection again. So, we compromise by calling
* us again every some milliseconds. */
(void)cf;
if(stream && stream->req && !stream->closed) {
Curl_expire(data, 10, EXPIRE_QUIC);
}
else {
Curl_expire(data, 50, EXPIRE_QUIC);
}
}
static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data, static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, CURLcode *err) char *buf, size_t len, CURLcode *err)
{ {
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
size_t outsize = 0; ssize_t nread = -1;
struct cf_call_data save;
(void)cf; (void)cf;
CF_DATA_SAVE(save, cf, data);
DEBUGF(LOG_CF(data, cf, "req: recv with %zu byte buffer", len)); DEBUGF(LOG_CF(data, cf, "req: recv with %zu byte buffer", len));
msh3_lock_acquire(&stream->recv_lock);
if(stream->recv_error) { if(stream->recv_error) {
failf(data, "request aborted"); failf(data, "request aborted");
data->state.drain = 0; data->state.drain = 0;
*err = stream->recv_error; *err = stream->recv_error;
return -1; goto out;
} }
*err = CURLE_OK; *err = CURLE_OK;
msh3_lock_acquire(&stream->recv_lock);
if(stream->recv_header_len) { if(!Curl_bufq_is_empty(&stream->recvbuf)) {
outsize = len; nread = Curl_bufq_read(&stream->recvbuf,
if(stream->recv_header_len < outsize) { (unsigned char *)buf, len, err);
outsize = stream->recv_header_len; DEBUGF(LOG_CF(data, cf, "read recvbuf(len=%zu) -> %zd, %d",
len, nread, *err));
if(nread < 0)
goto out;
if(!Curl_bufq_is_empty(&stream->recvbuf) ||
stream->closed) {
notify_drain(cf, data);
} }
memcpy(buf, stream->recv_buf, outsize);
if(outsize < stream->recv_header_len + stream->recv_data_len) {
memmove(stream->recv_buf, stream->recv_buf + outsize,
stream->recv_header_len + stream->recv_data_len - outsize);
}
stream->recv_header_len -= outsize;
DEBUGF(LOG_CF(data, cf, "req: returned %zu bytes of header", outsize));
} }
else if(stream->recv_data_len) { else if(stream->closed) {
outsize = len; nread = recv_closed_stream(cf, data, err);
if(stream->recv_data_len < outsize) { goto out;
outsize = stream->recv_data_len;
}
memcpy(buf, stream->recv_buf, outsize);
if(outsize < stream->recv_data_len) {
memmove(stream->recv_buf, stream->recv_buf + outsize,
stream->recv_data_len - outsize);
}
stream->recv_data_len -= outsize;
DEBUGF(LOG_CF(data, cf, "req: returned %zu bytes of data", outsize));
if(stream->recv_data_len == 0 && stream->recv_data_complete)
data->state.drain = 1;
}
else if(stream->recv_data_complete) {
DEBUGF(LOG_CF(data, cf, "req: receive complete"));
data->state.drain = 0;
} }
else { else {
DEBUGF(LOG_CF(data, cf, "req: nothing here, call again")); DEBUGF(LOG_CF(data, cf, "req: nothing here, call again"));
*err = CURLE_AGAIN; *err = CURLE_AGAIN;
outsize = -1;
} }
out:
msh3_lock_release(&stream->recv_lock); msh3_lock_release(&stream->recv_lock);
set_quic_expire(cf, data);
return (ssize_t)outsize; CF_DATA_RESTORE(cf, save);
return nread;
} }
static ssize_t cf_msh3_send(struct Curl_cfilter *cf, struct Curl_easy *data, static ssize_t cf_msh3_send(struct Curl_cfilter *cf, struct Curl_easy *data,
const void *buf, size_t len, CURLcode *err) const void *buf, size_t len, CURLcode *err)
{ {
struct cf_msh3_ctx *ctx = cf->ctx; struct cf_msh3_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
struct h2h3req *hreq; struct h2h3req *hreq;
size_t hdrlen = 0; size_t hdrlen = 0;
size_t sentlen = 0; ssize_t nwritten = -1;
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
/* Sizes must match for cast below to work" */ /* Sizes must match for cast below to work" */
DEBUGASSERT(sizeof(MSH3_HEADER) == sizeof(struct h2h3pseudo)); DEBUGASSERT(sizeof(MSH3_HEADER) == sizeof(struct h2h3pseudo));
@ -442,16 +543,11 @@ static ssize_t cf_msh3_send(struct Curl_cfilter *cf, struct Curl_easy *data,
data. Parse out the headers and create the request, then if there is data. Parse out the headers and create the request, then if there is
any data left over go ahead and send it too. */ any data left over go ahead and send it too. */
*err = msh3_data_setup(cf, data);
if(*err) {
failf(data, "could not setup data");
return -1;
}
*err = Curl_pseudo_headers(data, buf, len, &hdrlen, &hreq); *err = Curl_pseudo_headers(data, buf, len, &hdrlen, &hreq);
if(*err) { if(*err) {
failf(data, "Curl_pseudo_headers failed"); failf(data, "Curl_pseudo_headers failed");
return -1; *err = CURLE_SEND_ERROR;
goto out;
} }
DEBUGF(LOG_CF(data, cf, "req: send %zu headers", hreq->entries)); DEBUGF(LOG_CF(data, cf, "req: send %zu headers", hreq->entries));
@ -463,31 +559,35 @@ static ssize_t cf_msh3_send(struct Curl_cfilter *cf, struct Curl_easy *data,
if(!stream->req) { if(!stream->req) {
failf(data, "request open failed"); failf(data, "request open failed");
*err = CURLE_SEND_ERROR; *err = CURLE_SEND_ERROR;
return -1; goto out;
} }
*err = CURLE_OK; *err = CURLE_OK;
return len; nwritten = len;
goto out;
}
else {
/* request is open */
DEBUGF(LOG_CF(data, cf, "req: send %zd body bytes", len));
if(len > 0xFFFFFFFF) {
len = 0xFFFFFFFF;
}
if(!MsH3RequestSend(stream->req, MSH3_REQUEST_FLAG_NONE, buf,
(uint32_t)len, stream)) {
*err = CURLE_SEND_ERROR;
goto out;
}
/* TODO - msh3/msquic will hold onto this memory until the send complete
event. How do we make sure curl doesn't free it until then? */
*err = CURLE_OK;
nwritten = len;
} }
DEBUGF(LOG_CF(data, cf, "req: send %zd body bytes", len)); out:
if(len > 0xFFFFFFFF) { set_quic_expire(cf, data);
/* msh3 doesn't support size_t sends currently. */ CF_DATA_RESTORE(cf, save);
*err = CURLE_SEND_ERROR; return nwritten;
return -1;
}
/* TODO - Need an explicit signal to know when to FIN. */
if(!MsH3RequestSend(stream->req, MSH3_REQUEST_FLAG_FIN, buf, (uint32_t)len,
stream)) {
*err = CURLE_SEND_ERROR;
return -1;
}
/* TODO - msh3/msquic will hold onto this memory until the send complete
event. How do we make sure curl doesn't free it until then? */
sentlen += len;
*err = CURLE_OK;
return sentlen;
} }
static int cf_msh3_get_select_socks(struct Curl_cfilter *cf, static int cf_msh3_get_select_socks(struct Curl_cfilter *cf,
@ -495,36 +595,49 @@ static int cf_msh3_get_select_socks(struct Curl_cfilter *cf,
curl_socket_t *socks) curl_socket_t *socks)
{ {
struct cf_msh3_ctx *ctx = cf->ctx; struct cf_msh3_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
int bitmap = GETSOCK_BLANK; int bitmap = GETSOCK_BLANK;
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
if(stream && ctx->sock[SP_LOCAL] != CURL_SOCKET_BAD) { if(stream && ctx->sock[SP_LOCAL] != CURL_SOCKET_BAD) {
socks[0] = ctx->sock[SP_LOCAL]; socks[0] = ctx->sock[SP_LOCAL];
if(stream->recv_error) { if(stream->recv_error) {
bitmap |= GETSOCK_READSOCK(0); bitmap |= GETSOCK_READSOCK(0);
data->state.drain = 1; notify_drain(cf, data);
} }
else if(stream->recv_header_len || stream->recv_data_len) { else if(stream->req) {
bitmap |= GETSOCK_READSOCK(0); bitmap |= GETSOCK_READSOCK(0);
data->state.drain = 1; notify_drain(cf, data);
} }
} }
DEBUGF(LOG_CF(data, cf, "select_sock %u -> %d", DEBUGF(LOG_CF(data, cf, "select_sock %u -> %d",
(uint32_t)data->state.drain, bitmap)); (uint32_t)data->state.drain, bitmap));
CF_DATA_RESTORE(cf, save);
return bitmap; return bitmap;
} }
static bool cf_msh3_data_pending(struct Curl_cfilter *cf, static bool cf_msh3_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data) const struct Curl_easy *data)
{ {
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_call_data save;
bool pending = FALSE;
CF_DATA_SAVE(save, cf, data);
(void)cf; (void)cf;
DEBUGF(LOG_CF((struct Curl_easy *)data, cf, "data pending = %hhu", if(stream->req) {
(bool)(stream->recv_header_len || stream->recv_data_len))); msh3_lock_acquire(&stream->recv_lock);
return stream->recv_header_len || stream->recv_data_len; DEBUGF(LOG_CF((struct Curl_easy *)data, cf, "data pending = %zu",
Curl_bufq_len(&stream->recvbuf)));
pending = !Curl_bufq_is_empty(&stream->recvbuf);
msh3_lock_release(&stream->recv_lock);
}
CF_DATA_RESTORE(cf, save);
return pending;
} }
static void cf_msh3_active(struct Curl_cfilter *cf, struct Curl_easy *data) static void cf_msh3_active(struct Curl_cfilter *cf, struct Curl_easy *data)
@ -548,31 +661,30 @@ static CURLcode cf_msh3_data_event(struct Curl_cfilter *cf,
struct Curl_easy *data, struct Curl_easy *data,
int event, int arg1, void *arg2) int event, int arg1, void *arg2)
{ {
struct HTTP *stream = data->req.p.http; struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_call_data save;
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
CF_DATA_SAVE(save, cf, data);
(void)arg1; (void)arg1;
(void)arg2; (void)arg2;
switch(event) { switch(event) {
case CF_CTRL_DATA_SETUP: case CF_CTRL_DATA_SETUP:
result = msh3_data_setup(cf, data); result = h3_data_setup(cf, data);
break; break;
case CF_CTRL_DATA_DONE: case CF_CTRL_DATA_DONE:
DEBUGF(LOG_CF(data, cf, "req: done")); h3_data_done(cf, data);
if(stream) {
if(stream->recv_buf) {
Curl_safefree(stream->recv_buf);
msh3_lock_uninitialize(&stream->recv_lock);
}
if(stream->req) {
MsH3RequestClose(stream->req);
stream->req = ZERO_NULL;
}
}
break; break;
case CF_CTRL_DATA_DONE_SEND: case CF_CTRL_DATA_DONE_SEND:
DEBUGF(LOG_CF(data, cf, "req: send done")); DEBUGF(LOG_CF(data, cf, "req: send done"));
stream->upload_done = TRUE; stream->upload_done = TRUE;
if(stream && stream->req) {
char buf[1];
if(!MsH3RequestSend(stream->req, MSH3_REQUEST_FLAG_FIN, buf, 0, data)) {
result = CURLE_SEND_ERROR;
}
}
break; break;
case CF_CTRL_CONN_INFO_UPDATE: case CF_CTRL_CONN_INFO_UPDATE:
DEBUGF(LOG_CF(data, cf, "req: update info")); DEBUGF(LOG_CF(data, cf, "req: update info"));
@ -581,6 +693,8 @@ static CURLcode cf_msh3_data_event(struct Curl_cfilter *cf,
default: default:
break; break;
} }
CF_DATA_RESTORE(cf, save);
return result; return result;
} }
@ -590,9 +704,10 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
struct cf_msh3_ctx *ctx = cf->ctx; struct cf_msh3_ctx *ctx = cf->ctx;
bool verify = !!cf->conn->ssl_config.verifypeer; bool verify = !!cf->conn->ssl_config.verifypeer;
MSH3_ADDR addr = {0}; MSH3_ADDR addr = {0};
CURLcode result;
memcpy(&addr, &ctx->addr.sa_addr, ctx->addr.addrlen); memcpy(&addr, &ctx->addr.sa_addr, ctx->addr.addrlen);
MSH3_SET_PORT(&addr, (uint16_t)cf->conn->remote_port); MSH3_SET_PORT(&addr, (uint16_t)cf->conn->remote_port);
ctx->verbose = (data && data->set.verbose);
if(verify && (cf->conn->ssl_config.CAfile || cf->conn->ssl_config.CApath)) { if(verify && (cf->conn->ssl_config.CAfile || cf->conn->ssl_config.CApath)) {
/* TODO: need a way to provide trust anchors to MSH3 */ /* TODO: need a way to provide trust anchors to MSH3 */
@ -618,7 +733,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
ctx->qconn = MsH3ConnectionOpen(ctx->api, ctx->qconn = MsH3ConnectionOpen(ctx->api,
&msh3_conn_if, &msh3_conn_if,
ctx, cf,
cf->conn->host.name, cf->conn->host.name,
&addr, &addr,
!verify); !verify);
@ -631,6 +746,10 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
return CURLE_FAILED_INIT; return CURLE_FAILED_INIT;
} }
result = h3_data_setup(cf, data);
if(result)
return result;
return CURLE_OK; return CURLE_OK;
} }
@ -639,6 +758,7 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf,
bool blocking, bool *done) bool blocking, bool *done)
{ {
struct cf_msh3_ctx *ctx = cf->ctx; struct cf_msh3_ctx *ctx = cf->ctx;
struct cf_call_data save;
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
(void)blocking; (void)blocking;
@ -647,6 +767,8 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf,
return CURLE_OK; return CURLE_OK;
} }
CF_DATA_SAVE(save, cf, data);
if(ctx->sock[SP_LOCAL] == CURL_SOCKET_BAD) { if(ctx->sock[SP_LOCAL] == CURL_SOCKET_BAD) {
if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, &ctx->sock[0]) < 0) { if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, &ctx->sock[0]) < 0) {
ctx->sock[SP_LOCAL] = CURL_SOCKET_BAD; ctx->sock[SP_LOCAL] = CURL_SOCKET_BAD;
@ -666,6 +788,7 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf,
if(ctx->handshake_complete) { if(ctx->handshake_complete) {
ctx->handshake_at = Curl_now(); ctx->handshake_at = Curl_now();
if(ctx->handshake_succeeded) { if(ctx->handshake_succeeded) {
DEBUGF(LOG_CF(data, cf, "handshake succeeded"));
cf->conn->bits.multiplex = TRUE; /* at least potentially multiplexed */ cf->conn->bits.multiplex = TRUE; /* at least potentially multiplexed */
cf->conn->httpversion = 30; cf->conn->httpversion = 30;
cf->conn->bundle->multiuse = BUNDLE_MULTIPLEX; cf->conn->bundle->multiuse = BUNDLE_MULTIPLEX;
@ -682,26 +805,35 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf,
} }
out: out:
CF_DATA_RESTORE(cf, save);
return result; return result;
} }
static void cf_msh3_close(struct Curl_cfilter *cf, struct Curl_easy *data) static void cf_msh3_close(struct Curl_cfilter *cf, struct Curl_easy *data)
{ {
struct cf_msh3_ctx *ctx = cf->ctx; struct cf_msh3_ctx *ctx = cf->ctx;
struct cf_call_data save;
(void)data; (void)data;
CF_DATA_SAVE(save, cf, data);
if(ctx) { if(ctx) {
DEBUGF(LOG_CF(data, cf, "destroying")); DEBUGF(LOG_CF(data, cf, "destroying"));
if(ctx->qconn) if(ctx->qconn) {
MsH3ConnectionClose(ctx->qconn); MsH3ConnectionClose(ctx->qconn);
if(ctx->api) ctx->qconn = NULL;
}
if(ctx->api) {
MsH3ApiClose(ctx->api); MsH3ApiClose(ctx->api);
ctx->api = NULL;
}
if(ctx->active) { if(ctx->active) {
/* We share our socket at cf->conn->sock[cf->sockindex] when active. /* We share our socket at cf->conn->sock[cf->sockindex] when active.
* If it is no longer there, someone has stolen (and hopefully * If it is no longer there, someone has stolen (and hopefully
* closed it) and we just forget about it. * closed it) and we just forget about it.
*/ */
ctx->active = FALSE;
if(ctx->sock[SP_LOCAL] == cf->conn->sock[cf->sockindex]) { if(ctx->sock[SP_LOCAL] == cf->conn->sock[cf->sockindex]) {
DEBUGF(LOG_CF(data, cf, "cf_msh3_close(%d) active", DEBUGF(LOG_CF(data, cf, "cf_msh3_close(%d) active",
(int)ctx->sock[SP_LOCAL])); (int)ctx->sock[SP_LOCAL]));
@ -721,17 +853,22 @@ static void cf_msh3_close(struct Curl_cfilter *cf, struct Curl_easy *data)
if(ctx->sock[SP_REMOTE] != CURL_SOCKET_BAD) { if(ctx->sock[SP_REMOTE] != CURL_SOCKET_BAD) {
sclose(ctx->sock[SP_REMOTE]); sclose(ctx->sock[SP_REMOTE]);
} }
memset(ctx, 0, sizeof(*ctx));
ctx->sock[SP_LOCAL] = CURL_SOCKET_BAD; ctx->sock[SP_LOCAL] = CURL_SOCKET_BAD;
ctx->sock[SP_REMOTE] = CURL_SOCKET_BAD; ctx->sock[SP_REMOTE] = CURL_SOCKET_BAD;
} }
CF_DATA_RESTORE(cf, save);
} }
static void cf_msh3_destroy(struct Curl_cfilter *cf, struct Curl_easy *data) static void cf_msh3_destroy(struct Curl_cfilter *cf, struct Curl_easy *data)
{ {
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
cf_msh3_close(cf, data); cf_msh3_close(cf, data);
free(cf->ctx); free(cf->ctx);
cf->ctx = NULL; cf->ctx = NULL;
/* no CF_DATA_RESTORE(cf, save); its gone */
} }
static CURLcode cf_msh3_query(struct Curl_cfilter *cf, static CURLcode cf_msh3_query(struct Curl_cfilter *cf,

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -22,12 +22,25 @@
* *
***************************************************************************/ ***************************************************************************/
/* WIP, experimental: use recvmmsg() on linux
* we have no configure check, yet
* and also it is only available for _GNU_SOURCE, which
* we do not use otherwise.
#define HAVE_SENDMMSG
*/
#if defined(HAVE_SENDMMSG)
#define _GNU_SOURCE
#include <sys/socket.h>
#undef _GNU_SOURCE
#endif
#include "curl_setup.h" #include "curl_setup.h"
#ifdef HAVE_FCNTL_H #ifdef HAVE_FCNTL_H
#include <fcntl.h> #include <fcntl.h>
#endif #endif
#include "urldata.h" #include "urldata.h"
#include "bufq.h"
#include "dynbuf.h" #include "dynbuf.h"
#include "cfilters.h" #include "cfilters.h"
#include "curl_log.h" #include "curl_log.h"
@ -51,6 +64,10 @@
#define QLOGMODE O_WRONLY|O_CREAT #define QLOGMODE O_WRONLY|O_CREAT
#endif #endif
#define NW_CHUNK_SIZE (64 * 1024)
#define NW_SEND_CHUNKS 2
void Curl_quic_ver(char *p, size_t len) void Curl_quic_ver(char *p, size_t len)
{ {
#if defined(USE_NGTCP2) && defined(USE_NGHTTP3) #if defined(USE_NGTCP2) && defined(USE_NGHTTP3)
@ -62,17 +79,10 @@ void Curl_quic_ver(char *p, size_t len)
#endif #endif
} }
CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen) CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx)
{ {
qctx->num_blocked_pkt = 0; Curl_bufq_init2(&qctx->sendbuf, NW_CHUNK_SIZE, NW_SEND_CHUNKS,
qctx->num_blocked_pkt_sent = 0; BUFQ_OPT_SOFT_LIMIT);
memset(&qctx->blocked_pkt, 0, sizeof(qctx->blocked_pkt));
qctx->pktbuflen = pktbuflen;
qctx->pktbuf = malloc(qctx->pktbuflen);
if(!qctx->pktbuf)
return CURLE_OUT_OF_MEMORY;
#if defined(__linux__) && defined(UDP_SEGMENT) && defined(HAVE_SENDMSG) #if defined(__linux__) && defined(UDP_SEGMENT) && defined(HAVE_SENDMSG)
qctx->no_gso = FALSE; qctx->no_gso = FALSE;
#else #else
@ -84,8 +94,7 @@ CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen)
void vquic_ctx_free(struct cf_quic_ctx *qctx) void vquic_ctx_free(struct cf_quic_ctx *qctx)
{ {
free(qctx->pktbuf); Curl_bufq_free(&qctx->sendbuf);
qctx->pktbuf = NULL;
} }
static CURLcode send_packet_no_gso(struct Curl_cfilter *cf, static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
@ -215,11 +224,11 @@ static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
return CURLE_OK; return CURLE_OK;
} }
CURLcode vquic_send_packet(struct Curl_cfilter *cf, CURLcode vquic_send_packets(struct Curl_cfilter *cf,
struct Curl_easy *data, struct Curl_easy *data,
struct cf_quic_ctx *qctx, struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen, const uint8_t *pkt, size_t pktlen, size_t gsolen,
size_t *psent) size_t *psent)
{ {
if(qctx->no_gso && pktlen > gsolen) { if(qctx->no_gso && pktlen > gsolen) {
return send_packet_no_gso(cf, data, qctx, pkt, pktlen, gsolen, psent); return send_packet_no_gso(cf, data, qctx, pkt, pktlen, gsolen, psent);
@ -228,53 +237,271 @@ CURLcode vquic_send_packet(struct Curl_cfilter *cf,
return do_sendmsg(cf, data, qctx, pkt, pktlen, gsolen, psent); return do_sendmsg(cf, data, qctx, pkt, pktlen, gsolen, psent);
} }
CURLcode vquic_flush(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx)
void vquic_push_blocked_pkt(struct Curl_cfilter *cf,
struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen)
{ {
struct vquic_blocked_pkt *blkpkt; const unsigned char *buf;
size_t blen, sent;
CURLcode result;
size_t gsolen;
(void)cf; while(Curl_bufq_peek(&qctx->sendbuf, &buf, &blen)) {
assert(qctx->num_blocked_pkt < gsolen = qctx->gsolen;
sizeof(qctx->blocked_pkt) / sizeof(qctx->blocked_pkt[0])); if(qctx->split_len) {
gsolen = qctx->split_gsolen;
if(blen > qctx->split_len)
blen = qctx->split_len;
}
blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt++]; DEBUGF(LOG_CF(data, cf, "vquic_send(len=%zu, gso=%zu)",
blen, gsolen));
blkpkt->pkt = pkt; result = vquic_send_packets(cf, data, qctx, buf, blen, gsolen, &sent);
blkpkt->pktlen = pktlen; DEBUGF(LOG_CF(data, cf, "vquic_send(len=%zu, gso=%zu) -> %d, sent=%zu",
blkpkt->gsolen = gsolen; blen, gsolen, result, sent));
if(result) {
if(result == CURLE_AGAIN) {
Curl_bufq_skip(&qctx->sendbuf, sent);
if(qctx->split_len)
qctx->split_len -= sent;
}
return result;
}
Curl_bufq_skip(&qctx->sendbuf, sent);
if(qctx->split_len)
qctx->split_len -= sent;
}
return CURLE_OK;
} }
CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf, CURLcode vquic_send(struct Curl_cfilter *cf, struct Curl_easy *data,
struct Curl_easy *data, struct cf_quic_ctx *qctx, size_t gsolen)
struct cf_quic_ctx *qctx)
{ {
size_t sent; qctx->gsolen = gsolen;
CURLcode curlcode; return vquic_flush(cf, data, qctx);
struct vquic_blocked_pkt *blkpkt; }
(void)cf; CURLcode vquic_send_tail_split(struct Curl_cfilter *cf, struct Curl_easy *data,
for(; qctx->num_blocked_pkt_sent < qctx->num_blocked_pkt; struct cf_quic_ctx *qctx, size_t gsolen,
++qctx->num_blocked_pkt_sent) { size_t tail_len, size_t tail_gsolen)
blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt_sent]; {
curlcode = vquic_send_packet(cf, data, qctx, blkpkt->pkt, DEBUGASSERT(Curl_bufq_len(&qctx->sendbuf) > tail_len);
blkpkt->pktlen, blkpkt->gsolen, &sent); qctx->split_len = Curl_bufq_len(&qctx->sendbuf) - tail_len;
qctx->split_gsolen = gsolen;
qctx->gsolen = tail_gsolen;
DEBUGF(LOG_CF(data, cf, "vquic_send_tail_split: [%zu gso=%zu][%zu gso=%zu]",
qctx->split_len, qctx->split_gsolen,
tail_len, qctx->gsolen));
return vquic_flush(cf, data, qctx);
}
if(curlcode) { #ifdef HAVE_SENDMMSG
if(curlcode == CURLE_AGAIN) { static CURLcode recvmmsg_packets(struct Curl_cfilter *cf,
blkpkt->pkt += sent; struct Curl_easy *data,
blkpkt->pktlen -= sent; struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp)
{
#define MMSG_NUM 64
struct iovec msg_iov[MMSG_NUM];
struct mmsghdr mmsg[MMSG_NUM];
uint8_t bufs[MMSG_NUM][2*1024];
struct sockaddr_storage remote_addr[MMSG_NUM];
size_t total_nread, pkts;
int mcount, i, n;
CURLcode result = CURLE_OK;
DEBUGASSERT(max_pkts > 0);
pkts = 0;
total_nread = 0;
while(pkts < max_pkts) {
n = (int)CURLMIN(MMSG_NUM, max_pkts);
memset(&mmsg, 0, sizeof(mmsg));
for(i = 0; i < n; ++i) {
msg_iov[i].iov_base = bufs[i];
msg_iov[i].iov_len = (int)sizeof(bufs[i]);
mmsg[i].msg_hdr.msg_iov = &msg_iov[i];
mmsg[i].msg_hdr.msg_iovlen = 1;
mmsg[i].msg_hdr.msg_name = &remote_addr[i];
mmsg[i].msg_hdr.msg_namelen = sizeof(remote_addr[i]);
}
while((mcount = recvmmsg(qctx->sockfd, mmsg, n, 0, NULL)) == -1 &&
SOCKERRNO == EINTR)
;
if(mcount == -1) {
if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
DEBUGF(LOG_CF(data, cf, "ingress, recvmmsg -> EAGAIN"));
goto out;
} }
return curlcode; if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
const char *r_ip;
int r_port;
Curl_cf_socket_peek(cf->next, data, NULL, NULL,
&r_ip, &r_port, NULL, NULL);
failf(data, "QUIC: connection to %s port %u refused",
r_ip, r_port);
result = CURLE_COULDNT_CONNECT;
goto out;
}
failf(data, "QUIC: recvmsg() unexpectedly returned %d (errno=%d)",
mcount, SOCKERRNO);
result = CURLE_RECV_ERROR;
goto out;
}
DEBUGF(LOG_CF(data, cf, "recvmmsg() -> %d packets", mcount));
pkts += mcount;
for(i = 0; i < mcount; ++i) {
total_nread += mmsg[i].msg_len;
result = recv_cb(bufs[i], mmsg[i].msg_len,
mmsg[i].msg_hdr.msg_name, mmsg[i].msg_hdr.msg_namelen,
0, userp);
if(result)
goto out;
} }
} }
qctx->num_blocked_pkt = 0; out:
qctx->num_blocked_pkt_sent = 0; DEBUGF(LOG_CF(data, cf, "recvd %zu packets with %zd bytes -> %d",
pkts, total_nread, result));
return result;
}
return CURLE_OK; #elif defined(HAVE_SENDMSG)
static CURLcode recvmsg_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp)
{
struct iovec msg_iov;
struct msghdr msg;
uint8_t buf[64*1024];
struct sockaddr_storage remote_addr;
size_t total_nread, pkts;
ssize_t nread;
CURLcode result = CURLE_OK;
msg_iov.iov_base = buf;
msg_iov.iov_len = (int)sizeof(buf);
memset(&msg, 0, sizeof(msg));
msg.msg_iov = &msg_iov;
msg.msg_iovlen = 1;
DEBUGASSERT(max_pkts > 0);
for(pkts = 0, total_nread = 0; pkts < max_pkts;) {
msg.msg_name = &remote_addr;
msg.msg_namelen = sizeof(remote_addr);
while((nread = recvmsg(qctx->sockfd, &msg, 0)) == -1 &&
SOCKERRNO == EINTR)
;
if(nread == -1) {
if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
DEBUGF(LOG_CF(data, cf, "ingress, recvmsg -> EAGAIN"));
goto out;
}
if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
const char *r_ip;
int r_port;
Curl_cf_socket_peek(cf->next, data, NULL, NULL,
&r_ip, &r_port, NULL, NULL);
failf(data, "QUIC: connection to %s port %u refused",
r_ip, r_port);
result = CURLE_COULDNT_CONNECT;
goto out;
}
failf(data, "QUIC: recvmsg() unexpectedly returned %zd (errno=%d)",
nread, SOCKERRNO);
result = CURLE_RECV_ERROR;
goto out;
}
++pkts;
total_nread += (size_t)nread;
result = recv_cb(buf, (size_t)nread, msg.msg_name, msg.msg_namelen,
0, userp);
if(result)
goto out;
}
out:
DEBUGF(LOG_CF(data, cf, "recvd %zu packets with %zd bytes -> %d",
pkts, total_nread, result));
return result;
}
#else /* HAVE_SENDMMSG || HAVE_SENDMSG */
CURLcode recvfrom_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp)
{
uint8_t buf[64*1024];
int bufsize = (int)sizeof(buf);
struct sockaddr_storage remote_addr;
socklen_t remote_addrlen = sizeof(remote_addr);
size_t total_nread, pkts;
ssize_t nread;
CURLcode result = CURLE_OK;
DEBUGASSERT(max_pkts > 0);
for(pkts = 0, total_nread = 0; pkts < max_pkts;) {
while((nread = recvfrom(qctx->sockfd, (char *)buf, bufsize, 0,
(struct sockaddr *)&remote_addr,
&remote_addrlen)) == -1 &&
SOCKERRNO == EINTR)
;
if(nread == -1) {
if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
DEBUGF(LOG_CF(data, cf, "ingress, recvfrom -> EAGAIN"));
goto out;
}
if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
const char *r_ip;
int r_port;
Curl_cf_socket_peek(cf->next, data, NULL, NULL,
&r_ip, &r_port, NULL, NULL);
failf(data, "QUIC: connection to %s port %u refused",
r_ip, r_port);
result = CURLE_COULDNT_CONNECT;
goto out;
}
failf(data, "QUIC: recvfrom() unexpectedly returned %zd (errno=%d)",
nread, SOCKERRNO);
result = CURLE_RECV_ERROR;
goto out;
}
++pkts;
total_nread += (size_t)nread;
result = recv_cb(buf, (size_t)nread, &remote_addr, remote_addrlen,
0, userp);
if(result)
goto out;
}
out:
DEBUGF(LOG_CF(data, cf, "recvd %zu packets with %zd bytes -> %d",
pkts, total_nread, result));
return result;
}
#endif /* !HAVE_SENDMMSG && !HAVE_SENDMSG */
CURLcode vquic_recv_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp)
{
#if defined(HAVE_SENDMMSG)
return recvmmsg_packets(cf, data, qctx, max_pkts, recv_cb, userp);
#elif defined(HAVE_SENDMSG)
return recvmsg_packets(cf, data, qctx, max_pkts, recv_cb, userp);
#else
return recvfrom_packets(cf, data, qctx, max_pkts, recv_cb, userp);
#endif
} }
/* /*

View File

@ -25,47 +25,63 @@
***************************************************************************/ ***************************************************************************/
#include "curl_setup.h" #include "curl_setup.h"
#include "bufq.h"
#ifdef ENABLE_QUIC #ifdef ENABLE_QUIC
struct vquic_blocked_pkt { #define MAX_PKT_BURST 10
const uint8_t *pkt; #define MAX_UDP_PAYLOAD_SIZE 1452
size_t pktlen;
size_t gsolen;
};
struct cf_quic_ctx { struct cf_quic_ctx {
curl_socket_t sockfd; curl_socket_t sockfd; /* connected UDP socket */
struct sockaddr_storage local_addr; struct sockaddr_storage local_addr; /* address socket is bound to */
socklen_t local_addrlen; socklen_t local_addrlen; /* length of local address */
struct vquic_blocked_pkt blocked_pkt[2];
uint8_t *pktbuf; struct bufq sendbuf; /* buffer for sending one or more packets */
/* the number of entries in blocked_pkt */ size_t gsolen; /* length of individual packets in send buf */
size_t num_blocked_pkt; size_t split_len; /* if != 0, buffer length after which GSO differs */
size_t num_blocked_pkt_sent; size_t split_gsolen; /* length of individual packets after split_len */
/* the packets blocked by sendmsg (EAGAIN or EWOULDBLOCK) */ bool no_gso; /* do not use gso on sending */
size_t pktbuflen;
/* the number of processed entries in blocked_pkt */
bool no_gso;
}; };
CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen); CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx);
void vquic_ctx_free(struct cf_quic_ctx *qctx); void vquic_ctx_free(struct cf_quic_ctx *qctx);
CURLcode vquic_send_packet(struct Curl_cfilter *cf, CURLcode vquic_send_packets(struct Curl_cfilter *cf,
struct Curl_easy *data, struct Curl_easy *data,
struct cf_quic_ctx *qctx, struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen, const uint8_t *pkt, size_t pktlen, size_t gsolen,
size_t *psent); size_t *psent);
void vquic_push_blocked_pkt(struct Curl_cfilter *cf, void vquic_push_blocked_pkt(struct Curl_cfilter *cf,
struct cf_quic_ctx *qctx, struct cf_quic_ctx *qctx,
const uint8_t *pkt, size_t pktlen, size_t gsolen); const uint8_t *pkt, size_t pktlen, size_t gsolen);
CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf, CURLcode vquic_send_blocked_pkts(struct Curl_cfilter *cf,
struct Curl_easy *data, struct Curl_easy *data,
struct cf_quic_ctx *qctx); struct cf_quic_ctx *qctx);
CURLcode vquic_send(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx, size_t gsolen);
CURLcode vquic_send_tail_split(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx, size_t gsolen,
size_t tail_len, size_t tail_gsolen);
CURLcode vquic_flush(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_quic_ctx *qctx);
typedef CURLcode vquic_recv_pkt_cb(const unsigned char *pkt, size_t pktlen,
struct sockaddr_storage *remote_addr,
socklen_t remote_addrlen, int ecn,
void *userp);
CURLcode vquic_recv_packets(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct cf_quic_ctx *qctx,
size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp);
#endif /* !ENABLE_QUIC */ #endif /* !ENABLE_QUIC */

View File

@ -114,6 +114,8 @@ class TestDownload:
httpd, nghttpx, repeat, proto): httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 shaky here")
curl = CurlClient(env=env) curl = CurlClient(env=env)
urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-499]' urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-499]'
r = curl.http_download(urls=[urln], alpn_proto=proto) r = curl.http_download(urls=[urln], alpn_proto=proto)
@ -223,6 +225,8 @@ class TestDownload:
httpd, nghttpx, repeat, proto): httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
count = 20 count = 20
urln = f'https://{env.authority_for(env.domain1, proto)}/data-10m?[0-{count-1}]' urln = f'https://{env.authority_for(env.domain1, proto)}/data-10m?[0-{count-1}]'
curl = CurlClient(env=env) curl = CurlClient(env=env)

View File

@ -81,6 +81,8 @@ class TestGoAway:
@pytest.mark.skipif(condition=not Env.have_h3(), reason="h3 not supported") @pytest.mark.skipif(condition=not Env.have_h3(), reason="h3 not supported")
def test_03_02_h3_goaway(self, env: Env, httpd, nghttpx, repeat): def test_03_02_h3_goaway(self, env: Env, httpd, nghttpx, repeat):
proto = 'h3' proto = 'h3'
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
count = 3 count = 3
self.r = None self.r = None
def long_run(): def long_run():

View File

@ -52,6 +52,8 @@ class TestErrors:
proto): proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
count = 1 count = 1
curl = CurlClient(env=env) curl = CurlClient(env=env)
urln = f'https://{env.authority_for(env.domain1, proto)}' \ urln = f'https://{env.authority_for(env.domain1, proto)}' \
@ -73,8 +75,8 @@ class TestErrors:
proto): proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('quiche'): if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("quiche not reliable, sometimes reports success") pytest.skip("msh3 stalls here")
count = 20 count = 20
curl = CurlClient(env=env) curl = CurlClient(env=env)
urln = f'https://{env.authority_for(env.domain1, proto)}' \ urln = f'https://{env.authority_for(env.domain1, proto)}' \

View File

@ -52,6 +52,8 @@ class TestUpload:
def test_07_01_upload_1_small(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_01_upload_1_small(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 fails here")
data = '0123456789' data = '0123456789'
curl = CurlClient(env=env) curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]' url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]'
@ -66,6 +68,8 @@ class TestUpload:
def test_07_02_upload_1_large(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_02_upload_1_large(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 fails here")
fdata = os.path.join(env.gen_dir, 'data-100k') fdata = os.path.join(env.gen_dir, 'data-100k')
curl = CurlClient(env=env) curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]' url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]'
@ -81,6 +85,8 @@ class TestUpload:
def test_07_10_upload_sequential(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_10_upload_sequential(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
count = 50 count = 50
data = '0123456789' data = '0123456789'
curl = CurlClient(env=env) curl = CurlClient(env=env)
@ -97,6 +103,8 @@ class TestUpload:
def test_07_11_upload_parallel(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_11_upload_parallel(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
# limit since we use a separate connection in h1 # limit since we use a separate connection in h1
count = 50 count = 50
data = '0123456789' data = '0123456789'
@ -115,6 +123,8 @@ class TestUpload:
def test_07_20_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_20_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
fdata = os.path.join(env.gen_dir, 'data-100k') fdata = os.path.join(env.gen_dir, 'data-100k')
count = 50 count = 50
curl = CurlClient(env=env) curl = CurlClient(env=env)
@ -133,6 +143,8 @@ class TestUpload:
def test_07_12_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_12_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
fdata = os.path.join(env.gen_dir, 'data-10m') fdata = os.path.join(env.gen_dir, 'data-10m')
count = 2 count = 2
curl = CurlClient(env=env) curl = CurlClient(env=env)
@ -151,6 +163,8 @@ class TestUpload:
def test_07_20_upload_parallel(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_20_upload_parallel(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 stalls here")
# limit since we use a separate connection in h1 # limit since we use a separate connection in h1
count = 50 count = 50
data = '0123456789' data = '0123456789'
@ -169,8 +183,8 @@ class TestUpload:
def test_07_21_upload_parallel_large(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_21_upload_parallel_large(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('quiche'): if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("quiche stalls on parallel, large uploads, unless --trace is used???") pytest.skip("msh3 stalls here")
fdata = os.path.join(env.gen_dir, 'data-100k') fdata = os.path.join(env.gen_dir, 'data-100k')
# limit since we use a separate connection in h1 # limit since we use a separate connection in h1
count = 50 count = 50
@ -187,6 +201,8 @@ class TestUpload:
def test_07_30_put_100k(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_30_put_100k(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 fails here")
fdata = os.path.join(env.gen_dir, 'data-100k') fdata = os.path.join(env.gen_dir, 'data-100k')
count = 1 count = 1
curl = CurlClient(env=env) curl = CurlClient(env=env)
@ -206,6 +222,8 @@ class TestUpload:
def test_07_31_put_10m(self, env: Env, httpd, nghttpx, repeat, proto): def test_07_31_put_10m(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3(): if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported") pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 fails here")
fdata = os.path.join(env.gen_dir, 'data-10m') fdata = os.path.join(env.gen_dir, 'data-10m')
count = 1 count = 1
curl = CurlClient(env=env) curl = CurlClient(env=env)

View File

@ -57,6 +57,7 @@ class TestCaddy:
@pytest.fixture(autouse=True, scope='class') @pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env, caddy): def _class_scope(self, env, caddy):
self._make_docs_file(docs_dir=caddy.docs_dir, fname='data1.data', fsize=1024*1024) self._make_docs_file(docs_dir=caddy.docs_dir, fname='data1.data', fsize=1024*1024)
self._make_docs_file(docs_dir=caddy.docs_dir, fname='data5.data', fsize=5*1024*1024)
self._make_docs_file(docs_dir=caddy.docs_dir, fname='data10.data', fsize=10*1024*1024) self._make_docs_file(docs_dir=caddy.docs_dir, fname='data10.data', fsize=10*1024*1024)
self._make_docs_file(docs_dir=caddy.docs_dir, fname='data100.data', fsize=100*1024*1024) self._make_docs_file(docs_dir=caddy.docs_dir, fname='data100.data', fsize=100*1024*1024)
@ -65,6 +66,8 @@ class TestCaddy:
def test_08_01_download_1(self, env: Env, caddy: Caddy, repeat, proto): def test_08_01_download_1(self, env: Env, caddy: Caddy, repeat, proto):
if proto == 'h3' and not env.have_h3_curl(): if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl") pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
curl = CurlClient(env=env) curl = CurlClient(env=env)
url = f'https://{env.domain1}:{caddy.port}/data.json' url = f'https://{env.domain1}:{caddy.port}/data.json'
r = curl.http_download(urls=[url], alpn_proto=proto) r = curl.http_download(urls=[url], alpn_proto=proto)
@ -77,6 +80,8 @@ class TestCaddy:
repeat, proto): repeat, proto):
if proto == 'h3' and not env.have_h3_curl(): if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl") pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
count = 50 count = 50
curl = CurlClient(env=env) curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]' urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]'
@ -92,7 +97,9 @@ class TestCaddy:
repeat, proto): repeat, proto):
if proto == 'h3' and not env.have_h3_curl(): if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl") pytest.skip("h3 not supported in curl")
count = 50 if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
count = 20
curl = CurlClient(env=env) curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]' urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]'
r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[ r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
@ -106,14 +113,31 @@ class TestCaddy:
else: else:
assert r.total_connects == 1 assert r.total_connects == 1
# download 10MB files sequentially # download 5MB files sequentially
@pytest.mark.parametrize("proto", ['h2', 'h3']) @pytest.mark.parametrize("proto", ['h2', 'h3'])
def test_08_04_download_10mb_sequential(self, env: Env, caddy: Caddy, def test_08_04a_download_10mb_sequential(self, env: Env, caddy: Caddy,
repeat, proto): repeat, proto):
if proto == 'h3' and not env.have_h3_curl(): if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl") pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('quiche'): if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("quiche stalls after a certain amount of data") pytest.skip("msh3 itself crashes")
count = 40
curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data5.data?[0-{count-1}]'
r = curl.http_download(urls=[urln], alpn_proto=proto)
assert r.exit_code == 0
r.check_stats(count=count, exp_status=200)
# sequential transfers will open 1 connection
assert r.total_connects == 1
# download 10MB files sequentially
@pytest.mark.parametrize("proto", ['h2', 'h3'])
def test_08_04b_download_10mb_sequential(self, env: Env, caddy: Caddy,
repeat, proto):
if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("msh3 itself crashes")
count = 20 count = 20
curl = CurlClient(env=env) curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]' urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]'
@ -129,8 +153,8 @@ class TestCaddy:
repeat, proto): repeat, proto):
if proto == 'h3' and not env.have_h3_curl(): if proto == 'h3' and not env.have_h3_curl():
pytest.skip("h3 not supported in curl") pytest.skip("h3 not supported in curl")
if proto == 'h3' and env.curl_uses_lib('quiche'): if proto == 'h3' and env.curl_uses_lib('msh3'):
pytest.skip("quiche stalls after a certain amount of data") pytest.skip("msh3 itself crashes")
count = 50 count = 50
curl = CurlClient(env=env) curl = CurlClient(env=env)
urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]' urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]'

View File

@ -243,4 +243,5 @@ UNITTEST_START
check_bufq(8, 8000, 10, 1234, 1234, BUFQ_OPT_NONE); check_bufq(8, 8000, 10, 1234, 1234, BUFQ_OPT_NONE);
check_bufq(8, 1024, 4, 129, 127, BUFQ_OPT_NO_SPARES); check_bufq(8, 1024, 4, 129, 127, BUFQ_OPT_NO_SPARES);
return 0;
UNITTEST_STOP UNITTEST_STOP