multi client tpub

This commit is contained in:
Troy D. Hanson
2014-10-22 17:42:45 -04:00
parent 39c02b17da
commit d753ce95a7
4 changed files with 796 additions and 258 deletions

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2008-2011, Troy D. Hanson http://uthash.sourceforge.net
Copyright (c) 2008-2014, Troy D. Hanson http://troydhanson.github.com/uthash/
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -22,12 +22,11 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/* a dynamic array implementation using macros
* see http://uthash.sourceforge.net/utarray
*/
#ifndef UTARRAY_H
#define UTARRAY_H
#define UTARRAY_VERSION 1.9.4
#define UTARRAY_VERSION 1.9.9
#ifdef __GNUC__
#define _UNUSED_ __attribute__ ((__unused__))
@@ -53,21 +52,21 @@ typedef struct {
typedef struct {
unsigned i,n;/* i: index of next available slot, n: num slots */
const UT_icd *icd; /* initializer, copy and destructor functions */
UT_icd icd; /* initializer, copy and destructor functions */
char *d; /* n slots of size icd->sz*/
} UT_array;
#define utarray_init(a,_icd) do { \
memset(a,0,sizeof(UT_array)); \
(a)->icd=_icd; \
(a)->icd=*_icd; \
} while(0)
#define utarray_done(a) do { \
if ((a)->n) { \
if ((a)->icd->dtor) { \
if ((a)->icd.dtor) { \
size_t _ut_i; \
for(_ut_i=0; _ut_i < (a)->i; _ut_i++) { \
(a)->icd->dtor(utarray_eltptr(a,_ut_i)); \
(a)->icd.dtor(utarray_eltptr(a,_ut_i)); \
} \
} \
free((a)->d); \
@@ -88,62 +87,62 @@ typedef struct {
#define utarray_reserve(a,by) do { \
if (((a)->i+by) > ((a)->n)) { \
while(((a)->i+by) > ((a)->n)) { (a)->n = ((a)->n ? (2*(a)->n) : 8); } \
if ( ((a)->d=(char*)realloc((a)->d, (a)->n*(a)->icd->sz)) == NULL) oom(); \
if ( ((a)->d=(char*)realloc((a)->d, (a)->n*(a)->icd.sz)) == NULL) oom(); \
} \
} while(0)
#define utarray_push_back(a,p) do { \
utarray_reserve(a,1); \
if ((a)->icd->copy) { (a)->icd->copy( _utarray_eltptr(a,(a)->i++), p); } \
else { memcpy(_utarray_eltptr(a,(a)->i++), p, (a)->icd->sz); }; \
if ((a)->icd.copy) { (a)->icd.copy( _utarray_eltptr(a,(a)->i++), p); } \
else { memcpy(_utarray_eltptr(a,(a)->i++), p, (a)->icd.sz); }; \
} while(0)
#define utarray_pop_back(a) do { \
if ((a)->icd->dtor) { (a)->icd->dtor( _utarray_eltptr(a,--((a)->i))); } \
if ((a)->icd.dtor) { (a)->icd.dtor( _utarray_eltptr(a,--((a)->i))); } \
else { (a)->i--; } \
} while(0)
#define utarray_extend_back(a) do { \
utarray_reserve(a,1); \
if ((a)->icd->init) { (a)->icd->init(_utarray_eltptr(a,(a)->i)); } \
else { memset(_utarray_eltptr(a,(a)->i),0,(a)->icd->sz); } \
if ((a)->icd.init) { (a)->icd.init(_utarray_eltptr(a,(a)->i)); } \
else { memset(_utarray_eltptr(a,(a)->i),0,(a)->icd.sz); } \
(a)->i++; \
} while(0)
#define utarray_len(a) ((a)->i)
#define utarray_eltptr(a,j) (((j) < (a)->i) ? _utarray_eltptr(a,j) : NULL)
#define _utarray_eltptr(a,j) ((char*)((a)->d + ((a)->icd->sz*(j) )))
#define _utarray_eltptr(a,j) ((char*)((a)->d + ((a)->icd.sz*(j) )))
#define utarray_insert(a,p,j) do { \
if (j > (a)->i) utarray_resize(a,j); \
utarray_reserve(a,1); \
if (j > (a)->i) break; \
if ((j) < (a)->i) { \
memmove( _utarray_eltptr(a,(j)+1), _utarray_eltptr(a,j), \
((a)->i - (j))*((a)->icd->sz)); \
((a)->i - (j))*((a)->icd.sz)); \
} \
if ((a)->icd->copy) { (a)->icd->copy( _utarray_eltptr(a,j), p); } \
else { memcpy(_utarray_eltptr(a,j), p, (a)->icd->sz); }; \
if ((a)->icd.copy) { (a)->icd.copy( _utarray_eltptr(a,j), p); } \
else { memcpy(_utarray_eltptr(a,j), p, (a)->icd.sz); }; \
(a)->i++; \
} while(0)
#define utarray_inserta(a,w,j) do { \
if (utarray_len(w) == 0) break; \
if (j > (a)->i) break; \
if (j > (a)->i) utarray_resize(a,j); \
utarray_reserve(a,utarray_len(w)); \
if ((j) < (a)->i) { \
memmove(_utarray_eltptr(a,(j)+utarray_len(w)), \
_utarray_eltptr(a,j), \
((a)->i - (j))*((a)->icd->sz)); \
((a)->i - (j))*((a)->icd.sz)); \
} \
if ((a)->icd->copy) { \
if ((a)->icd.copy) { \
size_t _ut_i; \
for(_ut_i=0;_ut_i<(w)->i;_ut_i++) { \
(a)->icd->copy(_utarray_eltptr(a,j+_ut_i), _utarray_eltptr(w,_ut_i)); \
(a)->icd.copy(_utarray_eltptr(a,j+_ut_i), _utarray_eltptr(w,_ut_i)); \
} \
} else { \
memcpy(_utarray_eltptr(a,j), _utarray_eltptr(w,0), \
utarray_len(w)*((a)->icd->sz)); \
utarray_len(w)*((a)->icd.sz)); \
} \
(a)->i += utarray_len(w); \
} while(0)
@@ -151,19 +150,19 @@ typedef struct {
#define utarray_resize(dst,num) do { \
size_t _ut_i; \
if (dst->i > (size_t)(num)) { \
if ((dst)->icd->dtor) { \
if ((dst)->icd.dtor) { \
for(_ut_i=num; _ut_i < dst->i; _ut_i++) { \
(dst)->icd->dtor(utarray_eltptr(dst,_ut_i)); \
(dst)->icd.dtor(utarray_eltptr(dst,_ut_i)); \
} \
} \
} else if (dst->i < (size_t)(num)) { \
utarray_reserve(dst,num-dst->i); \
if ((dst)->icd->init) { \
if ((dst)->icd.init) { \
for(_ut_i=dst->i; _ut_i < num; _ut_i++) { \
(dst)->icd->init(utarray_eltptr(dst,_ut_i)); \
(dst)->icd.init(utarray_eltptr(dst,_ut_i)); \
} \
} else { \
memset(_utarray_eltptr(dst,dst->i),0,(dst)->icd->sz*(num-dst->i)); \
memset(_utarray_eltptr(dst,dst->i),0,(dst)->icd.sz*(num-dst->i)); \
} \
} \
dst->i = num; \
@@ -174,15 +173,15 @@ typedef struct {
} while(0)
#define utarray_erase(a,pos,len) do { \
if ((a)->icd->dtor) { \
if ((a)->icd.dtor) { \
size_t _ut_i; \
for(_ut_i=0; _ut_i < len; _ut_i++) { \
(a)->icd->dtor(utarray_eltptr((a),pos+_ut_i)); \
(a)->icd.dtor(utarray_eltptr((a),pos+_ut_i)); \
} \
} \
if ((a)->i > (pos+len)) { \
memmove( _utarray_eltptr((a),pos), _utarray_eltptr((a),pos+len), \
(((a)->i)-(pos+len))*((a)->icd->sz)); \
(((a)->i)-(pos+len))*((a)->icd.sz)); \
} \
(a)->i -= (len); \
} while(0)
@@ -190,14 +189,14 @@ typedef struct {
#define utarray_renew(a,u) do { \
if (a) utarray_clear(a); \
else utarray_new((a),(u)); \
} while(0);
} while(0)
#define utarray_clear(a) do { \
if ((a)->i > 0) { \
if ((a)->icd->dtor) { \
if ((a)->icd.dtor) { \
size_t _ut_i; \
for(_ut_i=0; _ut_i < (a)->i; _ut_i++) { \
(a)->icd->dtor(utarray_eltptr(a,_ut_i)); \
(a)->icd.dtor(utarray_eltptr(a,_ut_i)); \
} \
} \
(a)->i = 0; \
@@ -205,15 +204,16 @@ typedef struct {
} while(0)
#define utarray_sort(a,cmp) do { \
qsort((a)->d, (a)->i, (a)->icd->sz, cmp); \
qsort((a)->d, (a)->i, (a)->icd.sz, cmp); \
} while(0)
#define utarray_find(a,v,cmp) bsearch((v),(a)->d,(a)->i,(a)->icd->sz,cmp)
#define utarray_find(a,v,cmp) bsearch((v),(a)->d,(a)->i,(a)->icd.sz,cmp)
#define utarray_front(a) (((a)->i) ? (_utarray_eltptr(a,0)) : NULL)
#define utarray_next(a,e) (((e)==NULL) ? utarray_front(a) : ((((a)->i) > (utarray_eltidx(a,e)+1)) ? _utarray_eltptr(a,utarray_eltidx(a,e)+1) : NULL))
#define utarray_prev(a,e) (((e)==NULL) ? utarray_back(a) : ((utarray_eltidx(a,e) > 0) ? _utarray_eltptr(a,utarray_eltidx(a,e)-1) : NULL))
#define utarray_back(a) (((a)->i) ? (_utarray_eltptr(a,(a)->i-1)) : NULL)
#define utarray_eltidx(a,e) (((char*)(e) >= (char*)((a)->d)) ? (((char*)(e) - (char*)((a)->d))/(a)->icd->sz) : -1)
#define utarray_eltidx(a,e) (((char*)(e) >= (char*)((a)->d)) ? (((char*)(e) - (char*)((a)->d))/(size_t)(a)->icd.sz) : -1)
/* last we pre-define a few icd for common utarrays of ints and strings */
static void utarray_str_cpy(void *dst, const void *src) {

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2003-2011, Troy D. Hanson http://uthash.sourceforge.net
Copyright (c) 2003-2014, Troy D. Hanson http://troydhanson.github.com/uthash/
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -32,13 +32,16 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
As decltype is only available in newer compilers (VS2010 or gcc 4.3+
when compiling c++ source) this code uses whatever method is needed
or, for VS2008 where neither is available, uses casting workarounds. */
#ifdef _MSC_VER /* MS compiler */
#if defined(_MSC_VER) /* MS compiler */
#if _MSC_VER >= 1600 && defined(__cplusplus) /* VS2010 or newer in C++ mode */
#define DECLTYPE(x) (decltype(x))
#else /* VS2008 or older (or VS2010 in C mode) */
#define NO_DECLTYPE
#define DECLTYPE(x)
#endif
#elif defined(__BORLANDC__) || defined(__LCC__) || defined(__WATCOMC__)
#define NO_DECLTYPE
#define DECLTYPE(x)
#else /* GNU, Sun and other compilers */
#define DECLTYPE(x) (__typeof(x))
#endif
@@ -56,22 +59,38 @@ do {
} while(0)
#endif
/* a number of the hash function use uint32_t which isn't defined on win32 */
#ifdef _MSC_VER
/* a number of the hash function use uint32_t which isn't defined on Pre VS2010 */
#if defined (_WIN32)
#if defined(_MSC_VER) && _MSC_VER >= 1600
#include <stdint.h>
#elif defined(__WATCOMC__)
#include <stdint.h>
#else
typedef unsigned int uint32_t;
typedef unsigned char uint8_t;
#endif
#else
#include <inttypes.h> /* uint32_t */
#include <stdint.h>
#endif
#define UTHASH_VERSION 1.9.4
#define UTHASH_VERSION 1.9.9
#ifndef uthash_fatal
#define uthash_fatal(msg) exit(-1) /* fatal error (out of memory,etc) */
#endif
#ifndef uthash_malloc
#define uthash_malloc(sz) malloc(sz) /* malloc fcn */
#endif
#ifndef uthash_free
#define uthash_free(ptr,sz) free(ptr) /* free fcn */
#endif
#ifndef uthash_noexpand_fyi
#define uthash_noexpand_fyi(tbl) /* can be defined to log noexpand */
#endif
#ifndef uthash_expand_fyi
#define uthash_expand_fyi(tbl) /* can be defined to log expands */
#endif
/* initial number of buckets */
#define HASH_INITIAL_NUM_BUCKETS 32 /* initial number of buckets */
@@ -104,12 +123,12 @@ do {
if (!((tbl)->bloom_bv)) { uthash_fatal( "out of memory"); } \
memset((tbl)->bloom_bv, 0, HASH_BLOOM_BYTELEN); \
(tbl)->bloom_sig = HASH_BLOOM_SIGNATURE; \
} while (0);
} while (0)
#define HASH_BLOOM_FREE(tbl) \
do { \
uthash_free((tbl)->bloom_bv, HASH_BLOOM_BYTELEN); \
} while (0);
} while (0)
#define HASH_BLOOM_BITSET(bv,idx) (bv[(idx)/8] |= (1U << ((idx)%8)))
#define HASH_BLOOM_BITTEST(bv,idx) (bv[(idx)/8] & (1U << ((idx)%8)))
@@ -125,6 +144,7 @@ do {
#define HASH_BLOOM_FREE(tbl)
#define HASH_BLOOM_ADD(tbl,hashv)
#define HASH_BLOOM_TEST(tbl,hashv) (1)
#define HASH_BLOOM_BYTELEN 0
#endif
#define HASH_MAKE_TABLE(hh,head) \
@@ -147,14 +167,24 @@ do {
} while(0)
#define HASH_ADD(hh,head,fieldname,keylen_in,add) \
HASH_ADD_KEYPTR(hh,head,&add->fieldname,keylen_in,add)
HASH_ADD_KEYPTR(hh,head,&((add)->fieldname),keylen_in,add)
#define HASH_REPLACE(hh,head,fieldname,keylen_in,add,replaced) \
do { \
replaced=NULL; \
HASH_FIND(hh,head,&((add)->fieldname),keylen_in,replaced); \
if (replaced!=NULL) { \
HASH_DELETE(hh,head,replaced); \
}; \
HASH_ADD(hh,head,fieldname,keylen_in,add); \
} while(0)
#define HASH_ADD_KEYPTR(hh,head,keyptr,keylen_in,add) \
do { \
unsigned _ha_bkt; \
(add)->hh.next = NULL; \
(add)->hh.key = (char*)keyptr; \
(add)->hh.keylen = keylen_in; \
(add)->hh.key = (char*)(keyptr); \
(add)->hh.keylen = (unsigned)(keylen_in); \
if (!(head)) { \
head = (add); \
(head)->hh.prev = NULL; \
@@ -205,17 +235,17 @@ do {
_hd_hh_del = &((delptr)->hh); \
if ((delptr) == ELMT_FROM_HH((head)->hh.tbl,(head)->hh.tbl->tail)) { \
(head)->hh.tbl->tail = \
(UT_hash_handle*)((char*)((delptr)->hh.prev) + \
(UT_hash_handle*)((ptrdiff_t)((delptr)->hh.prev) + \
(head)->hh.tbl->hho); \
} \
if ((delptr)->hh.prev) { \
((UT_hash_handle*)((char*)((delptr)->hh.prev) + \
((UT_hash_handle*)((ptrdiff_t)((delptr)->hh.prev) + \
(head)->hh.tbl->hho))->next = (delptr)->hh.next; \
} else { \
DECLTYPE_ASSIGN(head,(delptr)->hh.next); \
} \
if (_hd_hh_del->next) { \
((UT_hash_handle*)((char*)_hd_hh_del->next + \
((UT_hash_handle*)((ptrdiff_t)_hd_hh_del->next + \
(head)->hh.tbl->hho))->prev = \
_hd_hh_del->prev; \
} \
@@ -231,15 +261,21 @@ do {
#define HASH_FIND_STR(head,findstr,out) \
HASH_FIND(hh,head,findstr,strlen(findstr),out)
#define HASH_ADD_STR(head,strfield,add) \
HASH_ADD(hh,head,strfield,strlen(add->strfield),add)
HASH_ADD(hh,head,strfield[0],strlen(add->strfield),add)
#define HASH_REPLACE_STR(head,strfield,add,replaced) \
HASH_REPLACE(hh,head,strfield[0],strlen(add->strfield),add,replaced)
#define HASH_FIND_INT(head,findint,out) \
HASH_FIND(hh,head,findint,sizeof(int),out)
#define HASH_ADD_INT(head,intfield,add) \
HASH_ADD(hh,head,intfield,sizeof(int),add)
#define HASH_REPLACE_INT(head,intfield,add,replaced) \
HASH_REPLACE(hh,head,intfield,sizeof(int),add,replaced)
#define HASH_FIND_PTR(head,findptr,out) \
HASH_FIND(hh,head,findptr,sizeof(void *),out)
#define HASH_ADD_PTR(head,ptrfield,add) \
HASH_ADD(hh,head,ptrfield,sizeof(void *),add)
#define HASH_REPLACE_PTR(head,ptrfield,add,replaced) \
HASH_REPLACE(hh,head,ptrfield,sizeof(void *),add,replaced)
#define HASH_DEL(head,delptr) \
HASH_DELETE(hh,head,delptr)
@@ -324,13 +360,13 @@ do {
#define HASH_FCN HASH_JEN
#endif
/* The Bernstein hash function, used in Perl prior to v5.6 */
/* The Bernstein hash function, used in Perl prior to v5.6. Note (x<<5+x)=x*33. */
#define HASH_BER(key,keylen,num_bkts,hashv,bkt) \
do { \
unsigned _hb_keylen=keylen; \
char *_hb_key=(char*)(key); \
(hashv) = 0; \
while (_hb_keylen--) { (hashv) = ((hashv) * 33) + *_hb_key++; } \
while (_hb_keylen--) { (hashv) = (((hashv) << 5) + (hashv)) + *_hb_key++; } \
bkt = (hashv) & (num_bkts-1); \
} while (0)
@@ -346,16 +382,17 @@ do {
hashv ^= (hashv << 5) + (hashv >> 2) + _hs_key[_sx_i]; \
bkt = hashv & (num_bkts-1); \
} while (0)
/* FNV-1a variation */
#define HASH_FNV(key,keylen,num_bkts,hashv,bkt) \
do { \
unsigned _fn_i; \
char *_hf_key=(char*)(key); \
hashv = 2166136261UL; \
for(_fn_i=0; _fn_i < keylen; _fn_i++) \
hashv = (hashv * 16777619) ^ _hf_key[_fn_i]; \
hashv = hashv ^ _hf_key[_fn_i]; \
hashv = hashv * 16777619; \
bkt = hashv & (num_bkts-1); \
} while(0);
} while(0)
#define HASH_OAT(key,keylen,num_bkts,hashv,bkt) \
do { \
@@ -389,10 +426,10 @@ do {
#define HASH_JEN(key,keylen,num_bkts,hashv,bkt) \
do { \
unsigned _hj_i,_hj_j,_hj_k; \
char *_hj_key=(char*)(key); \
unsigned char *_hj_key=(unsigned char*)(key); \
hashv = 0xfeedbeef; \
_hj_i = _hj_j = 0x9e3779b9; \
_hj_k = keylen; \
_hj_k = (unsigned)(keylen); \
while (_hj_k >= 12) { \
_hj_i += (_hj_key[0] + ( (unsigned)_hj_key[1] << 8 ) \
+ ( (unsigned)_hj_key[2] << 16 ) \
@@ -440,7 +477,7 @@ do {
#endif
#define HASH_SFH(key,keylen,num_bkts,hashv,bkt) \
do { \
char *_sfh_key=(char*)(key); \
unsigned char *_sfh_key=(unsigned char*)(key); \
uint32_t _sfh_tmp, _sfh_len = keylen; \
\
int _sfh_rem = _sfh_len & 3; \
@@ -450,7 +487,7 @@ do {
/* Main loop */ \
for (;_sfh_len > 0; _sfh_len--) { \
hashv += get16bits (_sfh_key); \
_sfh_tmp = (get16bits (_sfh_key+2) << 11) ^ hashv; \
_sfh_tmp = (uint32_t)(get16bits (_sfh_key+2)) << 11 ^ hashv; \
hashv = (hashv << 16) ^ _sfh_tmp; \
_sfh_key += 2*sizeof (uint16_t); \
hashv += hashv >> 11; \
@@ -460,7 +497,7 @@ do {
switch (_sfh_rem) { \
case 3: hashv += get16bits (_sfh_key); \
hashv ^= hashv << 16; \
hashv ^= _sfh_key[sizeof (uint16_t)] << 18; \
hashv ^= (uint32_t)(_sfh_key[sizeof (uint16_t)] << 18); \
hashv += hashv >> 11; \
break; \
case 2: hashv += get16bits (_sfh_key); \
@@ -480,7 +517,7 @@ do {
hashv ^= hashv << 25; \
hashv += hashv >> 6; \
bkt = hashv & (num_bkts-1); \
} while(0);
} while(0)
#ifdef HASH_USING_NO_STRICT_ALIASING
/* The MurmurHash exploits some CPU's (x86,x86_64) tolerance for unaligned reads.
@@ -492,7 +529,7 @@ do {
* gcc -m64 -dM -E - < /dev/null (on gcc)
* cc -## a.c (where a.c is a simple test file) (Sun Studio)
*/
#if (defined(__i386__) || defined(__x86_64__))
#if (defined(__i386__) || defined(__x86_64__) || defined(_M_IX86))
#define MUR_GETBLOCK(p,i) p[i]
#else /* non intel */
#define MUR_PLUS0_ALIGNED(p) (((unsigned long)p & 0x3) == 0)
@@ -531,10 +568,12 @@ do { \
uint32_t _mur_h1 = 0xf88D5353; \
uint32_t _mur_c1 = 0xcc9e2d51; \
uint32_t _mur_c2 = 0x1b873593; \
uint32_t _mur_k1 = 0; \
const uint8_t *_mur_tail; \
const uint32_t *_mur_blocks = (const uint32_t*)(_mur_data+_mur_nblocks*4); \
int _mur_i; \
for(_mur_i = -_mur_nblocks; _mur_i; _mur_i++) { \
uint32_t _mur_k1 = MUR_GETBLOCK(_mur_blocks,_mur_i); \
_mur_k1 = MUR_GETBLOCK(_mur_blocks,_mur_i); \
_mur_k1 *= _mur_c1; \
_mur_k1 = MUR_ROTL32(_mur_k1,15); \
_mur_k1 *= _mur_c2; \
@@ -543,8 +582,8 @@ do { \
_mur_h1 = MUR_ROTL32(_mur_h1,13); \
_mur_h1 = _mur_h1*5+0xe6546b64; \
} \
const uint8_t *_mur_tail = (const uint8_t*)(_mur_data + _mur_nblocks*4); \
uint32_t _mur_k1=0; \
_mur_tail = (const uint8_t*)(_mur_data + _mur_nblocks*4); \
_mur_k1=0; \
switch((keylen) & 3) { \
case 3: _mur_k1 ^= _mur_tail[2] << 16; \
case 2: _mur_k1 ^= _mur_tail[1] << 8; \
@@ -570,10 +609,10 @@ do {
if (head.hh_head) DECLTYPE_ASSIGN(out,ELMT_FROM_HH(tbl,head.hh_head)); \
else out=NULL; \
while (out) { \
if (out->hh.keylen == keylen_in) { \
if ((HASH_KEYCMP(out->hh.key,keyptr,keylen_in)) == 0) break; \
if ((out)->hh.keylen == keylen_in) { \
if ((HASH_KEYCMP((out)->hh.key,keyptr,keylen_in)) == 0) break; \
} \
if (out->hh.hh_next) DECLTYPE_ASSIGN(out,ELMT_FROM_HH(tbl,out->hh.hh_next)); \
if ((out)->hh.hh_next) DECLTYPE_ASSIGN(out,ELMT_FROM_HH(tbl,(out)->hh.hh_next)); \
else out = NULL; \
} \
} while(0)
@@ -722,18 +761,22 @@ do {
_hs_qsize--; \
} else if ( (_hs_qsize == 0) || !(_hs_q) ) { \
_hs_e = _hs_p; \
if (_hs_p){ \
_hs_p = (UT_hash_handle*)((_hs_p->next) ? \
((void*)((char*)(_hs_p->next) + \
(head)->hh.tbl->hho)) : NULL); \
} \
_hs_psize--; \
} else if (( \
cmpfcn(DECLTYPE(head)(ELMT_FROM_HH((head)->hh.tbl,_hs_p)), \
DECLTYPE(head)(ELMT_FROM_HH((head)->hh.tbl,_hs_q))) \
) <= 0) { \
_hs_e = _hs_p; \
if (_hs_p){ \
_hs_p = (UT_hash_handle*)((_hs_p->next) ? \
((void*)((char*)(_hs_p->next) + \
(head)->hh.tbl->hho)) : NULL); \
} \
_hs_psize--; \
} else { \
_hs_e = _hs_q; \
@@ -748,13 +791,17 @@ do {
} else { \
_hs_list = _hs_e; \
} \
if (_hs_e) { \
_hs_e->prev = ((_hs_tail) ? \
ELMT_FROM_HH((head)->hh.tbl,_hs_tail) : NULL); \
} \
_hs_tail = _hs_e; \
} \
_hs_p = _hs_q; \
} \
if (_hs_tail){ \
_hs_tail->next = NULL; \
} \
if ( _hs_nmerges <= 1 ) { \
_hs_looping=0; \
(head)->hh.tbl->tail = _hs_tail; \
@@ -814,11 +861,18 @@ do {
if (head) { \
uthash_free((head)->hh.tbl->buckets, \
(head)->hh.tbl->num_buckets*sizeof(struct UT_hash_bucket)); \
HASH_BLOOM_FREE((head)->hh.tbl); \
uthash_free((head)->hh.tbl, sizeof(UT_hash_table)); \
(head)=NULL; \
} \
} while(0)
#define HASH_OVERHEAD(hh,head) \
(size_t)((((head)->hh.tbl->num_items * sizeof(UT_hash_handle)) + \
((head)->hh.tbl->num_buckets * sizeof(UT_hash_bucket)) + \
(sizeof(UT_hash_table)) + \
(HASH_BLOOM_BYTELEN)))
#ifdef NO_DECLTYPE
#define HASH_ITER(hh,head,el,tmp) \
for((el)=(head), (*(char**)(&(tmp)))=(char*)((head)?(head)->hh.next:NULL); \

View File

@@ -1,5 +1,5 @@
/*
Copyright (c) 2008-2011, Troy D. Hanson http://uthash.sourceforge.net
Copyright (c) 2008-2014, Troy D. Hanson http://troydhanson.github.com/uthash/
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -22,12 +22,11 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/* a dynamic string implementation using macros
* see http://uthash.sourceforge.net/utstring
*/
#ifndef UTSTRING_H
#define UTSTRING_H
#define UTSTRING_VERSION 1.9.4
#define UTSTRING_VERSION 1.9.9
#ifdef __GNUC__
#define _UNUSED_ __attribute__ ((__unused__))
@@ -37,6 +36,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
#define oom() exit(-1)
@@ -106,10 +106,10 @@ do { \
#define utstring_concat(dst,src) \
do { \
utstring_reserve(dst,(src->i)+1); \
if (src->i) memcpy(&(dst)->d[(dst)->i], src->d, src->i); \
dst->i += src->i; \
dst->d[dst->i]='\0'; \
utstring_reserve((dst),((src)->i)+1); \
if ((src)->i) memcpy(&(dst)->d[(dst)->i], (src)->d, (src)->i); \
(dst)->i += (src)->i; \
(dst)->d[(dst)->i]='\0'; \
} while(0)
#define utstring_len(s) ((unsigned)((s)->i))
@@ -128,7 +128,7 @@ _UNUSED_ static void utstring_printf_va(UT_string *s, const char *fmt, va_list a
n = vsnprintf (&s->d[s->i], s->n-s->i, fmt, cp);
va_end(cp);
if ((n > -1) && (n < (int)(s->n-s->i))) {
if ((n > -1) && ((size_t) n < (s->n-s->i))) {
s->i += n;
return;
}
@@ -138,6 +138,11 @@ _UNUSED_ static void utstring_printf_va(UT_string *s, const char *fmt, va_list a
else utstring_reserve(s,(s->n)*2); /* 2x */
}
}
#ifdef __GNUC__
/* support printf format checking (2=the format string, 3=start of varargs) */
static void utstring_printf(UT_string *s, const char *fmt, ...)
__attribute__ (( format( printf, 2, 3) ));
#endif
_UNUSED_ static void utstring_printf(UT_string *s, const char *fmt, ...) {
va_list ap;
va_start(ap,fmt);
@@ -145,4 +150,244 @@ _UNUSED_ static void utstring_printf(UT_string *s, const char *fmt, ...) {
va_end(ap);
}
/*******************************************************************************
* begin substring search functions *
******************************************************************************/
/* Build KMP table from left to right. */
_UNUSED_ static void _utstring_BuildTable(
const char *P_Needle,
size_t P_NeedleLen,
long *P_KMP_Table)
{
long i, j;
i = 0;
j = i - 1;
P_KMP_Table[i] = j;
while (i < (long) P_NeedleLen)
{
while ( (j > -1) && (P_Needle[i] != P_Needle[j]) )
{
j = P_KMP_Table[j];
}
i++;
j++;
if (i < (long) P_NeedleLen)
{
if (P_Needle[i] == P_Needle[j])
{
P_KMP_Table[i] = P_KMP_Table[j];
}
else
{
P_KMP_Table[i] = j;
}
}
else
{
P_KMP_Table[i] = j;
}
}
return;
}
/* Build KMP table from right to left. */
_UNUSED_ static void _utstring_BuildTableR(
const char *P_Needle,
size_t P_NeedleLen,
long *P_KMP_Table)
{
long i, j;
i = P_NeedleLen - 1;
j = i + 1;
P_KMP_Table[i + 1] = j;
while (i >= 0)
{
while ( (j < (long) P_NeedleLen) && (P_Needle[i] != P_Needle[j]) )
{
j = P_KMP_Table[j + 1];
}
i--;
j--;
if (i >= 0)
{
if (P_Needle[i] == P_Needle[j])
{
P_KMP_Table[i + 1] = P_KMP_Table[j + 1];
}
else
{
P_KMP_Table[i + 1] = j;
}
}
else
{
P_KMP_Table[i + 1] = j;
}
}
return;
}
/* Search data from left to right. ( Multiple search mode. ) */
_UNUSED_ static long _utstring_find(
const char *P_Haystack,
size_t P_HaystackLen,
const char *P_Needle,
size_t P_NeedleLen,
long *P_KMP_Table)
{
long i, j;
long V_FindPosition = -1;
/* Search from left to right. */
i = j = 0;
while ( (j < (int)P_HaystackLen) && (((P_HaystackLen - j) + i) >= P_NeedleLen) )
{
while ( (i > -1) && (P_Needle[i] != P_Haystack[j]) )
{
i = P_KMP_Table[i];
}
i++;
j++;
if (i >= (int)P_NeedleLen)
{
/* Found. */
V_FindPosition = j - i;
break;
}
}
return V_FindPosition;
}
/* Search data from right to left. ( Multiple search mode. ) */
_UNUSED_ static long _utstring_findR(
const char *P_Haystack,
size_t P_HaystackLen,
const char *P_Needle,
size_t P_NeedleLen,
long *P_KMP_Table)
{
long i, j;
long V_FindPosition = -1;
/* Search from right to left. */
j = (P_HaystackLen - 1);
i = (P_NeedleLen - 1);
while ( (j >= 0) && (j >= i) )
{
while ( (i < (int)P_NeedleLen) && (P_Needle[i] != P_Haystack[j]) )
{
i = P_KMP_Table[i + 1];
}
i--;
j--;
if (i < 0)
{
/* Found. */
V_FindPosition = j + 1;
break;
}
}
return V_FindPosition;
}
/* Search data from left to right. ( One time search mode. ) */
_UNUSED_ static long utstring_find(
UT_string *s,
long P_StartPosition, /* Start from 0. -1 means last position. */
const char *P_Needle,
size_t P_NeedleLen)
{
long V_StartPosition;
long V_HaystackLen;
long *V_KMP_Table;
long V_FindPosition = -1;
if (P_StartPosition < 0)
{
V_StartPosition = s->i + P_StartPosition;
}
else
{
V_StartPosition = P_StartPosition;
}
V_HaystackLen = s->i - V_StartPosition;
if ( (V_HaystackLen >= (long) P_NeedleLen) && (P_NeedleLen > 0) )
{
V_KMP_Table = (long *)malloc(sizeof(long) * (P_NeedleLen + 1));
if (V_KMP_Table != NULL)
{
_utstring_BuildTable(P_Needle, P_NeedleLen, V_KMP_Table);
V_FindPosition = _utstring_find(s->d + V_StartPosition,
V_HaystackLen,
P_Needle,
P_NeedleLen,
V_KMP_Table);
if (V_FindPosition >= 0)
{
V_FindPosition += V_StartPosition;
}
free(V_KMP_Table);
}
}
return V_FindPosition;
}
/* Search data from right to left. ( One time search mode. ) */
_UNUSED_ static long utstring_findR(
UT_string *s,
long P_StartPosition, /* Start from 0. -1 means last position. */
const char *P_Needle,
size_t P_NeedleLen)
{
long V_StartPosition;
long V_HaystackLen;
long *V_KMP_Table;
long V_FindPosition = -1;
if (P_StartPosition < 0)
{
V_StartPosition = s->i + P_StartPosition;
}
else
{
V_StartPosition = P_StartPosition;
}
V_HaystackLen = V_StartPosition + 1;
if ( (V_HaystackLen >= (long) P_NeedleLen) && (P_NeedleLen > 0) )
{
V_KMP_Table = (long *)malloc(sizeof(long) * (P_NeedleLen + 1));
if (V_KMP_Table != NULL)
{
_utstring_BuildTableR(P_Needle, P_NeedleLen, V_KMP_Table);
V_FindPosition = _utstring_findR(s->d,
V_HaystackLen,
P_Needle,
P_NeedleLen,
V_KMP_Table);
free(V_KMP_Table);
}
}
return V_FindPosition;
}
/*******************************************************************************
* end substring search functions *
******************************************************************************/
#endif /* UTSTRING_H */

View File

@@ -1,97 +1,93 @@
#include <stdio.h>
#include <stdio.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/prctl.h>
#include <assert.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
/* this program listens on a TCP port. when a client connects, it receives the
* binary packed spool frames from the input spool. RR to multiple clients */
#define _GNU_SOURCE
#include <errno.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "utarray.h"
#include "utstring.h"
#include "kvspool.h"
#include "kvsp-bconfig.h"
struct {
int verbose;
int port;
int fd; // listening socket
int fa; // socket to client
char *spool;
UT_string *buf;
char *prog;
enum {fan, round_robin} mode;
/* spool stuff */
char *dir;
void *sp;
void *set;
char *config_file; // cast config
/* */
int listener_port;
int listener_fd;
int signal_fd;
int epoll_fd;
int mb_per_client;
UT_array *clients;
UT_array *outbufs;
int rr_idx;
UT_string *s; // scratch
} cfg = {
.mb_per_client=1,
.mode=fan,
};
char discard[1024];
void usage(char *prog) {
fprintf(stderr, "usage: %s [-v] -b <config> -d spool -p <port>\n", prog);
void usage() {
fprintf(stderr,"usage: %s [-v] -p <port> (tcp port to listen on - packet stream)\n"
" -m <mb> (megabytes to buffer to each client)\n"
" -b <conf> (binary cast config file)\n"
" -d <spool> (spool to read)\n"
" -r (round robin mode, [def: fan mode])\n"
"\n", cfg.prog);
exit(-1);
}
int setup_listener() {
int rc=-1;
/* signals that we'll accept via signalfd in epoll */
int sigs[] = {SIGHUP,SIGTERM,SIGINT,SIGQUIT,SIGALRM};
/**********************************************************
* create an IPv4/TCP socket, not yet bound to any address
*********************************************************/
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) {
fprintf(stderr,"socket: %s\n", strerror(errno));
goto done;
/* clean up the client output buffers and slots in fd/buf arrays */
void discard_client_buffers(int pos) {
UT_string **s = (UT_string**)utarray_eltptr(cfg.outbufs,pos);
utstring_free(*s); // deep free string
utarray_erase(cfg.outbufs,pos,1); // erase string pointer
utarray_erase(cfg.clients,pos,1); // erase client descriptor
}
int one=1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
/**********************************************************
* internet socket address structure: our address and port
*********************************************************/
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_ANY);
sin.sin_port = htons(port);
/**********************************************************
* bind socket to address and port we'd like to receive on
*********************************************************/
if (bind(fd, (struct sockaddr*)&sin, sizeof(sin)) == -1) {
fprintf(stderr,"bind: %s\n", strerror(errno));
goto done;
void mark_writable() {
/* mark writability-interest for any clients with pending output */
int *fd=NULL;
UT_string **s=NULL;
while ( (fd=(int*)utarray_next(cfg.clients,fd))) {
s=(UT_string**)utarray_next(cfg.outbufs,s); assert(s);
if (utstring_len(*s)) mod_epoll(EPOLLIN|EPOLLOUT, *fd);
}
}
/**********************************************************
* put socket into listening state
*********************************************************/
if (listen(fd,1) == -1) {
fprintf(stderr,"listen: %s\n", strerror(errno));
goto done;
}
rc = 0;
/* clients whose out-buffers exceed thresh. get closed */
void cull_excess() {
int *fd=NULL,pos;
UT_string **s=NULL;
while ( (fd=(int*)utarray_prev(cfg.clients,fd))) {
s=(UT_string**)utarray_prev(cfg.outbufs,s); assert(s);
if (utstring_len(*s) < cfg.mb_per_client*1024*1024) continue;
done:
return rc;
/* this client output buffer is too big. close the client. */
fprintf(stderr,"closing client@buffer max %d/%d mb\n",
utstring_len(*s)/(1024*1024), cfg.mb_per_client);
close(*fd); /* also deletes any epoll */
pos = utarray_eltidx(cfg.clients,fd);
discard_client_buffers(pos);
}
int accept_connection() {
int rc=-1;
struct sockaddr_in cin;
socklen_t cin_sz = sizeof(cin);
fa = accept(fd, (struct sockaddr*)&cin, &cin_sz);
if (fa == -1) {
fprintf(stderr,"accept: %s\n", strerror(errno));
goto done;
}
if (verbose && (sizeof(cin)==cin_sz))
fprintf(stderr, "connection from %s:%d\n",
inet_ntoa(cin.sin_addr), (int)ntohs(cin.sin_port));
rc = 0;
done:
return rc;
}
int set_to_binary(void *set, UT_string *bin) {
@@ -150,71 +146,314 @@ int set_to_binary(void *set, UT_string *bin) {
return rc;
}
int main(int argc, char *argv[]) {
void *sp=NULL;
void *set=NULL;
int c, opt,rc=-1;
size_t sz; void *b;
char *config_file;
set = kv_set_new();
utarray_new(output_keys, &ut_str_icd);
utarray_new(output_defaults, &ut_str_icd);
utarray_new(output_types,&ut_int_icd);
utstring_new(buf);
void append_to_client_buf(UT_string *f) {
UT_string **s;
size_t l;
char *b;
signal(SIGPIPE,SIG_IGN);
b = utstring_body(f);
l = utstring_len(f);
while ( (opt = getopt(argc, argv, "v+d:b:p:")) != -1) {
switch (opt) {
case 'v': verbose++; break;
case 'd': spool=strdup(optarg); break;
case 'b': config_file=strdup(optarg); break;
case 'p': port=atoi(optarg); break;
default: usage(argv[0]); break;
if (cfg.mode == fan) { // send to ALL clients
s=NULL;
while ( (s=(UT_string**)utarray_next(cfg.outbufs,s))) {
utstring_bincpy(*s,b,l);
}
} else { // send to ONE client (cfg.mode == round_robin)
int nb = utarray_len(cfg.outbufs);
int i = (cfg.rr_idx++) % nb;
UT_string **s = (UT_string**)utarray_eltptr(cfg.outbufs,i);
utstring_bincpy(*s,b,l);
}
}
if (spool == NULL) usage(argv[0]);
if (port == 0) usage(argv[0]);
if (parse_config(config_file) < 0) goto done;
sp = kv_spoolreader_new(spool);
if (!sp) goto done;
int check_spools() {
if (utstring_len(cfg.outbufs) == 0) return 0; // no clients connected
size_t n=0;
int rc=-1;
if (setup_listener()) goto done;
if (accept_connection()) goto done;
// TODO multiple clients
// TODO respond to remote close even while waiting on spool read
while (kv_spool_read(sp,set,1) > 0) { /* read til interrupted by signal */
if (set_to_binary(set,buf) < 0) goto done;
b = utstring_body(buf);
sz = utstring_len(buf);
do {
c = write(fa, b, sz);
if (c <= 0) {
fprintf(stderr,"write error: %s\n",c?strerror(errno):"remote close");
goto done;
/* gather as much as we can from the spools, until it blocks or
* we gather a threshhold of total bytes. the threshold is somewhat
* arbitrary, here we overload cfg.mb_per_client as the threshold. */
while (kv_spool_read(cfg.sp,cfg.set,0) > 0) {
if (set_to_binary(cfg.set, cfg.s)) goto done;
append_to_client_buf(cfg.s);
n += utstring_len(cfg.s);
if (n > cfg.mb_per_client*(1024*1024)) break;
}
b += c;
sz -= c;
} while(sz);
// discard any pending response. used remotely for detecting half-open TCP
recv(fa, discard, sizeof(discard), MSG_DONTWAIT);
}
close(fa);
rc = 0;
done:
if (sp) kv_spoolreader_free(sp);
kv_set_free(set);
return rc;
}
int periodic_work() {
int rc = -1;
cull_excess();
if (check_spools()) goto done;
mark_writable();
rc = 0;
done:
return rc;
}
int setup_client_listener() {
int rc = -1;
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) {
fprintf(stderr,"socket: %s\n", strerror(errno));
goto done;
}
int one=1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_ANY);
sin.sin_port = htons(cfg.listener_port);
if (bind(fd, (struct sockaddr*)&sin, sizeof(sin)) == -1) {
fprintf(stderr,"bind: %s\n", strerror(errno));
goto done;
}
if (listen(fd,1) == -1) {
fprintf(stderr,"listen: %s\n", strerror(errno));
goto done;
}
cfg.listener_fd = fd;
rc=0;
done:
if ((rc < 0) && (fd != -1)) close(fd);
return rc;
}
/* flush as much pending output to the client as it can handle. */
void feed_client(int ready_fd, int events) {
int *fd=NULL, rc, pos, rv;
char *buf, tmp[100];
size_t len;
UT_string **s=NULL;
/* find the fd in our list */
while ( (fd=(int*)utarray_next(cfg.clients,fd))) {
s=(UT_string**)utarray_next(cfg.outbufs,s); assert(s);
pos = utarray_eltidx(cfg.clients, fd);
if (ready_fd == *fd) break;
}
assert(fd);
if (cfg.verbose > 1) {
fprintf(stderr, "pollout:%c pollin: %c\n", (events & EPOLLOUT)?'1':'0',
(events & EPOLLIN) ?'1':'0');
}
/* before we write to the client, drain any input or closure */
rv = recv(*fd, tmp, sizeof(tmp), MSG_DONTWAIT);
if (rv == 0) {
fprintf(stderr,"client closed (eof)\n");
close(*fd); /* deletes epoll instances on *fd too */
discard_client_buffers(pos);
return;
}
if ((events & EPOLLOUT) == 0) return;
/* send the pending buffer to the client */
buf = utstring_body(*s);
len = utstring_len(*s); assert(len);
rc = send(*fd, buf, len, MSG_DONTWAIT);
if (cfg.verbose) fprintf(stderr,"sent %d/%d bytes\n", rc, (int)len);
/* test for client closure or error. */
if (rc < 0) {
if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) return;
fprintf(stderr,"client closed (%s)\n", strerror(errno));
close(*fd); /* deletes all epoll instances on *fd too */
discard_client_buffers(pos);
return;
}
/* shift the output buffer; we wrote rc bytes TODO noshift */
if (rc < len) {
memmove((*s)->d, (*s)->d + rc, len-rc);
(*s)->i -= rc;
} else {
utstring_clear(*s); // buffer emptied
mod_epoll(EPOLLIN,*fd); // remove EPOLLOUT
}
}
int new_epoll(int events, int fd) {
int rc;
struct epoll_event ev;
memset(&ev,0,sizeof(ev)); // placate valgrind
ev.events = events;
ev.data.fd= fd;
if (cfg.verbose) fprintf(stderr,"adding fd %d to epoll\n", fd);
rc = epoll_ctl(cfg.epoll_fd, EPOLL_CTL_ADD, fd, &ev);
if (rc == -1) {
fprintf(stderr,"epoll_ctl: %s\n", strerror(errno));
}
return rc;
}
int mod_epoll(int events, int fd) {
int rc;
struct epoll_event ev;
memset(&ev,0,sizeof(ev)); // placate valgrind
ev.events = events;
ev.data.fd= fd;
if (cfg.verbose) fprintf(stderr,"modding fd %d epoll\n", fd);
rc = epoll_ctl(cfg.epoll_fd, EPOLL_CTL_MOD, fd, &ev);
if (rc == -1) {
fprintf(stderr,"epoll_ctl: %s\n", strerror(errno));
}
return rc;
}
int handle_signal() {
int rc=-1;
struct signalfd_siginfo info;
if (read(cfg.signal_fd, &info, sizeof(info)) != sizeof(info)) {
fprintf(stderr,"failed to read signal fd buffer\n");
goto done;
}
switch(info.ssi_signo) {
case SIGALRM:
if (periodic_work()) goto done;
alarm(1);
break;
default:
fprintf(stderr,"got signal %d\n", info.ssi_signo);
goto done;
break;
}
rc = 0;
done:
return rc;
}
int accept_client() {
int rc = -1;
struct sockaddr_in cin;
socklen_t cin_sz = sizeof(cin);
int fd = accept(cfg.listener_fd,(struct sockaddr*)&cin, &cin_sz);
if (fd == -1) {
fprintf(stderr,"accept: %s\n", strerror(errno));
goto done;
}
if (sizeof(cin)==cin_sz) fprintf(stderr,"connection from %s:%d\n",
inet_ntoa(cin.sin_addr), (int)ntohs(cin.sin_port));
utarray_push_back(cfg.clients,&fd);
/* set up client output buffer. reserve space for full buffer */
UT_string *s; utstring_new(s); utstring_reserve(s,cfg.mb_per_client*1024*1024);
utarray_push_back(cfg.outbufs,&s);
new_epoll(EPOLLIN, fd);
rc=0;
done:
return rc;
}
int main(int argc, char *argv[]) {
int opt, rc, n, *fd;
cfg.prog = argv[0];
utarray_new(cfg.clients,&ut_int_icd);
utarray_new(cfg.outbufs,&ut_ptr_icd);
cfg.set = kv_set_new();
struct epoll_event ev;
UT_string **s;
utstring_new(cfg.s);
utarray_new(output_keys, &ut_str_icd);
utarray_new(output_defaults, &ut_str_icd);
utarray_new(output_types,&ut_int_icd);
while ( (opt=getopt(argc,argv,"vb:p:m:d:rh")) != -1) {
switch(opt) {
case 'v': cfg.verbose++; break;
case 'p': cfg.listener_port=atoi(optarg); break;
case 'm': cfg.mb_per_client=atoi(optarg); break;
case 'd': cfg.dir=strdup(optarg); break;
case 'r': cfg.mode=round_robin; break;
case 'b': cfg.config_file=strdup(optarg); break;
case 'h': default: usage(); break;
}
}
if (cfg.listener_port==0) usage();
if (setup_client_listener()) goto done;
if (cfg.config_file==NULL) goto done;
if (parse_config(cfg.config_file) < 0) goto done;
if ( !(cfg.sp = kv_spoolreader_new(cfg.dir))) goto done;
/* block all signals. we take signals synchronously via signalfd */
sigset_t all;
sigfillset(&all);
sigprocmask(SIG_SETMASK,&all,NULL);
/* a few signals we'll accept via our signalfd */
sigset_t sw;
sigemptyset(&sw);
for(n=0; n < sizeof(sigs)/sizeof(*sigs); n++) sigaddset(&sw, sigs[n]);
/* create the signalfd for receiving signals */
cfg.signal_fd = signalfd(-1, &sw, 0);
if (cfg.signal_fd == -1) {
fprintf(stderr,"signalfd: %s\n", strerror(errno));
goto done;
}
/* set up the epoll instance */
cfg.epoll_fd = epoll_create(1);
if (cfg.epoll_fd == -1) {
fprintf(stderr,"epoll: %s\n", strerror(errno));
goto done;
}
/* add descriptors of interest */
if (new_epoll(EPOLLIN, cfg.listener_fd)) goto done; // new client connections
if (new_epoll(EPOLLIN, cfg.signal_fd)) goto done; // signal socket
alarm(1);
while (epoll_wait(cfg.epoll_fd, &ev, 1, -1) > 0) {
if (cfg.verbose > 1) fprintf(stderr,"epoll reports fd %d\n", ev.data.fd);
if (ev.data.fd == cfg.signal_fd) { if (handle_signal() < 0) goto done; }
else if (ev.data.fd == cfg.listener_fd) { if (accept_client() < 0) goto done; }
else feed_client(ev.data.fd, ev.events);
}
done:
/* free the clients: close and deep free their buffers */
fd=NULL; s=NULL;
while ( (fd=(int*)utarray_prev(cfg.clients,fd))) {
s=(UT_string**)utarray_prev(cfg.outbufs,s);
close(*fd);
utstring_free(*s);
}
utarray_free(cfg.clients);
utarray_free(cfg.outbufs);
utarray_free(output_keys);
utarray_free(output_defaults);
utarray_free(output_types);
utstring_free(buf);
utstring_free(cfg.s);
if (cfg.listener_fd) close(cfg.listener_fd);
if (cfg.signal_fd) close(cfg.signal_fd);
if (cfg.sp) kv_spoolreader_free(cfg.sp);
if (cfg.set) kv_set_free(cfg.set);
return 0;
}