Compare commits

...

2 Commits

Author SHA1 Message Date
Tanguy
a216353e5b Merge remote-tracking branch 'origin/unstable' into staggertcplinux 2023-09-18 16:19:42 +02:00
Tanguy
4c01ee5e48 GossipSub: stagger sending using TCP internals 2023-07-28 11:02:09 +02:00
4 changed files with 57 additions and 2 deletions

View File

@@ -22,6 +22,7 @@ import ./pubsub,
./rpc/[messages, message],
../protocol,
../../stream/connection,
../../utils/semaphore,
../../peerinfo,
../../peerid,
../../utility,
@@ -365,7 +366,25 @@ proc validateAndRelay(g: GossipSub,
# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
#g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
let sem = newAsyncSemaphore(1)
var peers = toSeq(toSendPeers)
g.rng.shuffle(peers)
proc sendToOne(p: PubSubPeer) {.async.} =
await sem.acquire()
defer: sem.release()
let stats = p.getStats()
let bandwidth = max(stats.bandwidth.get(), 2000)
g.broadcast(@[p], RPCMsg(messages: @[msg]))
echo stats
echo "Sleeping ", msg.data.len div bandwidth
await sleepAsync(milliseconds(msg.data.len div bandwidth))
echo "After send", p.getStats()
for p in peers:
asyncSpawn sendToOne(p)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue

View File

@@ -16,11 +16,12 @@ import rpc/[messages, message, protobuf],
../../peerid,
../../peerinfo,
../../stream/connection,
../../stream/chronosstream,
../../crypto/crypto,
../../protobuf/minprotobuf,
../../utility
export peerid, connection, deques
export peerid, connection, chronosstream, deques
logScope:
topics = "libp2p pubsubpeer"
@@ -79,6 +80,14 @@ when defined(libp2p_agents_metrics):
#so we have to read the parents short agent..
p.sendConn.getWrapped().shortAgent
proc getStats*(p: PubSubPeer): ConnStats =
var c = p.sendConn
while true:
let w = c.getWrapped()
if isNil(w): break
c = w
ChronosStream(c).getStats
func hash*(p: PubSubPeer): Hash =
p.peerId.hash

View File

@@ -166,3 +166,29 @@ method closeImpl*(s: ChronosStream) {.async.} =
await procCall Connection(s).closeImpl()
method getWrapped*(s: ChronosStream): Connection = nil
when defined(linux):
type TcpInfo {.importc: "struct tcp_info", header: "/usr/include/netinet/tcp.h".} = object
tcpi_rtt: uint32
tcpi_snd_cwnd: uint32
tcpi_snd_mss: uint32
let SOL_TCP_INFO {.importc: "TCP_INFO", header: "/usr/include/netinet/tcp.h".}: int
let SOL_TCP {.importc: "SOL_TCP", header: "/usr/include/netinet/tcp.h".}: int
type ConnStats* = object
bandwidth*: Opt[int] # bytes per milliseconds
ping*: Opt[int] # in usec
proc getStats*(s: ChronosStream): ConnStats =
var tcpInfo: TcpInfo
var tcpInfoLength = sizeof(tcpInfo)
if getSockOpt(s.client.fd, SOL_TCP, SOL_TCP_INFO, addr tcpInfo, tcpInfoLength):
result.ping = Opt.some(int(tcpInfo.tcpi_rtt))
let
ping = 100000.0
#ping = float(tcpInfo.tcpi_rtt)
cwnd = float(tcpInfo.tcpi_snd_cwnd)
mss = float(tcpInfo.tcpi_snd_mss)
bandwidth = (cwnd * mss) / (ping / 1000.0)
info "x", ping, cwnd, mss, bandwidth, ping2=tcpInfo.tcpi_rtt
result.bandwidth = Opt.some(int(bandwidth))

View File

@@ -13,6 +13,7 @@ import chronos, stew/byteutils
import ../libp2p/[stream/connection,
transports/transport,
transports/tcptransport,
stream/chronosstream,
upgrademngrs/upgrade,
multiaddress,
multicodec,