From 863cce135823d0eacc8a359923109bc4afa59a3d Mon Sep 17 00:00:00 2001 From: "Troy D. Hanson" Date: Thu, 31 Mar 2016 23:07:45 -0400 Subject: [PATCH] kkpub --- utils/kvsp-kkpub.c | 170 ++++++++++++++++++++++++++++++++++----------- utils/kvsp-kpub.c | 2 + 2 files changed, 130 insertions(+), 42 deletions(-) diff --git a/utils/kvsp-kkpub.c b/utils/kvsp-kkpub.c index e3494ef..5cc2a74 100644 --- a/utils/kvsp-kkpub.c +++ b/utils/kvsp-kkpub.c @@ -40,6 +40,8 @@ struct { /* remote receiver */ char *broker; char *topic; + UT_array *rdkafka_options; + UT_array *rdkafka_topic_options; /* threads */ int nthread; pthread_t spr_thread; /* spool reader thread (1) */ @@ -63,7 +65,7 @@ struct { .egress_socket_pull = -1, .nnctl_binding = "tcp://127.0.0.1:9995", .nnctl_socket = -1, - .nthread = 8, + .nthread = 1, }; #define STATS_INTERVAL 10 @@ -78,7 +80,19 @@ void usage() { " -t (kafka topic)\n" " -b (kafka broker)\n" " -N (nnctl binding)\n" - " -n (threads/pool) [def:8]\n" + " -n (threads/pool) [def:1]\n" + "\n" + "The rdkafka config variables should be left at default values!\n" + "\n" + " -c option=value (rdkafka options, repeatable)\n" + " statistics.interval.ms=60000 (0=disables)\n" + " compression.codec=gzip (none, gzip, snappy)\n" + " delivery.report.only.error=true\n" + " ...\n" + " -C option=value (rdkafka topic options, repeatable)\n" + " request.required.acks=0 (0=no acks, 1=ack)\n" + " message.timeout.ms=0 (millisec waited for successful delivery, 0=infinite)\n" + " ...\n" "\n", CF.prog); exit(-1); @@ -99,8 +113,8 @@ double rate_per_sec(ts_t *t) { void periodic_work() { fprintf(stderr,"i/o summary\n"); fprintf(stderr," spool read rate: %f msgs/sec\n", rate_per_sec(CF.spr_msgs_ts)); - fprintf(stderr," xmitr intake rate: %f msgs/sec\n", rate_per_sec(CF.kaf_msgs_ts)); - fprintf(stderr," xmitr output rate: %f bytes/sec\n", rate_per_sec(CF.kaf_bytes_ts)); + fprintf(stderr," xmitr#1 intake rate: %f msgs/sec\n", rate_per_sec(CF.kaf_msgs_ts)); + fprintf(stderr," xmitr#1 output rate: %f bytes/sec\n", rate_per_sec(CF.kaf_bytes_ts)); } int new_epoll(int events, int fd) { @@ -147,10 +161,8 @@ int handle_signal() { #define MAX_BUF 65536 -/* encode a tpl (kv frame) as json. this is, absolutely the wrong - * way to go about encoding something into JSON. for now, this is - * easier than figuring out the upstream (DNS sensor) encoding. */ -void *enc_worker(void *data) { +/* encode a tpl (kv frame) as json. */ +void *enc_worker(void *thread_id) { char buf[MAX_BUF], *key, *val; json_t *o = json_object(); int rc=-1, len, nc; @@ -167,15 +179,8 @@ void *enc_worker(void *data) { tn = tpl_map("A(ss)",&key,&val); assert(tn); if (tpl_load(tn,TPL_MEM,buf,len) < 0) goto done; while(tpl_unpack(tn,1) > 0) { - // check if value is already JSON - json_t *jtest; - jtest = json_loads(val,0,NULL); - if(jtest) { - json_object_set_new(o,key,jtest); - } else { - json_t *jval = json_string(val); - json_object_set_new(o, key, jval); - } + json_t *jval = json_string(val); + json_object_set_new(o, key, jval); free(key); key=NULL; free(val); val=NULL; } @@ -185,13 +190,10 @@ void *enc_worker(void *data) { if (CF.verbose>1) json_dumpf(o, stderr, JSON_INDENT(1)); char *dump = json_dumps(o, JSON_INDENT(0)); size_t dump_len = strlen(dump); - char line[dump_len+1]; - memcpy(line, dump, dump_len); - line[dump_len] = '\n'; - free(dump); - /* give the buffer to nano, from here it goes to tcp thread */ - nc = nn_send(CF.egress_socket_push, line, dump_len+1, 0); + /* give the buffer to nano, from here it goes to kaf thread */ + nc = nn_send(CF.egress_socket_push, dump, dump_len, 0); + free(dump); if (nc < 0) { fprintf(stderr,"nn_send: %s\n", nn_strerror(errno)); goto done; @@ -206,10 +208,38 @@ void *enc_worker(void *data) { return NULL; } +static void err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) { + fprintf(stderr,"%% ERROR CALLBACK: %s: %s: %s\n", + rd_kafka_name(rk), rd_kafka_err2str(err), reason); + CF.shutdown=1; +} + +static void throttle_cb (rd_kafka_t *rk, const char *broker_name, + int32_t broker_id, int throttle_time_ms, + void *opaque) { + fprintf(stderr,"%% THROTTLED %dms by %s (%"PRId32")\n", throttle_time_ms, + broker_name, broker_id); +} + +static int stats_cb (rd_kafka_t *rk, char *json, size_t json_len, + void *opaque) { + + /* Extract values for our own stats */ + //json_parse_stats(json); + + //if (stats_fp) + //fprintf(stats_fp, "%s\n", json); + fprintf(stderr, "stats:\n%.*s\n", (int)json_len, json); + return 0; +} + + + + /* transmitter worker */ -void *kaf_worker(void *data) { +void *kaf_worker(void *thread_id) { char buf[MAX_BUF], *b; - int rc=-1, nr, len, l, count=0; + int rc=-1, nr, len, l, count=0,kr; /* kafka connection setup */ char errstr[512]; @@ -221,8 +251,57 @@ void *kaf_worker(void *data) { char *key = NULL; int keylen = key ? strlen(key) : 0; + /* set up global options */ conf = rd_kafka_conf_new(); + rd_kafka_conf_set_error_cb(conf, err_cb); + rd_kafka_conf_set_throttle_cb(conf, throttle_cb); + rd_kafka_conf_set_stats_cb(conf, stats_cb); + kr = rd_kafka_conf_set(conf, "statistics.interval.ms", "60000", errstr, sizeof(errstr)); + if (kr != RD_KAFKA_CONF_OK) { + fprintf(stderr,"error: rd_kafka_conf_set: statistics.interval.ms 60000 => %s\n", errstr); + goto done; + } + char **opt=NULL; + while( (opt=(char **)utarray_next(CF.rdkafka_options,opt))) { + char *eq = strchr(*opt,'='); + if (eq == NULL) { + fprintf(stderr,"error: specify rdkafka params as key=value\n"); + goto done; + } + char *k = strdup(*opt), *v; + k[eq-*opt] = '\0'; + v = &k[eq-*opt + 1]; + if (CF.verbose) fprintf(stderr,"setting %s %s\n", k, v); + kr = rd_kafka_conf_set(conf, k, v, errstr, sizeof(errstr)); + if (kr != RD_KAFKA_CONF_OK) { + fprintf(stderr,"error: rd_kafka_conf_set: %s %s => %s\n", k, v, errstr); + goto done; + } + free(k); + } + + + /* set up topic options */ topic_conf = rd_kafka_topic_conf_new(); + opt=NULL; + while( (opt=(char **)utarray_next(CF.rdkafka_topic_options,opt))) { + char *eq = strchr(*opt,'='); + if (eq == NULL) { + fprintf(stderr,"error: specify rdkafka topic params as key=value\n"); + goto done; + } + char *k = strdup(*opt), *v; + k[eq-*opt] = '\0'; + v = &k[eq-*opt + 1]; + if (CF.verbose) fprintf(stderr,"setting %s %s\n", k, v); + kr = rd_kafka_topic_conf_set(topic_conf, k, v, errstr, sizeof(errstr)); + if (kr != RD_KAFKA_CONF_OK) { + fprintf(stderr,"error: rd_kafka_conf_set: %s %s => %s\n", k, v, errstr); + goto done; + } + free(k); + } + k = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (k == NULL) { @@ -244,23 +323,23 @@ void *kaf_worker(void *data) { goto done; } - rc = rd_kafka_produce(t, partition, RD_KAFKA_MSG_F_FREE, buf, len, + rc = rd_kafka_produce(t, partition, RD_KAFKA_MSG_F_COPY, buf, len, key, keylen, NULL); - if ((rc == -1) && (errno == ENOBUFS)) { - /* check for backpressure. what to do? wait for space.. */ - fprintf(stderr,"backpressure\n"); - goto done; // FIXME - } - if (rc == -1) { - fprintf(stderr,"rd_kafka_produce: failed\n"); + fprintf(stderr,"rd_kafka_produce: %s %s\n", + rd_kafka_err2str( rd_kafka_errno2err(errno)), + ((errno == ENOBUFS) ? "(backpressure)" : "")); goto done; } - if (++count % 1000) rd_kafka_poll(k, 10); // FIXME handle delivery reports + // cause rdkafka to invoke optional callbacks (msg delivery reports or error) + if ((++count % 1000) == 0) rd_kafka_poll(k, 0); - ts_add(CF.kaf_bytes_ts, CF.now, &len); - ts_add(CF.kaf_msgs_ts, CF.now, NULL); + if (thread_id == 0) { + /* only emit these stats from the first worker thread (not thread safe) */ + ts_add(CF.kaf_bytes_ts, CF.now, &len); + ts_add(CF.kaf_msgs_ts, CF.now, NULL); + } } rc = 0; @@ -365,9 +444,11 @@ int main(int argc, char *argv[]) { struct epoll_event ev; CF.prog = argv[0]; CF.now = time(NULL); + utarray_new(CF.rdkafka_options,&ut_str_icd); + utarray_new(CF.rdkafka_topic_options,&ut_str_icd); void *res; - while ( (opt = getopt(argc, argv, "v+N:n:d:b:t:")) != -1) { + while ( (opt = getopt(argc, argv, "hv+N:n:d:b:t:c:C:")) != -1) { switch(opt) { case 'v': CF.verbose++; break; case 'n': CF.nthread=atoi(optarg); break; @@ -375,7 +456,9 @@ int main(int argc, char *argv[]) { case 'd': CF.dir=strdup(optarg); break; case 't': CF.topic=strdup(optarg); break; case 'b': CF.broker=strdup(optarg); break; - default: usage(); + case 'c': utarray_push_back(CF.rdkafka_options,&optarg); break; + case 'C': utarray_push_back(CF.rdkafka_topic_options,&optarg); break; + case 'h': default: usage(); } } if (CF.dir == NULL) usage(); @@ -423,15 +506,16 @@ int main(int argc, char *argv[]) { rc = pthread_create(&CF.spr_thread, NULL, spr_worker, NULL); if (rc) goto done; CF.enc_thread = malloc(sizeof(pthread_t)*CF.nthread); if (CF.enc_thread == NULL) goto done; - for(i=0; i < CF.nthread; i++) { - rc = pthread_create(&CF.enc_thread[i],NULL,enc_worker,NULL); + void *id; + for(i=0, id=NULL; i < CF.nthread; i++, id++) { + rc = pthread_create(&CF.enc_thread[i],NULL,enc_worker,id); if (rc) goto done; } CF.kaf_thread = malloc(sizeof(pthread_t)*CF.nthread); if (CF.kaf_thread == NULL) goto done; - for(i=0; i < CF.nthread; i++) { - rc = pthread_create(&CF.kaf_thread[i],NULL,kaf_worker,NULL); + for(i=0, id=NULL; i < CF.nthread; i++, id++) { + rc = pthread_create(&CF.kaf_thread[i],NULL,kaf_worker,id); if (rc) goto done; } @@ -488,5 +572,7 @@ done: if (CF.enc_thread) free(CF.enc_thread); if (CF.kaf_thread) free(CF.kaf_thread); if (CF.nnctl) nnctl_free(CF.nnctl); + utarray_free(CF.rdkafka_options); + utarray_free(CF.rdkafka_topic_options); return rc; } diff --git a/utils/kvsp-kpub.c b/utils/kvsp-kpub.c index d180b55..df474b8 100644 --- a/utils/kvsp-kpub.c +++ b/utils/kvsp-kpub.c @@ -32,6 +32,8 @@ int main(int argc, char *argv[]) { json_t *o = NULL; int ticks=0; + fprintf(stderr,"this program is not recommended; use kvsp-kkpub instead\n"); + while ( (opt = getopt(argc, argv, "v+d:st:")) != -1) { switch (opt) { case 'v': verbose++; break;