mirror of
https://github.com/blockchain-etl/ethereum-etl.git
synced 2026-01-09 13:57:54 -05:00
Merge pull request #359 from CoinStatsHQ/pr/aws-kinesis-support
Added support for AWS Kinesis
This commit is contained in:
82
blockchainetl/jobs/exporters/kinesis_item_exporter.py
Normal file
82
blockchainetl/jobs/exporters/kinesis_item_exporter.py
Normal file
@@ -0,0 +1,82 @@
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2022 CoinStats LLC
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
import json
|
||||
import typing as t
|
||||
import uuid
|
||||
from itertools import zip_longest
|
||||
|
||||
import boto3
|
||||
|
||||
_KINESIS_BATCH_LIMIT = 500
|
||||
|
||||
|
||||
def _uuid_partition_key(_: dict) -> str:
|
||||
return uuid.uuid4().hex
|
||||
|
||||
|
||||
class KinesisItemExporter:
|
||||
def __init__(
|
||||
self,
|
||||
stream_name: str,
|
||||
partition_key_callable: t.Callable[[dict], str] = _uuid_partition_key,
|
||||
):
|
||||
import boto3
|
||||
self._stream_name = stream_name
|
||||
self._partition_key_callable = partition_key_callable
|
||||
self._kinesis_client = None # initialized in .open
|
||||
|
||||
def open(self) -> None:
|
||||
self._kinesis_client = boto3.client('kinesis')
|
||||
|
||||
def export_items(self, items: t.Iterable[dict]) -> None:
|
||||
sentinel = object()
|
||||
chunks = zip_longest(
|
||||
*(iter(items),) * _KINESIS_BATCH_LIMIT,
|
||||
fillvalue=sentinel,
|
||||
)
|
||||
for chunk in chunks:
|
||||
self._kinesis_client.put_records(
|
||||
StreamName=self._stream_name,
|
||||
Records=[
|
||||
{
|
||||
'Data': _serialize_item(item),
|
||||
'PartitionKey': self._partition_key_callable(item),
|
||||
}
|
||||
for item in chunk
|
||||
if item is not sentinel
|
||||
],
|
||||
)
|
||||
|
||||
def export_item(self, item: dict) -> None:
|
||||
self._kinesis_client.put_record(
|
||||
StreamName=self._stream_name,
|
||||
Data=_serialize_item(item),
|
||||
PartitionKey=self._partition_key_callable(item),
|
||||
)
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
def _serialize_item(item: dict) -> bytes:
|
||||
return json.dumps(item).encode()
|
||||
@@ -42,6 +42,7 @@ from ethereumetl.thread_local_proxy import ThreadLocalProxy
|
||||
'or Postgres connection url e.g. postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum; '
|
||||
'or GCS bucket e.g. gs://your-bucket-name; '
|
||||
'or kafka, output name and connection host:port e.g. kafka/127.0.0.1:9092 '
|
||||
'or Kinesis, e.g. kinesis://your-data-stream-name'
|
||||
'If not specified will print to console')
|
||||
@click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block')
|
||||
@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str,
|
||||
|
||||
@@ -51,6 +51,11 @@ def create_item_exporter(output):
|
||||
batch_max_latency=2,
|
||||
batch_max_messages=1000,
|
||||
enable_message_ordering=enable_message_ordering)
|
||||
elif item_exporter_type == ItemExporterType.KINESIS:
|
||||
from blockchainetl.jobs.exporters.kinesis_item_exporter import KinesisItemExporter
|
||||
item_exporter = KinesisItemExporter(
|
||||
stream_name=output[len('kinesis://'):],
|
||||
)
|
||||
elif item_exporter_type == ItemExporterType.POSTGRES:
|
||||
from blockchainetl.jobs.exporters.postgres_item_exporter import PostgresItemExporter
|
||||
from blockchainetl.streaming.postgres_utils import create_insert_statement_for_table
|
||||
@@ -109,6 +114,8 @@ def get_bucket_and_path_from_gcs_output(output):
|
||||
def determine_item_exporter_type(output):
|
||||
if output is not None and output.startswith('projects'):
|
||||
return ItemExporterType.PUBSUB
|
||||
if output is not None and output.startswith('kinesis://'):
|
||||
return ItemExporterType.KINESIS
|
||||
if output is not None and output.startswith('kafka'):
|
||||
return ItemExporterType.KAFKA
|
||||
elif output is not None and output.startswith('postgresql'):
|
||||
@@ -123,6 +130,7 @@ def determine_item_exporter_type(output):
|
||||
|
||||
class ItemExporterType:
|
||||
PUBSUB = 'pubsub'
|
||||
KINESIS = 'kinesis'
|
||||
POSTGRES = 'postgres'
|
||||
GCS = 'gcs'
|
||||
CONSOLE = 'console'
|
||||
|
||||
Reference in New Issue
Block a user