From 25d14bd0015b995913bfb5b9590ff5f0c9be09c1 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sat, 20 Jun 2009 15:17:54 +0200 Subject: [PATCH 01/11] Bug: Add HTTPConnection->size() and HTTPServer->size() Need this for proper garbage collection. --- src/http.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/http.h b/src/http.h index ab207a3c5..eb7ecd774 100644 --- a/src/http.h +++ b/src/http.h @@ -13,6 +13,8 @@ public: static v8::Persistent client_constructor_template; static v8::Persistent server_constructor_template; + + virtual size_t size (void) { return sizeof(HTTPConnection); }; protected: static v8::Handle NewClient (const v8::Arguments& args); @@ -41,6 +43,8 @@ public: static void Initialize (v8::Handle target); static v8::Persistent constructor_template; + virtual size_t size (void) { return sizeof(HTTPServer); }; + protected: static v8::Handle New (const v8::Arguments& args); From 83cb156b6f77c11e2a2068e3b75b69c4eea2dfd2 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sat, 20 Jun 2009 16:44:06 +0200 Subject: [PATCH 02/11] skelton of node.Process --- src/node.cc | 2 + src/process.cc | 225 +++++++++++++++++++++++++++++++++++++++++++++++++ src/process.h | 45 ++++++++++ wscript | 1 + 4 files changed, 273 insertions(+) create mode 100644 src/process.cc create mode 100644 src/process.h diff --git a/src/node.cc b/src/node.cc index 07384838b..6036e158b 100644 --- a/src/node.cc +++ b/src/node.cc @@ -4,6 +4,7 @@ #include "file.h" #include "http.h" #include "timer.h" +#include "process.h" #include "constants.h" #include "natives.h" @@ -302,6 +303,7 @@ Load (int argc, char *argv[]) NODE_SET_METHOD(node_obj, "reallyExit", node_exit); Timer::Initialize(node_obj); + Process::Initialize(node_obj); DefineConstants(node_obj); diff --git a/src/process.cc b/src/process.cc new file mode 100644 index 000000000..fddc4ca20 --- /dev/null +++ b/src/process.cc @@ -0,0 +1,225 @@ +#include "node.h" +#include "process.h" + +#include +#include +#include +#include + +using namespace v8; +using namespace node; + +Persistent Process::constructor_template; + +void +Process::Initialize (Handle target) +{ + HandleScope scope; + + Local t = FunctionTemplate::New(Process::New); + constructor_template = Persistent::New(t); + constructor_template->InstanceTemplate()->SetInternalFieldCount(1); + +#if 0 + NODE_SET_PROTOTYPE_METHOD(constructor_template, "start", Timer::Start); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", Timer::Stop); +#endif + + target->Set(String::NewSymbol("Process"), constructor_template->GetFunction()); +} + +Handle +Process::New (const Arguments& args) +{ + if (args.Length() == 0) return Undefined(); + + HandleScope scope; + + String::Utf8Value command(args[0]->ToString()); + + Process *p = new Process(args.Holder()); + ObjectWrap::InformV8ofAllocation(p); + + int r = p->Spawn(*command); + if (r != 0) { + return ThrowException(String::New("Error spawning")); + } + + return args.This(); +} + +Process::Process (Handle handle) + : ObjectWrap(handle) +{ + ev_init(&stdout_watcher_, Process::OnOutput); + stdout_watcher_.data = this; + + ev_init(&stderr_watcher_, Process::OnError); + stderr_watcher_.data = this; + + ev_init(&stdin_watcher_, Process::OnWritable); + stdin_watcher_.data = this; + + ev_init(&child_watcher_, Process::OnCHLD); + child_watcher_.data = this; + + stdout_pipe_[0] = -1; + stdout_pipe_[1] = -1; + stderr_pipe_[0] = -1; + stderr_pipe_[1] = -1; + stdin_pipe_[0] = -1; + stdin_pipe_[1] = -1; + + pid_ = 0; +} + +Process::~Process () +{ + Shutdown(); +} + +void +Process::Shutdown () +{ + if (stdout_pipe_[0] >= 0) close(stdout_pipe_[0]); + if (stdout_pipe_[1] >= 0) close(stdout_pipe_[1]); + + if (stderr_pipe_[0] >= 0) close(stderr_pipe_[0]); + if (stderr_pipe_[1] >= 0) close(stderr_pipe_[1]); + + if (stdin_pipe_[0] >= 0) close(stdin_pipe_[0]); + if (stdin_pipe_[1] >= 0) close(stdin_pipe_[1]); + + stdout_pipe_[0] = -1; + stdout_pipe_[1] = -1; + stderr_pipe_[0] = -1; + stderr_pipe_[1] = -1; + stdin_pipe_[0] = -1; + stdin_pipe_[1] = -1; + + ev_io_stop(EV_DEFAULT_UC_ &stdout_watcher_); + ev_io_stop(EV_DEFAULT_UC_ &stderr_watcher_); + ev_io_stop(EV_DEFAULT_UC_ &stdin_watcher_); + + ev_child_stop(EV_DEFAULT_UC_ &child_watcher_); + /* XXX Kill the PID? */ + pid_ = 0; + + Detach(); +} + +static int +SetNonBlocking (int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (r != 0) { + perror("SetNonBlocking()"); + } + return r; +} + +int +Process::Spawn (const char *command) +{ + assert(pid_ == 0); + assert(stdout_pipe_[0] == -1); + assert(stdout_pipe_[1] == -1); + assert(stderr_pipe_[0] == -1); + assert(stderr_pipe_[1] == -1); + assert(stdin_pipe_[0] == -1); + assert(stdin_pipe_[1] == -1); + + /* An implementation of popen(), basically */ + if (pipe(stdout_pipe_) < 0) { + perror("pipe()"); + return -1; + } + + if (pipe(stderr_pipe_) < 0) { + perror("pipe()"); + return -2; + } + + if (pipe(stdin_pipe_) < 0) { + perror("pipe()"); + return -3; + } + + switch (pid_ = vfork()) { + case -1: // Error. + Shutdown(); + return -4; + + case 0: // Child. + close(stdout_pipe_[0]); // close read end + dup2(stdout_pipe_[1], STDOUT_FILENO); + + close(stderr_pipe_[0]); // close read end + dup2(stderr_pipe_[1], STDERR_FILENO); + + close(stdin_pipe_[1]); // close write end + dup2(stdin_pipe_[0], STDIN_FILENO); + + execl("/bin/sh", "-c", command, (char *)NULL); + _exit(127); + } + + // Parent. + + ev_child_set(&child_watcher_, pid_, 0); + ev_child_start(EV_DEFAULT_UC_ &child_watcher_); + + SetNonBlocking(stdout_pipe_[0]); + SetNonBlocking(stderr_pipe_[0]); + SetNonBlocking(stdin_pipe_[1]); + + ev_io_set(&stdout_watcher_, stdout_pipe_[0], EV_READ); + ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ); + ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE); + + ev_io_start(EV_DEFAULT_UC_ &stdout_watcher_); + ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_); + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + + close(stdout_pipe_[1]); // close write end + close(stderr_pipe_[1]); // close write end + close(stdin_pipe_[0]); // close read end + + stdout_pipe_[1] = -1; + stderr_pipe_[1] = -1; + stdin_pipe_[0] = -1; + + Attach(); + + return 0; +} + +void +Process::OnOutput (EV_P_ ev_io *watcher, int revents) +{ + Process *process = static_cast(watcher->data); + assert(revents == EV_READ); +} + +void +Process::OnError (EV_P_ ev_io *watcher, int revents) +{ + Process *process = static_cast(watcher->data); + assert(revents == EV_READ); +} + +void +Process::OnWritable (EV_P_ ev_io *watcher, int revents) +{ + Process *process = static_cast(watcher->data); + assert(revents == EV_WRITE); +} + +void +Process::OnCHLD (EV_P_ ev_child *watcher, int revents) +{ + ev_child_stop(EV_A_ watcher); + Process *process = static_cast(watcher->data); + assert(revents == EV_CHILD); +} diff --git a/src/process.h b/src/process.h new file mode 100644 index 000000000..5e2a25eb7 --- /dev/null +++ b/src/process.h @@ -0,0 +1,45 @@ +#ifndef node_process_h +#define node_process_h + +#include "node.h" +#include +#include + +namespace node { + +class Process : ObjectWrap { + public: + static void Initialize (v8::Handle target); + + virtual size_t size (void) { return sizeof(Process); } + + protected: + static v8::Persistent constructor_template; + static v8::Handle New (const v8::Arguments& args); + + Process(v8::Handle handle); + ~Process(); + + void Shutdown (); + int Spawn (const char *command); + + private: + static void OnOutput (EV_P_ ev_io *watcher, int revents); + static void OnError (EV_P_ ev_io *watcher, int revents); + static void OnWritable (EV_P_ ev_io *watcher, int revents); + static void OnCHLD (EV_P_ ev_child *watcher, int revents); + + ev_io stdout_watcher_; + ev_io stderr_watcher_; + ev_io stdin_watcher_; + ev_child child_watcher_; + + int stdout_pipe_[2]; + int stderr_pipe_[2]; + int stdin_pipe_[2]; + + pid_t pid_; +}; + +} // namespace node +#endif // node_process_h diff --git a/wscript b/wscript index 8d49a8811..ce85c400b 100644 --- a/wscript +++ b/wscript @@ -158,6 +158,7 @@ def build(bld): src/net.cc src/file.cc src/timer.cc + src/process.cc src/constants.cc """ node.includes = """ From 03c5772ce40f5415d25273aa3344bc68a91816cc Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 13:10:00 +0200 Subject: [PATCH 03/11] Get stdin/stdout working. Add process->Close(). --- src/process.cc | 263 ++++++++++++++++++++++++++++++++++++++++++++----- src/process.h | 10 +- 2 files changed, 247 insertions(+), 26 deletions(-) diff --git a/src/process.cc b/src/process.cc index fddc4ca20..7e7495477 100644 --- a/src/process.cc +++ b/src/process.cc @@ -2,6 +2,8 @@ #include "process.h" #include +#include +#include #include #include #include @@ -9,6 +11,10 @@ using namespace v8; using namespace node; +#define ON_ERROR_SYMBOL String::NewSymbol("onError") +#define ON_OUTPUT_SYMBOL String::NewSymbol("onOutput") +#define ON_EXIT_SYMBOL String::NewSymbol("onExit") + Persistent Process::constructor_template; void @@ -20,10 +26,8 @@ Process::Initialize (Handle target) constructor_template = Persistent::New(t); constructor_template->InstanceTemplate()->SetInternalFieldCount(1); -#if 0 - NODE_SET_PROTOTYPE_METHOD(constructor_template, "start", Timer::Start); - NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", Timer::Stop); -#endif + NODE_SET_PROTOTYPE_METHOD(constructor_template, "write", Process::Write); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Process::Close); target->Set(String::NewSymbol("Process"), constructor_template->GetFunction()); } @@ -48,13 +52,113 @@ Process::New (const Arguments& args) return args.This(); } +static void +free_buf (oi_buf *b) +{ + V8::AdjustAmountOfExternalAllocatedMemory(-b->len); + free(b); +} + +static oi_buf * +new_buf (size_t size) +{ + size_t total = sizeof(oi_buf) + size; + void *p = malloc(total); + if (p == NULL) return NULL; + + oi_buf *b = static_cast(p); + b->base = static_cast(p) + sizeof(oi_buf); + + b->len = size; + b->release = free_buf; + V8::AdjustAmountOfExternalAllocatedMemory(total); + + return b; +} + +Handle +Process::Write (const Arguments& args) +{ + HandleScope scope; + Process *process = NODE_UNWRAP(Process, args.Holder()); + assert(process); + +#if 0 + if ( connection->ReadyState() != OPEN + && connection->ReadyState() != WRITE_ONLY + ) + return ThrowException(String::New("Socket is not open for writing")); +#endif + + // XXX + // A lot of improvement can be made here. First of all we're allocating + // oi_bufs for every send which is clearly inefficent - it should use a + // memory pool or ring buffer. Of course, expressing binary data as an + // array of integers is extremely inefficent. This can improved when v8 + // bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been + // addressed. + + oi_buf *buf; + size_t len; + + if (args[0]->IsString()) { + enum encoding enc = ParseEncoding(args[1]); + Local s = args[0]->ToString(); + len = s->Utf8Length(); + buf = new_buf(len); + switch (enc) { + case RAW: + case ASCII: + s->WriteAscii(buf->base, 0, len); + break; + + case UTF8: + s->WriteUtf8(buf->base, len); + break; + + default: + assert(0 && "unhandled string encoding"); + } + + } else if (args[0]->IsArray()) { + Handle array = Handle::Cast(args[0]); + len = array->Length(); + buf = new_buf(len); + for (size_t i = 0; i < len; i++) { + Local int_value = array->Get(Integer::New(i)); + buf->base[i] = int_value->IntegerValue(); + } + + } else return ThrowException(String::New("Bad argument")); + + if (process->Write(buf) != 0) { + return ThrowException(String::New("Pipe already closed")); + } + + return Undefined(); +} + +Handle +Process::Close (const Arguments& args) +{ + HandleScope scope; + Process *process = NODE_UNWRAP(Process, args.Holder()); + assert(process); + + if (process->Close() != 0) { + return ThrowException(String::New("Pipe already closed.")); + } + + return Undefined(); +} + Process::Process (Handle handle) : ObjectWrap(handle) { ev_init(&stdout_watcher_, Process::OnOutput); stdout_watcher_.data = this; - ev_init(&stderr_watcher_, Process::OnError); + ev_init(&stderr_watcher_, Process::OnOutput); stderr_watcher_.data = this; ev_init(&stdin_watcher_, Process::OnWritable); @@ -70,7 +174,11 @@ Process::Process (Handle handle) stdin_pipe_[0] = -1; stdin_pipe_[1] = -1; + got_close_ = false; + pid_ = 0; + + oi_queue_init(&out_stream_); } Process::~Process () @@ -108,7 +216,7 @@ Process::Shutdown () Detach(); } -static int +static inline int SetNonBlocking (int fd) { int flags = fcntl(fd, F_GETFL, 0); @@ -152,6 +260,8 @@ Process::Spawn (const char *command) return -4; case 0: // Child. + //printf("child process!\n"); + close(stdout_pipe_[0]); // close read end dup2(stdout_pipe_[1], STDOUT_FILENO); @@ -161,7 +271,10 @@ Process::Spawn (const char *command) close(stdin_pipe_[1]); // close write end dup2(stdin_pipe_[0], STDIN_FILENO); - execl("/bin/sh", "-c", command, (char *)NULL); + //printf("child process!\n"); + + execl("/bin/sh", "sh", "-c", command, (char *)NULL); + //execl(_PATH_BSHELL, "sh", "-c", program, (char *)NULL); _exit(127); } @@ -171,23 +284,21 @@ Process::Spawn (const char *command) ev_child_start(EV_DEFAULT_UC_ &child_watcher_); SetNonBlocking(stdout_pipe_[0]); - SetNonBlocking(stderr_pipe_[0]); - SetNonBlocking(stdin_pipe_[1]); - ev_io_set(&stdout_watcher_, stdout_pipe_[0], EV_READ); - ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ); - ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE); - ev_io_start(EV_DEFAULT_UC_ &stdout_watcher_); - ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_); - ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); - close(stdout_pipe_[1]); // close write end - close(stderr_pipe_[1]); // close write end - close(stdin_pipe_[0]); // close read end - stdout_pipe_[1] = -1; + + SetNonBlocking(stderr_pipe_[0]); + ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ); + ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_); + close(stderr_pipe_[1]); // close write end stderr_pipe_[1] = -1; + + SetNonBlocking(stdin_pipe_[1]); + ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE); + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + close(stdin_pipe_[0]); // close read end stdin_pipe_[0] = -1; Attach(); @@ -198,22 +309,96 @@ Process::Spawn (const char *command) void Process::OnOutput (EV_P_ ev_io *watcher, int revents) { - Process *process = static_cast(watcher->data); - assert(revents == EV_READ); -} + int r; + char buf[16*1024]; + size_t buf_size = 16*1024; -void -Process::OnError (EV_P_ ev_io *watcher, int revents) -{ Process *process = static_cast(watcher->data); + + bool is_stdout = (&process->stdout_watcher_ == watcher); + int fd = is_stdout ? process->stdout_pipe_[0] : process->stderr_pipe_[0]; + assert(revents == EV_READ); + assert(fd >= 0); + + HandleScope scope; + Handle callback_v = + process->handle_->Get(is_stdout ? ON_OUTPUT_SYMBOL : ON_ERROR_SYMBOL); + Handle callback; + if (callback_v->IsFunction()) { + callback = Handle::Cast(callback_v); + } + Handle argv[1]; + + for (;;) { + r = read(fd, buf, buf_size); + + if (r < 0) { + if (errno != EAGAIN) perror("IPC pipe read error"); + break; + } + + if (!callback.IsEmpty()) { + if (r == 0) { + argv[0] = Null(); + } else { + // TODO multiple encodings + argv[0] = String::New((const char*)buf, r); + } + + TryCatch try_catch; + callback->Call(process->handle_, 1, argv); + if (try_catch.HasCaught()) { + FatalException(try_catch); + return; + } + } + + if (r == 0) { + ev_io_stop(EV_DEFAULT_UC_ watcher); + break; + } + } } void Process::OnWritable (EV_P_ ev_io *watcher, int revents) { Process *process = static_cast(watcher->data); + int sent; + assert(revents == EV_WRITE); + assert(process->stdin_pipe_[1] >= 0); + + while (!oi_queue_empty(&process->out_stream_)) { + oi_queue *q = oi_queue_last(&process->out_stream_); + oi_buf *to_write = (oi_buf*) oi_queue_data(q, oi_buf, queue); + + sent = write( process->stdin_pipe_[1] + , to_write->base + to_write->written + , to_write->len - to_write->written + ); + if (sent < 0) { + if (errno == EAGAIN) break; + perror("IPC pipe write error"); + break; + } + + to_write->written += sent; + + if (to_write->written == to_write->len) { + oi_queue_remove(q); + if (to_write->release) to_write->release(to_write); + } + } + + if (oi_queue_empty(&process->out_stream_)) { + ev_io_stop(EV_DEFAULT_UC_ &process->stdin_watcher_); + if (process->got_close_) { + close(process->stdin_pipe_[1]); + process->stdin_pipe_[1] = -1; + } + } } void @@ -221,5 +406,33 @@ Process::OnCHLD (EV_P_ ev_child *watcher, int revents) { ev_child_stop(EV_A_ watcher); Process *process = static_cast(watcher->data); + assert(revents == EV_CHILD); + assert(process->pid_ == watcher->rpid); + assert(&process->child_watcher_ == watcher); + + // Call onExit ( watcher->rstatus ) + printf("OnCHLD with status %d\n", watcher->rstatus); } + +int +Process::Write (oi_buf *buf) +{ + if (stdin_pipe_[1] < 0 || got_close_) + return -1; + oi_queue_insert_head(&out_stream_, &buf->queue); + buf->written = 0; + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + return 0; +} + +int +Process::Close () +{ + if (stdin_pipe_[1] < 0 || got_close_) + return -1; + got_close_ = true; + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + return 0; +} + diff --git a/src/process.h b/src/process.h index 5e2a25eb7..8cbb444b5 100644 --- a/src/process.h +++ b/src/process.h @@ -4,24 +4,28 @@ #include "node.h" #include #include +#include namespace node { class Process : ObjectWrap { public: static void Initialize (v8::Handle target); - virtual size_t size (void) { return sizeof(Process); } protected: static v8::Persistent constructor_template; static v8::Handle New (const v8::Arguments& args); + static v8::Handle Write (const v8::Arguments& args); + static v8::Handle Close (const v8::Arguments& args); Process(v8::Handle handle); ~Process(); void Shutdown (); int Spawn (const char *command); + int Write (oi_buf *buf); + int Close (); private: static void OnOutput (EV_P_ ev_io *watcher, int revents); @@ -39,6 +43,10 @@ class Process : ObjectWrap { int stdin_pipe_[2]; pid_t pid_; + + bool got_close_; + + oi_queue out_stream_; }; } // namespace node From a78ea510a2f7a8f86ad00f7bf7c6202cd5aecc9f Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 13:18:00 +0200 Subject: [PATCH 04/11] Add onExit callback --- src/process.cc | 17 ++++++++++++++--- src/process.h | 2 +- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/process.cc b/src/process.cc index 7e7495477..bba44ddf2 100644 --- a/src/process.cc +++ b/src/process.cc @@ -164,7 +164,7 @@ Process::Process (Handle handle) ev_init(&stdin_watcher_, Process::OnWritable); stdin_watcher_.data = this; - ev_init(&child_watcher_, Process::OnCHLD); + ev_init(&child_watcher_, Process::OnExit); child_watcher_.data = this; stdout_pipe_[0] = -1; @@ -402,7 +402,7 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents) } void -Process::OnCHLD (EV_P_ ev_child *watcher, int revents) +Process::OnExit (EV_P_ ev_child *watcher, int revents) { ev_child_stop(EV_A_ watcher); Process *process = static_cast(watcher->data); @@ -412,7 +412,18 @@ Process::OnCHLD (EV_P_ ev_child *watcher, int revents) assert(&process->child_watcher_ == watcher); // Call onExit ( watcher->rstatus ) - printf("OnCHLD with status %d\n", watcher->rstatus); + HandleScope scope; + Handle callback_v = process->handle_->Get(ON_EXIT_SYMBOL); + + if (callback_v->IsFunction()) { + Handle callback = Handle::Cast(callback_v); + TryCatch try_catch; + Handle argv[1] = { Integer::New(watcher->rstatus) }; + callback->Call(process->handle_, 1, argv); + if (try_catch.HasCaught()) FatalException(try_catch); + } + process->Shutdown(); + process->Detach(); } int diff --git a/src/process.h b/src/process.h index 8cbb444b5..faee88e1b 100644 --- a/src/process.h +++ b/src/process.h @@ -31,7 +31,7 @@ class Process : ObjectWrap { static void OnOutput (EV_P_ ev_io *watcher, int revents); static void OnError (EV_P_ ev_io *watcher, int revents); static void OnWritable (EV_P_ ev_io *watcher, int revents); - static void OnCHLD (EV_P_ ev_child *watcher, int revents); + static void OnExit (EV_P_ ev_child *watcher, int revents); ev_io stdout_watcher_; ev_io stderr_watcher_; From 2fd4958698b6efa8b2f924ed28d3520806de1a38 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 13:29:15 +0200 Subject: [PATCH 05/11] Add pid accessor --- src/process.cc | 30 ++++++++++++++++++++++++++---- src/process.h | 1 + 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/process.cc b/src/process.cc index bba44ddf2..1c83c533c 100644 --- a/src/process.cc +++ b/src/process.cc @@ -14,6 +14,7 @@ using namespace node; #define ON_ERROR_SYMBOL String::NewSymbol("onError") #define ON_OUTPUT_SYMBOL String::NewSymbol("onOutput") #define ON_EXIT_SYMBOL String::NewSymbol("onExit") +#define PID_SYMBOL String::NewSymbol("pid") Persistent Process::constructor_template; @@ -29,6 +30,9 @@ Process::Initialize (Handle target) NODE_SET_PROTOTYPE_METHOD(constructor_template, "write", Process::Write); NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Process::Close); + constructor_template->PrototypeTemplate()->SetAccessor(PID_SYMBOL, + PIDGetter); + target->Set(String::NewSymbol("Process"), constructor_template->GetFunction()); } @@ -52,6 +56,20 @@ Process::New (const Arguments& args) return args.This(); } +Handle +Process::PIDGetter (Local _, const AccessorInfo& info) +{ + Process *process = NODE_UNWRAP(Process, info.This()); + assert(process); + + HandleScope scope; + + if (process->pid_ == 0) return Null(); + + Local pid = Integer::New(process->pid_); + return scope.Close(pid); +} + static void free_buf (oi_buf *b) { @@ -189,6 +207,14 @@ Process::~Process () void Process::Shutdown () { + // Clear the out_stream + while (!oi_queue_empty(&out_stream_)) { + oi_queue *q = oi_queue_last(&out_stream_); + oi_buf *buf = (oi_buf*) oi_queue_data(q, oi_buf, queue); + oi_queue_remove(q); + if (buf->release) buf->release(buf); + } + if (stdout_pipe_[0] >= 0) close(stdout_pipe_[0]); if (stdout_pipe_[1] >= 0) close(stdout_pipe_[1]); @@ -260,8 +286,6 @@ Process::Spawn (const char *command) return -4; case 0: // Child. - //printf("child process!\n"); - close(stdout_pipe_[0]); // close read end dup2(stdout_pipe_[1], STDOUT_FILENO); @@ -271,8 +295,6 @@ Process::Spawn (const char *command) close(stdin_pipe_[1]); // close write end dup2(stdin_pipe_[0], STDIN_FILENO); - //printf("child process!\n"); - execl("/bin/sh", "sh", "-c", command, (char *)NULL); //execl(_PATH_BSHELL, "sh", "-c", program, (char *)NULL); _exit(127); diff --git a/src/process.h b/src/process.h index faee88e1b..b48ba9713 100644 --- a/src/process.h +++ b/src/process.h @@ -18,6 +18,7 @@ class Process : ObjectWrap { static v8::Handle New (const v8::Arguments& args); static v8::Handle Write (const v8::Arguments& args); static v8::Handle Close (const v8::Arguments& args); + static v8::Handle PIDGetter (v8::Local _, const v8::AccessorInfo& info); Process(v8::Handle handle); ~Process(); From e39923a3d7a87ccd984bfafe3aacefe419a15cdb Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 13:41:03 +0200 Subject: [PATCH 06/11] Add process.kill(sig = SIGTERM) --- src/process.cc | 37 ++++++++++++++++++++++++++----------- src/process.h | 2 ++ 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/process.cc b/src/process.cc index 1c83c533c..3cdeadc39 100644 --- a/src/process.cc +++ b/src/process.cc @@ -29,6 +29,7 @@ Process::Initialize (Handle target) NODE_SET_PROTOTYPE_METHOD(constructor_template, "write", Process::Write); NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Process::Close); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "kill", Process::Kill); constructor_template->PrototypeTemplate()->SetAccessor(PID_SYMBOL, PIDGetter); @@ -101,13 +102,6 @@ Process::Write (const Arguments& args) Process *process = NODE_UNWRAP(Process, args.Holder()); assert(process); -#if 0 - if ( connection->ReadyState() != OPEN - && connection->ReadyState() != WRITE_ONLY - ) - return ThrowException(String::New("Socket is not open for writing")); -#endif - // XXX // A lot of improvement can be made here. First of all we're allocating // oi_bufs for every send which is clearly inefficent - it should use a @@ -156,6 +150,23 @@ Process::Write (const Arguments& args) return Undefined(); } +Handle +Process::Kill (const Arguments& args) +{ + HandleScope scope; + Process *process = NODE_UNWRAP(Process, args.Holder()); + assert(process); + + int sig = SIGTERM; + if (args[0]->IsInt32()) sig = args[0]->Int32Value(); + + if (process->Kill(sig) != 0) { + return ThrowException(String::New("Process already dead")); + } + + return Undefined(); +} + Handle Process::Close (const Arguments& args) { @@ -451,8 +462,7 @@ Process::OnExit (EV_P_ ev_child *watcher, int revents) int Process::Write (oi_buf *buf) { - if (stdin_pipe_[1] < 0 || got_close_) - return -1; + if (stdin_pipe_[1] < 0 || got_close_) return -1; oi_queue_insert_head(&out_stream_, &buf->queue); buf->written = 0; ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); @@ -462,10 +472,15 @@ Process::Write (oi_buf *buf) int Process::Close () { - if (stdin_pipe_[1] < 0 || got_close_) - return -1; + if (stdin_pipe_[1] < 0 || got_close_) return -1; got_close_ = true; ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); return 0; } +int +Process::Kill (int sig) +{ + if (pid_ == 0) return -1; + return kill(pid_, sig); +} diff --git a/src/process.h b/src/process.h index b48ba9713..38138b87c 100644 --- a/src/process.h +++ b/src/process.h @@ -18,6 +18,7 @@ class Process : ObjectWrap { static v8::Handle New (const v8::Arguments& args); static v8::Handle Write (const v8::Arguments& args); static v8::Handle Close (const v8::Arguments& args); + static v8::Handle Kill (const v8::Arguments& args); static v8::Handle PIDGetter (v8::Local _, const v8::AccessorInfo& info); Process(v8::Handle handle); @@ -27,6 +28,7 @@ class Process : ObjectWrap { int Spawn (const char *command); int Write (oi_buf *buf); int Close (); + int Kill (int sig); private: static void OnOutput (EV_P_ ev_io *watcher, int revents); From 0f76d3e6d8b4dd4a072ec6655cdd298418567a14 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 13:50:13 +0200 Subject: [PATCH 07/11] define signal constants --- src/constants.cc | 137 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) diff --git a/src/constants.cc b/src/constants.cc index 373b8ae4b..449b34f5d 100644 --- a/src/constants.cc +++ b/src/constants.cc @@ -4,6 +4,7 @@ #include #include #include +#include using namespace v8; using namespace node; @@ -429,5 +430,141 @@ node::DefineConstants (Handle target) NODE_DEFINE_CONSTANT(target, EXDEV); #endif +#ifdef SIGHUP + NODE_DEFINE_CONSTANT(target, SIGHUP); +#endif + +#ifdef SIGINT + NODE_DEFINE_CONSTANT(target, SIGINT); +#endif + +#ifdef SIGQUIT + NODE_DEFINE_CONSTANT(target, SIGQUIT); +#endif + +#ifdef SIGILL + NODE_DEFINE_CONSTANT(target, SIGILL); +#endif + +#ifdef SIGTRAP + NODE_DEFINE_CONSTANT(target, SIGTRAP); +#endif + +#ifdef SIGABRT + NODE_DEFINE_CONSTANT(target, SIGABRT); +#endif + +#ifdef SIGIOT + NODE_DEFINE_CONSTANT(target, SIGIOT); +#endif + +#ifdef SIGBUS + NODE_DEFINE_CONSTANT(target, SIGBUS); +#endif + +#ifdef SIGFPE + NODE_DEFINE_CONSTANT(target, SIGFPE); +#endif + +#ifdef SIGKILL + NODE_DEFINE_CONSTANT(target, SIGKILL); +#endif + +#ifdef SIGUSR1 + NODE_DEFINE_CONSTANT(target, SIGUSR1); +#endif + +#ifdef SIGSEGV + NODE_DEFINE_CONSTANT(target, SIGSEGV); +#endif + +#ifdef SIGUSR2 + NODE_DEFINE_CONSTANT(target, SIGUSR2); +#endif + +#ifdef SIGPIPE + NODE_DEFINE_CONSTANT(target, SIGPIPE); +#endif + +#ifdef SIGALRM + NODE_DEFINE_CONSTANT(target, SIGALRM); +#endif + + NODE_DEFINE_CONSTANT(target, SIGTERM); + NODE_DEFINE_CONSTANT(target, SIGCHLD); + +#ifdef SIGSTKFLT + NODE_DEFINE_CONSTANT(target, SIGSTKFLT); +#endif + + +#ifdef SIGCONT + NODE_DEFINE_CONSTANT(target, SIGCONT); +#endif + +#ifdef SIGSTOP + NODE_DEFINE_CONSTANT(target, SIGSTOP); +#endif + +#ifdef SIGTSTP + NODE_DEFINE_CONSTANT(target, SIGTSTP); +#endif + +#ifdef SIGTTIN + NODE_DEFINE_CONSTANT(target, SIGTTIN); +#endif + +#ifdef SIGTTOU + NODE_DEFINE_CONSTANT(target, SIGTTOU); +#endif + +#ifdef SIGURG + NODE_DEFINE_CONSTANT(target, SIGURG); +#endif + +#ifdef SIGXCPU + NODE_DEFINE_CONSTANT(target, SIGXCPU); +#endif + +#ifdef SIGXFSZ + NODE_DEFINE_CONSTANT(target, SIGXFSZ); +#endif + +#ifdef SIGVTALRM + NODE_DEFINE_CONSTANT(target, SIGVTALRM); +#endif + +#ifdef SIGPROF + NODE_DEFINE_CONSTANT(target, SIGPROF); +#endif + +#ifdef SIGWINCH + NODE_DEFINE_CONSTANT(target, SIGWINCH); +#endif + +#ifdef SIGIO + NODE_DEFINE_CONSTANT(target, SIGIO); +#endif + +#ifdef SIGPOLL + NODE_DEFINE_CONSTANT(target, SIGPOLL); +#endif + +#ifdef SIGLOST + NODE_DEFINE_CONSTANT(target, SIGLOST); +#endif + +#ifdef SIGPWR + NODE_DEFINE_CONSTANT(target, SIGPWR); +#endif + +#ifdef SIGSYS + NODE_DEFINE_CONSTANT(target, SIGSYS); +#endif + +#ifdef SIGUNUSED + NODE_DEFINE_CONSTANT(target, SIGUNUSED); +#endif + } From e71b089cd772754a54250d4c7fbb178a35d75006 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 13:57:23 +0200 Subject: [PATCH 08/11] Add test-process-simple.js --- test/test-process-simple.js | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 test/test-process-simple.js diff --git a/test/test-process-simple.js b/test/test-process-simple.js new file mode 100644 index 000000000..b2babfca3 --- /dev/null +++ b/test/test-process-simple.js @@ -0,0 +1,26 @@ +include("mjsunit.js"); + +var cat = new node.Process("cat"); + +var response = ""; +var exit_status = -1; + +cat.onOutput = function (chunk) { + if (chunk) response += chunk; + if (response === "hello world") cat.close(); +}; +cat.onError = function (chunk) { + assertEquals(null, chunk); +}; +cat.onExit = function (status) { exit_status = status; }; + +function onLoad () { + cat.write("hello"); + cat.write(" "); + cat.write("world"); +} + +function onExit () { + assertEquals(0, exit_status); + assertEquals("hello world", response); +} From 145072e73629b8b9b131b8a1117bb9d44a0fb155 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 14:06:03 +0200 Subject: [PATCH 09/11] Add test-process-kill.js --- test/test-process-kill.js | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 test/test-process-kill.js diff --git a/test/test-process-kill.js b/test/test-process-kill.js new file mode 100644 index 000000000..72e7e1090 --- /dev/null +++ b/test/test-process-kill.js @@ -0,0 +1,15 @@ +include("mjsunit.js"); + +var cat = new node.Process("cat"); + +var exit_status = -1; + +cat.onOutput = function (chunk) { assertEquals(null, chunk); }; +cat.onError = function (chunk) { assertEquals(null, chunk); }; +cat.onExit = function (status) { exit_status = status; }; + +cat.kill(); + +function onExit () { + assertTrue(exit_status > 0); +} From c5b5815ae7ff3767ec2d25bd3d02dae81b11e0e8 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 14:07:52 +0200 Subject: [PATCH 10/11] fix error in test-process-simple --- test/test-process-simple.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/test-process-simple.js b/test/test-process-simple.js index b2babfca3..f3a761abd 100644 --- a/test/test-process-simple.js +++ b/test/test-process-simple.js @@ -6,8 +6,10 @@ var response = ""; var exit_status = -1; cat.onOutput = function (chunk) { - if (chunk) response += chunk; - if (response === "hello world") cat.close(); + if (chunk) { + response += chunk; + if (response === "hello world") cat.close(); + } }; cat.onError = function (chunk) { assertEquals(null, chunk); From da03a02a98f67add4438002a3fb08510655f3507 Mon Sep 17 00:00:00 2001 From: Ryan Date: Sun, 21 Jun 2009 14:34:13 +0200 Subject: [PATCH 11/11] Add documentation for node.Process --- website/api.html | 61 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/website/api.html b/website/api.html index 59bb15a4a..3b3bcdbaf 100644 --- a/website/api.html +++ b/website/api.html @@ -13,6 +13,7 @@
  1. Timers
  2. +
  3. Processes
  4. File I/O
      @@ -130,6 +131,66 @@
      Stops a interval from triggering.
      +

      Processes and IPC

      + +

      + Node provides a tridirectional popen(3) facility. + It is possible to stream data through the child's stdin, + stdout, and stderr in a fully non-blocking + way. +

      + +
      +
      new node.Process(command)
      +
      Launches a new process with the given command. For example: +
      var ls = new Process("ls -lh /usr");
      +
      + +
      process.pid
      +
      The PID of the child process.
      + +
      process.onOutput = function (chunk) { };
      +
      A callback to receive output from the process's stdout. + At the moment the received data is always a string and utf8 encoded. + (More encodings will be supported in the future.) + +

      If the process closes it's stdout, this callback will + be issued with null as an argument. Be prepared for this + possibility. +

      + +
      process.onError = function (chunk) { };
      +
      A callback to receive output from the process's stderr. + At the moment the received data is always a string and utf8 encoded. + (More encodings will be supported in the future.) + +

      If the process closes it's stderr, this callback will + be issued with null as an argument. Be prepared for this + possibility. +

      + +
      process.onExit = function (exit_code) { };
      +
      A callback which is called when the sub-process exits. The argument + is the exit status of the child. +
      + +
      process.write(data, encoding="ascii");
      +
      Write data to the child process's stdin. The second + argument is optional and specifies the encoding: possible values are + "utf8", "ascii", and "raw". +
      + +
      process.close();
      +
      Closes the processes stdin stream.
      + +
      process.kill(signal=node.SIGTERM);
      +
      Kills the child process with the given signal. If no argument is + given, the process will be sent node.SIGTERM. The standard + POSIX signals are defined under the node namespace (e.g. + node.SIGINT, node.SIGUSR1). +
      +
      +

      node.fs