TCP handling on windows

This commit is contained in:
Willem Toorop 2016-03-09 15:37:47 +01:00
parent a83c54387d
commit 36e620d769
1 changed files with 95 additions and 16 deletions

View File

@ -106,6 +106,12 @@ typedef struct my_event {
void (*cb)(int, short, void *arg); void (*cb)(int, short, void *arg);
/** callback user arg */ /** callback user arg */
void *arg; void *arg;
#ifdef USE_WINSOCK
int is_tcp;
int read_wouldblock;
int write_wouldblock;
#endif
int *active;
} my_event; } my_event;
#define AS_UB_LOOP(x) \ #define AS_UB_LOOP(x) \
@ -141,9 +147,10 @@ static int my_event_base_loopexit(struct ub_event_base* base, struct timeval* tv
#define CLEAR_MY_EVENT(ev) \ #define CLEAR_MY_EVENT(ev) \
do { (ev)->loop->extension->vmt->clear((ev)->loop->extension, \ do { (ev)->loop->extension->vmt->clear((ev)->loop->extension, \
&(ev)->gev); (ev)->added = 0; } while(0) &(ev)->gev); (ev)->added = 0; if ((ev)->active) { \
*(ev)->active = 0; (ev)->active = NULL; }} while(0)
static inline getdns_return_t schedule_my_event(my_event *ev) static getdns_return_t schedule_my_event(my_event *ev)
{ {
getdns_return_t r; getdns_return_t r;
@ -159,25 +166,66 @@ static inline getdns_return_t schedule_my_event(my_event *ev)
static void read_cb(void *userarg) static void read_cb(void *userarg)
{ {
struct my_event *ev = (struct my_event *)userarg; struct my_event *ev = (struct my_event *)userarg;
(*ev->cb)(ev->fd, UB_EV_READ, ev->arg);
if ((ev->bits & UB_EV_PERSIST) == 0) int active = 1;
CLEAR_MY_EVENT(ev); ev->active = &active;
#ifdef USE_WINSOCK
if (ev->is_tcp) {
ev->read_wouldblock = 0;
do {
(*ev->cb)(ev->fd, UB_EV_READ, ev->arg);
} while (active && !ev->read_wouldblock && (ev->bits & UB_EV_PERSIST));
} else
(*ev->cb)(ev->fd, UB_EV_READ, ev->arg);
#else
(*ev->cb)(ev->fd, UB_EV_READ, ev->arg);
#endif
if (active) {
ev->active = NULL;
if ((ev->bits & UB_EV_PERSIST) == 0)
CLEAR_MY_EVENT(ev);
}
} }
static void write_cb(void *userarg) static void write_cb(void *userarg)
{ {
struct my_event *ev = (struct my_event *)userarg; struct my_event *ev = (struct my_event *)userarg;
(*ev->cb)(ev->fd, UB_EV_WRITE, ev->arg);
if ((ev->bits & UB_EV_PERSIST) == 0) int active = 1;
CLEAR_MY_EVENT(ev); ev->active = &active;
#ifdef USE_WINSOCK
if (ev->is_tcp) {
ev->write_wouldblock = 0;
do {
(*ev->cb)(ev->fd, UB_EV_WRITE, ev->arg);
} while (active && !ev->write_wouldblock && (ev->bits & UB_EV_PERSIST));
} else
(*ev->cb)(ev->fd, UB_EV_WRITE, ev->arg);
#else
(*ev->cb)(ev->fd, UB_EV_WRITE, ev->arg);
#endif
if (active) {
ev->active = NULL;
if ((ev->bits & UB_EV_PERSIST) == 0)
CLEAR_MY_EVENT(ev);
}
} }
static void timeout_cb(void *userarg) static void timeout_cb(void *userarg)
{ {
struct my_event *ev = (struct my_event *)userarg; struct my_event *ev = (struct my_event *)userarg;
int active = 1;
ev->active = &active;
(*ev->cb)(ev->fd, UB_EV_TIMEOUT, ev->arg); (*ev->cb)(ev->fd, UB_EV_TIMEOUT, ev->arg);
if ((ev->bits & UB_EV_PERSIST) == 0) if (active) {
CLEAR_MY_EVENT(ev); ev->active = NULL;
if ((ev->bits & UB_EV_PERSIST) == 0)
CLEAR_MY_EVENT(ev);
}
} }
static getdns_return_t set_gev_callbacks(my_event* ev, short bits) static getdns_return_t set_gev_callbacks(my_event* ev, short bits)
@ -235,12 +283,28 @@ static int my_event_del(struct ub_event* ev)
return 0; return 0;
} }
static int my_event_add(struct ub_event* ev, struct timeval* tv) static int my_event_add(struct ub_event* ub_ev, struct timeval* tv)
{ {
if (AS_MY_EVENT(ev)->added) my_event *ev = AS_MY_EVENT(ub_ev);
my_event_del(ev); #ifdef USE_WINSOCK
if (tv && (AS_MY_EVENT(ev)->bits & UB_EV_TIMEOUT) != 0) int t, l = sizeof(t);
AS_MY_EVENT(ev)->timeout = (tv->tv_sec * 1000) + (tv->tv_usec / 1000); #endif
if (ev->added)
my_event_del(&ev->super);
if (tv && (ev->bits & UB_EV_TIMEOUT) != 0)
ev->timeout = (tv->tv_sec * 1000) + (tv->tv_usec / 1000);
#ifdef USE_WINSOCK
if ((ev->bits & (UB_EV_READ|UB_EV_WRITE)) && ev->fd != -1 &&
getsockopt(ev->fd, SOL_SOCKET, SO_TYPE, (void *)&t, &l) == 0 &&
t == SOCK_STREAM) {
ev->is_tcp = 1;
ev->read_wouldblock = 0;
ev->write_wouldblock = 0;
} else
ev->is_tcp = 0;
#endif
if (schedule_my_event(AS_MY_EVENT(ev))) if (schedule_my_event(AS_MY_EVENT(ev)))
return -1; return -1;
return 0; return 0;
@ -288,7 +352,14 @@ static void my_winsock_unregister_wsaevent(struct ub_event* ev)
static void my_winsock_tcp_wouldblock(struct ub_event* ev, int bits) static void my_winsock_tcp_wouldblock(struct ub_event* ev, int bits)
{ {
#ifndef USE_WINSOCK
(void)ev; (void)bits; (void)ev; (void)bits;
#else
if (bits & UB_EV_READ)
AS_MY_EVENT(ev)->read_wouldblock = 1;
if (bits & UB_EV_WRITE)
AS_MY_EVENT(ev)->write_wouldblock = 1;
#endif
} }
static struct ub_event* my_event_new(struct ub_event_base* base, int fd, static struct ub_event* my_event_new(struct ub_event_base* base, int fd,
@ -323,6 +394,12 @@ static struct ub_event* my_event_new(struct ub_event_base* base, int fd,
ev->timeout = (uint64_t)-1; ev->timeout = (uint64_t)-1;
ev->cb = cb; ev->cb = cb;
ev->arg = arg; ev->arg = arg;
#ifdef USE_WINSOCK
ev->is_tcp = 0;
ev->read_wouldblock = 0;
ev->write_wouldblock = 0;
#endif
ev->active = NULL;
ev->gev.userarg = ev; ev->gev.userarg = ev;
ev->gev.read_cb = bits & UB_EV_READ ? read_cb : NULL; ev->gev.read_cb = bits & UB_EV_READ ? read_cb : NULL;
ev->gev.write_cb = bits & UB_EV_WRITE ? write_cb : NULL; ev->gev.write_cb = bits & UB_EV_WRITE ? write_cb : NULL;
@ -333,7 +410,9 @@ static struct ub_event* my_event_new(struct ub_event_base* base, int fd,
static struct ub_event* my_signal_new(struct ub_event_base* base, int fd, static struct ub_event* my_signal_new(struct ub_event_base* base, int fd,
void (*cb)(int, short, void*), void* arg) void (*cb)(int, short, void*), void* arg)
{ {
return my_event_new(base, fd, UB_EV_SIGNAL | UB_EV_PERSIST, cb, arg); /* Not applicable, because in unbound used in the daemon only */
(void)base; (void)fd; (void)cb; (void)arg;
return NULL;
} }
static struct ub_event* my_winsock_register_wsaevent(struct ub_event_base *b, static struct ub_event* my_winsock_register_wsaevent(struct ub_event_base *b,