Resolved Conflicts

deq
2021-12-22 13:02:02 +05:30
14 changed files with 256 additions and 40 deletions

View File

@@ -0,0 +1,111 @@
# MIT License
#
# Copyright (c) 2020 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import json
import logging
from collections import defaultdict

from google.cloud import storage


def build_block_bundles(items):
    blocks = defaultdict(list)
    transactions = defaultdict(list)
    logs = defaultdict(list)
    token_transfers = defaultdict(list)
    traces = defaultdict(list)
    for item in items:
        item_type = item.get('type')

        if item_type == 'block':
            blocks[item.get('number')].append(item)
        elif item_type == 'transaction':
            transactions[item.get('block_number')].append(item)
        elif item_type == 'log':
            logs[item.get('block_number')].append(item)
        elif item_type == 'token_transfer':
            token_transfers[item.get('block_number')].append(item)
        elif item_type == 'trace':
            traces[item.get('block_number')].append(item)
        else:
            logging.info(f'Skipping item with type {item_type}')

    block_bundles = []
    for block_number in sorted(blocks.keys()):
        if len(blocks[block_number]) != 1:
            raise ValueError(f'There must be a single block for a given block number, was {len(blocks[block_number])} for block number {block_number}')
        block_bundles.append({
            'block': blocks[block_number][0],
            'transactions': transactions[block_number],
            'logs': logs[block_number],
            'token_transfers': token_transfers[block_number],
            'traces': traces[block_number],
        })

    return block_bundles


class GcsItemExporter:

    def __init__(
            self,
            bucket,
            path='blocks',
            build_block_bundles_func=build_block_bundles):
        self.bucket = bucket
        self.path = normalize_path(path)
        self.build_block_bundles_func = build_block_bundles_func
        self.storage_client = storage.Client()

    def open(self):
        pass

    def export_items(self, items):
        block_bundles = self.build_block_bundles_func(items)

        for block_bundle in block_bundles:
            block = block_bundle.get('block')
            if block is None:
                raise ValueError('block_bundle must include the block field')
            block_number = block.get('number')
            if block_number is None:
                raise ValueError('block_bundle must include the block.number field')

            destination_blob_name = f'{self.path}/{block_number}.json'
            bucket = self.storage_client.bucket(self.bucket)
            blob = bucket.blob(destination_blob_name)
            blob.upload_from_string(json.dumps(block_bundle))
            logging.info(f'Uploaded file gs://{self.bucket}/{destination_blob_name}')

    def close(self):
        pass


def normalize_path(p):
    if p is None:
        p = ''
    if p.startswith('/'):
        p = p[1:]
    if p.endswith('/'):
        p = p[:len(p) - 1]
    return p
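
As a quick illustration of the bundling logic above, here is a hedged usage sketch (the sample items are invented; only their `type`, `number`, and `block_number` fields matter to `build_block_bundles`):

```python
# Hedged sketch: synthetic items grouped into one bundle per block.
items = [
    {'type': 'block', 'number': 100, 'hash': '0xaa'},
    {'type': 'transaction', 'block_number': 100, 'hash': '0xbb'},
    {'type': 'log', 'block_number': 100, 'log_index': 0},
    {'type': 'unknown'},  # logged and skipped
]

bundles = build_block_bundles(items)
assert len(bundles) == 1
assert bundles[0]['block']['number'] == 100
assert len(bundles[0]['transactions']) == 1
assert bundles[0]['traces'] == []  # defaultdict fills absent entity types
# GcsItemExporter would then upload this bundle as gs://<bucket>/blocks/100.json
```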

View File

@@ -29,9 +29,19 @@ from timeout_decorator import timeout_decorator
 class GooglePubSubItemExporter:

-    def __init__(self, item_type_to_topic_mapping, message_attributes=('item_id', 'item_timestamp')):
+    def __init__(self, item_type_to_topic_mapping, message_attributes=(),
+                 batch_max_bytes=1024 * 5, batch_max_latency=1, batch_max_messages=1000,
+                 enable_message_ordering=False):
         self.item_type_to_topic_mapping = item_type_to_topic_mapping
-        self.publisher = create_publisher()
+        self.batch_max_bytes = batch_max_bytes
+        self.batch_max_latency = batch_max_latency
+        self.batch_max_messages = batch_max_messages
+        self.enable_message_ordering = enable_message_ordering
+        self.publisher = self.create_publisher()
         self.message_attributes = message_attributes

     def open(self):
@@ -46,7 +56,7 @@ class GooglePubSubItemExporter:
             # details = "channel is in state TRANSIENT_FAILURE"
             # https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606
             logging.info('Recreating Pub/Sub publisher.')
-            self.publisher = create_publisher()
+            self.publisher = self.create_publisher()
             raise e

     @timeout_decorator.timeout(300)
@@ -66,7 +76,8 @@ class GooglePubSubItemExporter:
             topic_path = self.item_type_to_topic_mapping.get(item_type)
             data = json.dumps(item).encode('utf-8')

-            message_future = self.publisher.publish(topic_path, data=data, **self.get_message_attributes(item))
+            ordering_key = 'all' if self.enable_message_ordering else ''
+            message_future = self.publisher.publish(topic_path, data=data, ordering_key=ordering_key, **self.get_message_attributes(item))
             return message_future
         else:
             logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
@@ -80,15 +91,15 @@ class GooglePubSubItemExporter:
         return attributes

+    def create_publisher(self):
+        batch_settings = pubsub_v1.types.BatchSettings(
+            max_bytes=self.batch_max_bytes,
+            max_latency=self.batch_max_latency,
+            max_messages=self.batch_max_messages,
+        )
+        publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=self.enable_message_ordering)
+
+        return pubsub_v1.PublisherClient(batch_settings=batch_settings, publisher_options=publisher_options)
+
     def close(self):
         pass
-
-
-def create_publisher():
-    batch_settings = pubsub_v1.types.BatchSettings(
-        max_bytes=1024 * 5,  # 5 kilobytes
-        max_latency=1,  # 1 second
-        max_messages=1000,
-    )
-
-    return pubsub_v1.PublisherClient(batch_settings)
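
For context, here is a minimal, hedged sketch of the publisher that the new `create_publisher` method builds on google-cloud-pubsub 2.x (the topic path is a placeholder, and ordered delivery also requires message ordering to be enabled on the subscription):

```python
from google.cloud import pubsub_v1

# Hedged sketch; values mirror the defaults wired up in item_exporter_creator.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024 * 1024 * 5,  # flush once 5 MB of messages are buffered...
    max_latency=2,              # ...or 2 seconds have passed...
    max_messages=1000,          # ...or 1000 messages are queued, whichever is first
)
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings,
                                      publisher_options=publisher_options)

# Messages sharing an ordering_key are delivered in publish order.
future = publisher.publish('projects/<project>/topics/crypto_ethereum.blocks',
                           data=b'{"number": 100}', ordering_key='all')
future.result()  # block until the publish is acknowledged
```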

View File

@@ -0,0 +1,42 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
class MultiItemExporter:
    def __init__(self, item_exporters):
        self.item_exporters = item_exporters

    def open(self):
        for exporter in self.item_exporters:
            exporter.open()

    def export_items(self, items):
        for exporter in self.item_exporters:
            exporter.export_items(items)

    def export_item(self, item):
        for exporter in self.item_exporters:
            exporter.export_item(item)

    def close(self):
        for exporter in self.item_exporters:
            exporter.close()
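
A hedged sketch of how this fan-out composes with the other exporters in this commit (the bucket name is invented, and `GcsItemExporter` needs GCP credentials to construct its storage client):

```python
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
from blockchainetl.jobs.exporters.gcs_item_exporter import GcsItemExporter
from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter

# Every exported item goes to both the console and the bucket.
exporter = MultiItemExporter([
    ConsoleItemExporter(),
    GcsItemExporter(bucket='my-bucket'),  # bucket name invented
])
exporter.open()
exporter.export_items([{'type': 'block', 'number': 100}])
exporter.close()
```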

View File

@@ -207,11 +207,13 @@ You can tune `--batch-size`, `--max-workers` for performance.
 - This command outputs blocks, transactions, logs, token_transfers to the console by default.
 - Entity types can be specified with the `-e` option,
   e.g. `-e block,transaction,log,token_transfer,trace,contract,token`.
-- Use `--output` option to specify the Google Pub/Sub topic or Postgres database where to publish blockchain data,
+- Use the `--output` option to specify the Google Pub/Sub topic, Postgres database, or GCS bucket to publish blockchain data to.
     - For Google Pub/Sub: `--output=projects/<your-project>/topics/crypto_ethereum`.
       Data will be pushed to the `projects/<your-project>/topics/crypto_ethereum.blocks`, `projects/<your-project>/topics/crypto_ethereum.transactions` etc. topics.
     - For Postgres: `--output=postgresql+pg8000://<user>:<password>@<host>:<port>/<database_name>`,
       e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
+    - For GCS: `--output=gs://<bucket_name>`. Make sure to install and initialize the `gcloud` CLI.
+    - These output types can be combined with a comma, e.g. `--output=gs://<bucket_name>,projects/<your-project>/topics/crypto_ethereum`.
The [schema](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema)
and [indexes](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/indexes) can be found in this
repo [ethereum-etl-postgres](https://github.com/blockchain-etl/ethereum-etl-postgres).
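
Internally (see `item_exporter_creator.py` later in this commit), a comma-separated `--output` value is split into one exporter per entry and the set is wrapped in a `MultiItemExporter`. A hedged sketch of that splitting (values invented):

```python
# Mirrors create_item_exporters' splitting; the output values are invented.
outputs = 'gs://my-bucket,projects/my-project/topics/crypto_ethereum'
split_outputs = [o.strip() for o in outputs.split(',')] if outputs else ['console']
assert split_outputs == ['gs://my-bucket', 'projects/my-project/topics/crypto_ethereum']
```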

View File

@@ -1,7 +1,7 @@
 # Uploading to Docker Hub

 ```bash
-ETHEREUMETL_VERSION=1.7.4
+ETHEREUMETL_VERSION=1.8.0
 docker build -t ethereum-etl:${ETHEREUMETL_VERSION} -f Dockerfile .
 docker tag ethereum-etl:${ETHEREUMETL_VERSION} blockchainetl/ethereum-etl:${ETHEREUMETL_VERSION}
 docker push blockchainetl/ethereum-etl:${ETHEREUMETL_VERSION}

View File

@@ -48,7 +48,7 @@ from ethereumetl.cli.stream import stream
 @click.group()
-@click.version_option(version='1.7.4')
+@click.version_option(version='1.8.0')
 @click.pass_context
 def cli(ctx):
     pass

View File

@@ -27,6 +27,7 @@ from blockchainetl.streaming.streaming_utils import configure_signals, configure
 from ethereumetl.enumeration.entity_type import EntityType
 from ethereumetl.providers.auto import get_provider_from_uri
+from ethereumetl.streaming.item_exporter_creator import create_item_exporters
 from ethereumetl.thread_local_proxy import ThreadLocalProxy

@@ -100,9 +101,7 @@ def parse_entity_types(entity_types):

 def validate_entity_types(entity_types, output):
-    from ethereumetl.streaming.item_exporter_creator import determine_item_exporter_type, ItemExporterType
-    item_exporter_type = determine_item_exporter_type(output)
-    if item_exporter_type == ItemExporterType.POSTGRES \
+    if output is not None and 'postgres' in output \
             and (EntityType.CONTRACT in entity_types or EntityType.TOKEN in entity_types):
         raise ValueError('contract and token are not yet supported entity types for postgres item exporter.')

View File

@@ -41,3 +41,4 @@ class EthTrace(object):
         self.error = None
         self.status = None
         self.trace_id = None
+        self.trace_index = None

View File

@@ -96,6 +96,7 @@ class ExportTracesJob(BaseJob):
         calculate_trace_statuses(all_traces)
         calculate_trace_ids(all_traces)
+        calculate_trace_indexes(all_traces)

         for trace in all_traces:
             self.item_exporter.export_item(self.trace_mapper.trace_to_dict(trace))

@@ -103,3 +104,9 @@ class ExportTracesJob(BaseJob):
     def _end(self):
         self.batch_work_executor.shutdown()
         self.item_exporter.close()
+
+
+def calculate_trace_indexes(traces):
+    # Only works if traces were originally ordered correctly which is the case for Parity traces
+    for ind, trace in enumerate(traces):
+        trace.trace_index = ind

View File

@@ -190,4 +190,5 @@ class EthTraceMapper(object):
             'error': trace.error,
             'status': trace.status,
             'trace_id': trace.trace_id,
+            'trace_index': trace.trace_index,
         }

View File

@@ -163,7 +163,8 @@ def enrich_traces(blocks, traces):
             'status',
             'transaction_hash',
             'block_number',
-            'trace_id'
+            'trace_id',
+            'trace_index'
         ],
         [
             ('timestamp', 'block_timestamp'),

View File

@@ -87,13 +87,14 @@ class EthStreamerAdapter:
         logging.info('Exporting with ' + type(self.item_exporter).__name__)

-        all_items = enriched_blocks + \
-            enriched_transactions + \
-            enriched_logs + \
-            enriched_token_transfers + \
-            enriched_traces + \
-            enriched_contracts + \
-            enriched_tokens
+        all_items = \
+            sort_by(enriched_blocks, 'number') + \
+            sort_by(enriched_transactions, ('block_number', 'transaction_index')) + \
+            sort_by(enriched_logs, ('block_number', 'log_index')) + \
+            sort_by(enriched_token_transfers, ('block_number', 'log_index')) + \
+            sort_by(enriched_traces, ('block_number', 'trace_index')) + \
+            sort_by(enriched_contracts, ('block_number',)) + \
+            sort_by(enriched_tokens, ('block_number',))

         self.calculate_item_ids(all_items)
         self.calculate_item_timestamps(all_items)
@@ -219,3 +220,9 @@ class EthStreamerAdapter:
     def close(self):
         self.item_exporter.close()

+
+def sort_by(arr, fields):
+    # Accept a single field name or a tuple of field names; without this
+    # normalization a bare string like 'number' would be iterated character
+    # by character by the key function below.
+    if not isinstance(fields, tuple):
+        fields = (fields,)
+    return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields))
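
A quick, hedged check of the ordering `sort_by` produces (synthetic items carrying only the sort fields):

```python
items = [
    {'block_number': 2, 'log_index': 0},
    {'block_number': 1, 'log_index': 1},
    {'block_number': 1, 'log_index': 0},
]
# Sorted by block first, then by log index within the block.
assert sort_by(items, ('block_number', 'log_index')) == [
    {'block_number': 1, 'log_index': 0},
    {'block_number': 1, 'log_index': 1},
    {'block_number': 2, 'log_index': 0},
]
```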

View File

@@ -21,21 +21,36 @@
# SOFTWARE.
 from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
+from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter
+
+
+def create_item_exporters(outputs):
+    split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']
+
+    item_exporters = [create_item_exporter(output) for output in split_outputs]
+    return MultiItemExporter(item_exporters)


 def create_item_exporter(output, connection_url):
     item_exporter_type = determine_item_exporter_type(output)
     if item_exporter_type == ItemExporterType.PUBSUB:
         from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
-        item_exporter = GooglePubSubItemExporter(item_type_to_topic_mapping={
-            'block': output + '.blocks',
-            'transaction': output + '.transactions',
-            'log': output + '.logs',
-            'token_transfer': output + '.token_transfers',
-            'trace': output + '.traces',
-            'contract': output + '.contracts',
-            'token': output + '.tokens',
-        })
+        enable_message_ordering = 'sorted' in output or 'ordered' in output
+        item_exporter = GooglePubSubItemExporter(
+            item_type_to_topic_mapping={
+                'block': output + '.blocks',
+                'transaction': output + '.transactions',
+                'log': output + '.logs',
+                'token_transfer': output + '.token_transfers',
+                'trace': output + '.traces',
+                'contract': output + '.contracts',
+                'token': output + '.tokens',
+            },
+            message_attributes=('item_id', 'item_timestamp'),
+            batch_max_bytes=1024 * 1024 * 5,
+            batch_max_latency=2,
+            batch_max_messages=1000,
+            enable_message_ordering=enable_message_ordering)
elif item_exporter_type == ItemExporterType.POSTGRES:
from blockchainetl.jobs.exporters.postgres_item_exporter import PostgresItemExporter
from blockchainetl.streaming.postgres_utils import create_insert_statement_for_table
@@ -54,6 +69,10 @@ def create_item_exporter(output, connection_url):
             },
             converters=[UnixTimestampItemConverter(), IntToDecimalItemConverter(),
                         ListFieldItemConverter('topics', 'topic', fill=4)])
+    elif item_exporter_type == ItemExporterType.GCS:
+        from blockchainetl.jobs.exporters.gcs_item_exporter import GcsItemExporter
+        bucket, path = get_bucket_and_path_from_gcs_output(output)
+        item_exporter = GcsItemExporter(bucket=bucket, path=path)
     elif item_exporter_type == ItemExporterType.CONSOLE:
         item_exporter = ConsoleItemExporter()
     elif item_exporter_type == ItemExporterType.KAFKA:
@@ -74,6 +93,17 @@ def create_item_exporter(output, connection_url):
     return item_exporter


+def get_bucket_and_path_from_gcs_output(output):
+    output = output.replace('gs://', '')
+    bucket_and_path = output.split('/', 1)
+    bucket = bucket_and_path[0]
+    if len(bucket_and_path) > 1:
+        path = bucket_and_path[1]
+    else:
+        path = ''
+    return bucket, path


 def determine_item_exporter_type(output):
     if output is not None and output.startswith('projects'):
         return ItemExporterType.PUBSUB
@@ -81,6 +111,8 @@ def determine_item_exporter_type(output):
         return ItemExporterType.KAFKA
     elif output is not None and output.startswith('postgresql'):
         return ItemExporterType.POSTGRES
+    elif output is not None and output.startswith('gs://'):
+        return ItemExporterType.GCS
     elif output is None or output == 'console':
         return ItemExporterType.CONSOLE
     else:
else:
@@ -90,6 +122,7 @@ def determine_item_exporter_type(output):
 class ItemExporterType:
     PUBSUB = 'pubsub'
     POSTGRES = 'postgres'
+    GCS = 'gcs'
     CONSOLE = 'console'
     KAFKA = 'kafka'
     UNKNOWN = 'unknown'
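
Hedged examples of the new GCS dispatch and bucket parsing above (bucket and project names invented):

```python
assert determine_item_exporter_type('gs://my-bucket/eth/blocks') == ItemExporterType.GCS
assert determine_item_exporter_type('projects/p/topics/crypto_ethereum') == ItemExporterType.PUBSUB
assert determine_item_exporter_type(None) == ItemExporterType.CONSOLE

assert get_bucket_and_path_from_gcs_output('gs://my-bucket/eth/blocks') == ('my-bucket', 'eth/blocks')
assert get_bucket_and_path_from_gcs_output('gs://my-bucket') == ('my-bucket', '')
```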

View File

@@ -11,7 +11,7 @@ long_description = read('README.md') if os.path.isfile("README.md") else ""
 setup(
     name='ethereum-etl',
-    version='1.7.4',
+    version='1.8.0',
     author='Evgeny Medvedev',
     author_email='evge.medvedev@gmail.com',
     description='Tools for exporting Ethereum blockchain data to CSV or JSON',
@@ -47,7 +47,8 @@ setup(
     extras_require={
         'streaming': [
             'timeout-decorator==0.4.1',
-            'google-cloud-pubsub==0.39.1',
+            'google-cloud-pubsub==2.1.0',
+            'google-cloud-storage==1.33.0',
             'sqlalchemy==1.3.13',
             'pg8000==1.13.2',
         ],