feature/mdf (#76)

* protobuf

* do not send ack when not required

* remove from state if no ack required

* different send modes

* fix

* updated

* retransmit fix

* updated

* fixed

* renamed to ephemeral

* repeated

* gen

* resolution

* cleanup

* rough dependency code

* todo

* removed

* only stores if ephemeral

* started implementing the algo

* simplified

* updated

* we never store ephemeral messages so we did not need

* adds parents

* new schema

* tx to insert

* err

* removed old

* fixed

* changed log

* test persistence of parents

* removed

* rename

* ignoring

* Update store/messagestore_sqlite.go

Co-Authored-By: Adam Babik <adam@status.im>

* Update node/node.go

Co-Authored-By: Adam Babik <adam@status.im>

* Update node/node.go

Co-Authored-By: Adam Babik <adam@status.im>

* Update node/node.go

Co-Authored-By: Adam Babik <adam@status.im>

* more fixes

* Update store/messagestore_sqlite.go

Co-Authored-By: Adam Babik <adam@status.im>

* more fixes

* using refs

* Update node/node.go

Co-Authored-By: Adam Babik <adam@status.im>

* finished

* Update store/messagestore_sqlite.go

Co-Authored-By: Adam Babik <adam@status.im>

* Update 1572372377_initial_schema.down.sql

* desc + refactor

* started refactoring resolution

* Update README.md

* rewrote resolve

* mutex

* todo

* fixes

* sql impl

* added test

* log

* updates

* updated

* little bug

* fix

* added test

* first changes from @adambabik

* moved

* fixed test, started eventual ones

* fixed eventually test

* mock install

* consistent test

* mock

* fix lint

* Update dependency/tracker_sqlite.go

Co-Authored-By: Adam Babik <adam@status.im>

* fix
Dean Eigenmann
2019-11-05 17:32:23 +01:00
committed by GitHub
parent d34db70222
commit 748b61123f
40 changed files with 2524 additions and 184 deletions

View File

@@ -23,6 +23,15 @@ install-linter:
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.17.1
.PHONY: install-linter
mock-install:
go get -u github.com/golang/mock/mockgen
go get -u github.com/golang/mock
.PHONY: mock-install
mock:
mockgen -package=internal -destination=node/internal/syncstate_mock.go -source=state/state.go
.PHONY: mock
vendor:
go mod tidy
go mod vendor

View File

@@ -8,7 +8,7 @@ https://camo.githubusercontent.com/915b7be44ada53c290eb157634330494ebe3e30a/6874
[![Go Report Card](https://goreportcard.com/badge/github.com/vacp2p/mvds)](https://goreportcard.com/report/github.com/vacp2p/mvds)
[![Build Status](https://travis-ci.com/vacp2p/mvds.svg?branch=master)](https://travis-ci.com/vacp2p/mvds)
Experimental implementation of the [minimal viable data sync protocol specification](https://specs.vac.dev/mvds.html).
Experimental implementation of the [minimal viable data sync protocol specification](https://specs.vac.dev/mvds.html) including the [metadata format specification](https://specs.vac.dev/mdf.html).
## Usage

View File

@@ -0,0 +1,317 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1572614870_initial_schema.down.sql (30B)
// 1572614870_initial_schema.up.sql (157B)
// doc.go (377B)
package migrations
import (
"bytes"
"compress/gzip"
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
)
func bindataRead(data []byte, name string) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("read %q: %v", name, err)
}
var buf bytes.Buffer
_, err = io.Copy(&buf, gz)
clErr := gz.Close()
if err != nil {
return nil, fmt.Errorf("read %q: %v", name, err)
}
if clErr != nil {
return nil, err
}
return buf.Bytes(), nil
}
type asset struct {
bytes []byte
info os.FileInfo
digest [sha256.Size]byte
}
type bindataFileInfo struct {
name string
size int64
mode os.FileMode
modTime time.Time
}
func (fi bindataFileInfo) Name() string {
return fi.name
}
func (fi bindataFileInfo) Size() int64 {
return fi.size
}
func (fi bindataFileInfo) Mode() os.FileMode {
return fi.mode
}
func (fi bindataFileInfo) ModTime() time.Time {
return fi.modTime
}
func (fi bindataFileInfo) IsDir() bool {
return false
}
func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __1572614870_initial_schemaDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\xc8\x2d\x4b\x29\x8e\x4f\x49\x2d\x48\xcd\x4b\x49\xcd\x4b\xce\x4c\x2d\xb6\xe6\x02\x04\x00\x00\xff\xff\x13\x64\x97\xf6\x1e\x00\x00\x00")
func _1572614870_initial_schemaDownSqlBytes() ([]byte, error) {
return bindataRead(
__1572614870_initial_schemaDownSql,
"1572614870_initial_schema.down.sql",
)
}
func _1572614870_initial_schemaDownSql() (*asset, error) {
bytes, err := _1572614870_initial_schemaDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1572614870_initial_schema.down.sql", size: 30, mode: os.FileMode(0644), modTime: time.Unix(1572706379, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xfc, 0xb3, 0xc3, 0x5e, 0x81, 0xae, 0x77, 0x21, 0x3b, 0xd0, 0xa0, 0x6a, 0xf4, 0x7b, 0xb2, 0x1c, 0x92, 0xd7, 0x5d, 0x4c, 0xa6, 0x4f, 0xe, 0xc6, 0x2d, 0xe4, 0x18, 0x5d, 0x56, 0xe, 0x18, 0x6a}}
return a, nil
}
var __1572614870_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\x08\x71\x74\xf2\x71\x55\xc8\x2d\x4b\x29\x8e\x4f\x49\x2d\x48\xcd\x4b\x49\xcd\x4b\xce\x4c\x2d\x56\xd0\xe0\x52\x50\x50\x50\xc8\x2d\x4e\x8f\xcf\x4c\x51\x70\xf2\xf1\x77\x52\x08\x08\xf2\xf4\x75\x0c\x8a\x54\xf0\x76\x8d\xd4\x01\x4b\xc2\xd5\x57\x42\x14\xf8\xf9\x87\x28\xf8\x85\xfa\xf8\x70\x69\x5a\x73\x71\x41\x8d\xf7\xf4\x73\x71\x8d\x50\xc8\x4c\xa9\x88\x47\x52\xed\xef\x87\x69\xa1\x06\x42\x5e\xd3\x9a\x0b\x10\x00\x00\xff\xff\x11\xfa\x7b\x28\x9d\x00\x00\x00")
func _1572614870_initial_schemaUpSqlBytes() ([]byte, error) {
return bindataRead(
__1572614870_initial_schemaUpSql,
"1572614870_initial_schema.up.sql",
)
}
func _1572614870_initial_schemaUpSql() (*asset, error) {
bytes, err := _1572614870_initial_schemaUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1572614870_initial_schema.up.sql", size: 157, mode: os.FileMode(0644), modTime: time.Unix(1572742027, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xad, 0x62, 0x2e, 0xb1, 0x1c, 0x65, 0x39, 0xb9, 0xe4, 0xae, 0xf6, 0x28, 0x6d, 0xbe, 0x62, 0xba, 0x93, 0x80, 0x6c, 0x47, 0x4f, 0x98, 0x5b, 0xf0, 0xfa, 0x16, 0x2, 0x7e, 0xaa, 0x15, 0x63, 0xf2}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x8f\xbb\x6e\xc3\x30\x0c\x45\x77\x7f\xc5\x45\x96\x2c\xb5\xb4\x74\xea\xd6\xb1\x7b\x7f\x80\x91\x68\x89\x88\x1e\xae\x48\xe7\xf1\xf7\x85\xd3\x02\xcd\xd6\xf5\x00\xe7\xf0\xd2\x7b\x7c\x66\x51\x2c\x52\x18\xa2\x68\x1c\x58\x95\xc6\x1d\x27\x0e\xb4\x29\xe3\x90\xc4\xf2\x76\x72\xa1\x57\xaf\x46\xb6\xe9\x2c\xd5\x57\x49\x83\x8c\xfd\xe5\xf5\x30\x79\x8f\x40\xed\x68\xc8\xd4\x62\xe1\x47\x4b\xa1\x46\xc3\xa4\x25\x5c\xc5\x32\x08\xeb\xe0\x45\x6e\x0e\xef\x86\xc2\xa4\x06\xcb\x64\x47\x85\x65\x46\x20\xe5\x3d\xb3\xf4\x81\xd4\xe7\x93\xb4\x48\x46\x6e\x47\x1f\xcb\x13\xd9\x17\x06\x2a\x85\x23\x96\xd1\xeb\xc3\x55\xaa\x8c\x28\x83\x83\xf5\x71\x7f\x01\xa9\xb2\xa1\x51\x65\xdd\xfd\x4c\x17\x46\xeb\xbf\xe7\x41\x2d\xfe\xff\x11\xae\x7d\x9c\x15\xa4\xe0\xdb\xca\xc1\x38\xba\x69\x5a\x29\x9c\x29\x31\xf4\xab\x88\xf1\x34\x79\x9f\xfa\x5b\xe2\xc6\xbb\xf5\xbc\x71\x5e\xcf\x09\x3f\x35\xe9\x4d\x31\x77\x38\xe7\xff\x80\x4b\x1d\x6e\xfa\x0e\x00\x00\xff\xff\x9d\x60\x3d\x88\x79\x01\x00\x00")
func docGoBytes() ([]byte, error) {
return bindataRead(
_docGo,
"doc.go",
)
}
func docGo() (*asset, error) {
bytes, err := docGoBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1572706379, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0xaf, 0xdf, 0xcf, 0x65, 0xae, 0x19, 0xfc, 0x9d, 0x29, 0xc1, 0x91, 0xaf, 0xb5, 0xd5, 0xb1, 0x56, 0xf3, 0xee, 0xa8, 0xba, 0x13, 0x65, 0xdb, 0xab, 0xcf, 0x4e, 0xac, 0x92, 0xe9, 0x60, 0xf1}}
return a, nil
}
// Asset loads and returns the asset for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func Asset(name string) ([]byte, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
}
return a.bytes, nil
}
return nil, fmt.Errorf("Asset %s not found", name)
}
// AssetString returns the asset contents as a string (instead of a []byte).
func AssetString(name string) (string, error) {
data, err := Asset(name)
return string(data), err
}
// MustAsset is like Asset but panics when Asset would return an error.
// It simplifies safe initialization of global variables.
func MustAsset(name string) []byte {
a, err := Asset(name)
if err != nil {
panic("asset: Asset(" + name + "): " + err.Error())
}
return a
}
// MustAssetString is like AssetString but panics when Asset would return an
// error. It simplifies safe initialization of global variables.
func MustAssetString(name string) string {
return string(MustAsset(name))
}
// AssetInfo loads and returns the asset info for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func AssetInfo(name string) (os.FileInfo, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
}
return a.info, nil
}
return nil, fmt.Errorf("AssetInfo %s not found", name)
}
// AssetDigest returns the digest of the file with the given name. It returns an
// error if the asset could not be found or the digest could not be loaded.
func AssetDigest(name string) ([sha256.Size]byte, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s can't read by error: %v", name, err)
}
return a.digest, nil
}
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s not found", name)
}
// Digests returns a map of all known files and their checksums.
func Digests() (map[string][sha256.Size]byte, error) {
mp := make(map[string][sha256.Size]byte, len(_bindata))
for name := range _bindata {
a, err := _bindata[name]()
if err != nil {
return nil, err
}
mp[name] = a.digest
}
return mp, nil
}
// AssetNames returns the names of the assets.
func AssetNames() []string {
names := make([]string, 0, len(_bindata))
for name := range _bindata {
names = append(names, name)
}
return names
}
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1572614870_initial_schema.down.sql": _1572614870_initial_schemaDownSql,
"1572614870_initial_schema.up.sql": _1572614870_initial_schemaUpSql,
"doc.go": docGo,
}
// AssetDir returns the file names below a certain
// directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the
// following hierarchy:
// data/
// foo.txt
// img/
// a.png
// b.png
// then AssetDir("data") would return []string{"foo.txt", "img"},
// AssetDir("data/img") would return []string{"a.png", "b.png"},
// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and
// AssetDir("") will return []string{"data"}.
func AssetDir(name string) ([]string, error) {
node := _bintree
if len(name) != 0 {
canonicalName := strings.Replace(name, "\\", "/", -1)
pathList := strings.Split(canonicalName, "/")
for _, p := range pathList {
node = node.Children[p]
if node == nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
}
}
if node.Func != nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
rv := make([]string, 0, len(node.Children))
for childName := range node.Children {
rv = append(rv, childName)
}
return rv, nil
}
type bintree struct {
Func func() (*asset, error)
Children map[string]*bintree
}
var _bintree = &bintree{nil, map[string]*bintree{
"1572614870_initial_schema.down.sql": &bintree{_1572614870_initial_schemaDownSql, map[string]*bintree{}},
"1572614870_initial_schema.up.sql": &bintree{_1572614870_initial_schemaUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.
func RestoreAsset(dir, name string) error {
data, err := Asset(name)
if err != nil {
return err
}
info, err := AssetInfo(name)
if err != nil {
return err
}
err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
if err != nil {
return err
}
err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
if err != nil {
return err
}
return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
}
// RestoreAssets restores an asset under the given directory recursively.
func RestoreAssets(dir, name string) error {
children, err := AssetDir(name)
// File
if err != nil {
return RestoreAsset(dir, name)
}
// Dir
for _, child := range children {
err = RestoreAssets(dir, filepath.Join(name, child))
if err != nil {
return err
}
}
return nil
}
func _filePath(dir, name string) string {
canonicalName := strings.Replace(name, "\\", "/", -1)
return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...)
}

View File

@@ -0,0 +1 @@
DROP TABLE mvds_dependencies;

View File

@@ -0,0 +1,6 @@
CREATE TABLE mvds_dependencies (
msg_id BLOB PRIMARY KEY,
dependency BLOB NOT NULL
);
CREATE INDEX idx_dependency ON mvds_dependencies(dependency);

View File

@@ -0,0 +1,9 @@
// This file is necessary because "github.com/status-im/migrate/v4"
// can't handle files starting with a prefix. At least that's the case
// for go-bindata.
// If go-bindata is called from the same directory, asset names
// have no prefix and "github.com/status-im/migrate/v4" works as expected.
package sqlite
//go:generate go-bindata -pkg migrations -o ../migrations.go .

dependency/tracker.go (new file, 12 lines)
View File

@@ -0,0 +1,12 @@
package dependency
import (
"github.com/vacp2p/mvds/state"
)
type Tracker interface {
Add(msg, dependency state.MessageID) error
Dependants(id state.MessageID) ([]state.MessageID, error)
Resolve(msg state.MessageID, dependency state.MessageID) error
IsResolved(id state.MessageID) (bool, error)
}
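
To make the Tracker contract concrete, here is a minimal, hypothetical sketch of how a caller could exercise the in-memory implementation added further down; the message IDs are made up and none of this is part of the diff.

package main

import (
	"fmt"

	"github.com/vacp2p/mvds/dependency"
	"github.com/vacp2p/mvds/state"
)

func main() {
	tracker := dependency.NewInMemoryTracker()

	msg := state.MessageID{0x01}
	parent := state.MessageID{0x02}

	// msg depends on parent; until that dependency is resolved,
	// msg is reported as unresolved.
	_ = tracker.Add(msg, parent)

	resolved, _ := tracker.IsResolved(msg)
	fmt.Println("before:", resolved) // false: one unresolved dependency

	// Once the parent has been seen, the dependency is marked resolved.
	_ = tracker.Resolve(msg, parent)

	resolved, _ = tracker.IsResolved(msg)
	fmt.Println("after:", resolved) // true: no unresolved dependencies remain
}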

View File

@@ -0,0 +1,71 @@
package dependency
import (
"reflect"
"sync"
"github.com/vacp2p/mvds/state"
)
// Verify that Tracker interface is implemented.
var _ Tracker = (*inMemoryTracker)(nil)
type inMemoryTracker struct {
sync.Mutex
dependents map[state.MessageID][]state.MessageID
dependencies map[state.MessageID]int
}
func NewInMemoryTracker() *inMemoryTracker {
return &inMemoryTracker{
dependents: make(map[state.MessageID][]state.MessageID),
dependencies: make(map[state.MessageID]int),
}
}
func (md *inMemoryTracker) Add(msg, dependency state.MessageID) error {
md.Lock()
defer md.Unlock()
// @todo check it wasn't already added
md.dependents[dependency] = append(md.dependents[dependency], msg)
md.dependencies[msg] += 1
return nil
}
func (md *inMemoryTracker) Dependants(id state.MessageID) ([]state.MessageID, error) {
md.Lock()
defer md.Unlock()
return md.dependents[id], nil
}
func (md *inMemoryTracker) Resolve(msg state.MessageID, dependency state.MessageID) error {
md.Lock()
defer md.Unlock()
for i, item := range md.dependents[dependency] {
if !reflect.DeepEqual(msg[:], item[:]) {
continue
}
md.dependents[dependency] = remove(md.dependents[dependency], i)
md.dependencies[msg] -= 1
return nil
}
return nil
}
func (md *inMemoryTracker) IsResolved(id state.MessageID) (bool, error) {
md.Lock()
defer md.Unlock()
return md.dependencies[id] == 0, nil
}
func remove(s []state.MessageID, i int) []state.MessageID {
s[len(s)-1], s[i] = s[i], s[len(s)-1]
return s[:len(s)-1]
}

View File

@@ -0,0 +1,42 @@
package dependency
import (
"io/ioutil"
"testing"
"github.com/stretchr/testify/require"
"github.com/vacp2p/mvds/dependency/migrations"
"github.com/vacp2p/mvds/persistenceutil"
"github.com/vacp2p/mvds/state"
)
func TestTrackerSQLitePersistence(t *testing.T) {
tmpFile, err := ioutil.TempFile("", "")
require.NoError(t, err)
db, err := persistenceutil.Open(tmpFile.Name(), "", persistenceutil.MigrationConfig{
AssetNames: migrations.AssetNames(),
AssetGetter: migrations.Asset,
})
require.NoError(t, err)
d := NewPersistentTracker(db)
msg := state.MessageID{0x01}
dependency := state.MessageID{0x02}
err = d.Add(msg, dependency)
require.NoError(t, err)
dependants, err := d.Dependants(dependency)
require.NoError(t, err)
require.Equal(t, msg, dependants[0])
res, err := d.IsResolved(msg)
require.NoError(t, err)
require.False(t, res)
err = d.Resolve(msg, dependency)
require.NoError(t, err)
res, err = d.IsResolved(msg)
require.NoError(t, err)
require.True(t, res)
}

View File

@@ -0,0 +1,71 @@
package dependency
import (
"database/sql"
"github.com/vacp2p/mvds/state"
)
// Verify that Tracker interface is implemented.
var _ Tracker = (*sqliteTracker)(nil)
type sqliteTracker struct {
db *sql.DB
}
func NewPersistentTracker(db *sql.DB) *sqliteTracker {
return &sqliteTracker{db: db}
}
func (sd *sqliteTracker) Add(msg, dependency state.MessageID) error {
_, err := sd.db.Exec(`INSERT INTO mvds_dependencies (msg_id, dependency) VALUES (?, ?)`, msg[:], dependency[:])
if err != nil {
return err
}
return nil
}
func (sd *sqliteTracker) Dependants(id state.MessageID) ([]state.MessageID, error) {
rows, err := sd.db.Query(`SELECT msg_id FROM mvds_dependencies WHERE dependency = ?`, id[:])
if err != nil {
return nil, err
}
defer rows.Close()
var msgs []state.MessageID
for rows.Next() {
var msg []byte
err := rows.Scan(&msg)
if err != nil {
return nil, err
}
msgs = append(msgs, state.ToMessageID(msg))
}
return msgs, nil
}
func (sd *sqliteTracker) Resolve(msg state.MessageID, dependency state.MessageID) error {
_, err := sd.db.Exec(
`DELETE FROM mvds_dependencies WHERE msg_id = ? AND dependency = ?`,
msg[:],
dependency[:],
)
return err
}
func (sd *sqliteTracker) IsResolved(id state.MessageID) (bool, error) {
result := sd.db.QueryRow(`SELECT COUNT(*) FROM mvds_dependencies WHERE msg_id = ?`, id[:])
var num int64
err := result.Scan(&num)
if err != nil {
return false, err
}
return num == 0, nil
}

go.mod (1 line changed)
View File

@@ -4,6 +4,7 @@ go 1.12
require (
github.com/golang-migrate/migrate/v4 v4.6.2 // indirect
github.com/golang/mock v1.2.0
github.com/golang/protobuf v1.3.2
github.com/mutecomm/go-sqlcipher v0.0.0-20190227152316-55dbde17881f // indirect
github.com/pkg/errors v0.8.1

go.sum (1 line changed)
View File

@@ -56,6 +56,7 @@ github.com/golang-migrate/migrate/v4 v4.6.2 h1:LDDOHo/q1W5UDj6PbkxdCv7lv9yunyZHX
github.com/golang-migrate/migrate/v4 v4.6.2/go.mod h1:JYi6reN3+Z734VZ0akNuyOJNcrg45ZL7LDBMW3WGJL0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

main.go (31 lines changed)
View File

@@ -8,6 +8,7 @@ import (
math "math/rand"
"time"
"github.com/vacp2p/mvds/dependency"
"github.com/vacp2p/mvds/node"
"github.com/vacp2p/mvds/peers"
"github.com/vacp2p/mvds/state"
@@ -31,7 +32,7 @@ func parseFlags() {
flag.IntVar(&communicating, "communicating", 2, "amount of nodes sending messages")
flag.IntVar(&sharing, "sharing", 2, "amount of nodes each node shares with")
flag.Int64Var(&interval, "interval", 5, "seconds between messages")
flag.IntVar(&interactive, "interactive", 3, "amount of nodes to use INTERACTIVE mode, the rest will be BATCH") // @todo should probably just be how many nodes are interactive
flag.IntVar(&interactive, "interactive", 3, "amount of nodes to use InteractiveMode mode, the rest will be BatchMode") // @todo should probably just be how many nodes are interactive
flag.Parse()
}
@@ -51,9 +52,9 @@ func main() {
input = append(input, in)
transports = append(transports, t)
mode := node.INTERACTIVE
mode := node.InteractiveMode
if i+1 >= interactive {
mode = node.BATCH
mode = node.BatchMode
}
node, err := createNode(t, peerID(), mode)
@@ -128,6 +129,8 @@ func createNode(transport transport.Transport, id state.PeerID, mode node.Mode)
id,
mode,
peers.NewMemoryPersistence(),
dependency.NewInMemoryTracker(),
node.EventualMode,
logger,
), nil
}
@@ -149,22 +152,16 @@ func Calc(count uint64, epoch int64) int64 {
return epoch + int64(count*2)
}
func peerID() state.PeerID {
bytes := make([]byte, 65)
_, _ = rand.Read(bytes)
func peerID() (id state.PeerID) {
_, _ = rand.Read(id[:])
return
}
id := state.PeerID{}
copy(id[:], bytes)
func groupId() (id state.GroupID) {
_, _ = rand.Read(id[:])
return id
}
func groupId() state.GroupID {
bytes := make([]byte, 32)
_, _ = rand.Read(bytes)
id := state.GroupID{}
copy(id[:], bytes)
return id
}

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/stretchr/testify/suite"
"github.com/vacp2p/mvds/dependency"
"github.com/vacp2p/mvds/node"
"github.com/vacp2p/mvds/peers"
"github.com/vacp2p/mvds/state"
@@ -40,7 +41,7 @@ func (s *MVDSBatchSuite) SetupTest() {
s.state1 = state.NewMemorySyncState()
s.peers1 = peers.NewMemoryPersistence()
p1 := [65]byte{0x01}
s.client1 = node.NewNode(s.ds1, t1, s.state1, Calc, 0, p1, node.BATCH, s.peers1, logger)
s.client1 = node.NewNode(s.ds1, t1, s.state1, Calc, 0, p1, node.BatchMode, s.peers1, dependency.NewInMemoryTracker(), node.EventualMode, logger)
in2 := make(chan transport.Packet)
t2 := transport.NewChannelTransport(0, in2)
@@ -48,7 +49,7 @@ func (s *MVDSBatchSuite) SetupTest() {
s.state2 = state.NewMemorySyncState()
p2 := [65]byte{0x02}
s.peers2 = peers.NewMemoryPersistence()
s.client2 = node.NewNode(s.ds2, t2, s.state2, Calc, 0, p2, node.BATCH, s.peers2, logger)
s.client2 = node.NewNode(s.ds2, t2, s.state2, Calc, 0, p2, node.BatchMode, s.peers2, dependency.NewInMemoryTracker(), node.EventualMode, logger)
t2.AddOutput(p1, in1)
t1.AddOutput(p2, in2)

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/stretchr/testify/suite"
"github.com/vacp2p/mvds/dependency"
"github.com/vacp2p/mvds/node"
"github.com/vacp2p/mvds/peers"
"github.com/vacp2p/mvds/state"
@@ -40,7 +41,7 @@ func (s *MVDSInteractiveSuite) SetupTest() {
s.state1 = state.NewMemorySyncState()
s.peers1 = peers.NewMemoryPersistence()
p1 := [65]byte{0x01}
s.client1 = node.NewNode(s.ds1, t1, s.state1, Calc, 0, p1, node.INTERACTIVE, s.peers1, logger)
s.client1 = node.NewNode(s.ds1, t1, s.state1, Calc, 0, p1, node.InteractiveMode, s.peers1, dependency.NewInMemoryTracker(), node.EventualMode, logger)
in2 := make(chan transport.Packet)
t2 := transport.NewChannelTransport(0, in2)
@@ -48,7 +49,7 @@ func (s *MVDSInteractiveSuite) SetupTest() {
s.state2 = state.NewMemorySyncState()
p2 := [65]byte{0x02}
s.peers2 = peers.NewMemoryPersistence()
s.client2 = node.NewNode(s.ds2, t2, s.state2, Calc, 0, p2, node.INTERACTIVE, s.peers2, logger)
s.client2 = node.NewNode(s.ds2, t2, s.state2, Calc, 0, p2, node.InteractiveMode, s.peers2, dependency.NewInMemoryTracker(), node.EventualMode, logger)
t2.AddOutput(p1, in1)
t1.AddOutput(p2, in2)

View File

@@ -0,0 +1,91 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: state/state.go
// Package internal is a generated GoMock package.
package internal
import (
gomock "github.com/golang/mock/gomock"
state "github.com/vacp2p/mvds/state"
reflect "reflect"
)
// MockSyncState is a mock of SyncState interface
type MockSyncState struct {
ctrl *gomock.Controller
recorder *MockSyncStateMockRecorder
}
// MockSyncStateMockRecorder is the mock recorder for MockSyncState
type MockSyncStateMockRecorder struct {
mock *MockSyncState
}
// NewMockSyncState creates a new mock instance
func NewMockSyncState(ctrl *gomock.Controller) *MockSyncState {
mock := &MockSyncState{ctrl: ctrl}
mock.recorder = &MockSyncStateMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockSyncState) EXPECT() *MockSyncStateMockRecorder {
return m.recorder
}
// Add mocks base method
func (m *MockSyncState) Add(newState state.State) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Add", newState)
ret0, _ := ret[0].(error)
return ret0
}
// Add indicates an expected call of Add
func (mr *MockSyncStateMockRecorder) Add(newState interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockSyncState)(nil).Add), newState)
}
// Remove mocks base method
func (m *MockSyncState) Remove(id state.MessageID, peer state.PeerID) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Remove", id, peer)
ret0, _ := ret[0].(error)
return ret0
}
// Remove indicates an expected call of Remove
func (mr *MockSyncStateMockRecorder) Remove(id, peer interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockSyncState)(nil).Remove), id, peer)
}
// All mocks base method
func (m *MockSyncState) All(epoch int64) ([]state.State, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "All", epoch)
ret0, _ := ret[0].([]state.State)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// All indicates an expected call of All
func (mr *MockSyncStateMockRecorder) All(epoch interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "All", reflect.TypeOf((*MockSyncState)(nil).All), epoch)
}
// Map mocks base method
func (m *MockSyncState) Map(epoch int64, process func(state.State) state.State) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Map", epoch, process)
ret0, _ := ret[0].(error)
return ret0
}
// Map indicates an expected call of Map
func (mr *MockSyncStateMockRecorder) Map(epoch, process interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Map", reflect.TypeOf((*MockSyncState)(nil).Map), epoch, process)
}

View File

@@ -86,7 +86,7 @@ func _1565345162_initial_schemaDownSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1565345162_initial_schema.down.sql", size: 23, mode: os.FileMode(0644), modTime: time.Unix(1565345447, 0)}
info := bindataFileInfo{name: "1565345162_initial_schema.down.sql", size: 23, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7c, 0x69, 0xd2, 0x3, 0xea, 0x82, 0x7c, 0xb3, 0x44, 0x6c, 0xef, 0x64, 0x2c, 0x99, 0x62, 0xa2, 0x8b, 0x6f, 0x96, 0x4f, 0x34, 0x41, 0x87, 0xd5, 0x4e, 0x3, 0x7f, 0x4a, 0xd1, 0x91, 0x9, 0x99}}
return a, nil
}
@@ -106,7 +106,7 @@ func _1565345162_initial_schemaUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1565345162_initial_schema.up.sql", size: 86, mode: os.FileMode(0644), modTime: time.Unix(1565345708, 0)}
info := bindataFileInfo{name: "1565345162_initial_schema.up.sql", size: 86, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x78, 0x7c, 0xdd, 0x67, 0x61, 0x3e, 0x7f, 0xd4, 0xce, 0xb0, 0x17, 0xbe, 0x5a, 0xa7, 0x9e, 0x93, 0x34, 0xe8, 0xbb, 0x44, 0xfb, 0x88, 0xd6, 0x18, 0x6d, 0x9f, 0xb4, 0x22, 0xda, 0xbc, 0x87, 0x94}}
return a, nil
}
@@ -126,7 +126,7 @@ func docGo() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1565345207, 0)}
info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0xaf, 0xdf, 0xcf, 0x65, 0xae, 0x19, 0xfc, 0x9d, 0x29, 0xc1, 0x91, 0xaf, 0xb5, 0xd5, 0xb1, 0x56, 0xf3, 0xee, 0xa8, 0xba, 0x13, 0x65, 0xdb, 0xab, 0xcf, 0x4e, 0xac, 0x92, 0xe9, 0x60, 0xf1}}
return a, nil
}
@@ -223,10 +223,8 @@ func AssetNames() []string {
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1565345162_initial_schema.down.sql": _1565345162_initial_schemaDownSql,
"1565345162_initial_schema.up.sql": _1565345162_initial_schemaUpSql,
"doc.go": docGo,
"1565345162_initial_schema.up.sql": _1565345162_initial_schemaUpSql,
"doc.go": docGo,
}
// AssetDir returns the file names below a certain

View File

@@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"
"github.com/vacp2p/mvds/dependency"
"go.uber.org/zap"
"github.com/vacp2p/mvds/peers"
@@ -24,8 +25,18 @@ import (
type Mode int
const (
INTERACTIVE Mode = iota
BATCH
InteractiveMode Mode = iota + 1
BatchMode
)
// ResolutionMode defines how message dependencies should be resolved.
type ResolutionMode int
const (
// EventualMode is non-blocking and will return messages before dependencies are resolved.
EventualMode ResolutionMode = iota + 1
// ConsistentMode blocks and does not return messages until dependencies have been resolved.
ConsistentMode
)
// CalculateNextEpoch is a function used to calculate the next `SendEpoch` for a given message.
@@ -47,12 +58,16 @@ type Node struct {
payloads payloads
dependencies dependency.Tracker
nextEpoch CalculateNextEpoch
ID state.PeerID
epochPersistence *epochSQLitePersistence
mode Mode
mode Mode
resolution ResolutionMode
subscription chan protobuf.Message
@@ -64,6 +79,7 @@ func NewPersistentNode(
st transport.Transport,
id state.PeerID,
mode Mode,
resolution ResolutionMode,
nextEpoch CalculateNextEpoch,
logger *zap.Logger,
) (*Node, error) {
@@ -83,8 +99,10 @@ func NewPersistentNode(
payloads: newPayloads(),
epochPersistence: newEpochSQLitePersistence(db),
nextEpoch: nextEpoch,
dependencies: dependency.NewPersistentTracker(db),
logger: logger.With(zap.Namespace("mvds")),
mode: mode,
resolution: resolution,
}
if currentEpoch, err := node.epochPersistence.Get(id); err != nil {
return nil, err
@@ -108,18 +126,19 @@ func NewEphemeralNode(
}
return &Node{
ID: id,
ctx: ctx,
cancel: cancel,
store: store.NewMemoryMessageStore(),
transport: t,
syncState: state.NewMemorySyncState(),
peers: peers.NewMemoryPersistence(),
payloads: newPayloads(),
nextEpoch: nextEpoch,
epoch: currentEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode,
ID: id,
ctx: ctx,
cancel: cancel,
store: store.NewMemoryMessageStore(),
transport: t,
syncState: state.NewMemorySyncState(),
peers: peers.NewMemoryPersistence(),
payloads: newPayloads(),
dependencies: dependency.NewInMemoryTracker(),
nextEpoch: nextEpoch,
epoch: currentEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode,
}
}
@@ -133,6 +152,8 @@ func NewNode(
id state.PeerID,
mode Mode,
pp peers.Persistence,
md dependency.Tracker,
resolution ResolutionMode,
logger *zap.Logger,
) *Node {
ctx, cancel := context.WithCancel(context.Background())
@@ -141,18 +162,20 @@ func NewNode(
}
return &Node{
ctx: ctx,
cancel: cancel,
store: ms,
transport: st,
syncState: ss,
peers: pp,
payloads: newPayloads(),
nextEpoch: nextEpoch,
ID: id,
epoch: currentEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode,
ctx: ctx,
cancel: cancel,
store: ms,
transport: st,
syncState: ss,
peers: pp,
payloads: newPayloads(),
nextEpoch: nextEpoch,
ID: id,
epoch: currentEpoch,
logger: logger.With(zap.Namespace("mvds")),
mode: mode,
dependencies: md,
resolution: resolution,
}
}
@@ -223,38 +246,53 @@ func (n *Node) Unsubscribe() {
// AppendMessage sends a message to a given group.
func (n *Node) AppendMessage(groupID state.GroupID, data []byte) (state.MessageID, error) {
m := protobuf.Message{
p, err := n.store.GetMessagesWithoutChildren(groupID)
parents := make([][]byte, len(p))
if err != nil {
n.logger.Error("Failed to retrieve parents",
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.Error(err),
)
}
for i, id := range p {
parents[i] = id[:]
}
return n.AppendMessageWithMetadata(groupID, data, &protobuf.Metadata{Ephemeral: false, Parents: parents})
}
// AppendEphemeralMessage sends a message to a given group that has the `no_ack_required` flag set to `true`.
func (n *Node) AppendEphemeralMessage(groupID state.GroupID, data []byte) (state.MessageID, error) {
return n.AppendMessageWithMetadata(groupID, data, &protobuf.Metadata{Ephemeral: true})
}
// AppendMessageWithMetadata sends a message to a given group with metadata.
func (n *Node) AppendMessageWithMetadata(groupID state.GroupID, data []byte, metadata *protobuf.Metadata) (state.MessageID, error) {
m := &protobuf.Message{
GroupId: groupID[:],
Timestamp: time.Now().Unix(),
Body: data,
Metadata: metadata,
}
id := m.ID()
peers, err := n.peers.GetByGroupID(groupID)
if err != nil {
return state.MessageID{}, fmt.Errorf("trying to send to unknown group %x", groupID[:4])
}
err = n.store.Add(&m)
err := n.store.Add(m)
if err != nil {
return state.MessageID{}, err
}
for _, p := range peers {
t := state.OFFER
if n.mode == BATCH {
t = state.MESSAGE
}
n.insertSyncState(&groupID, id, p, t)
err = n.broadcastToGroup(groupID, n.ID, m)
if err != nil {
return state.MessageID{}, err
}
n.logger.Debug("Sending message",
n.logger.Debug("Appending Message to Sync State",
zap.String("node", hex.EncodeToString(n.ID[:4])),
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("id", hex.EncodeToString(id[:4])))
// @todo think about a way to insta trigger send messages when send was selected, we don't wanna wait for ticks here
// @todo think about a way to insta trigger pushToSub messages when pushToSub was selected, we don't wanna wait for ticks here
return id, nil
}
@@ -284,6 +322,9 @@ func (n *Node) IsPeerInGroup(g state.GroupID, p state.PeerID) (bool, error) {
}
func (n *Node) sendMessages() error {
var toRemove []state.State
err := n.syncState.Map(n.epoch, func(s state.State) state.State {
m := s.MessageID
p := s.PeerID
@@ -300,7 +341,6 @@ func (n *Node) sendMessages() error {
case state.MESSAGE:
g := *s.GroupID
// TODO: Handle errors
exist, err := n.IsPeerInGroup(g, p)
if err != nil {
return s
@@ -328,6 +368,9 @@ func (n *Node) sendMessages() error {
zap.String("messageID", hex.EncodeToString(m[:4])),
)
if msg.Metadata != nil && msg.Metadata.Ephemeral {
toRemove = append(toRemove, s)
}
}
return n.updateSendEpoch(s)
@@ -366,7 +409,7 @@ func (n *Node) onPayload(sender state.PeerID, payload protobuf.Payload) {
func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error {
for _, raw := range offers {
id := toMessageID(raw)
id := state.ToMessageID(raw)
n.logger.Debug("OFFER received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
@@ -390,7 +433,7 @@ func (n *Node) onOffer(sender state.PeerID, offers [][]byte) error {
func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
for _, raw := range requests {
id := toMessageID(raw)
id := state.ToMessageID(raw)
n.logger.Debug("REQUEST received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
@@ -407,7 +450,7 @@ func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
continue
}
groupID := toGroupID(message.GroupId)
groupID := state.ToGroupID(message.GroupId)
exist, err := n.IsPeerInGroup(groupID, sender)
if err != nil {
@@ -430,7 +473,7 @@ func (n *Node) onRequest(sender state.PeerID, requests [][]byte) error {
func (n *Node) onAck(sender state.PeerID, acks [][]byte) error {
for _, ack := range acks {
id := toMessageID(ack)
id := state.ToMessageID(ack)
err := n.syncState.Remove(id, sender)
if err != nil {
@@ -452,14 +495,25 @@ func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][
a := make([][]byte, 0)
for _, m := range messages {
groupID := toGroupID(m.GroupId)
err := n.onMessage(sender, *m)
groupID := state.ToGroupID(m.GroupId)
err := n.onMessage(sender, m)
if err != nil {
n.logger.Error("Error processing message", zap.Error(err))
continue
}
id := m.ID()
if m.Metadata != nil && m.Metadata.Ephemeral {
n.logger.Debug("not sending ACK",
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("from", hex.EncodeToString(n.ID[:4])),
zap.String("", hex.EncodeToString(sender[:4])),
zap.String("messageID", hex.EncodeToString(id[:4])),
)
continue
}
n.logger.Debug("sending ACK",
zap.String("groupID", hex.EncodeToString(groupID[:4])),
zap.String("from", hex.EncodeToString(n.ID[:4])),
@@ -473,9 +527,10 @@ func (n *Node) onMessages(sender state.PeerID, messages []*protobuf.Message) [][
return a
}
func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
// @todo cleanup this function
func (n *Node) onMessage(sender state.PeerID, msg *protobuf.Message) error {
id := msg.ID()
groupID := toGroupID(msg.GroupId)
groupID := state.ToGroupID(msg.GroupId)
n.logger.Debug("MESSAGE received",
zap.String("from", hex.EncodeToString(sender[:4])),
zap.String("to", hex.EncodeToString(n.ID[:4])),
@@ -487,32 +542,169 @@ func (n *Node) onMessage(sender state.PeerID, msg protobuf.Message) error {
return err
}
err = n.store.Add(&msg)
if err != nil {
return err
// @todo process, should this function ever even have an error?
if msg.Metadata == nil || !msg.Metadata.Ephemeral {
err = n.store.Add(msg)
if err != nil {
return err
}
}
peers, err := n.peers.GetByGroupID(groupID)
err = n.broadcastToGroup(groupID, sender, msg)
if err != nil {
return err
}
for _, peer := range peers {
n.resolve(sender, msg)
return nil
}
func (n *Node) broadcastToGroup(group state.GroupID, sender state.PeerID, msg *protobuf.Message) error {
p, err := n.peers.GetByGroupID(group)
if err != nil {
return err
}
id := msg.ID()
for _, peer := range p {
if peer == sender {
continue
}
n.insertSyncState(&groupID, id, peer, state.OFFER)
}
t := state.OFFER
if n.mode == BatchMode || (msg.Metadata == nil && !msg.Metadata.Ephemeral) {
t = state.MESSAGE
}
if n.subscription != nil {
n.subscription <- msg
n.insertSyncState(&group, id, peer, t)
}
return nil
}
// @todo I do not think this will work, this needs to be some recursive function
// @todo add method to select depth of how far we resolve dependencies
func (n *Node) resolve(sender state.PeerID, msg *protobuf.Message) {
if n.resolution == EventualMode {
n.resolveEventually(sender, msg)
return
}
n.resolveConsistently(sender, msg)
}
func (n *Node) resolveEventually(sender state.PeerID, msg *protobuf.Message) {
if msg.Metadata == nil || len(msg.Metadata.Parents) == 0 {
n.pushToSub(msg)
return
}
for _, parent := range msg.Metadata.Parents {
pid := state.ToMessageID(parent)
if has, _ := n.store.Has(pid); has {
continue
}
group := state.ToGroupID(msg.GroupId)
n.insertSyncState(&group, pid, sender, state.REQUEST)
}
n.pushToSub(msg)
}
func (n *Node) resolveConsistently(sender state.PeerID, msg *protobuf.Message) {
id := msg.ID()
// We push any messages whose parents have now been resolved
dependants, err := n.dependencies.Dependants(id)
if err != nil {
n.logger.Error("error getting dependants",
zap.Error(err),
zap.String("msg", hex.EncodeToString(id[:4])),
)
}
for _, dependant := range dependants {
err := n.dependencies.Resolve(dependant, id)
if err != nil {
n.logger.Error("error marking resolved dependency",
zap.Error(err),
zap.String("msg", hex.EncodeToString(dependant[:4])),
zap.String("dependency", hex.EncodeToString(id[:4])),
)
}
resolved, err := n.dependencies.IsResolved(dependant)
if err != nil {
n.logger.Error("error getting unresolved dependencies",
zap.Error(err),
zap.String("msg", hex.EncodeToString(dependant[:4])),
)
}
if !resolved {
continue
}
dmsg, err := n.store.Get(dependant)
if err != nil {
n.logger.Error("error getting message",
zap.Error(err),
zap.String("messageID", hex.EncodeToString(dependant[:4])),
)
}
if dmsg != nil {
n.pushToSub(dmsg)
}
}
// @todo add parent dependencies to child, then we can have multiple levels?
if msg.Metadata == nil || len(msg.Metadata.Parents) == 0 {
n.pushToSub(msg)
return
}
hasUnresolvedDependencies := false
for _, parent := range msg.Metadata.Parents {
pid := state.ToMessageID(parent)
if has, _ := n.store.Has(pid); has {
continue
}
group := state.ToGroupID(msg.GroupId)
n.insertSyncState(&group, pid, sender, state.REQUEST)
hasUnresolvedDependencies = true
err := n.dependencies.Add(id, pid)
if err != nil {
n.logger.Error("error adding dependency",
zap.Error(err),
zap.String("msg", hex.EncodeToString(id[:4])),
zap.String("dependency", hex.EncodeToString(pid[:4])),
)
}
}
if hasUnresolvedDependencies {
return
}
n.pushToSub(msg)
}
func (n *Node) pushToSub(msg *protobuf.Message) {
if n.subscription == nil {
return
}
n.subscription <- *msg
}
func (n *Node) insertSyncState(groupID *state.GroupID, messageID state.MessageID, peerID state.PeerID, t state.RecordType) {
s := state.State{
GroupID: groupID,
@@ -539,15 +731,3 @@ func (n *Node) updateSendEpoch(s state.State) state.State {
s.SendEpoch += n.nextEpoch(s.SendCount, n.epoch)
return s
}
func toMessageID(b []byte) state.MessageID {
var id state.MessageID
copy(id[:], b)
return id
}
func toGroupID(b []byte) state.GroupID {
var id state.GroupID
copy(id[:], b)
return id
}
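
The diff above adds two constructor parameters (a dependency.Tracker and a ResolutionMode) plus the AppendEphemeralMessage and AppendMessageWithMetadata entry points. The sketch below shows how they might be wired together; the argument order follows the test suites in this PR, but the concrete type of the first argument and the overall flow are assumptions for illustration, not an exact copy of the repository's examples.

package main

import (
	"crypto/rand"

	"go.uber.org/zap"

	"github.com/vacp2p/mvds/dependency"
	"github.com/vacp2p/mvds/node"
	"github.com/vacp2p/mvds/peers"
	"github.com/vacp2p/mvds/state"
	"github.com/vacp2p/mvds/store"
	"github.com/vacp2p/mvds/transport"
)

func main() {
	logger, _ := zap.NewDevelopment()

	// Channel transport and in-memory backends, mirroring the test suites above.
	in := make(chan transport.Packet)
	t := transport.NewChannelTransport(0, in)

	var id state.PeerID
	_, _ = rand.Read(id[:])

	n := node.NewNode(
		store.NewMemoryMessageStore(), // assumed message store argument
		t,
		state.NewMemorySyncState(),
		func(count uint64, epoch int64) int64 { return epoch + int64(count*2) }, // next-epoch calculation
		0, // current epoch
		id,
		node.InteractiveMode,
		peers.NewMemoryPersistence(),
		dependency.NewInMemoryTracker(), // tracks unresolved parents
		node.EventualMode,               // deliver messages before parents are resolved
		logger,
	)

	var group state.GroupID
	_, _ = rand.Read(group[:])

	// Regular append: parents are taken from the group's current leaf messages.
	// No peers are registered for this group in this sketch, so nothing is
	// actually synced and the call may return an error.
	if _, err := n.AppendMessage(group, []byte("hello")); err != nil {
		logger.Warn("append failed", zap.Error(err))
	}

	// Ephemeral append: the message is flagged so it is neither stored nor
	// ACKed, and it is dropped from the sync state after the first send.
	if _, err := n.AppendEphemeralMessage(group, []byte("typing...")); err != nil {
		logger.Warn("ephemeral append failed", zap.Error(err))
	}
}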

node/node_test.go (new file, 130 lines)
View File

@@ -0,0 +1,130 @@
package node
import (
"crypto/rand"
"reflect"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/vacp2p/mvds/dependency"
"github.com/vacp2p/mvds/node/internal"
"github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state"
"github.com/vacp2p/mvds/store"
)
func TestNode_resolveEventually(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
syncstate := internal.NewMockSyncState(ctrl)
node := Node{
syncState: syncstate,
store: store.NewMemoryMessageStore(),
}
channel := node.Subscribe()
peer := peerID()
group := groupID()
parent := messageID()
msg := &protobuf.Message{
GroupId: group[:],
Timestamp: time.Now().Unix(),
Body: []byte{0x01},
Metadata: &protobuf.Metadata{Ephemeral: false, Parents: [][]byte{parent[:]}},
}
expectedState := state.State{
GroupID: &group,
MessageID: parent,
PeerID: peer,
Type: state.REQUEST,
SendEpoch: 1,
}
syncstate.EXPECT().Add(expectedState).Return(nil)
go node.resolveEventually(peer, msg)
received := <-channel
if !reflect.DeepEqual(*msg, received) {
t.Error("expected message did not match received")
}
}
func TestNode_resolveConsistently(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
syncstate := internal.NewMockSyncState(ctrl)
node := Node{
syncState: syncstate,
store: store.NewMemoryMessageStore(),
dependencies: dependency.NewInMemoryTracker(),
}
channel := node.Subscribe()
peer := peerID()
group := groupID()
parent := &protobuf.Message{
GroupId: group[:],
Timestamp: time.Now().Unix(),
Body: []byte{0x02},
}
parentID := parent.ID()
msg := &protobuf.Message{
GroupId: group[:],
Timestamp: time.Now().Unix(),
Body: []byte{0x01},
Metadata: &protobuf.Metadata{Ephemeral: false, Parents: [][]byte{parentID[:]}},
}
// @todo we need to make sure to add the message cause we are going through a subset of the flow
_ = node.store.Add(msg)
syncstate.EXPECT().Add(gomock.Any()).DoAndReturn(func(state.State) error {
return nil
})
node.resolveConsistently(peer, msg)
go node.resolveConsistently(peer, parent)
received := <-channel
if !reflect.DeepEqual(*msg, received) {
t.Error("expected message did not match received")
}
received = <-channel
if !reflect.DeepEqual(*parent, received) {
t.Error("expected message did not match received")
}
}
func peerID() (id state.PeerID) {
_, _ = rand.Read(id[:])
return id
}
func groupID() (id state.GroupID) {
_, _ = rand.Read(id[:])
return id
}
func messageID() (id state.MessageID) {
_, _ = rand.Read(id[:])
return id
}

View File

@@ -86,7 +86,7 @@ func _1565249278_initial_schemaDownSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1565249278_initial_schema.down.sql", size: 23, mode: os.FileMode(0644), modTime: time.Unix(1565249542, 0)}
info := bindataFileInfo{name: "1565249278_initial_schema.down.sql", size: 23, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x4, 0xfb, 0x5, 0x92, 0xf0, 0x93, 0xaa, 0x83, 0xb7, 0xdf, 0x66, 0xe2, 0x97, 0x53, 0x9d, 0x34, 0xd3, 0xca, 0x97, 0xd8, 0xe1, 0xed, 0xf0, 0x4a, 0x94, 0x1a, 0xb1, 0x8f, 0xcf, 0xc, 0xa4, 0x6}}
return a, nil
}
@@ -106,7 +106,7 @@ func _1565249278_initial_schemaUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1565249278_initial_schema.up.sql", size: 140, mode: os.FileMode(0644), modTime: time.Unix(1565251147, 0)}
info := bindataFileInfo{name: "1565249278_initial_schema.up.sql", size: 140, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x8a, 0xbc, 0x3a, 0x87, 0x12, 0x93, 0xeb, 0xb4, 0xcc, 0x42, 0x6e, 0xb2, 0x7d, 0xfa, 0x9a, 0xa8, 0x3f, 0xb, 0x6b, 0xa8, 0x2d, 0x8b, 0xde, 0x67, 0x2a, 0xa8, 0xa5, 0x42, 0xad, 0x27, 0x15, 0x7e}}
return a, nil
}
@@ -126,7 +126,7 @@ func docGo() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1565250280, 0)}
info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0xaf, 0xdf, 0xcf, 0x65, 0xae, 0x19, 0xfc, 0x9d, 0x29, 0xc1, 0x91, 0xaf, 0xb5, 0xd5, 0xb1, 0x56, 0xf3, 0xee, 0xa8, 0xba, 0x13, 0x65, 0xdb, 0xab, 0xcf, 0x4e, 0xac, 0x92, 0xe9, 0x60, 0xf1}}
return a, nil
}
@@ -223,10 +223,8 @@ func AssetNames() []string {
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1565249278_initial_schema.down.sql": _1565249278_initial_schemaDownSql,
"1565249278_initial_schema.up.sql": _1565249278_initial_schemaUpSql,
"doc.go": docGo,
"1565249278_initial_schema.up.sql": _1565249278_initial_schemaUpSql,
"doc.go": docGo,
}
// AssetDir returns the file names below a certain

View File

@@ -83,20 +83,68 @@ func (m *Payload) GetMessages() []*Message {
return nil
}
type Message struct {
GroupId []byte `protobuf:"bytes,1,opt,name=group_id,json=groupId,proto3" json:"group_id,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"`
type Metadata struct {
Parents [][]byte `protobuf:"bytes,1,rep,name=parents,proto3" json:"parents,omitempty"`
Ephemeral bool `protobuf:"varint,2,opt,name=ephemeral,proto3" json:"ephemeral,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Metadata) Reset() { *m = Metadata{} }
func (m *Metadata) String() string { return proto.CompactTextString(m) }
func (*Metadata) ProtoMessage() {}
func (*Metadata) Descriptor() ([]byte, []int) {
return fileDescriptor_2dca527c092c79d7, []int{1}
}
func (m *Metadata) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Metadata.Unmarshal(m, b)
}
func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Metadata.Marshal(b, m, deterministic)
}
func (m *Metadata) XXX_Merge(src proto.Message) {
xxx_messageInfo_Metadata.Merge(m, src)
}
func (m *Metadata) XXX_Size() int {
return xxx_messageInfo_Metadata.Size(m)
}
func (m *Metadata) XXX_DiscardUnknown() {
xxx_messageInfo_Metadata.DiscardUnknown(m)
}
var xxx_messageInfo_Metadata proto.InternalMessageInfo
func (m *Metadata) GetParents() [][]byte {
if m != nil {
return m.Parents
}
return nil
}
func (m *Metadata) GetEphemeral() bool {
if m != nil {
return m.Ephemeral
}
return false
}
type Message struct {
GroupId []byte `protobuf:"bytes,1,opt,name=group_id,json=groupId,proto3" json:"group_id,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"`
Metadata *Metadata `protobuf:"bytes,4,opt,name=metadata,proto3" json:"metadata,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_2dca527c092c79d7, []int{1}
return fileDescriptor_2dca527c092c79d7, []int{2}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
@@ -138,27 +186,38 @@ func (m *Message) GetBody() []byte {
return nil
}
func (m *Message) GetMetadata() *Metadata {
if m != nil {
return m.Metadata
}
return nil
}
func init() {
proto.RegisterType((*Payload)(nil), "mvds.Payload")
proto.RegisterType((*Message)(nil), "mvds.Message")
proto.RegisterType((*Payload)(nil), "vac.mvds.Payload")
proto.RegisterType((*Metadata)(nil), "vac.mvds.Metadata")
proto.RegisterType((*Message)(nil), "vac.mvds.Message")
}
func init() { proto.RegisterFile("protobuf/sync.proto", fileDescriptor_2dca527c092c79d7) }
var fileDescriptor_2dca527c092c79d7 = []byte{
// 212 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0x8f, 0xb1, 0x4a, 0x04, 0x31,
0x10, 0x86, 0xd9, 0xcb, 0x72, 0x1b, 0xc7, 0xb3, 0x19, 0x41, 0xa2, 0x58, 0x84, 0xab, 0x62, 0xb3,
0x82, 0xbe, 0x81, 0x9d, 0x85, 0x20, 0x29, 0x2c, 0x6c, 0x24, 0x7b, 0xc9, 0x1e, 0x87, 0xc6, 0xac,
0x99, 0xac, 0xb0, 0xe0, 0xc3, 0xcb, 0x8d, 0xe7, 0x5e, 0xf7, 0x7f, 0xdf, 0xcf, 0x30, 0x33, 0x70,
0x3e, 0xe4, 0x54, 0x52, 0x37, 0xf6, 0xb7, 0x34, 0x7d, 0x6e, 0x5a, 0x26, 0xac, 0xe3, 0xb7, 0xa7,
0xf5, 0x0f, 0x34, 0xcf, 0x6e, 0xfa, 0x48, 0xce, 0x23, 0x42, 0xed, 0x36, 0xef, 0xa4, 0x2a, 0x2d,
0xcc, 0xca, 0x72, 0xc6, 0x0b, 0x58, 0xa6, 0xbe, 0x0f, 0x99, 0xd4, 0x82, 0xed, 0x81, 0xf0, 0x0a,
0x64, 0x0e, 0x5f, 0x63, 0xa0, 0x42, 0x4a, 0x70, 0x33, 0x33, 0xde, 0x80, 0x8c, 0x81, 0xc8, 0x6d,
0x03, 0xa9, 0x5a, 0x0b, 0x73, 0x7a, 0x77, 0xd6, 0xee, 0x77, 0xb5, 0x4f, 0x7f, 0xd6, 0xce, 0xf5,
0xfa, 0x05, 0x9a, 0x83, 0xc4, 0x4b, 0x90, 0xdb, 0x9c, 0xc6, 0xe1, 0x6d, 0xe7, 0x55, 0xa5, 0x2b,
0xb3, 0xb2, 0x0d, 0xf3, 0xa3, 0xc7, 0x6b, 0x38, 0x29, 0xbb, 0x18, 0xa8, 0xb8, 0x38, 0xa8, 0x85,
0xae, 0x8c, 0xb0, 0x47, 0xb1, 0x3f, 0xbb, 0x4b, 0x7e, 0x52, 0x82, 0x87, 0x38, 0x3f, 0xc0, 0xab,
0xfc, 0x7f, 0xb9, 0x5b, 0x72, 0xba, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x4c, 0x0a, 0x11, 0xee,
0x05, 0x01, 0x00, 0x00,
// 265 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x50, 0x3d, 0x4f, 0xc3, 0x30,
0x10, 0x95, 0x9b, 0xaa, 0x35, 0x57, 0x16, 0x0e, 0x09, 0x19, 0xc4, 0x10, 0x65, 0xca, 0x42, 0x90,
0xca, 0x3f, 0xe8, 0xc6, 0x50, 0x09, 0x65, 0x64, 0x41, 0x97, 0xf8, 0x52, 0x2a, 0x9a, 0x3a, 0xd8,
0x4e, 0xa5, 0x6c, 0x4c, 0xfc, 0x6e, 0x14, 0xa7, 0x49, 0xd9, 0xde, 0xc7, 0xf9, 0xf9, 0xde, 0xc1,
0x6d, 0x63, 0x8d, 0x37, 0x45, 0x5b, 0x3d, 0xbb, 0xee, 0x58, 0x66, 0x81, 0xa1, 0x3c, 0x51, 0x99,
0xd5, 0x27, 0xed, 0x92, 0x1f, 0x01, 0xcb, 0x37, 0xea, 0x0e, 0x86, 0x34, 0x22, 0xcc, 0xa9, 0xfc,
0x72, 0x4a, 0xc4, 0x51, 0x7a, 0x9d, 0x07, 0x8c, 0x77, 0xb0, 0x30, 0x55, 0xc5, 0xd6, 0xa9, 0x59,
0x50, 0xcf, 0x0c, 0x1f, 0x40, 0x5a, 0xfe, 0x6e, 0xd9, 0x79, 0xa7, 0xa2, 0xe0, 0x4c, 0x1c, 0x9f,
0x40, 0xd6, 0xec, 0x1c, 0xed, 0xd8, 0xa9, 0x79, 0x1c, 0xa5, 0xab, 0xf5, 0x4d, 0x36, 0x7e, 0x98,
0x6d, 0x07, 0x27, 0x9f, 0x46, 0x92, 0x0d, 0xc8, 0x2d, 0x7b, 0xd2, 0xe4, 0x09, 0x15, 0x2c, 0x1b,
0xb2, 0x7c, 0xf4, 0xe3, 0x16, 0x23, 0xc5, 0x47, 0xb8, 0xe2, 0xe6, 0x93, 0x6b, 0xb6, 0x74, 0x50,
0xb3, 0x58, 0xa4, 0x32, 0xbf, 0x08, 0xc9, 0xaf, 0x80, 0xe5, 0x39, 0x19, 0xef, 0x41, 0xee, 0xac,
0x69, 0x9b, 0x8f, 0xbd, 0x56, 0x22, 0x16, 0x7d, 0x48, 0xe0, 0xaf, 0xba, 0x0f, 0xf1, 0xfb, 0x9a,
0x9d, 0xa7, 0xba, 0x09, 0x21, 0x51, 0x7e, 0x11, 0xfa, 0xfe, 0x85, 0xd1, 0x9d, 0x8a, 0xc2, 0xa3,
0x80, 0x31, 0xeb, 0xbb, 0x0c, 0xcb, 0xa9, 0x79, 0x2c, 0xd2, 0xd5, 0x1a, 0xff, 0x77, 0x19, 0x9c,
0x7c, 0x9a, 0xd9, 0xc0, 0xbb, 0x1c, 0x0f, 0x5e, 0x2c, 0x02, 0x7a, 0xf9, 0x0b, 0x00, 0x00, 0xff,
0xff, 0x8f, 0x0f, 0x88, 0x6c, 0x83, 0x01, 0x00, 0x00,
}

View File

@@ -1,6 +1,6 @@
syntax = "proto3";
package mvds;
package vac.mvds;
option go_package = "protobuf";
message Payload {
@@ -10,8 +10,14 @@ message Payload {
repeated Message messages = 4;
}
message Metadata {
repeated bytes parents = 1;
bool ephemeral = 2;
}
message Message {
bytes group_id = 1;
int64 timestamp = 2;
bytes body = 3;
Metadata metadata = 4;
}
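
For illustration, a small, hypothetical Go sketch using the regenerated types: a parent message is created, and a follow-up message references it through Metadata.Parents while setting Ephemeral, meaning it is neither stored nor acknowledged by receivers.

package main

import (
	"fmt"
	"time"

	"github.com/vacp2p/mvds/protobuf"
)

func main() {
	groupID := make([]byte, 32) // placeholder group identifier

	parent := protobuf.Message{
		GroupId:   groupID,
		Timestamp: time.Now().Unix(),
		Body:      []byte("first message"),
	}
	pid := parent.ID()

	// A follow-up that names the first message as its parent and is flagged
	// ephemeral.
	child := protobuf.Message{
		GroupId:   groupID,
		Timestamp: time.Now().Unix(),
		Body:      []byte("follow-up"),
		Metadata: &protobuf.Metadata{
			Parents:   [][]byte{pid[:]},
			Ephemeral: true,
		},
	}

	cid := child.ID()
	fmt.Printf("parent %x -> child %x\n", pid[:4], cid[:4])
}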

View File

@@ -1,7 +1,7 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1565341329_initial_schema.down.sql (24B)
// 1565341329_initial_schema.up.sql (237B)
// 1565341329_initial_schema.up.sql (294B)
// doc.go (377B)
package migrations
@@ -86,12 +86,12 @@ func _1565341329_initial_schemaDownSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1565341329_initial_schema.down.sql", size: 24, mode: os.FileMode(0644), modTime: time.Unix(1565341770, 0)}
info := bindataFileInfo{name: "1565341329_initial_schema.down.sql", size: 24, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x20, 0x56, 0x1a, 0x0, 0xc5, 0x81, 0xb3, 0xeb, 0x2a, 0xae, 0xed, 0xbb, 0x68, 0x51, 0x68, 0xc7, 0xe3, 0x31, 0xe, 0x1, 0x3e, 0xd2, 0x85, 0x9e, 0x6d, 0x55, 0xad, 0x55, 0xd6, 0x2f, 0x29, 0xca}}
return a, nil
}
var __1565341329_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\x08\x71\x74\xf2\x71\x55\xc8\x2d\x4b\x29\x8e\x2f\x2e\x49\x2c\x49\x2d\x56\xd0\xe0\x52\x50\x50\x50\x28\xa9\x2c\x48\x55\xf0\xf4\x0b\x71\x75\x77\x0d\x52\xf0\xf3\x0f\x51\xf0\x0b\xf5\xf1\xd1\x01\x4b\x15\xa7\xe6\xa5\xc4\x27\xe7\x97\xe6\x95\xe0\x53\x90\x5a\x90\x9f\x9c\x81\x43\x41\x7a\x51\x7e\x69\x41\x7c\x66\x8a\x82\x93\x8f\xbf\x13\x44\xa8\x20\x35\xb5\x08\x26\x82\xa6\x3a\x37\xb5\xb8\x38\x31\x3d\x15\x87\x6c\x40\x90\xa7\xaf\x63\x50\xa4\x82\xb7\x6b\xa4\x82\x06\x42\xa9\x0e\xcc\x44\x4d\x2e\x4d\x6b\x2e\x40\x00\x00\x00\xff\xff\x8b\xf0\x0a\x3b\xed\x00\x00\x00")
var __1565341329_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x7c\x8f\xc1\x8a\x83\x30\x14\x45\xf7\xf9\x8a\xbb\x54\xf0\x0f\x5c\xe9\x4c\x18\x64\xd2\x58\x42\x0a\x75\x15\xc4\x3c\xac\x0b\x35\x98\x58\xda\xbf\x2f\xb4\x15\xa5\x60\xb7\xf7\x1c\x1e\xef\xfc\x28\x9e\x69\x0e\x9d\xe5\x82\xa3\xbf\x5a\x6f\x7c\xa8\x03\x79\x44\x0c\x00\xc2\xdd\x11\x0a\xa9\xf9\x1f\x57\x90\xa5\x86\x3c\x09\x91\x3c\x91\xa7\xc1\x9a\x66\x9c\x87\xf0\x4d\x20\x37\x36\x97\x1d\xa1\x9d\xc6\xd9\x99\xce\x22\x17\x65\xfe\x9a\x1c\xd1\xb4\x2c\x1f\x76\x4f\xde\xd7\x2d\xed\xd0\xa3\x2a\x0e\x99\xaa\xf0\xcf\x2b\x44\xab\x9a\x2c\x17\x63\x16\xa7\x8c\xbd\x6b\x0b\xf9\xcb\xcf\xe8\xec\xcd\x6c\x7e\x2c\xe5\xb6\x3f\x5a\x49\x9c\xb2\x47\x00\x00\x00\xff\xff\x5e\xe5\x72\x74\x26\x01\x00\x00")
func _1565341329_initial_schemaUpSqlBytes() ([]byte, error) {
return bindataRead(
@@ -106,8 +106,8 @@ func _1565341329_initial_schemaUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1565341329_initial_schema.up.sql", size: 237, mode: os.FileMode(0644), modTime: time.Unix(1565342998, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x8f, 0xf4, 0x34, 0xb, 0x4e, 0x94, 0x3b, 0xe, 0xf2, 0xc2, 0x36, 0x31, 0x47, 0x2e, 0xf8, 0x85, 0xab, 0x1a, 0x82, 0x24, 0x74, 0x39, 0xf6, 0x83, 0xaf, 0xb6, 0xb1, 0x79, 0x9, 0xda, 0x59, 0xd0}}
info := bindataFileInfo{name: "1565341329_initial_schema.up.sql", size: 294, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x3e, 0xa5, 0x37, 0x9d, 0x3f, 0xf3, 0xc9, 0xc8, 0x12, 0x74, 0x79, 0x74, 0xff, 0xfd, 0xb1, 0x5f, 0x13, 0xaf, 0xf2, 0x50, 0x14, 0x9f, 0xdf, 0xc8, 0xc5, 0xa7, 0xc3, 0xf5, 0xa4, 0x8e, 0x8a, 0xf6}}
return a, nil
}
@@ -126,7 +126,7 @@ func docGo() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1565341287, 0)}
info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0xaf, 0xdf, 0xcf, 0x65, 0xae, 0x19, 0xfc, 0x9d, 0x29, 0xc1, 0x91, 0xaf, 0xb5, 0xd5, 0xb1, 0x56, 0xf3, 0xee, 0xa8, 0xba, 0x13, 0x65, 0xdb, 0xab, 0xcf, 0x4e, 0xac, 0x92, 0xe9, 0x60, 0xf1}}
return a, nil
}
@@ -223,10 +223,8 @@ func AssetNames() []string {
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1565341329_initial_schema.down.sql": _1565341329_initial_schemaDownSql,
"1565341329_initial_schema.up.sql": _1565341329_initial_schemaUpSql,
"doc.go": docGo,
"1565341329_initial_schema.up.sql": _1565341329_initial_schemaUpSql,
"doc.go": docGo,
}
// AssetDir returns the file names below a certain

View File

@@ -2,3 +2,17 @@ package state
type MessageID [32]byte
type GroupID [32]byte
// ToMessageID converts a byte array to a MessageID.
func ToMessageID(b []byte) MessageID {
var id MessageID
copy(id[:], b)
return id
}
// ToGroupID converts a byte array to a GroupID.
func ToGroupID(b []byte) GroupID {
var id GroupID
copy(id[:], b)
return id
}
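The two helpers above copy the variable-length byte slices carried in protobuf fields into the fixed-size array types used as map keys throughout the stores. A minimal sketch of how they might be applied to received data follows; the function name and its placement are illustrative and not part of this change.

// Sketch only: converting raw protobuf byte fields into the typed IDs above,
// e.g. on the receiving side of the sync node. Same package as ToMessageID.
func idsFromMessage(groupID []byte, parents [][]byte) (GroupID, []MessageID) {
    group := ToGroupID(groupID)
    ids := make([]MessageID, 0, len(parents))
    for _, p := range parents {
        ids = append(ids, ToMessageID(p))
    }
    return group, ids
}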


@@ -10,4 +10,5 @@ type MessageStore interface {
Has(id state.MessageID) (bool, error)
Get(id state.MessageID) (*protobuf.Message, error)
Add(message *protobuf.Message) error
GetMessagesWithoutChildren(id state.GroupID) ([]state.MessageID, error)
}


@@ -43,3 +43,39 @@ func (ds *memoryMessageStore) Add(message *protobuf.Message) error {
ds.ms[message.ID()] = message
return nil
}
func (ds *memoryMessageStore) GetMessagesWithoutChildren(group state.GroupID) ([]state.MessageID, error) {
ds.Lock()
defer ds.Unlock()
hasChildren := make(map[state.MessageID]bool)
for id, msg := range ds.ms {
if state.ToGroupID(msg.GroupId) != group {
continue
}
if msg.Metadata != nil {
for _, parent := range msg.Metadata.Parents {
hasChildren[state.ToMessageID(parent)] = true
}
}
if hasChildren[id] {
continue
}
hasChildren[id] = false
}
msgs := make([]state.MessageID, 0)
for id, hasChildren := range hasChildren {
if hasChildren {
continue
}
msgs = append(msgs, id)
}
return msgs, nil
}
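GetMessagesWithoutChildren returns the "leaves" of a group: messages that no stored message in that group lists as a parent. In the metadata format these are the natural parents to attach to the next message a node publishes. The sketch below shows that flow; nextMessage is a hypothetical helper, not the node's actual API, and it works against any MessageStore implementation.

// Sketch only: attach the current leaves of a group as parents of a new
// message before storing it. nextMessage is a hypothetical helper.
func nextMessage(ms MessageStore, group state.GroupID, body []byte, now int64) (*protobuf.Message, error) {
    leaves, err := ms.GetMessagesWithoutChildren(group)
    if err != nil {
        return nil, err
    }
    parents := make([][]byte, 0, len(leaves))
    for _, leaf := range leaves {
        leaf := leaf // copy so each appended slice keeps its own backing array
        parents = append(parents, leaf[:])
    }
    msg := &protobuf.Message{
        GroupId:   group[:],
        Timestamp: now,
        Body:      body,
        Metadata:  &protobuf.Metadata{Parents: parents},
    }
    return msg, ms.Add(msg)
}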


@@ -3,6 +3,7 @@ package store
import (
"database/sql"
"errors"
"strings"
"github.com/vacp2p/mvds/state"
@@ -23,7 +24,13 @@ func NewPersistentMessageStore(db *sql.DB) *persistentMessageStore {
func (p *persistentMessageStore) Add(message *protobuf.Message) error {
id := message.ID()
_, err := p.db.Exec(
tx, err := p.db.Begin()
if err != nil {
return err
}
_, err = tx.Exec(
`INSERT INTO mvds_messages (id, group_id, timestamp, body)
VALUES (?, ?, ?, ?)`,
id[:],
@@ -31,7 +38,37 @@ func (p *persistentMessageStore) Add(message *protobuf.Message) error {
message.Timestamp,
message.Body,
)
return err
if err != nil {
_ = tx.Rollback()
return err
}
if message.Metadata != nil && len(message.Metadata.Parents) > 0 {
var sb strings.Builder
sb.WriteString("INSERT INTO mvds_parents(message_id, parent_id) VALUES ")
var vals []interface{}
for _, row := range message.Metadata.Parents {
sb.WriteString("(?, ?),")
vals = append(vals, id[:], row[:])
}
query := sb.String()
stmt, err := tx.Prepare(query[0:len(query)-1])
if err != nil {
_ = tx.Rollback()
return err
}
_, err = stmt.Exec(vals...)
if err != nil {
_ = tx.Rollback()
return err
}
}
return tx.Commit()
}
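Rather than issuing one INSERT per parent, the branch above batches every parent row into a single statement inside the same transaction, binding the message id once per row and trimming the trailing comma before Prepare. The standalone sketch below (illustrative values only, not project code) makes the generated statement shape and bind-value ordering explicit.

package main

import (
    "fmt"
    "strings"
)

// Prints the statement shape that the parents loop in Add produces for a
// message with two parents.
func main() {
    id := []byte{0xaa}
    parents := [][]byte{{0x01}, {0x02}}

    var sb strings.Builder
    sb.WriteString("INSERT INTO mvds_parents(message_id, parent_id) VALUES ")
    var vals []interface{}
    for _, p := range parents {
        sb.WriteString("(?, ?),")
        vals = append(vals, id, p)
    }
    query := sb.String()

    fmt.Println(query[:len(query)-1]) // INSERT INTO mvds_parents(message_id, parent_id) VALUES (?, ?),(?, ?)
    fmt.Println(len(vals))            // 4 bind values: id, parent1, id, parent2
}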
func (p *persistentMessageStore) Get(id state.MessageID) (*protobuf.Message, error) {
@@ -47,6 +84,31 @@ func (p *persistentMessageStore) Get(id state.MessageID) (*protobuf.Message, err
); err != nil {
return nil, err
}
message.Metadata = &protobuf.Metadata{Ephemeral: false}
rows, err := p.db.Query(`SELECT parent_id FROM mvds_parents WHERE message_id = ?`, id[:])
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var parent []byte
err := rows.Scan(&parent)
if err != nil {
return nil, err
}
message.Metadata.Parents = append(message.Metadata.Parents, parent)
}
err = rows.Err()
if err != nil {
return nil, err
}
return &message, nil
}
@@ -65,3 +127,34 @@ func (p *persistentMessageStore) Has(id state.MessageID) (bool, error) {
return false, err
}
}
func (p *persistentMessageStore) GetMessagesWithoutChildren(id state.GroupID) ([]state.MessageID, error) {
var result []state.MessageID
rows, err := p.db.Query(
`SELECT id FROM mvds_messages WHERE group_id = ? AND id NOT IN (SELECT parent_id FROM mvds_parents)`,
id[:],
)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var parent []byte
err := rows.Scan(&parent)
if err != nil {
return nil, err
}
result = append(result, state.ToMessageID(parent))
}
err = rows.Err()
if err != nil {
return nil, err
}
return result, nil
}


@@ -2,6 +2,7 @@ package store
import (
"io/ioutil"
"math/rand"
"testing"
"time"
@@ -28,6 +29,7 @@ func TestPersistentMessageStore(t *testing.T) {
GroupId: []byte{0x01},
Timestamp: now,
Body: []byte{0xaa, 0xbb, 0xcc},
Metadata: &protobuf.Metadata{Ephemeral: false, Parents: [][]byte{{0xaa, 0xbb, 0xcc}}},
}
err = p.Add(&message)
@@ -51,3 +53,52 @@ func TestPersistentMessageStore(t *testing.T) {
require.NoError(t, err)
require.False(t, exists)
}
func TestPersistentMessageStore_GetMessagesWithoutChildren(t *testing.T) {
tmpFile, err := ioutil.TempFile("", "")
require.NoError(t, err)
db, err := persistenceutil.Open(tmpFile.Name(), "", persistenceutil.MigrationConfig{
AssetNames: migrations.AssetNames(),
AssetGetter: migrations.Asset,
})
require.NoError(t, err)
p := NewPersistentMessageStore(db)
group := groupId()
now := time.Now().Unix()
msg := &protobuf.Message{
GroupId: group[:],
Timestamp: now,
Body: []byte{0xaa, 0xbb, 0xcc},
Metadata: &protobuf.Metadata{Ephemeral: false, Parents: [][]byte{}},
}
err = p.Add(msg)
require.NoError(t, err)
id := msg.ID()
child := &protobuf.Message{
GroupId: group[:],
Timestamp: now,
Body: []byte{0xaa, 0xcc},
Metadata: &protobuf.Metadata{Ephemeral: false, Parents: [][]byte{id[:]}},
}
err = p.Add(child)
require.NoError(t, err)
msgs, err := p.GetMessagesWithoutChildren(group)
require.NoError(t, err)
require.Len(t, msgs, 1)
if msgs[0] != child.ID() {
t.Errorf("not same \n expected: %v \n actual: %v", child.ID(), msgs[0])
}
}
func groupId() (id state.GroupID) {
_, _ = rand.Read(id[:])
return id
}


@@ -1,7 +1,7 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1565447861_initial_schema.down.sql (28B)
// 1565447861_initial_schema.up.sql (140B)
// 1572372377_initial_schema.down.sql (55B)
// 1572372377_initial_schema.up.sql (365B)
// doc.go (377B)
package migrations
@@ -71,43 +71,43 @@ func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __1565447861_initial_schemaDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x71\xf5\x71\x0d\x71\x55\x08\x71\x74\xf2\x71\x55\xc8\x2d\x4b\x29\x8e\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\xb6\xe6\x02\x04\x00\x00\xff\xff\xc2\x3a\x18\x9b\x1c\x00\x00\x00")
var __1572372377_initial_schemaDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x71\xf5\x71\x0d\x71\x55\x08\x71\x74\xf2\x71\x55\xc8\x2d\x4b\x29\x8e\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\xb6\xe6\xc2\x94\x2b\x48\x2c\x4a\xcd\x2b\x29\xb6\xe6\x02\x04\x00\x00\xff\xff\xa9\xdd\x32\x20\x37\x00\x00\x00")
func _1565447861_initial_schemaDownSqlBytes() ([]byte, error) {
func _1572372377_initial_schemaDownSqlBytes() ([]byte, error) {
return bindataRead(
__1565447861_initial_schemaDownSql,
"1565447861_initial_schema.down.sql",
__1572372377_initial_schemaDownSql,
"1572372377_initial_schema.down.sql",
)
}
func _1565447861_initial_schemaDownSql() (*asset, error) {
bytes, err := _1565447861_initial_schemaDownSqlBytes()
func _1572372377_initial_schemaDownSql() (*asset, error) {
bytes, err := _1572372377_initial_schemaDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1565447861_initial_schema.down.sql", size: 28, mode: os.FileMode(0644), modTime: time.Unix(1565447902, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x92, 0x55, 0x8d, 0x3, 0x68, 0x1a, 0x9c, 0xd7, 0xc7, 0xb4, 0x5a, 0xb1, 0x27, 0x47, 0xf4, 0xc6, 0x8d, 0x85, 0xbb, 0xae, 0xb6, 0x69, 0xc5, 0xbc, 0x21, 0xba, 0xc0, 0xc6, 0x2a, 0xc8, 0xb2, 0xf7}}
info := bindataFileInfo{name: "1572372377_initial_schema.down.sql", size: 55, mode: os.FileMode(0644), modTime: time.Unix(1572706379, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xc1, 0x69, 0x72, 0x9f, 0x13, 0xdd, 0x23, 0x1b, 0xef, 0x2e, 0x95, 0x19, 0x42, 0xa3, 0x57, 0x8d, 0x77, 0x4, 0x73, 0xf6, 0x8a, 0xab, 0xad, 0xc1, 0xe6, 0xc7, 0x59, 0xbc, 0xee, 0x86, 0xce, 0x2f}}
return a, nil
}
var __1565447861_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\x08\x71\x74\xf2\x71\x55\xc8\x2d\x4b\x29\x8e\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\x56\xd0\xe0\x52\x50\x50\x50\xc8\x4c\x51\x70\xf2\xf1\x77\x52\x08\x08\xf2\xf4\x75\x0c\x8a\x54\xf0\x76\x8d\xd4\x01\x4b\xa4\x17\xe5\x97\x16\xc4\xc3\xa4\xfd\xfc\x43\x14\xfc\x42\x7d\x7c\x20\x72\x25\x99\xb9\xa9\xc5\x25\x89\xb9\x05\x0a\x9e\x7e\x21\xae\xee\xae\x41\x68\xf2\x49\xf9\x29\x95\xa8\xfa\xb8\x34\xad\xb9\x00\x01\x00\x00\xff\xff\xae\x9b\x57\x51\x8c\x00\x00\x00")
var __1572372377_initial_schemaUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x64\x90\xbd\x4e\xc4\x30\x10\x84\x7b\x3f\xc5\x94\xb1\xc4\x1b\x50\x25\xc7\xde\xc9\xc2\xd8\xc8\x18\x89\xab\xa2\x20\x5b\x27\x17\x26\x51\x1c\x10\xbc\x3d\x72\x7e\xc8\x0f\xdb\x7e\x3b\xb3\x33\x7b\x32\x54\x5a\x82\x2d\x2b\x49\x88\x5f\x2e\xd5\xd1\xa7\xd4\xdc\x7c\x42\xc1\x00\x20\x38\xcc\x53\x49\x5d\xe1\xd9\x88\xa7\xd2\x5c\xf1\x48\xd7\xbb\x91\xdf\xfa\xf6\xb3\xab\xf3\xd6\xc8\x01\x28\x6d\xa1\x5e\xa5\x9c\xf8\x10\xa2\x4f\x43\x13\x3b\x08\x65\xe9\x42\xe6\xc0\xdf\x5b\xf7\xb3\xf1\xdf\xe8\x19\xbf\x67\x6c\xce\x27\xd4\x03\xbd\x21\xb8\xef\xfa\xef\x9e\x56\xfb\xbc\xc5\x42\xb2\x8c\xfd\xef\xd5\x35\xbd\xff\x18\x96\x5a\xb3\x2a\x1b\x8d\x77\xf7\xa1\xa6\xdd\xb5\xd5\x9e\x9e\xb5\x21\x71\x51\xf9\x07\x28\x56\x23\x0e\x43\x67\x32\xa4\x4e\xf4\x72\x7c\x65\x70\x9c\x71\xf6\x1b\x00\x00\xff\xff\xed\x46\xeb\x1a\x6d\x01\x00\x00")
func _1565447861_initial_schemaUpSqlBytes() ([]byte, error) {
func _1572372377_initial_schemaUpSqlBytes() ([]byte, error) {
return bindataRead(
__1565447861_initial_schemaUpSql,
"1565447861_initial_schema.up.sql",
__1572372377_initial_schemaUpSql,
"1572372377_initial_schema.up.sql",
)
}
func _1565447861_initial_schemaUpSql() (*asset, error) {
bytes, err := _1565447861_initial_schemaUpSqlBytes()
func _1572372377_initial_schemaUpSql() (*asset, error) {
bytes, err := _1572372377_initial_schemaUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1565447861_initial_schema.up.sql", size: 140, mode: os.FileMode(0644), modTime: time.Unix(1565448062, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x21, 0x13, 0xbf, 0x64, 0x18, 0xf7, 0xe2, 0xd8, 0xb5, 0x7d, 0x8, 0xf1, 0x66, 0xb9, 0xb3, 0x49, 0x68, 0xe2, 0xa2, 0xea, 0x90, 0x11, 0x70, 0x9c, 0x15, 0x28, 0x3f, 0x3f, 0x90, 0x3c, 0x76, 0xf}}
info := bindataFileInfo{name: "1572372377_initial_schema.up.sql", size: 365, mode: os.FileMode(0644), modTime: time.Unix(1572895921, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x9b, 0xb2, 0x13, 0xf6, 0x6c, 0xa8, 0xdb, 0xfb, 0xb5, 0x8d, 0xe3, 0xa9, 0x50, 0x67, 0xb, 0xe2, 0x53, 0x6b, 0x24, 0xde, 0x18, 0x19, 0xac, 0x39, 0x3f, 0x52, 0x3a, 0xe1, 0xb8, 0x91, 0x8d, 0xd0}}
return a, nil
}
@@ -126,7 +126,7 @@ func docGo() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1565447878, 0)}
info := bindataFileInfo{name: "doc.go", size: 377, mode: os.FileMode(0644), modTime: time.Unix(1569335635, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xef, 0xaf, 0xdf, 0xcf, 0x65, 0xae, 0x19, 0xfc, 0x9d, 0x29, 0xc1, 0x91, 0xaf, 0xb5, 0xd5, 0xb1, 0x56, 0xf3, 0xee, 0xa8, 0xba, 0x13, 0x65, 0xdb, 0xab, 0xcf, 0x4e, 0xac, 0x92, 0xe9, 0x60, 0xf1}}
return a, nil
}
@@ -222,11 +222,9 @@ func AssetNames() []string {
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1565447861_initial_schema.down.sql": _1565447861_initial_schemaDownSql,
"1565447861_initial_schema.up.sql": _1565447861_initial_schemaUpSql,
"doc.go": docGo,
"1572372377_initial_schema.down.sql": _1572372377_initial_schemaDownSql,
"1572372377_initial_schema.up.sql": _1572372377_initial_schemaUpSql,
"doc.go": docGo,
}
// AssetDir returns the file names below a certain
@@ -270,8 +268,8 @@ type bintree struct {
}
var _bintree = &bintree{nil, map[string]*bintree{
"1565447861_initial_schema.down.sql": &bintree{_1565447861_initial_schemaDownSql, map[string]*bintree{}},
"1565447861_initial_schema.up.sql": &bintree{_1565447861_initial_schemaUpSql, map[string]*bintree{}},
"1572372377_initial_schema.down.sql": &bintree{_1572372377_initial_schemaDownSql, map[string]*bintree{}},
"1572372377_initial_schema.up.sql": &bintree{_1572372377_initial_schemaUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}


@@ -1,6 +0,0 @@
CREATE TABLE mvds_messages (
id BLOB PRIMARY KEY,
group_id BLOB NOT NULL,
timestamp INTEGER NOT NULL,
body BLOB NOT NULL
);


@@ -1 +1,2 @@
DROP TABLE mvds_messages;
DROP TABLE mvds_parents;


@@ -0,0 +1,15 @@
CREATE TABLE mvds_messages (
id BLOB PRIMARY KEY,
group_id BLOB NOT NULL,
timestamp INTEGER NOT NULL,
body BLOB NOT NULL
);
CREATE INDEX idx_group_id ON mvds_messages(group_id);
CREATE TABLE mvds_parents (
message_id BLOB NOT NULL,
parent_id BLOB NOT NULL,
FOREIGN KEY (message_id) REFERENCES mvds_messages (id)
)
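The schema indexes mvds_messages(group_id), which serves the outer filter of the leaf query in messagestore_sqlite.go, while mvds_parents(parent_id), the column scanned by that query's NOT IN subquery, is left unindexed. Whether that matters depends on group size; if it did, the index could be added in a follow-up migration. The sketch below only illustrates the idea in Go; it is not part of this schema file, and addParentIndex is a hypothetical helper.

// Sketch only, not part of this migration: an index for the column scanned by
// the NOT IN subquery in GetMessagesWithoutChildren. db is assumed to be an
// open *sql.DB ("database/sql") on the same SQLite database; a real change
// would go into a new migration file instead.
func addParentIndex(db *sql.DB) error {
    _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_parent_id ON mvds_parents(parent_id)`)
    return err
}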

vendor/github.com/golang/mock/AUTHORS

@@ -0,0 +1,12 @@
# This is the official list of GoMock authors for copyright purposes.
# This file is distinct from the CONTRIBUTORS files.
# See the latter for an explanation.
# Names should be added to this file as
# Name or Organization <email address>
# The email address is not required for organizations.
# Please keep the list sorted.
Alex Reece <awreece@gmail.com>
Google Inc.

vendor/github.com/golang/mock/CONTRIBUTORS

@@ -0,0 +1,37 @@
# This is the official list of people who can contribute (and typically
# have contributed) code to the gomock repository.
# The AUTHORS file lists the copyright holders; this file
# lists people. For example, Google employees are listed here
# but not in AUTHORS, because Google holds the copyright.
#
# The submission process automatically checks to make sure
# that people submitting code are listed in this file (by email address).
#
# Names should be added to this file only after verifying that
# the individual or the individual's organization has agreed to
# the appropriate Contributor License Agreement, found here:
#
# http://code.google.com/legal/individual-cla-v1.0.html
# http://code.google.com/legal/corporate-cla-v1.0.html
#
# The agreement for individuals can be filled out on the web.
#
# When adding J Random Contributor's name to this file,
# either J's name or J's organization's name should be
# added to the AUTHORS file, depending on whether the
# individual or corporate CLA was used.
# Names should be added to this file like so:
# Name <email address>
#
# An entry with two email addresses specifies that the
# first address should be used in the submit logs and
# that the second address should be recognized as the
# same person when interacting with Rietveld.
# Please keep the list sorted.
Aaron Jacobs <jacobsa@google.com> <aaronjjacobs@gmail.com>
Alex Reece <awreece@gmail.com>
David Symonds <dsymonds@golang.org>
Ryan Barrett <ryanb@google.com>

vendor/github.com/golang/mock/LICENSE

@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

vendor/github.com/golang/mock/gomock/call.go

@@ -0,0 +1,420 @@
// Copyright 2010 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gomock
import (
"fmt"
"reflect"
"strconv"
"strings"
)
// Call represents an expected call to a mock.
type Call struct {
t TestHelper // for triggering test failures on invalid call setup
receiver interface{} // the receiver of the method call
method string // the name of the method
methodType reflect.Type // the type of the method
args []Matcher // the args
origin string // file and line number of call setup
preReqs []*Call // prerequisite calls
// Expectations
minCalls, maxCalls int
numCalls int // actual number made
// actions are called when this Call is called. Each action gets the args and
// can set the return values by returning a non-nil slice. Actions run in the
// order they are created.
actions []func([]interface{}) []interface{}
}
// newCall creates a *Call. It requires the method type in order to support
// unexported methods.
func newCall(t TestHelper, receiver interface{}, method string, methodType reflect.Type, args ...interface{}) *Call {
t.Helper()
// TODO: check arity, types.
margs := make([]Matcher, len(args))
for i, arg := range args {
if m, ok := arg.(Matcher); ok {
margs[i] = m
} else if arg == nil {
// Handle nil specially so that passing a nil interface value
// will match the typed nils of concrete args.
margs[i] = Nil()
} else {
margs[i] = Eq(arg)
}
}
origin := callerInfo(3)
actions := []func([]interface{}) []interface{}{func([]interface{}) []interface{} {
// Synthesize the zero value for each of the return args' types.
rets := make([]interface{}, methodType.NumOut())
for i := 0; i < methodType.NumOut(); i++ {
rets[i] = reflect.Zero(methodType.Out(i)).Interface()
}
return rets
}}
return &Call{t: t, receiver: receiver, method: method, methodType: methodType,
args: margs, origin: origin, minCalls: 1, maxCalls: 1, actions: actions}
}
// AnyTimes allows the expectation to be called 0 or more times
func (c *Call) AnyTimes() *Call {
c.minCalls, c.maxCalls = 0, 1e8 // close enough to infinity
return c
}
// MinTimes requires the call to occur at least n times. If AnyTimes or MaxTimes have not been called, MinTimes also
// sets the maximum number of calls to infinity.
func (c *Call) MinTimes(n int) *Call {
c.minCalls = n
if c.maxCalls == 1 {
c.maxCalls = 1e8
}
return c
}
// MaxTimes limits the number of calls to n times. If AnyTimes or MinTimes have not been called, MaxTimes also
// sets the minimum number of calls to 0.
func (c *Call) MaxTimes(n int) *Call {
c.maxCalls = n
if c.minCalls == 1 {
c.minCalls = 0
}
return c
}
// DoAndReturn declares the action to run when the call is matched.
// The return values from this function are returned by the mocked function.
// It takes an interface{} argument to support n-arity functions.
func (c *Call) DoAndReturn(f interface{}) *Call {
// TODO: Check arity and types here, rather than dying badly elsewhere.
v := reflect.ValueOf(f)
c.addAction(func(args []interface{}) []interface{} {
vargs := make([]reflect.Value, len(args))
ft := v.Type()
for i := 0; i < len(args); i++ {
if args[i] != nil {
vargs[i] = reflect.ValueOf(args[i])
} else {
// Use the zero value for the arg.
vargs[i] = reflect.Zero(ft.In(i))
}
}
vrets := v.Call(vargs)
rets := make([]interface{}, len(vrets))
for i, ret := range vrets {
rets[i] = ret.Interface()
}
return rets
})
return c
}
// Do declares the action to run when the call is matched. The function's
// return values are ignored to retain backward compatibility. To use the
// return values call DoAndReturn.
// It takes an interface{} argument to support n-arity functions.
func (c *Call) Do(f interface{}) *Call {
// TODO: Check arity and types here, rather than dying badly elsewhere.
v := reflect.ValueOf(f)
c.addAction(func(args []interface{}) []interface{} {
vargs := make([]reflect.Value, len(args))
ft := v.Type()
for i := 0; i < len(args); i++ {
if args[i] != nil {
vargs[i] = reflect.ValueOf(args[i])
} else {
// Use the zero value for the arg.
vargs[i] = reflect.Zero(ft.In(i))
}
}
v.Call(vargs)
return nil
})
return c
}
// Return declares the values to be returned by the mocked function call.
func (c *Call) Return(rets ...interface{}) *Call {
c.t.Helper()
mt := c.methodType
if len(rets) != mt.NumOut() {
c.t.Fatalf("wrong number of arguments to Return for %T.%v: got %d, want %d [%s]",
c.receiver, c.method, len(rets), mt.NumOut(), c.origin)
}
for i, ret := range rets {
if got, want := reflect.TypeOf(ret), mt.Out(i); got == want {
// Identical types; nothing to do.
} else if got == nil {
// Nil needs special handling.
switch want.Kind() {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
// ok
default:
c.t.Fatalf("argument %d to Return for %T.%v is nil, but %v is not nillable [%s]",
i, c.receiver, c.method, want, c.origin)
}
} else if got.AssignableTo(want) {
// Assignable type relation. Make the assignment now so that the generated code
// can return the values with a type assertion.
v := reflect.New(want).Elem()
v.Set(reflect.ValueOf(ret))
rets[i] = v.Interface()
} else {
c.t.Fatalf("wrong type of argument %d to Return for %T.%v: %v is not assignable to %v [%s]",
i, c.receiver, c.method, got, want, c.origin)
}
}
c.addAction(func([]interface{}) []interface{} {
return rets
})
return c
}
// Times declares the exact number of times a function call is expected to be executed.
func (c *Call) Times(n int) *Call {
c.minCalls, c.maxCalls = n, n
return c
}
// SetArg declares an action that will set the nth argument's value,
// indirected through a pointer. Or, in the case of a slice, SetArg
// will copy value's elements into the nth argument.
func (c *Call) SetArg(n int, value interface{}) *Call {
c.t.Helper()
mt := c.methodType
// TODO: This will break on variadic methods.
// We will need to check those at invocation time.
if n < 0 || n >= mt.NumIn() {
c.t.Fatalf("SetArg(%d, ...) called for a method with %d args [%s]",
n, mt.NumIn(), c.origin)
}
// Permit setting argument through an interface.
// In the interface case, we don't (nay, can't) check the type here.
at := mt.In(n)
switch at.Kind() {
case reflect.Ptr:
dt := at.Elem()
if vt := reflect.TypeOf(value); !vt.AssignableTo(dt) {
c.t.Fatalf("SetArg(%d, ...) argument is a %v, not assignable to %v [%s]",
n, vt, dt, c.origin)
}
case reflect.Interface:
// nothing to do
case reflect.Slice:
// nothing to do
default:
c.t.Fatalf("SetArg(%d, ...) referring to argument of non-pointer non-interface non-slice type %v [%s]",
n, at, c.origin)
}
c.addAction(func(args []interface{}) []interface{} {
v := reflect.ValueOf(value)
switch reflect.TypeOf(args[n]).Kind() {
case reflect.Slice:
setSlice(args[n], v)
default:
reflect.ValueOf(args[n]).Elem().Set(v)
}
return nil
})
return c
}
// isPreReq returns true if other is a direct or indirect prerequisite to c.
func (c *Call) isPreReq(other *Call) bool {
for _, preReq := range c.preReqs {
if other == preReq || preReq.isPreReq(other) {
return true
}
}
return false
}
// After declares that the call may only match after preReq has been exhausted.
func (c *Call) After(preReq *Call) *Call {
c.t.Helper()
if c == preReq {
c.t.Fatalf("A call isn't allowed to be its own prerequisite")
}
if preReq.isPreReq(c) {
c.t.Fatalf("Loop in call order: %v is a prerequisite to %v (possibly indirectly).", c, preReq)
}
c.preReqs = append(c.preReqs, preReq)
return c
}
// Returns true if the minimum number of calls have been made.
func (c *Call) satisfied() bool {
return c.numCalls >= c.minCalls
}
// Returns true iff the maximum number of calls have been made.
func (c *Call) exhausted() bool {
return c.numCalls >= c.maxCalls
}
func (c *Call) String() string {
args := make([]string, len(c.args))
for i, arg := range c.args {
args[i] = arg.String()
}
arguments := strings.Join(args, ", ")
return fmt.Sprintf("%T.%v(%s) %s", c.receiver, c.method, arguments, c.origin)
}
// Tests if the given call matches the expected call.
// If yes, returns nil. If no, returns error with message explaining why it does not match.
func (c *Call) matches(args []interface{}) error {
if !c.methodType.IsVariadic() {
if len(args) != len(c.args) {
return fmt.Errorf("Expected call at %s has the wrong number of arguments. Got: %d, want: %d",
c.origin, len(args), len(c.args))
}
for i, m := range c.args {
if !m.Matches(args[i]) {
return fmt.Errorf("Expected call at %s doesn't match the argument at index %s.\nGot: %v\nWant: %v",
c.origin, strconv.Itoa(i), args[i], m)
}
}
} else {
if len(c.args) < c.methodType.NumIn()-1 {
return fmt.Errorf("Expected call at %s has the wrong number of matchers. Got: %d, want: %d",
c.origin, len(c.args), c.methodType.NumIn()-1)
}
if len(c.args) != c.methodType.NumIn() && len(args) != len(c.args) {
return fmt.Errorf("Expected call at %s has the wrong number of arguments. Got: %d, want: %d",
c.origin, len(args), len(c.args))
}
if len(args) < len(c.args)-1 {
return fmt.Errorf("Expected call at %s has the wrong number of arguments. Got: %d, want: greater than or equal to %d",
c.origin, len(args), len(c.args)-1)
}
for i, m := range c.args {
if i < c.methodType.NumIn()-1 {
// Non-variadic args
if !m.Matches(args[i]) {
return fmt.Errorf("Expected call at %s doesn't match the argument at index %s.\nGot: %v\nWant: %v",
c.origin, strconv.Itoa(i), args[i], m)
}
continue
}
// The last arg has a possibility of a variadic argument, so let it branch
// sample: Foo(a int, b int, c ...int)
if i < len(c.args) && i < len(args) {
if m.Matches(args[i]) {
// Got Foo(a, b, c) want Foo(matcherA, matcherB, gomock.Any())
// Got Foo(a, b, c) want Foo(matcherA, matcherB, someSliceMatcher)
// Got Foo(a, b, c) want Foo(matcherA, matcherB, matcherC)
// Got Foo(a, b) want Foo(matcherA, matcherB)
// Got Foo(a, b, c, d) want Foo(matcherA, matcherB, matcherC, matcherD)
continue
}
}
// The number of actual args don't match the number of matchers,
// or the last matcher is a slice and the last arg is not.
// If this function still matches it is because the last matcher
// matches all the remaining arguments or the lack of any.
// Convert the remaining arguments, if any, into a slice of the
// expected type.
vargsType := c.methodType.In(c.methodType.NumIn() - 1)
vargs := reflect.MakeSlice(vargsType, 0, len(args)-i)
for _, arg := range args[i:] {
vargs = reflect.Append(vargs, reflect.ValueOf(arg))
}
if m.Matches(vargs.Interface()) {
// Got Foo(a, b, c, d, e) want Foo(matcherA, matcherB, gomock.Any())
// Got Foo(a, b, c, d, e) want Foo(matcherA, matcherB, someSliceMatcher)
// Got Foo(a, b) want Foo(matcherA, matcherB, gomock.Any())
// Got Foo(a, b) want Foo(matcherA, matcherB, someEmptySliceMatcher)
break
}
// Wrong number of matchers or not match. Fail.
// Got Foo(a, b) want Foo(matcherA, matcherB, matcherC, matcherD)
// Got Foo(a, b, c) want Foo(matcherA, matcherB, matcherC, matcherD)
// Got Foo(a, b, c, d) want Foo(matcherA, matcherB, matcherC, matcherD, matcherE)
// Got Foo(a, b, c, d, e) want Foo(matcherA, matcherB, matcherC, matcherD)
// Got Foo(a, b, c) want Foo(matcherA, matcherB)
return fmt.Errorf("Expected call at %s doesn't match the argument at index %s.\nGot: %v\nWant: %v",
c.origin, strconv.Itoa(i), args[i:], c.args[i])
}
}
// Check that all prerequisite calls have been satisfied.
for _, preReqCall := range c.preReqs {
if !preReqCall.satisfied() {
return fmt.Errorf("Expected call at %s doesn't have a prerequisite call satisfied:\n%v\nshould be called before:\n%v",
c.origin, preReqCall, c)
}
}
// Check that the call is not exhausted.
if c.exhausted() {
return fmt.Errorf("Expected call at %s has already been called the max number of times.", c.origin)
}
return nil
}
// dropPrereqs tells the expected Call to not re-check prerequisite calls any
// longer, and to return its current set.
func (c *Call) dropPrereqs() (preReqs []*Call) {
preReqs = c.preReqs
c.preReqs = nil
return
}
func (c *Call) call(args []interface{}) []func([]interface{}) []interface{} {
c.numCalls++
return c.actions
}
// InOrder declares that the given calls should occur in order.
func InOrder(calls ...*Call) {
for i := 1; i < len(calls); i++ {
calls[i].After(calls[i-1])
}
}
func setSlice(arg interface{}, v reflect.Value) {
va := reflect.ValueOf(arg)
for i := 0; i < v.Len(); i++ {
va.Index(i).Set(v.Index(i))
}
}
func (c *Call) addAction(action func([]interface{}) []interface{}) {
c.actions = append(c.actions, action)
}

vendor/github.com/golang/mock/gomock/callset.go

@@ -0,0 +1,108 @@
// Copyright 2011 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gomock
import (
"bytes"
"fmt"
)
// callSet represents a set of expected calls, indexed by receiver and method
// name.
type callSet struct {
// Calls that are still expected.
expected map[callSetKey][]*Call
// Calls that have been exhausted.
exhausted map[callSetKey][]*Call
}
// callSetKey is the key in the maps in callSet
type callSetKey struct {
receiver interface{}
fname string
}
func newCallSet() *callSet {
return &callSet{make(map[callSetKey][]*Call), make(map[callSetKey][]*Call)}
}
// Add adds a new expected call.
func (cs callSet) Add(call *Call) {
key := callSetKey{call.receiver, call.method}
m := cs.expected
if call.exhausted() {
m = cs.exhausted
}
m[key] = append(m[key], call)
}
// Remove removes an expected call.
func (cs callSet) Remove(call *Call) {
key := callSetKey{call.receiver, call.method}
calls := cs.expected[key]
for i, c := range calls {
if c == call {
// maintain order for remaining calls
cs.expected[key] = append(calls[:i], calls[i+1:]...)
cs.exhausted[key] = append(cs.exhausted[key], call)
break
}
}
}
// FindMatch searches for a matching call. Returns error with explanation message if no call matched.
func (cs callSet) FindMatch(receiver interface{}, method string, args []interface{}) (*Call, error) {
key := callSetKey{receiver, method}
// Search through the expected calls.
expected := cs.expected[key]
var callsErrors bytes.Buffer
for _, call := range expected {
err := call.matches(args)
if err != nil {
fmt.Fprintf(&callsErrors, "\n%v", err)
} else {
return call, nil
}
}
// If we haven't found a match then search through the exhausted calls so we
// get useful error messages.
exhausted := cs.exhausted[key]
for _, call := range exhausted {
if err := call.matches(args); err != nil {
fmt.Fprintf(&callsErrors, "\n%v", err)
}
}
if len(expected)+len(exhausted) == 0 {
fmt.Fprintf(&callsErrors, "there are no expected calls of the method %q for that receiver", method)
}
return nil, fmt.Errorf(callsErrors.String())
}
// Failures returns the calls that are not satisfied.
func (cs callSet) Failures() []*Call {
failures := make([]*Call, 0, len(cs.expected))
for _, calls := range cs.expected {
for _, call := range calls {
if !call.satisfied() {
failures = append(failures, call)
}
}
}
return failures
}

vendor/github.com/golang/mock/gomock/controller.go

@@ -0,0 +1,235 @@
// Copyright 2010 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// GoMock - a mock framework for Go.
//
// Standard usage:
// (1) Define an interface that you wish to mock.
// type MyInterface interface {
// SomeMethod(x int64, y string)
// }
// (2) Use mockgen to generate a mock from the interface.
// (3) Use the mock in a test:
// func TestMyThing(t *testing.T) {
// mockCtrl := gomock.NewController(t)
// defer mockCtrl.Finish()
//
// mockObj := something.NewMockMyInterface(mockCtrl)
// mockObj.EXPECT().SomeMethod(4, "blah")
// // pass mockObj to a real object and play with it.
// }
//
// By default, expected calls are not enforced to run in any particular order.
// Call order dependency can be enforced by use of InOrder and/or Call.After.
// Call.After can create more varied call order dependencies, but InOrder is
// often more convenient.
//
// The following examples create equivalent call order dependencies.
//
// Example of using Call.After to chain expected call order:
//
// firstCall := mockObj.EXPECT().SomeMethod(1, "first")
// secondCall := mockObj.EXPECT().SomeMethod(2, "second").After(firstCall)
// mockObj.EXPECT().SomeMethod(3, "third").After(secondCall)
//
// Example of using InOrder to declare expected call order:
//
// gomock.InOrder(
// mockObj.EXPECT().SomeMethod(1, "first"),
// mockObj.EXPECT().SomeMethod(2, "second"),
// mockObj.EXPECT().SomeMethod(3, "third"),
// )
//
// TODO:
// - Handle different argument/return types (e.g. ..., chan, map, interface).
package gomock
import (
"context"
"fmt"
"reflect"
"runtime"
"sync"
)
// A TestReporter is something that can be used to report test failures.
// It is satisfied by the standard library's *testing.T.
type TestReporter interface {
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
}
// TestHelper is a TestReporter that has the Helper method. It is satisfied
// by the standard library's *testing.T.
type TestHelper interface {
TestReporter
Helper()
}
// A Controller represents the top-level control of a mock ecosystem.
// It defines the scope and lifetime of mock objects, as well as their expectations.
// It is safe to call Controller's methods from multiple goroutines.
type Controller struct {
// T should only be called within a generated mock. It is not intended to
// be used in user code and may be changed in future versions. T is the
// TestReporter passed in when creating the Controller via NewController.
// If the TestReporter does not implement a TestHelper it will be wrapped
// with a nopTestHelper.
T TestHelper
mu sync.Mutex
expectedCalls *callSet
finished bool
}
func NewController(t TestReporter) *Controller {
h, ok := t.(TestHelper)
if !ok {
h = nopTestHelper{t}
}
return &Controller{
T: h,
expectedCalls: newCallSet(),
}
}
type cancelReporter struct {
TestHelper
cancel func()
}
func (r *cancelReporter) Errorf(format string, args ...interface{}) {
r.TestHelper.Errorf(format, args...)
}
func (r *cancelReporter) Fatalf(format string, args ...interface{}) {
defer r.cancel()
r.TestHelper.Fatalf(format, args...)
}
// WithContext returns a new Controller and a Context, which is cancelled on any
// fatal failure.
func WithContext(ctx context.Context, t TestReporter) (*Controller, context.Context) {
h, ok := t.(TestHelper)
if !ok {
h = nopTestHelper{t}
}
ctx, cancel := context.WithCancel(ctx)
return NewController(&cancelReporter{h, cancel}), ctx
}
type nopTestHelper struct {
TestReporter
}
func (h nopTestHelper) Helper() {}
func (ctrl *Controller) RecordCall(receiver interface{}, method string, args ...interface{}) *Call {
ctrl.T.Helper()
recv := reflect.ValueOf(receiver)
for i := 0; i < recv.Type().NumMethod(); i++ {
if recv.Type().Method(i).Name == method {
return ctrl.RecordCallWithMethodType(receiver, method, recv.Method(i).Type(), args...)
}
}
ctrl.T.Fatalf("gomock: failed finding method %s on %T", method, receiver)
panic("unreachable")
}
func (ctrl *Controller) RecordCallWithMethodType(receiver interface{}, method string, methodType reflect.Type, args ...interface{}) *Call {
ctrl.T.Helper()
call := newCall(ctrl.T, receiver, method, methodType, args...)
ctrl.mu.Lock()
defer ctrl.mu.Unlock()
ctrl.expectedCalls.Add(call)
return call
}
func (ctrl *Controller) Call(receiver interface{}, method string, args ...interface{}) []interface{} {
ctrl.T.Helper()
// Nest this code so we can use defer to make sure the lock is released.
actions := func() []func([]interface{}) []interface{} {
ctrl.T.Helper()
ctrl.mu.Lock()
defer ctrl.mu.Unlock()
expected, err := ctrl.expectedCalls.FindMatch(receiver, method, args)
if err != nil {
origin := callerInfo(2)
ctrl.T.Fatalf("Unexpected call to %T.%v(%v) at %s because: %s", receiver, method, args, origin, err)
}
// Two things happen here:
// * the matching call no longer needs to check prerequisite calls,
// * and the prerequisite calls are no longer expected, so remove them.
preReqCalls := expected.dropPrereqs()
for _, preReqCall := range preReqCalls {
ctrl.expectedCalls.Remove(preReqCall)
}
actions := expected.call(args)
if expected.exhausted() {
ctrl.expectedCalls.Remove(expected)
}
return actions
}()
var rets []interface{}
for _, action := range actions {
if r := action(args); r != nil {
rets = r
}
}
return rets
}
func (ctrl *Controller) Finish() {
ctrl.T.Helper()
ctrl.mu.Lock()
defer ctrl.mu.Unlock()
if ctrl.finished {
ctrl.T.Fatalf("Controller.Finish was called more than once. It has to be called exactly once.")
}
ctrl.finished = true
// If we're currently panicking, probably because this is a deferred call,
// pass through the panic.
if err := recover(); err != nil {
panic(err)
}
// Check that all remaining expected calls are satisfied.
failures := ctrl.expectedCalls.Failures()
for _, call := range failures {
ctrl.T.Errorf("missing call(s) to %v", call)
}
if len(failures) != 0 {
ctrl.T.Fatalf("aborting test due to missing call(s)")
}
}
func callerInfo(skip int) string {
if _, file, line, ok := runtime.Caller(skip + 1); ok {
return fmt.Sprintf("%s:%d", file, line)
}
return "unknown file"
}

vendor/github.com/golang/mock/gomock/matchers.go

@@ -0,0 +1,122 @@
// Copyright 2010 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gomock
import (
"fmt"
"reflect"
)
// A Matcher is a representation of a class of values.
// It is used to represent the valid or expected arguments to a mocked method.
type Matcher interface {
// Matches returns whether x is a match.
Matches(x interface{}) bool
// String describes what the matcher matches.
String() string
}
type anyMatcher struct{}
func (anyMatcher) Matches(x interface{}) bool {
return true
}
func (anyMatcher) String() string {
return "is anything"
}
type eqMatcher struct {
x interface{}
}
func (e eqMatcher) Matches(x interface{}) bool {
return reflect.DeepEqual(e.x, x)
}
func (e eqMatcher) String() string {
return fmt.Sprintf("is equal to %v", e.x)
}
type nilMatcher struct{}
func (nilMatcher) Matches(x interface{}) bool {
if x == nil {
return true
}
v := reflect.ValueOf(x)
switch v.Kind() {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
reflect.Ptr, reflect.Slice:
return v.IsNil()
}
return false
}
func (nilMatcher) String() string {
return "is nil"
}
type notMatcher struct {
m Matcher
}
func (n notMatcher) Matches(x interface{}) bool {
return !n.m.Matches(x)
}
func (n notMatcher) String() string {
// TODO: Improve this if we add a NotString method to the Matcher interface.
return "not(" + n.m.String() + ")"
}
type assignableToTypeOfMatcher struct {
targetType reflect.Type
}
func (m assignableToTypeOfMatcher) Matches(x interface{}) bool {
return reflect.TypeOf(x).AssignableTo(m.targetType)
}
func (m assignableToTypeOfMatcher) String() string {
return "is assignable to " + m.targetType.Name()
}
// Constructors
func Any() Matcher { return anyMatcher{} }
func Eq(x interface{}) Matcher { return eqMatcher{x} }
func Nil() Matcher { return nilMatcher{} }
func Not(x interface{}) Matcher {
if m, ok := x.(Matcher); ok {
return notMatcher{m}
}
return notMatcher{Eq(x)}
}
// AssignableToTypeOf is a Matcher that matches if the parameter to the mock
// function is assignable to the type of the parameter to this function.
//
// Example usage:
//
// dbMock.EXPECT().
// Insert(gomock.AssignableToTypeOf(&EmployeeRecord{})).
// Return(errors.New("DB error"))
//
func AssignableToTypeOf(x interface{}) Matcher {
return assignableToTypeOfMatcher{reflect.TypeOf(x)}
}

vendor/modules.txt

@@ -1,10 +1,12 @@
# github.com/davecgh/go-spew v1.1.1
github.com/davecgh/go-spew/spew
# github.com/golang-migrate/migrate/v4 v4.6.2
github.com/golang-migrate/migrate/v4
github.com/golang-migrate/migrate/v4/database
github.com/golang-migrate/migrate/v4/internal/url
github.com/golang-migrate/migrate/v4/source
github.com/golang-migrate/migrate/v4
github.com/golang-migrate/migrate/v4/internal/url
# github.com/golang/mock v1.2.0
github.com/golang/mock/gomock
# github.com/golang/protobuf v1.3.2
github.com/golang/protobuf/proto
# github.com/hashicorp/errwrap v1.0.0
@@ -20,23 +22,23 @@ github.com/pmezard/go-difflib/difflib
# github.com/status-im/migrate/v4 v4.6.2-status.2
github.com/status-im/migrate/v4
github.com/status-im/migrate/v4/database/sqlcipher
github.com/status-im/migrate/v4/internal/url
github.com/status-im/migrate/v4/source/go_bindata
github.com/status-im/migrate/v4/internal/url
# github.com/stretchr/testify v1.3.1-0.20190712000136-221dbe5ed467
github.com/stretchr/testify/assert
github.com/stretchr/testify/require
github.com/stretchr/testify/suite
github.com/stretchr/testify/require
github.com/stretchr/testify/assert
# go.uber.org/atomic v1.4.0
go.uber.org/atomic
# go.uber.org/multierr v1.1.0
go.uber.org/multierr
# go.uber.org/zap v1.10.0
go.uber.org/zap
go.uber.org/zap/buffer
go.uber.org/zap/internal/bufferpool
go.uber.org/zap/zapcore
go.uber.org/zap/buffer
go.uber.org/zap/internal/color
go.uber.org/zap/internal/exit
go.uber.org/zap/zapcore
# golang.org/x/net v0.0.0-20190424112056-4829fb13d2c6
golang.org/x/net/context
# gopkg.in/yaml.v2 v2.2.2