Files
nim-libp2p/tests/testzeroqueue.nim

116 lines
3.2 KiB
Nim

{.used.}
# Nim-Libp2p
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import unittest2
import ../libp2p/utils/zeroqueue
proc toSeq(p: pointer, length: int): seq[byte] =
  ## Copies `length` bytes starting at `p` into a freshly allocated seq.
  ##
  ## `p` must point to at least `length` readable bytes. When `length` is 0
  ## an empty seq is returned and `p` is never dereferenced.
  result = newSeq[byte](length)
  # Guard the copy: indexing `result[0]` on an empty seq raises IndexDefect.
  if length > 0:
    copyMem(addr result[0], p, length)
suite "ZeroQueue":
  # Tests for the ZeroQueue operations used here: push, popChunkSeq, clear,
  # consumeTo, len and isEmpty.

  test "push-pop":
    var q: ZeroQueue

    # a default-initialized queue is expected to be empty
    check q.len() == 0
    check q.isEmpty()
    check q.popChunkSeq(1).len == 0 # pop empty seq when queue is empty

    q.push(@[1'u8, 2, 3])
    q.push(@[4'u8, 5])
    check q.len() == 5
    check not q.isEmpty()

    check q.popChunkSeq(3) == @[1'u8, 2, 3] # pop exactly the size of the chunk
    check q.popChunkSeq(1) == @[4'u8] # pop less than the size of the chunk
    check q.popChunkSeq(5) == @[5'u8] # pop more than the size of the chunk
    check q.isEmpty()

    # should not push empty seq
    q.push(@[])
    q.push(@[])
    check q.isEmpty()

  test "clear":
    var q: ZeroQueue

    q.push(@[1'u8, 2, 3])
    check not q.isEmpty()

    q.clear()
    check q.isEmpty()
    check q.len() == 0

  test "consumeTo":
    var q: ZeroQueue

    # Destination buffer for consumeTo; sized to hold the largest amount of
    # data queued at any point in this test (20 bytes).
    const nbytes = 20
    let pbytes = alloc(nbytes)
    defer:
      dealloc(pbytes)

    # consumeTo: on empty queue
    check q.consumeTo(pbytes, nbytes) == 0

    # consumeTo: emptying whole queue (multiple pushes)
    q.push(@[1'u8, 2, 3])
    q.push(@[4'u8, 5])
    q.push(@[6'u8, 7])
    check q.consumeTo(pbytes, nbytes) == 7
    check toSeq(pbytes, 7) == @[1'u8, 2, 3, 4, 5, 6, 7]
    check q.isEmpty()

    # consumeTo: consuming one chunk of data in two steps
    q.push(@[1'u8, 2, 3])
    # first consume
    check q.consumeTo(pbytes, 1) == 1
    check toSeq(pbytes, 1) == @[1'u8]
    check q.len() == 2
    # second consume
    check q.consumeTo(pbytes, nbytes) == 2
    check toSeq(pbytes, 2) == @[2'u8, 3]
    check q.isEmpty()

    # consumeTo: consuming multiple chunks of data in two steps
    q.clear()
    q.push(@[4'u8, 5])
    q.push(@[1'u8, 2, 3])
    # first consume (crosses the boundary between the two pushed chunks)
    check q.consumeTo(pbytes, 3) == 3
    check toSeq(pbytes, 3) == @[4'u8, 5, 1]
    check q.len() == 2
    # second consume
    check q.consumeTo(pbytes, nbytes) == 2
    check toSeq(pbytes, 2) == @[2'u8, 3]
    check q.isEmpty()

    # consumeTo: partially consume big push multiple times
    q.clear()
    q.push(newSeq[byte](20))
    for _ in 1 .. 10:
      check q.consumeTo(pbytes, 2) == 2
    check q.isEmpty()
    check q.consumeTo(pbytes, 2) == 0

    # consumeTo: partially consuming while pushing
    q.push(@[1'u8, 2, 3])
    check q.consumeTo(pbytes, 2) == 2
    check toSeq(pbytes, 2) == @[1'u8, 2]

    q.push(@[1'u8, 2, 3])
    check q.consumeTo(pbytes, 2) == 2
    check toSeq(pbytes, 2) == @[3'u8, 1]

    q.push(@[1'u8, 2, 3])
    check q.consumeTo(pbytes, 2) == 2
    check toSeq(pbytes, 2) == @[2'u8, 3]

    # drain what is left without pushing any more data
    check q.consumeTo(pbytes, 2) == 2
    check toSeq(pbytes, 2) == @[1'u8, 2]

    check q.consumeTo(pbytes, 2) == 1
    check toSeq(pbytes, 1) == @[3'u8]
    check q.isEmpty()