Remove Kafka from Prysm (#9470)

* remove kafka

* gaz

* rem foreign rules

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Raul Jordan
2021-08-26 18:59:00 -05:00
committed by GitHub
parent 2cc9fc9e0e
commit 031830baa4
16 changed files with 5 additions and 630 deletions

View File

@@ -29,18 +29,10 @@ build --incompatible_strict_action_env
test --incompatible_strict_action_env
run --incompatible_strict_action_env
# Disable kafka by default; it takes a long time to build...
build --define kafka_enabled=false
test --define kafka_enabled=false
run --define kafka_enabled=false
build --define blst_disabled=false
test --define blst_disabled=false
run --define blst_disabled=false
build:kafka_enabled --define kafka_enabled=true
build:kafka_enabled --define gotags=kafka_enabled
build:blst_disabled --define blst_disabled=true
build:blst_disabled --define gotags=blst_disabled

View File

@@ -296,27 +296,6 @@ git_repository(
# Group the sources of the library so that CMake rule have access to it
all_content = """filegroup(name = "all", srcs = glob(["**"]), visibility = ["//visibility:public"])"""
http_archive(
name = "rules_foreign_cc",
sha256 = "b85ce66a3410f7370d1a9a61dfe3a29c7532b7637caeb2877d8d0dfd41d77abb",
strip_prefix = "rules_foreign_cc-3515b20a2417c4dd51c8a4a8cac1f6ecf3c6d934",
url = "https://github.com/bazelbuild/rules_foreign_cc/archive/3515b20a2417c4dd51c8a4a8cac1f6ecf3c6d934.zip",
)
load("@rules_foreign_cc//:workspace_definitions.bzl", "rules_foreign_cc_dependencies")
rules_foreign_cc_dependencies([
"@prysm//:built_cmake_toolchain",
])
http_archive(
name = "librdkafka",
build_file_content = all_content,
sha256 = "3b99a36c082a67ef6295eabd4fb3e32ab0bff7c6b0d397d6352697335f4e57eb",
strip_prefix = "librdkafka-1.4.2",
urls = ["https://github.com/edenhill/librdkafka/archive/v1.4.2.tar.gz"],
)
http_archive(
name = "sigp_beacon_fuzz_corpora",
build_file = "//third_party:beacon-fuzz/corpora.BUILD",

View File

@@ -1,27 +1,13 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
# Build with --define=kafka_enabled=false to exclude kafka wrapper.
config_setting(
name = "kafka_disabled",
values = {"define": "kafka_enabled=false"},
)
# gazelle:exclude db.go
# gazelle:exclude db_kafka_wrapped.go
go_library(
name = "go_default_library",
srcs = [
"alias.go",
"db.go",
"log.go",
"restore.go",
] + select({
":kafka_disabled": [
"db.go",
],
"//conditions:default": [
"db_kafka_wrapped.go",
],
}),
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db",
visibility = [
"//beacon-chain:__subpackages__",
@@ -30,23 +16,15 @@ go_library(
"//tools:__subpackages__",
],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/db/slasherkv:go_default_library",
"//shared/cmd:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/promptutil:go_default_library",
"//shared/tos:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
] + select({
"//conditions:default": [
"//beacon-chain/db/kafka:go_default_library",
],
":kafka_disabled": [],
}),
],
)
go_test(

View File

@@ -1,18 +0,0 @@
// +build kafka_enabled
package db
import (
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kafka"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
)
// NewDB initializes a new DB with kafka wrapper.
func NewDB(dirPath string, stateSummaryCache *cache.StateSummaryCache) (Database, error) {
db, err := kv.NewKVStore(dirPath, stateSummaryCache, &kv.Config{})
if err != nil {
return nil, err
}
return kafka.Wrap(db)
}
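
For context on the toggle being deleted: Bazel's `--define gotags=kafka_enabled` (removed from `.bazelrc` above) became the Go build tag `kafka_enabled`, so the file above replaced the plain `db.go` at compile time. The actual `db.go` is not shown in this commit, so the sketch below of the other half of the toggle is an assumption, including the negated build tag:

// +build !kafka_enabled

package db

import (
	"github.com/prysmaticlabs/prysm/beacon-chain/cache"
	"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
)

// NewDB initializes a plain key-value DB with no Kafka wrapper.
func NewDB(dirPath string, stateSummaryCache *cache.StateSummaryCache) (Database, error) {
	return kv.NewKVStore(dirPath, stateSummaryCache, &kv.Config{})
}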

View File

@@ -1,31 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"export_wrapper.go",
"log.go",
"passthrough.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kafka",
tags = ["manual"],
visibility = ["//beacon-chain/db:__pkg__"],
deps = [
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/state:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/block:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/traceutil:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ferranbt_fastssz//:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@in_gopkg_confluentinc_confluent_kafka_go_v1//kafka:go_default_library",
"@in_gopkg_confluentinc_confluent_kafka_go_v1//kafka/librdkafka:go_default_library",
"@in_gopkg_errgo_v2//fmt/errors:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_protobuf//encoding/protojson:go_default_library",
],
)

View File

@@ -1,107 +0,0 @@
// Package kafka defines an implementation of Database interface
// which exports streaming data using Kafka for data analysis.
package kafka
import (
"context"
fssz "github.com/ferranbt/fastssz"
"github.com/prysmaticlabs/prysm/beacon-chain/db/iface"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"go.opencensus.io/trace"
jsonpb "google.golang.org/protobuf/encoding/protojson"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
_ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka" // Required for c++ kafka library.
"gopkg.in/errgo.v2/fmt/errors"
)
var _ iface.Database = (*Exporter)(nil)
var marshaler = jsonpb.MarshalOptions{}
// Exporter wraps a database interface and exports certain objects to kafka topics.
type Exporter struct {
db iface.Database
p *kafka.Producer
}
// Wrap the db with kafka exporter. If the feature flag is not enabled, this service does not wrap
// the database, but returns the underlying database pointer itself.
func Wrap(db iface.Database) (iface.Database, error) {
if featureconfig.Get().KafkaBootstrapServers == "" {
log.Debug("Empty Kafka bootstrap servers list, database was not wrapped with Kafka exporter")
return db, nil
}
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": featureconfig.Get().KafkaBootstrapServers})
if err != nil {
return nil, err
}
return &Exporter{db: db, p: p}, nil
}
func (e Exporter) publish(ctx context.Context, topic string, msg block.SignedBeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "kafka.publish")
defer span.End()
var err error
var buf []byte
if buf, err = marshaler.Marshal(msg.Proto()); err != nil {
traceutil.AnnotateError(span, err)
return err
}
var key [32]byte
if v, ok := msg.(fssz.HashRoot); ok {
key, err = v.HashTreeRoot()
} else {
err = errors.New("object does not follow hash tree root interface")
}
if err != nil {
traceutil.AnnotateError(span, err)
return err
}
if err := e.p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
},
Value: buf,
Key: key[:],
}, nil); err != nil {
traceutil.AnnotateError(span, err)
return err
}
return nil
}
// Close closes kafka producer and underlying db.
func (e Exporter) Close() error {
e.p.Close()
return e.db.Close()
}
// SaveBlock publishes to the kafka topic for beacon blocks.
func (e Exporter) SaveBlock(ctx context.Context, block block.SignedBeaconBlock) error {
go func() {
if err := e.publish(ctx, "beacon_block", block); err != nil {
log.WithError(err).Error("Failed to publish block")
}
}()
return e.db.SaveBlock(ctx, block)
}
// SaveBlocks publishes to the kafka topic for beacon blocks.
func (e Exporter) SaveBlocks(ctx context.Context, blocks []block.SignedBeaconBlock) error {
go func() {
for _, block := range blocks {
if err := e.publish(ctx, "beacon_block", block); err != nil {
log.WithError(err).Error("Failed to publish block")
}
}
}()
return e.db.SaveBlocks(ctx, blocks)
}
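
The exporter publishes each block to the `beacon_block` topic as protojson, keyed by the block's 32-byte hash tree root. For the data-analysis use case the package comment mentions, a downstream reader would subscribe to that topic. A minimal sketch using the same confluent-kafka-go v1 client; the broker address and group id are illustrative:

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // illustrative broker
		"group.id":          "beacon-analysis",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Topic name matches the one used in Exporter.publish above.
	if err := c.SubscribeTopics([]string{"beacon_block"}, nil); err != nil {
		panic(err)
	}
	for {
		msg, err := c.ReadMessage(-1) // block until a message arrives
		if err != nil {
			fmt.Println("consume error:", err)
			continue
		}
		// Key is the block's hash tree root; Value is the protojson payload.
		fmt.Printf("block root %x: %d bytes\n", msg.Key, len(msg.Value))
	}
}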

View File

@@ -1,5 +0,0 @@
package kafka
import "github.com/sirupsen/logrus"
var log = logrus.WithField("prefix", "exporter")

View File

@@ -1,285 +0,0 @@
package kafka
import (
"context"
"io"
"github.com/ethereum/go-ethereum/common"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
v2 "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
)
// DatabasePath -- passthrough.
func (e Exporter) DatabasePath() string {
return e.db.DatabasePath()
}
// ClearDB -- passthrough.
func (e Exporter) ClearDB() error {
return e.db.ClearDB()
}
// Backup -- passthrough.
func (e Exporter) Backup(ctx context.Context, outputDir string, overridePermission bool) error {
return e.db.Backup(ctx, outputDir, overridePermission)
}
// Block -- passthrough.
func (e Exporter) Block(ctx context.Context, blockRoot [32]byte) (block.SignedBeaconBlock, error) {
return e.db.Block(ctx, blockRoot)
}
// HeadBlock -- passthrough.
func (e Exporter) HeadBlock(ctx context.Context) (block.SignedBeaconBlock, error) {
return e.db.HeadBlock(ctx)
}
// Blocks -- passthrough.
func (e Exporter) Blocks(ctx context.Context, f *filters.QueryFilter) ([]block.SignedBeaconBlock, [][32]byte, error) {
return e.db.Blocks(ctx, f)
}
// BlockRoots -- passthrough.
func (e Exporter) BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][32]byte, error) {
return e.db.BlockRoots(ctx, f)
}
// BlocksBySlot -- passthrough.
func (e Exporter) BlocksBySlot(ctx context.Context, slot types.Slot) (bool, []block.SignedBeaconBlock, error) {
return e.db.BlocksBySlot(ctx, slot)
}
// BlockRootsBySlot -- passthrough.
func (e Exporter) BlockRootsBySlot(ctx context.Context, slot types.Slot) (bool, [][32]byte, error) {
return e.db.BlockRootsBySlot(ctx, slot)
}
// HasBlock -- passthrough.
func (e Exporter) HasBlock(ctx context.Context, blockRoot [32]byte) bool {
return e.db.HasBlock(ctx, blockRoot)
}
// State -- passthrough.
func (e Exporter) State(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
return e.db.State(ctx, blockRoot)
}
// StateSummary -- passthrough.
func (e Exporter) StateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.StateSummary, error) {
return e.db.StateSummary(ctx, blockRoot)
}
// GenesisState -- passthrough.
func (e Exporter) GenesisState(ctx context.Context) (state.BeaconState, error) {
return e.db.GenesisState(ctx)
}
// ProposerSlashing -- passthrough.
func (e Exporter) ProposerSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.ProposerSlashing, error) {
return e.db.ProposerSlashing(ctx, slashingRoot)
}
// AttesterSlashing -- passthrough.
func (e Exporter) AttesterSlashing(ctx context.Context, slashingRoot [32]byte) (*eth.AttesterSlashing, error) {
return e.db.AttesterSlashing(ctx, slashingRoot)
}
// HasProposerSlashing -- passthrough.
func (e Exporter) HasProposerSlashing(ctx context.Context, slashingRoot [32]byte) bool {
return e.db.HasProposerSlashing(ctx, slashingRoot)
}
// HasAttesterSlashing -- passthrough.
func (e Exporter) HasAttesterSlashing(ctx context.Context, slashingRoot [32]byte) bool {
return e.db.HasAttesterSlashing(ctx, slashingRoot)
}
// VoluntaryExit -- passthrough.
func (e Exporter) VoluntaryExit(ctx context.Context, exitRoot [32]byte) (*eth.VoluntaryExit, error) {
return e.db.VoluntaryExit(ctx, exitRoot)
}
// HasVoluntaryExit -- passthrough.
func (e Exporter) HasVoluntaryExit(ctx context.Context, exitRoot [32]byte) bool {
return e.db.HasVoluntaryExit(ctx, exitRoot)
}
// JustifiedCheckpoint -- passthrough.
func (e Exporter) JustifiedCheckpoint(ctx context.Context) (*eth.Checkpoint, error) {
return e.db.JustifiedCheckpoint(ctx)
}
// FinalizedCheckpoint -- passthrough.
func (e Exporter) FinalizedCheckpoint(ctx context.Context) (*eth.Checkpoint, error) {
return e.db.FinalizedCheckpoint(ctx)
}
// DepositContractAddress -- passthrough.
func (e Exporter) DepositContractAddress(ctx context.Context) ([]byte, error) {
return e.db.DepositContractAddress(ctx)
}
// SaveHeadBlockRoot -- passthrough.
func (e Exporter) SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error {
return e.db.SaveHeadBlockRoot(ctx, blockRoot)
}
// GenesisBlock -- passthrough.
func (e Exporter) GenesisBlock(ctx context.Context) (block.SignedBeaconBlock, error) {
return e.db.GenesisBlock(ctx)
}
// SaveGenesisBlockRoot -- passthrough.
func (e Exporter) SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) error {
return e.db.SaveGenesisBlockRoot(ctx, blockRoot)
}
// SaveState -- passthrough.
func (e Exporter) SaveState(ctx context.Context, st state.ReadOnlyBeaconState, blockRoot [32]byte) error {
return e.db.SaveState(ctx, st, blockRoot)
}
// SaveStateSummary -- passthrough.
func (e Exporter) SaveStateSummary(ctx context.Context, summary *ethpb.StateSummary) error {
return e.db.SaveStateSummary(ctx, summary)
}
// SaveStateSummaries -- passthrough.
func (e Exporter) SaveStateSummaries(ctx context.Context, summaries []*ethpb.StateSummary) error {
return e.db.SaveStateSummaries(ctx, summaries)
}
// SaveStates -- passthrough.
func (e Exporter) SaveStates(ctx context.Context, states []state.ReadOnlyBeaconState, blockRoots [][32]byte) error {
return e.db.SaveStates(ctx, states, blockRoots)
}
// SaveProposerSlashing -- passthrough.
func (e Exporter) SaveProposerSlashing(ctx context.Context, slashing *eth.ProposerSlashing) error {
return e.db.SaveProposerSlashing(ctx, slashing)
}
// SaveAttesterSlashing -- passthrough.
func (e Exporter) SaveAttesterSlashing(ctx context.Context, slashing *eth.AttesterSlashing) error {
return e.db.SaveAttesterSlashing(ctx, slashing)
}
// SaveVoluntaryExit -- passthrough.
func (e Exporter) SaveVoluntaryExit(ctx context.Context, exit *eth.VoluntaryExit) error {
return e.db.SaveVoluntaryExit(ctx, exit)
}
// SaveJustifiedCheckpoint -- passthrough.
func (e Exporter) SaveJustifiedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error {
return e.db.SaveJustifiedCheckpoint(ctx, checkpoint)
}
// SaveFinalizedCheckpoint -- passthrough.
func (e Exporter) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *eth.Checkpoint) error {
return e.db.SaveFinalizedCheckpoint(ctx, checkpoint)
}
// SaveDepositContractAddress -- passthrough.
func (e Exporter) SaveDepositContractAddress(ctx context.Context, addr common.Address) error {
return e.db.SaveDepositContractAddress(ctx, addr)
}
// DeleteState -- passthrough.
func (e Exporter) DeleteState(ctx context.Context, blockRoot [32]byte) error {
return e.db.DeleteState(ctx, blockRoot)
}
// DeleteStates -- passthrough.
func (e Exporter) DeleteStates(ctx context.Context, blockRoots [][32]byte) error {
return e.db.DeleteStates(ctx, blockRoots)
}
// HasState -- passthrough.
func (e Exporter) HasState(ctx context.Context, blockRoot [32]byte) bool {
return e.db.HasState(ctx, blockRoot)
}
// HasStateSummary -- passthrough.
func (e Exporter) HasStateSummary(ctx context.Context, blockRoot [32]byte) bool {
return e.db.HasStateSummary(ctx, blockRoot)
}
// IsFinalizedBlock -- passthrough.
func (e Exporter) IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool {
return e.db.IsFinalizedBlock(ctx, blockRoot)
}
// FinalizedChildBlock -- passthrough.
func (e Exporter) FinalizedChildBlock(ctx context.Context, blockRoot [32]byte) (block.SignedBeaconBlock, error) {
return e.db.FinalizedChildBlock(ctx, blockRoot)
}
// PowchainData -- passthrough
func (e Exporter) PowchainData(ctx context.Context) (*v2.ETH1ChainData, error) {
return e.db.PowchainData(ctx)
}
// SavePowchainData -- passthrough
func (e Exporter) SavePowchainData(ctx context.Context, data *v2.ETH1ChainData) error {
return e.db.SavePowchainData(ctx, data)
}
// ArchivedPointRoot -- passthrough
func (e Exporter) ArchivedPointRoot(ctx context.Context, index types.Slot) [32]byte {
return e.db.ArchivedPointRoot(ctx, index)
}
// HasArchivedPoint -- passthrough
func (e Exporter) HasArchivedPoint(ctx context.Context, index types.Slot) bool {
return e.db.HasArchivedPoint(ctx, index)
}
// LastArchivedRoot -- passthrough
func (e Exporter) LastArchivedRoot(ctx context.Context) [32]byte {
return e.db.LastArchivedRoot(ctx)
}
// HighestSlotBlocksBelow -- passthrough
func (e Exporter) HighestSlotBlocksBelow(ctx context.Context, slot types.Slot) ([]block.SignedBeaconBlock, error) {
return e.db.HighestSlotBlocksBelow(ctx, slot)
}
// HighestSlotStatesBelow -- passthrough
func (e Exporter) HighestSlotStatesBelow(ctx context.Context, slot types.Slot) ([]state.ReadOnlyBeaconState, error) {
return e.db.HighestSlotStatesBelow(ctx, slot)
}
// LastArchivedSlot -- passthrough
func (e Exporter) LastArchivedSlot(ctx context.Context) (types.Slot, error) {
return e.db.LastArchivedSlot(ctx)
}
// RunMigrations -- passthrough
func (e Exporter) RunMigrations(ctx context.Context) error {
return e.db.RunMigrations(ctx)
}
// CleanUpDirtyStates -- passthrough
func (e Exporter) CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint types.Slot) error {
return e.db.CleanUpDirtyStates(ctx, slotsPerArchivedPoint)
}
// LoadGenesis -- passthrough
func (e Exporter) LoadGenesis(ctx context.Context, r io.Reader) error {
return e.db.LoadGenesis(ctx, r)
}
// SaveGenesisData -- passthrough
func (e Exporter) SaveGenesisData(ctx context.Context, state state.BeaconState) error {
return e.db.SaveGenesisData(ctx, state)
}
// EnsureEmbeddedGenesis -- passthrough.
func (e Exporter) EnsureEmbeddedGenesis(ctx context.Context) error {
return e.db.EnsureEmbeddedGenesis(ctx)
}
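
A note on why this file runs to ~285 lines of boilerplate: Exporter holds the wrapped database in an unexported field, so every `iface.Database` method must be restated by hand. Embedding the interface would shrink the wrapper to only the overridden methods. A sketch of that alternative — not what Prysm did, and the trade-off cuts both ways: the explicit passthroughs force a compile error whenever the interface grows, which embedding silently papers over. It reuses the `publish` helper and package-level `log` from the files above:

package kafka

import (
	"context"

	"github.com/prysmaticlabs/prysm/beacon-chain/db/iface"
	"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// Exporter embeds iface.Database; every method not redeclared here
// is promoted from the wrapped value automatically.
type Exporter struct {
	iface.Database
	p *kafka.Producer
}

// SaveBlock overrides a single method: publish, then delegate.
func (e *Exporter) SaveBlock(ctx context.Context, b block.SignedBeaconBlock) error {
	go func() {
		if err := e.publish(ctx, "beacon_block", b); err != nil {
			log.WithError(err).Error("Failed to publish block")
		}
	}()
	return e.Database.SaveBlock(ctx, b)
}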

View File

@@ -539,15 +539,6 @@ def prysm_deps():
version = "v0.0.0-20161010025455-3a0bb77429bd",
)
go_repository(
name = "com_github_confluentinc_confluent_kafka_go",
importpath = "github.com/confluentinc/confluent-kafka-go",
patch_args = ["-p1"],
patches = ["@prysm//third_party:in_gopkg_confluentinc_confluent_kafka_go_v1.patch"],
sum = "h1:13EK9RTujF7lVkvHQ5Hbu6bM+Yfrq8L0MkJNnjHSd4Q=",
version = "v1.4.2",
)
go_repository(
name = "com_github_consensys_bavard",
importpath = "github.com/consensys/bavard",
@@ -3026,12 +3017,6 @@ def prysm_deps():
version = "v0.0.0-20170313163322-e2103e2c3529",
)
go_repository(
name = "com_github_segmentio_kafka_go",
importpath = "github.com/segmentio/kafka-go",
sum = "h1:HtCSf6B4gN/87yc5qTl7WsxPKQIIGXLPPM1bMCPOsoY=",
version = "v0.2.0",
)
go_repository(
name = "com_github_sergi_go_diff",
importpath = "github.com/sergi/go-diff",
@@ -3772,15 +3757,6 @@ def prysm_deps():
sum = "h1:Ev7yu1/f6+d+b3pi5vPdRPc6nNtP1umSfcWiEfRqv6I=",
version = "v1.0.25",
)
go_repository(
name = "in_gopkg_confluentinc_confluent_kafka_go_v1",
importpath = "gopkg.in/confluentinc/confluent-kafka-go.v1",
patch_args = ["-p1"],
patches = ["@prysm//third_party:in_gopkg_confluentinc_confluent_kafka_go_v1.patch"],
sum = "h1:JabkIV98VYFqYKHHzXtgGMFuRgFBNTNzBytbGByzrJI=",
version = "v1.4.2",
)
go_repository(
name = "in_gopkg_d4l3k_messagediff_v1",
importpath = "gopkg.in/d4l3k/messagediff.v1",

go.mod
View File

@@ -10,7 +10,6 @@ require (
github.com/bazelbuild/rules_go v0.23.2
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cespare/cp v1.1.1 // indirect
github.com/confluentinc/confluent-kafka-go v1.4.2 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/d4l3k/messagediff v1.2.1
github.com/deckarep/golang-set v1.7.1 // indirect
@@ -122,9 +121,7 @@ require (
google.golang.org/genproto v0.0.0-20210426193834-eac7f76ac494
google.golang.org/grpc v1.37.0
google.golang.org/protobuf v1.27.1
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2
gopkg.in/d4l3k/messagediff.v1 v1.2.1
gopkg.in/errgo.v2 v2.1.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/apimachinery v0.18.3
k8s.io/client-go v0.18.3

go.sum
View File

@@ -184,8 +184,6 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/confluentinc/confluent-kafka-go v1.4.2 h1:13EK9RTujF7lVkvHQ5Hbu6bM+Yfrq8L0MkJNnjHSd4Q=
github.com/confluentinc/confluent-kafka-go v1.4.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/consensys/bavard v0.1.8-0.20210406032232-f3452dc9b572/go.mod h1:Bpd0/3mZuaj6Sj+PqrmIquiOKy397AKGThQPaGzNXAQ=
github.com/consensys/gnark-crypto v0.4.1-0.20210426202927-39ac3d4b3f1f/go.mod h1:815PAHg3wvysy0SyIqanF8gZ0Y1wjk/hrDHD/iT88+Q=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
@@ -1866,11 +1864,8 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2 h1:JabkIV98VYFqYKHHzXtgGMFuRgFBNTNzBytbGByzrJI=
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
gopkg.in/d4l3k/messagediff.v1 v1.2.1 h1:70AthpjunwzUiarMHyED52mj9UwtAnE89l1Gmrt3EU0=
gopkg.in/d4l3k/messagediff.v1 v1.2.1/go.mod h1:EUzikiKadqXWcD1AzJLagx0j/BeeWGtn++04Xniyg44=
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=

View File

@@ -64,9 +64,7 @@ type Flags struct {
EnableActiveBalanceCache bool // EnableActiveBalanceCache enables active balance cache.
// Bug fixes related flags.
AttestTimely bool // AttestTimely fixes #8185. It is gated behind a flag to ensure beacon node's fix can safely roll out first. We'll invert this in v1.1.0.
KafkaBootstrapServers string // KafkaBootstrapServers to find kafka servers to stream blocks, attestations, etc.
AttestationAggregationStrategy string // AttestationAggregationStrategy defines aggregation strategy to be used when aggregating.
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
@@ -152,10 +150,6 @@ func ConfigureBeaconChain(ctx *cli.Context) {
cfg.EnableSSZCache = true
if ctx.String(kafkaBootstrapServersFlag.Name) != "" {
logEnabled(kafkaBootstrapServersFlag)
cfg.KafkaBootstrapServers = ctx.String(kafkaBootstrapServersFlag.Name)
}
if ctx.IsSet(disableGRPCConnectionLogging.Name) {
logDisabled(disableGRPCConnectionLogging)
cfg.DisableGRPCConnectionLogs = true

View File

@@ -31,10 +31,6 @@ var (
Name: "interop-write-ssz-state-transitions",
Usage: "Write ssz states to disk after attempted state transition",
}
kafkaBootstrapServersFlag = &cli.StringFlag{
Name: "kafka-url",
Usage: "Stream attestations and blocks to specified kafka servers. This field is used for bootstrap.servers kafka config field.",
}
enableExternalSlasherProtectionFlag = &cli.BoolFlag{
Name: "enable-external-slasher-protection",
Usage: "Enables the validator to connect to external slasher to prevent it from " +
@@ -182,7 +178,6 @@ var E2EValidatorFlags = []string{
var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
devModeFlag,
writeSSZStateTransitionsFlag,
kafkaBootstrapServersFlag,
disableGRPCConnectionLogging,
attestationAggregationStrategy,
PyrmontTestnet,

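The two hunks above remove the two halves of a urfave/cli flag: the definition in the flags file and the read in `ConfigureBeaconChain`. The wiring pattern itself is standard cli/v2; a self-contained sketch, with a hypothetical flag standing in for `kafka-url`:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

// exampleURLFlag is hypothetical; it mirrors how kafkaBootstrapServersFlag was declared.
var exampleURLFlag = &cli.StringFlag{
	Name:  "example-url",
	Usage: "Stream data to the specified server.",
}

func main() {
	app := &cli.App{
		Flags: []cli.Flag{exampleURLFlag},
		Action: func(ctx *cli.Context) error {
			// Same guard style as ConfigureBeaconChain above: act only
			// when the flag carries a non-empty value.
			if url := ctx.String(exampleURLFlag.Name); url != "" {
				fmt.Println("streaming to", url)
			}
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}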
View File

@@ -1,60 +0,0 @@
diff --git a/kafka/BUILD.bazel b/kafka/BUILD.bazel
index bc46110..367c9f6 100644
--- a/kafka/BUILD.bazel
+++ b/kafka/BUILD.bazel
@@ -29,19 +29,20 @@ go_library(
"testhelpers.go",
"time.go",
],
+ cdeps = ["//kafka/librdkafka:precompiled"],
cgo = True,
clinkopts = select({
"@io_bazel_rules_go//go/platform:android": [
- "kafka/librdkafka/librdkafka_glibc_linux.a -lm -ldl -lpthread -lrt",
+ "-lm -ldl -lpthread -lrt",
],
"@io_bazel_rules_go//go/platform:darwin": [
- "kafka/librdkafka/librdkafka_darwin.a -lm -lsasl2 -lz -ldl -lpthread",
+ "-lm -lsasl2 -lz -ldl -lpthread",
],
"@io_bazel_rules_go//go/platform:ios": [
- "kafka/librdkafka/librdkafka_darwin.a -lm -lsasl2 -lz -ldl -lpthread",
+ "-lm -lsasl2 -lz -ldl -lpthread",
],
"@io_bazel_rules_go//go/platform:linux": [
- "kafka/librdkafka/librdkafka_glibc_linux.a -lm -ldl -lpthread -lrt",
+ "-lm -ldl -lpthread -lrt",
],
"//conditions:default": [],
}),
diff --git a/kafka/librdkafka/BUILD.bazel b/kafka/librdkafka/BUILD.bazel
index 2ced242..9c06d83 100644
--- a/kafka/librdkafka/BUILD.bazel
+++ b/kafka/librdkafka/BUILD.bazel
@@ -8,4 +8,26 @@ go_library(
],
importpath = "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka",
visibility = ["//visibility:public"],
+ cgo = True,
+ cdeps = [":precompiled"],
+)
+
+cc_library(
+ name = "precompiled",
+ srcs = select({
+ "@io_bazel_rules_go//go/platform:android": [
+ "librdkafka_glibc_linux.a",
+ ],
+ "@io_bazel_rules_go//go/platform:darwin": [
+ "librdkafka_darwin.a",
+ ],
+ "@io_bazel_rules_go//go/platform:ios": [
+ "librdkafka_darwin.a",
+ ],
+ "@io_bazel_rules_go//go/platform:linux": [
+ "librdkafka_glibc_linux.a",
+ ],
+ }),
+ visibility = ["//visibility:public"],
+ hdrs = ["rdkafka.h"],
)

View File

@@ -1,24 +0,0 @@
load("@rules_foreign_cc//tools/build_defs:cmake.bzl", "cmake_external")
#cmake_external(
# name = "librdkafka",
# cache_entries = {
# "RDKAFKA_BUILD_STATIC": "ON",
# "WITH_ZSTD": "OFF",
# "WITH_SSL": "OFF",
# "WITH_SASL": "OFF",
# "ENABLE_LZ4_EXT": "OFF",
# "WITH_LIBDL": "OFF",
# "WITH_ZLIB": "OFF",
# },
# lib_source = "@librdkafka//:all",
# static_libraries = [
# "librdkafka++.a",
# "librdkafka.a",
# ],
# tags = [
# "manual",
# "no-remote-exec",
# ],
# visibility = ["//visibility:public"],
#)

View File

@@ -15,7 +15,7 @@ This toolchain suite describes cross compile configuration with a Dockerfile wit
| target | linux_amd64 | linux_arm64 | osx_amd64 | windows_amd64 |
|----------|-------------------|------------------|-----------------|-----------------------|
| `//beacon-chain` | :heavy_check_mark: docker-sandbox and RBE, libkafka supported locally only | :heavy_check_mark: docker-sandbox and RBE, no libkafka support | :heavy_check_mark: docker-sandbox, no libkafka support | :heavy_check_mark: docker-sandbox, no libkafka support |
| `//beacon-chain` | :heavy_check_mark: docker-sandbox and RBE | :heavy_check_mark: docker-sandbox and RBE | :heavy_check_mark: docker-sandbox | :heavy_check_mark: docker-sandbox |
| `//validator`| :heavy_check_mark: docker-sandbox and RBE | :heavy_check_mark: docker-sandbox and RBE | :heavy_check_mark: docker-sandbox | :heavy_check_mark: |
The configurations above are enforced via pull request presubmit checks.
@@ -37,4 +37,3 @@ There are a few caveats to each of these strategies.
- Local runs require the clang compiler and the appropriate cross compilers to be installed. These runs should only be considered by power users or users with specific build requirements. See the Dockerfile setup scripts to understand what dependencies must be installed and where.
- Docker sandbox is *slow*. Like really slow! The purpose of the docker sandbox is to test RBE builds without deploying a full RBE system. Each build action is executed in its own container. Given the large number of small targets in this project, the overhead of creating docker containers makes this strategy the slowest of all, but it requires zero additional setup.
- Remote Build Execution is by far the fastest, if you have an RBE backend available. This is another advanced use case, which requires the two config flags above as well as additional flags to specify the `--remote_executor`. Some of these flags are present in the project `.bazelrc` with example values, but commented out.
- Building with libkafka (`--define kafka_enabled=true`) is not supported with docker-sandbox or RBE at this time. Likely due to missing cmake dependencies in the builder image or lack of configuration via toolchains.