This commit is contained in:
Troy D. Hanson
2016-06-26 10:44:59 -04:00
parent 2826e03e7a
commit 4456e62bda
75 changed files with 87 additions and 1936 deletions

22
LICENSE Normal file
View File

@@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2011-2016 Troy D. Hanson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,31 +0,0 @@
LICENSE AND DISCLAIMER
Copyright (c) 2011 The Johns Hopkins University/Applied Physics Laboratory
This software was developed at The Johns Hopkins University/Applied Physics
Laboratory ("JHU/APL") that is the author thereof under the “work made for
hire” provisions of the copyright law. Permission is hereby granted, free of
charge, to any person obtaining a copy of this software and associated
documentation (the “Software”), to use the Software without restriction,
including without limitation the rights to copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to permit
others to do so, subject to the following conditions:
1. This LICENSE AND DISCLAIMER, including the copyright notice, shall be
included in all copies of the Software, including copies of substantial
portions of the Software;
2. JHU/APL assumes no obligation to provide support of any kind with regard
to the Software. This includes no obligation to provide assistance in using
the Software nor to provide updated versions of the Software; and
3. THE SOFTWARE AND ITS DOCUMENTATION ARE PROVIDED AS IS AND WITHOUT ANY
EXPRESS OR IMPLIED WARRANTIES WHATSOEVER. ALL WARRANTIES INCLUDING, BUT NOT
LIMITED TO, PERFORMANCE, MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
AND NONINFRINGEMENT ARE HEREBY DISCLAIMED. USERS ASSUME THE ENTIRE RISK AND
LIABILITY OF USING THE SOFTWARE. USERS ARE ADVISED TO TEST THE SOFTWARE
THOROUGHLY BEFORE RELYING ON IT. IN NO EVENT SHALL THE JOHNS HOPKINS
UNIVERSITY BE LIABLE FOR ANY DAMAGES WHATSOEVER, INCLUDING, WITHOUT
LIMITATION, ANY LOST PROFITS, LOST SAVINGS OR OTHER INCIDENTAL OR
CONSEQUENTIAL DAMAGES, ARISING OUT OF THE USE OR INABILITY TO USE THE
SOFTWARE.”

View File

@@ -1,11 +1 @@
SUBDIRS = src utils
if HAVE_PYTHON
if HAVE_SSL
SUBDIRS += kvpy
endif
endif
if HAVE_PERL
SUBDIRS += kvperl
endif

1
autogen.sh Executable file
View File

@@ -0,0 +1 @@
autoreconf -ifv

View File

@@ -1,11 +0,0 @@
#!/bin/sh
# THIS SCRIPT IS FOR PROJECT MAINTAINER ONLY
# It is executed only to generate "configure"
set -x
if [ ! -d config ]; then mkdir config; fi
aclocal -I config
autoheader
automake --foreign --add-missing --copy
autoconf

View File

@@ -33,24 +33,6 @@ AC_CHECK_LIB(rdkafka,rd_kafka_new,
AM_CONDITIONAL(HAVE_RDKAFKA,true),
AM_CONDITIONAL(HAVE_RDKAFKA,false))
# is libnnctl installed
AC_CHECK_LIB(nnctl,nnctl_exec,
AM_CONDITIONAL(HAVE_NNCTL,true),
AM_CONDITIONAL(HAVE_NNCTL,false), -lnanomsg)
# is SSL installed. Not sure why Python build requires
AC_CHECK_LIB(ssl,SSL_library_init,
AM_CONDITIONAL(HAVE_SSL,true),
AM_CONDITIONAL(HAVE_SSL,false))
AX_PYTHON_DEVEL(>= '2.4')
AM_CONDITIONAL(HAVE_PYTHON,test "x$pythonexists" = "xyes")
AC_CHECK_PROG(PERL,perl,perl)
AX_PROG_PERL_VERSION([5.10.1],
AM_CONDITIONAL(HAVE_PERL,test "x$PERL" != "x"),
AM_CONDITIONAL(HAVE_PERL,false))
AC_CONFIG_FILES(Makefile src/Makefile utils/Makefile)
AC_OUTPUT

View File

@@ -1,11 +0,0 @@
asciidoc -a toc2 kvspool.txt
cp kvspool.html *.png /tmp
cd ..
git checkout gh-pages
#git clean -d -f
cp /tmp/kvspool.html index.html
cp /tmp/*.png .
git add index.html *.png
git commit -m "page update"
git push

View File

@@ -1,8 +0,0 @@
kvspool was developed in 2011 by Troy D. Hanson
Special thanks to:
JHU/APL OTT
Trevor Adams
JT Halbert
Jeff James
Nick Clote

View File

@@ -1,31 +1,22 @@
LICENSE AND DISCLAIMER
The MIT License (MIT)
Copyright (c) 2011 The Johns Hopkins University/Applied Physics Laboratory
Copyright (c) 2011-2016 Troy D. Hanson
This software was developed at The Johns Hopkins University/Applied Physics
Laboratory ("JHU/APL") that is the author thereof under the "work made for
hire" provisions of the copyright law. Permission is hereby granted, free of
charge, to any person obtaining a copy of this software and associated
documentation (the "Software"), to use the Software without restriction,
including without limitation the rights to copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to permit
others to do so, subject to the following conditions:
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
1. This LICENSE AND DISCLAIMER, including the copyright notice, shall be
included in all copies of the Software, including copies of substantial
portions of the Software;
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
2. JHU/APL assumes no obligation to provide support of any kind with regard
to the Software. This includes no obligation to provide assistance in using
the Software nor to provide updated versions of the Software; and
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
3. THE SOFTWARE AND ITS DOCUMENTATION ARE PROVIDED AS IS AND WITHOUT ANY
EXPRESS OR IMPLIED WARRANTIES WHATSOEVER. ALL WARRANTIES INCLUDING, BUT NOT
LIMITED TO, PERFORMANCE, MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
AND NONINFRINGEMENT ARE HEREBY DISCLAIMED. USERS ASSUME THE ENTIRE RISK AND
LIABILITY OF USING THE SOFTWARE. USERS ARE ADVISED TO TEST THE SOFTWARE
THOROUGHLY BEFORE RELYING ON IT. IN NO EVENT SHALL THE JOHNS HOPKINS
UNIVERSITY BE LIABLE FOR ANY DAMAGES WHATSOEVER, INCLUDING, WITHOUT
LIMITATION, ANY LOST PROFITS, LOST SAVINGS OR OTHER INCIDENTAL OR
CONSEQUENTIAL DAMAGES, ARISING OUT OF THE USE OR INABILITY TO USE THE
SOFTWARE.

View File

@@ -1,78 +0,0 @@
Design concepts for "v2" rewrite of kvspool
-------------------------------------------
1. Support multi-writer, multi-reader from same spool
2. Use a memory-mapped file for reading/writing spool data so that:
(1) I/O occurs through shared memory even without a ramdisk, while
(2) data is still persisted back to disk
3. Support for multi-writers requires a synchronization mechanism.
(1) This is one of the functions of the "control file".
(a) this file exists alongside the spool data file
(b) by flock'ing it (or fcntl lock on a region of it), one writer
can gain exclusive write (which applies to the spool data file too);
a second level of record-locking using fcntl lock on the spool data
file can act as a redundant safeguard
(c) the control file has the min and max sequence number in it
(d) the max sequence number is just the "frame number" of the next
frame to be written
(e) the min sequence number is incremented (sometimes by an increment
greater than one) when the writer is overwriting previous frame(s).
It's purpose is explained under the "Support for multi-readers" later.
(f) The offset of the min and max frames are also stored
(g) The control block may also contain a few time-series on write rates.
(i) It would also be possible to place the control file into the data
spool itself, in which case its a "control block" of fixed size
at the beginning; this would eliminate some failure modes and
reduce the file descriptor bookkeeping by one
4. Spool data file is a single, large, circular data buffer
(a) It is pre-created prior to data being written to reserve the space
(b) This requires that it be a non-sparse file
(c) It is used as a cyclic buffer
(d) When the end is reached, a new frame may not quite fit at the end,
in which case the frame starts at the beginning of the file; but
this requires that the frame's content-length may differ from its
stored length (so that the frame that ends up at the end of the
buffer can be adjusted to consume the full remaining space).
(e) Thus the frame format is
(1) sequence number
(2) storage length
(3) content length
(4) data (in JSON)
(f) The single large data file replaces the kvspool-v1 approach
where ten sequenced files contain the spool data, and old files
are deleted as new files are written. The v1 logic requires
detection of new files in the spool, although its advantegous
in that read/write through standard (non-mmap) calls does not
swap in the entire data spool as the v2 approach may tend to do
5. Support for multi-readers
(1) since readers that are inactive for a long time may get to the point
that their next read position is potentially invalid (due to a
writer wraparound that puts a new frame into the read area),
(a) the reader that is entering the 'read' state will first
lock the control file, acquire the minimum sequence number
to see if its exceeded its own read position
(i) If it has, then the reader has experienced frame loss
and adjusts its next-read-position to the min frame
(ii) if not, the reader can then record-lock the spool
data and read the next frame
(iii) note that if the max sequence number is the same as
the read position, then the reader needs to block
(by placing inotify on the control file, unlocking
and going into a select/epoll).
(2) If reader needs persistence for its read position it should
store its own sequence number and identifier in the spool dir
6. Key repetition
(1) if every frame tends to repeat the same keys or a subset of a
relatively small set of keys (as is typically expected) then
the keys are highly redundant and suitable for compression
(2) one option is to use indexes instead of the keys themselves;
seperately a key-store would be maintained with a table
(into which the index points) whose value is the offset on
disk of the key itself. Adds complexity but saves a lot of
space. Alternatively some kind of run time compression on
the frames is possible particularly if some kind of frame
history is kept (e.g. as with video, key frames at intervals
would keep the whole keys, into which the indexes would point
for subsequent frames; this would complicate the cyclic
wraparound logic for recycling old frames by pushing it to
key-frame boundaries

View File

@@ -4,7 +4,7 @@
<head>
<meta http-equiv="Content-Type" content="application/xhtml+xml; charset=UTF-8" />
<meta name="generator" content="AsciiDoc 8.6.9" />
<title>kvspool: a tool for data streams</title>
<title>kvspool data stream tools</title>
<style type="text/css">
/* Shared CSS for AsciiDoc xhtml11 and html5 backends */
@@ -767,7 +767,7 @@ asciidoc.install(2);
</head>
<body class="article">
<div id="header">
<h1>kvspool: a tool for data streams</h1>
<h1>kvspool data stream tools</h1>
<span id="author">Troy D. Hanson</span><br />
<span id="email"><code>&lt;<a href="mailto:tdh@tkhanson.net">tdh@tkhanson.net</a>&gt;</code></span><br />
<div id="toc">
@@ -778,6 +778,8 @@ asciidoc.install(2);
<div id="content">
<div id="preamble">
<div class="sectionbody">
<div class="paragraph"><p>Back to the <a href="https://github.com/troydhanson/kvspool">kvspool Github page</a>. Back to
<a href="http://troydhanson.github.io">my other software</a>.</p></div>
<div class="dlist"><dl>
<dt class="hdlist1">
kv-spool ("key-value" spool)
@@ -803,26 +805,6 @@ when it&#8217;s caught up, waiting for more data. Like this,</p></div>
<div class="paragraph"><p><span class="image">
<img src="reader-writer.png" alt="A spool writer and reader" />
</span></p></div>
<div class="paragraph"><div class="title">Sneak peak</div><p>Here&#8217;s a sneak peak at a really simple writer and reader:</p></div>
<div class="listingblock">
<div class="title">Sample Python writer</div>
<div class="content"><!-- Generator: GNU source-highlight 3.1.6
by Lorenzo Bettini
http://www.lorenzobettini.it
http://www.gnu.org/software/src-highlite -->
<pre><tt><span style="font-weight: bold"><span style="color: #000080">import</span></span> kvspool
kv <span style="color: #990000">=</span> kvspool<span style="color: #990000">.</span><span style="font-weight: bold"><span style="color: #000000">Kvspool</span></span><span style="color: #990000">(</span><span style="color: #FF0000">"spool"</span><span style="color: #990000">)</span>
d <span style="color: #990000">=</span> <span style="color: #990000">{</span><span style="color: #FF0000">"day"</span><span style="color: #990000">:</span> <span style="color: #FF0000">"Wed"</span><span style="color: #990000">,</span> <span style="color: #FF0000">"temp"</span><span style="color: #990000">:</span> <span style="color: #993399">37</span><span style="color: #990000">}</span>
kv<span style="color: #990000">.</span><span style="font-weight: bold"><span style="color: #000000">write</span></span><span style="color: #990000">(</span>d<span style="color: #990000">)</span></tt></pre></div></div>
<div class="listingblock">
<div class="title">Sample Perl reader</div>
<div class="content"><!-- Generator: GNU source-highlight 3.1.6
by Lorenzo Bettini
http://www.lorenzobettini.it
http://www.gnu.org/software/src-highlite -->
<pre><tt><span style="font-weight: bold"><span style="color: #0000FF">use</span></span> KVSpool<span style="color: #990000">;</span>
<span style="font-weight: bold"><span style="color: #0000FF">my</span></span> <span style="color: #009900">$kv</span> <span style="color: #990000">=</span> KVSpool<span style="color: #990000">-&gt;</span><span style="font-weight: bold"><span style="color: #000000">new</span></span><span style="color: #990000">(</span><span style="color: #FF0000">"spool"</span><span style="color: #990000">);</span>
<span style="font-weight: bold"><span style="color: #0000FF">my</span></span> <span style="color: #009900">$d</span> <span style="color: #990000">=</span> <span style="color: #009900">$kv</span><span style="color: #990000">-&gt;</span><span style="font-weight: bold"><span style="color: #0000FF">read</span></span><span style="color: #990000">();</span></tt></pre></div></div>
<div class="sidebarblock">
<div class="content">
<div class="title">Why did I write kvspool?</div>
@@ -916,24 +898,12 @@ The reader and writer are completely uninvolved in the replication process.</p><
<div class="paragraph"><p><span class="image">
<img src="pub-sub.png" alt="Publish and Subscribe" />
</span></p></div>
<div class="admonitionblock">
<table><tr>
<td class="icon">
<div class="title">Tip</div>
</td>
<td class="content">A job manager such as the author&#8217;s <a href="http://troydhanson.github.com/pmtr/">pmtr process
monitor</a> can be used to run <code>kvsp-sub</code> and <code>kvsp-pub</code> in the background, and restart
them when the system reboots.</td>
</tr></table>
</div>
<div class="paragraph"><p>You can run <code>kvsp-sub</code> and <code>kvsp-pub</code> in the background at system boot by setting
them up as services or <a href="http://troydhanson.github.com/pmtr/">pmtr</a> jobs.</p></div>
</div>
<div class="sect2">
<h3 id="_license">License</h3>
<div class="paragraph"><p>See the <a href="LICENSE.txt">LICENSE.txt</a> file. Kvspool is free and open source.</p></div>
</div>
<div class="sect2">
<h3 id="_resources">Resources</h3>
<div class="paragraph"><p>Additional software by the author is cataloged at <a href="http://troydhanson.github.io/">http://troydhanson.github.io/</a>.</p></div>
<div class="paragraph"><p>Kvspool uses the MIT license.</p></div>
</div>
</div>
</div>
@@ -945,21 +915,20 @@ them when the system reboots.</td>
<div class="content">
<pre><code>% git clone git://github.com/troydhanson/kvspool.git</code></pre>
</div></div>
<div class="paragraph"><p>To build kvspool:</p></div>
<div class="paragraph"><p>To build and install kvspool, you need autotools installed. The configure script looks
to see if optional libraries including ZeroMQ, Nanomsg, Jansson and librdkafka re
installed. It builds additional utilities if they are present.</p></div>
<div class="literalblock">
<div class="content">
<pre><code>% cd kvspool</code></pre>
</div></div>
<div class="paragraph"><p>If the <em>configure</em> script does not yet exist, run <code>./bootstrap</code>. If you want to build the
network replication utilities, make sure you have <strong>Jansson</strong> and <strong>ZeroMQ</strong> (2.x or 3.x).</p></div>
<div class="literalblock">
<div class="content">
<pre><code>% ./configure
<pre><code>% ./autogen.sh
% ./configure
% make
% sudo make install</code></pre>
</div></div>
<div class="paragraph"><p>This builds and installs the C library and utilities, and if the prerequisite packages
are installed, it builds the Perl, Python and Java bindings, and ZeroMQ-based utilities.</p></div>
</div>
</div>
<div class="sect1">
@@ -1069,6 +1038,10 @@ cellspacing="0" cellpadding="4">
<td align="left" valign="top"><p class="table"><code>kvsp-bsub -b cast.cfg -d spool tcp://192.168.1.9:2110</code></p></td>
</tr>
<tr>
<td align="left" valign="top"><p class="table"><code>kvsp-kkpub</code></p></td>
<td align="left" valign="top"><p class="table"><code>kvsp-kkpub -b kafka.host.name -t topic</code></p></td>
</tr>
<tr>
<td align="left" valign="top"><p class="table"><code>kvsp-tpub</code></p></td>
<td align="left" valign="top"><p class="table"><code>kvsp-tpub -b cast.cfg -d spool -p 2110</code></p></td>
</tr>
@@ -1117,6 +1090,11 @@ d64 // double (64-bit float)</code></pre>
</div></div>
</div>
<div class="sect3">
<h4 id="_kvsp_kkpub">kvsp-kkpub</h4>
<div class="paragraph"><p>Publishes the spool in JSON encoding to a Kakfa topic. This tool requires librdkakfa and
Jansson to be installed.</p></div>
</div>
<div class="sect3">
<h4 id="_kvsp_tpub">kvsp-tpub</h4>
<div class="paragraph"><p>Finally there is a "plain TCP" binary publisher. It has no subscriber counterpart yet, so
you have to code your own subscriber to use it. It takes a cast.cfg of the same form as above.
@@ -1191,91 +1169,11 @@ convenient to locate a spool on a ramdisk for performance.</p></div>
</div>
</div>
<div class="sect1">
<h2 id="_examples_amp_api">Examples &amp; API</h2>
<h2 id="_api">API</h2>
<div class="sectionbody">
<div class="sect2">
<h3 id="_perl">Perl</h3>
<div class="listingblock">
<div class="content"><!-- Generator: GNU source-highlight 3.1.6
by Lorenzo Bettini
http://www.lorenzobettini.it
http://www.gnu.org/software/src-highlite -->
<pre><tt><span style="font-weight: bold"><span style="color: #0000FF">use</span></span> KVSpool<span style="color: #990000">;</span>
<span style="font-weight: bold"><span style="color: #0000FF">my</span></span> <span style="color: #009900">$v</span> <span style="color: #990000">=</span> KVSpool<span style="color: #990000">-&gt;</span><span style="font-weight: bold"><span style="color: #000000">new</span></span><span style="color: #990000">(</span><span style="color: #FF0000">"spool"</span><span style="color: #990000">);</span>
<span style="font-weight: bold"><span style="color: #0000FF">my</span></span> <span style="color: #009900">$h</span> <span style="color: #990000">=</span> <span style="color: #FF0000">{</span><span style="color: #FF0000">'day'</span> <span style="color: #990000">=&gt;</span> <span style="color: #FF0000">'Wednesday'</span><span style="color: #990000">,</span> <span style="color: #FF0000">'user'</span> <span style="color: #990000">=&gt;</span> <span style="color: #FF0000">'Troy'</span><span style="color: #FF0000">}</span><span style="color: #990000">;</span>
<span style="color: #009900">$v</span><span style="color: #990000">-&gt;</span><span style="font-weight: bold"><span style="color: #0000FF">write</span></span><span style="color: #990000">(</span><span style="color: #009900">$h</span><span style="color: #990000">);</span></tt></pre></div></div>
<div class="paragraph"><p>In Perl the KVSpool object is instantiated with one directory argument. Then every time
the <code>write</code> method is called with a hash reference as argument, it&#8217;s written to the spool.
To read a spool, use the <code>read</code> method which returns a hash reference each times it&#8217;s
called:</p></div>
<div class="literalblock">
<div class="content">
<pre><code>my $h = $v-&gt;read();</code></pre>
</div></div>
<div class="paragraph"><p>The reader is normally blocking (it waits for data if none is immediately available).
To enact non-blocking mode, use <code>$v-&gt;{blocking}=0;</code>, then test the reference returned from
<code>read()</code> using <code>defined()</code>. If it&#8217;s not, no data is currently available in the spool.</p></div>
</div>
<div class="sect2">
<h3 id="_python">Python</h3>
<div class="listingblock">
<div class="content"><!-- Generator: GNU source-highlight 3.1.6
by Lorenzo Bettini
http://www.lorenzobettini.it
http://www.gnu.org/software/src-highlite -->
<pre><tt><span style="font-weight: bold"><span style="color: #000080">import</span></span> kvspool
kv <span style="color: #990000">=</span> kvspool<span style="color: #990000">.</span><span style="font-weight: bold"><span style="color: #000000">Kvspool</span></span><span style="color: #990000">(</span><span style="color: #FF0000">"spool"</span><span style="color: #990000">)</span>
d <span style="color: #990000">=</span> <span style="color: #990000">{</span><span style="color: #FF0000">"day"</span><span style="color: #990000">:</span><span style="color: #FF0000">"Wednesday"</span><span style="color: #990000">,</span><span style="color: #FF0000">"user"</span><span style="color: #990000">:</span><span style="color: #FF0000">"Troy"</span><span style="color: #990000">}</span>
kv<span style="color: #990000">.</span><span style="font-weight: bold"><span style="color: #000000">write</span></span><span style="color: #990000">(</span>d<span style="color: #990000">)</span></tt></pre></div></div>
<div class="paragraph"><p>The <code>write</code> method takes a dictionary, while the <code>read</code> method returns a dictionary:</p></div>
<div class="literalblock">
<div class="content">
<pre><code>d = kv.read()</code></pre>
</div></div>
<div class="paragraph"><p>To enact non-blocking mode on the reader, use <code>kv.blocking = 0</code> then test whether the
object returned from <code>read()</code> is <code>None</code>. If so, no data is currently available in the spool.</p></div>
</div>
<div class="sect2">
<h3 id="_java">Java</h3>
<div class="listingblock">
<div class="content"><!-- Generator: GNU source-highlight 3.1.6
by Lorenzo Bettini
http://www.lorenzobettini.it
http://www.gnu.org/software/src-highlite -->
<pre><tt><span style="font-weight: bold"><span style="color: #0000FF">public</span></span> <span style="font-weight: bold"><span style="color: #0000FF">class</span></span> <span style="color: #008080">TestKVJava</span> <span style="color: #FF0000">{</span>
<span style="font-weight: bold"><span style="color: #0000FF">public</span></span> <span style="font-weight: bold"><span style="color: #0000FF">static</span></span> <span style="color: #009900">void</span> <span style="font-weight: bold"><span style="color: #000000">main</span></span><span style="color: #990000">(</span><span style="color: #008080">String</span> args<span style="color: #990000">[])</span> <span style="color: #FF0000">{</span>
<span style="color: #008080">KVJava</span> kv <span style="color: #990000">=</span> <span style="font-weight: bold"><span style="color: #0000FF">new</span></span> <span style="font-weight: bold"><span style="color: #000000">KVJava</span></span><span style="color: #990000">(</span><span style="color: #FF0000">"spool"</span><span style="color: #990000">);</span>
<span style="color: #008080">HashMap&lt;String,String&gt;</span> h <span style="color: #990000">=</span> <span style="font-weight: bold"><span style="color: #0000FF">new</span></span> HashMap<span style="color: #990000">&lt;</span>String<span style="color: #990000">,</span>String<span style="color: #990000">&gt;();</span>
h<span style="color: #990000">.</span><span style="font-weight: bold"><span style="color: #000000">put</span></span><span style="color: #990000">(</span><span style="color: #FF0000">"day"</span><span style="color: #990000">,</span> <span style="color: #FF0000">"Wednesday"</span><span style="color: #990000">);</span>
h<span style="color: #990000">.</span><span style="font-weight: bold"><span style="color: #000000">put</span></span><span style="color: #990000">(</span><span style="color: #FF0000">"user"</span><span style="color: #990000">,</span> <span style="color: #FF0000">"Troy"</span><span style="color: #990000">);</span>
kv<span style="color: #990000">.</span><span style="font-weight: bold"><span style="color: #000000">write</span></span><span style="color: #990000">(</span>h<span style="color: #990000">);</span>
<span style="color: #FF0000">}</span>
<span style="color: #FF0000">}</span></tt></pre></div></div>
<div class="paragraph"><p>The example first instantiates a <code>KVJava</code> object. (You can copy <code>KVJava.java</code> from the
<code>kvjava</code> directory into your own source tree; this class loads the underlying JNI-based
library, which must be built and installed as described below).</p></div>
<div class="paragraph"><p>To write to the spool, pass a <code>HashMap&lt;String,String&gt;</code> to the <code>write</code> method. The <code>read</code>
method is similar:</p></div>
<div class="literalblock">
<div class="content">
<pre><code>HashMap&lt;String,String&gt; m;
m = kv.read();</code></pre>
</div></div>
<div class="paragraph"><p>You can put the reader in non-blocking mode by setting <code>kv.blocking = false;</code>
and then test whether the object returned from <code>read()</code> is equal to <code>null</code>. If so,
no data is currently available in the spool.</p></div>
<div class="paragraph"><div class="title">Building the JNI library</div><p>The Java binding is not yet integrated into the main build, so it requires extra steps to
build. After building kvspool, <code>cd kvjava</code> and run <code>make</code>. Most likely you will need to
edit the <code>kvjava/Makefile</code> lines where <code>JNIINC</code> and <code>JNILIB</code> are defined to make it work.
The result of compilation is <code>libKVJava.so</code> which must be manually copied to your choice
of library directory such as <code>/usr/local/lib</code> (You may also need to set <code>LD_LIBRARY_PATH</code>
so that the dynamic linker can find this shared library it at runtime).</p></div>
</div>
<div class="sect2">
<h3 id="_c_c">C/C++</h3>
<div class="paragraph"><p>C programs must be linked with -lkvspool.</p></div>
<div class="listingblock">
<div class="content"><!-- Generator: GNU source-highlight 3.1.6
<div class="content"><!-- Generator: GNU source-highlight 3.1.8
by Lorenzo Bettini
http://www.lorenzobettini.it
http://www.gnu.org/software/src-highlite -->
@@ -1304,7 +1202,7 @@ set to stderr using <code>kv_set_dump(set,stderr)</code>.</p></div>
<div class="paragraph"><p>To open a spool for reading, call <code>kv_spoolreader_new</code> which takes the spool directory and
returns an opaque handle to the spool. Then call <code>kv_spool_read</code> to read the spool.</p></div>
<div class="listingblock">
<div class="content"><!-- Generator: GNU source-highlight 3.1.6
<div class="content"><!-- Generator: GNU source-highlight 3.1.8
by Lorenzo Bettini
http://www.lorenzobettini.it
http://www.gnu.org/software/src-highlite -->
@@ -1317,7 +1215,7 @@ spool and it&#8217;s been populated into the set). A zero value means that non-
was used, but no data is currently available in the spool.</p></div>
<div class="paragraph"><p>A C program can iterate through all the key-value pairs in the result set like this:</p></div>
<div class="listingblock">
<div class="content"><!-- Generator: GNU source-highlight 3.1.6
<div class="content"><!-- Generator: GNU source-highlight 3.1.8
by Lorenzo Bettini
http://www.lorenzobettini.it
http://www.gnu.org/software/src-highlite -->
@@ -1346,18 +1244,11 @@ has the spool open at the time. It takes the spool directory as its only argumen
</div>
</div>
</div>
<div class="sect1">
<h2 id="_acknowledgments">Acknowledgments</h2>
<div class="sectionbody">
<div class="paragraph"><p>Thanks to Trevor Adams for writing the original Perl and Java bindings and to
<a href="http://jhuapl.edu/ott">JHU/APL OTT</a> for their support.</p></div>
</div>
</div>
</div>
<div id="footnotes"><hr /></div>
<div id="footer">
<div id="footer-text">
Last updated 2015-08-16 15:12:26 EDT
Last updated
2016-06-26 10:36:51 EDT
</div>
</div>
</body>

View File

@@ -1,7 +1,10 @@
kvspool: a tool for data streams
================================
kvspool data stream tools
=========================
Troy D. Hanson <tdh@tkhanson.net>
Back to the https://github.com/troydhanson/kvspool[kvspool Github page]. Back to
http://troydhanson.github.io[my other software].
kv-spool ("key-value" spool)::
a Linux-based C library to stream data between programs as key-value dictionaries.
It has tools to pipe the streams around, such as by network pub/sub or tee, to
@@ -18,22 +21,6 @@ when it's caught up, waiting for more data. Like this,
image:reader-writer.png[A spool writer and reader]
.Sneak peak
Here's a sneak peak at a really simple writer and reader:
.Sample Python writer
[source,python]
import kvspool
kv = kvspool.Kvspool("spool")
d = {"day": "Wed", "temp": 37}
kv.write(d)
.Sample Perl reader
[source,perl]
use KVSpool;
my $kv = KVSpool->new("spool");
my $d = $kv->read();
.Why did I write kvspool?
*******************************************************************************
I wanted a very simple library that only writes to the local file system, so
@@ -126,18 +113,12 @@ The reader and writer are completely uninvolved in the replication process.
image:pub-sub.png[Publish and Subscribe]
[TIP]
A job manager such as the author's http://troydhanson.github.com/pmtr/[pmtr process
monitor] can be used to run `kvsp-sub` and `kvsp-pub` in the background, and restart
them when the system reboots.
You can run `kvsp-sub` and `kvsp-pub` in the background at system boot by setting
them up as services or http://troydhanson.github.com/pmtr/[pmtr] jobs.
License
~~~~~~~
See the link:LICENSE.txt[LICENSE.txt] file. Kvspool is free and open source.
Resources
~~~~~~~~~
Additional software by the author is cataloged at http://troydhanson.github.io/.
Kvspool uses the MIT license.
Getting kvspool
---------------
@@ -145,19 +126,17 @@ You can clone kvspool from github:
% git clone git://github.com/troydhanson/kvspool.git
To build kvspool:
To build and install kvspool, you need autotools installed. The configure script looks
to see if optional libraries including ZeroMQ, Nanomsg, Jansson and librdkafka re
installed. It builds additional utilities if they are present.
% cd kvspool
If the 'configure' script does not yet exist, run `./bootstrap`. If you want to build the
network replication utilities, make sure you have *Jansson* and *ZeroMQ* (2.x or 3.x).
% ./autogen.sh
% ./configure
% make
% sudo make install
This builds and installs the C library and utilities, and if the prerequisite packages
are installed, it builds the Perl, Python and Java bindings, and ZeroMQ-based utilities.
Utilities
---------
@@ -219,6 +198,7 @@ Network utilities
|kvsp-sub | kvsp-sub -d spool tcp://192.168.1.9:1110
|kvsp-bpub | kvsp-bpub -b cast.cfg -d spool tcp://192.168.1.9:2110
|kvsp-bsub | kvsp-bsub -b cast.cfg -d spool tcp://192.168.1.9:2110
|kvsp-kkpub | kvsp-kkpub -b kafka.host.name -t topic
|kvsp-tpub | kvsp-tpub -b cast.cfg -d spool -p 2110
|===============================================================================
@@ -262,6 +242,11 @@ These data types may appear in `cast.cfg`
str // string
d64 // double (64-bit float)
kvsp-kkpub
^^^^^^^^^^
Publishes the spool in JSON encoding to a Kakfa topic. This tool requires librdkakfa and
Jansson to be installed.
kvsp-tpub
^^^^^^^^^
Finally there is a "plain TCP" binary publisher. It has no subscriber counterpart yet, so
@@ -312,83 +297,9 @@ specify (as absolute paths) directories to create within the ramdisk. Using `ra
and show its size. The `ramdisk` utility is included with kvspool because it is often
convenient to locate a spool on a ramdisk for performance.
Examples & API
--------------
API
---
Perl
~~~~
[source,perl]
use KVSpool;
my $v = KVSpool->new("spool");
my $h = {'day' => 'Wednesday', 'user' => 'Troy'};
$v->write($h);
In Perl the KVSpool object is instantiated with one directory argument. Then every time
the `write` method is called with a hash reference as argument, it's written to the spool.
To read a spool, use the `read` method which returns a hash reference each times it's
called:
my $h = $v->read();
The reader is normally blocking (it waits for data if none is immediately available).
To enact non-blocking mode, use `$v->{blocking}=0;`, then test the reference returned from
`read()` using `defined()`. If it's not, no data is currently available in the spool.
Python
~~~~~~
[source,python]
import kvspool
kv = kvspool.Kvspool("spool")
d = {"day":"Wednesday","user":"Troy"}
kv.write(d)
The `write` method takes a dictionary, while the `read` method returns a dictionary:
d = kv.read()
To enact non-blocking mode on the reader, use `kv.blocking = 0` then test whether the
object returned from `read()` is `None`. If so, no data is currently available in the spool.
Java
~~~~
[source,java]
public class TestKVJava {
public static void main(String args[]) {
KVJava kv = new KVJava("spool");
HashMap<String,String> h = new HashMap<String,String>();
h.put("day", "Wednesday");
h.put("user", "Troy");
kv.write(h);
}
}
The example first instantiates a `KVJava` object. (You can copy `KVJava.java` from the
`kvjava` directory into your own source tree; this class loads the underlying JNI-based
library, which must be built and installed as described below).
To write to the spool, pass a `HashMap<String,String>` to the `write` method. The `read`
method is similar:
HashMap<String,String> m;
m = kv.read();
You can put the reader in non-blocking mode by setting `kv.blocking = false;`
and then test whether the object returned from `read()` is equal to `null`. If so,
no data is currently available in the spool.
// sudo apt-get install default-jdk
.Building the JNI library
The Java binding is not yet integrated into the main build, so it requires extra steps to
build. After building kvspool, `cd kvjava` and run `make`. Most likely you will need to
edit the `kvjava/Makefile` lines where `JNIINC` and `JNILIB` are defined to make it work.
The result of compilation is `libKVJava.so` which must be manually copied to your choice
of library directory such as `/usr/local/lib` (You may also need to set `LD_LIBRARY_PATH`
so that the dynamic linker can find this shared library it at runtime).
C/C++
~~~~~
C programs must be linked with -lkvspool.
[source,c]
@@ -453,10 +364,5 @@ has the spool open at the time. It takes the spool directory as its only argumen
sp_reset(dir);
Acknowledgments
---------------
Thanks to Trevor Adams for writing the original Perl and Java bindings and to
http://jhuapl.edu/ott[JHU/APL OTT] for their support.
// vim: set tw=90 wm=2 syntax=asciidoc:

View File

@@ -1,6 +0,0 @@
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
<div style="float:right">
<a href="https://twitter.com/share" class="twitter-share-button" data-via="troydhanson">Tweet</a>
<script>!function(d,s,id){var js,fjs=d.getElementsByTagName(s)[0];if(!d.getElementById(id))\{js=d.createElement(s);js.id=id;js.src="//platform.twitter.com/widgets.js";fjs.parentNode.insertBefore(js,fjs);}}(document,"script","twitter-wjs");</script>
</div>
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

View File

@@ -1,30 +0,0 @@
quick key (qk)
--------------
This is a mini-kvspool that lets the application determine what to do with each
key set that is produced. First create one:
struct qk *qk = qk_new();
Set up a callback to be invoked whenever you "end" a key set:
qk->cb = your_callback;
The callback has this prototype:
int (*cb)(struct qk *);
The callback can use `qk->tmp` (a `UT_string`) as a scratch buffer. It can
iterate over `qk->keys` (of type `UT_vector` whose elements are `UT_string`).
Use this sequence to produce a key set:
qk_start(qk);
qk_add(qk, key, ...);
...
qk_end(qk);
Call `qk_add` multiple times to add several keys to the key set.
Note `key` is a printf-style format string that can take additional arguments.
At program termination do this:
qk_free(qk);

View File

@@ -1,393 +0,0 @@
/*
Copyright (c) 2008-2014, Troy D. Hanson http://troydhanson.github.com/uthash/
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/* a dynamic string implementation using macros
*/
#ifndef UTSTRING_H
#define UTSTRING_H
#define UTSTRING_VERSION 1.9.9
#ifdef __GNUC__
#define _UNUSED_ __attribute__ ((__unused__))
#else
#define _UNUSED_
#endif
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
#define oom() exit(-1)
typedef struct {
char *d;
size_t n; /* allocd size */
size_t i; /* index of first unused byte */
} UT_string;
#define utstring_reserve(s,amt) \
do { \
if (((s)->n - (s)->i) < (size_t)(amt)) { \
(s)->d = (char*)realloc((s)->d, (s)->n + amt); \
if ((s)->d == NULL) oom(); \
(s)->n += amt; \
} \
} while(0)
#define utstring_init(s) \
do { \
(s)->n = 0; (s)->i = 0; (s)->d = NULL; \
utstring_reserve(s,100); \
(s)->d[0] = '\0'; \
} while(0)
#define utstring_done(s) \
do { \
if ((s)->d != NULL) free((s)->d); \
(s)->n = 0; \
} while(0)
#define utstring_free(s) \
do { \
utstring_done(s); \
free(s); \
} while(0)
#define utstring_new(s) \
do { \
s = (UT_string*)calloc(sizeof(UT_string),1); \
if (!s) oom(); \
utstring_init(s); \
} while(0)
#define utstring_renew(s) \
do { \
if (s) { \
utstring_clear(s); \
} else { \
utstring_new(s); \
} \
} while(0)
#define utstring_clear(s) \
do { \
(s)->i = 0; \
(s)->d[0] = '\0'; \
} while(0)
#define utstring_bincpy(s,b,l) \
do { \
utstring_reserve((s),(l)+1); \
if (l) memcpy(&(s)->d[(s)->i], b, l); \
(s)->i += l; \
(s)->d[(s)->i]='\0'; \
} while(0)
#define utstring_concat(dst,src) \
do { \
utstring_reserve((dst),((src)->i)+1); \
if ((src)->i) memcpy(&(dst)->d[(dst)->i], (src)->d, (src)->i); \
(dst)->i += (src)->i; \
(dst)->d[(dst)->i]='\0'; \
} while(0)
#define utstring_len(s) ((unsigned)((s)->i))
#define utstring_body(s) ((s)->d)
_UNUSED_ static void utstring_printf_va(UT_string *s, const char *fmt, va_list ap) {
int n;
va_list cp;
while (1) {
#ifdef _WIN32
cp = ap;
#else
va_copy(cp, ap);
#endif
n = vsnprintf (&s->d[s->i], s->n-s->i, fmt, cp);
va_end(cp);
if ((n > -1) && ((size_t) n < (s->n-s->i))) {
s->i += n;
return;
}
/* Else try again with more space. */
if (n > -1) utstring_reserve(s,n+1); /* exact */
else utstring_reserve(s,(s->n)*2); /* 2x */
}
}
#ifdef __GNUC__
/* support printf format checking (2=the format string, 3=start of varargs) */
static void utstring_printf(UT_string *s, const char *fmt, ...)
__attribute__ (( format( printf, 2, 3) ));
#endif
_UNUSED_ static void utstring_printf(UT_string *s, const char *fmt, ...) {
va_list ap;
va_start(ap,fmt);
utstring_printf_va(s,fmt,ap);
va_end(ap);
}
/*******************************************************************************
* begin substring search functions *
******************************************************************************/
/* Build KMP table from left to right. */
_UNUSED_ static void _utstring_BuildTable(
const char *P_Needle,
size_t P_NeedleLen,
long *P_KMP_Table)
{
long i, j;
i = 0;
j = i - 1;
P_KMP_Table[i] = j;
while (i < (long) P_NeedleLen)
{
while ( (j > -1) && (P_Needle[i] != P_Needle[j]) )
{
j = P_KMP_Table[j];
}
i++;
j++;
if (i < (long) P_NeedleLen)
{
if (P_Needle[i] == P_Needle[j])
{
P_KMP_Table[i] = P_KMP_Table[j];
}
else
{
P_KMP_Table[i] = j;
}
}
else
{
P_KMP_Table[i] = j;
}
}
return;
}
/* Build KMP table from right to left. */
_UNUSED_ static void _utstring_BuildTableR(
const char *P_Needle,
size_t P_NeedleLen,
long *P_KMP_Table)
{
long i, j;
i = P_NeedleLen - 1;
j = i + 1;
P_KMP_Table[i + 1] = j;
while (i >= 0)
{
while ( (j < (long) P_NeedleLen) && (P_Needle[i] != P_Needle[j]) )
{
j = P_KMP_Table[j + 1];
}
i--;
j--;
if (i >= 0)
{
if (P_Needle[i] == P_Needle[j])
{
P_KMP_Table[i + 1] = P_KMP_Table[j + 1];
}
else
{
P_KMP_Table[i + 1] = j;
}
}
else
{
P_KMP_Table[i + 1] = j;
}
}
return;
}
/* Search data from left to right. ( Multiple search mode. ) */
_UNUSED_ static long _utstring_find(
const char *P_Haystack,
size_t P_HaystackLen,
const char *P_Needle,
size_t P_NeedleLen,
long *P_KMP_Table)
{
long i, j;
long V_FindPosition = -1;
/* Search from left to right. */
i = j = 0;
while ( (j < (int)P_HaystackLen) && (((P_HaystackLen - j) + i) >= P_NeedleLen) )
{
while ( (i > -1) && (P_Needle[i] != P_Haystack[j]) )
{
i = P_KMP_Table[i];
}
i++;
j++;
if (i >= (int)P_NeedleLen)
{
/* Found. */
V_FindPosition = j - i;
break;
}
}
return V_FindPosition;
}
/* Search data from right to left. ( Multiple search mode. ) */
_UNUSED_ static long _utstring_findR(
const char *P_Haystack,
size_t P_HaystackLen,
const char *P_Needle,
size_t P_NeedleLen,
long *P_KMP_Table)
{
long i, j;
long V_FindPosition = -1;
/* Search from right to left. */
j = (P_HaystackLen - 1);
i = (P_NeedleLen - 1);
while ( (j >= 0) && (j >= i) )
{
while ( (i < (int)P_NeedleLen) && (P_Needle[i] != P_Haystack[j]) )
{
i = P_KMP_Table[i + 1];
}
i--;
j--;
if (i < 0)
{
/* Found. */
V_FindPosition = j + 1;
break;
}
}
return V_FindPosition;
}
/* Search data from left to right. ( One time search mode. ) */
_UNUSED_ static long utstring_find(
UT_string *s,
long P_StartPosition, /* Start from 0. -1 means last position. */
const char *P_Needle,
size_t P_NeedleLen)
{
long V_StartPosition;
long V_HaystackLen;
long *V_KMP_Table;
long V_FindPosition = -1;
if (P_StartPosition < 0)
{
V_StartPosition = s->i + P_StartPosition;
}
else
{
V_StartPosition = P_StartPosition;
}
V_HaystackLen = s->i - V_StartPosition;
if ( (V_HaystackLen >= (long) P_NeedleLen) && (P_NeedleLen > 0) )
{
V_KMP_Table = (long *)malloc(sizeof(long) * (P_NeedleLen + 1));
if (V_KMP_Table != NULL)
{
_utstring_BuildTable(P_Needle, P_NeedleLen, V_KMP_Table);
V_FindPosition = _utstring_find(s->d + V_StartPosition,
V_HaystackLen,
P_Needle,
P_NeedleLen,
V_KMP_Table);
if (V_FindPosition >= 0)
{
V_FindPosition += V_StartPosition;
}
free(V_KMP_Table);
}
}
return V_FindPosition;
}
/* Search data from right to left. ( One time search mode. ) */
_UNUSED_ static long utstring_findR(
UT_string *s,
long P_StartPosition, /* Start from 0. -1 means last position. */
const char *P_Needle,
size_t P_NeedleLen)
{
long V_StartPosition;
long V_HaystackLen;
long *V_KMP_Table;
long V_FindPosition = -1;
if (P_StartPosition < 0)
{
V_StartPosition = s->i + P_StartPosition;
}
else
{
V_StartPosition = P_StartPosition;
}
V_HaystackLen = V_StartPosition + 1;
if ( (V_HaystackLen >= (long) P_NeedleLen) && (P_NeedleLen > 0) )
{
V_KMP_Table = (long *)malloc(sizeof(long) * (P_NeedleLen + 1));
if (V_KMP_Table != NULL)
{
_utstring_BuildTableR(P_Needle, P_NeedleLen, V_KMP_Table);
V_FindPosition = _utstring_findR(s->d,
V_HaystackLen,
P_Needle,
P_NeedleLen,
V_KMP_Table);
free(V_KMP_Table);
}
}
return V_FindPosition;
}
/*******************************************************************************
* end substring search functions *
******************************************************************************/
#endif /* UTSTRING_H */

View File

@@ -1,161 +0,0 @@
/*
Copyright (c) 2003-2014, Troy D. Hanson http://troydhanson.github.com/uthash/
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include "utvector.h"
/* utvector
*
* maintain a contiguous buffer of 'n' elements ('i' occupied)
* the 'n' buffers are deep-inited at the time of allocation
* the vector leaves popped slots as-is, clearing them on re-use
* the memory management helper mm is used to define the size and
* deep-init, deep-fini, deep-copy (into inited slots) and deep-clear.
* deep-clear prepares a slot for re-use e.g. reset slot state.
*
*/
void oom(void) {
//fprintf(stderr,"out of memory\n");
exit(-1);
}
UT_vector *utvector_new(UT_vector_mm *mm) {
UT_vector *v = malloc(sizeof(UT_vector)); if (!v) return NULL;
utvector_init(v,mm);
return v;
}
unsigned utvector_len(UT_vector *v) {
return v->i;
}
void utvector_init(UT_vector *v, UT_vector_mm *mm) {
v->mm = *mm; // struct copy
v->i = v->n = 0;
v->d = NULL;
utvector_reserve(v, INITIAL_SIZE); // also inits them, sets v->n
}
void utvector_reserve(UT_vector *v, unsigned num) {
if (v->n - v->i >= num) return; // space is big enough, return
unsigned n = num - (v->n - v->i); // minimum we need to grow by
if (n < (v->n * 2)) n = (v->n * 2); // grow by at least double current size
char *d = realloc(v->d, (n + v->n) * v->mm.sz);
if (!d) oom();
v->d = d;
void *b = v->d + (v->n * v->mm.sz); // start of newly allocated area
if (v->mm.init) v->mm.init(b, n);
else memset(b, 0, n*v->mm.sz);
v->n = n + v->n;
}
void utvector_fini(UT_vector *v) {
if (v->mm.fini) v->mm.fini(v->d, v->n);
free(v->d);
v->d = NULL;
v->i = v->n = 0;
}
UT_vector * utvector_clone(UT_vector *src) {
UT_vector *v = utvector_new(&src->mm);
utvector_copy(v, src);
return v;
}
void utvector_clear(UT_vector *v) {
v->i = 0;
}
void utvector_copy(UT_vector *dst, UT_vector *src) { /* dst, src both inited */
assert(dst->mm.sz == src->mm.sz); // double check that its inited
utvector_clear(dst);
utvector_reserve(dst, src->i);
dst->i = src->i;
if (dst->mm.clear) dst->mm.clear(dst->d, src->i);
if (src->mm.copy) src->mm.copy(dst->d, src->d, src->i);
else memcpy(dst->d, src->d, src->mm.sz * src->i);
}
void utvector_free(UT_vector *v) {
utvector_fini(v);
free(v);
}
void *utvector_extend(UT_vector *v) {
utvector_reserve(v,1);
void *b = v->d + (v->i * v->mm.sz);
if (v->mm.clear) v->mm.clear(b,1);
v->i++;
return b;
}
void *utvector_next(UT_vector *v, void *cur) {
if (cur == NULL) return v->i ? v->d : NULL;
assert(cur >= (void*)(v->d)); // user pointer must be inside our data area
char *n = (char*)cur + v->mm.sz; // next slot address
if (n >= v->d + (v->i * v->mm.sz)) n=NULL; // only if next slot occupied
return n;
}
void *utvector_head(UT_vector *v) {
if (v->i == 0) return NULL;
return v->d;
}
void *utvector_tail(UT_vector *v) {
if (v->i == 0) return NULL;
return v->d + ((v->i - 1) * v->mm.sz);
}
void *utvector_pop(UT_vector *v) {
if (v->i == 0) return NULL;
return v->d + (--(v->i) * v->mm.sz);
}
/* shifting is not very efficient. we end up throwing away/fini'ing the
* head of the vector, then doing a memmove, then having to init a new slot.
* we don't return the shifted item because its been fini'd, and we have
* no caller memory to copy it into anyway. a cpy_shift maybe handy */
void utvector_shift(UT_vector *v) {
assert (v->i);
if (v->mm.fini) v->mm.fini(v->d, 1);
v->i--;
memmove(v->d, v->d + v->mm.sz, (v->n-1)*v->mm.sz);
char *b = v->d + ((v->n-1) * v->mm.sz);
if (v->mm.init) v->mm.init(b, 1);
else memset(b, 0, v->mm.sz);
}
void utvector_push(UT_vector *v, void *e) {
void *b = utvector_extend(v);
if (v->mm.copy) v->mm.copy(b, e, 1);
else memcpy(b, e, v->mm.sz);
}
/* a few basic vector types as described via mm that can be passed to utvector_init/new */
static UT_vector_mm utvector_int_mm = {.sz = sizeof(int)};
UT_vector_mm* utvector_int = &utvector_int_mm;

View File

@@ -1,76 +0,0 @@
/*
Copyright (c) 2003-2014, Troy D. Hanson http://troydhanson.github.com/uthash/
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/* utvector
*
* maintain a contiguous buffer of 'n' elements ('i' occupied)
* the 'n' buffers are deep-inited at the time of allocation
* the vector leaves popped slots as-is, clearing them on re-use
* the memory management helper mm is used to define the size and
* deep-init, deep-fini, deep-copy (into inited slots) and deep-clear.
* deep-clear prepares a slot for re-use e.g. reset slot state.
*
*/
#ifndef __UTVECTOR_H_
#define __UTVECTOR_H_
#include <stddef.h>
#define INITIAL_SIZE 16
typedef struct _UT_vector_mm {
size_t sz;
void (*init)(void *buf, unsigned num); //-> utstring-init
void (*fini)(void *buf, unsigned num); //-> utstring-done
void (*copy)(void *dst, void *src, unsigned num); //-> ustring_concat
void (*clear)(void *buf, unsigned num); //-> utstring-clear
} UT_vector_mm;
typedef struct _UT_vector {
UT_vector_mm mm;
unsigned i,n;/* i: index of next available slot, n: num slots */
char *d; /* n slots of size icd->sz*/
} UT_vector;
UT_vector *utvector_new(UT_vector_mm *mm);
void utvector_init(UT_vector *v, UT_vector_mm *mm);
void utvector_reserve(UT_vector *v, unsigned num);
void utvector_fini(UT_vector *v);
UT_vector * utvector_clone(UT_vector *src);
void utvector_clear(UT_vector *v);
void utvector_copy(UT_vector *dst, UT_vector *src);
void utvector_free(UT_vector *v);
void *utvector_extend(UT_vector *v);
void *utvector_head(UT_vector *v);
void *utvector_tail(UT_vector *v);
void *utvector_next(UT_vector *v, void *cur);
void *utvector_pop(UT_vector *v);
void utvector_shift(UT_vector *v);
void utvector_push(UT_vector *v, void *e);
unsigned utvector_len(UT_vector *v);
extern UT_vector_mm* utvector_int;
#endif /* __UTVECTOR_H_ */

72
qk/qk.c
View File

@@ -1,72 +0,0 @@
#include <stdarg.h>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include "qk.h"
/*******************************************************************************
* plumbing for utvector of string
******************************************************************************/
void _utstring_init(void *_buf, unsigned num) {
UT_string *s = (UT_string*)_buf;
while(num--) utstring_init(&s[num]);
}
void _utstring_fini(void *_buf, unsigned num) {
UT_string *s = (UT_string*)_buf;
while(num--) utstring_done(&s[num]);
}
void _utstring_copy(void *_dst, void *_src, unsigned num) {
UT_string *dst = (UT_string*)_dst;
UT_string *src = (UT_string*)_src;
while(num--) utstring_concat( &dst[num], &src[num] );
}
void _utstring_clear(void *_buf, unsigned num) {
UT_string *s = (UT_string*)_buf;
while(num--) utstring_clear(&s[num]);
}
static UT_vector_mm utvector_utstring_mm = {
.sz = sizeof(UT_string),
.init = _utstring_init,
.fini = _utstring_fini,
.copy = _utstring_copy,
.clear = _utstring_clear,
};
/*******************************************************************************
* end of plumbing
******************************************************************************/
struct qk *qk_new(void) {
struct qk *qk = malloc(sizeof(*qk));
if (qk == NULL) goto done;
memset(qk,0,sizeof(*qk));
utvector_init(&qk->keys, &utvector_utstring_mm);
utstring_init(&qk->tmp);
done:
return qk;
}
int qk_start(struct qk *qk) {
utvector_clear(&qk->keys);
utstring_clear(&qk->tmp);
}
int qk_end(struct qk *qk) {
if (qk->cb == NULL) return;
return qk->cb(qk);
}
int qk_add(struct qk *qk, char *key, ...) {
va_list ap;
va_start(ap,key);
UT_string *k = (UT_string*)utvector_extend(&qk->keys);
utstring_printf_va(k,key,ap);
va_end(ap);
}
void qk_free(struct qk *qk) {
utvector_fini(&qk->keys);
utstring_done(&qk->tmp);
free(qk);
}

26
qk/qk.h
View File

@@ -1,26 +0,0 @@
#ifndef _QK_H_
#define _QK_H_
#include "utvector.h"
#include "utstring.h"
struct qk {
UT_vector /* of UT_string */ keys;
/* the callback below is invoked on qk_end. it receives this struct.
* the tmp is reserved for the callback to use a scratch space. the
* data argument is opaque and is for passing state to the callback. */
int (*cb)(struct qk *);
UT_string tmp;
void *data;
};
/* API */
struct qk *qk_new(void);
int qk_start(struct qk *qk);
int qk_add(struct qk *qk, char *key, ...);
int qk_end(struct qk *qk);
void qk_free(struct qk *qk);
#endif // _QK_H_

View File

@@ -1,2 +0,0 @@
test1: test basic kv_start, kv_add ..., kv_end
test2: test multiple kv_add, kv_end

View File

@@ -1,21 +0,0 @@
#!/usr/bin/perl
use strict;
use warnings;
my @tests;
for (glob "test*[0-9]") {
push @tests, $_ if -e "$_.ans";
}
my $num_failed=0;
for my $test (@tests) {
`./$test > $test.out`;
`diff $test.out $test.ans`;
print "$test failed\n" if $?;
$num_failed++ if $?;
}
print scalar @tests . " tests conducted, $num_failed failed.\n";
exit $num_failed;

View File

@@ -1,6 +0,0 @@
qk_end: 5 keys
A:1
B:2a
C:3b
D:IV
E:5

View File

@@ -1,39 +0,0 @@
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include "qk.h"
int dump(struct qk *qk) {
size_t l = utvector_len(&qk->keys);
utstring_clear(&qk->tmp);
utstring_printf(&qk->tmp, "qk_end: %lu keys\n", l);
UT_string *k=NULL;
while(l--) {
k = (UT_string*)utvector_next(&qk->keys,k); assert(k);
utstring_printf(&qk->tmp, "%s\n", utstring_body(k));
}
char *out = utstring_body(&qk->tmp);
size_t len =utstring_len( &qk->tmp);
write(STDOUT_FILENO, out, len);
return 0;
}
int main() {
struct qk *qk = qk_new();
qk->cb = dump;
qk_start(qk);
qk_add(qk, "A:%d", 1);
qk_add(qk, "B:%d%c", 2, 'a');
qk_add(qk, "C:%d%c", 3, 'b');
qk_add(qk, "D:%s", "IV");
qk_add(qk, "E:%lu", (long)5);
qk_end(qk);
qk_free(qk);
return 0;
}

View File

@@ -1,11 +0,0 @@
qk_end: 5 keys
A:1
B:2a
C:3b
D:IV
E:5
qk_end: 4 keys
A:1
B:2a
C:3b
D:IV

View File

@@ -1,46 +0,0 @@
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include "qk.h"
int dump(struct qk *qk) {
size_t l = utvector_len(&qk->keys);
utstring_clear(&qk->tmp);
utstring_printf(&qk->tmp, "qk_end: %lu keys\n", l);
UT_string *k=NULL;
while(l--) {
k = (UT_string*)utvector_next(&qk->keys,k); assert(k);
utstring_printf(&qk->tmp, "%s\n", utstring_body(k));
}
char *out = utstring_body(&qk->tmp);
size_t len =utstring_len( &qk->tmp);
write(STDOUT_FILENO, out, len);
return 0;
}
int main() {
struct qk *qk = qk_new();
qk->cb = dump;
qk_start(qk);
qk_add(qk, "A:%d", 1);
qk_add(qk, "B:%d%c", 2, 'a');
qk_add(qk, "C:%d%c", 3, 'b');
qk_add(qk, "D:%s", "IV");
qk_add(qk, "E:%lu", (long)5);
qk_end(qk);
qk_start(qk);
qk_add(qk, "A:%d", 1);
qk_add(qk, "B:%d%c", 2, 'a');
qk_add(qk, "C:%d%c", 3, 'b');
qk_add(qk, "D:%s", "IV");
qk_end(qk);
qk_free(qk);
return 0;
}

2
tests/.gitignore vendored
View File

@@ -1,2 +0,0 @@
*[0-9]
*.out

View File

@@ -1,28 +0,0 @@
SRCS = $(wildcard test*.c)
PROGS = $(patsubst %.c,%,$(SRCS))
LIBDIR = ..
LIB = ../libkvspool.a
LDFLAGS = -L$(LIBDIR) -lkvspool
CFLAGS = -I$(LIBDIR)/include -I$(LIBDIR) -fno-strict-aliasing
CFLAGS += -g
CFLAGS += -Wall
CFLAGS += ${EXTRA_CFLAGS}
TEST_TARGET=run_tests
TESTS=./do_tests
all: $(PROGS) $(TEST_TARGET)
$(PROGS): $(LIB) utils.o
$(CC) $(CFLAGS) -o $@ $(@).c utils.o $(LDFLAGS)
run_tests: $(PROGS)
perl $(TESTS)
.PHONY: clean
clean:
rm -f $(PROGS) test*.out
rm -rf *.dSYM

View File

@@ -1,6 +0,0 @@
test1: initial key-value creation and free
test2: test key replacement
test3: test spooling out
test4: test spooling out, then a second set (append)
test5: spool reader test
test6: test programmatic reset

View File

@@ -1,21 +0,0 @@
#!/usr/bin/perl
use strict;
use warnings;
my @tests;
for (glob "test*[0-9]") {
push @tests, $_ if -e "$_.ans";
}
my $num_failed=0;
for my $test (@tests) {
`./$test > $test.out`;
`diff $test.out $test.ans`;
print "$test failed\n" if $?;
$num_failed++ if $?;
}
print scalar @tests . " tests conducted, $num_failed failed.\n";
exit $num_failed;

View File

@@ -1,15 +0,0 @@
#!/usr/bin/perl
use strict;
use warnings;
use Data::Dumper;
use JSON;
use ZeroMQ qw/:all/;
my $ctx = ZeroMQ::Context->new;
my $sock = $ctx->socket(ZMQ_PULL);
$sock->connect("tcp://127.0.0.1:1234");
for(;;) {
my $d = $sock->recv()->data();
print Dumper from_json($d), "\n";
}

View File

@@ -1,3 +0,0 @@
kv set has 2 items
key [hello], val [world]
key [second], val [life]

View File

@@ -1,21 +0,0 @@
#include <stdio.h>
#include "kvspool.h"
int main() {
void *set = kv_set_new();
/* add some kv pairs */
kv_adds(set, "hello", "world");
kv_adds(set, "second", "life");
/* see if it worked- count kv pairs */
int len = kv_len(set);
printf("kv set has %d items\n", (int)len);
/* loop over them */
kv_t *kv = NULL;
while ( (kv = kv_next(set, kv))) {
printf("key [%.*s], val [%.*s]\n", kv->klen, kv->key, kv->vlen, kv->val);
}
kv_set_free(set);
return 0;
}

View File

@@ -1,3 +0,0 @@
kv set has 2 items
key [hello], val [new]
key [second], val [life]

View File

@@ -1,22 +0,0 @@
#include <stdio.h>
#include "kvspool.h"
int main() {
void *set = kv_set_new();
/* add some kv pairs */
kv_adds(set, "hello", "world");
kv_adds(set, "second", "life");
kv_adds(set, "hello", "new");
/* see if it worked- count kv pairs */
int len = kv_len(set);
printf("kv set has %d items\n", (int)len);
/* loop over them */
kv_t *kv = NULL;
while ( (kv = kv_next(set, kv))) {
printf("key [%.*s], val [%.*s]\n", kv->klen, kv->key, kv->vlen, kv->val);
}
kv_set_free(set);
return 0;
}

View File

@@ -1,2 +0,0 @@
file kv.**********.***-*.sr, len 4
file kv.**********.***-*.sp, len 69

View File

@@ -1,30 +0,0 @@
#include <glob.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include "kvspool.h"
#include "utarray.h"
#include "utils.h"
int main(int argc, char *argv[]) {
char *dir = mktmpdir();
void *sp = kv_spoolwriter_new(dir);
if (!sp) exit(-1);
void *set = kv_set_new();
kv_adds(set, "hello", "again");
kv_adds(set, "second", "life");
kv_spool_write(sp,set);
kv_set_free(set);
kv_spoolwriter_free(sp);
/* scan spool to validate expected file creation */
scan_spool(1);
return 0;
}

View File

@@ -1,5 +0,0 @@
file kv.**********.***-*.sr, len 4
file kv.**********.***-*.sp, len 69
spooling second frame
file kv.**********.***-*.sr, len 4
file kv.**********.***-*.sp, len 126

View File

@@ -1,38 +0,0 @@
#include <glob.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include "kvspool.h"
#include "utarray.h"
#include "utils.h"
int main(int argc, char *argv[]) {
char *dir = mktmpdir();
void *sp = kv_spoolwriter_new(dir);
if (!sp) exit(-1);
void *set = kv_set_new();
kv_adds(set, "hello", "again");
kv_adds(set, "second", "life");
kv_spool_write(sp,set);
/* scan spool to validate expected file creation */
scan_spool(0);
/* replace a value in the set, spool the set out. it should append
* to the spool file previously created. */
printf("spooling second frame\n");
kv_adds(set, "second", "time");
kv_spool_write(sp,set);
kv_spoolwriter_free(sp);
kv_set_free(set);
/* verify spool has updated according to expectation */
scan_spool(1);
return 0;
}

View File

@@ -1,16 +0,0 @@
file kv.**********.***-*.sr, len 4
file kv.**********.***-*.sp, len 69
spooling second frame
file kv.**********.***-*.sr, len 4
file kv.**********.***-*.sp, len 126
clear set
reading from spool
reader read frame:
key [hello], val [again]
key [second], val [life]
reader read frame:
key [hello], val [again]
key [second], val [time]
kv_spool_read returned 0
file kv.**********.***-*.sr, len 4
file kv.**********.***-*.sp, len 126

View File

@@ -1,64 +0,0 @@
#include <glob.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include "kvspool.h"
#include "utarray.h"
#include "utils.h"
/******************************************************************************
* the test itself
*****************************************************************************/
int main(int argc, char *argv[]) {
char *dir = mktmpdir();
void *sp = kv_spoolwriter_new(dir);
if (!sp) exit(-1);
void *set = kv_set_new();
kv_adds(set, "hello", "again");
kv_adds(set, "second", "life");
kv_spool_write(sp,set);
/* scan spool to validate expected file creation */
scan_spool(0);
/* replace a value in the set, spool the set out. it should append
* to the spool file previously created. */
printf("spooling second frame\n");
kv_adds(set, "second", "time");
kv_spool_write(sp,set);
kv_spoolwriter_free(sp);
/* verify spool has updated according to expectation */
scan_spool(0);
printf("clear set\n");
kv_set_clear(set);
/* loop over them */
kv_t *kv = NULL;
while ( (kv = kv_next(set, kv))) {
printf("key [%.*s], val [%.*s]\n", kv->klen, kv->key, kv->vlen, kv->val);
}
printf("reading from spool\n");
/* now try reading the spool */
sp = kv_spoolreader_new(dir);
int rc;
while ( (rc=kv_spool_read(sp, set, 0)) == 1) {
printf("reader read frame:\n");
kv = NULL;
while ( (kv = kv_next(set, kv))) {
printf(" key [%.*s], val [%.*s]\n", kv->klen, kv->key, kv->vlen, kv->val);
}
}
printf("kv_spool_read returned %d\n", rc);
kv_spoolreader_free(sp);
kv_set_free(set);
scan_spool(1);
return 0;
}

View File

@@ -1,24 +0,0 @@
file kv.**********.***-*.sr, len 4
file kv.**********.***-*.sp, len 69
spooling second frame
file kv.**********.***-*.sr, len 4
file kv.**********.***-*.sp, len 126
clear set
reading from spool
reader read frame:
key [hello], val [again]
key [second], val [life]
reader read frame:
key [hello], val [again]
key [second], val [time]
kv_spool_read returned 0
resetting the spool directory for another round of reading
reader read frame:
key [hello], val [again]
key [second], val [life]
reader read frame:
key [hello], val [again]
key [second], val [time]
kv_spool_read returned 0
file kv.**********.***-*.sr, len 4
file kv.**********.***-*.sp, len 126

View File

@@ -1,80 +0,0 @@
#include <glob.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include "kvspool.h"
#include "utarray.h"
#include "utils.h"
/******************************************************************************
* the test itself
*****************************************************************************/
int main(int argc, char *argv[]) {
char *dir = mktmpdir();
void *sp = kv_spoolwriter_new(dir);
if (!sp) exit(-1);
void *set = kv_set_new();
kv_adds(set, "hello", "again");
kv_adds(set, "second", "life");
kv_spool_write(sp,set);
/* scan spool to validate expected file creation */
scan_spool(0);
/* replace a value in the set, spool the set out. it should append
* to the spool file previously created. */
printf("spooling second frame\n");
kv_adds(set, "second", "time");
kv_spool_write(sp,set);
kv_spoolwriter_free(sp);
/* verify spool has updated according to expectation */
scan_spool(0);
printf("clear set\n");
kv_set_clear(set);
/* loop over them */
kv_t *kv = NULL;
while ( (kv = kv_next(set, kv))) {
printf("key [%.*s], val [%.*s]\n", kv->klen, kv->key, kv->vlen, kv->val);
}
printf("reading from spool\n");
/* now try reading the spool */
sp = kv_spoolreader_new(dir);
int rc;
while ( (rc=kv_spool_read(sp, set, 0)) == 1) {
printf("reader read frame:\n");
kv = NULL;
while ( (kv = kv_next(set, kv))) {
printf(" key [%.*s], val [%.*s]\n", kv->klen, kv->key, kv->vlen, kv->val);
}
}
printf("kv_spool_read returned %d\n", rc);
kv_spoolreader_free(sp);
/* now reset it and read again */
printf("resetting the spool directory for another round of reading\n");
sp_reset(dir);
sp = kv_spoolreader_new(dir);
while ( (rc=kv_spool_read(sp, set, 0)) == 1) {
printf("reader read frame:\n");
kv = NULL;
while ( (kv = kv_next(set, kv))) {
printf(" key [%.*s], val [%.*s]\n", kv->klen, kv->key, kv->vlen, kv->val);
}
}
printf("kv_spool_read returned %d\n", rc);
kv_spoolreader_free(sp);
kv_set_free(set);
scan_spool(1);
return 0;
}

View File

@@ -1,82 +0,0 @@
#include <glob.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include "kvspool.h"
#include "utarray.h"
/******************************************************************************
* boilerplate utilities for test
*****************************************************************************/
static char *wild = "*";
static char dir[100];
static void cleanup(void) { chdir("/tmp"); if (*dir) rmdir(dir); }
char* mktmpdir(void) {
snprintf(dir,sizeof(dir),"/tmp/test.%d", (int)getpid());
mkdir(dir,0777);
atexit(cleanup);
return dir;
}
/* change digits to '-', used to normalize output for automated tests */
static char *blot(char*c) {
char *s;
for(s=c; *s != '\0'; s++) { if ((*s >= '0') && (*s <= '9')) *s='*'; }
return c;
}
static int strsort(const void *_a, const void *_b) {
char *a = *(char**)_a;
char *b = *(char**)_b;
return strcmp(a,b);
}
typedef struct {
int file_idx;
int file_len;
} file_xtra;
static UT_icd xtra_icd = {sizeof(file_xtra),NULL,NULL,NULL};
static int xtrasort(const void *_a, const void *_b) {
file_xtra *a = (file_xtra*)_a;
file_xtra *b = (file_xtra*)_b;
return (a->file_len - b->file_len);
}
void scan_spool(int do_unlink) {
int i;
glob_t g;
struct stat sb;
UT_array *files;
UT_array *xtras;
utarray_new(files,&ut_str_icd);
utarray_new(xtras,&xtra_icd);
char **f, *file;
file_xtra x, *xp;
if (chdir(dir) == -1) exit(-1);
glob(wild, 0, NULL, &g);
for(i=0; i < g.gl_pathc; i++) {
utarray_push_back(files, &g.gl_pathv[i]);
}
utarray_sort(files, strsort);
f=NULL;
while ( (f=(char**)utarray_next(files,f))) {
file = *f;
stat(file,&sb);
if (do_unlink) unlink(file);
x.file_idx = utarray_eltidx(files,f);
x.file_len = sb.st_size;
utarray_push_back(xtras, &x);
}
utarray_sort(xtras, xtrasort);
xp=NULL;
while ( (xp=(file_xtra*)utarray_next(xtras,xp))) {
f = (char**)utarray_eltptr(files,xp->file_idx);
file = *f;
printf("file %s, len %d\n", blot(file),xp->file_len);
}
globfree(&g);
utarray_free(files);
utarray_free(xtras);
}

View File

@@ -1,3 +0,0 @@
char* mktmpdir(void);
void scan_spool(int do_unlink);

View File

@@ -24,7 +24,6 @@ kvsp_sub_LDADD = $(LIBSPOOL)
kvsp_concen_LDADD = $(LIBSPOOL)
kvsp_tpub_LDADD = $(LIBSPOOL)
kvsp_upub_LDADD = $(LIBSPOOL)
kvsp_kpub_LDADD = $(LIBSPOOL)
kvsp_kkpub_LDADD = $(LIBSPOOL)
kvsp_bcat_SOURCES = kvsp-bcat.c kvsp-bconfig.c
@@ -47,8 +46,6 @@ endif
if HAVE_RDKAFKA
if HAVE_JANSSON
bin_PROGRAMS += kvsp-kpub
kvsp_kpub_LDADD += -lrdkafka -ljansson
if HAVE_NANOMSG
bin_PROGRAMS += kvsp-kkpub
kvsp_kkpub_SOURCES = kvsp-kkpub.c ts.c ts.h
@@ -78,22 +75,3 @@ bin_PROGRAMS += kvsp-upub
kvsp_upub_LDADD += -ljansson
endif
# to get a rebuild of the utilities when ../libkvspool.a changes:
kvsp_spr_DEPENDENCIES = ../src/libkvspool.a
kvsp_spw_DEPENDENCIES = ../src/libkvspool.a
kvsp_tee_DEPENDENCIES = ../src/libkvspool.a
kvsp_init_DEPENDENCIES = ../src/libkvspool.a
kvsp_status_DEPENDENCIES = ../src/libkvspool.a
kvsp_speed_DEPENDENCIES = ../src/libkvspool.a
kvsp_mod_DEPENDENCIES = ../src/libkvspool.a
kvsp_rewind_DEPENDENCIES = ../src/libkvspool.a
kvsp_sub_DEPENDENCIES = ../src/libkvspool.a
kvsp_pub_DEPENDENCIES = ../src/libkvspool.a
kvsp_bcat_DEPENDENCIES = ../src/libkvspool.a
kvsp_bpub_DEPENDENCIES = ../src/libkvspool.a
kvsp_bsub_DEPENDENCIES = ../src/libkvspool.a
kvsp_npub_DEPENDENCIES = ../src/libkvspool.a
kvsp_nsub_DEPENDENCIES = ../src/libkvspool.a
kvsp_concen_DEPENDENCIES = ../src/libkvspool.a
kvsp_tpub_DEPENDENCIES = ../src/libkvspool.a
kvsp_upub_DEPENDENCIES = ../src/libkvspool.a

View File

@@ -1,117 +0,0 @@
#include <stdio.h>
#include <stdio.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/prctl.h>
#include <assert.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <jansson.h>
#include <librdkafka/rdkafka.h>
#include "kvspool.h"
int verbose;
char *spool;
char *broker;
char *topic;
char errstr[512];
void usage(char *prog) {
fprintf(stderr, "usage: %s [-v] [-t topic] -d spool <broker>\n", prog);
exit(-1);
}
int main(int argc, char *argv[]) {
void *sp=NULL;
void *set=NULL;
int opt,rc=-1;
json_t *o = NULL;
int ticks=0;
fprintf(stderr,"this program is not recommended; use kvsp-kkpub instead\n");
while ( (opt = getopt(argc, argv, "v+d:st:")) != -1) {
switch (opt) {
case 'v': verbose++; break;
case 'd': spool=strdup(optarg); break;
case 't': topic=strdup(optarg); break;
default: usage(argv[0]); break;
}
}
if (optind < argc) broker = argv[optind++];
if (broker == NULL) usage(argv[0]);
if (spool == NULL) usage(argv[0]);
if (topic == NULL) topic = spool;
set = kv_set_new();
sp = kv_spoolreader_new(spool);
if (!sp) goto done;
o = json_object();
rd_kafka_t *k;
rd_kafka_topic_t *t;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
int partition = RD_KAFKA_PARTITION_UA;
char *key = NULL;
int keylen = key ? strlen(key) : 0;
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();
k = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (k == NULL) {
fprintf(stderr, "rd_kafka_new: %s\n", errstr);
goto done;
}
if (rd_kafka_brokers_add(k, broker) < 1) {
fprintf(stderr, "invalid broker\n");
goto done;
}
t = rd_kafka_topic_new(k, topic, topic_conf);
while (kv_spool_read(sp,set,1) > 0) { /* read til interrupted by signal */
json_object_clear(o);
kv_t *kv = NULL;
while (kv = kv_next(set,kv)) {
json_t *jval = json_string(kv->val);
json_object_set_new(o, kv->key, jval);
}
if (verbose) json_dumpf(o, stderr, JSON_INDENT(1));
char *json = json_dumps(o, JSON_INDENT(1));
size_t len = strlen(json);
rc = rd_kafka_produce(t, partition, RD_KAFKA_MSG_F_FREE, json, len,
key, keylen, NULL);
if ((rc == -1) && (errno == ENOBUFS)) {
/* check for backpressure. what to do? wait for space.. */
fprintf(stderr,"backpressure\n");
goto done; // FIXME
}
if (rc == -1) {
fprintf(stderr,"rd_kafka_produce: failed\n");
goto done;
}
if ((++ticks % 1000) == 0) rd_kafka_poll(k, 10);
}
rc = 0;
done:
if (sp) kv_spoolreader_free(sp);
kv_set_free(set);
if(o) json_decref(o);
return 0;
}