mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-09 22:28:27 -05:00
chore(yamux): improve performance with zero allocation queue (#1488)
This commit is contained in:
@@ -12,6 +12,7 @@
|
||||
import sequtils, std/[tables]
|
||||
import chronos, chronicles, metrics, stew/[endians2, byteutils, objects]
|
||||
import ../muxer, ../../stream/connection
|
||||
import ../../utils/zeroqueue
|
||||
|
||||
export muxer
|
||||
|
||||
@@ -151,7 +152,7 @@ type
|
||||
opened: bool
|
||||
isSending: bool
|
||||
sendQueue: seq[ToSend]
|
||||
recvQueue: seq[byte]
|
||||
recvQueue: ZeroQueue
|
||||
isReset: bool
|
||||
remoteReset: bool
|
||||
closedRemotely: AsyncEvent
|
||||
@@ -229,7 +230,6 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).}
|
||||
for (d, s, fut) in channel.sendQueue:
|
||||
fut.fail(newLPStreamEOFError())
|
||||
channel.sendQueue = @[]
|
||||
channel.recvQueue = @[]
|
||||
channel.sendWindow = 0
|
||||
if not channel.closedLocally:
|
||||
if isLocal and not channel.isSending:
|
||||
@@ -257,7 +257,7 @@ proc updateRecvWindow(
|
||||
return
|
||||
|
||||
let delta = channel.maxRecvWindow - inWindow
|
||||
channel.recvWindow.inc(delta)
|
||||
channel.recvWindow.inc(delta.int)
|
||||
await channel.conn.write(YamuxHeader.windowUpdate(channel.id, delta.uint32))
|
||||
trace "increasing the recvWindow", delta
|
||||
|
||||
@@ -279,7 +279,7 @@ method readOnce*(
|
||||
newLPStreamConnDownError()
|
||||
if channel.isEof:
|
||||
raise newLPStreamRemoteClosedError()
|
||||
if channel.recvQueue.len == 0:
|
||||
if channel.recvQueue.isEmpty():
|
||||
channel.receivedData.clear()
|
||||
let
|
||||
closedRemotelyFut = channel.closedRemotely.wait()
|
||||
@@ -290,28 +290,23 @@ method readOnce*(
|
||||
if not receivedDataFut.finished():
|
||||
await receivedDataFut.cancelAndWait()
|
||||
await closedRemotelyFut or receivedDataFut
|
||||
if channel.closedRemotely.isSet() and channel.recvQueue.len == 0:
|
||||
if channel.closedRemotely.isSet() and channel.recvQueue.isEmpty():
|
||||
channel.isEof = true
|
||||
return
|
||||
0 # we return 0 to indicate that the channel is closed for reading from now on
|
||||
|
||||
let toRead = min(channel.recvQueue.len, nbytes)
|
||||
|
||||
var p = cast[ptr UncheckedArray[byte]](pbytes)
|
||||
toOpenArray(p, 0, nbytes - 1)[0 ..< toRead] =
|
||||
channel.recvQueue.toOpenArray(0, toRead - 1)
|
||||
channel.recvQueue = channel.recvQueue[toRead ..^ 1]
|
||||
let consumed = channel.recvQueue.consumeTo(pbytes, nbytes)
|
||||
|
||||
# We made some room in the recv buffer let the peer know
|
||||
await channel.updateRecvWindow()
|
||||
channel.activity = true
|
||||
return toRead
|
||||
return consumed
|
||||
|
||||
proc gotDataFromRemote(
|
||||
channel: YamuxChannel, b: seq[byte]
|
||||
) {.async: (raises: [CancelledError, LPStreamError]).} =
|
||||
channel.recvWindow -= b.len
|
||||
channel.recvQueue = channel.recvQueue.concat(b)
|
||||
channel.recvQueue.push(b)
|
||||
channel.receivedData.fire()
|
||||
when defined(libp2p_yamux_metrics):
|
||||
libp2p_yamux_recv_queue.observe(channel.recvQueue.len.int64)
|
||||
|
||||
84
libp2p/utils/zeroqueue.nim
Normal file
84
libp2p/utils/zeroqueue.nim
Normal file
@@ -0,0 +1,84 @@
|
||||
# Nim-Libp2p
|
||||
# Copyright (c) 2025 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
import std/deques
|
||||
|
||||
type
  Chunk = ref object
    ## Reference-counted view over a byte buffer. The readable bytes are
    ## `data[start ..< size]`, so the view holds `size - start` bytes.
    data: seq[byte] ## backing storage of the view
    size: int ## exclusive end offset of the readable window inside `data`
    start: int ## offset of the first unread byte inside `data`
|
||||
|
||||
template clone(c: Chunk): Chunk =
  ## Builds a new `Chunk` object carrying the same `data`, `size` and `start`
  ## as `c`. The result is a distinct object, so changing its `size`/`start`
  ## does not affect `c`.
  Chunk(
    data: c.data,
    size: c.size,
    start: c.start,
  )
|
||||
|
||||
template newChunk(b: sink seq[byte]): Chunk =
  ## Wraps `b` in a `Chunk` whose readable window spans the whole buffer
  ## (`start` is 0 and `size` is `b.len`).
  Chunk(
    data: b,
    size: b.len,
    start: 0,
  )
|
||||
|
||||
template len(c: Chunk): int =
  ## Number of unread bytes left in the chunk: the distance between the
  ## `start` cursor and the `size` end offset.
  (c.size - c.start)
|
||||
|
||||
type
  ZeroQueue* = object
    ## ZeroQueue is a queue structure optimized for efficient pushing and
    ## popping of byte sequences (`seq[byte]`, called chunks). It is useful
    ## for streaming or buffering scenarios where chunks of binary data are
    ## accumulated and consumed incrementally.
    chunks: Deque[Chunk] ## queued chunks; pushed at the back, popped at the front
|
||||
|
||||
proc clear*(q: var ZeroQueue) =
  ## Drops every queued chunk, leaving the queue empty.
  clear(q.chunks)
|
||||
|
||||
proc isEmpty*(q: ZeroQueue): bool =
  ## Returns `true` when the queue holds no chunks.
  q.chunks.len == 0
|
||||
|
||||
proc len*(q: ZeroQueue): int64 =
  ## Total number of unread bytes held by the queue. The value is computed by
  ## visiting every queued chunk and summing the chunk lengths.
  result = 0
  for chunk in q.chunks:
    result += chunk.len()
|
||||
|
||||
proc push*(q: var ZeroQueue, b: sink seq[byte]) =
  ## Appends `b` to the back of the queue as a new chunk.
  ## Empty sequences are ignored, so the queue never stores an empty chunk.
  if b.len == 0:
    return
  q.chunks.addLast(newChunk(b))
|
||||
|
||||
proc popChunk(q: var ZeroQueue, count: int): Chunk {.inline.} =
  ## Removes up to `count` bytes from the front chunk and returns them as a
  ## `Chunk` view. At most one chunk is touched, so the result may hold fewer
  ## than `count` bytes. Both callers in this file check that the queue is
  ## not empty before calling.
  let head = q.chunks.popFirst()

  if head.len() > count:
    # The head chunk has more bytes than requested: hand out a view of its
    # first `count` bytes and put the remainder back at the front.
    result = head.clone()
    result.size = result.start + count
    head.start += count
    q.chunks.addFirst(head)
  else:
    # The head chunk has at most `count` bytes: return it as is (it might
    # hold less than requested).
    result = head
|
||||
|
||||
proc consumeTo*(q: var ZeroQueue, pbytes: pointer, nbytes: int): int =
  ## Copies up to `nbytes` bytes from the front of the queue into the buffer
  ## at `pbytes` and removes them from the queue.
  ##
  ## `pbytes` must point to at least `nbytes` writable bytes. Returns the
  ## number of bytes actually copied: less than `nbytes` when the queue holds
  ## fewer bytes, and 0 when the queue is empty or `nbytes <= 0`.
  let dest = cast[ptr UncheckedArray[byte]](pbytes)
  var consumed = 0

  while consumed < nbytes and q.chunks.len > 0:
    # Read straight out of the head chunk and advance its cursor in place.
    # Going through `popChunk` would `clone` the chunk on every partial read,
    # and `clone` copies the chunk's whole `seq` payload.
    let head = q.chunks.peekFirst()
    let n = min(head.len(), nbytes - consumed)
    copyMem(addr dest[consumed], unsafeAddr head.data[head.start], n)
    head.start += n
    consumed += n

    if head.len() == 0:
      # Head chunk fully drained: drop it so the next chunk becomes the head.
      discard q.chunks.popFirst()

  consumed
|
||||
|
||||
proc popChunkSeq*(q: var ZeroQueue, count: int): seq[byte] =
  ## Removes up to `count` bytes from the front chunk of the queue and
  ## returns them as a new `seq[byte]`.
  ##
  ## Only the front chunk is consulted, so the result may be shorter than
  ## `count` even when later chunks hold more data. Returns an empty seq when
  ## the queue is empty or when `count <= 0`; in the latter case the queue is
  ## left untouched.
  if count <= 0 or q.isEmpty():
    # A non-positive count would make `popChunk` produce a zero or negative
    # length view and move the head chunk's cursor the wrong way.
    return @[]

  let chunk = q.popChunk(count)
  result = newSeq[byte](chunk.len())
  if result.len > 0:
    # Index at `start` directly instead of doing pointer arithmetic from
    # `data[0]`; this keeps the access bounds-checked.
    copyMem(addr result[0], unsafeAddr chunk.data[chunk.start], result.len)
|
||||
@@ -11,7 +11,7 @@
|
||||
|
||||
import
|
||||
testvarint, testconnection, testbridgestream, testminprotobuf, teststreamseq,
|
||||
testsemaphore, testheartbeat, testfuture
|
||||
testsemaphore, testheartbeat, testfuture, testzeroqueue
|
||||
|
||||
import testminasn1, testrsa, testecnist, tested25519, testsecp256k1, testcrypto
|
||||
|
||||
|
||||
115
tests/testzeroqueue.nim
Normal file
115
tests/testzeroqueue.nim
Normal file
@@ -0,0 +1,115 @@
|
||||
{.used.}
|
||||
|
||||
# Nim-Libp2p
|
||||
# Copyright (c) 2025 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
import unittest2
|
||||
import ../libp2p/utils/zeroqueue
|
||||
|
||||
proc toSeq(p: pointer, length: int): seq[byte] =
  ## Copies `length` bytes starting at `p` into a freshly allocated seq.
  ## `p` must point to at least `length` readable bytes. A `length` of 0
  ## yields an empty seq without reading from `p`.
  result = newSeq[byte](length)
  if length > 0:
    # Taking the address of `result[0]` is only valid for a non-empty seq.
    copyMem(addr result[0], p, length)
|
||||
|
||||
suite "ZeroQueue":
  test "push-pop":
    var queue: ZeroQueue
    check:
      queue.len() == 0
      queue.isEmpty()
      queue.popChunkSeq(1).len == 0 # popping an empty queue yields an empty seq

    queue.push(@[1'u8, 2, 3])
    queue.push(@[4'u8, 5])
    check:
      queue.len() == 5
      not queue.isEmpty()

    check:
      queue.popChunkSeq(3) == @[1'u8, 2, 3] # pop exactly the size of the chunk
      queue.popChunkSeq(1) == @[4'u8] # pop less than the size of the chunk
      queue.popChunkSeq(5) == @[5'u8] # pop more than the size of the chunk
      queue.isEmpty()

    # empty seqs must not be enqueued
    queue.push(@[])
    queue.push(@[])
    check queue.isEmpty()

  test "clear":
    var queue: ZeroQueue
    queue.push(@[1'u8, 2, 3])
    check not queue.isEmpty()

    queue.clear()
    check:
      queue.isEmpty()
      queue.len() == 0

  test "consumeTo":
    const capacity = 20
    var queue: ZeroQueue
    let buf = alloc(capacity)
    defer:
      dealloc(buf)

    # consumeTo: on an empty queue
    check queue.consumeTo(buf, capacity) == 0

    # consumeTo: emptying the whole queue (multiple pushes)
    queue.push(@[1'u8, 2, 3])
    queue.push(@[4'u8, 5])
    queue.push(@[6'u8, 7])
    check:
      queue.consumeTo(buf, capacity) == 7
      toSeq(buf, 7) == @[1'u8, 2, 3, 4, 5, 6, 7]
      queue.isEmpty()

    # consumeTo: consuming one chunk of data in two steps
    queue.push(@[1'u8, 2, 3])
    check:
      # first consume
      queue.consumeTo(buf, 1) == 1
      toSeq(buf, 1) == @[1'u8]
      queue.len() == 2
      # second consume
      queue.consumeTo(buf, capacity) == 2
      toSeq(buf, 2) == @[2'u8, 3]
      queue.isEmpty()

    # consumeTo: consuming multiple chunks of data in two steps
    queue.clear()
    queue.push(@[4'u8, 5])
    queue.push(@[1'u8, 2, 3])
    check:
      # first consume
      queue.consumeTo(buf, 3) == 3
      toSeq(buf, 3) == @[4'u8, 5, 1]
      queue.len() == 2
      # second consume
      queue.consumeTo(buf, capacity) == 2
      toSeq(buf, 2) == @[2'u8, 3]
      queue.isEmpty()

    # consumeTo: partially consume a big push multiple times
    queue.clear()
    queue.push(newSeq[byte](20))
    for step in 0 ..< 10:
      check queue.consumeTo(buf, 2) == 2
    check:
      queue.isEmpty()
      queue.consumeTo(buf, 2) == 0

    # consumeTo: partially consuming while pushing
    queue.push(@[1'u8, 2, 3])
    check:
      queue.consumeTo(buf, 2) == 2
      toSeq(buf, 2) == @[1'u8, 2]

    queue.push(@[1'u8, 2, 3])
    check:
      queue.consumeTo(buf, 2) == 2
      toSeq(buf, 2) == @[3'u8, 1]

    queue.push(@[1'u8, 2, 3])
    check:
      queue.consumeTo(buf, 2) == 2
      toSeq(buf, 2) == @[2'u8, 3]
      queue.consumeTo(buf, 2) == 2
      toSeq(buf, 2) == @[1'u8, 2]
      queue.consumeTo(buf, 2) == 1
      toSeq(buf, 1) == @[3'u8]
      queue.isEmpty()
|
||||
Reference in New Issue
Block a user