mirror of
https://github.com/blockchain-etl/ethereum-etl.git
synced 2026-01-10 14:27:54 -05:00
Enable message ordering for pubsub exporter
This commit is contained in:
@@ -29,9 +29,14 @@ from timeout_decorator import timeout_decorator
|
||||
|
||||
class GooglePubSubItemExporter:
|
||||
|
||||
def __init__(
        self,
        item_type_to_topic_mapping,
        message_attributes=('item_id', 'item_timestamp'),
        enable_message_ordering=False):
    """Set up the exporter and create its Pub/Sub publisher.

    Args:
        item_type_to_topic_mapping: Mapping from item type to the topic path
            that items of that type are published to.
        message_attributes: Names stored on the instance for building the
            per-message attributes (presumably item fields copied onto each
            message; confirm against ``get_message_attributes``).
        enable_message_ordering: When true, the publisher is created with
            message ordering enabled. Defaults to False.
    """
    self.item_type_to_topic_mapping = item_type_to_topic_mapping
    self.message_attributes = message_attributes
    # Kept on the instance so that a publisher recreated after a failure is
    # built with the same ordering option as the initial one.
    self.enable_message_ordering = enable_message_ordering
    # Create the publisher exactly once, always passing the ordering flag.
    self.publisher = create_publisher(enable_message_ordering)
|
||||
|
||||
def open(self):
|
||||
@@ -46,7 +51,7 @@ class GooglePubSubItemExporter:
|
||||
# details = "channel is in state TRANSIENT_FAILURE"
|
||||
# https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606
|
||||
logging.info('Recreating Pub/Sub publisher.')
|
||||
self.publisher = create_publisher()
|
||||
self.publisher = create_publisher(self.enable_message_ordering)
|
||||
raise e
|
||||
|
||||
@timeout_decorator.timeout(300)
|
||||
@@ -66,7 +71,8 @@ class GooglePubSubItemExporter:
|
||||
topic_path = self.item_type_to_topic_mapping.get(item_type)
|
||||
data = json.dumps(item).encode('utf-8')
|
||||
|
||||
message_future = self.publisher.publish(topic_path, data=data, **self.get_message_attributes(item))
|
||||
ordering_key = 'all' if self.enable_message_ordering else ''
|
||||
message_future = self.publisher.publish(topic_path, data=data, ordering_key=ordering_key, **self.get_message_attributes(item))
|
||||
return message_future
|
||||
else:
|
||||
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
|
||||
@@ -84,11 +90,12 @@ class GooglePubSubItemExporter:
|
||||
pass
|
||||
|
||||
|
||||
def create_publisher(enable_message_ordering=False):
    """Build a Pub/Sub publisher client with this module's batch settings.

    Args:
        enable_message_ordering: Forwarded to ``PublisherOptions``. Defaults
            to False so that callers which pass no argument keep working.

    Returns:
        A ``pubsub_v1.PublisherClient`` configured with the batch settings
        below and the requested ordering option.
    """
    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024 * 5,  # 5 kilobytes
        max_latency=1,  # 1 second
        max_messages=1000,
    )
    publisher_options = pubsub_v1.types.PublisherOptions(
        enable_message_ordering=enable_message_ordering,
    )
    # Pass both settings by keyword and return only once, after the options
    # object has been built, so the ordering flag always takes effect.
    return pubsub_v1.PublisherClient(
        batch_settings=batch_settings,
        publisher_options=publisher_options,
    )
|
||||
|
||||
@@ -41,3 +41,4 @@ class EthTrace(object):
|
||||
self.error = None
|
||||
self.status = None
|
||||
self.trace_id = None
|
||||
self.trace_index = None
|
||||
|
||||
@@ -96,6 +96,7 @@ class ExportTracesJob(BaseJob):
|
||||
|
||||
calculate_trace_statuses(all_traces)
|
||||
calculate_trace_ids(all_traces)
|
||||
calculate_trace_indexes(all_traces)
|
||||
|
||||
for trace in all_traces:
|
||||
self.item_exporter.export_item(self.trace_mapper.trace_to_dict(trace))
|
||||
@@ -103,3 +104,9 @@ class ExportTracesJob(BaseJob):
|
||||
def _end(self):
|
||||
self.batch_work_executor.shutdown()
|
||||
self.item_exporter.close()
|
||||
|
||||
|
||||
def calculate_trace_indexes(traces):
    """Assign each trace its zero-based position in ``traces``.

    The position is written to the ``trace_index`` attribute of every trace,
    in place; nothing is returned.
    """
    # The result is only meaningful when the incoming order is already the
    # correct one, which is the case for Parity traces.
    position = 0
    for entry in traces:
        entry.trace_index = position
        position += 1
|
||||
|
||||
@@ -190,4 +190,5 @@ class EthTraceMapper(object):
|
||||
'error': trace.error,
|
||||
'status': trace.status,
|
||||
'trace_id': trace.trace_id,
|
||||
'trace_index': trace.trace_index,
|
||||
}
|
||||
|
||||
@@ -159,7 +159,8 @@ def enrich_traces(blocks, traces):
|
||||
'status',
|
||||
'transaction_hash',
|
||||
'block_number',
|
||||
'trace_id'
|
||||
'trace_id',
|
||||
'trace_index'
|
||||
],
|
||||
[
|
||||
('timestamp', 'block_timestamp'),
|
||||
|
||||
@@ -87,13 +87,14 @@ class EthStreamerAdapter:
|
||||
|
||||
logging.info('Exporting with ' + type(self.item_exporter).__name__)
|
||||
|
||||
all_items = enriched_blocks + \
|
||||
enriched_transactions + \
|
||||
enriched_logs + \
|
||||
enriched_token_transfers + \
|
||||
enriched_traces + \
|
||||
enriched_contracts + \
|
||||
enriched_tokens
|
||||
all_items = \
|
||||
sort_by(enriched_blocks, 'number') + \
|
||||
sort_by(enriched_transactions, ('block_number', 'transaction_index')) + \
|
||||
sort_by(enriched_logs, ('block_number', 'log_index')) + \
|
||||
sort_by(enriched_token_transfers, ('block_number', 'log_index')) + \
|
||||
sort_by(enriched_traces, ('block_number', 'trace_index')) + \
|
||||
sort_by(enriched_contracts, ('block_number',)) + \
|
||||
sort_by(enriched_tokens, ('block_number',))
|
||||
|
||||
self.calculate_item_ids(all_items)
|
||||
self.calculate_item_timestamps(all_items)
|
||||
@@ -219,3 +220,9 @@ class EthStreamerAdapter:
|
||||
|
||||
def close(self):
|
||||
self.item_exporter.close()
|
||||
|
||||
|
||||
def sort_by(arr, fields):
    """Return a new list of the items in ``arr`` sorted by the given fields.

    Args:
        arr: Iterable of dict-like items; each item must provide ``.get``.
        fields: Either a single field name given as a string, or an iterable
            (tuple, list, ...) of field names ordered from most to least
            significant.

    Returns:
        A new list sorted ascending by the tuple of the requested field
        values. ``sorted`` is stable, so items with equal keys keep their
        original relative order. A field missing from an item is read as
        ``None``.
    """
    # A bare string must be wrapped, not iterated: iterating 'number' yields
    # single characters, every lookup returns None, and the sort would
    # silently leave the input order unchanged.
    if isinstance(fields, str):
        fields = (fields,)
    else:
        fields = tuple(fields)
    return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields))
|
||||
|
||||
@@ -35,7 +35,7 @@ def create_item_exporter(output):
|
||||
'trace': output + '.traces',
|
||||
'contract': output + '.contracts',
|
||||
'token': output + '.tokens',
|
||||
})
|
||||
}, enable_message_ordering=True)
|
||||
elif item_exporter_type == ItemExporterType.POSTGRES:
|
||||
from blockchainetl.jobs.exporters.postgres_item_exporter import PostgresItemExporter
|
||||
from blockchainetl.streaming.postgres_utils import create_insert_statement_for_table
|
||||
|
||||
Reference in New Issue
Block a user