mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2026-01-10 12:58:05 -05:00
Compare commits
7 Commits
tmp/mix-go
...
fix-quic-a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4eccd1259 | ||
|
|
7c121323c0 | ||
|
|
7e04051b0a | ||
|
|
b80e950456 | ||
|
|
3fd08d169c | ||
|
|
f73c293992 | ||
|
|
b71285f0ae |
@@ -10,7 +10,8 @@ skipDirs = @["tests", "examples", "Nim", "tools", "scripts", "docs"]
|
||||
requires "nim >= 1.6.0",
|
||||
"nimcrypto >= 0.6.0 & < 0.7.0", "dnsclient >= 0.3.0 & < 0.4.0", "bearssl >= 0.2.5",
|
||||
"chronicles >= 0.10.3 & < 0.11.0", "chronos >= 4.0.4", "metrics", "secp256k1",
|
||||
"stew >= 0.4.0", "websock >= 0.2.0", "unittest2", "results", "quic >= 0.2.7", "bio",
|
||||
"stew >= 0.4.0", "websock >= 0.2.0", "unittest2", "results", "bio",
|
||||
"https://github.com/vacp2p/nim-quic.git#62f6ca38b6363a47e1ba43643e25cca7398bf605",
|
||||
"https://github.com/vacp2p/nim-jwt.git#18f8378de52b241f321c1f9ea905456e89b95c6f"
|
||||
|
||||
let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
|
||||
|
||||
@@ -342,7 +342,7 @@ proc getOutgoingSlot*(
|
||||
if forceDial:
|
||||
c.outSema.forceAcquire()
|
||||
elif not c.outSema.tryAcquire():
|
||||
trace "Too many outgoing connections!",
|
||||
debug "Too many outgoing connections!",
|
||||
available = c.outSema.count, max = c.outSema.size
|
||||
raise newTooManyConnectionsError()
|
||||
return ConnectionSlot(connManager: c, direction: Out)
|
||||
|
||||
@@ -369,10 +369,15 @@ method getOrCreatePeer*(
|
||||
async: (raises: [CancelledError, GetConnDialError])
|
||||
.} =
|
||||
try:
|
||||
return await p.switch.dial(peerId, protosToDial)
|
||||
echo "DIALING PEER!!!!!!!!!!!!!!!", peerId
|
||||
let x = await p.switch.dial(peerId, protosToDial)
|
||||
echo "DIALED PEER!", peerId
|
||||
return x
|
||||
except CancelledError as exc:
|
||||
debug "CANCLLED DIAL PEER", peerId
|
||||
raise exc
|
||||
except DialFailedError as e:
|
||||
debug "DIAL FAILED", peerId, err=e.msg
|
||||
raise (ref GetConnDialError)(parent: e)
|
||||
|
||||
proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} =
|
||||
|
||||
@@ -245,6 +245,7 @@ proc closeSendConn(
|
||||
await p.sendConn.close()
|
||||
p.sendConn = nil
|
||||
|
||||
debug "CLOSE SEND CONN", fin=p.connectedFut.finished
|
||||
if not p.connectedFut.finished:
|
||||
p.connectedFut.complete()
|
||||
|
||||
@@ -263,16 +264,19 @@ proc connectOnce(
|
||||
p.connectedFut = newFuture[void]()
|
||||
let newConn =
|
||||
try:
|
||||
await p.getConn().wait(5.seconds)
|
||||
debug "TRYING TO GET CONN"
|
||||
let x = await p.getConn().wait(5.seconds)
|
||||
debug "GOT THE CONN!!!"
|
||||
x
|
||||
except AsyncTimeoutError as error:
|
||||
trace "getConn timed out", description = error.msg
|
||||
debug "getConn timed out", description = error.msg
|
||||
raise (ref LPError)(msg: "Cannot establish send connection: " & error.msg)
|
||||
|
||||
# When the send channel goes up, subscriptions need to be sent to the
|
||||
# remote peer - if we had multiple channels up and one goes down, all
|
||||
# stop working so we make an effort to only keep a single channel alive
|
||||
|
||||
trace "Get new send connection", p, newConn
|
||||
debug "Get new send connection", p, newConn
|
||||
|
||||
# Careful to race conditions here.
|
||||
# Topic subscription relies on either connectedFut
|
||||
@@ -300,6 +304,7 @@ proc connectImpl(p: PubSubPeer) {.async: (raises: []).} =
|
||||
while true:
|
||||
if p.disconnected:
|
||||
if not p.connectedFut.finished:
|
||||
debug "CONNECT COMPLETE 2"
|
||||
p.connectedFut.complete()
|
||||
return
|
||||
await connectOnce(p)
|
||||
@@ -311,7 +316,9 @@ proc connectImpl(p: PubSubPeer) {.async: (raises: []).} =
|
||||
debug "Could not establish send connection", description = exc.msg
|
||||
|
||||
proc connect*(p: PubSubPeer) =
|
||||
debug "CONNECT..."
|
||||
if p.connected:
|
||||
echo "Already connected"
|
||||
return
|
||||
|
||||
asyncSpawn connectImpl(p)
|
||||
@@ -362,11 +369,15 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async: (raises: [CancelledErro
|
||||
if p.sendConn == nil:
|
||||
# Wait for a send conn to be setup. `connectOnce` will
|
||||
# complete this even if the sendConn setup failed
|
||||
debug "await connected fut"
|
||||
discard await race(p.connectedFut)
|
||||
|
||||
var conn = p.sendConn
|
||||
if conn == nil or conn.closed():
|
||||
debug "No send connection", p, payload = shortLog(msg)
|
||||
if conn == nil:
|
||||
debug "No send connection - nil", p, payload = shortLog(msg)
|
||||
else:
|
||||
debug "No send connection - closed", p, payload = shortLog(msg)
|
||||
return
|
||||
|
||||
trace "sending encoded msg to peer", conn, encoded = shortLog(msg)
|
||||
|
||||
@@ -45,9 +45,11 @@ proc new(
|
||||
template mapExceptions(body: untyped) =
|
||||
try:
|
||||
body
|
||||
except QuicError:
|
||||
except QuicError as ex:
|
||||
debug "QUIC ERROR", err=ex.msg
|
||||
raise newLPStreamEOFError()
|
||||
except CatchableError:
|
||||
except CatchableError as ex:
|
||||
debug "QUIC ERROR2", err=ex.msg
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
method readOnce*(
|
||||
@@ -61,6 +63,7 @@ method readOnce*(
|
||||
stream.cached = stream.cached[result ..^ 1]
|
||||
libp2p_network_bytes.inc(result.int64, labelValues = ["in"])
|
||||
except CatchableError as exc:
|
||||
debug "QUIC ERROR", err=exc.msg
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
{.push warning[LockLevel]: off.}
|
||||
@@ -97,7 +100,6 @@ proc getStream*(
|
||||
stream = await session.connection.incomingStream()
|
||||
of Direction.Out:
|
||||
stream = await session.connection.openStream()
|
||||
await stream.write(@[]) # QUIC streams do not exist until data is sent
|
||||
return QuicStream.new(stream, session.observedAddr, session.peerId)
|
||||
except CatchableError as exc:
|
||||
# TODO: incomingStream is using {.async.} with no raises
|
||||
@@ -113,7 +115,7 @@ type QuicMuxer = ref object of Muxer
|
||||
|
||||
method newStream*(
|
||||
m: QuicMuxer, name: string = "", lazy: bool = false
|
||||
): Future[P2PConnection] {.
|
||||
): Future[connection.Connection] {.
|
||||
async: (raises: [CancelledError, LPStreamError, MuxerError])
|
||||
.} =
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user