diff --git a/src/ub_loop.c b/src/ub_loop.c index 49c57115..e0755cd5 100644 --- a/src/ub_loop.c +++ b/src/ub_loop.c @@ -106,6 +106,12 @@ typedef struct my_event { void (*cb)(int, short, void *arg); /** callback user arg */ void *arg; +#ifdef USE_WINSOCK + int is_tcp; + int read_wouldblock; + int write_wouldblock; +#endif + int *active; } my_event; #define AS_UB_LOOP(x) \ @@ -141,9 +147,10 @@ static int my_event_base_loopexit(struct ub_event_base* base, struct timeval* tv #define CLEAR_MY_EVENT(ev) \ do { (ev)->loop->extension->vmt->clear((ev)->loop->extension, \ - &(ev)->gev); (ev)->added = 0; } while(0) + &(ev)->gev); (ev)->added = 0; if ((ev)->active) { \ + *(ev)->active = 0; (ev)->active = NULL; }} while(0) -static inline getdns_return_t schedule_my_event(my_event *ev) +static getdns_return_t schedule_my_event(my_event *ev) { getdns_return_t r; @@ -159,25 +166,66 @@ static inline getdns_return_t schedule_my_event(my_event *ev) static void read_cb(void *userarg) { struct my_event *ev = (struct my_event *)userarg; - (*ev->cb)(ev->fd, UB_EV_READ, ev->arg); - if ((ev->bits & UB_EV_PERSIST) == 0) - CLEAR_MY_EVENT(ev); + + int active = 1; + ev->active = &active; + +#ifdef USE_WINSOCK + if (ev->is_tcp) { + ev->read_wouldblock = 0; + do { + (*ev->cb)(ev->fd, UB_EV_READ, ev->arg); + } while (active && !ev->read_wouldblock && (ev->bits & UB_EV_PERSIST)); + } else + (*ev->cb)(ev->fd, UB_EV_READ, ev->arg); +#else + (*ev->cb)(ev->fd, UB_EV_READ, ev->arg); +#endif + if (active) { + ev->active = NULL; + if ((ev->bits & UB_EV_PERSIST) == 0) + CLEAR_MY_EVENT(ev); + } } static void write_cb(void *userarg) { struct my_event *ev = (struct my_event *)userarg; - (*ev->cb)(ev->fd, UB_EV_WRITE, ev->arg); - if ((ev->bits & UB_EV_PERSIST) == 0) - CLEAR_MY_EVENT(ev); + + int active = 1; + ev->active = &active; + +#ifdef USE_WINSOCK + if (ev->is_tcp) { + ev->write_wouldblock = 0; + do { + (*ev->cb)(ev->fd, UB_EV_WRITE, ev->arg); + } while (active && !ev->write_wouldblock && (ev->bits & UB_EV_PERSIST)); + } else + (*ev->cb)(ev->fd, UB_EV_WRITE, ev->arg); +#else + (*ev->cb)(ev->fd, UB_EV_WRITE, ev->arg); +#endif + if (active) { + ev->active = NULL; + if ((ev->bits & UB_EV_PERSIST) == 0) + CLEAR_MY_EVENT(ev); + } } static void timeout_cb(void *userarg) { struct my_event *ev = (struct my_event *)userarg; + + int active = 1; + ev->active = &active; + (*ev->cb)(ev->fd, UB_EV_TIMEOUT, ev->arg); - if ((ev->bits & UB_EV_PERSIST) == 0) - CLEAR_MY_EVENT(ev); + if (active) { + ev->active = NULL; + if ((ev->bits & UB_EV_PERSIST) == 0) + CLEAR_MY_EVENT(ev); + } } static getdns_return_t set_gev_callbacks(my_event* ev, short bits) @@ -235,12 +283,28 @@ static int my_event_del(struct ub_event* ev) return 0; } -static int my_event_add(struct ub_event* ev, struct timeval* tv) +static int my_event_add(struct ub_event* ub_ev, struct timeval* tv) { - if (AS_MY_EVENT(ev)->added) - my_event_del(ev); - if (tv && (AS_MY_EVENT(ev)->bits & UB_EV_TIMEOUT) != 0) - AS_MY_EVENT(ev)->timeout = (tv->tv_sec * 1000) + (tv->tv_usec / 1000); + my_event *ev = AS_MY_EVENT(ub_ev); +#ifdef USE_WINSOCK + int t, l = sizeof(t); +#endif + + if (ev->added) + my_event_del(&ev->super); + if (tv && (ev->bits & UB_EV_TIMEOUT) != 0) + ev->timeout = (tv->tv_sec * 1000) + (tv->tv_usec / 1000); +#ifdef USE_WINSOCK + if ((ev->bits & (UB_EV_READ|UB_EV_WRITE)) && ev->fd != -1 && + getsockopt(ev->fd, SOL_SOCKET, SO_TYPE, (void *)&t, &l) == 0 && + t == SOCK_STREAM) { + + ev->is_tcp = 1; + ev->read_wouldblock = 0; + ev->write_wouldblock = 0; + } else + ev->is_tcp = 0; +#endif if (schedule_my_event(AS_MY_EVENT(ev))) return -1; return 0; @@ -288,7 +352,14 @@ static void my_winsock_unregister_wsaevent(struct ub_event* ev) static void my_winsock_tcp_wouldblock(struct ub_event* ev, int bits) { +#ifndef USE_WINSOCK (void)ev; (void)bits; +#else + if (bits & UB_EV_READ) + AS_MY_EVENT(ev)->read_wouldblock = 1; + if (bits & UB_EV_WRITE) + AS_MY_EVENT(ev)->write_wouldblock = 1; +#endif } static struct ub_event* my_event_new(struct ub_event_base* base, int fd, @@ -323,6 +394,12 @@ static struct ub_event* my_event_new(struct ub_event_base* base, int fd, ev->timeout = (uint64_t)-1; ev->cb = cb; ev->arg = arg; +#ifdef USE_WINSOCK + ev->is_tcp = 0; + ev->read_wouldblock = 0; + ev->write_wouldblock = 0; +#endif + ev->active = NULL; ev->gev.userarg = ev; ev->gev.read_cb = bits & UB_EV_READ ? read_cb : NULL; ev->gev.write_cb = bits & UB_EV_WRITE ? write_cb : NULL; @@ -333,7 +410,9 @@ static struct ub_event* my_event_new(struct ub_event_base* base, int fd, static struct ub_event* my_signal_new(struct ub_event_base* base, int fd, void (*cb)(int, short, void*), void* arg) { - return my_event_new(base, fd, UB_EV_SIGNAL | UB_EV_PERSIST, cb, arg); + /* Not applicable, because in unbound used in the daemon only */ + (void)base; (void)fd; (void)cb; (void)arg; + return NULL; } static struct ub_event* my_winsock_register_wsaevent(struct ub_event_base *b,