Stub TCP pipelining

TODO: Resolve issue with timeouts in async pipelining mode.
This commit is contained in:
Willem Toorop 2014-10-18 14:32:55 +02:00
parent 9b64473718
commit fc6e583b4b
6 changed files with 253 additions and 18 deletions

View File

@ -361,12 +361,13 @@ upstream_init(getdns_upstream *upstream,
/* For sharing a socket to this upstream with TCP */
upstream->fd = -1;
(void) memset(&upstream->event, 0, sizeof(upstream->event));
upstream->loop = NULL;
(void) getdns_eventloop_event_init(
&upstream->event, upstream, NULL, NULL, NULL);
(void) memset(&upstream->tcp, 0, sizeof(upstream->tcp));
upstream->write_queue = NULL;
upstream->write_queue_tail = NULL;
upstream->write_queue_last = NULL;
/* Tracking of network requests on this socket */
getdns_rbtree_init(&upstream->netreq_by_query_id,

View File

@ -87,7 +87,7 @@ typedef struct getdns_upstream {
/* Pipelining of TCP network requests */
getdns_network_req *write_queue;
getdns_network_req *write_queue_tail;
getdns_network_req *write_queue_last;
getdns_rbtree_t netreq_by_query_id;
} getdns_upstream;

View File

@ -134,6 +134,11 @@ getdns_context_detach_eventloop(getdns_context *context);
void
getdns_context_run(getdns_context *context);
#define GETDNS_CLEAR_EVENT(loop, event) \
do { if ((event)->ev) (loop)->vmt->clear((loop), (event)); } while(0)
#define GETDNS_SCHEDULE_EVENT(loop, fd, timeout, event) \
do { GETDNS_CLEAR_EVENT((loop), (event)); \
(loop)->vmt->schedule((loop),(fd),(timeout),(event)); } while(0)
#ifdef __cplusplus
}
#endif

View File

@ -76,8 +76,7 @@ network_req_new(getdns_dns_req * owner,
net_req->fd = -1;
memset(&net_req->event, 0, sizeof(net_req->event));
memset(&net_req->tcp, 0, sizeof(net_req->tcp));
net_req->query_id = -1;
net_req->write_queue_tail = NULL;
return net_req;
}

View File

@ -280,15 +280,68 @@ static void
stub_cleanup(getdns_network_req *netreq)
{
getdns_dns_req *dnsreq = netreq->owner;
getdns_network_req *r, *prev_r;
getdns_upstream *upstream;
intptr_t query_id_intptr;
int schedule;
if (netreq->event.ev)
dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event);
GETDNS_CLEAR_EVENT(dnsreq->loop, &netreq->event);
GETDNS_NULL_FREE(dnsreq->context->mf, netreq->tcp.write_buf);
GETDNS_NULL_FREE(dnsreq->context->mf, netreq->tcp.read_buf);
/* TODO: Delete from write_queue */
/* TODO: Delete from netreq_by_query_id */
/* Nothing globally scheduled? Then nothing queued */
if (!(upstream = netreq->upstream)->event.ev)
return;
/* Delete from upstream->netreq_by_query_id (if present) */
query_id_intptr = (intptr_t)netreq->query_id;
(void) getdns_rbtree_delete(
&upstream->netreq_by_query_id, (void *)query_id_intptr);
/* Delete from upstream->write_queue (if present) */
for (prev_r = NULL, r = upstream->write_queue; r;
prev_r = r, r = r->write_queue_tail)
if (r == netreq) {
if (prev_r)
prev_r->write_queue_tail = r->write_queue_tail;
else
upstream->write_queue = r->write_queue_tail;
if (r == upstream->write_queue_last)
upstream->write_queue_last =
prev_r ? prev_r : NULL;
break;
}
schedule = 0;
if (!upstream->write_queue && upstream->event.write_cb) {
upstream->event.write_cb = NULL;
schedule = 1;
}
if (!upstream->netreq_by_query_id.count && upstream->event.read_cb) {
upstream->event.read_cb = NULL;
schedule = 1;
}
if (schedule) {
if (upstream->event.read_cb || upstream->event.write_cb)
GETDNS_SCHEDULE_EVENT(upstream->loop,
upstream->fd, TIMEOUT_FOREVER, &upstream->event);
else
GETDNS_CLEAR_EVENT(upstream->loop,&upstream->event);
}
}
static void
upstream_cleanup(getdns_upstream *upstream)
{
while (upstream->write_queue)
stub_cleanup(upstream->write_queue);
while (upstream->netreq_by_query_id.count)
stub_cleanup((getdns_network_req *)
getdns_rbtree_first(&upstream->netreq_by_query_id));
close(upstream->fd);
upstream->fd = -1;
}
void
@ -331,7 +384,7 @@ stub_udp_read_cb(void *userarg)
size_t read;
dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event);
GETDNS_CLEAR_EVENT(dnsreq->loop, &netreq->event);
read = recvfrom(netreq->fd, pkt, pkt_len, 0, NULL, NULL);
if (read == -1 && (errno = EAGAIN || errno == EWOULDBLOCK))
@ -367,7 +420,7 @@ stub_udp_write_cb(void *userarg)
size_t pkt_len;
size_t pkt_size_needed;
dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event);
GETDNS_CLEAR_EVENT(dnsreq->loop, &netreq->event);
pkt_size_needed = getdns_get_query_pkt_size(dnsreq->context,
dnsreq->name, netreq->request_type, dnsreq->extensions);
@ -392,7 +445,7 @@ stub_udp_write_cb(void *userarg)
close(netreq->fd);
goto done;
}
dnsreq->loop->vmt->schedule(
GETDNS_SCHEDULE_EVENT(
dnsreq->loop, netreq->fd, dnsreq->context->timeout,
getdns_eventloop_event_init(&netreq->event, netreq,
stub_udp_read_cb, NULL, stub_timeout_cb));
@ -464,6 +517,10 @@ stub_tcp_read(int fd, getdns_tcp_state *tcp, struct mem_funcs *mf)
return STUB_TCP_AGAIN;
else
return STUB_TCP_ERROR;
} else if (read == 0) {
/* Remote end closed the socket */
/* TODO: Try to reconnect */
return STUB_TCP_ERROR;
}
tcp->to_read -= read;
tcp->read_pos += read;
@ -517,7 +574,7 @@ stub_tcp_read_cb(void *userarg)
return;
default:
dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event);
GETDNS_CLEAR_EVENT(dnsreq->loop, &netreq->event);
if (q != netreq->query_id)
return;
netreq->state = NET_REQ_FINISHED;
@ -535,6 +592,66 @@ stub_tcp_read_cb(void *userarg)
}
}
static void
upstream_read_cb(void *userarg)
{
getdns_upstream *upstream = (getdns_upstream *)userarg;
getdns_network_req *netreq;
int q;
uint16_t query_id;
intptr_t query_id_intptr;
switch ((q = stub_tcp_read(upstream->fd, &upstream->tcp,
&upstream->upstreams->mf))) {
case STUB_TCP_AGAIN:
return;
case STUB_TCP_ERROR:
upstream_cleanup(upstream);
return;
default:
/* Lookup netreq */
query_id = (uint16_t) q;
query_id_intptr = (intptr_t) query_id;
netreq = (getdns_network_req *)getdns_rbtree_delete(
&upstream->netreq_by_query_id, (void *)query_id_intptr);
if (! netreq) /* maybe canceled */
break;
netreq->state = NET_REQ_FINISHED;
ldns_wire2pkt(&(netreq->result), upstream->tcp.read_buf,
upstream->tcp.read_pos - upstream->tcp.read_buf);
upstream->upstreams->current = 0;
/* TODO: DNSSEC */
netreq->secure = 0;
netreq->bogus = 0;
stub_cleanup(netreq);
priv_getdns_check_dns_req_complete(netreq->owner);
}
/* reset read buffer */
upstream->tcp.read_pos = upstream->tcp.read_buf;
upstream->tcp.to_read = 2;
/* Nothing more to read? Then, deschedule the reads.*/
if (! upstream->netreq_by_query_id.count) {
upstream->event.read_cb = NULL;
if (!upstream->event.write_cb)
GETDNS_CLEAR_EVENT(upstream->loop, &upstream->event);
else
GETDNS_SCHEDULE_EVENT(upstream->loop,
upstream->fd, TIMEOUT_FOREVER, &upstream->event);
}
}
static void
netreq_upstream_read_cb(void *userarg)
{
upstream_read_cb(((getdns_network_req *)userarg)->upstream);
}
/* stub_tcp_write(fd, tcp, netreq)
* will return STUB_TCP_AGAIN when we need to come back again,
* STUB_TCP_ERROR on error and a query_id on successfull sent.
@ -680,9 +797,8 @@ stub_tcp_write_cb(void *userarg)
return;
default:
dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event);
netreq->query_id = (uint16_t) q;
dnsreq->loop->vmt->schedule(
GETDNS_SCHEDULE_EVENT(
dnsreq->loop, netreq->fd, dnsreq->context->timeout,
getdns_eventloop_event_init(&netreq->event, netreq,
stub_tcp_read_cb, NULL, stub_timeout_cb));
@ -690,6 +806,78 @@ stub_tcp_write_cb(void *userarg)
}
}
static void
upstream_write_cb(void *userarg)
{
getdns_upstream *upstream = (getdns_upstream *)userarg;
getdns_network_req *netreq = upstream->write_queue;
getdns_dns_req *dnsreq = netreq->owner;
int q;
switch ((q = stub_tcp_write(upstream->fd, &upstream->tcp, netreq))) {
case STUB_TCP_AGAIN:
return;
case STUB_TCP_ERROR:
stub_erred(netreq);
return;
default:
netreq->query_id = (uint16_t) q;
/* Unqueue the netreq from the write_queue */
if (!(upstream->write_queue = netreq->write_queue_tail)) {
upstream->write_queue_last = NULL;
upstream->event.write_cb = NULL;
/* Reschedule (if already reading) to clear writable */
if (upstream->event.read_cb)
GETDNS_SCHEDULE_EVENT(upstream->loop,
upstream->fd, TIMEOUT_FOREVER,
&upstream->event);
}
/* Schedule reading (if not already scheduled) */
if (!upstream->event.read_cb) {
upstream->event.read_cb = upstream_read_cb;
GETDNS_SCHEDULE_EVENT(upstream->loop,
upstream->fd, TIMEOUT_FOREVER, &upstream->event);
}
/* With synchonous lookups, schedule the read locally too */
if (netreq->event.write_cb) {
GETDNS_SCHEDULE_EVENT(
dnsreq->loop, upstream->fd, dnsreq->context->timeout,
getdns_eventloop_event_init(&netreq->event, netreq,
netreq_upstream_read_cb, NULL, stub_timeout_cb));
}
return;
}
}
static void
netreq_upstream_write_cb(void *userarg)
{
upstream_write_cb(((getdns_network_req *)userarg)->upstream);
}
static void
upstream_schedule_netreq(getdns_upstream *upstream, getdns_network_req *netreq)
{
/* We have a connected socket and a global event loop */
assert(upstream->fd >= 0);
assert(upstream->loop);
/* Append netreq to write_queue */
if (!upstream->write_queue) {
upstream->write_queue = upstream->write_queue_last = netreq;
upstream->event.write_cb = upstream_write_cb;
GETDNS_SCHEDULE_EVENT(upstream->loop,
upstream->fd, TIMEOUT_FOREVER, &upstream->event);
} else {
upstream->write_queue_last->write_queue_tail = netreq;
upstream->write_queue_last = netreq;
}
}
getdns_return_t
priv_getdns_submit_stub_request(getdns_network_req *netreq)
{
@ -710,7 +898,7 @@ priv_getdns_submit_stub_request(getdns_network_req *netreq)
getdns_sock_nonblock(netreq->fd);
netreq->upstream = upstream;
dnsreq->loop->vmt->schedule(
GETDNS_SCHEDULE_EVENT(
dnsreq->loop, netreq->fd, dnsreq->context->timeout,
getdns_eventloop_event_init(&netreq->event, netreq,
NULL, stub_udp_write_cb, stub_timeout_cb));
@ -732,13 +920,55 @@ priv_getdns_submit_stub_request(getdns_network_req *netreq)
}
netreq->upstream = upstream;
dnsreq->loop->vmt->schedule(
GETDNS_SCHEDULE_EVENT(
dnsreq->loop, netreq->fd, dnsreq->context->timeout,
getdns_eventloop_event_init(&netreq->event, netreq,
NULL, stub_tcp_write_cb, stub_timeout_cb));
return GETDNS_RETURN_GOOD;
case GETDNS_TRANSPORT_TCP_ONLY_KEEP_CONNECTIONS_OPEN:
/* In coming comments, "global" means "context wide" */
/* Are we the first? (Is global socket initialized?) */
if (upstream->fd == -1) {
/* We are the first. Make global socket and connect. */
if ((upstream->fd = socket(upstream->addr.ss_family,
SOCK_STREAM, IPPROTO_TCP)) == -1)
return GETDNS_RETURN_GENERIC_ERROR;
getdns_sock_nonblock(upstream->fd);
if (connect(upstream->fd,
(struct sockaddr *)&upstream->addr,
upstream->addr_len) == -1 && errno != EINPROGRESS){
close(upstream->fd);
upstream->fd = -1;
return GETDNS_RETURN_GENERIC_ERROR;
}
/* Attach to the global event loop
* so it can do it's own scheduling
*/
upstream->loop = dnsreq->context->extension;
}
netreq->upstream = upstream;
/* We have a context wide socket.
* Now schedule the write request.
*/
upstream_schedule_netreq(upstream, netreq);
/* Schedule at least the timeout locally.
* And also the write if we perform a synchronous lookup
*/
GETDNS_SCHEDULE_EVENT(
dnsreq->loop, upstream->fd, dnsreq->context->timeout,
getdns_eventloop_event_init(&netreq->event, netreq, NULL,
( dnsreq->loop != upstream->loop /* Synchronous lookup? */
? netreq_upstream_write_cb : NULL), stub_timeout_cb));
return GETDNS_RETURN_GOOD;
default:
return GETDNS_RETURN_GENERIC_ERROR;
}

View File

@ -201,7 +201,7 @@ typedef struct getdns_network_req
uint16_t query_id;
/* Network requests scheduled to write after me */
struct getdns_network_req *write_queue;
struct getdns_network_req *write_queue_tail;
} getdns_network_req;