RR uses least-full client

This commit is contained in:
Troy D. Hanson
2014-10-22 18:46:11 -04:00
parent d753ce95a7
commit fe970bd6a0

View File

@@ -72,24 +72,6 @@ void mark_writable() {
}
}
/* clients whose out-buffers exceed thresh. get closed */
void cull_excess() {
int *fd=NULL,pos;
UT_string **s=NULL;
while ( (fd=(int*)utarray_prev(cfg.clients,fd))) {
s=(UT_string**)utarray_prev(cfg.outbufs,s); assert(s);
if (utstring_len(*s) < cfg.mb_per_client*1024*1024) continue;
/* this client output buffer is too big. close the client. */
fprintf(stderr,"closing client@buffer max %d/%d mb\n",
utstring_len(*s)/(1024*1024), cfg.mb_per_client);
close(*fd); /* also deletes any epoll */
pos = utarray_eltidx(cfg.clients,fd);
discard_client_buffers(pos);
}
}
int set_to_binary(void *set, UT_string *bin) {
uint32_t l, u, a,b,c,d, abcd;
uint16_t s;
@@ -147,52 +129,53 @@ int set_to_binary(void *set, UT_string *bin) {
}
void append_to_client_buf(UT_string *f) {
UT_string **s;
size_t l;
assert(utarray_len(cfg.outbufs) > 0);
UT_string **s=NULL;
size_t l,least,c;
char *b;
int i=0,lx;
b = utstring_body(f);
l = utstring_len(f);
if (cfg.mode == fan) { // send to ALL clients
s=NULL;
while ( (s=(UT_string**)utarray_next(cfg.outbufs,s))) {
switch(cfg.mode) {
case fan: // send to ALL clients
while ( (s=(UT_string**)utarray_next(cfg.outbufs,s))) {
utstring_bincpy(*s,b,l);
}
break;
case round_robin: // send to ONE client
while ( (s=(UT_string**)utarray_next(cfg.outbufs,s))) {
c = utstring_len(*s);
if ((i==0) || (c < least)) {least=c; lx=i;}
i++;
}
s = (UT_string**)utarray_eltptr(cfg.outbufs,lx);
utstring_bincpy(*s,b,l);
}
} else { // send to ONE client (cfg.mode == round_robin)
int nb = utarray_len(cfg.outbufs);
int i = (cfg.rr_idx++) % nb;
UT_string **s = (UT_string**)utarray_eltptr(cfg.outbufs,i);
utstring_bincpy(*s,b,l);
break;
}
}
int check_spools() {
if (utstring_len(cfg.outbufs) == 0) return 0; // no clients connected
size_t n=0;
int rc=-1;
/* gather as much as we can from the spools, until it blocks or
* we gather a threshhold of total bytes. the threshold is somewhat
* arbitrary, here we overload cfg.mb_per_client as the threshold. */
while (kv_spool_read(cfg.sp,cfg.set,0) > 0) {
if (set_to_binary(cfg.set, cfg.s)) goto done;
append_to_client_buf(cfg.s);
n += utstring_len(cfg.s);
if (n > cfg.mb_per_client*(1024*1024)) break;
}
rc = 0;
done:
return rc;
/* used to stop reading the spool when internal buffers are 90% full */
int have_capacity() {
size_t max = utarray_len(cfg.outbufs) * cfg.mb_per_client * (1024*1024);
size_t used=0;
UT_string **s=NULL;
while ( (s=(UT_string**)utarray_next(cfg.outbufs,s))) used += utstring_len(*s);
double pct_full = max ? (used*100.0/max) : 100;
return (pct_full > 90) ? 0 : 1;
}
int periodic_work() {
int rc = -1;
int rc = -1, kc;
cull_excess();
if (check_spools()) goto done;
while (have_capacity()) {
kc = kv_spool_read(cfg.sp,cfg.set,0);
if (kc < 0) goto done; // error
if (kc == 0) break; // no data
if (set_to_binary(cfg.set, cfg.s)) goto done;
append_to_client_buf(cfg.s);
}
mark_writable();
rc = 0;