This commit is contained in:
Troy D. Hanson
2016-03-31 19:04:17 -04:00
parent aa5fc13562
commit 2fe9d71f3d
6 changed files with 622 additions and 1 deletions

View File

@@ -33,6 +33,11 @@ AC_CHECK_LIB(rdkafka,rd_kafka_new,
AM_CONDITIONAL(HAVE_RDKAFKA,true),
AM_CONDITIONAL(HAVE_RDKAFKA,false))
# is libnnctl installed
AC_CHECK_LIB(nnctl,nnctl_exec,
AM_CONDITIONAL(HAVE_NNCTL,true),
AM_CONDITIONAL(HAVE_NNCTL,false), -lnanomsg)
# is SSL installed. Not sure why Python build requires
AC_CHECK_LIB(ssl,SSL_library_init,
AM_CONDITIONAL(HAVE_SSL,true),

View File

@@ -25,6 +25,7 @@ kvsp_concen_LDADD = $(LIBSPOOL)
kvsp_tpub_LDADD = $(LIBSPOOL)
kvsp_upub_LDADD = $(LIBSPOOL)
kvsp_kpub_LDADD = $(LIBSPOOL)
kvsp_kkpub_LDADD = $(LIBSPOOL)
kvsp_bcat_SOURCES = kvsp-bcat.c kvsp-bconfig.c
kvsp_bpub_SOURCES = kvsp-bpub.c kvsp-bconfig.c
@@ -48,6 +49,12 @@ if HAVE_RDKAFKA
if HAVE_JANSSON
bin_PROGRAMS += kvsp-kpub
kvsp_kpub_LDADD += -lrdkafka -ljansson
if HAVE_NANOMSG
bin_PROGRAMS += kvsp-kkpub
kvsp_kkpub_SOURCES = kvsp-kkpub.c ts.c ts.h
kvsp_kkpub_CFLAGS = ${AM_CFLAGS} -pthread
kvsp_kkpub_LDADD += -lrdkafka -ljansson -lnanomsg -lnnctl
endif
endif
endif

492
utils/kvsp-kkpub.c Normal file
View File

@@ -0,0 +1,492 @@
#include <errno.h>
#include <assert.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <string.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
#include <nanomsg/reqrep.h>
#include <pthread.h>
#include <limits.h>
#include <time.h>
#include <jansson.h>
#include <librdkafka/rdkafka.h>
#include "utarray.h"
#include "utstring.h"
#include "libnnctl.h"
#include "kvspool.h"
#include "tpl.h"
#include "ts.h"
struct {
int verbose;
int shutdown;
char *dir;
char *prog;
int signal_fd;
int epoll_fd;
int ticks;
time_t now;
/* stats */
ts_t *spr_msgs_ts;
ts_t *kaf_msgs_ts;
ts_t *kaf_bytes_ts;
/* remote receiver */
char *broker;
char *topic;
/* threads */
int nthread;
pthread_t spr_thread; /* spool reader thread (1) */
pthread_t *enc_thread; /* json encoding threads (n) */
pthread_t *kaf_thread; /* kafkaa sending threads(n) */
/* nano stuff below */
int ingress_socket_push;
int ingress_socket_pull;
int egress_socket_push;
int egress_socket_pull;
char *nnctl_binding;
int nnctl_socket;
int nnctl_fd;
nnctl *nnctl;
} CF = {
.signal_fd = -1,
.epoll_fd = -1,
.ingress_socket_push = -1,
.ingress_socket_pull = -1,
.egress_socket_push = -1,
.egress_socket_pull = -1,
.nnctl_binding = "tcp://127.0.0.1:9995",
.nnctl_socket = -1,
.nthread = 8,
};
#define STATS_INTERVAL 10
const ts_mm ts_int_mm = {.sz=sizeof(int)};
void usage() {
fprintf(stderr,"usage: %s <options>\n"
"\n"
" options:\n"
" -v (verbose)\n"
" -d <spool> (spool)\n"
" -t <topic> (kafka topic)\n"
" -b <broker> (kafka broker)\n"
" -N <binding> (nnctl binding)\n"
" -n <nthread> (threads/pool) [def:8]\n"
"\n",
CF.prog);
exit(-1);
}
/* signals that we'll accept via signalfd in epoll */
int sigs[] = {SIGHUP,SIGTERM,SIGINT,SIGQUIT,SIGALRM};
double rate_per_sec(ts_t *t) {
long total=0;
int i;
for(i=0; i<t->num_buckets; i++) total += *(int*)(bkt(t,i)->data);
int report_seconds = STATS_INTERVAL * t->num_buckets;
double events_per_second = report_seconds ? (total * 1.0 / report_seconds) : 0;
return events_per_second;
}
void periodic_work() {
fprintf(stderr,"i/o summary\n");
fprintf(stderr," spool read rate: %f msgs/sec\n", rate_per_sec(CF.spr_msgs_ts));
fprintf(stderr," xmitr intake rate: %f msgs/sec\n", rate_per_sec(CF.kaf_msgs_ts));
fprintf(stderr," xmitr output rate: %f bytes/sec\n", rate_per_sec(CF.kaf_bytes_ts));
}
int new_epoll(int events, int fd) {
int rc;
struct epoll_event ev;
memset(&ev,0,sizeof(ev)); // placate valgrind
ev.events = events;
ev.data.fd= fd;
if (CF.verbose) fprintf(stderr,"adding fd %d to epoll\n", fd);
rc = epoll_ctl(CF.epoll_fd, EPOLL_CTL_ADD, fd, &ev);
if (rc == -1) {
fprintf(stderr,"epoll_ctl: %s\n", strerror(errno));
}
return rc;
}
int handle_signal() {
int rc=-1;
struct signalfd_siginfo info;
if (read(CF.signal_fd, &info, sizeof(info)) != sizeof(info)) {
fprintf(stderr,"failed to read signal fd buffer\n");
goto done;
}
switch(info.ssi_signo) {
case SIGALRM:
if (CF.shutdown) goto done;
CF.now = time(NULL);
if ((++CF.ticks % STATS_INTERVAL) == 0) periodic_work();
alarm(1);
break;
default:
fprintf(stderr,"got signal %d\n", info.ssi_signo);
goto done;
break;
}
rc = 0;
done:
return rc;
}
#define MAX_BUF 65536
/* encode a tpl (kv frame) as json. this is, absolutely the wrong
* way to go about encoding something into JSON. for now, this is
* easier than figuring out the upstream (DNS sensor) encoding. */
void *enc_worker(void *data) {
char buf[MAX_BUF], *key, *val;
json_t *o = json_object();
int rc=-1, len, nc;
tpl_node *tn;
while (CF.shutdown == 0) {
len = nn_recv(CF.ingress_socket_pull, buf, MAX_BUF, 0);
if (len < 0) {
fprintf(stderr,"nn_recv: %s\n", nn_strerror(errno));
goto done;
}
/* decode, then re-encode as json */
json_object_clear(o);
tn = tpl_map("A(ss)",&key,&val); assert(tn);
if (tpl_load(tn,TPL_MEM,buf,len) < 0) goto done;
while(tpl_unpack(tn,1) > 0) {
// check if value is already JSON
json_t *jtest;
jtest = json_loads(val,0,NULL);
if(jtest) {
json_object_set_new(o,key,jtest);
} else {
json_t *jval = json_string(val);
json_object_set_new(o, key, jval);
}
free(key); key=NULL;
free(val); val=NULL;
}
tpl_free(tn);
/* dump the json object, then newline-terminate it. */
if (CF.verbose>1) json_dumpf(o, stderr, JSON_INDENT(1));
char *dump = json_dumps(o, JSON_INDENT(0));
size_t dump_len = strlen(dump);
char line[dump_len+1];
memcpy(line, dump, dump_len);
line[dump_len] = '\n';
free(dump);
/* give the buffer to nano, from here it goes to tcp thread */
nc = nn_send(CF.egress_socket_push, line, dump_len+1, 0);
if (nc < 0) {
fprintf(stderr,"nn_send: %s\n", nn_strerror(errno));
goto done;
}
}
rc = 0;
done:
CF.shutdown = 1;
json_decref(o);
return NULL;
}
/* transmitter worker */
void *kaf_worker(void *data) {
char buf[MAX_BUF], *b;
int rc=-1, nr, len, l, count=0;
/* kafka connection setup */
char errstr[512];
rd_kafka_t *k;
rd_kafka_topic_t *t;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
int partition = RD_KAFKA_PARTITION_UA;
char *key = NULL;
int keylen = key ? strlen(key) : 0;
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();
k = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (k == NULL) {
fprintf(stderr, "rd_kafka_new: %s\n", errstr);
goto done;
}
if (rd_kafka_brokers_add(k, CF.broker) < 1) {
fprintf(stderr, "invalid broker\n");
goto done;
}
t = rd_kafka_topic_new(k, CF.topic, topic_conf);
while (CF.shutdown == 0) {
len = nn_recv(CF.egress_socket_pull, buf, MAX_BUF, 0);
if (len < 0) {
fprintf(stderr,"nn_recv: %s\n", nn_strerror(errno));
goto done;
}
rc = rd_kafka_produce(t, partition, RD_KAFKA_MSG_F_FREE, buf, len,
key, keylen, NULL);
if ((rc == -1) && (errno == ENOBUFS)) {
/* check for backpressure. what to do? wait for space.. */
fprintf(stderr,"backpressure\n");
goto done; // FIXME
}
if (rc == -1) {
fprintf(stderr,"rd_kafka_produce: failed\n");
goto done;
}
if (++count % 1000) rd_kafka_poll(k, 10); // FIXME handle delivery reports
ts_add(CF.kaf_bytes_ts, CF.now, &len);
ts_add(CF.kaf_msgs_ts, CF.now, NULL);
}
rc = 0;
done:
CF.shutdown = 1;
return NULL;
}
/* this returns memory that caller must free */
int set_to_buf(void *set, char **buf, size_t *len) {
int rc=-1;
char *key, *val;
tpl_node *tn = tpl_map("A(ss)",&key,&val); assert(tn);
kv_t *kv = NULL;
while (kv = kv_next(set,kv)) {
key = kv->key;
val = kv->val;
tpl_pack(tn,1);
}
if (tpl_dump(tn,TPL_MEM,buf,len) < 0) goto done;
tpl_free(tn);
rc = 0;
done:
return rc;
}
void *spr_worker(void *data) {
int rc=-1, nc;
void *sp=NULL, *set;
size_t len;
char *buf;
set = kv_set_new();
sp = kv_spoolreader_new(CF.dir);
if (!sp) goto done;
while (kv_spool_read(sp,set,1) > 0) {
if (set_to_buf(set,&buf,&len) < 0) goto done;
assert(len < MAX_BUF);
nc = nn_send(CF.ingress_socket_push, buf, len, 0);
free(buf);
if (nc < 0) {
fprintf(stderr,"nn_send: %s\n", nn_strerror(errno));
goto done;
}
ts_add(CF.spr_msgs_ts, CF.now, NULL);
if (CF.shutdown) break;
}
done:
CF.shutdown = 1;
kv_set_free(set);
if (sp) kv_spoolreader_free(sp);
return NULL;
}
/* we use nano sockets to set up a pipeline like:
*
* spool-reader
* PUSH
* | <--- "ingress" socket
* PULL
* json-encoder
* PUSH
* | <--- "egress" socket
* PULL
* kaf-sender
*/
int setup_nano(void) {
int rc = -1;
/* set up the ingress and egress sockets */
if ( (CF.ingress_socket_push = nn_socket(AF_SP, NN_PUSH)) < 0) goto done;
if ( (CF.ingress_socket_pull = nn_socket(AF_SP, NN_PULL)) < 0) goto done;
if ( (CF.egress_socket_push = nn_socket(AF_SP, NN_PUSH)) < 0) goto done;
if ( (CF.egress_socket_pull = nn_socket(AF_SP, NN_PULL)) < 0) goto done;
if (nn_bind(CF.ingress_socket_push, "ipc://ingress.ipc") < 0) goto done;
if (nn_connect(CF.ingress_socket_pull, "ipc://ingress.ipc") < 0) goto done;
if (nn_bind(CF.egress_socket_push, "ipc://egress.ipc") < 0) goto done;
if (nn_connect(CF.egress_socket_pull, "ipc://egress.ipc") < 0) goto done;
/* set up our nnctl rep socket */
if ( (CF.nnctl_socket = nn_socket(AF_SP, NN_REP)) < 0) goto done;
if (nn_bind(CF.nnctl_socket, CF.nnctl_binding) < 0) goto done;
if ( (CF.nnctl = nnctl_init(NULL, NULL)) == NULL) goto done;
rc = 0;
done:
if (rc < 0) fprintf(stderr,"nano: %s\n", nn_strerror(errno));
return rc;
}
int main(int argc, char *argv[]) {
int opt, i, n, rc=-1;
struct epoll_event ev;
CF.prog = argv[0];
CF.now = time(NULL);
void *res;
while ( (opt = getopt(argc, argv, "v+N:n:d:b:t:")) != -1) {
switch(opt) {
case 'v': CF.verbose++; break;
case 'n': CF.nthread=atoi(optarg); break;
case 'N': CF.nnctl_binding=strdup(optarg); break;
case 'd': CF.dir=strdup(optarg); break;
case 't': CF.topic=strdup(optarg); break;
case 'b': CF.broker=strdup(optarg); break;
default: usage();
}
}
if (CF.dir == NULL) usage();
if (CF.broker == NULL) usage();
if (CF.topic == NULL) CF.topic = CF.dir;
/* stats (time series) for input/output tracking */
CF.spr_msgs_ts = ts_new(6, STATS_INTERVAL ,&ts_int_mm);
CF.kaf_msgs_ts = ts_new(6, STATS_INTERVAL, &ts_int_mm);
CF.kaf_bytes_ts = ts_new(6, STATS_INTERVAL, &ts_int_mm);
/* block all signals. we take signals synchronously via signalfd */
sigset_t all;
sigfillset(&all);
sigprocmask(SIG_SETMASK,&all,NULL);
/* a few signals we'll accept via our signalfd */
sigset_t sw;
sigemptyset(&sw);
for(n=0; n < sizeof(sigs)/sizeof(*sigs); n++) sigaddset(&sw, sigs[n]);
/* create the signalfd for receiving signals */
CF.signal_fd = signalfd(-1, &sw, 0);
if (CF.signal_fd == -1) {
fprintf(stderr,"signalfd: %s\n", strerror(errno));
goto done;
}
/* set up the epoll instance */
CF.epoll_fd = epoll_create(1);
if (CF.epoll_fd == -1) {
fprintf(stderr,"epoll: %s\n", strerror(errno));
goto done;
}
if (setup_nano() < 0) goto done;
/* add descriptors of interest */
size_t sz = sizeof(CF.nnctl_fd);
nn_getsockopt(CF.nnctl_socket, NN_SOL_SOCKET, NN_RCVFD, &CF.nnctl_fd, &sz);
if (new_epoll(EPOLLIN, CF.nnctl_fd)) goto done; // nnctl socket
if (new_epoll(EPOLLIN, CF.signal_fd)) goto done; // signal socket
/* fire up threads */
rc = pthread_create(&CF.spr_thread, NULL, spr_worker, NULL); if (rc) goto done;
CF.enc_thread = malloc(sizeof(pthread_t)*CF.nthread);
if (CF.enc_thread == NULL) goto done;
for(i=0; i < CF.nthread; i++) {
rc = pthread_create(&CF.enc_thread[i],NULL,enc_worker,NULL);
if (rc) goto done;
}
CF.kaf_thread = malloc(sizeof(pthread_t)*CF.nthread);
if (CF.kaf_thread == NULL) goto done;
for(i=0; i < CF.nthread; i++) {
rc = pthread_create(&CF.kaf_thread[i],NULL,kaf_worker,NULL);
if (rc) goto done;
}
alarm(1);
while (epoll_wait(CF.epoll_fd, &ev, 1, -1) > 0) {
if (ev.data.fd == CF.signal_fd) {
if (handle_signal() < 0) goto done;
}
if (ev.data.fd == CF.nnctl_fd) {
if (nnctl_exec(CF.nnctl, CF.nnctl_socket) < 0) goto done;
}
}
rc = 0;
done:
CF.shutdown=1;
nn_term();
fprintf(stderr,"shutting down threads:\n");
fprintf(stderr,"spoolreader...\n");
if (CF.spr_thread) {
pthread_cancel(CF.spr_thread);
pthread_join(CF.spr_thread,NULL);
}
fprintf(stderr,"encoders...\n");
if (CF.enc_thread) {
for(i=0; i < CF.nthread; i++) {
pthread_cancel(CF.enc_thread[i]);
pthread_join(CF.enc_thread[i],NULL);
}
}
fprintf(stderr,"transmitters...\n");
if (CF.kaf_thread) {
for(i=0; i < CF.nthread; i++) {
pthread_cancel(CF.kaf_thread[i]);
pthread_join(CF.kaf_thread[i],NULL);
}
}
fprintf(stderr,"terminating...\n");
if (CF.ingress_socket_push >= 0) nn_close(CF.ingress_socket_push);
if (CF.ingress_socket_pull >= 0) nn_close(CF.ingress_socket_pull);
if (CF.egress_socket_push >= 0) nn_close(CF.egress_socket_push);
if (CF.egress_socket_pull >= 0) nn_close(CF.egress_socket_pull);
if (CF.nnctl_socket >= 0) nn_close(CF.nnctl_socket);
if (CF.epoll_fd != -1) close(CF.epoll_fd);
if (CF.signal_fd != -1) close(CF.signal_fd);
ts_free(CF.spr_msgs_ts);
ts_free(CF.kaf_msgs_ts);
ts_free(CF.kaf_bytes_ts);
if (CF.enc_thread) free(CF.enc_thread);
if (CF.kaf_thread) free(CF.kaf_thread);
if (CF.nnctl) nnctl_free(CF.nnctl);
return rc;
}

View File

@@ -21,7 +21,7 @@ char *topic;
char errstr[512];
void usage(char *prog) {
fprintf(stderr, "usage: %s [-v] -d spool <broker>\n", prog);
fprintf(stderr, "usage: %s [-v] [-t topic] -d spool <broker>\n", prog);
exit(-1);
}

86
utils/ts.c Normal file
View File

@@ -0,0 +1,86 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include "ts.h"
static void ts_def_clear(char *data, size_t sz) { memset(data,0,sz); }
static void ts_def_incr(int *cur, int *incr) { *cur += (incr ? (*incr) : 1); }
static void ts_def_show(int *cur) { printf("%d\n",*cur); }
ts_t *ts_new(unsigned num_buckets, unsigned secs_per_bucket, const ts_mm *mm) {
int i;
ts_t *t = calloc(1,sizeof(ts_t)); if (!t) return NULL;
t->secs_per_bucket = secs_per_bucket;
t->num_buckets = num_buckets;
t->mm = *mm; /* struct copy */
// pad sz so sequential buckets' time_t are naturally aligned
int pad = ((t->mm.sz % sizeof(ts_bucket)) > 0) ?
(sizeof(ts_bucket) - (t->mm.sz % sizeof(ts_bucket))) :
0;
t->mm.sz += pad;
if (t->mm.ctor == NULL) t->mm.ctor = (ts_ctor_f*)ts_def_clear;
if (t->mm.data == NULL) {
if (mm->sz == sizeof(int)) t->mm.data = (ts_data_f*)ts_def_incr;
else assert(0);
}
if (t->mm.show == NULL) {
if (mm->sz == sizeof(int)) t->mm.show = (ts_show_f*)ts_def_show;
}
t->buckets = calloc(num_buckets,sizeof(ts_bucket)+t->mm.sz);
if (t->buckets == NULL) { free(t); return NULL; }
for(i=0; i<t->num_buckets; i++) {
//fprintf(stderr,"t->buckets %p bkt(t,%d) %p\n", t->buckets, i, bkt(t,i));
bkt(t,i)->start = i * t->secs_per_bucket;
t->mm.ctor(bkt(t,i)->data,t->mm.sz);
}
return t;
}
void ts_add(ts_t *t, time_t when, void *data) {
int i;
if (bkt(t,0)->start > when) return; // too old
/* figure out bucket it should go in */
unsigned idx = (when - bkt(t,0)->start) / t->secs_per_bucket;
if (idx >= t->num_buckets) { // shift
unsigned shift = (idx - t->num_buckets) + 1;
if (shift > t->num_buckets) shift = t->num_buckets;
if (shift <= t->num_buckets) {
if (t->mm.dtor) {
for(i=0; i<shift; i++) t->mm.dtor(bkt(t,i)->data);
}
}
if (shift < t->num_buckets) {
memmove(bkt(t,0),bkt(t,shift),(t->num_buckets-shift)*(sizeof(ts_bucket)+t->mm.sz));
}
if (shift) {
for(i=t->num_buckets-shift; i<t->num_buckets; i++) {
t->mm.ctor(bkt(t,i)->data,t->mm.sz);
bkt(t,i)->start = (i > 0) ? (bkt(t,i-1)->start + t->secs_per_bucket) : when;
}
}
idx = (when - bkt(t,0)->start) / t->secs_per_bucket;
assert(idx < t->num_buckets);
}
void *cur = bkt(t,idx)->data;
t->mm.data(cur,data);
}
void ts_free(ts_t *t) {
int i;
if (t->mm.dtor) {
for(i=0; i<t->num_buckets; i++) t->mm.dtor(bkt(t,i)->data);
}
free(t->buckets);
free(t);
}
void ts_show(ts_t *t) {
int i;
for(i=0; i<t->num_buckets; i++) {
printf("#%d(%lu): ", i, (long)(bkt(t,i)->start));
if (t->mm.show) t->mm.show(bkt(t,i)->data);
else printf("\n");
}
printf("\n");
}

31
utils/ts.h Normal file
View File

@@ -0,0 +1,31 @@
typedef void (ts_data_f)(char *cur, char *add);
typedef void (ts_ctor_f)(void *elt, size_t sz);
typedef void (ts_dtor_f)(void *elt);
typedef void (ts_show_f)(void *elt);
typedef struct {
size_t sz;
ts_data_f *data;
ts_ctor_f *ctor;
ts_dtor_f *dtor;
ts_show_f *show;
} ts_mm;
typedef struct {
time_t start;
char data[]; /* C99 flexible array member */
} ts_bucket;
#define bkt(t,i) ((ts_bucket*)((char*)((t)->buckets) + ((i)*(sizeof(ts_bucket)+(t)->mm.sz))))
typedef struct {
ts_mm mm;
unsigned secs_per_bucket;
unsigned num_buckets;
ts_bucket *buckets;
} ts_t;
ts_t *ts_new(unsigned num_buckets, unsigned secs_per_bucket, const ts_mm *mm);
void ts_add(ts_t *t, time_t when, void *data);
void ts_free(ts_t *t);
void ts_show(ts_t *t);