Add export to GCS and message ordering in pubsub

This commit is contained in:
medvedev1088
2021-01-09 19:34:53 +07:00
parent 511b60ecfa
commit 48f11fc9e1
6 changed files with 152 additions and 4 deletions

View File

@@ -0,0 +1,90 @@
# MIT License
#
# Copyright (c) 2020 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import json
import logging
from collections import defaultdict
from google.cloud import storage
class GcsItemExporter:
def __init__(
self,
bucket):
self.bucket = bucket
self.storage_client = storage.Client()
def open(self):
pass
def export_items(self, items):
block_bundles = build_block_bundles(items)
for block_bundle in block_bundles:
block_number = block_bundle['block']['number']
destination_blob_name = f'blocks/{block_number}.json'
bucket = self.storage_client.bucket(self.bucket)
blob = bucket.blob(destination_blob_name)
blob.upload_from_string(json.dumps(block_bundle))
logging.info(f'Uploaded file gs://{self.bucket}/{destination_blob_name}')
def close(self):
pass
def build_block_bundles(items):
blocks = defaultdict(list)
transactions = defaultdict(list)
logs = defaultdict(list)
token_transfers = defaultdict(list)
traces = defaultdict(list)
for item in items:
item_type = item.get('type')
if item_type == 'block':
blocks[item.get('number')].append(item)
elif item_type == 'transaction':
transactions[item.get('block_number')].append(item)
elif item_type == 'log':
logs[item.get('block_number')].append(item)
elif item_type == 'token_transfer':
token_transfers[item.get('block_number')].append(item)
elif item_type == 'trace':
traces[item.get('block_number')].append(item)
else:
logging.info(f'Skipping item with type {item_type}')
block_bundles = []
for block_number in sorted(blocks.keys()):
if len(blocks[block_number]) != 1:
raise ValueError(f'There must be a single block for a given block number, was {len(blocks[block_number])} for block number {block_number}')
block_bundles.append({
'block': blocks[block_number][0],
'transactions': transactions[block_number],
'logs': logs[block_number],
'token_transfers': token_transfers[block_number],
'traces': traces[block_number],
})
return block_bundles

View File

@@ -0,0 +1,42 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
class MultiItemExporter:
def __init__(self, item_exporters):
self.item_exporters = item_exporters
def open(self):
for exporter in self.item_exporters:
exporter.open()
def export_items(self, items):
for exporter in self.item_exporters:
exporter.export_items(items)
def export_item(self, item):
for exporter in self.item_exporters:
exporter.export_item(item)
def close(self):
for exporter in self.item_exporters:
exporter.close()

View File

@@ -1,7 +1,7 @@
# Uploading to Docker Hub
```bash
ETHEREUMETL_VERSION=1.6.0
ETHEREUMETL_VERSION=1.6.0-ordering1
docker build -t ethereum-etl:${ETHEREUMETL_VERSION} -f Dockerfile .
docker tag ethereum-etl:${ETHEREUMETL_VERSION} blockchainetl/ethereum-etl:${ETHEREUMETL_VERSION}
docker push blockchainetl/ethereum-etl:${ETHEREUMETL_VERSION}

View File

@@ -27,6 +27,7 @@ from blockchainetl.streaming.streaming_utils import configure_signals, configure
from ethereumetl.enumeration.entity_type import EntityType
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.streaming.item_exporter_creator import create_item_exporters
from ethereumetl.thread_local_proxy import ThreadLocalProxy
@@ -55,7 +56,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
configure_logging(log_file)
configure_signals()
entity_types = parse_entity_types(entity_types)
validate_entity_types(entity_types, output)
# validate_entity_types(entity_types, output)
from ethereumetl.streaming.item_exporter_creator import create_item_exporter
from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapter
@@ -67,7 +68,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
streamer_adapter = EthStreamerAdapter(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
item_exporter=create_item_exporter(output),
item_exporter=create_item_exporters(output),
batch_size=batch_size,
max_workers=max_workers,
entity_types=entity_types

View File

@@ -21,6 +21,14 @@
# SOFTWARE.
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter
def create_item_exporters(outputs):
split_outputs = [output.strip() for output in outputs.split(',')]
item_exporters = [create_item_exporter(output) for output in split_outputs]
return MultiItemExporter(item_exporters)
def create_item_exporter(output):
@@ -35,7 +43,7 @@ def create_item_exporter(output):
'trace': output + '.traces',
'contract': output + '.contracts',
'token': output + '.tokens',
}, enable_message_ordering=True)
}, enable_message_ordering=False)
elif item_exporter_type == ItemExporterType.POSTGRES:
from blockchainetl.jobs.exporters.postgres_item_exporter import PostgresItemExporter
from blockchainetl.streaming.postgres_utils import create_insert_statement_for_table
@@ -54,6 +62,9 @@ def create_item_exporter(output):
},
converters=[UnixTimestampItemConverter(), IntToDecimalItemConverter(),
ListFieldItemConverter('topics', 'topic', fill=4)])
elif item_exporter_type == ItemExporterType.GCS:
from blockchainetl.jobs.exporters.gcs_item_exporter import GcsItemExporter
item_exporter = GcsItemExporter(bucket=output.replace('gs://', ''))
elif item_exporter_type == ItemExporterType.CONSOLE:
item_exporter = ConsoleItemExporter()
else:
@@ -67,6 +78,8 @@ def determine_item_exporter_type(output):
return ItemExporterType.PUBSUB
elif output is not None and output.startswith('postgresql'):
return ItemExporterType.POSTGRES
elif output is not None and output.startswith('gs://'):
return ItemExporterType.GCS
elif output is None or output == 'console':
return ItemExporterType.CONSOLE
else:
@@ -76,5 +89,6 @@ def determine_item_exporter_type(output):
class ItemExporterType:
PUBSUB = 'pubsub'
POSTGRES = 'postgres'
GCS = 'gcs'
CONSOLE = 'console'
UNKNOWN = 'unknown'

View File

@@ -47,6 +47,7 @@ setup(
'streaming': [
'timeout-decorator==0.4.1',
'google-cloud-pubsub==2.1.0',
'google-cloud-storage==1.33.0',
'sqlalchemy==1.3.13',
'pg8000==1.13.2',
],