From 685e299c45042253e25c20308b7de7ac79a0bc52 Mon Sep 17 00:00:00 2001 From: Jonathan Gimeno Date: Fri, 18 Jan 2019 02:43:22 +0100 Subject: [PATCH] add unit test for test broadcast (#1127) * add unit test for test broadcast * execute goimports * add import peerstore * refactor and check message equals expected * remove not needed sleep * add function to connect hosts to host * refactor test for better reading * refactor to only use one channel * refactor subs to channel * close channels instead of sending boolean value * use goimports * copy value to avoid race condition * remove TODO comment * fix issues with gometalinter * run gazelle * reduce sleep time * add comments to goroutines --- CONTRIBUTING.md | 2 +- shared/p2p/BUILD.bazel | 1 + shared/p2p/service_test.go | 98 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 96 insertions(+), 5 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b024f1a5a1..55ce8580fa 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -65,7 +65,7 @@ $ go test Changes that affect multiple files can be tested with ... ``` -$ gometallinter && bazel test +$ gometalinter && bazel test ``` **10. Stage the file or files that you want to commit.** diff --git a/shared/p2p/BUILD.bazel b/shared/p2p/BUILD.bazel index 7f2492361d..92edc6e78c 100644 --- a/shared/p2p/BUILD.bazel +++ b/shared/p2p/BUILD.bazel @@ -60,6 +60,7 @@ go_test( "@com_github_golang_mock//gomock:go_default_library", "@com_github_ipfs_go_log//:go_default_library", "@com_github_libp2p_go_libp2p_blankhost//:go_default_library", + "@com_github_libp2p_go_libp2p_peerstore//:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//:go_default_library", "@com_github_libp2p_go_libp2p_swarm//testing:go_default_library", "@com_github_multiformats_go_multiaddr//:go_default_library", diff --git a/shared/p2p/service_test.go b/shared/p2p/service_test.go index 4af194598c..4c4fd8b725 100644 --- a/shared/p2p/service_test.go +++ b/shared/p2p/service_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" ipfslog "github.com/ipfs/go-log" bhost "github.com/libp2p/go-libp2p-blankhost" + peerstore "github.com/libp2p/go-libp2p-peerstore" pubsub "github.com/libp2p/go-libp2p-pubsub" swarmt "github.com/libp2p/go-libp2p-swarm/testing" shardpb "github.com/prysmaticlabs/prysm/proto/sharding/p2p/v1" @@ -66,15 +67,104 @@ func TestP2pPortTakenError(t *testing.T) { } func TestBroadcast(t *testing.T) { - s, err := NewServer(&ServerConfig{}) + servers, err := runP2PServersWithDifferentPorts(3) if err != nil { - t.Fatalf("Could not start a new server: %v", err) + t.Fatalf("error while trying to create server: %s", err) + } + + err = connectServersTo(servers[0], servers[1:]) + if err != nil { + t.Fatalf("error while trying to connect to peer: %s", err) } msg := &shardpb.CollationBodyRequest{} - s.Broadcast(msg) + subscribeServersToTopic(servers, "theTopic", msg) - // TODO(543): test that topic was published + time.Sleep(20 * time.Millisecond) + + msgSubsChannel := make(chan Message) + for _, server := range servers[1:] { + server.Subscribe(&shardpb.CollationBodyRequest{}, msgSubsChannel) + } + + aMessage := &shardpb.CollationBodyRequest{ShardId: 1234} + servers[0].Broadcast(aMessage) + + doneChan := make(chan bool) + errorChan := make(chan bool) + timeoutChan := make(chan bool) + + var wg sync.WaitGroup + wg.Add(len(servers[1:])) // Num of nodes that receive the channel + + // Goroutine that waits the broadcasted messages and completes waitgroup + go func() { + for recMessage := range msgSubsChannel { + protoMsg := recMessage.Data.(*shardpb.CollationBodyRequest) + if protoMsg.ShardId != aMessage.ShardId { + errorChan <- true + } + + wg.Done() + } + }() + + // Goroutine that makes timeouts if there if it is not completed in 5 seconds + go func() { + time.Sleep(5 * time.Second) + close(timeoutChan) + close(msgSubsChannel) + }() + + // Goroutine that closes all channels once broadcast is complete + go func() { + wg.Wait() + close(doneChan) + close(msgSubsChannel) + }() + + select { + case <-doneChan: + break + case <-errorChan: + t.Fatalf("error asserting that received broacast message equals expected") + case <-timeoutChan: + t.Fatalf("timeout while waiting for broadcast message") + } +} + +func runP2PServersWithDifferentPorts(numServers int) ([]*Server, error) { + var servers []*Server + initialPort := 12345 + + for i := 0; i < numServers; i++ { + s, err := NewServer(&ServerConfig{Port: initialPort + i}) + if err != nil { + return nil, err + } + + servers = append(servers, s) + } + + return servers, nil +} + +func connectServersTo(serverToConnect *Server, servers []*Server) error { + for _, server := range servers { + err := server.host.Connect(context.Background(), peerstore.PeerInfo{ID: serverToConnect.host.ID(), Addrs: serverToConnect.host.Addrs()}) + if err != nil { + return err + } + } + + return nil +} + +func subscribeServersToTopic(servers []*Server, topic string, msg proto.Message) { + for _, server := range servers { + copyMsg := *msg.(*shardpb.CollationBodyRequest) + server.RegisterTopic(topic, ©Msg) + } } func TestEmit(t *testing.T) {