Mirror of https://github.com/vacp2p/nim-libp2p.git, synced 2026-01-10 12:58:05 -05:00
Compare commits: minimize_i... → staggertcp (2 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | a216353e5b |  |
|  | 4c01ee5e48 |  |
@@ -22,6 +22,7 @@ import ./pubsub,
        ./rpc/[messages, message],
        ../protocol,
        ../../stream/connection,
+       ../../utils/semaphore,
        ../../peerinfo,
        ../../peerid,
        ../../utility,
@@ -365,7 +366,25 @@ proc validateAndRelay(g: GossipSub,
 
   # In theory, if topics are the same in all messages, we could batch - we'd
   # also have to be careful to only include validated messages
-  g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+  #g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+  let sem = newAsyncSemaphore(1)
+  var peers = toSeq(toSendPeers)
+  g.rng.shuffle(peers)
+
+  proc sendToOne(p: PubSubPeer) {.async.} =
+    await sem.acquire()
+    defer: sem.release()
+
+    let stats = p.getStats()
+    let bandwidth = max(stats.bandwidth.get(), 2000)
+    g.broadcast(@[p], RPCMsg(messages: @[msg]))
+    echo stats
+    echo "Sleeping ", msg.data.len div bandwidth
+    await sleepAsync(milliseconds(msg.data.len div bandwidth))
+    echo "After send", p.getStats()
+
+  for p in peers:
+    asyncSpawn sendToOne(p)
   trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
   for topic in msg.topicIds:
     if topic notin g.topics: continue
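The hunk above replaces the single batched `g.broadcast(toSendPeers, ...)` with a staggered fan-out: peers are shuffled, an `AsyncSemaphore` with one permit serializes the sends, and after each send the task sleeps `msg.data.len div bandwidth` milliseconds, where `bandwidth` is the connection's estimated throughput in bytes per millisecond, floored at 2000. A minimal sketch of that pacing arithmetic, with assumed message size and bandwidth values (plain Nim, no libp2p imports):

```nim
# Pacing arithmetic from sendToOne above, with hypothetical numbers.
# bandwidth is in bytes per millisecond; the floor of 2000 (~2 MB/s)
# keeps a tiny or stale estimate from producing a very long sleep.
let
  msgLen   = 1_000_000                 # assumed 1 MB gossip message
  estimate = 5_000                     # assumed ConnStats.bandwidth, bytes/ms
  bandwidth = max(estimate, 2000)
  delayMs  = msgLen div bandwidth      # 1_000_000 div 5_000 = 200
echo "stagger delay: ", delayMs, " ms" # next peer's send starts ~200 ms later
```

Because the semaphore holds a single permit, the `asyncSpawn`ed `sendToOne` tasks effectively run one after another, so relaying is spread over roughly `peers.len` such delays instead of bursting onto every connection at once.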
@@ -16,11 +16,12 @@ import rpc/[messages, message, protobuf],
        ../../peerid,
        ../../peerinfo,
        ../../stream/connection,
+       ../../stream/chronosstream,
        ../../crypto/crypto,
        ../../protobuf/minprotobuf,
        ../../utility
 
-export peerid, connection, deques
+export peerid, connection, chronosstream, deques
 
 logScope:
   topics = "libp2p pubsubpeer"
@@ -79,6 +80,14 @@ when defined(libp2p_agents_metrics):
       #so we have to read the parents short agent..
       p.sendConn.getWrapped().shortAgent
 
+proc getStats*(p: PubSubPeer): ConnStats =
+  var c = p.sendConn
+  while true:
+    let w = c.getWrapped()
+    if isNil(w): break
+    c = w
+  ChronosStream(c).getStats
+
 func hash*(p: PubSubPeer): Hash =
   p.peerId.hash
 
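`getStats` walks the peer's connection stack via `getWrapped()` until the innermost stream (which returns `nil`, per the `ChronosStream.getWrapped` override below) and reads the TCP statistics from that `ChronosStream`. A self-contained sketch of consuming the resulting `ConnStats`, with the type re-declared locally for illustration and `Opt` taken from the `results` package nim-libp2p uses (the import path may be `stew/results` on older stew versions):

```nim
import results  # Opt; possibly stew/results depending on the version

type ConnStats = object   # re-declared here only for the sketch
  bandwidth: Opt[int]     # bytes per millisecond
  ping: Opt[int]          # microseconds

proc bandwidthOr(s: ConnStats, fallback = 2000): int =
  # Guard with isSome: `get()` on an empty Opt raises. The gossipsub hunk
  # above calls `stats.bandwidth.get()` directly, which assumes the estimate
  # is always present (a Linux build and a successful getsockopt).
  if s.bandwidth.isSome: max(s.bandwidth.get(), fallback)
  else: fallback

echo bandwidthOr(ConnStats())                           # -> 2000
echo bandwidthOr(ConnStats(bandwidth: Opt.some(5000)))  # -> 5000
```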
@@ -166,3 +166,29 @@ method closeImpl*(s: ChronosStream) {.async.} =
   await procCall Connection(s).closeImpl()
 
 method getWrapped*(s: ChronosStream): Connection = nil
+
+when defined(linux):
+  type TcpInfo {.importc: "struct tcp_info", header: "/usr/include/netinet/tcp.h".} = object
+    tcpi_rtt: uint32
+    tcpi_snd_cwnd: uint32
+    tcpi_snd_mss: uint32
+  let SOL_TCP_INFO {.importc: "TCP_INFO", header: "/usr/include/netinet/tcp.h".}: int
+  let SOL_TCP {.importc: "SOL_TCP", header: "/usr/include/netinet/tcp.h".}: int
+
+type ConnStats* = object
+  bandwidth*: Opt[int] # bytes per milliseconds
+  ping*: Opt[int] # in usec
+
+proc getStats*(s: ChronosStream): ConnStats =
+  var tcpInfo: TcpInfo
+  var tcpInfoLength = sizeof(tcpInfo)
+  if getSockOpt(s.client.fd, SOL_TCP, SOL_TCP_INFO, addr tcpInfo, tcpInfoLength):
+    result.ping = Opt.some(int(tcpInfo.tcpi_rtt))
+    let
+      ping = 100000.0
+      #ping = float(tcpInfo.tcpi_rtt)
+      cwnd = float(tcpInfo.tcpi_snd_cwnd)
+      mss = float(tcpInfo.tcpi_snd_mss)
+      bandwidth = (cwnd * mss) / (ping / 1000.0)
+    info "x", ping, cwnd, mss, bandwidth, ping2=tcpInfo.tcpi_rtt
+    result.bandwidth = Opt.some(int(bandwidth))
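The estimate reads Linux's `TCP_INFO` socket option and approximates throughput as congestion window times segment size divided by round-trip time: `tcpi_snd_cwnd` is in segments, `tcpi_snd_mss` in bytes, and `tcpi_rtt` in microseconds, so dividing by `ping / 1000` yields bytes per millisecond. Note that the hunk currently pins `ping` to 100 000 µs (100 ms) and only logs the measured `tcpi_rtt`. A worked example with assumed readings:

```nim
# Hypothetical TCP_INFO readings, only to show the units of the formula above.
let
  cwnd = 10.0        # assumed congestion window, in segments
  mss  = 1448.0      # assumed maximum segment size, in bytes
  ping = 100_000.0   # the hard-coded 100 ms from the hunk, in microseconds
  bandwidth = (cwnd * mss) / (ping / 1000.0)
echo bandwidth       # 14480.0 / 100.0 = 144.8 bytes/ms, i.e. roughly 145 KB/s
```

Substituting the measured `tcpi_rtt` for the constant would make the same formula track the connection's actual bandwidth-delay behaviour.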
@@ -13,6 +13,7 @@ import chronos, stew/byteutils
 import ../libp2p/[stream/connection,
                   transports/transport,
                   transports/tcptransport,
+                  stream/chronosstream,
                   upgrademngrs/upgrade,
                   multiaddress,
                   multicodec,