mirror of https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 11:18:08 -05:00

Compare commits: quic-large ... 1015-log (1 commit)

| Author | SHA1 | Date |
|---|---|---|
|  | a9b5f504e9 |  |
```diff
@@ -74,7 +74,7 @@ func shortLog*(s: LPChannel): auto =
 chronicles.formatIt(LPChannel): shortLog(it)

 proc open*(s: LPChannel) {.async.} =
-  trace "Opening channel", s, conn = s.conn
+  debug "Opening channel", s, conn = s.conn
   if s.conn.isClosed:
     return
   try:
```
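For context, a minimal sketch (not part of this patch) of how chronicles renders custom objects in statements like the ones above: `formatIt` registers a formatter for the type, after which the object can be passed directly as a structured log property. `MyChannel` and its fields are hypothetical stand-ins for `LPChannel`.

```nim
import chronicles

type MyChannel = object
  id: int
  name: string

func shortLog(s: MyChannel): string =
  "channel:" & $s.id & ":" & s.name

# Register the formatter, mirroring `chronicles.formatIt(LPChannel)` above.
chronicles.formatIt(MyChannel): shortLog(it)

let s = MyChannel(id: 3, name: "gossipsub")
debug "Opening channel", s   # rendered through shortLog
trace "Opening channel", s   # only emitted when TRACE logging is enabled
```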
```diff
@@ -95,44 +95,44 @@ proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
   if s.closedLocal and s.atEof():
     await procCall BufferStream(s).close()

-proc reset*(s: LPChannel) {.async.} =
+proc reset*(s: LPChannel) {.deprecated, async.} =
   if s.isClosed:
-    trace "Already closed", s
+    debug "Already closed", s
     return

   s.isClosed = true
   s.closedLocal = true
   s.localReset = not s.remoteReset

-  trace "Resetting channel", s, len = s.len
+  debug "Resetting channel", s, len = s.len

   if s.isOpen and not s.conn.isClosed:
     # If the connection is still active, notify the other end
     proc resetMessage() {.async.} =
       try:
-        trace "sending reset message", s, conn = s.conn
+        debug "sending reset message", s, conn = s.conn
         await s.conn.writeMsg(s.id, s.resetCode) # write reset
       except CatchableError as exc:
         # No cancellations
         await s.conn.close()
-        trace "Can't send reset message", s, conn = s.conn, msg = exc.msg
+        debug "Can't send reset message", s, conn = s.conn, msg = exc.msg

     asyncSpawn resetMessage()

   await s.closeImpl() # noraises, nocancels

-  trace "Channel reset", s
+  debug "Channel reset", s

 method close*(s: LPChannel) {.async.} =
   ## Close channel for writing - a message will be sent to the other peer
   ## informing them that the channel is closed and that we're waiting for
   ## their acknowledgement.
   if s.closedLocal:
-    trace "Already closed", s
+    debug "Already closed", s
     return
   s.closedLocal = true

-  trace "Closing channel", s, conn = s.conn, len = s.len
+  debug "Closing channel", s, conn = s.conn, len = s.len

   if s.isOpen and not s.conn.isClosed:
     try:
```
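The `asyncSpawn resetMessage()` pattern above detaches the reset notification from `reset` itself, so the caller neither waits on nor gets cancelled through the network write. A minimal chronos sketch of the same fire-and-forget shape, with a hypothetical `notifyPeer` standing in for the write:

```nim
import chronos

proc notifyPeer() {.async.} =
  # Stands in for `s.conn.writeMsg(s.id, s.resetCode)`; errors are swallowed
  # here because nothing may escape a task handed to asyncSpawn.
  try:
    await sleepAsync(10.milliseconds)
  except CatchableError:
    discard

proc resetChannel() {.async.} =
  asyncSpawn notifyPeer()   # detached: resetChannel() does not await the send

waitFor resetChannel()
```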
```diff
@@ -144,18 +144,18 @@ method close*(s: LPChannel) {.async.} =
       # It's harmless that close message cannot be sent - the connection is
       # likely down already
       await s.conn.close()
-      trace "Cannot send close message", s, id = s.id, msg = exc.msg
+      debug "Cannot send close message", s, id = s.id, msg = exc.msg

   await s.closeUnderlying() # maybe already eofed

-  trace "Closed channel", s, len = s.len
+  debug "Closed channel", s, len = s.len

 method initStream*(s: LPChannel) =
   if s.objName.len == 0:
     s.objName = LPChannelTrackerName

   s.timeoutHandler = proc(): Future[void] {.gcsafe.} =
-    trace "Idle timeout expired, resetting LPChannel", s
+    debug "Idle timeout expired, resetting LPChannel", s
     s.reset()

   procCall BufferStream(s).initStream()
```
```diff
@@ -182,7 +182,7 @@ method readOnce*(s: LPChannel,
       if s.protocol.len > 0:
         libp2p_protocols_bytes.inc(bytes.int64, labelValues=[s.protocol, "in"])

-    trace "readOnce", s, bytes
+    debug "readOnce", s, bytes
     if bytes == 0:
       await s.closeUnderlying()
     return bytes
```
```diff
@@ -217,9 +217,13 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
     return

   if not s.isOpen:
+    debug "Opening channel for writing", s
     await s.open()
+    debug "Opened channel", s

+  debug "Writing msg (prep)", s, msg = msg.len
   await s.conn.writeMsg(s.id, s.msgCode, msg)
+  debug "Wrote msg (prep)", s, msg = msg.len

 proc completeWrite(
     s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
```
```diff
@@ -231,7 +235,9 @@ proc completeWrite(
       libp2p_mplex_qtime.time:
         await fut
     else:
+      debug "Waiting for complete", s, msg = msgLen
       await fut
+      debug "Completed", s, msg = msgLen

     when defined(libp2p_network_protocols_metrics):
       if s.protocol.len > 0:
```
```diff
@@ -248,7 +254,7 @@ proc completeWrite(
   except LPStreamEOFError as exc:
     raise exc
   except CatchableError as exc:
-    trace "exception in lpchannel write handler", s, msg = exc.msg
+    debug "exception in lpchannel write handler", s, msg = exc.msg
     await s.reset()
     await s.conn.close()
     raise newLPStreamConnDownError(exc)
```
```diff
@@ -267,6 +273,7 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] =
       # Fast path: Avoid a copy of msg being kept in the closure created by
       # `{.async.}` as this drives up memory usage - the conditions are laid out
       # in prepareWrite
+      debug "Writing fast path", s, msg = msg.len
       s.conn.writeMsg(s.id, s.msgCode, msg)
     else:
       prepareWrite(s, msg)
```
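The fast-path comment above refers to a common chronos trick: keep `write` itself non-`{.async.}` and forward the underlying future when possible, so `msg` is not captured in an additional async closure. A simplified sketch under that assumption (all names here are illustrative, not the library's API):

```nim
import chronos

proc rawWrite(msg: seq[byte]): Future[void] {.async.} =
  discard msg.len          # stands in for conn.writeMsg(...)

proc slowWrite(msg: seq[byte]): Future[void] {.async.} =
  # the async transform stores `msg` in a hidden environment object
  await rawWrite(msg)

proc write(msg: seq[byte], fastPath: bool): Future[void] =
  # not {.async.}: on the fast path the future is returned directly and no
  # extra copy of `msg` is kept alive by a wrapper closure
  if fastPath:
    rawWrite(msg)
  else:
    slowWrite(msg)

waitFor write(@[1'u8, 2, 3], fastPath = true)
```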
```diff
@@ -300,6 +307,6 @@ proc init*(
   when chronicles.enabledLogLevel == LogLevel.TRACE:
     chann.name = if chann.name.len > 0: chann.name else: $chann.oid

-  trace "Created new lpchannel", s = chann, id, initiator
+  debug "Created new lpchannel", s = chann, id, initiator

   return chann
```
```diff
@@ -54,15 +54,15 @@ proc newTooManyChannels(): ref TooManyChannels =
   newException(TooManyChannels, "max allowed channel count exceeded")

 proc newInvalidChannelIdError(): ref InvalidChannelIdError =
-  newException(InvalidChannelIdError, "max allowed channel count exceeded")
+  newException(InvalidChannelIdError, "Channel id already taken")

-proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
+proc cleanupChann(m: Mplex, chann: LPChannel) {.async.} =
   ## remove the local channel from the internal tables
   ##
   try:
     await chann.join()
     m.channels[chann.initiator].del(chann.id)
-    trace "cleaned up channel", m, chann
+    debug "cleaned up channel", m, chann

     when defined(libp2p_expensive_metrics):
       libp2p_mplex_channels.set(
```
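A sketch of the lifetime pattern used by `cleanupChann` above, under the assumption that `join` simply exposes a completion future: a monitor task awaits it and only then removes the channel from the owner's table, so lookups stay valid while the channel is alive. The `Chann` type here is a made-up stand-in.

```nim
import std/tables
import chronos

type Chann = ref object
  id: uint64
  done: Future[void]          # completed when the channel has fully closed

proc join(c: Chann): Future[void] = c.done

var channels = initTable[uint64, Chann]()

proc cleanupChann(c: Chann) {.async.} =
  await c.join()              # wait for the channel to finish
  channels.del(c.id)          # only then drop it from the table

let c = Chann(id: 1, done: newFuture[void]("chann.done"))
channels[1] = c
asyncSpawn cleanupChann(c)
c.done.complete()             # simulate the channel closing
waitFor sleepAsync(1.milliseconds)
doAssert 1 notin channels
```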
```diff
@@ -99,7 +99,7 @@ proc newStreamInternal*(m: Mplex,
   when defined(libp2p_agents_metrics):
     result.shortAgent = m.connection.shortAgent

-  trace "Creating new channel", m, channel = result, id, initiator, name
+  debug "Creating new channel", m, channel = result, id, initiator, name

   m.channels[initiator][id] = result

```
```diff
@@ -116,17 +116,17 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
   ##
   try:
     await m.streamHandler(chann)
-    trace "finished handling stream", m, chann
+    debug "finished handling stream", m, chann
     doAssert(chann.closed, "connection not closed by handler!")
   except CatchableError as exc:
-    trace "Exception in mplex stream handler", m, chann, msg = exc.msg
+    debug "Exception in mplex stream handler", m, chann, msg = exc.msg
     await chann.reset()

 method handle*(m: Mplex) {.async.} =
-  trace "Starting mplex handler", m
+  debug "Starting mplex handler", m
   try:
     while not m.connection.atEof:
-      trace "waiting for data", m
+      debug "waiting for data", m
       let
         (id, msgType, data) = await m.connection.readMsg()
         initiator = bool(ord(msgType) and 1)
```
```diff
@@ -137,13 +137,13 @@ method handle*(m: Mplex) {.async.} =
         msgType = msgType
         size = data.len

-      trace "read message from connection", m, data = data.shortLog
+      debug "read message from connection", m, data = data.shortLog

       var channel =
         if MessageType(msgType) != MessageType.New:
           let tmp = m.channels[initiator].getOrDefault(id, nil)
           if tmp == nil:
-            trace "Channel not found, skipping", m
+            debug "Channel not found, skipping", m
             continue

           tmp
```
```diff
@@ -156,11 +156,11 @@ method handle*(m: Mplex) {.async.} =
           let name = string.fromBytes(data)
           m.newStreamInternal(false, id, name, timeout = m.outChannTimeout)

-      trace "Processing channel message", m, channel, data = data.shortLog
+      debug "Processing channel message", m, channel, data = data.shortLog

       case msgType:
         of MessageType.New:
-          trace "created channel", m, channel
+          debug "created channel", m, channel

           if not isNil(m.streamHandler):
             # Launch handler task
```
```diff
@@ -173,13 +173,13 @@ method handle*(m: Mplex) {.async.} =
               allowed = MaxMsgSize, channel
             raise newLPStreamLimitError()

-          trace "pushing data to channel", m, channel, len = data.len
+          debug "pushing data to channel", m, channel, len = data.len
           try:
             await channel.pushData(data)
-            trace "pushed data to channel", m, channel, len = data.len
+            debug "pushed data to channel", m, channel, len = data.len
           except LPStreamClosedError as exc:
             # Channel is being closed, but `cleanupChann` was not yet triggered.
-            trace "pushing data to channel failed", m, channel, len = data.len,
+            debug "pushing data to channel failed", m, channel, len = data.len,
                                                     msg = exc.msg
             discard # Ignore message, same as if `cleanupChann` had completed.

```
```diff
@@ -191,12 +191,12 @@ method handle*(m: Mplex) {.async.} =
   except CancelledError:
     debug "Unexpected cancellation in mplex handler", m
   except LPStreamEOFError as exc:
-    trace "Stream EOF", m, msg = exc.msg
+    debug "Stream EOF", m, msg = exc.msg
   except CatchableError as exc:
     debug "Unexpected exception in mplex read loop", m, msg = exc.msg
   finally:
     await m.close()
-  trace "Stopped mplex handler", m
+  debug "Stopped mplex handler", m

 proc new*(M: type Mplex,
           conn: Connection,
```
```diff
@@ -221,11 +221,11 @@ method newStream*(m: Mplex,

 method close*(m: Mplex) {.async.} =
   if m.isClosed:
-    trace "Already closed", m
+    debug "Already closed", m
     return
   m.isClosed = true

-  trace "Closing mplex", m
+  debug "Closing mplex", m

   var channs = toSeq(m.channels[false].values) & toSeq(m.channels[true].values)

```
```diff
@@ -245,7 +245,7 @@ method close*(m: Mplex) {.async.} =
   m.channels[false].clear()
   m.channels[true].clear()

-  trace "Closed mplex", m
+  debug "Closed mplex", m

 method getStreams*(m: Mplex): seq[Connection] =
   for c in m.channels[false].values: result.add(c)
```
```diff
@@ -153,10 +153,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
   try:
     try:
       while not conn.atEof:
-        trace "waiting for data", conn, peer = p, closed = conn.closed
+        debug "waiting for data", conn, peer = p, closed = conn.closed

         var data = await conn.readLp(p.maxMessageSize)
-        trace "read data from peer",
+        debug "read data from peer",
           conn, peer = p, closed = conn.closed,
           data = data.shortLog

```
```diff
@@ -172,9 +172,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
   except CancelledError:
     # This is top-level procedure which will work as separate task, so it
     # do not need to propagate CancelledError.
-    trace "Unexpected cancellation in PubSubPeer.handle"
+    debug "Unexpected cancellation in PubSubPeer.handle"
   except CatchableError as exc:
-    trace "Exception occurred in PubSubPeer.handle",
+    debug "Exception occurred in PubSubPeer.handle",
       conn, peer = p, closed = conn.closed, exc = exc.msg
   finally:
     debug "exiting pubsub read loop",
```
```diff
@@ -192,7 +192,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
   # remote peer - if we had multiple channels up and one goes down, all
   # stop working so we make an effort to only keep a single channel alive

-  trace "Get new send connection", p, newConn
+  debug "Get new send connection", p, newConn

   # Careful to race conditions here.
   # Topic subscription relies on either connectedFut
```
```diff
@@ -207,7 +207,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
     await handle(p, newConn)
   finally:
     if p.sendConn != nil:
-      trace "Removing send connection", p, conn = p.sendConn
+      debug "Removing send connection", p, conn = p.sendConn
       await p.sendConn.close()
       p.sendConn = nil

```
```diff
@@ -254,7 +254,19 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
   while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
     when defined(libp2p_expensive_metrics):
       libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
-    discard p.rpcmessagequeue.sendPriorityQueue.popFirst()
+    let f = p.rpcmessagequeue.sendPriorityQueue.popFirst()
+    debug "Finished", p, f = repr(cast[pointer](f))
+
+  if p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[^1].finished:
+    for f in p.rpcmessagequeue.sendPriorityQueue:
+      if f.failed():
+        debug "Broken failed", p, f = repr(cast[pointer](f)), err = f.error().msg
+      elif f.completed:
+        debug "Broken completed", p, f = repr(cast[pointer](f))
+      else:
+        debug "Broken pending", p, f = repr(cast[pointer](f))
+
+    quit 1

 proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
   if p.sendConn == nil:
```
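A reduced sketch of the queue-draining logic above, assuming the priority queue is a `Deque` of chronos futures: finished futures are popped off the front, and a failed one can be inspected via `error` before being dropped.

```nim
import std/deques
import chronos

proc drainFinished(q: var Deque[Future[void]]) =
  ## Pop completed sends off the front of the queue, as clearSendPriorityQueue
  ## does above; unfinished futures keep their slot.
  while q.len > 0 and q[0].finished:
    let f = q.popFirst()
    if f.failed():
      echo "dropped failed send: ", f.error().msg

var q = initDeque[Future[void]]()
q.addLast(sleepAsync(1.milliseconds))   # a pending "send"
waitFor sleepAsync(5.milliseconds)      # let it complete
drainFinished(q)
doAssert q.len == 0
```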
```diff
@@ -267,15 +279,15 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
     debug "No send connection", p, msg = shortLog(msg)
     return

-  trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
+  debug "sending encoded msgs to peer", conn, encoded = shortLog(msg)

   try:
     await conn.writeLp(msg)
-    trace "sent pubsub message to remote", conn
+    debug "sent pubsub message to remote", conn
   except CatchableError as exc: # never cancelled
     # Because we detach the send call from the currently executing task using
     # asyncSpawn, no exceptions may leak out of it
-    trace "Unable to send to remote", conn, msg = exc.msg
+    debug "Unable to send to remote", conn, msg = exc.msg
     # Next time sendConn is used, it will be have its close flag set and thus
     # will be recycled

```
```diff
@@ -304,6 +316,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.}
     p.clearSendPriorityQueue()
     let f = p.sendMsg(msg)
     if not f.finished:
+      debug "Unfinished", p, msg = msg.len, f = repr(cast[pointer](f))
       p.rpcmessagequeue.sendPriorityQueue.addLast(f)
       when defined(libp2p_expensive_metrics):
         libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
```
```diff
@@ -311,7 +324,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.}
     await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
     when defined(libp2p_expensive_metrics):
       libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
-    trace "message queued", p, msg = shortLog(msg)
+    debug "message queued", p, msg = shortLog(msg)

 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
```
```diff
@@ -330,10 +343,10 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
     # Check if adding the next message will exceed maxSize
     if float(currentSize + msgSize) * 1.1 > float(maxSize): # Guessing 10% protobuf overhead
       if currentRPCMsg.messages.len == 0:
-        trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
+        debug "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
         continue # Skip this message

-      trace "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
+      debug "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
       yield encodeRpcMsg(currentRPCMsg, anonymize)
       currentRPCMsg = RPCMsg()
       currentSize = 0
```
```diff
@@ -343,10 +356,10 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:

   # Check if there is a non-empty currentRPCMsg left to be added
   if currentSize > 0 and currentRPCMsg.messages.len > 0:
-    trace "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
+    debug "sending msg to peer", peer, rpcMsg = shortLog(currentRPCMsg)
     yield encodeRpcMsg(currentRPCMsg, anonymize)
   else:
-    trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
+    debug "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

 proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} =
   ## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
```
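The batching that `splitRPCMsg` performs can be illustrated with a plain-Nim sketch (this is not the library code): items are greedily packed into batches whose estimated encoded size, padded by the same ~10% overhead guess, stays under `maxSize`; an item that cannot fit even on its own is skipped, mirroring the "message too big" branch above.

```nim
proc splitBySize(sizes: seq[int], maxSize: int): seq[seq[int]] =
  ## Greedy size-budget batching, analogous to splitRPCMsg but over plain ints.
  var current: seq[int]
  var currentSize = 0
  for s in sizes:
    if float(currentSize + s) * 1.1 > float(maxSize):   # same 10% overhead guess
      if current.len > 0:
        result.add(current)          # flush the batch built so far
        current = @[]
        currentSize = 0
      if float(s) * 1.1 > float(maxSize):
        continue                     # single item too big: skipped
    current.add(s)
    currentSize += s
  if current.len > 0:
    result.add(current)              # flush the trailing batch

echo splitBySize(@[400, 500, 300, 900], maxSize = 1000)
# -> @[@[400, 500], @[300], @[900]]
```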
```diff
@@ -380,7 +393,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.
       await p.sendEncoded(encodedSplitMsg, isHighPriority)
   else:
     # If the message size is within limits, send it as is
-    trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
+    debug "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
     await p.sendEncoded(encoded, isHighPriority)

 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
```
```diff
@@ -74,7 +74,7 @@ method closeImpl*(s: Connection): Future[void] =
   trace "Closing connection", s

   if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
-    s.timerTaskFut.cancel()
+    s.timerTaskFut.cancelSoon()
     s.timerTaskFut = nil

   trace "Closed connection", s
```
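The `cancel()` to `cancelSoon()` change above fits the non-async `closeImpl`: under the assumption that chronos' `cancelSoon` only schedules the cancellation and returns immediately, the caller can drop its reference without waiting for the timer task to unwind. A minimal sketch:

```nim
import chronos

proc timerTask() {.async.} =
  await sleepAsync(10.seconds)        # stands in for the idle-timeout loop

var timerTaskFut: Future[void] = timerTask()

# Mirrors the guard in closeImpl: cancel only if still pending, then drop the
# reference; cancelSoon() does not wait for the task to actually finish.
if not isNil(timerTaskFut) and not timerTaskFut.finished:
  timerTaskFut.cancelSoon()
  timerTaskFut = nil
```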
```diff
@@ -158,7 +158,7 @@ method initStream*(s: LPStream) {.base.} =

   libp2p_open_streams.inc(labelValues = [s.objName, $s.dir])
   inc getStreamTracker(s.objName).opened
-  trace "Stream created", s, objName = s.objName, dir = $s.dir
+  debug "Stream created", s, objName = s.objName, dir = $s.dir

 proc join*(s: LPStream): Future[void] {.public.} =
   ## Wait for the stream to be closed
```
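The stream counters above come from nim-metrics; a sketch of how such a labelled gauge is declared and updated (the gauge name and label values here are made up, and the calls are no-ops unless the project is built with metrics collection enabled):

```nim
import metrics

declareGauge(example_open_streams, "number of open example streams",
             labels = ["objName", "dir"])

example_open_streams.inc(labelValues = ["LPChannel", "In"])
example_open_streams.dec(labelValues = ["LPChannel", "In"])
```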
```diff
@@ -206,7 +206,7 @@ proc readExactly*(s: LPStream,

   if read == 0:
     doAssert s.atEof()
-    trace "couldn't read all bytes, stream EOF", s, nbytes, read
+    debug "couldn't read all bytes, stream EOF", s, nbytes, read
     # Re-readOnce to raise a more specific error than EOF
     # Raise EOF if it doesn't raise anything(shouldn't happen)
     discard await s.readOnce(addr pbuffer[read], nbytes - read)
```
```diff
@@ -214,7 +214,7 @@ proc readExactly*(s: LPStream,
     raise newLPStreamEOFError()

   if read < nbytes:
-    trace "couldn't read all bytes, incomplete data", s, nbytes, read
+    debug "couldn't read all bytes, incomplete data", s, nbytes, read
     raise newLPStreamIncompleteError()

 proc readLine*(s: LPStream,
```
```diff
@@ -300,17 +300,17 @@ proc write*(s: LPStream, msg: string): Future[void] {.public.} =

 method closeImpl*(s: LPStream): Future[void] {.async, base.} =
   ## Implementation of close - called only once
-  trace "Closing stream", s, objName = s.objName, dir = $s.dir
+  debug "Closing stream", s, objName = s.objName, dir = $s.dir
   libp2p_open_streams.dec(labelValues = [s.objName, $s.dir])
   inc getStreamTracker(s.objName).closed
   s.closeEvent.fire()
-  trace "Closed stream", s, objName = s.objName, dir = $s.dir
+  debug "Closed stream", s, objName = s.objName, dir = $s.dir

 method close*(s: LPStream): Future[void] {.base, async, public.} = # {.raises [Defect].}
   ## close the stream - this may block, but will not raise exceptions
   ##
   if s.isClosed:
-    trace "Already closed", s
+    debug "Already closed", s
     return

   s.isClosed = true # Set flag before performing virtual close
```
```diff
@@ -332,9 +332,9 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async, public.} =
   ## ongoing (which may be the case during cancellations)!
   ##

-  trace "Closing with EOF", s
+  debug "Closing with EOF", s
   if s.closedWithEOF:
-    trace "Already closed"
+    debug "Already closed"
     return

   # prevent any further calls to avoid triggering
```
```diff
@@ -350,7 +350,7 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async, public.} =
       if (await readOnce(s, addr buf[0], buf.len)) != 0:
         debug "Unexpected bytes while waiting for EOF", s
     except LPStreamEOFError:
-      trace "Expected EOF came", s
+      debug "Expected EOF came", s
     except CancelledError as exc:
       raise exc
     except CatchableError as exc:
```