Compare commits

...

1 Commits

Author SHA1 Message Date
Jacek Sieka
a9b5f504e9 debug logging 2024-03-02 10:19:16 +01:00
5 changed files with 81 additions and 61 deletions

View File

@@ -74,7 +74,7 @@ func shortLog*(s: LPChannel): auto =
chronicles.formatIt(LPChannel): shortLog(it)
proc open*(s: LPChannel) {.async.} =
trace "Opening channel", s, conn = s.conn
debug "Opening channel", s, conn = s.conn
if s.conn.isClosed:
return
try:
@@ -95,44 +95,44 @@ proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
if s.closedLocal and s.atEof():
await procCall BufferStream(s).close()
proc reset*(s: LPChannel) {.async.} =
proc reset*(s: LPChannel) {.deprecated, async.} =
if s.isClosed:
trace "Already closed", s
debug "Already closed", s
return
s.isClosed = true
s.closedLocal = true
s.localReset = not s.remoteReset
trace "Resetting channel", s, len = s.len
debug "Resetting channel", s, len = s.len
if s.isOpen and not s.conn.isClosed:
# If the connection is still active, notify the other end
proc resetMessage() {.async.} =
try:
trace "sending reset message", s, conn = s.conn
debug "sending reset message", s, conn = s.conn
await s.conn.writeMsg(s.id, s.resetCode) # write reset
except CatchableError as exc:
# No cancellations
await s.conn.close()
trace "Can't send reset message", s, conn = s.conn, msg = exc.msg
debug "Can't send reset message", s, conn = s.conn, msg = exc.msg
asyncSpawn resetMessage()
await s.closeImpl() # noraises, nocancels
trace "Channel reset", s
debug "Channel reset", s
method close*(s: LPChannel) {.async.} =
## Close channel for writing - a message will be sent to the other peer
## informing them that the channel is closed and that we're waiting for
## their acknowledgement.
if s.closedLocal:
trace "Already closed", s
debug "Already closed", s
return
s.closedLocal = true
trace "Closing channel", s, conn = s.conn, len = s.len
debug "Closing channel", s, conn = s.conn, len = s.len
if s.isOpen and not s.conn.isClosed:
try:
@@ -144,18 +144,18 @@ method close*(s: LPChannel) {.async.} =
# It's harmless that close message cannot be sent - the connection is
# likely down already
await s.conn.close()
trace "Cannot send close message", s, id = s.id, msg = exc.msg
debug "Cannot send close message", s, id = s.id, msg = exc.msg
await s.closeUnderlying() # maybe already eofed
trace "Closed channel", s, len = s.len
debug "Closed channel", s, len = s.len
method initStream*(s: LPChannel) =
if s.objName.len == 0:
s.objName = LPChannelTrackerName
s.timeoutHandler = proc(): Future[void] {.gcsafe.} =
trace "Idle timeout expired, resetting LPChannel", s
debug "Idle timeout expired, resetting LPChannel", s
s.reset()
procCall BufferStream(s).initStream()
@@ -182,7 +182,7 @@ method readOnce*(s: LPChannel,
if s.protocol.len > 0:
libp2p_protocols_bytes.inc(bytes.int64, labelValues=[s.protocol, "in"])
trace "readOnce", s, bytes
debug "readOnce", s, bytes
if bytes == 0:
await s.closeUnderlying()
return bytes
@@ -217,9 +217,13 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
return
if not s.isOpen:
debug "Opening channel for writing", s
await s.open()
debug "Opened channel", s
debug "Writing msg (prep)", s, msg = msg.len
await s.conn.writeMsg(s.id, s.msgCode, msg)
debug "Wrote msg (prep)", s, msg = msg.len
proc completeWrite(
s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
@@ -231,7 +235,9 @@ proc completeWrite(
libp2p_mplex_qtime.time:
await fut
else:
debug "Waiting for complete", s, msg = msgLen
await fut
debug "Completed", s, msg = msgLen
when defined(libp2p_network_protocols_metrics):
if s.protocol.len > 0:
@@ -248,7 +254,7 @@ proc completeWrite(
except LPStreamEOFError as exc:
raise exc
except CatchableError as exc:
trace "exception in lpchannel write handler", s, msg = exc.msg
debug "exception in lpchannel write handler", s, msg = exc.msg
await s.reset()
await s.conn.close()
raise newLPStreamConnDownError(exc)
@@ -267,6 +273,7 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] =
# Fast path: Avoid a copy of msg being kept in the closure created by
# `{.async.}` as this drives up memory usage - the conditions are laid out
# in prepareWrite
debug "Writing fast path", s, msg = msg.len
s.conn.writeMsg(s.id, s.msgCode, msg)
else:
prepareWrite(s, msg)
@@ -300,6 +307,6 @@ proc init*(
when chronicles.enabledLogLevel == LogLevel.TRACE:
chann.name = if chann.name.len > 0: chann.name else: $chann.oid
trace "Created new lpchannel", s = chann, id, initiator
debug "Created new lpchannel", s = chann, id, initiator
return chann

View File

@@ -54,15 +54,15 @@ proc newTooManyChannels(): ref TooManyChannels =
newException(TooManyChannels, "max allowed channel count exceeded")
proc newInvalidChannelIdError(): ref InvalidChannelIdError =
newException(InvalidChannelIdError, "max allowed channel count exceeded")
newException(InvalidChannelIdError, "Channel id already taken")
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
proc cleanupChann(m: Mplex, chann: LPChannel) {.async.} =
## remove the local channel from the internal tables
##
try:
await chann.join()
m.channels[chann.initiator].del(chann.id)
trace "cleaned up channel", m, chann
debug "cleaned up channel", m, chann
when defined(libp2p_expensive_metrics):
libp2p_mplex_channels.set(
@@ -99,7 +99,7 @@ proc newStreamInternal*(m: Mplex,
when defined(libp2p_agents_metrics):
result.shortAgent = m.connection.shortAgent
trace "Creating new channel", m, channel = result, id, initiator, name
debug "Creating new channel", m, channel = result, id, initiator, name
m.channels[initiator][id] = result
@@ -116,17 +116,17 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
##
try:
await m.streamHandler(chann)
trace "finished handling stream", m, chann
debug "finished handling stream", m, chann
doAssert(chann.closed, "connection not closed by handler!")
except CatchableError as exc:
trace "Exception in mplex stream handler", m, chann, msg = exc.msg
debug "Exception in mplex stream handler", m, chann, msg = exc.msg
await chann.reset()
method handle*(m: Mplex) {.async.} =
trace "Starting mplex handler", m
debug "Starting mplex handler", m
try:
while not m.connection.atEof:
trace "waiting for data", m
debug "waiting for data", m
let
(id, msgType, data) = await m.connection.readMsg()
initiator = bool(ord(msgType) and 1)
@@ -137,13 +137,13 @@ method handle*(m: Mplex) {.async.} =
msgType = msgType
size = data.len
trace "read message from connection", m, data = data.shortLog
debug "read message from connection", m, data = data.shortLog
var channel =
if MessageType(msgType) != MessageType.New:
let tmp = m.channels[initiator].getOrDefault(id, nil)
if tmp == nil:
trace "Channel not found, skipping", m
debug "Channel not found, skipping", m
continue
tmp
@@ -156,11 +156,11 @@ method handle*(m: Mplex) {.async.} =
let name = string.fromBytes(data)
m.newStreamInternal(false, id, name, timeout = m.outChannTimeout)
trace "Processing channel message", m, channel, data = data.shortLog
debug "Processing channel message", m, channel, data = data.shortLog
case msgType:
of MessageType.New:
trace "created channel", m, channel
debug "created channel", m, channel
if not isNil(m.streamHandler):
# Launch handler task
@@ -173,13 +173,13 @@ method handle*(m: Mplex) {.async.} =
allowed = MaxMsgSize, channel
raise newLPStreamLimitError()
trace "pushing data to channel", m, channel, len = data.len
debug "pushing data to channel", m, channel, len = data.len
try:
await channel.pushData(data)
trace "pushed data to channel", m, channel, len = data.len
debug "pushed data to channel", m, channel, len = data.len
except LPStreamClosedError as exc:
# Channel is being closed, but `cleanupChann` was not yet triggered.
trace "pushing data to channel failed", m, channel, len = data.len,
debug "pushing data to channel failed", m, channel, len = data.len,
msg = exc.msg
discard # Ignore message, same as if `cleanupChann` had completed.
@@ -191,12 +191,12 @@ method handle*(m: Mplex) {.async.} =
except CancelledError:
debug "Unexpected cancellation in mplex handler", m
except LPStreamEOFError as exc:
trace "Stream EOF", m, msg = exc.msg
debug "Stream EOF", m, msg = exc.msg
except CatchableError as exc:
debug "Unexpected exception in mplex read loop", m, msg = exc.msg
finally:
await m.close()
trace "Stopped mplex handler", m
debug "Stopped mplex handler", m
proc new*(M: type Mplex,
conn: Connection,
@@ -221,11 +221,11 @@ method newStream*(m: Mplex,
method close*(m: Mplex) {.async.} =
if m.isClosed:
trace "Already closed", m
debug "Already closed", m
return
m.isClosed = true
trace "Closing mplex", m
debug "Closing mplex", m
var channs = toSeq(m.channels[false].values) & toSeq(m.channels[true].values)
@@ -245,7 +245,7 @@ method close*(m: Mplex) {.async.} =
m.channels[false].clear()
m.channels[true].clear()
trace "Closed mplex", m
debug "Closed mplex", m
method getStreams*(m: Mplex): seq[Connection] =
for c in m.channels[false].values: result.add(c)

View File

@@ -153,10 +153,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
try:
try:
while not conn.atEof:
trace "waiting for data", conn, peer = p, closed = conn.closed
debug "waiting for data", conn, peer = p, closed = conn.closed
var data = await conn.readLp(p.maxMessageSize)
trace "read data from peer",
debug "read data from peer",
conn, peer = p, closed = conn.closed,
data = data.shortLog
@@ -172,9 +172,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propagate CancelledError.
trace "Unexpected cancellation in PubSubPeer.handle"
debug "Unexpected cancellation in PubSubPeer.handle"
except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle",
debug "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, exc = exc.msg
finally:
debug "exiting pubsub read loop",
@@ -192,7 +192,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
# remote peer - if we had multiple channels up and one goes down, all
# stop working so we make an effort to only keep a single channel alive
trace "Get new send connection", p, newConn
debug "Get new send connection", p, newConn
# Careful to race conditions here.
# Topic subscription relies on either connectedFut
@@ -207,7 +207,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
await handle(p, newConn)
finally:
if p.sendConn != nil:
trace "Removing send connection", p, conn = p.sendConn
debug "Removing send connection", p, conn = p.sendConn
await p.sendConn.close()
p.sendConn = nil
@@ -254,7 +254,19 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()
let f = p.rpcmessagequeue.sendPriorityQueue.popFirst()
debug "Finished", p, f = repr(cast[pointer](f))
if p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[^1].finished:
for f in p.rpcmessagequeue.sendPriorityQueue:
if f.failed():
debug "Broken failed", p, f = repr(cast[pointer](f)), err= f.error().msg
elif f.completed:
debug "Broken completed", p, f = repr(cast[pointer](f))
else:
debug "Broken pending", p, f = repr(cast[pointer](f))
quit 1
proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
if p.sendConn == nil:
@@ -267,15 +279,15 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
debug "No send connection", p, msg = shortLog(msg)
return
trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
debug "sending encoded msgs to peer", conn, encoded = shortLog(msg)
try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
debug "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, msg = exc.msg
debug "Unable to send to remote", conn, msg = exc.msg
# Next time sendConn is used, it will be have its close flag set and thus
# will be recycled
@@ -304,6 +316,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.}
p.clearSendPriorityQueue()
let f = p.sendMsg(msg)
if not f.finished:
debug "Unfinished", p, msg = msg.len, f = repr(cast[pointer](f))
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
@@ -311,7 +324,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.}
await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
debug "message queued", p, msg = shortLog(msg)
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
@@ -330,10 +343,10 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
# Check if adding the next message will exceed maxSize
if float(currentSize + msgSize) * 1.1 > float(maxSize): # Guessing 10% protobuf overhead
if currentRPCMsg.messages.len == 0:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
debug "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
continue # Skip this message
trace "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
debug "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
yield encodeRpcMsg(currentRPCMsg, anonymize)
currentRPCMsg = RPCMsg()
currentSize = 0
@@ -343,10 +356,10 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
# Check if there is a non-empty currentRPCMsg left to be added
if currentSize > 0 and currentRPCMsg.messages.len > 0:
trace "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
debug "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
yield encodeRpcMsg(currentRPCMsg, anonymize)
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
debug "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} =
## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
@@ -380,7 +393,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.
await p.sendEncoded(encodedSplitMsg, isHighPriority)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
debug "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
await p.sendEncoded(encoded, isHighPriority)
proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =

View File

@@ -74,7 +74,7 @@ method closeImpl*(s: Connection): Future[void] =
trace "Closing connection", s
if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
s.timerTaskFut.cancel()
s.timerTaskFut.cancelSoon()
s.timerTaskFut = nil
trace "Closed connection", s

View File

@@ -158,7 +158,7 @@ method initStream*(s: LPStream) {.base.} =
libp2p_open_streams.inc(labelValues = [s.objName, $s.dir])
inc getStreamTracker(s.objName).opened
trace "Stream created", s, objName = s.objName, dir = $s.dir
debug "Stream created", s, objName = s.objName, dir = $s.dir
proc join*(s: LPStream): Future[void] {.public.} =
## Wait for the stream to be closed
@@ -206,7 +206,7 @@ proc readExactly*(s: LPStream,
if read == 0:
doAssert s.atEof()
trace "couldn't read all bytes, stream EOF", s, nbytes, read
debug "couldn't read all bytes, stream EOF", s, nbytes, read
# Re-readOnce to raise a more specific error than EOF
# Raise EOF if it doesn't raise anything(shouldn't happen)
discard await s.readOnce(addr pbuffer[read], nbytes - read)
@@ -214,7 +214,7 @@ proc readExactly*(s: LPStream,
raise newLPStreamEOFError()
if read < nbytes:
trace "couldn't read all bytes, incomplete data", s, nbytes, read
debug "couldn't read all bytes, incomplete data", s, nbytes, read
raise newLPStreamIncompleteError()
proc readLine*(s: LPStream,
@@ -300,17 +300,17 @@ proc write*(s: LPStream, msg: string): Future[void] {.public.} =
method closeImpl*(s: LPStream): Future[void] {.async, base.} =
## Implementation of close - called only once
trace "Closing stream", s, objName = s.objName, dir = $s.dir
debug "Closing stream", s, objName = s.objName, dir = $s.dir
libp2p_open_streams.dec(labelValues = [s.objName, $s.dir])
inc getStreamTracker(s.objName).closed
s.closeEvent.fire()
trace "Closed stream", s, objName = s.objName, dir = $s.dir
debug "Closed stream", s, objName = s.objName, dir = $s.dir
method close*(s: LPStream): Future[void] {.base, async, public.} = # {.raises [Defect].}
## close the stream - this may block, but will not raise exceptions
##
if s.isClosed:
trace "Already closed", s
debug "Already closed", s
return
s.isClosed = true # Set flag before performing virtual close
@@ -332,9 +332,9 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async, public.} =
## ongoing (which may be the case during cancellations)!
##
trace "Closing with EOF", s
debug "Closing with EOF", s
if s.closedWithEOF:
trace "Already closed"
debug "Already closed"
return
# prevent any further calls to avoid triggering
@@ -350,7 +350,7 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async, public.} =
if (await readOnce(s, addr buf[0], buf.len)) != 0:
debug "Unexpected bytes while waiting for EOF", s
except LPStreamEOFError:
trace "Expected EOF came", s
debug "Expected EOF came", s
except CancelledError as exc:
raise exc
except CatchableError as exc: