Compare commits

...

2 Commits

Author SHA1 Message Date
Diego
06b472d30c Add run and stop 2022-11-21 10:40:54 +01:00
Diego
1e46642c96 Add services 2022-11-21 10:27:08 +01:00
3 changed files with 60 additions and 2 deletions

View File

@@ -0,0 +1,32 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import ../switch
import chronos
import std/tables
type
HPService* = ref object of Service
newPeerHandler: PeerEventHandler
proc askPeer(s: Switch, peerId: PeerId): Future[void] {.async.} =
echo "Asking peer " & $(peerId)
proc h(switch: Switch) =
for p in switch.peerStore[AddressBook].book.keys:
discard askPeer(switch, p)
method setup*(self: HPService, switch: Switch) {.async.} =
self.newPeerHandler = proc (peerId: PeerId, event: PeerEvent): Future[void] =
return askPeer(switch, peerId)
switch.connManager.addPeerEventHandler(self.newPeerHandler, PeerEventKind.Joined)
method run*(self: HPService, switch: Switch) {.async, gcsafe, public.} =
h(switch)
method stop*(self: HPService, switch: Switch) {.async, gcsafe, public.} =
if not isNil(self.newPeerHandler):
switch.connManager.removePeerEventHandler(self.newPeerHandler, PeerEventKind.Joined)

View File

@@ -74,6 +74,15 @@ type
peerStore*: PeerStore
nameResolver*: NameResolver
started: bool
services*: seq[Service]
Service* = ref object of RootObj
method setup*(self: Service, switch: Switch) {.base, async, gcsafe, public.} = discard
method run*(self: Service, switch: Switch) {.base, async, gcsafe, public.} = discard
method stop*(self: Service, switch: Switch) {.base, async, gcsafe, public.} = discard
proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler,
@@ -108,6 +117,9 @@ method addTransport*(s: Switch, t: Transport) =
s.transports &= t
s.dialer.addTransport(t)
method addService*(switch: Switch, service: Service) =
switch.services.add(service)
proc isConnected*(s: Switch, peerId: PeerId): bool {.public.} =
## returns true if the peer has one or more
## associated connections
@@ -289,6 +301,9 @@ proc stop*(s: Switch) {.async, public.} =
if not a.finished:
a.cancel()
for service in s.services:
await service.stop(s)
await s.ms.stop()
trace "Switch stopped"
@@ -326,6 +341,10 @@ proc start*(s: Switch) {.async, gcsafe, public.} =
await s.ms.start()
for service in s.services:
await service.setup(s)
await service.run(s)
s.started = true
debug "Started libp2p node", peer = s.peerInfo
@@ -337,7 +356,8 @@ proc newSwitch*(peerInfo: PeerInfo,
connManager: ConnManager,
ms: MultistreamSelect,
nameResolver: NameResolver = nil,
peerStore = PeerStore.new()): Switch
peerStore = PeerStore.new(),
services = newSeq[Service]()): Switch
{.raises: [Defect, LPError], public.} =
if secureManagers.len == 0:
raise newException(LPError, "Provide at least one secure manager")
@@ -349,8 +369,10 @@ proc newSwitch*(peerInfo: PeerInfo,
connManager: connManager,
peerStore: peerStore,
dialer: Dialer.new(peerInfo.peerId, connManager, transports, ms, nameResolver),
nameResolver: nameResolver)
nameResolver: nameResolver,
services: services)
switch.connManager.peerStore = peerStore
switch.mount(identity)
return switch

View File

@@ -26,6 +26,8 @@ import ../libp2p/[errors,
transports/wstransport]
import ./helpers
import ../libp2p/services/hpservice
const
TestCodec = "/test/proto/1.0.0"
@@ -289,6 +291,8 @@ suite "Switch":
switch2.addConnEventHandler(hook, ConnEventKind.Connected)
switch2.addConnEventHandler(hook, ConnEventKind.Disconnected)
switch1.addService(HPService.new())
await switch1.start()
await switch2.start()