Compare commits

...

7 Commits

Author SHA1 Message Date
Richard Ramos
d4eccd1259 test 2025-06-27 20:00:08 -04:00
Richard Ramos
7c121323c0 fgdfgdf 2025-06-27 19:22:38 -04:00
Richard Ramos
7e04051b0a test 2025-06-27 19:01:33 -04:00
Richard Ramos
b80e950456 test 2025-06-27 18:51:02 -04:00
Richard Ramos
3fd08d169c test 2025-06-27 18:01:28 -04:00
Richard Ramos
f73c293992 test 2025-06-27 17:49:51 -04:00
Richard Ramos
b71285f0ae fix: bump quic and remove uneeded write 2025-06-27 16:28:50 -04:00
5 changed files with 30 additions and 11 deletions

View File

@@ -10,7 +10,8 @@ skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]
requires "nim >= 1.6.0",
"nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5",
"chronicles >= 0.10.3 & < 0.11.0", "chronos >= 4.0.4", "metrics", "secp256k1",
"stew >= 0.4.0", "websock >= 0.2.0", "unittest2", "results", "quic >= 0.2.7", "bio",
"stew >= 0.4.0", "websock >= 0.2.0", "unittest2", "results", "bio",
"https://github.com/vacp2p/nim-quic.git#62f6ca38b6363a47e1ba43643e25cca7398bf605",
"https://github.com/vacp2p/nim-jwt.git#18f8378de52b241f321c1f9ea905456e89b95c6f"
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use

View File

@@ -342,7 +342,7 @@ proc getOutgoingSlot*(
if forceDial:
c.outSema.forceAcquire()
elif not c.outSema.tryAcquire():
trace "Too many outgoing connections!",
debug "Too many outgoing connections!",
available = c.outSema.count, max = c.outSema.size
raise newTooManyConnectionsError()
return ConnectionSlot(connManager: c, direction: Out)

View File

@@ -369,10 +369,15 @@ method getOrCreatePeer*(
async: (raises: [CancelledError, GetConnDialError])
.} =
try:
return await p.switch.dial(peerId, protosToDial)
echo "DIALING PEER!!!!!!!!!!!!!!!", peerId
let x = await p.switch.dial(peerId, protosToDial)
echo "DIALED PEER!", peerId
return x
except CancelledError as exc:
debug "CANCLLED DIAL PEER", peerId
raise exc
except DialFailedError as e:
debug "DIAL FAILED", peerId, err=e.msg
raise (ref GetConnDialError)(parent: e)
proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =

View File

@@ -245,6 +245,7 @@ proc closeSendConn(
await p.sendConn.close()
p.sendConn = nil
debug "CLOSE SEND CONN", fin=p.connectedFut.finished
if not p.connectedFut.finished:
p.connectedFut.complete()
@@ -263,16 +264,19 @@ proc connectOnce(
p.connectedFut = newFuture[void]()
let newConn =
try:
await p.getConn().wait(5.seconds)
debug "TRYING TO GET CONN"
let x = await p.getConn().wait(5.seconds)
debug "GOT THE CONN!!!"
x
except AsyncTimeoutError as error:
trace "getConn timed out", description = error.msg
debug "getConn timed out", description = error.msg
raise (ref LPError)(msg: "Cannot establish send connection: " & error.msg)
# When the send channel goes up, subscriptions need to be sent to the
# remote peer - if we had multiple channels up and one goes down, all
# stop working so we make an effort to only keep a single channel alive
trace "Get new send connection", p, newConn
debug "Get new send connection", p, newConn
# Careful to race conditions here.
# Topic subscription relies on either connectedFut
@@ -300,6 +304,7 @@ proc connectImpl(p: PubSubPeer) {.async: (raises: []).} =
while true:
if p.disconnected:
if not p.connectedFut.finished:
debug "CONNECT COMPLETE 2"
p.connectedFut.complete()
return
await connectOnce(p)
@@ -311,7 +316,9 @@ proc connectImpl(p: PubSubPeer) {.async: (raises: []).} =
debug "Could not establish send connection", description = exc.msg
proc connect*(p: PubSubPeer) =
debug "CONNECT..."
if p.connected:
echo "Already connected"
return
asyncSpawn connectImpl(p)
@@ -362,11 +369,15 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async: (raises: [CancelledErro
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
debug "await connected fut"
discard await race(p.connectedFut)
var conn = p.sendConn
if conn == nil or conn.closed():
debug "No send connection", p, payload = shortLog(msg)
if conn == nil:
debug "No send connection - nil", p, payload = shortLog(msg)
else:
debug "No send connection - closed", p, payload = shortLog(msg)
return
trace "sending encoded msg to peer", conn, encoded = shortLog(msg)

View File

@@ -45,9 +45,11 @@ proc new(
template mapExceptions(body: untyped) =
try:
body
except QuicError:
except QuicError as ex:
debug "QUIC ERROR", err=ex.msg
raise newLPStreamEOFError()
except CatchableError:
except CatchableError as ex:
debug "QUIC ERROR2", err=ex.msg
raise newLPStreamEOFError()
method readOnce*(
@@ -61,6 +63,7 @@ method readOnce*(
stream.cached = stream.cached[result ..^ 1]
libp2p_network_bytes.inc(result.int64, labelValues = ["in"])
except CatchableError as exc:
debug "QUIC ERROR", err=exc.msg
raise newLPStreamEOFError()
{.push warning[LockLevel]: off.}
@@ -97,7 +100,6 @@ proc getStream*(
stream = await session.connection.incomingStream()
of Direction.Out:
stream = await session.connection.openStream()
await stream.write(@[]) # QUIC streams do not exist until data is sent
return QuicStream.new(stream, session.observedAddr, session.peerId)
except CatchableError as exc:
# TODO: incomingStream is using {.async.} with no raises
@@ -113,7 +115,7 @@ type QuicMuxer = ref object of Muxer
method newStream*(
m: QuicMuxer, name: string = "", lazy: bool = false
): Future[P2PConnection] {.
): Future[connection.Connection] {.
async: (raises: [CancelledError, LPStreamError, MuxerError])
.} =
try: