diff --git a/deps/evcom/evcom.c b/deps/evcom/evcom.c index 48220a607..40e80de6e 100644 --- a/deps/evcom/evcom.c +++ b/deps/evcom/evcom.c @@ -392,7 +392,7 @@ stream__handshake (evcom_stream *stream) return OKAY; } - evcom_stream_reset_timeout(stream); + ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) { if (0 == gnutls_record_get_direction((stream)->session)) { @@ -411,11 +411,14 @@ stream__handshake (evcom_stream *stream) stream->flags |= EVCOM_CONNECTED; if (stream->on_connect) stream->on_connect(stream); - ev_io_start(D_LOOP_(stream) &stream->read_watcher); - ev_io_start(D_LOOP_(stream) &stream->write_watcher); + /* evcom_stream_force_close might have been called. */ + if (stream->recvfd >= 0 && stream->sendfd >= 0) { + ev_io_start(D_LOOP_(stream) &stream->read_watcher); + ev_io_start(D_LOOP_(stream) &stream->write_watcher); - stream->send_action = stream_send__data; - stream->recv_action = stream_recv__data; + stream->send_action = stream_send__data; + stream->recv_action = stream_recv__data; + } return OKAY; } @@ -541,7 +544,7 @@ stream_recv__data (evcom_stream *stream) return OKAY; } - evcom_stream_reset_timeout(stream); + ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); assert(recved >= 0); @@ -614,7 +617,7 @@ stream_send__data (evcom_stream *stream) return OKAY; } - evcom_stream_reset_timeout(stream); + ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); assert(sent >= 0); @@ -638,14 +641,24 @@ stream_send__shutdown (evcom_stream *stream) int r = shutdown(stream->sendfd, SHUT_WR); if (r < 0) { - stream->errorno = errno; - evcom_perror("shutdown()", errno); + switch (errno) { + case EINTR: + assert(stream->send_action == stream_send__shutdown); + return OKAY; + + case ENOTCONN: + break; + + default: + stream->errorno = errno; + evcom_perror("shutdown()", errno); + break; + } stream->send_action = stream_send__close; return OKAY; } stream->flags &= ~EVCOM_WRITABLE; - stream->send_action = stream_send__wait_for_eof; return OKAY; } @@ -985,13 +998,15 @@ on_timeout (EV_P_ ev_timer *watcher, int revents) assert(watcher == &stream->timeout_watcher); if (PAUSED(stream)) { - evcom_stream_reset_timeout(stream); + ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); return; } if (stream->on_timeout) stream->on_timeout(stream); evcom_stream_force_close(stream); + + if (stream->on_close) stream->on_close(stream); } static void @@ -1045,7 +1060,7 @@ stream_event (EV_P_ ev_io *w, int revents) * gnutls_db_set_ptr (stream->session, _); */ void -evcom_stream_init (evcom_stream *stream, float timeout) +evcom_stream_init (evcom_stream *stream) { stream->flags = 0; stream->errorno = 0; @@ -1069,7 +1084,7 @@ evcom_stream_init (evcom_stream *stream, float timeout) stream->gnutls_errorno = 0; stream->session = NULL; #endif - ev_timer_init(&stream->timeout_watcher, on_timeout, 0., timeout); + ev_timer_init(&stream->timeout_watcher, on_timeout, 0., 60.); stream->timeout_watcher.data = stream; stream->on_connect = NULL; @@ -1098,8 +1113,8 @@ void evcom_stream_force_close (evcom_stream *stream) if (!DUPLEX(stream) && stream->sendfd >= 0) { close(stream->sendfd); - stream__set_send_closed(stream); } + stream__set_send_closed(stream); evcom_stream_detach(stream); } @@ -1175,9 +1190,12 @@ close: } void -evcom_stream_reset_timeout (evcom_stream *stream) +evcom_stream_reset_timeout (evcom_stream *stream, float timeout) { - ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); + stream->timeout_watcher.repeat = timeout; + if (ATTACHED(stream)) { + ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); + } } void @@ -1211,6 +1229,7 @@ void evcom_stream_read_pause (evcom_stream *stream) { stream->flags |= EVCOM_PAUSED; + ev_timer_stop(D_LOOP_(stream) &stream->timeout_watcher); if (stream->recv_action == stream_recv__data) { ev_io_stop(D_LOOP_(stream) &stream->read_watcher); stream->recv_action = stream_recv__wait_for_resume; @@ -1221,12 +1240,13 @@ void evcom_stream_read_resume (evcom_stream *stream) { stream->flags &= ~EVCOM_PAUSED; - evcom_stream_reset_timeout(stream); + ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); if (stream->recv_action == stream_recv__wait_for_resume) { stream->recv_action = stream_recv__data; } - if (ATTACHED(stream) && READABLE(stream)) { - ev_io_start(D_LOOP_(stream) &stream->read_watcher); + if (ATTACHED(stream)) { + ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); + if (READABLE(stream)) ev_io_start(D_LOOP_(stream) &stream->read_watcher); } } diff --git a/deps/evcom/evcom.h b/deps/evcom/evcom.h index 88056a9ef..977941692 100644 --- a/deps/evcom/evcom.h +++ b/deps/evcom/evcom.h @@ -181,7 +181,7 @@ void evcom_server_attach (EV_P_ evcom_server *); void evcom_server_detach (evcom_server *); void evcom_server_close (evcom_server *); -void evcom_stream_init (evcom_stream *, float timeout); +void evcom_stream_init (evcom_stream *); int evcom_stream_pair (evcom_stream *a, evcom_stream *b); int evcom_stream_connect (evcom_stream *, struct sockaddr *address); @@ -191,8 +191,7 @@ void evcom_stream_attach (EV_P_ evcom_stream *); void evcom_stream_detach (evcom_stream *); void evcom_stream_read_resume (evcom_stream *); void evcom_stream_read_pause (evcom_stream *); -/* Resets the timeout to stay alive for another stream->timeout seconds */ -void evcom_stream_reset_timeout (evcom_stream *); +void evcom_stream_reset_timeout (evcom_stream *, float timeout); void evcom_stream_write (evcom_stream *, const char *str, size_t len); /* Once the write buffer is drained, evcom_stream_close will shutdown the * writing end of the stream and will close the read end once the server diff --git a/deps/evcom/test/echo.c b/deps/evcom/test/echo.c index 4e05f1483..15c17b3ce 100644 --- a/deps/evcom/test/echo.c +++ b/deps/evcom/test/echo.c @@ -58,10 +58,11 @@ on_server_connection (evcom_server *server, struct sockaddr *addr) assert(addr); evcom_stream *stream = malloc(sizeof(evcom_stream)); - evcom_stream_init(stream, TIMEOUT); + evcom_stream_init(stream); stream->on_read = on_peer_read; stream->on_close = on_peer_close; stream->on_timeout = on_peer_timeout; + evcom_stream_reset_timeout(stream, TIMEOUT); nconnections++; diff --git a/deps/evcom/test/test.c b/deps/evcom/test/test.c index 0ce337086..cbfd7bb3c 100644 --- a/deps/evcom/test/test.c +++ b/deps/evcom/test/test.c @@ -155,10 +155,11 @@ pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr) assert(addr); evcom_stream *stream = malloc(sizeof(evcom_stream)); - evcom_stream_init(stream, PINGPONG_TIMEOUT); + evcom_stream_init(stream); stream->on_read = pingpong_on_peer_read; stream->on_close = common_on_peer_close; stream->on_timeout = common_on_peer_timeout; + evcom_stream_reset_timeout(stream, PINGPONG_TIMEOUT); assert(EVCOM_INITIALIZED == evcom_stream_state(stream)); @@ -226,11 +227,12 @@ pingpong (struct sockaddr *address) assert(r == 0); evcom_server_attach(EV_DEFAULT_ &server); - evcom_stream_init(&client, PINGPONG_TIMEOUT); + evcom_stream_init(&client); client.on_read = pingpong_on_client_read; client.on_connect = pingpong_on_client_connect; client.on_close = pingpong_on_client_close; client.on_timeout = common_on_client_timeout; + evcom_stream_reset_timeout(&client, PINGPONG_TIMEOUT); assert(EVCOM_INITIALIZED == evcom_stream_state(&client)); @@ -274,10 +276,11 @@ connint_on_connection(evcom_server *_server, struct sockaddr *addr) assert(addr); evcom_stream *stream = malloc(sizeof(evcom_stream)); - evcom_stream_init(stream, CONNINT_TIMEOUT); + evcom_stream_init(stream); stream->on_read = send_bye_and_close; stream->on_close = common_on_peer_close; stream->on_timeout = common_on_peer_timeout; + evcom_stream_reset_timeout(stream, CONNINT_TIMEOUT); #if EVCOM_HAVE_GNUTLS if (use_tls) anon_tls_server(stream); @@ -349,11 +352,12 @@ connint (struct sockaddr *address) int i; for (i = 0; i < NCONN; i++) { evcom_stream *client = &clients[i]; - evcom_stream_init(client, CONNINT_TIMEOUT); + evcom_stream_init(client); client->on_read = connint_on_client_read; client->on_connect = connint_on_client_connect; client->on_close = connint_on_client_close; client->on_timeout = common_on_client_timeout; + evcom_stream_reset_timeout(client, CONNINT_TIMEOUT); #if EVCOM_HAVE_GNUTLS if (use_tls) anon_tls_client(client); #endif @@ -554,18 +558,20 @@ pair_pingpong (int use_pipe) b_got_connect = 0; pair_pingpong_cnt = 0; - evcom_stream_init(&a, PAIR_PINGPONG_TIMEOUT); + evcom_stream_init(&a); a.on_close = a_close; a.on_connect = a_connect; a.on_read = a_read; + evcom_stream_reset_timeout(&a, PAIR_PINGPONG_TIMEOUT); #if EVCOM_HAVE_GNUTLS if (use_tls) anon_tls_client(&a); #endif - evcom_stream_init(&b, PAIR_PINGPONG_TIMEOUT); + evcom_stream_init(&b); b.on_close = b_close; b.on_connect = b_connect; b.on_read = b_read; + evcom_stream_reset_timeout(&b, PAIR_PINGPONG_TIMEOUT); #if EVCOM_HAVE_GNUTLS if (use_tls) anon_tls_server(&b); #endif @@ -639,10 +645,11 @@ make_echo_connection (evcom_server *server, struct sockaddr *addr) assert(addr); evcom_stream *stream = malloc(sizeof(evcom_stream)); - evcom_stream_init(stream, ZERO_TIMEOUT); + evcom_stream_init(stream); stream->on_read = echo; stream->on_close = free_stream; stream->on_timeout = error_out; + evcom_stream_reset_timeout(stream, ZERO_TIMEOUT); #if EVCOM_HAVE_GNUTLS if (use_tls) anon_tls_server(stream); @@ -717,11 +724,12 @@ zero_stream (struct sockaddr *address, size_t to_write) evcom_server_attach(EV_DEFAULT_ &server); evcom_stream client; - evcom_stream_init(&client, ZERO_TIMEOUT); + evcom_stream_init(&client); client.on_read = zero_recv; client.on_connect = zero_start; client.on_close = zero_close; client.on_timeout = error_out; + evcom_stream_reset_timeout(&client, ZERO_TIMEOUT); #if EVCOM_HAVE_GNUTLS if (use_tls) anon_tls_client(&client); #endif diff --git a/src/net.cc b/src/net.cc index 17f546d5b..c92d9d57d 100644 --- a/src/net.cc +++ b/src/net.cc @@ -66,6 +66,7 @@ Connection::Initialize (v8::Handle target) NODE_SET_PROTOTYPE_METHOD(constructor_template, "setEncoding", SetEncoding); NODE_SET_PROTOTYPE_METHOD(constructor_template, "readPause", ReadPause); NODE_SET_PROTOTYPE_METHOD(constructor_template, "readResume", ReadResume); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "setTimeout", SetTimeout); constructor_template->PrototypeTemplate()->SetAccessor( READY_STATE_SYMBOL, @@ -104,8 +105,7 @@ void Connection::Init (void) { resolving_ = false; - double timeout = 60.0; // default - evcom_stream_init(&stream_, timeout); + evcom_stream_init(&stream_); stream_.on_connect = Connection::on_connect; stream_.on_read = Connection::on_read; stream_.on_close = Connection::on_close; @@ -333,6 +333,21 @@ Connection::ReadResume (const Arguments& args) return Undefined(); } +Handle +Connection::SetTimeout (const Arguments& args) +{ + HandleScope scope; + + Connection *connection = ObjectWrap::Unwrap(args.This()); + assert(connection); + + float timeout = (float)(args[0]->IntegerValue()) / 1000; + + connection->SetTimeout(timeout); + + return Undefined(); +} + Handle Connection::Close (const Arguments& args) { diff --git a/src/net.h b/src/net.h index c3c588a92..1c587f973 100644 --- a/src/net.h +++ b/src/net.h @@ -28,6 +28,7 @@ protected: static v8::Handle SetEncoding (const v8::Arguments& args); static v8::Handle ReadPause (const v8::Arguments& args); static v8::Handle ReadResume (const v8::Arguments& args); + static v8::Handle SetTimeout (const v8::Arguments& args); static v8::Handle ReadyStateGetter (v8::Local _, const v8::AccessorInfo& info); @@ -51,6 +52,10 @@ protected: void ForceClose (void) { evcom_stream_force_close(&stream_); } void ReadPause (void) { evcom_stream_read_pause(&stream_); } void ReadResume (void) { evcom_stream_read_resume(&stream_); } + void SetTimeout (float timeout) + { + evcom_stream_reset_timeout(&stream_, timeout); + } virtual void OnConnect (void); virtual void OnReceive (const void *buf, size_t len); diff --git a/test/mjsunit/test-tcp-timeout.js b/test/mjsunit/test-tcp-timeout.js new file mode 100644 index 000000000..59df33a8c --- /dev/null +++ b/test/mjsunit/test-tcp-timeout.js @@ -0,0 +1,79 @@ +include("mjsunit.js"); +port = 9992; +exchanges = 0; +starttime = null; +timeouttime = null; +timeout = 1000; + +var echo_server = node.tcp.createServer(function (socket) { + socket.setTimeout(timeout); + + socket.addListener("timeout", function (d) { + puts("server timeout"); + timeouttime = new Date; + p(timeouttime); + }); + + socket.addListener("receive", function (d) { + p(d); + socket.send(d); + }); + + socket.addListener("eof", function () { + socket.close(); + }); +}); + +echo_server.listen(port); +puts("server listening at " + port); + +var client = node.tcp.createConnection(port); +client.setEncoding("UTF8"); +client.setTimeout(0); // disable the timeout for client +client.addListener("connect", function () { + puts("client connected."); + client.send("hello\r\n"); +}); + +client.addListener("receive", function (chunk) { + assertEquals("hello\r\n", chunk); + if (exchanges++ < 5) { + setTimeout(function () { + puts("client send 'hello'"); + client.send("hello\r\n"); + }, 500); + + if (exchanges == 5) { + puts("wait for timeout - should come in " + timeout + " ms"); + starttime = new Date; + p(starttime); + } + } +}); + +client.addListener("timeout", function () { + puts("client timeout - this shouldn't happen"); + assertFalse(true); +}); + +client.addListener("eof", function () { + puts("client eof"); + client.close(); +}); + +client.addListener("close", function (had_error) { + puts("client disconnect"); + echo_server.close(); + assertFalse(had_error); +}); + +process.addListener("exit", function () { + assertTrue(starttime != null); + assertTrue(timeouttime != null); + + diff = timeouttime - starttime; + puts("diff = " + diff); + assertTrue(timeout < diff); + // Allow for 800 milliseconds more + assertTrue(diff < timeout + 800); +}); diff --git a/website/api.txt b/website/api.txt index 6ae50efdb..b86468c2a 100644 --- a/website/api.txt +++ b/website/api.txt @@ -921,6 +921,9 @@ socket for +node.tcp.Server+. will be +"writeOnly"+. One should probably just call +connection.close()+ when this event is emitted. +|+"timeout"+ | | Emitted if the connection times out from + inactivity. The +"close"+ event will be + emitted immediately following this event. |+"close"+ | +had_error+ | Emitted once the connection is fully closed. The argument +had_error+ is a boolean which says if the connection @@ -984,6 +987,12 @@ Useful to throttle back an upload. +connection.readResume()+:: Resumes reading if reading was paused by +readPause()+. ++connection.setTimeout(timeout)+:: +Sets the connection to timeout after +timeout+ milliseconds of inacitivty on +the connection. By default all +node.tcp.Connection+ objects have a timeout +of 60 seconds (60000 ms). ++ +If +timeout+ is 0, then the idle timeout is disabled. === DNS