simple nanomsg variant of zeromq binary pubsub

This commit is contained in:
Troy D. Hanson
2014-05-21 11:16:03 -04:00
parent 0c44f13503
commit 0b9d3bf009
5 changed files with 299 additions and 0 deletions

View File

@@ -13,6 +13,11 @@ AC_CHECK_LIB(jansson,json_string,
AM_CONDITIONAL(HAVE_JANSSON,true),
AM_CONDITIONAL(HAVE_JANSSON,false))
# is nanomsg installed
AC_CHECK_LIB(nanomsg,nn_socket,
AM_CONDITIONAL(HAVE_NANOMSG,true),
AM_CONDITIONAL(HAVE_NANOMSG,false))
# is zeromq (0MQ) installed
AC_CHECK_LIB(zmq,zmq_init,
AM_CONDITIONAL(HAVE_ZEROMQ,true),

View File

@@ -14,6 +14,8 @@ kvsp_mod_LDADD = $(LIBSPOOL)
kvsp_rewind_LDADD = $(LIBSPOOL)
kvsp_bpub_LDADD = $(LIBSPOOL)
kvsp_bsub_LDADD = $(LIBSPOOL)
kvsp_npub_LDADD = $(LIBSPOOL)
kvsp_nsub_LDADD = $(LIBSPOOL)
kvsp_pub_LDADD = $(LIBSPOOL)
kvsp_sub_LDADD = $(LIBSPOOL)
kvsp_concen_LDADD = $(LIBSPOOL)
@@ -21,8 +23,16 @@ kvsp_tpub_LDADD = $(LIBSPOOL)
kvsp_bpub_SOURCES = kvsp-bpub.c kvsp-bconfig.c
kvsp_bsub_SOURCES = kvsp-bsub.c kvsp-bconfig.c
kvsp_npub_SOURCES = kvsp-npub.c kvsp-bconfig.c
kvsp_nsub_SOURCES = kvsp-nsub.c kvsp-bconfig.c
kvsp_tpub_SOURCES = kvsp-tpub.c kvsp-bconfig.c
if HAVE_NANOMSG
bin_PROGRAMS += kvsp-npub kvsp-nsub
kvsp_npub_LDADD += -lnanomsg
kvsp_nsub_LDADD += -lnanomsg
endif
if HAVE_ZEROMQ
bin_PROGRAMS += kvsp-bpub kvsp-bsub
kvsp_bpub_LDADD += -lzmq
@@ -48,5 +58,7 @@ kvsp_sub_DEPENDENCIES = ../src/libkvspool.a
kvsp_pub_DEPENDENCIES = ../src/libkvspool.a
kvsp_bpub_DEPENDENCIES = ../src/libkvspool.a
kvsp_bsub_DEPENDENCIES = ../src/libkvspool.a
kvsp_npub_DEPENDENCIES = ../src/libkvspool.a
kvsp_nsub_DEPENDENCIES = ../src/libkvspool.a
kvsp_concen_DEPENDENCIES = ../src/libkvspool.a
kvsp_tpub_DEPENDENCIES = ../src/libkvspool.a

138
utils/kvsp-npub.c Normal file
View File

@@ -0,0 +1,138 @@
#include <stdio.h>
#include <stdio.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/prctl.h>
#include <assert.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
#include <arpa/inet.h>
#include "utarray.h"
#include "utstring.h"
#include "kvspool.h"
#include "kvsp-bconfig.h"
char *config_file;
UT_string *tmp;
char *remote; // remote side to which we publish (we do the active open)
int sock,eid;
int verbose;
char *spool;
void *set;
void *sp;
void usage(char *prog) {
fprintf(stderr, "usage: %s [-v] -b <cast-config> -d spool <dst-path>\n", prog);
fprintf(stderr, " <dst-path> is the nsub peer e.g. tcp://127.0.0.1:1234\n");
exit(-1);
}
int set_to_binary(void *set) {
uint32_t l, u, a,b,c,d, abcd;
uint16_t s;
uint8_t g;
double h;
utstring_clear(tmp);
int rc=-1,i=0,*t;
kv_t *kv, kvdef;
char **k=NULL,**def;
while( (k=(char**)utarray_next(output_keys,k))) {
kv = kv_get(set,*k);
t = (int*)utarray_eltptr(output_types,i); assert(t);
def = (char**)utarray_eltptr(output_defaults,i); assert(def);
if (kv==NULL) { /* no such key */
kv=&kvdef;
if (*def) {kv->val=*def; kv->vlen=strlen(*def);} /* default */
else if (*t == str) {kv->val=NULL; kv->vlen=0;} /* zero len string */
else {
fprintf(stderr,"required key %s not present in spool frame\n", *k);
goto done;
}
}
switch(*t) {
case d64: h=atof(kv->val); utstring_bincpy(tmp,&h,sizeof(h)); break;
case i8: g=atoi(kv->val); utstring_bincpy(tmp,&g,sizeof(g)); break;
case i16: s=atoi(kv->val); utstring_bincpy(tmp,&s,sizeof(s)); break;
case i32: u=atoi(kv->val); utstring_bincpy(tmp,&u,sizeof(u)); break;
case str:
l=kv->vlen; utstring_bincpy(tmp,&l,sizeof(l)); /* length prefix */
utstring_bincpy(tmp,kv->val,kv->vlen); /* string itself */
break;
case ipv4:
if ((sscanf(kv->val,"%u.%u.%u.%u",&a,&b,&c,&d) != 4) ||
(a > 255 || b > 255 || c > 255 || d > 255)) {
fprintf(stderr,"invalid IP for key %s: %s\n",*k,kv->val);
goto done;
}
abcd = (a << 24) | (b << 16) | (c << 8) | d;
abcd = htonl(abcd);
utstring_bincpy(tmp,&abcd,sizeof(abcd));
break;
default: assert(0); break;
}
i++;
}
rc = 0;
done:
return rc;
}
int main(int argc, char *argv[]) {
int opt,rc=-1;
size_t len;
void *buf;
set = kv_set_new();
utarray_new(output_keys, &ut_str_icd);
utarray_new(output_defaults, &ut_str_icd);
utarray_new(output_types,&ut_int_icd);
utstring_new(tmp);
while ( (opt = getopt(argc, argv, "v+d:b:")) != -1) {
switch (opt) {
case 'v': verbose++; break;
case 'd': spool=strdup(optarg); break;
case 'b': config_file=strdup(optarg); break;
default: usage(argv[0]); break;
}
}
if (optind < argc) remote = argv[optind++];
if (!remote) usage(argv[0]);
if (spool == NULL) usage(argv[0]);
if (parse_config(config_file) < 0) goto done;
if ( !(sp = kv_spoolreader_new(spool))) goto done;
rc = -2;
if ( (sock = nn_socket(AF_SP, NN_PUSH)) < 0) goto done;
if ( (eid = nn_connect(sock, remote)) < 0) goto done;
while (kv_spool_read(sp,set,1) > 0) { /* read til interrupted by signal */
set_to_binary(set);
buf = utstring_body(tmp);
len = utstring_len(tmp);
rc = nn_send(sock, buf, len, 0);
if (rc == -1) goto done;
}
rc = 0;
done:
if (rc==-2) fprintf(stderr,"nano: %s %s\n", remote, nn_strerror(errno));
if (sock) nn_shutdown(sock,eid);
if (sp) kv_spoolreader_free(sp);
kv_set_free(set);
utarray_free(output_keys);
utarray_free(output_defaults);
utarray_free(output_types);
utstring_free(tmp);
return 0;
}

141
utils/kvsp-nsub.c Normal file
View File

@@ -0,0 +1,141 @@
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <time.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "kvspool_internal.h"
#include "utstring.h"
#include "uthash.h"
#include "kvsp-bconfig.h"
char *config_file;
UT_string *tmp;
int sock,eid;
int verbose;
char *local;
char *dir;
void *set;
void *sp;
void usage(char *exe) {
fprintf(stderr,"usage: %s [-v] -b <cast-config> -d <dir> <listen-addr>\n", exe);
fprintf(stderr," <listen-addr> is our local address e.g. tcp://0.0.0.0:1234\n");
exit(-1);
}
int get(void **msg_data,size_t *msg_len,void *dst,size_t len) {
if (*msg_len < len) {
fprintf(stderr,"received message shorter than expected\n");
return -1;
}
memcpy(dst,*msg_data,len);
*(char**)msg_data += len;
*msg_len -= len;
return 0;
}
int binary_to_frame(void *sp, void *set, void *msg_data, size_t msg_len) {
int rc=-1,i=0,*t;
const char *key;
struct in_addr ia;
uint32_t l, u, a,b,c,d, abcd;
uint16_t s;
uint8_t g;
double h;
kv_set_clear(set);
char **k = NULL;
while ( (k=(char**)utarray_next(output_keys,k))) {
t = (int*)utarray_eltptr(output_types,i); assert(t);
// type is *t and key is *k
utstring_clear(tmp);
switch(*t) {
case d64: if (get(&msg_data,&msg_len,&h,sizeof(h))<0) goto done; utstring_printf(tmp,"%f",h); break;
case i8: if (get(&msg_data,&msg_len,&g,sizeof(g))<0) goto done; utstring_printf(tmp,"%d",(int)g); break;
case i16: if (get(&msg_data,&msg_len,&s,sizeof(s))<0) goto done; utstring_printf(tmp,"%d",(int)s); break;
case i32: if (get(&msg_data,&msg_len,&u,sizeof(u))<0) goto done; utstring_printf(tmp,"%d",u); break;
case str:
if (get(&msg_data,&msg_len,&l,sizeof(l)) < 0) goto done;
utstring_reserve(tmp,l);
if (get(&msg_data,&msg_len,utstring_body(tmp),l) < 0) goto done;
tmp->i += l;
break;
case ipv4:
if (get(&msg_data,&msg_len,&abcd,sizeof(abcd)) < 0) goto done;
ia.s_addr = abcd;
utstring_printf(tmp,"%s", inet_ntoa(ia));
break;
default: assert(0); break;
}
i++;
key = *k;
kv_add(set, key, strlen(key), utstring_body(tmp), utstring_len(tmp));
}
kv_spool_write(sp, set);
rc = 0;
done:
if (rc) fprintf(stderr,"binary frame mismatches expected message length\n");
return rc;
}
int main(int argc, char *argv[]) {
int opt,rc=-1,len;
char *buf;
set = kv_set_new();
utstring_new(tmp);
utarray_new(output_keys, &ut_str_icd);
utarray_new(output_defaults, &ut_str_icd);
utarray_new(output_types,&ut_int_icd);
while ( (opt = getopt(argc, argv, "d:b:v+")) != -1) {
switch (opt) {
case 'v': verbose++; break;
case 'd': dir=strdup(optarg); break;
case 'b': config_file=strdup(optarg); break;
default: usage(argv[0]); break;
}
}
if (!dir) usage(argv[0]);
if (parse_config(config_file) < 0) goto done;
if ( !(sp = kv_spoolwriter_new(dir))) goto done;
if (optind < argc) local = argv[optind++];
if (!local) usage(argv[0]);
rc = -2;
if ( (sock = nn_socket(AF_SP, NN_PULL)) < 0) goto done;
if ( (eid = nn_bind(sock, local)) < 0) goto done;
while(1) {
if ( (len = nn_recv(sock, &buf, NN_MSG, 0)) == -1) goto done;
if (binary_to_frame(sp,set,buf,len)) {rc=-3; goto done;}
nn_freemsg(buf);
}
rc = 0; /* not reached TODO under clean shutdown on signal */
done:
if (rc==-2) fprintf(stderr,"%s: %s\n", local, nn_strerror(errno));
if(sock) nn_shutdown(sock,eid);
kv_spoolwriter_free(sp);
if (set) kv_set_free(set);
utarray_free(output_keys);
utarray_free(output_defaults);
utarray_free(output_types);
utstring_free(tmp);
return rc;
}

3
utils/spr-cast.cfg Normal file
View File

@@ -0,0 +1,3 @@
str from
str when
i32 iter