mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-09 22:28:27 -05:00
feat(perf): add stats (#1452)
This commit is contained in:
@@ -16,35 +16,68 @@ import ./core, ../../stream/connection
|
||||
logScope:
|
||||
topics = "libp2p perf"
|
||||
|
||||
type PerfClient* = ref object of RootObj
|
||||
type Stats* = object
|
||||
isFinal*: bool
|
||||
uploadBytes*: uint
|
||||
downloadBytes*: uint
|
||||
duration*: Duration
|
||||
|
||||
type PerfClient* = ref object
|
||||
stats: Stats
|
||||
|
||||
proc new*(T: typedesc[PerfClient]): T =
|
||||
return T()
|
||||
|
||||
proc currentStats*(p: PerfClient): Stats =
|
||||
return p.stats
|
||||
|
||||
proc perf*(
|
||||
_: typedesc[PerfClient],
|
||||
conn: Connection,
|
||||
sizeToWrite: uint64 = 0,
|
||||
sizeToRead: uint64 = 0,
|
||||
p: PerfClient, conn: Connection, sizeToWrite: uint64 = 0, sizeToRead: uint64 = 0
|
||||
): Future[Duration] {.public, async: (raises: [CancelledError, LPStreamError]).} =
|
||||
var
|
||||
size = sizeToWrite
|
||||
buf: array[PerfSize, byte]
|
||||
let start = Moment.now()
|
||||
trace "starting performance benchmark", conn, sizeToWrite, sizeToRead
|
||||
|
||||
await conn.write(toSeq(toBytesBE(sizeToRead)))
|
||||
while size > 0:
|
||||
let toWrite = min(size, PerfSize)
|
||||
await conn.write(buf[0 ..< toWrite])
|
||||
size -= toWrite
|
||||
p.stats = Stats()
|
||||
|
||||
await conn.close()
|
||||
try:
|
||||
var
|
||||
size = sizeToWrite
|
||||
buf: array[PerfSize, byte]
|
||||
|
||||
size = sizeToRead
|
||||
let start = Moment.now()
|
||||
|
||||
while size > 0:
|
||||
let toRead = min(size, PerfSize)
|
||||
await conn.readExactly(addr buf[0], toRead.int)
|
||||
size = size - toRead
|
||||
await conn.write(toSeq(toBytesBE(sizeToRead)))
|
||||
while size > 0:
|
||||
let toWrite = min(size, PerfSize)
|
||||
await conn.write(buf[0 ..< toWrite])
|
||||
size -= toWrite.uint
|
||||
|
||||
let duration = Moment.now() - start
|
||||
trace "finishing performance benchmark", duration
|
||||
return duration
|
||||
# set stats using copy value to avoid race condition
|
||||
var statsCopy = p.stats
|
||||
statsCopy.duration = Moment.now() - start
|
||||
statsCopy.uploadBytes += toWrite.uint
|
||||
p.stats = statsCopy
|
||||
|
||||
await conn.close()
|
||||
|
||||
size = sizeToRead
|
||||
|
||||
while size > 0:
|
||||
let toRead = min(size, PerfSize)
|
||||
await conn.readExactly(addr buf[0], toRead.int)
|
||||
size = size - toRead.uint
|
||||
|
||||
# set stats using copy value to avoid race condition
|
||||
var statsCopy = p.stats
|
||||
statsCopy.duration = Moment.now() - start
|
||||
statsCopy.downloadBytes += toRead.uint
|
||||
p.stats = statsCopy
|
||||
except CancelledError as e:
|
||||
raise e
|
||||
except LPStreamError as e:
|
||||
raise e
|
||||
finally:
|
||||
p.stats.isFinal = true
|
||||
|
||||
trace "finishing performance benchmark", duration = p.stats.duration
|
||||
|
||||
return p.stats.duration
|
||||
|
||||
@@ -32,6 +32,6 @@ import
|
||||
testobservedaddrmanager, testconnmngr, testswitch, testnoise, testpeerinfo,
|
||||
testpeerstore, testping, testmplex, testrelayv1, testrelayv2, testrendezvous,
|
||||
testdiscovery, testyamux, testautonat, testautonatservice, testautorelay, testdcutr,
|
||||
testhpservice, testutility, testhelpers, testwildcardresolverservice
|
||||
testhpservice, testutility, testhelpers, testwildcardresolverservice, testperf
|
||||
|
||||
import kademlia/testencoding
|
||||
|
||||
112
tests/testperf.nim
Normal file
112
tests/testperf.nim
Normal file
@@ -0,0 +1,112 @@
|
||||
{.used.}
|
||||
|
||||
# Nim-Libp2p
|
||||
# Copyright (c) 2025 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
import chronos
|
||||
import ../libp2p
|
||||
import
|
||||
../libp2p/[switch, protocols/perf/client, protocols/perf/server, protocols/perf/core]
|
||||
import ./helpers
|
||||
|
||||
proc createSwitch(
|
||||
isServer: bool = false, useMplex: bool = false, useYamux: bool = false
|
||||
): Switch =
|
||||
var builder = SwitchBuilder
|
||||
.new()
|
||||
.withRng(newRng())
|
||||
.withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()])
|
||||
.withTcpTransport()
|
||||
.withNoise()
|
||||
if useMplex:
|
||||
builder = builder.withMplex()
|
||||
if useYamux:
|
||||
builder = builder.withYamux()
|
||||
|
||||
var switch = builder.build()
|
||||
|
||||
if isServer:
|
||||
switch.mount(Perf.new())
|
||||
|
||||
return switch
|
||||
|
||||
proc runTest(server: Switch, client: Switch) {.async.} =
|
||||
const
|
||||
bytesToUpload = 100000
|
||||
bytesToDownload = 10000000
|
||||
|
||||
await server.start()
|
||||
await client.start()
|
||||
|
||||
defer:
|
||||
await client.stop()
|
||||
await server.stop()
|
||||
|
||||
let conn = await client.dial(server.peerInfo.peerId, server.peerInfo.addrs, PerfCodec)
|
||||
var perfClient = PerfClient.new()
|
||||
discard await perfClient.perf(conn, bytesToUpload, bytesToDownload)
|
||||
|
||||
let stats = perfClient.currentStats()
|
||||
check:
|
||||
stats.isFinal == true
|
||||
stats.uploadBytes == bytesToUpload
|
||||
stats.downloadBytes == bytesToDownload
|
||||
|
||||
suite "Perf protocol":
|
||||
teardown:
|
||||
checkTrackers()
|
||||
|
||||
asyncTest "tcp::yamux":
|
||||
return # nim-libp2p#1462 test fails with stream closed error
|
||||
let server = createSwitch(isServer = true, useYamux = true)
|
||||
let client = createSwitch(useYamux = true)
|
||||
await runTest(server, client)
|
||||
|
||||
asyncTest "tcp::mplex":
|
||||
let server = createSwitch(isServer = true, useMplex = true)
|
||||
let client = createSwitch(useMplex = true)
|
||||
await runTest(server, client)
|
||||
|
||||
asyncTest "perf with exception":
|
||||
let server = createSwitch(isServer = true, useMplex = true)
|
||||
let client = createSwitch(useMplex = true)
|
||||
|
||||
await server.start()
|
||||
await client.start()
|
||||
|
||||
defer:
|
||||
await client.stop()
|
||||
await server.stop()
|
||||
|
||||
let conn =
|
||||
await client.dial(server.peerInfo.peerId, server.peerInfo.addrs, PerfCodec)
|
||||
var perfClient = PerfClient.new()
|
||||
var perfFut: Future[Duration]
|
||||
try:
|
||||
# start perf future with large download request
|
||||
# this will make perf execute for longer so we can cancel it
|
||||
perfFut = perfClient.perf(conn, 1.uint64, 1000000000000.uint64)
|
||||
except CatchableError:
|
||||
discard
|
||||
|
||||
# after some time upload should be finished
|
||||
await sleepAsync(50.milliseconds)
|
||||
var stats = perfClient.currentStats()
|
||||
check:
|
||||
stats.isFinal == false
|
||||
stats.uploadBytes == 1
|
||||
|
||||
perfFut.cancel() # cancelling future will raise exception
|
||||
await sleepAsync(50.milliseconds)
|
||||
|
||||
# after cancelling perf, stats must indicate that it is final one
|
||||
stats = perfClient.currentStats()
|
||||
check:
|
||||
stats.isFinal == true
|
||||
stats.uploadBytes == 1
|
||||
Reference in New Issue
Block a user