diff --git a/src/constants.cc b/src/constants.cc index 373b8ae4b..449b34f5d 100644 --- a/src/constants.cc +++ b/src/constants.cc @@ -4,6 +4,7 @@ #include #include #include +#include using namespace v8; using namespace node; @@ -429,5 +430,141 @@ node::DefineConstants (Handle target) NODE_DEFINE_CONSTANT(target, EXDEV); #endif +#ifdef SIGHUP + NODE_DEFINE_CONSTANT(target, SIGHUP); +#endif + +#ifdef SIGINT + NODE_DEFINE_CONSTANT(target, SIGINT); +#endif + +#ifdef SIGQUIT + NODE_DEFINE_CONSTANT(target, SIGQUIT); +#endif + +#ifdef SIGILL + NODE_DEFINE_CONSTANT(target, SIGILL); +#endif + +#ifdef SIGTRAP + NODE_DEFINE_CONSTANT(target, SIGTRAP); +#endif + +#ifdef SIGABRT + NODE_DEFINE_CONSTANT(target, SIGABRT); +#endif + +#ifdef SIGIOT + NODE_DEFINE_CONSTANT(target, SIGIOT); +#endif + +#ifdef SIGBUS + NODE_DEFINE_CONSTANT(target, SIGBUS); +#endif + +#ifdef SIGFPE + NODE_DEFINE_CONSTANT(target, SIGFPE); +#endif + +#ifdef SIGKILL + NODE_DEFINE_CONSTANT(target, SIGKILL); +#endif + +#ifdef SIGUSR1 + NODE_DEFINE_CONSTANT(target, SIGUSR1); +#endif + +#ifdef SIGSEGV + NODE_DEFINE_CONSTANT(target, SIGSEGV); +#endif + +#ifdef SIGUSR2 + NODE_DEFINE_CONSTANT(target, SIGUSR2); +#endif + +#ifdef SIGPIPE + NODE_DEFINE_CONSTANT(target, SIGPIPE); +#endif + +#ifdef SIGALRM + NODE_DEFINE_CONSTANT(target, SIGALRM); +#endif + + NODE_DEFINE_CONSTANT(target, SIGTERM); + NODE_DEFINE_CONSTANT(target, SIGCHLD); + +#ifdef SIGSTKFLT + NODE_DEFINE_CONSTANT(target, SIGSTKFLT); +#endif + + +#ifdef SIGCONT + NODE_DEFINE_CONSTANT(target, SIGCONT); +#endif + +#ifdef SIGSTOP + NODE_DEFINE_CONSTANT(target, SIGSTOP); +#endif + +#ifdef SIGTSTP + NODE_DEFINE_CONSTANT(target, SIGTSTP); +#endif + +#ifdef SIGTTIN + NODE_DEFINE_CONSTANT(target, SIGTTIN); +#endif + +#ifdef SIGTTOU + NODE_DEFINE_CONSTANT(target, SIGTTOU); +#endif + +#ifdef SIGURG + NODE_DEFINE_CONSTANT(target, SIGURG); +#endif + +#ifdef SIGXCPU + NODE_DEFINE_CONSTANT(target, SIGXCPU); +#endif + +#ifdef SIGXFSZ + NODE_DEFINE_CONSTANT(target, SIGXFSZ); +#endif + +#ifdef SIGVTALRM + NODE_DEFINE_CONSTANT(target, SIGVTALRM); +#endif + +#ifdef SIGPROF + NODE_DEFINE_CONSTANT(target, SIGPROF); +#endif + +#ifdef SIGWINCH + NODE_DEFINE_CONSTANT(target, SIGWINCH); +#endif + +#ifdef SIGIO + NODE_DEFINE_CONSTANT(target, SIGIO); +#endif + +#ifdef SIGPOLL + NODE_DEFINE_CONSTANT(target, SIGPOLL); +#endif + +#ifdef SIGLOST + NODE_DEFINE_CONSTANT(target, SIGLOST); +#endif + +#ifdef SIGPWR + NODE_DEFINE_CONSTANT(target, SIGPWR); +#endif + +#ifdef SIGSYS + NODE_DEFINE_CONSTANT(target, SIGSYS); +#endif + +#ifdef SIGUNUSED + NODE_DEFINE_CONSTANT(target, SIGUNUSED); +#endif + } diff --git a/src/http.h b/src/http.h index ab207a3c5..eb7ecd774 100644 --- a/src/http.h +++ b/src/http.h @@ -13,6 +13,8 @@ public: static v8::Persistent client_constructor_template; static v8::Persistent server_constructor_template; + + virtual size_t size (void) { return sizeof(HTTPConnection); }; protected: static v8::Handle NewClient (const v8::Arguments& args); @@ -41,6 +43,8 @@ public: static void Initialize (v8::Handle target); static v8::Persistent constructor_template; + virtual size_t size (void) { return sizeof(HTTPServer); }; + protected: static v8::Handle New (const v8::Arguments& args); diff --git a/src/node.cc b/src/node.cc index 07384838b..6036e158b 100644 --- a/src/node.cc +++ b/src/node.cc @@ -4,6 +4,7 @@ #include "file.h" #include "http.h" #include "timer.h" +#include "process.h" #include "constants.h" #include "natives.h" @@ -302,6 +303,7 @@ Load (int argc, char *argv[]) NODE_SET_METHOD(node_obj, "reallyExit", node_exit); Timer::Initialize(node_obj); + Process::Initialize(node_obj); DefineConstants(node_obj); diff --git a/src/process.cc b/src/process.cc new file mode 100644 index 000000000..3cdeadc39 --- /dev/null +++ b/src/process.cc @@ -0,0 +1,486 @@ +#include "node.h" +#include "process.h" + +#include +#include +#include +#include +#include +#include + +using namespace v8; +using namespace node; + +#define ON_ERROR_SYMBOL String::NewSymbol("onError") +#define ON_OUTPUT_SYMBOL String::NewSymbol("onOutput") +#define ON_EXIT_SYMBOL String::NewSymbol("onExit") +#define PID_SYMBOL String::NewSymbol("pid") + +Persistent Process::constructor_template; + +void +Process::Initialize (Handle target) +{ + HandleScope scope; + + Local t = FunctionTemplate::New(Process::New); + constructor_template = Persistent::New(t); + constructor_template->InstanceTemplate()->SetInternalFieldCount(1); + + NODE_SET_PROTOTYPE_METHOD(constructor_template, "write", Process::Write); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Process::Close); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "kill", Process::Kill); + + constructor_template->PrototypeTemplate()->SetAccessor(PID_SYMBOL, + PIDGetter); + + target->Set(String::NewSymbol("Process"), constructor_template->GetFunction()); +} + +Handle +Process::New (const Arguments& args) +{ + if (args.Length() == 0) return Undefined(); + + HandleScope scope; + + String::Utf8Value command(args[0]->ToString()); + + Process *p = new Process(args.Holder()); + ObjectWrap::InformV8ofAllocation(p); + + int r = p->Spawn(*command); + if (r != 0) { + return ThrowException(String::New("Error spawning")); + } + + return args.This(); +} + +Handle +Process::PIDGetter (Local _, const AccessorInfo& info) +{ + Process *process = NODE_UNWRAP(Process, info.This()); + assert(process); + + HandleScope scope; + + if (process->pid_ == 0) return Null(); + + Local pid = Integer::New(process->pid_); + return scope.Close(pid); +} + +static void +free_buf (oi_buf *b) +{ + V8::AdjustAmountOfExternalAllocatedMemory(-b->len); + free(b); +} + +static oi_buf * +new_buf (size_t size) +{ + size_t total = sizeof(oi_buf) + size; + void *p = malloc(total); + if (p == NULL) return NULL; + + oi_buf *b = static_cast(p); + b->base = static_cast(p) + sizeof(oi_buf); + + b->len = size; + b->release = free_buf; + V8::AdjustAmountOfExternalAllocatedMemory(total); + + return b; +} + +Handle +Process::Write (const Arguments& args) +{ + HandleScope scope; + Process *process = NODE_UNWRAP(Process, args.Holder()); + assert(process); + + // XXX + // A lot of improvement can be made here. First of all we're allocating + // oi_bufs for every send which is clearly inefficent - it should use a + // memory pool or ring buffer. Of course, expressing binary data as an + // array of integers is extremely inefficent. This can improved when v8 + // bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been + // addressed. + + oi_buf *buf; + size_t len; + + if (args[0]->IsString()) { + enum encoding enc = ParseEncoding(args[1]); + Local s = args[0]->ToString(); + len = s->Utf8Length(); + buf = new_buf(len); + switch (enc) { + case RAW: + case ASCII: + s->WriteAscii(buf->base, 0, len); + break; + + case UTF8: + s->WriteUtf8(buf->base, len); + break; + + default: + assert(0 && "unhandled string encoding"); + } + + } else if (args[0]->IsArray()) { + Handle array = Handle::Cast(args[0]); + len = array->Length(); + buf = new_buf(len); + for (size_t i = 0; i < len; i++) { + Local int_value = array->Get(Integer::New(i)); + buf->base[i] = int_value->IntegerValue(); + } + + } else return ThrowException(String::New("Bad argument")); + + if (process->Write(buf) != 0) { + return ThrowException(String::New("Pipe already closed")); + } + + return Undefined(); +} + +Handle +Process::Kill (const Arguments& args) +{ + HandleScope scope; + Process *process = NODE_UNWRAP(Process, args.Holder()); + assert(process); + + int sig = SIGTERM; + if (args[0]->IsInt32()) sig = args[0]->Int32Value(); + + if (process->Kill(sig) != 0) { + return ThrowException(String::New("Process already dead")); + } + + return Undefined(); +} + +Handle +Process::Close (const Arguments& args) +{ + HandleScope scope; + Process *process = NODE_UNWRAP(Process, args.Holder()); + assert(process); + + if (process->Close() != 0) { + return ThrowException(String::New("Pipe already closed.")); + } + + return Undefined(); +} + +Process::Process (Handle handle) + : ObjectWrap(handle) +{ + ev_init(&stdout_watcher_, Process::OnOutput); + stdout_watcher_.data = this; + + ev_init(&stderr_watcher_, Process::OnOutput); + stderr_watcher_.data = this; + + ev_init(&stdin_watcher_, Process::OnWritable); + stdin_watcher_.data = this; + + ev_init(&child_watcher_, Process::OnExit); + child_watcher_.data = this; + + stdout_pipe_[0] = -1; + stdout_pipe_[1] = -1; + stderr_pipe_[0] = -1; + stderr_pipe_[1] = -1; + stdin_pipe_[0] = -1; + stdin_pipe_[1] = -1; + + got_close_ = false; + + pid_ = 0; + + oi_queue_init(&out_stream_); +} + +Process::~Process () +{ + Shutdown(); +} + +void +Process::Shutdown () +{ + // Clear the out_stream + while (!oi_queue_empty(&out_stream_)) { + oi_queue *q = oi_queue_last(&out_stream_); + oi_buf *buf = (oi_buf*) oi_queue_data(q, oi_buf, queue); + oi_queue_remove(q); + if (buf->release) buf->release(buf); + } + + if (stdout_pipe_[0] >= 0) close(stdout_pipe_[0]); + if (stdout_pipe_[1] >= 0) close(stdout_pipe_[1]); + + if (stderr_pipe_[0] >= 0) close(stderr_pipe_[0]); + if (stderr_pipe_[1] >= 0) close(stderr_pipe_[1]); + + if (stdin_pipe_[0] >= 0) close(stdin_pipe_[0]); + if (stdin_pipe_[1] >= 0) close(stdin_pipe_[1]); + + stdout_pipe_[0] = -1; + stdout_pipe_[1] = -1; + stderr_pipe_[0] = -1; + stderr_pipe_[1] = -1; + stdin_pipe_[0] = -1; + stdin_pipe_[1] = -1; + + ev_io_stop(EV_DEFAULT_UC_ &stdout_watcher_); + ev_io_stop(EV_DEFAULT_UC_ &stderr_watcher_); + ev_io_stop(EV_DEFAULT_UC_ &stdin_watcher_); + + ev_child_stop(EV_DEFAULT_UC_ &child_watcher_); + /* XXX Kill the PID? */ + pid_ = 0; + + Detach(); +} + +static inline int +SetNonBlocking (int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (r != 0) { + perror("SetNonBlocking()"); + } + return r; +} + +int +Process::Spawn (const char *command) +{ + assert(pid_ == 0); + assert(stdout_pipe_[0] == -1); + assert(stdout_pipe_[1] == -1); + assert(stderr_pipe_[0] == -1); + assert(stderr_pipe_[1] == -1); + assert(stdin_pipe_[0] == -1); + assert(stdin_pipe_[1] == -1); + + /* An implementation of popen(), basically */ + if (pipe(stdout_pipe_) < 0) { + perror("pipe()"); + return -1; + } + + if (pipe(stderr_pipe_) < 0) { + perror("pipe()"); + return -2; + } + + if (pipe(stdin_pipe_) < 0) { + perror("pipe()"); + return -3; + } + + switch (pid_ = vfork()) { + case -1: // Error. + Shutdown(); + return -4; + + case 0: // Child. + close(stdout_pipe_[0]); // close read end + dup2(stdout_pipe_[1], STDOUT_FILENO); + + close(stderr_pipe_[0]); // close read end + dup2(stderr_pipe_[1], STDERR_FILENO); + + close(stdin_pipe_[1]); // close write end + dup2(stdin_pipe_[0], STDIN_FILENO); + + execl("/bin/sh", "sh", "-c", command, (char *)NULL); + //execl(_PATH_BSHELL, "sh", "-c", program, (char *)NULL); + _exit(127); + } + + // Parent. + + ev_child_set(&child_watcher_, pid_, 0); + ev_child_start(EV_DEFAULT_UC_ &child_watcher_); + + SetNonBlocking(stdout_pipe_[0]); + ev_io_set(&stdout_watcher_, stdout_pipe_[0], EV_READ); + ev_io_start(EV_DEFAULT_UC_ &stdout_watcher_); + close(stdout_pipe_[1]); // close write end + stdout_pipe_[1] = -1; + + SetNonBlocking(stderr_pipe_[0]); + ev_io_set(&stderr_watcher_, stderr_pipe_[0], EV_READ); + ev_io_start(EV_DEFAULT_UC_ &stderr_watcher_); + close(stderr_pipe_[1]); // close write end + stderr_pipe_[1] = -1; + + SetNonBlocking(stdin_pipe_[1]); + ev_io_set(&stdin_watcher_, stdin_pipe_[1], EV_WRITE); + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + close(stdin_pipe_[0]); // close read end + stdin_pipe_[0] = -1; + + Attach(); + + return 0; +} + +void +Process::OnOutput (EV_P_ ev_io *watcher, int revents) +{ + int r; + char buf[16*1024]; + size_t buf_size = 16*1024; + + Process *process = static_cast(watcher->data); + + bool is_stdout = (&process->stdout_watcher_ == watcher); + int fd = is_stdout ? process->stdout_pipe_[0] : process->stderr_pipe_[0]; + + assert(revents == EV_READ); + assert(fd >= 0); + + HandleScope scope; + Handle callback_v = + process->handle_->Get(is_stdout ? ON_OUTPUT_SYMBOL : ON_ERROR_SYMBOL); + Handle callback; + if (callback_v->IsFunction()) { + callback = Handle::Cast(callback_v); + } + Handle argv[1]; + + for (;;) { + r = read(fd, buf, buf_size); + + if (r < 0) { + if (errno != EAGAIN) perror("IPC pipe read error"); + break; + } + + if (!callback.IsEmpty()) { + if (r == 0) { + argv[0] = Null(); + } else { + // TODO multiple encodings + argv[0] = String::New((const char*)buf, r); + } + + TryCatch try_catch; + callback->Call(process->handle_, 1, argv); + if (try_catch.HasCaught()) { + FatalException(try_catch); + return; + } + } + + if (r == 0) { + ev_io_stop(EV_DEFAULT_UC_ watcher); + break; + } + } +} + +void +Process::OnWritable (EV_P_ ev_io *watcher, int revents) +{ + Process *process = static_cast(watcher->data); + int sent; + + assert(revents == EV_WRITE); + assert(process->stdin_pipe_[1] >= 0); + + while (!oi_queue_empty(&process->out_stream_)) { + oi_queue *q = oi_queue_last(&process->out_stream_); + oi_buf *to_write = (oi_buf*) oi_queue_data(q, oi_buf, queue); + + sent = write( process->stdin_pipe_[1] + , to_write->base + to_write->written + , to_write->len - to_write->written + ); + if (sent < 0) { + if (errno == EAGAIN) break; + perror("IPC pipe write error"); + break; + } + + to_write->written += sent; + + if (to_write->written == to_write->len) { + oi_queue_remove(q); + if (to_write->release) to_write->release(to_write); + } + } + + if (oi_queue_empty(&process->out_stream_)) { + ev_io_stop(EV_DEFAULT_UC_ &process->stdin_watcher_); + if (process->got_close_) { + close(process->stdin_pipe_[1]); + process->stdin_pipe_[1] = -1; + } + } +} + +void +Process::OnExit (EV_P_ ev_child *watcher, int revents) +{ + ev_child_stop(EV_A_ watcher); + Process *process = static_cast(watcher->data); + + assert(revents == EV_CHILD); + assert(process->pid_ == watcher->rpid); + assert(&process->child_watcher_ == watcher); + + // Call onExit ( watcher->rstatus ) + HandleScope scope; + Handle callback_v = process->handle_->Get(ON_EXIT_SYMBOL); + + if (callback_v->IsFunction()) { + Handle callback = Handle::Cast(callback_v); + TryCatch try_catch; + Handle argv[1] = { Integer::New(watcher->rstatus) }; + callback->Call(process->handle_, 1, argv); + if (try_catch.HasCaught()) FatalException(try_catch); + } + process->Shutdown(); + process->Detach(); +} + +int +Process::Write (oi_buf *buf) +{ + if (stdin_pipe_[1] < 0 || got_close_) return -1; + oi_queue_insert_head(&out_stream_, &buf->queue); + buf->written = 0; + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + return 0; +} + +int +Process::Close () +{ + if (stdin_pipe_[1] < 0 || got_close_) return -1; + got_close_ = true; + ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); + return 0; +} + +int +Process::Kill (int sig) +{ + if (pid_ == 0) return -1; + return kill(pid_, sig); +} diff --git a/src/process.h b/src/process.h new file mode 100644 index 000000000..38138b87c --- /dev/null +++ b/src/process.h @@ -0,0 +1,56 @@ +#ifndef node_process_h +#define node_process_h + +#include "node.h" +#include +#include +#include + +namespace node { + +class Process : ObjectWrap { + public: + static void Initialize (v8::Handle target); + virtual size_t size (void) { return sizeof(Process); } + + protected: + static v8::Persistent constructor_template; + static v8::Handle New (const v8::Arguments& args); + static v8::Handle Write (const v8::Arguments& args); + static v8::Handle Close (const v8::Arguments& args); + static v8::Handle Kill (const v8::Arguments& args); + static v8::Handle PIDGetter (v8::Local _, const v8::AccessorInfo& info); + + Process(v8::Handle handle); + ~Process(); + + void Shutdown (); + int Spawn (const char *command); + int Write (oi_buf *buf); + int Close (); + int Kill (int sig); + + private: + static void OnOutput (EV_P_ ev_io *watcher, int revents); + static void OnError (EV_P_ ev_io *watcher, int revents); + static void OnWritable (EV_P_ ev_io *watcher, int revents); + static void OnExit (EV_P_ ev_child *watcher, int revents); + + ev_io stdout_watcher_; + ev_io stderr_watcher_; + ev_io stdin_watcher_; + ev_child child_watcher_; + + int stdout_pipe_[2]; + int stderr_pipe_[2]; + int stdin_pipe_[2]; + + pid_t pid_; + + bool got_close_; + + oi_queue out_stream_; +}; + +} // namespace node +#endif // node_process_h diff --git a/test/test-process-kill.js b/test/test-process-kill.js new file mode 100644 index 000000000..72e7e1090 --- /dev/null +++ b/test/test-process-kill.js @@ -0,0 +1,15 @@ +include("mjsunit.js"); + +var cat = new node.Process("cat"); + +var exit_status = -1; + +cat.onOutput = function (chunk) { assertEquals(null, chunk); }; +cat.onError = function (chunk) { assertEquals(null, chunk); }; +cat.onExit = function (status) { exit_status = status; }; + +cat.kill(); + +function onExit () { + assertTrue(exit_status > 0); +} diff --git a/test/test-process-simple.js b/test/test-process-simple.js new file mode 100644 index 000000000..f3a761abd --- /dev/null +++ b/test/test-process-simple.js @@ -0,0 +1,28 @@ +include("mjsunit.js"); + +var cat = new node.Process("cat"); + +var response = ""; +var exit_status = -1; + +cat.onOutput = function (chunk) { + if (chunk) { + response += chunk; + if (response === "hello world") cat.close(); + } +}; +cat.onError = function (chunk) { + assertEquals(null, chunk); +}; +cat.onExit = function (status) { exit_status = status; }; + +function onLoad () { + cat.write("hello"); + cat.write(" "); + cat.write("world"); +} + +function onExit () { + assertEquals(0, exit_status); + assertEquals("hello world", response); +} diff --git a/website/api.html b/website/api.html index 59bb15a4a..3b3bcdbaf 100644 --- a/website/api.html +++ b/website/api.html @@ -13,6 +13,7 @@
  1. Timers
  2. +
  3. Processes
  4. File I/O
      @@ -130,6 +131,66 @@
      Stops a interval from triggering.
      +

      Processes and IPC

      + +

      + Node provides a tridirectional popen(3) facility. + It is possible to stream data through the child's stdin, + stdout, and stderr in a fully non-blocking + way. +

      + +
      +
      new node.Process(command)
      +
      Launches a new process with the given command. For example: +
      var ls = new Process("ls -lh /usr");
      +
      + +
      process.pid
      +
      The PID of the child process.
      + +
      process.onOutput = function (chunk) { };
      +
      A callback to receive output from the process's stdout. + At the moment the received data is always a string and utf8 encoded. + (More encodings will be supported in the future.) + +

      If the process closes it's stdout, this callback will + be issued with null as an argument. Be prepared for this + possibility. +

      + +
      process.onError = function (chunk) { };
      +
      A callback to receive output from the process's stderr. + At the moment the received data is always a string and utf8 encoded. + (More encodings will be supported in the future.) + +

      If the process closes it's stderr, this callback will + be issued with null as an argument. Be prepared for this + possibility. +

      + +
      process.onExit = function (exit_code) { };
      +
      A callback which is called when the sub-process exits. The argument + is the exit status of the child. +
      + +
      process.write(data, encoding="ascii");
      +
      Write data to the child process's stdin. The second + argument is optional and specifies the encoding: possible values are + "utf8", "ascii", and "raw". +
      + +
      process.close();
      +
      Closes the processes stdin stream.
      + +
      process.kill(signal=node.SIGTERM);
      +
      Kills the child process with the given signal. If no argument is + given, the process will be sent node.SIGTERM. The standard + POSIX signals are defined under the node namespace (e.g. + node.SIGINT, node.SIGUSR1). +
      +
      +

      node.fs

      diff --git a/wscript b/wscript index 8d49a8811..ce85c400b 100644 --- a/wscript +++ b/wscript @@ -158,6 +158,7 @@ def build(bld): src/net.cc src/file.cc src/timer.cc + src/process.cc src/constants.cc """ node.includes = """