Compare commits

...

2 Commits

Author SHA1 Message Date
Tanguy
aa784a374a warningAsError is only available on nim >1.6 2022-11-03 12:00:24 +01:00
Tanguy
aaf13aeddf Try lent 2022-11-03 10:52:57 +01:00
19 changed files with 28 additions and 26 deletions

View File

@@ -10,6 +10,7 @@ switch("warning", "LockLevel:off")
if (NimMajor, NimMinor) < (1, 6):
--styleCheck:hint
else:
switch("warningAsError", "UseBase:on")
--styleCheck:error
# Avoid some rare stack corruption while using exceptions with a SEH-enabled

View File

@@ -61,7 +61,7 @@ proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} =
proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: seq[byte] = @[]): Future[void] =
data: sink seq[byte] = @[]): Future[void] =
var
left = data.len
offset = 0

View File

@@ -197,7 +197,7 @@ method readOnce*(s: LPChannel,
await s.reset()
raise newLPStreamConnDownError(exc)
proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
proc prepareWrite(s: LPChannel, msg: sink seq[byte]): Future[void] {.async.} =
# prepareWrite is the slow path of writing a message - see conditions in
# write
if s.remoteReset:
@@ -254,7 +254,7 @@ proc completeWrite(
finally:
s.writes -= 1
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
method write*(s: LPChannel, msg: sink seq[byte]): Future[void] =
## Write to mplex channel - there may be up to MaxWrite concurrent writes
## pending after which the peer is disconnected

View File

@@ -265,7 +265,7 @@ method readOnce*(
channel.activity = true
return toRead
proc gotDataFromRemote(channel: YamuxChannel, b: seq[byte]) {.async.} =
proc gotDataFromRemote(channel: YamuxChannel, b: sink seq[byte]) {.async.} =
channel.recvWindow -= b.len
channel.recvQueue = channel.recvQueue.concat(b)
channel.receivedData.fire()
@@ -333,7 +333,7 @@ proc trySend(channel: YamuxChannel) {.async.} =
fut.complete()
channel.activity = true
method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
method write*(channel: YamuxChannel, msg: sink seq[byte]): Future[void] =
result = newFuture[void]("Yamux Send")
if channel.remoteReset:
result.fail(newLPStreamResetError())

View File

@@ -30,7 +30,7 @@ method readOnce*(
self.activity = true
return await self.conn.readOnce(pbytes, nbytes)
method write*(self: RelayConnection, msg: seq[byte]): Future[void] {.async.} =
method write*(self: RelayConnection, msg: sink seq[byte]): Future[void] {.async.} =
self.dataSent.inc(msg.len)
if self.limitData != 0 and self.dataSent > self.limitData:
await self.close()

View File

@@ -111,7 +111,7 @@ proc encodeMsg(peerInfo: PeerInfo, observedAddr: Opt[MultiAddress], sendSpr: boo
result.finish()
proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] =
proc decodeMsg*(buf: sink seq[byte]): Option[IdentifyInfo] =
var
iinfo: IdentifyInfo
pubkey: PublicKey

View File

@@ -176,7 +176,7 @@ method init*(f: FloodSub) =
method publish*(f: FloodSub,
topic: string,
data: seq[byte]): Future[int] {.async.} =
data: sink seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(f).publish(topic, data)

View File

@@ -477,7 +477,7 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
method publish*(g: GossipSub,
topic: string,
data: seq[byte]): Future[int] {.async.} =
data: sink seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(g).publish(topic, data)

View File

@@ -84,6 +84,7 @@ declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", lab
type
InitializationError* = object of LPError
#TODO sink?
TopicHandler* {.public.} = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
@@ -311,7 +312,7 @@ proc getOrCreatePeer*(
return pubSubPeer
proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
proc handleData*(p: PubSub, topic: string, data: sink seq[byte]): Future[void] =
# Start work on all data handlers without copying data into closure like
# happens on {.async.} transformation
p.topics.withValue(topic, handlers) do:
@@ -474,7 +475,7 @@ proc subscribe*(p: PubSub,
method publish*(p: PubSub,
topic: string,
data: seq[byte]): Future[int] {.base, async, public.} =
data: sink seq[byte]): Future[int] {.base, async, public.} =
## publish to a ``topic``
##
## The return value is the number of neighbours that we attempted to send the

View File

@@ -210,7 +210,7 @@ proc connectImpl(p: PubSubPeer) {.async.} =
proc connect*(p: PubSubPeer) =
asyncSpawn connectImpl(p)
proc sendImpl(conn: Connection, encoded: seq[byte]): Future[void] {.raises: [Defect].} =
proc sendImpl(conn: Connection, encoded: sink seq[byte]): Future[void] {.raises: [Defect].} =
trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded)
let fut = conn.writeLp(encoded) # Avoid copying `encoded` into future
@@ -237,7 +237,7 @@ template sendMetrics(msg: RPCMsg): untyped =
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect].} =
proc sendEncoded*(p: PubSubPeer, msg: sink seq[byte]) {.raises: [Defect].} =
doAssert(not isNil(p), "pubsubpeer nil!")
if msg.len <= 0:

View File

@@ -322,7 +322,7 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] =
pb.finish()
pb.buffer
proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} =
proc decodeRpcMsg*(msg: sink seq[byte]): ProtoResult[RPCMsg] {.inline.} =
trace "decodeRpcMsg: decoding message", msg = msg.shortLog()
var pb = initProtoBuffer(msg)
var rpcMsg = ok(RPCMsg())

View File

@@ -333,7 +333,7 @@ proc sendHSMessage(sconn: Connection, buf: openArray[byte]): Future[void] =
proc handshakeXXOutbound(
p: Noise, conn: Connection,
p2pSecret: seq[byte]): Future[HandshakeResult] {.async.} =
p2pSecret: sink seq[byte]): Future[HandshakeResult] {.async.} =
const initiator = true
var
hs = HandshakeState.init()
@@ -381,7 +381,7 @@ proc handshakeXXOutbound(
proc handshakeXXInbound(
p: Noise, conn: Connection,
p2pSecret: seq[byte]): Future[HandshakeResult] {.async.} =
p2pSecret: sink seq[byte]): Future[HandshakeResult] {.async.} =
const initiator = false
var
@@ -461,7 +461,7 @@ proc encryptFrame(
cipherFrame[2 + src.len()..<cipherFrame.len] = tag
method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] =
method write*(sconn: NoiseConnection, message: sink seq[byte]): Future[void] =
# Fast path: `{.async.}` would introduce a copy of `message`
const FramingSize = 2 + sizeof(ChaChaPolyTag)

View File

@@ -226,7 +226,7 @@ method readMessage*(sconn: SecioConn): Future[seq[byte]] {.async.} =
trace "Message MAC verification failed", buf = buf.shortLog
raise (ref SecioError)(msg: "message failed MAC verification")
method write*(sconn: SecioConn, message: seq[byte]) {.async.} =
method write*(sconn: SecioConn, message: sink seq[byte]) {.async.} =
## Write message ``message`` to secure connection ``sconn``.
if message.len == 0:
return

View File

@@ -68,7 +68,7 @@ proc new*(
bufferStream.initStream()
bufferStream
method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} =
method pushData*(s: BufferStream, data: sink seq[byte]) {.base, async.} =
## Write bytes to internal read buffer, use this to fill up the
## buffer with data.
##

View File

@@ -127,7 +127,7 @@ proc completeWrite(
if s.tracked:
libp2p_peers_traffic_write.inc(msgLen.int64, labelValues = [s.shortAgent])
method write*(s: ChronosStream, msg: seq[byte]): Future[void] =
method write*(s: ChronosStream, msg: sink seq[byte]): Future[void] =
# Avoid a copy of msg being kept in the closure created by `{.async.}` as this
# drives up memory usage
if msg.len == 0:

View File

@@ -283,7 +283,7 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe, publ
await s.readExactly(addr res[0], res.len)
return res
method write*(s: LPStream, msg: seq[byte]): Future[void] {.base, public.} =
method write*(s: LPStream, msg: sink seq[byte]): Future[void] {.base, public.} =
# Write `msg` to stream, waiting for the write to be finished
doAssert(false, "not implemented!")

View File

@@ -83,7 +83,7 @@ method readOnce*(
method write*(
s: WsStream,
msg: seq[byte]): Future[void] {.async.} =
msg: sink seq[byte]): Future[void] {.async.} =
mapExceptions(await s.session.send(msg, Opcode.Binary))
s.activity = true # reset activity flag

View File

@@ -81,7 +81,7 @@ type
TestBufferStream* = ref object of BufferStream
writeHandler*: WriteHandler
method write*(s: TestBufferStream, msg: seq[byte]): Future[void] =
method write*(s: TestBufferStream, msg: sink seq[byte]): Future[void] =
s.writeHandler(msg)
proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T =

View File

@@ -57,7 +57,7 @@ method readOnce*(s: TestSelectStream,
return "\0x3na\n".len()
method write*(s: TestSelectStream, msg: seq[byte]) {.async, gcsafe.} = discard
method write*(s: TestSelectStream, msg: sink seq[byte]) {.async, gcsafe.} = discard
method close(s: TestSelectStream) {.async, gcsafe.} =
s.isClosed = true
@@ -106,7 +106,7 @@ method readOnce*(s: TestLsStream,
copyMem(pbytes, addr buf[0], buf.len())
return buf.len()
method write*(s: TestLsStream, msg: seq[byte]) {.async, gcsafe.} =
method write*(s: TestLsStream, msg: sink seq[byte]) {.async, gcsafe.} =
if s.step == 4:
await s.ls(msg)
@@ -160,7 +160,7 @@ method readOnce*(s: TestNaStream,
return "\0x3na\n".len()
method write*(s: TestNaStream, msg: seq[byte]) {.async, gcsafe.} =
method write*(s: TestNaStream, msg: sink seq[byte]) {.async, gcsafe.} =
if s.step == 4:
await s.na(string.fromBytes(msg))