From fc6e583b4b4004f3015899e2407c45fd189d8286 Mon Sep 17 00:00:00 2001 From: Willem Toorop Date: Sat, 18 Oct 2014 14:32:55 +0200 Subject: [PATCH] Stub TCP pipelining TODO: Resolve issue with timeouts in async pipelining mode. --- src/context.c | 5 +- src/context.h | 2 +- src/getdns/getdns_extra.h | 5 + src/request-internal.c | 3 +- src/stub.c | 254 ++++++++++++++++++++++++++++++++++++-- src/types-internal.h | 2 +- 6 files changed, 253 insertions(+), 18 deletions(-) diff --git a/src/context.c b/src/context.c index 1f91aff3..31af1c22 100755 --- a/src/context.c +++ b/src/context.c @@ -361,12 +361,13 @@ upstream_init(getdns_upstream *upstream, /* For sharing a socket to this upstream with TCP */ upstream->fd = -1; - (void) memset(&upstream->event, 0, sizeof(upstream->event)); upstream->loop = NULL; + (void) getdns_eventloop_event_init( + &upstream->event, upstream, NULL, NULL, NULL); (void) memset(&upstream->tcp, 0, sizeof(upstream->tcp)); upstream->write_queue = NULL; - upstream->write_queue_tail = NULL; + upstream->write_queue_last = NULL; /* Tracking of network requests on this socket */ getdns_rbtree_init(&upstream->netreq_by_query_id, diff --git a/src/context.h b/src/context.h index 130b6c57..5c9c9cd9 100755 --- a/src/context.h +++ b/src/context.h @@ -87,7 +87,7 @@ typedef struct getdns_upstream { /* Pipelining of TCP network requests */ getdns_network_req *write_queue; - getdns_network_req *write_queue_tail; + getdns_network_req *write_queue_last; getdns_rbtree_t netreq_by_query_id; } getdns_upstream; diff --git a/src/getdns/getdns_extra.h b/src/getdns/getdns_extra.h index 33ec4dea..46d7bdb2 100644 --- a/src/getdns/getdns_extra.h +++ b/src/getdns/getdns_extra.h @@ -134,6 +134,11 @@ getdns_context_detach_eventloop(getdns_context *context); void getdns_context_run(getdns_context *context); +#define GETDNS_CLEAR_EVENT(loop, event) \ + do { if ((event)->ev) (loop)->vmt->clear((loop), (event)); } while(0) +#define GETDNS_SCHEDULE_EVENT(loop, fd, timeout, event) \ + do { GETDNS_CLEAR_EVENT((loop), (event)); \ + (loop)->vmt->schedule((loop),(fd),(timeout),(event)); } while(0) #ifdef __cplusplus } #endif diff --git a/src/request-internal.c b/src/request-internal.c index de08926a..b6b17067 100644 --- a/src/request-internal.c +++ b/src/request-internal.c @@ -76,8 +76,7 @@ network_req_new(getdns_dns_req * owner, net_req->fd = -1; memset(&net_req->event, 0, sizeof(net_req->event)); memset(&net_req->tcp, 0, sizeof(net_req->tcp)); - net_req->query_id = -1; - + net_req->write_queue_tail = NULL; return net_req; } diff --git a/src/stub.c b/src/stub.c index 660d6697..3e9c5da0 100644 --- a/src/stub.c +++ b/src/stub.c @@ -280,15 +280,68 @@ static void stub_cleanup(getdns_network_req *netreq) { getdns_dns_req *dnsreq = netreq->owner; + getdns_network_req *r, *prev_r; + getdns_upstream *upstream; + intptr_t query_id_intptr; + int schedule; - if (netreq->event.ev) - dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event); + GETDNS_CLEAR_EVENT(dnsreq->loop, &netreq->event); GETDNS_NULL_FREE(dnsreq->context->mf, netreq->tcp.write_buf); GETDNS_NULL_FREE(dnsreq->context->mf, netreq->tcp.read_buf); - /* TODO: Delete from write_queue */ - /* TODO: Delete from netreq_by_query_id */ + /* Nothing globally scheduled? Then nothing queued */ + if (!(upstream = netreq->upstream)->event.ev) + return; + + /* Delete from upstream->netreq_by_query_id (if present) */ + query_id_intptr = (intptr_t)netreq->query_id; + (void) getdns_rbtree_delete( + &upstream->netreq_by_query_id, (void *)query_id_intptr); + + /* Delete from upstream->write_queue (if present) */ + for (prev_r = NULL, r = upstream->write_queue; r; + prev_r = r, r = r->write_queue_tail) + + if (r == netreq) { + if (prev_r) + prev_r->write_queue_tail = r->write_queue_tail; + else + upstream->write_queue = r->write_queue_tail; + + if (r == upstream->write_queue_last) + upstream->write_queue_last = + prev_r ? prev_r : NULL; + break; + } + schedule = 0; + if (!upstream->write_queue && upstream->event.write_cb) { + upstream->event.write_cb = NULL; + schedule = 1; + } + if (!upstream->netreq_by_query_id.count && upstream->event.read_cb) { + upstream->event.read_cb = NULL; + schedule = 1; + } + if (schedule) { + if (upstream->event.read_cb || upstream->event.write_cb) + GETDNS_SCHEDULE_EVENT(upstream->loop, + upstream->fd, TIMEOUT_FOREVER, &upstream->event); + else + GETDNS_CLEAR_EVENT(upstream->loop,&upstream->event); + } +} + +static void +upstream_cleanup(getdns_upstream *upstream) +{ + while (upstream->write_queue) + stub_cleanup(upstream->write_queue); + while (upstream->netreq_by_query_id.count) + stub_cleanup((getdns_network_req *) + getdns_rbtree_first(&upstream->netreq_by_query_id)); + close(upstream->fd); + upstream->fd = -1; } void @@ -331,7 +384,7 @@ stub_udp_read_cb(void *userarg) size_t read; - dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event); + GETDNS_CLEAR_EVENT(dnsreq->loop, &netreq->event); read = recvfrom(netreq->fd, pkt, pkt_len, 0, NULL, NULL); if (read == -1 && (errno = EAGAIN || errno == EWOULDBLOCK)) @@ -367,7 +420,7 @@ stub_udp_write_cb(void *userarg) size_t pkt_len; size_t pkt_size_needed; - dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event); + GETDNS_CLEAR_EVENT(dnsreq->loop, &netreq->event); pkt_size_needed = getdns_get_query_pkt_size(dnsreq->context, dnsreq->name, netreq->request_type, dnsreq->extensions); @@ -392,7 +445,7 @@ stub_udp_write_cb(void *userarg) close(netreq->fd); goto done; } - dnsreq->loop->vmt->schedule( + GETDNS_SCHEDULE_EVENT( dnsreq->loop, netreq->fd, dnsreq->context->timeout, getdns_eventloop_event_init(&netreq->event, netreq, stub_udp_read_cb, NULL, stub_timeout_cb)); @@ -464,6 +517,10 @@ stub_tcp_read(int fd, getdns_tcp_state *tcp, struct mem_funcs *mf) return STUB_TCP_AGAIN; else return STUB_TCP_ERROR; + } else if (read == 0) { + /* Remote end closed the socket */ + /* TODO: Try to reconnect */ + return STUB_TCP_ERROR; } tcp->to_read -= read; tcp->read_pos += read; @@ -517,7 +574,7 @@ stub_tcp_read_cb(void *userarg) return; default: - dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event); + GETDNS_CLEAR_EVENT(dnsreq->loop, &netreq->event); if (q != netreq->query_id) return; netreq->state = NET_REQ_FINISHED; @@ -535,6 +592,66 @@ stub_tcp_read_cb(void *userarg) } } +static void +upstream_read_cb(void *userarg) +{ + getdns_upstream *upstream = (getdns_upstream *)userarg; + getdns_network_req *netreq; + int q; + uint16_t query_id; + intptr_t query_id_intptr; + + switch ((q = stub_tcp_read(upstream->fd, &upstream->tcp, + &upstream->upstreams->mf))) { + case STUB_TCP_AGAIN: + return; + + case STUB_TCP_ERROR: + upstream_cleanup(upstream); + return; + + default: + /* Lookup netreq */ + query_id = (uint16_t) q; + query_id_intptr = (intptr_t) query_id; + netreq = (getdns_network_req *)getdns_rbtree_delete( + &upstream->netreq_by_query_id, (void *)query_id_intptr); + if (! netreq) /* maybe canceled */ + break; + + netreq->state = NET_REQ_FINISHED; + ldns_wire2pkt(&(netreq->result), upstream->tcp.read_buf, + upstream->tcp.read_pos - upstream->tcp.read_buf); + upstream->upstreams->current = 0; + + /* TODO: DNSSEC */ + netreq->secure = 0; + netreq->bogus = 0; + + stub_cleanup(netreq); + priv_getdns_check_dns_req_complete(netreq->owner); + } + /* reset read buffer */ + upstream->tcp.read_pos = upstream->tcp.read_buf; + upstream->tcp.to_read = 2; + + /* Nothing more to read? Then, deschedule the reads.*/ + if (! upstream->netreq_by_query_id.count) { + upstream->event.read_cb = NULL; + if (!upstream->event.write_cb) + GETDNS_CLEAR_EVENT(upstream->loop, &upstream->event); + else + GETDNS_SCHEDULE_EVENT(upstream->loop, + upstream->fd, TIMEOUT_FOREVER, &upstream->event); + } +} + +static void +netreq_upstream_read_cb(void *userarg) +{ + upstream_read_cb(((getdns_network_req *)userarg)->upstream); +} + /* stub_tcp_write(fd, tcp, netreq) * will return STUB_TCP_AGAIN when we need to come back again, * STUB_TCP_ERROR on error and a query_id on successfull sent. @@ -680,9 +797,8 @@ stub_tcp_write_cb(void *userarg) return; default: - dnsreq->loop->vmt->clear(dnsreq->loop, &netreq->event); netreq->query_id = (uint16_t) q; - dnsreq->loop->vmt->schedule( + GETDNS_SCHEDULE_EVENT( dnsreq->loop, netreq->fd, dnsreq->context->timeout, getdns_eventloop_event_init(&netreq->event, netreq, stub_tcp_read_cb, NULL, stub_timeout_cb)); @@ -690,6 +806,78 @@ stub_tcp_write_cb(void *userarg) } } +static void +upstream_write_cb(void *userarg) +{ + getdns_upstream *upstream = (getdns_upstream *)userarg; + getdns_network_req *netreq = upstream->write_queue; + getdns_dns_req *dnsreq = netreq->owner; + int q; + + switch ((q = stub_tcp_write(upstream->fd, &upstream->tcp, netreq))) { + case STUB_TCP_AGAIN: + return; + + case STUB_TCP_ERROR: + stub_erred(netreq); + return; + + default: + netreq->query_id = (uint16_t) q; + + /* Unqueue the netreq from the write_queue */ + if (!(upstream->write_queue = netreq->write_queue_tail)) { + upstream->write_queue_last = NULL; + upstream->event.write_cb = NULL; + + /* Reschedule (if already reading) to clear writable */ + if (upstream->event.read_cb) + GETDNS_SCHEDULE_EVENT(upstream->loop, + upstream->fd, TIMEOUT_FOREVER, + &upstream->event); + } + /* Schedule reading (if not already scheduled) */ + if (!upstream->event.read_cb) { + upstream->event.read_cb = upstream_read_cb; + GETDNS_SCHEDULE_EVENT(upstream->loop, + upstream->fd, TIMEOUT_FOREVER, &upstream->event); + } + /* With synchonous lookups, schedule the read locally too */ + if (netreq->event.write_cb) { + GETDNS_SCHEDULE_EVENT( + dnsreq->loop, upstream->fd, dnsreq->context->timeout, + getdns_eventloop_event_init(&netreq->event, netreq, + netreq_upstream_read_cb, NULL, stub_timeout_cb)); + } + return; + } +} + +static void +netreq_upstream_write_cb(void *userarg) +{ + upstream_write_cb(((getdns_network_req *)userarg)->upstream); +} + +static void +upstream_schedule_netreq(getdns_upstream *upstream, getdns_network_req *netreq) +{ + /* We have a connected socket and a global event loop */ + assert(upstream->fd >= 0); + assert(upstream->loop); + + /* Append netreq to write_queue */ + if (!upstream->write_queue) { + upstream->write_queue = upstream->write_queue_last = netreq; + upstream->event.write_cb = upstream_write_cb; + GETDNS_SCHEDULE_EVENT(upstream->loop, + upstream->fd, TIMEOUT_FOREVER, &upstream->event); + } else { + upstream->write_queue_last->write_queue_tail = netreq; + upstream->write_queue_last = netreq; + } +} + getdns_return_t priv_getdns_submit_stub_request(getdns_network_req *netreq) { @@ -710,7 +898,7 @@ priv_getdns_submit_stub_request(getdns_network_req *netreq) getdns_sock_nonblock(netreq->fd); netreq->upstream = upstream; - dnsreq->loop->vmt->schedule( + GETDNS_SCHEDULE_EVENT( dnsreq->loop, netreq->fd, dnsreq->context->timeout, getdns_eventloop_event_init(&netreq->event, netreq, NULL, stub_udp_write_cb, stub_timeout_cb)); @@ -732,13 +920,55 @@ priv_getdns_submit_stub_request(getdns_network_req *netreq) } netreq->upstream = upstream; - dnsreq->loop->vmt->schedule( + GETDNS_SCHEDULE_EVENT( dnsreq->loop, netreq->fd, dnsreq->context->timeout, getdns_eventloop_event_init(&netreq->event, netreq, NULL, stub_tcp_write_cb, stub_timeout_cb)); return GETDNS_RETURN_GOOD; + + case GETDNS_TRANSPORT_TCP_ONLY_KEEP_CONNECTIONS_OPEN: + /* In coming comments, "global" means "context wide" */ + + /* Are we the first? (Is global socket initialized?) */ + if (upstream->fd == -1) { + /* We are the first. Make global socket and connect. */ + if ((upstream->fd = socket(upstream->addr.ss_family, + SOCK_STREAM, IPPROTO_TCP)) == -1) + return GETDNS_RETURN_GENERIC_ERROR; + + getdns_sock_nonblock(upstream->fd); + if (connect(upstream->fd, + (struct sockaddr *)&upstream->addr, + upstream->addr_len) == -1 && errno != EINPROGRESS){ + + close(upstream->fd); + upstream->fd = -1; + return GETDNS_RETURN_GENERIC_ERROR; + } + /* Attach to the global event loop + * so it can do it's own scheduling + */ + upstream->loop = dnsreq->context->extension; + } + netreq->upstream = upstream; + + /* We have a context wide socket. + * Now schedule the write request. + */ + upstream_schedule_netreq(upstream, netreq); + + /* Schedule at least the timeout locally. + * And also the write if we perform a synchronous lookup + */ + GETDNS_SCHEDULE_EVENT( + dnsreq->loop, upstream->fd, dnsreq->context->timeout, + getdns_eventloop_event_init(&netreq->event, netreq, NULL, + ( dnsreq->loop != upstream->loop /* Synchronous lookup? */ + ? netreq_upstream_write_cb : NULL), stub_timeout_cb)); + + return GETDNS_RETURN_GOOD; default: return GETDNS_RETURN_GENERIC_ERROR; } diff --git a/src/types-internal.h b/src/types-internal.h index 67a91c3c..fd132750 100644 --- a/src/types-internal.h +++ b/src/types-internal.h @@ -201,7 +201,7 @@ typedef struct getdns_network_req uint16_t query_id; /* Network requests scheduled to write after me */ - struct getdns_network_req *write_queue; + struct getdns_network_req *write_queue_tail; } getdns_network_req;