diff --git a/utils/websocket.c b/utils/websocket.c index 54ce9879..1bcc21b9 100644 --- a/utils/websocket.c +++ b/utils/websocket.c @@ -39,13 +39,16 @@ const char policy_response[] = "ssl) { - //printf("SSL recv\n"); + //handler_msg("SSL recv\n"); return SSL_read(ctx->ssl, buf, len); } else { return recv(ctx->sockfd, buf, len, 0); @@ -98,7 +101,7 @@ ssize_t ws_recv(ws_ctx_t *ctx, void *buf, size_t len) { ssize_t ws_send(ws_ctx_t *ctx, const void *buf, size_t len) { if (ctx->ssl) { - //printf("SSL send\n"); + //handler_msg("SSL send\n"); return SSL_write(ctx->ssl, buf, len); } else { return send(ctx->sockfd, buf, len, 0); @@ -202,7 +205,7 @@ int decode(char *src, size_t srclength, u_char *target, size_t targsize) { int i, len, framecount = 0, retlen = 0; unsigned char chr; if ((src[0] != '\x00') || (src[srclength-1] != '\xff')) { - fprintf(stderr, "WebSocket framing error\n"); + handler_emsg("WebSocket framing error\n"); return -1; } start = src+1; // Skip '\x00' start @@ -337,13 +340,13 @@ ws_ctx_t *do_handshake(int sock) { len = recv(sock, handshake, 1024, MSG_PEEK); handshake[len] = 0; if (len == 0) { - printf("Ignoring empty handshake\n"); + handler_msg("ignoring empty handshake\n"); close(sock); return NULL; } else if (bcmp(handshake, "", 22) == 0) { len = recv(sock, handshake, 1024, 0); handshake[len] = 0; - printf("Sending flash policy response\n"); + handler_msg("sending flash policy response\n"); send(sock, policy_response, sizeof(policy_response), 0); close(sock); return NULL; @@ -353,22 +356,22 @@ ws_ctx_t *do_handshake(int sock) { ws_ctx = ws_socket_ssl(sock, settings.cert); if (! ws_ctx) { return NULL; } scheme = "wss"; - printf(" using SSL socket\n"); + handler_msg("using SSL socket\n"); } else if (settings.ssl_only) { - printf("Non-SSL connection disallowed\n"); + handler_msg("non-SSL connection disallowed\n"); close(sock); return NULL; } else { ws_ctx = ws_socket(sock); if (! ws_ctx) { return NULL; } scheme = "ws"; - printf(" using plain (not SSL) socket\n"); + handler_msg("using plain (not SSL) socket\n"); } len = ws_recv(ws_ctx, handshake, 4096); handshake[len] = 0; if (!parse_handshake(handshake, &headers)) { - fprintf(stderr, "Invalid WS request\n"); + handler_emsg("Invalid WS request\n"); close(sock); return NULL; } @@ -376,16 +379,16 @@ ws_ctx_t *do_handshake(int sock) { if (headers.key3[0] != '\0') { gen_md5(&headers, trailer); pre = "Sec-"; - printf(" using protocol version 76\n"); + handler_msg("using protocol version 76\n"); } else { trailer[0] = '\0'; pre = ""; - printf(" using protocol version 75\n"); + handler_msg("using protocol version 75\n"); } sprintf(response, server_handshake, pre, headers.origin, pre, scheme, headers.host, headers.path, pre, trailer); - //printf("response: %s\n", response); + //handler_msg("response: %s\n", response); ws_send(ws_ctx, response, strlen(response)); return ws_ctx; @@ -393,7 +396,8 @@ ws_ctx_t *do_handshake(int sock) { void signal_handler(sig) { switch (sig) { - case SIGHUP: break; // ignore + case SIGHUP: break; // ignore for now + case SIGPIPE: pipe_error = 1; break; // handle inline case SIGTERM: exit(0); break; } } @@ -434,7 +438,7 @@ void daemonize(int keepfd) { void start_server() { - int lsock, csock, clilen, sopt = 1, i; + int lsock, csock, pid, clilen, sopt = 1, i; struct sockaddr_in serv_addr, cli_addr; ws_ctx_t *ws_ctx; @@ -470,18 +474,26 @@ void start_server() { } listen(lsock,100); + signal(SIGPIPE, signal_handler); // catch pipe + if (settings.daemon) { daemonize(lsock); } + if (settings.multiprocess) { + printf("Waiting for connections on %s:%d\n", + settings.listen_host, settings.listen_port); + // Reep zombies + signal(SIGCHLD, SIG_IGN); + } + while (1) { clilen = sizeof(cli_addr); - if (settings.listen_host && settings.listen_host[0] != '\0') { - printf("waiting for connection on %s:%d\n", + pipe_error = 0; + pid = 0; + if (! settings.multiprocess) { + printf("Waiting for connection on %s:%d\n", settings.listen_host, settings.listen_port); - } else { - printf("waiting for connection on port %d\n", - settings.listen_port); } csock = accept(lsock, (struct sockaddr *) &cli_addr, @@ -490,7 +502,8 @@ void start_server() { error("ERROR on accept"); continue; } - printf("Got client connection from %s\n", inet_ntoa(cli_addr.sin_addr)); + handler_msg("got client connection from %s\n", + inet_ntoa(cli_addr.sin_addr)); ws_ctx = do_handshake(csock); if (ws_ctx == NULL) { close(csock); @@ -501,8 +514,24 @@ void start_server() { * 20 for WS '\x00' / '\xff' and good measure */ dbufsize = (bufsize * 3)/4 - 20; - settings.handler(ws_ctx); - close(csock); + if (settings.multiprocess) { + handler_msg("forking handler process\n"); + pid = fork(); + } + + if (pid == 0) { // handler process + settings.handler(ws_ctx); + if (pipe_error) { + handler_emsg("Closing due to SIGPIPE\n"); + } + close(csock); + if (settings.multiprocess) { + handler_msg("handler exit\n"); + break; // Child process exits + } + } else { // parent process + settings.handler_id += 1; + } } } diff --git a/utils/websocket.h b/utils/websocket.h index 6271cc94..3246d738 100644 --- a/utils/websocket.h +++ b/utils/websocket.h @@ -10,8 +10,10 @@ typedef struct { char listen_host[256]; int listen_port; void (*handler)(ws_ctx_t*); + int handler_id; int ssl_only; int daemon; + int multiprocess; char *record; char *cert; } settings_t; @@ -34,3 +36,16 @@ ssize_t ws_send(ws_ctx_t *ctx, const void *buf, size_t len); //int b64_ntop(u_char const *src, size_t srclength, char *target, size_t targsize); //int b64_pton(char const *src, u_char *target, size_t targsize); +#define gen_handler_msg(stream, ...) \ + if (! settings.daemon) { \ + if (settings.multiprocess) { \ + fprintf(stream, " %d: ", settings.handler_id); \ + } else { \ + fprintf(stream, " "); \ + } \ + fprintf(stream, __VA_ARGS__); \ + } + +#define handler_msg(...) gen_handler_msg(stdout, __VA_ARGS__); +#define handler_emsg(...) gen_handler_msg(stderr, __VA_ARGS__); + diff --git a/utils/websocket.py b/utils/websocket.py index 75789469..abc463f1 100755 --- a/utils/websocket.py +++ b/utils/websocket.py @@ -25,9 +25,11 @@ settings = { 'listen_host' : '', 'listen_port' : None, 'handler' : None, + 'handler_id' : 1, 'cert' : None, 'ssl_only' : False, 'daemon' : True, + 'multiprocess': False, 'record' : None, } server_handshake = """HTTP/1.1 101 Web Socket Protocol Handshake\r @@ -41,9 +43,20 @@ Connection: Upgrade\r policy_response = """\n""" +class EClose(Exception): + pass + def traffic(token="."): - sys.stdout.write(token) - sys.stdout.flush() + if not settings['daemon'] and not settings['multiprocess']: + sys.stdout.write(token) + sys.stdout.flush() + +def handler_msg(msg): + if not settings['daemon']: + if settings['multiprocess']: + print " %d: %s" % (settings['handler_id'], msg) + else: + print " %s" % msg def encode(buf): buf = b64encode(buf) @@ -89,14 +102,14 @@ def do_handshake(sock): # Peek, but don't read the data handshake = sock.recv(1024, socket.MSG_PEEK) - #print "Handshake [%s]" % repr(handshake) + #handler_msg("Handshake [%s]" % repr(handshake)) if handshake == "": - print "Ignoring empty handshake" + handler_msg("ignoring empty handshake") sock.close() return False elif handshake.startswith(""): handshake = sock.recv(1024) - print "Sending flash policy response" + handler_msg("Sending flash policy response") sock.send(policy_response) sock.close() return False @@ -107,32 +120,32 @@ def do_handshake(sock): certfile=settings['cert'], ssl_version=ssl.PROTOCOL_TLSv1) scheme = "wss" - print " using SSL/TLS" + handler_msg("using SSL/TLS") elif settings['ssl_only']: - print "Non-SSL connection disallowed" + handler_msg("non-SSL connection disallowed") sock.close() return False else: retsock = sock scheme = "ws" - print " using plain (not SSL) socket" + handler_msg("using plain (not SSL) socket") handshake = retsock.recv(4096) - #print "handshake: " + repr(handshake) + #handler_msg("handshake: " + repr(handshake)) h = parse_handshake(handshake) if h.get('key3'): trailer = gen_md5(h) pre = "Sec-" - print " using protocol version 76" + handler_msg("using protocol version 76") else: trailer = "" pre = "" - print " using protocol version 75" + handler_msg("using protocol version 75") response = server_handshake % (pre, h['Origin'], pre, scheme, h['Host'], h['path'], pre, trailer) - #print "sending response:", repr(response) + #handler_msg("sending response:", repr(response)) retsock.send(response) return retsock @@ -177,21 +190,44 @@ def start_server(): lsock.bind((settings['listen_host'], settings['listen_port'])) lsock.listen(100) - if settings['daemon']: daemonize(keepfd=lsock.fileno()) + if settings['daemon']: + daemonize(keepfd=lsock.fileno()) + + if settings['multiprocess']: + print 'Waiting for connections on %s:%s' % ( + settings['listen_host'], settings['listen_port']) + # Reep zombies + signal.signal(signal.SIGCHLD, signal.SIG_IGN) while True: try: csock = startsock = None - print 'waiting for connection on port %s' % settings['listen_port'] + pid = 0 + if not settings['multiprocess']: + print 'Waiting for connection on %s:%s' % ( + settings['listen_host'], settings['listen_port']) startsock, address = lsock.accept() - print 'Got client connection from %s' % address[0] + handler_msg('got client connection from %s' % address[0]) csock = do_handshake(startsock) if not csock: continue - settings['handler'](csock) + if settings['multiprocess']: + handler_msg("forking handler process") + pid = os.fork() - except Exception: - print "Ignoring exception:" - print traceback.format_exc() + if pid == 0: # handler process + settings['handler'](csock) + else: # parent process + settings['handler_id'] += 1 + + except EClose, exc: + handler_msg("handler exit: %s" % exc.args) + except Exception, exc: + handler_msg("handler exception: %s" % str(exc)) + #handler_msg(traceback.format_exc()) + + if pid == 0: if csock: csock.close() if startsock and startsock != csock: startsock.close() + + if settings['multiprocess']: break # Child process exits diff --git a/utils/wsproxy.c b/utils/wsproxy.c index 77f8de56..27755dee 100644 --- a/utils/wsproxy.c +++ b/utils/wsproxy.c @@ -45,6 +45,7 @@ char USAGE[] = "Usage: [options] " \ char target_host[256]; int target_port; +extern pipe_error; extern settings_t settings; extern char *tbuf, *cbuf, *tbuf_tmp, *cbuf_tmp; extern unsigned int bufsize, dbufsize; @@ -86,29 +87,32 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { } ret = select(maxfd, &rlist, &wlist, &elist, &tv); + if (pipe_error) { break; } if (FD_ISSET(target, &elist)) { - fprintf(stderr, "target exception\n"); + handler_emsg("target exception\n"); break; } if (FD_ISSET(client, &elist)) { - fprintf(stderr, "client exception\n"); + handler_emsg("client exception\n"); break; } if (ret == -1) { - error("select()"); + handler_emsg("select(): %s\n", strerror(errno)); break; } else if (ret == 0) { - //fprintf(stderr, "select timeout\n"); + //handler_emsg("select timeout\n"); continue; } if (FD_ISSET(target, &wlist)) { len = tend-tstart; bytes = send(target, tbuf + tstart, len, 0); + if (pipe_error) { break; } if (bytes < 0) { - error("target connection error"); + handler_emsg("target connection error: %s\n", + strerror(errno)); break; } tstart += bytes; @@ -123,8 +127,9 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { if (FD_ISSET(client, &wlist)) { len = cend-cstart; bytes = ws_send(ws_ctx, cbuf + cstart, len); + if (pipe_error) { break; } if (len < 3) { - fprintf(stderr, "len: %d, bytes: %d: %d\n", len, bytes, *(cbuf + cstart)); + handler_emsg("len: %d, bytes: %d: %d\n", len, bytes, *(cbuf + cstart)); } cstart += bytes; if (cstart >= cend) { @@ -137,8 +142,9 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { if (FD_ISSET(target, &rlist)) { bytes = recv(target, cbuf_tmp, dbufsize , 0); + if (pipe_error) { break; } if (bytes <= 0) { - fprintf(stderr, "target closed connection"); + handler_emsg("target closed connection\n"); break; } cstart = 0; @@ -151,7 +157,7 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { printf("\n"); */ if (cend < 0) { - fprintf(stderr, "encoding error\n"); + handler_emsg("encoding error\n"); break; } traffic("{"); @@ -159,13 +165,14 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { if (FD_ISSET(client, &rlist)) { bytes = ws_recv(ws_ctx, tbuf_tmp, bufsize-1); + if (pipe_error) { break; } if (bytes <= 0) { - fprintf(stderr, "client closed connection\n"); + handler_emsg("client closed connection\n"); break; } else if ((bytes == 2) && (tbuf_tmp[0] == '\xff') && (tbuf_tmp[1] == '\x00')) { - fprintf(stderr, "client sent orderly close frame"); + handler_emsg("client sent orderly close frame\n"); break; } /* @@ -184,7 +191,7 @@ void do_proxy(ws_ctx_t *ws_ctx, int target) { printf("\n"); */ if (len < 0) { - fprintf(stderr, "decoding error\n"); + handler_emsg("decoding error\n"); break; } traffic("}"); @@ -198,11 +205,12 @@ void proxy_handler(ws_ctx_t *ws_ctx) { int tsock = 0; struct sockaddr_in taddr; - printf("Connecting to: %s:%d\n", target_host, target_port); + handler_msg("connecting to: %s:%d\n", target_host, target_port); tsock = socket(AF_INET, SOCK_STREAM, 0); if (tsock < 0) { - error("Could not create target socket"); + handler_emsg("Could not create target socket: %s\n", + strerror(errno)); return; } bzero((char *) &taddr, sizeof(taddr)); @@ -211,16 +219,20 @@ void proxy_handler(ws_ctx_t *ws_ctx) { /* Resolve target address */ if (resolve_host(&taddr.sin_addr, target_host) < -1) { - error("Could not resolve target address"); + handler_emsg("Could not resolve target address: %s\n", + strerror(errno)); } if (connect(tsock, (struct sockaddr *) &taddr, sizeof(taddr)) < 0) { - error("Could not connect to target"); + handler_emsg("Could not connect to target: %s\n", + strerror(errno)); close(tsock); return; } - printf("%s", traffic_legend); + if ((! settings.daemon) && (! settings.multiprocess)) { + printf("%s", traffic_legend); + } do_proxy(ws_ctx, tsock); @@ -230,11 +242,12 @@ void proxy_handler(ws_ctx_t *ws_ctx) { int main(int argc, char *argv[]) { int fd, c, option_index = 0; - static int ssl_only = 0, foreground = 0; + static int ssl_only = 0, foreground = 0, multi = 0; char *found; static struct option long_options[] = { {"ssl-only", no_argument, &ssl_only, 1 }, {"foreground", no_argument, &foreground, 'f'}, + {"multiprocess", no_argument, &multi, 'm'}, /* ---- */ {"cert", required_argument, 0, 'c'}, {0, 0, 0, 0} @@ -243,7 +256,7 @@ int main(int argc, char *argv[]) settings.cert = realpath("self.pem", NULL); while (1) { - c = getopt_long (argc, argv, "fr:c:", + c = getopt_long (argc, argv, "fmc:", long_options, &option_index); /* Detect the end */ @@ -257,6 +270,9 @@ int main(int argc, char *argv[]) case 'f': foreground = 1; break; + case 'm': + multi = 1; + break; case 'r': if ((fd = open(optarg, O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < -1) { @@ -274,8 +290,9 @@ int main(int argc, char *argv[]) usage(""); } } - settings.ssl_only = ssl_only; - settings.daemon = foreground ? 0: 1; + settings.ssl_only = ssl_only; + settings.daemon = foreground ? 0: 1; + settings.multiprocess = multi; if ((argc-optind) != 2) { usage("Invalid number of arguments\n"); @@ -314,6 +331,7 @@ int main(int argc, char *argv[]) //printf(" ssl_only: %d\n", settings.ssl_only); //printf(" daemon: %d\n", settings.daemon); + //printf(" multiproces: %d\n", settings.multiprocess); //printf(" cert: %s\n", settings.cert); settings.handler = proxy_handler; diff --git a/utils/wsproxy.py b/utils/wsproxy.py index c2a7e990..dfa3ddeb 100755 --- a/utils/wsproxy.py +++ b/utils/wsproxy.py @@ -72,7 +72,7 @@ def do_proxy(client, target): if target in ins: buf = target.recv(buffer_size) - if len(buf) == 0: raise Exception("Target closed") + if len(buf) == 0: raise EClose("Target closed") cqueue.append(encode(buf)) traffic("{") @@ -80,10 +80,10 @@ def do_proxy(client, target): if client in ins: buf = client.recv(buffer_size) - if len(buf) == 0: raise Exception("Client closed") + if len(buf) == 0: raise EClose("Client closed") if buf == '\xff\x00': - raise Exception("Client sent orderly close frame") + raise EClose("Client sent orderly close frame") elif buf[-1] == '\xff': if buf.count('\xff') > 1: traffic(str(buf.count('\xff'))) @@ -104,15 +104,16 @@ def proxy_handler(client): global target_host, target_port, options, rec if settings['record']: - print "Opening record file: %s" % settings['record'] + handler_msg("opening record file: %s" % settings['record']) rec = open(settings['record'], 'w+') rec.write("var VNC_frame_data = [\n") - print "Connecting to: %s:%s" % (target_host, target_port) + handler_msg("connecting to: %s:%s" % (target_host, target_port)) tsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tsock.connect((target_host, target_port)) - print traffic_legend + if not settings['daemon'] and not settings['multiprocess']: + print traffic_legend try: do_proxy(client, tsock) @@ -132,6 +133,9 @@ if __name__ == '__main__': parser.add_option("--foreground", "-f", dest="daemon", default=True, action="store_false", help="stay in foreground, do not daemonize") + parser.add_option("--multiprocess", "-m", + dest="multiprocess", action="store_true", + help="fork handler processes") parser.add_option("--ssl-only", action="store_true", help="disallow non-encrypted connections") parser.add_option("--cert", default="self.pem", @@ -162,6 +166,7 @@ if __name__ == '__main__': settings['cert'] = os.path.abspath(options.cert) settings['ssl_only'] = options.ssl_only settings['daemon'] = options.daemon + settings['multiprocess'] = options.multiprocess if options.record: settings['record'] = os.path.abspath(options.record) start_server()