mirror of
https://github.com/blockchain-etl/ethereum-etl.git
synced 2026-01-11 06:38:17 -05:00
Compare commits
271 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fbd57fc079 | ||
|
|
8204c0827d | ||
|
|
46b91a9ff2 | ||
|
|
b5fd64bdca | ||
|
|
d8547e9c7c | ||
|
|
7ef53859c1 | ||
|
|
e38d1c1f2f | ||
|
|
43fe6b49b3 | ||
|
|
db274c8a85 | ||
|
|
69247042a4 | ||
|
|
218e1e4356 | ||
|
|
5e0fc8cc75 | ||
|
|
77efda5106 | ||
|
|
ece0b7f422 | ||
|
|
b31b76a73a | ||
|
|
0cb7eb60b5 | ||
|
|
02943f7caf | ||
|
|
b844b95868 | ||
|
|
4d305a284f | ||
|
|
e161e6ef13 | ||
|
|
9b917b8ddd | ||
|
|
383caf8331 | ||
|
|
c61e91235f | ||
|
|
0e4b4a894b | ||
|
|
d58c1ebda7 | ||
|
|
f0bf07e60c | ||
|
|
efe7acdc13 | ||
|
|
20404eca9e | ||
|
|
435cbe0a74 | ||
|
|
b606e22cd5 | ||
|
|
4943b0b795 | ||
|
|
eed2068def | ||
|
|
313b4b1237 | ||
|
|
ad6149155e | ||
|
|
c55c0f68dc | ||
|
|
b031b04bc7 | ||
|
|
b314f1ed0c | ||
|
|
61eb2e6e21 | ||
|
|
9f62e7ecea | ||
|
|
4da7e7b23f | ||
|
|
de72ba3511 | ||
|
|
3aabf9aa54 | ||
|
|
284755bafc | ||
|
|
23133594e8 | ||
|
|
ca54ef6c4b | ||
|
|
836f30e198 | ||
|
|
1c6508f15d | ||
|
|
a4d6f8fcb1 | ||
|
|
bc79d7d9bf | ||
|
|
7fdcf0f7b7 | ||
|
|
d3330f7ddc | ||
|
|
1066ec9025 | ||
|
|
2a92ecbf31 | ||
|
|
c238e8b57b | ||
|
|
a27d2427e1 | ||
|
|
c18f78506c | ||
|
|
23bad940db | ||
|
|
0a52db4b8a | ||
|
|
9fd1f906f2 | ||
|
|
f08f93ddfe | ||
|
|
9e51c3b8d4 | ||
|
|
79d341ea45 | ||
|
|
9db1ff104a | ||
|
|
952a49ba4b | ||
|
|
aab122ebf3 | ||
|
|
438c9af751 | ||
|
|
3ec2af25e1 | ||
|
|
84101407c1 | ||
|
|
97a0275ced | ||
|
|
7cbfd0e533 | ||
|
|
94ebd3f3e9 | ||
|
|
c0fd158211 | ||
|
|
7529c43f4e | ||
|
|
ce906f0af1 | ||
|
|
eaf4bf0bf2 | ||
|
|
1a0a8cf0f8 | ||
|
|
f0e4302423 | ||
|
|
fb35431aa7 | ||
|
|
87b1669434 | ||
|
|
9678bb91c7 | ||
|
|
f4e2b57463 | ||
|
|
6599a438a0 | ||
|
|
f8a5f25376 | ||
|
|
de96e394ee | ||
|
|
a58fe4585d | ||
|
|
f8878ff320 | ||
|
|
993ebe67c8 | ||
|
|
f967d73a95 | ||
|
|
e8b0447a63 | ||
|
|
af2ef17832 | ||
|
|
161aa6e472 | ||
|
|
7c80c09500 | ||
|
|
3affbadac3 | ||
|
|
84518f70ae | ||
|
|
aae968cd4b | ||
|
|
6f44daf023 | ||
|
|
2da9d050f4 | ||
|
|
2939c0afbf | ||
|
|
2678a2a2e3 | ||
|
|
d801da96dd | ||
|
|
b876f2059e | ||
|
|
204bcb65f6 | ||
|
|
92c07982c4 | ||
|
|
b6dbf07dbf | ||
|
|
f0732961f5 | ||
|
|
8498a775da | ||
|
|
f0e98871a2 | ||
|
|
f7f192510b | ||
|
|
b1acfa3be7 | ||
|
|
372bf2cb16 | ||
|
|
45a089fe0c | ||
|
|
688ecdfa3f | ||
|
|
0f6234ade3 | ||
|
|
47308f4891 | ||
|
|
2c91a31061 | ||
|
|
956695b77b | ||
|
|
533f516296 | ||
|
|
d34b28e4bf | ||
|
|
3ed8b8bc3e | ||
|
|
e1f658bc36 | ||
|
|
aae2edb20b | ||
|
|
12851c17a5 | ||
|
|
f5115547a3 | ||
|
|
58f5d9020c | ||
|
|
f5fa89a916 | ||
|
|
262e5f65f1 | ||
|
|
6b64c2338b | ||
|
|
be64a901ab | ||
|
|
97e2749f2a | ||
|
|
ca9eb6696b | ||
|
|
6c3a0694a3 | ||
|
|
837c324448 | ||
|
|
7ef53acee0 | ||
|
|
119a54fca1 | ||
|
|
cb0f955c27 | ||
|
|
9725ff9122 | ||
|
|
a142542ef9 | ||
|
|
342c5df3bb | ||
|
|
d189e7a344 | ||
|
|
f8f22f93a1 | ||
|
|
f4403a7e3f | ||
|
|
4ee070627c | ||
|
|
7a337e724a | ||
|
|
ac812a0f36 | ||
|
|
1711d2e809 | ||
|
|
d251f21b04 | ||
|
|
dcdc776c1b | ||
|
|
59ddb23f45 | ||
|
|
64adeb77a8 | ||
|
|
caff3065f7 | ||
|
|
d5567bf343 | ||
|
|
26e940224b | ||
|
|
5efa6e0eb9 | ||
|
|
53c1b59c84 | ||
|
|
8c9d6a62cc | ||
|
|
d085d5a5a4 | ||
|
|
43227e54b2 | ||
|
|
00e63d2b83 | ||
|
|
d58e72974a | ||
|
|
817660199c | ||
|
|
50925fc94d | ||
|
|
e63e703390 | ||
|
|
8a87ba85e3 | ||
|
|
15ff2a2ecb | ||
|
|
e511dac818 | ||
|
|
64d16f581b | ||
|
|
898ce3f3bf | ||
|
|
da6cc6f653 | ||
|
|
53c74e9996 | ||
|
|
67e27a6536 | ||
|
|
3a28eb116d | ||
|
|
b80eac42a6 | ||
|
|
72dcfd4979 | ||
|
|
4bfa3e6ba4 | ||
|
|
1883a01e3f | ||
|
|
1883e5cdac | ||
|
|
8a49edcae3 | ||
|
|
ce2ce23ccd | ||
|
|
d1189ad721 | ||
|
|
c135afc4bc | ||
|
|
65feed595a | ||
|
|
e82a86ca7f | ||
|
|
ed31940391 | ||
|
|
a0689730e4 | ||
|
|
0beebb139d | ||
|
|
5dea830c16 | ||
|
|
37d89e9c9d | ||
|
|
baa79e74c9 | ||
|
|
db590188d1 | ||
|
|
87f5e45d17 | ||
|
|
b772ec7fd7 | ||
|
|
69bb6f9bb3 | ||
|
|
2a9e468c1e | ||
|
|
be1892dffa | ||
|
|
31fb4efc48 | ||
|
|
167b38b6bc | ||
|
|
7d47dd34d6 | ||
|
|
c6fbd10ef3 | ||
|
|
114cd60b5a | ||
|
|
1a0bac2e2c | ||
|
|
2a17fb67ad | ||
|
|
dba7adf8f1 | ||
|
|
75847dd6ba | ||
|
|
e3b83639c2 | ||
|
|
6bb0fffd38 | ||
|
|
b62a2f1b30 | ||
|
|
9d9c383ab8 | ||
|
|
79ad41aad9 | ||
|
|
38c2c1beec | ||
|
|
a582f73cd2 | ||
|
|
257da16c48 | ||
|
|
1b9c07862c | ||
|
|
0667b68cb6 | ||
|
|
28acabe45e | ||
|
|
f593053af3 | ||
|
|
8df7d901ee | ||
|
|
a2b678167b | ||
|
|
c4c9207474 | ||
|
|
289b9005a0 | ||
|
|
eefffb0aa6 | ||
|
|
967c1ad37a | ||
|
|
b0408582db | ||
|
|
8f93376232 | ||
|
|
de4380fb89 | ||
|
|
e0ca8f9a8c | ||
|
|
589cb06ef0 | ||
|
|
54d9220130 | ||
|
|
c2f24c6d18 | ||
|
|
fedf6e60a4 | ||
|
|
629aed5bc8 | ||
|
|
25fc768f39 | ||
|
|
42b96bcf7b | ||
|
|
cf80415fcf | ||
|
|
104576d5eb | ||
|
|
135a475d46 | ||
|
|
90afaabce6 | ||
|
|
55a9371b2b | ||
|
|
1a8ac0630f | ||
|
|
3d79a22370 | ||
|
|
d2b84bd643 | ||
|
|
1a212405ed | ||
|
|
a808330950 | ||
|
|
9ff51f993c | ||
|
|
f2f88e64c5 | ||
|
|
7ee3497431 | ||
|
|
170e7979fe | ||
|
|
5dd95554ef | ||
|
|
45c3baffe6 | ||
|
|
86bb20e9d1 | ||
|
|
8aa076bfb7 | ||
|
|
d9378e7d17 | ||
|
|
55332cde00 | ||
|
|
eaf6a8f9b6 | ||
|
|
040849c66b | ||
|
|
c2a878e175 | ||
|
|
083cbd6891 | ||
|
|
c7ffffa5a8 | ||
|
|
240982bac1 | ||
|
|
53fa461001 | ||
|
|
efeeb297df | ||
|
|
1e00335b71 | ||
|
|
e70698e8b5 | ||
|
|
5f41b1ef15 | ||
|
|
926c0afad1 | ||
|
|
47049e0697 | ||
|
|
1bacd89423 | ||
|
|
686107b313 | ||
|
|
4dba6a1e8c | ||
|
|
ecc4484034 | ||
|
|
48f11fc9e1 | ||
|
|
511b60ecfa |
4
.github/FUNDING.yml
vendored
4
.github/FUNDING.yml
vendored
@@ -1,4 +0,0 @@
|
||||
# These are supported funding model platforms
|
||||
|
||||
custom: https://gitcoin.co/grants/233/ethereumetl
|
||||
|
||||
2
.github/workflows/publish-to-dockerhub.yml
vendored
2
.github/workflows/publish-to-dockerhub.yml
vendored
@@ -7,7 +7,7 @@ on:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-18.04
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@master
|
||||
- name: Publish to DockerHub
|
||||
|
||||
2
.github/workflows/publish-to-pypi.yml
vendored
2
.github/workflows/publish-to-pypi.yml
vendored
@@ -8,7 +8,7 @@ on:
|
||||
jobs:
|
||||
build-n-publish:
|
||||
name: Build and publish to PyPI and TestPyPI
|
||||
runs-on: ubuntu-18.04
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@master
|
||||
- name: Set up Python 3.7
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -47,3 +47,6 @@ coverage.xml
|
||||
.venv
|
||||
venv/
|
||||
ENV/
|
||||
|
||||
# etl
|
||||
/last_synced_block.txt
|
||||
14
.readthedocs.yaml
Normal file
14
.readthedocs.yaml
Normal file
@@ -0,0 +1,14 @@
|
||||
# Read the Docs configuration file for MkDocs projects
|
||||
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
|
||||
|
||||
# Required
|
||||
version: 2
|
||||
|
||||
# Set the version of Python and other tools you might need
|
||||
build:
|
||||
os: ubuntu-22.04
|
||||
tools:
|
||||
python: "3.12"
|
||||
|
||||
mkdocs:
|
||||
configuration: mkdocs.yml
|
||||
10
.travis.yml
10
.travis.yml
@@ -2,15 +2,13 @@ language: python
|
||||
dist: xenial
|
||||
matrix:
|
||||
include:
|
||||
- python: "3.5"
|
||||
env: TOX_POSARGS="-e py35"
|
||||
- python: "3.6"
|
||||
env: TOX_POSARGS="-e py36"
|
||||
- python: "3.7"
|
||||
- python: "3.7.2"
|
||||
env: TOX_POSARGS="-e py37"
|
||||
- python: "3.8"
|
||||
env: TOX_POSARGS="-e py38"
|
||||
- python: "3.9"
|
||||
env: TOX_POSARGS="-e py39"
|
||||
install:
|
||||
- travis_retry pip install tox
|
||||
script:
|
||||
- tox $TOX_POSARGS
|
||||
- travis_wait tox $TOX_POSARGS
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM python:3.6
|
||||
FROM python:3.7
|
||||
MAINTAINER Evgeny Medvedev <evge.medvedev@gmail.com>
|
||||
ENV PROJECT_DIR=ethereum-etl
|
||||
|
||||
|
||||
4
LICENSE
4
LICENSE
@@ -1,6 +1,6 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com, https://twitter.com/EvgeMedvedev
|
||||
Copyright (c) 2018-2025 Evgeny Medvedev, evge.medvedev@gmail.com, https://twitter.com/EvgeMedvedev
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -18,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
SOFTWARE.
|
||||
|
||||
28
README.md
28
README.md
@@ -1,9 +1,9 @@
|
||||
# Ethereum ETL
|
||||
|
||||
[](https://travis-ci.org/blockchain-etl/ethereum-etl)
|
||||
[](https://gitter.im/ethereum-etl/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
|
||||
[](https://t.me/joinchat/GsMpbA3mv1OJ6YMp3T5ORQ)
|
||||
[](https://discord.gg/wukrezR)
|
||||
[](https://travis-ci.com/github/blockchain-etl/ethereum-etl)
|
||||
[](https://github.com/blockchain-etl/ethereum-etl/blob/develop/LICENSE)
|
||||
[](https://t.me/BlockchainETL)
|
||||
[](https://x.com/EthereumETL)
|
||||
|
||||
Ethereum ETL lets you convert blockchain data into convenient formats like CSVs and relational databases.
|
||||
|
||||
@@ -27,7 +27,7 @@ Export blocks and transactions ([Schema](docs/schema.md#blockscsv), [Reference](
|
||||
--provider-uri https://mainnet.infura.io/v3/7aef3f0cd1f64408b163814b22cc643c
|
||||
```
|
||||
|
||||
Export ERC20 and ERC721 transfers ([Schema](docs/schema.md#token_transferscsv), [Reference](docs/commands.md##export_token_transfers)):
|
||||
Export ERC20 and ERC721 transfers ([Schema](docs/schema.md#token_transferscsv), [Reference](docs/commands.md#export_token_transfers)):
|
||||
|
||||
```bash
|
||||
> ethereumetl export_token_transfers --start-block 0 --end-block 500000 \
|
||||
@@ -78,8 +78,9 @@ For the latest version, check out the repo and call
|
||||
```bash
|
||||
> pip3 install -e .[dev,streaming]
|
||||
> export ETHEREUM_ETL_RUN_SLOW_TESTS=True
|
||||
> export PROVIDER_URL=<your_provider_uri>
|
||||
> pytest -vv
|
||||
```
|
||||
```
|
||||
|
||||
### Running Tox Tests
|
||||
|
||||
@@ -90,7 +91,7 @@ For the latest version, check out the repo and call
|
||||
|
||||
## Running in Docker
|
||||
|
||||
1. Install Docker: https://docs.docker.com/install/
|
||||
1. Install Docker: https://docs.docker.com/get-docker/
|
||||
|
||||
2. Build a docker image
|
||||
|
||||
@@ -104,12 +105,19 @@ For the latest version, check out the repo and call
|
||||
|
||||
4. Run streaming to console or Pub/Sub
|
||||
|
||||
> docker build -t ethereum-etl:latest -f Dockerfile .
|
||||
> docker build -t ethereum-etl:latest .
|
||||
> echo "Stream to console"
|
||||
> docker run ethereum-etl:latest stream --start-block 500000 --log-file log.txt
|
||||
> echo "Stream to Pub/Sub"
|
||||
> docker run -v /path_to_credentials_file/:/ethereum-etl/ --env GOOGLE_APPLICATION_CREDENTIALS=/ethereum-etl/credentials_file.json ethereum-etl:latest stream --start-block 500000 --output projects/<your-project>/topics/crypto_ethereum
|
||||
> docker run -v /path_to_credentials_file/:/ethereum-etl/ --env GOOGLE_APPLICATION_CREDENTIALS=/ethereum-etl/credentials_file.json ethereum-etl:latest stream --start-block 500000 --output projects/<your_project>/topics/crypto_ethereum
|
||||
|
||||
If running on an Apple M1 chip add the `--platform linux/x86_64` option to the `build` and `run` commands e.g.:
|
||||
|
||||
```
|
||||
docker build --platform linux/x86_64 -t ethereum-etl:latest .
|
||||
docker run --platform linux/x86_64 ethereum-etl:latest stream --start-block 500000
|
||||
```
|
||||
|
||||
## Projects using Ethereum ETL
|
||||
* [Google](https://goo.gl/oY5BCQ) - Public BigQuery Ethereum datasets
|
||||
* [Nansen](https://www.nansen.ai/?ref=ethereumetl) - Analytics platform for Ethereum
|
||||
* [Nansen](https://nansen.ai/query?ref=ethereumetl) - Analytics platform for Ethereum
|
||||
|
||||
@@ -45,7 +45,7 @@ class BaseItemExporter(object):
|
||||
self._configure(kwargs)
|
||||
|
||||
def _configure(self, options, dont_fail=False):
|
||||
"""Configure the exporter by poping options from the ``options`` dict.
|
||||
"""Configure the exporter by popping options from the ``options`` dict.
|
||||
If dont_fail is set, it won't raise an exception on unexpected options
|
||||
(useful for using with keyword arguments in subclasses constructors)
|
||||
"""
|
||||
@@ -119,9 +119,16 @@ class CsvItemExporter(BaseItemExporter):
|
||||
return serializer(value)
|
||||
|
||||
def _join_if_needed(self, value):
|
||||
def to_string(x):
|
||||
if isinstance(x, dict):
|
||||
# Separators without whitespace for compact format.
|
||||
return JSONEncoder(separators=(',', ':')).encode(x)
|
||||
else:
|
||||
return str(x)
|
||||
|
||||
if isinstance(value, (list, tuple)):
|
||||
try:
|
||||
return self._join_multivalued.join(str(x) for x in value)
|
||||
return self._join_multivalued.join(to_string(x) for x in value)
|
||||
except TypeError: # list in value may not contain strings
|
||||
pass
|
||||
return value
|
||||
|
||||
@@ -24,10 +24,11 @@ import logging
|
||||
from blockchainetl.atomic_counter import AtomicCounter
|
||||
from blockchainetl.exporters import CsvItemExporter, JsonLinesItemExporter
|
||||
from blockchainetl.file_utils import get_file_handle, close_silently
|
||||
from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter
|
||||
|
||||
|
||||
class CompositeItemExporter:
|
||||
def __init__(self, filename_mapping, field_mapping=None):
|
||||
def __init__(self, filename_mapping, field_mapping=None, converters=()):
|
||||
self.filename_mapping = filename_mapping
|
||||
self.field_mapping = field_mapping or {}
|
||||
|
||||
@@ -35,6 +36,8 @@ class CompositeItemExporter:
|
||||
self.exporter_mapping = {}
|
||||
self.counter_mapping = {}
|
||||
|
||||
self.converter = CompositeItemConverter(converters)
|
||||
|
||||
self.logger = logging.getLogger('CompositeItemExporter')
|
||||
|
||||
def open(self):
|
||||
@@ -62,7 +65,7 @@ class CompositeItemExporter:
|
||||
exporter = self.exporter_mapping.get(item_type)
|
||||
if exporter is None:
|
||||
raise ValueError('Exporter for item type {} not found'.format(item_type))
|
||||
exporter.export_item(item)
|
||||
exporter.export_item(self.converter.convert_item(item))
|
||||
|
||||
counter = self.counter_mapping.get(item_type)
|
||||
if counter is not None:
|
||||
|
||||
@@ -37,6 +37,9 @@ class CompositeItemConverter:
|
||||
self.converters = converters
|
||||
|
||||
def convert_item(self, item):
|
||||
if self.converters is None:
|
||||
return item
|
||||
|
||||
for converter in self.converters:
|
||||
item = converter.convert_item(item)
|
||||
return item
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
# MIT License
|
||||
#
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
|
||||
|
||||
from blockchainetl.jobs.exporters.converters.simple_item_converter import SimpleItemConverter
|
||||
|
||||
|
||||
class IntToStringItemConverter(SimpleItemConverter):
|
||||
|
||||
def __init__(self, keys=None):
|
||||
self.keys = set(keys) if keys else None
|
||||
|
||||
def convert_field(self, key, value):
|
||||
if isinstance(value, int) and (self.keys is None or key in self.keys):
|
||||
return str(value)
|
||||
else:
|
||||
return value
|
||||
@@ -30,11 +30,10 @@
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
|
||||
|
||||
class SimpleItemConverter:
|
||||
|
||||
def __init__(self, converters=()):
|
||||
self.converters = converters
|
||||
def __init__(self, field_converters=None):
|
||||
self.field_converters = field_converters
|
||||
|
||||
def convert_item(self, item):
|
||||
return {
|
||||
@@ -42,4 +41,7 @@ class SimpleItemConverter:
|
||||
}
|
||||
|
||||
def convert_field(self, key, value):
|
||||
return value
|
||||
if self.field_converters is not None and key in self.field_converters:
|
||||
return self.field_converters[key](value)
|
||||
else:
|
||||
return value
|
||||
|
||||
111
blockchainetl/jobs/exporters/gcs_item_exporter.py
Normal file
111
blockchainetl/jobs/exporters/gcs_item_exporter.py
Normal file
@@ -0,0 +1,111 @@
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2020 Evgeny Medvedev, evge.medvedev@gmail.com
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
import json
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
|
||||
from google.cloud import storage
|
||||
|
||||
|
||||
def build_block_bundles(items):
|
||||
blocks = defaultdict(list)
|
||||
transactions = defaultdict(list)
|
||||
logs = defaultdict(list)
|
||||
token_transfers = defaultdict(list)
|
||||
traces = defaultdict(list)
|
||||
for item in items:
|
||||
item_type = item.get('type')
|
||||
if item_type == 'block':
|
||||
blocks[item.get('number')].append(item)
|
||||
elif item_type == 'transaction':
|
||||
transactions[item.get('block_number')].append(item)
|
||||
elif item_type == 'log':
|
||||
logs[item.get('block_number')].append(item)
|
||||
elif item_type == 'token_transfer':
|
||||
token_transfers[item.get('block_number')].append(item)
|
||||
elif item_type == 'trace':
|
||||
traces[item.get('block_number')].append(item)
|
||||
else:
|
||||
logging.info(f'Skipping item with type {item_type}')
|
||||
|
||||
block_bundles = []
|
||||
for block_number in sorted(blocks.keys()):
|
||||
if len(blocks[block_number]) != 1:
|
||||
raise ValueError(f'There must be a single block for a given block number, was {len(blocks[block_number])} for block number {block_number}')
|
||||
block_bundles.append({
|
||||
'block': blocks[block_number][0],
|
||||
'transactions': transactions[block_number],
|
||||
'logs': logs[block_number],
|
||||
'token_transfers': token_transfers[block_number],
|
||||
'traces': traces[block_number],
|
||||
})
|
||||
|
||||
return block_bundles
|
||||
|
||||
|
||||
class GcsItemExporter:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
bucket,
|
||||
path='blocks',
|
||||
build_block_bundles_func=build_block_bundles):
|
||||
self.bucket = bucket
|
||||
self.path = normalize_path(path)
|
||||
self.build_block_bundles_func = build_block_bundles_func
|
||||
self.storage_client = storage.Client()
|
||||
|
||||
def open(self):
|
||||
pass
|
||||
|
||||
def export_items(self, items):
|
||||
block_bundles = self.build_block_bundles_func(items)
|
||||
|
||||
for block_bundle in block_bundles:
|
||||
block = block_bundle.get('block')
|
||||
if block is None:
|
||||
raise ValueError('block_bundle must include the block field')
|
||||
block_number = block.get('number')
|
||||
if block_number is None:
|
||||
raise ValueError('block_bundle must include the block.number field')
|
||||
|
||||
destination_blob_name = f'{self.path}/{block_number}.json'
|
||||
|
||||
bucket = self.storage_client.bucket(self.bucket)
|
||||
blob = bucket.blob(destination_blob_name)
|
||||
blob.upload_from_string(json.dumps(block_bundle))
|
||||
logging.info(f'Uploaded file gs://{self.bucket}/{destination_blob_name}')
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
def normalize_path(p):
|
||||
if p is None:
|
||||
p = ''
|
||||
if p.startswith('/'):
|
||||
p = p[1:]
|
||||
if p.endswith('/'):
|
||||
p = p[:len(p) - 1]
|
||||
|
||||
return p
|
||||
@@ -29,9 +29,19 @@ from timeout_decorator import timeout_decorator
|
||||
|
||||
class GooglePubSubItemExporter:
|
||||
|
||||
def __init__(self, item_type_to_topic_mapping, message_attributes=('item_id', 'item_timestamp')):
|
||||
def __init__(self, item_type_to_topic_mapping, message_attributes=(),
|
||||
batch_max_bytes=1024 * 5, batch_max_latency=1, batch_max_messages=1000,
|
||||
enable_message_ordering=False):
|
||||
self.item_type_to_topic_mapping = item_type_to_topic_mapping
|
||||
self.publisher = create_publisher()
|
||||
|
||||
self.batch_max_bytes = batch_max_bytes
|
||||
self.batch_max_latency = batch_max_latency
|
||||
self.batch_max_messages = batch_max_messages
|
||||
|
||||
self.enable_message_ordering = enable_message_ordering
|
||||
|
||||
self.publisher = self.create_publisher()
|
||||
|
||||
self.message_attributes = message_attributes
|
||||
|
||||
def open(self):
|
||||
@@ -46,7 +56,7 @@ class GooglePubSubItemExporter:
|
||||
# details = "channel is in state TRANSIENT_FAILURE"
|
||||
# https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606
|
||||
logging.info('Recreating Pub/Sub publisher.')
|
||||
self.publisher = create_publisher()
|
||||
self.publisher = self.create_publisher()
|
||||
raise e
|
||||
|
||||
@timeout_decorator.timeout(300)
|
||||
@@ -66,7 +76,8 @@ class GooglePubSubItemExporter:
|
||||
topic_path = self.item_type_to_topic_mapping.get(item_type)
|
||||
data = json.dumps(item).encode('utf-8')
|
||||
|
||||
message_future = self.publisher.publish(topic_path, data=data, **self.get_message_attributes(item))
|
||||
ordering_key = 'all' if self.enable_message_ordering else ''
|
||||
message_future = self.publisher.publish(topic_path, data=data, ordering_key=ordering_key, **self.get_message_attributes(item))
|
||||
return message_future
|
||||
else:
|
||||
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
|
||||
@@ -80,15 +91,15 @@ class GooglePubSubItemExporter:
|
||||
|
||||
return attributes
|
||||
|
||||
def create_publisher(self):
|
||||
batch_settings = pubsub_v1.types.BatchSettings(
|
||||
max_bytes=self.batch_max_bytes,
|
||||
max_latency=self.batch_max_latency,
|
||||
max_messages=self.batch_max_messages,
|
||||
)
|
||||
|
||||
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=self.enable_message_ordering)
|
||||
return pubsub_v1.PublisherClient(batch_settings=batch_settings, publisher_options=publisher_options)
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
def create_publisher():
|
||||
batch_settings = pubsub_v1.types.BatchSettings(
|
||||
max_bytes=1024 * 5, # 5 kilobytes
|
||||
max_latency=1, # 1 second
|
||||
max_messages=1000,
|
||||
)
|
||||
|
||||
return pubsub_v1.PublisherClient(batch_settings)
|
||||
|
||||
54
blockchainetl/jobs/exporters/kafka_exporter.py
Normal file
54
blockchainetl/jobs/exporters/kafka_exporter.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import collections
|
||||
import json
|
||||
import logging
|
||||
|
||||
from kafka import KafkaProducer
|
||||
|
||||
from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter
|
||||
|
||||
|
||||
class KafkaItemExporter:
|
||||
|
||||
def __init__(self, output, item_type_to_topic_mapping, converters=()):
|
||||
self.item_type_to_topic_mapping = item_type_to_topic_mapping
|
||||
self.converter = CompositeItemConverter(converters)
|
||||
self.connection_url = self.get_connection_url(output)
|
||||
print(self.connection_url)
|
||||
self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
|
||||
|
||||
def get_connection_url(self, output):
|
||||
try:
|
||||
return output.split('/')[1]
|
||||
except KeyError:
|
||||
raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092"')
|
||||
|
||||
def open(self):
|
||||
pass
|
||||
|
||||
def export_items(self, items):
|
||||
for item in items:
|
||||
self.export_item(item)
|
||||
|
||||
def export_item(self, item):
|
||||
item_type = item.get('type')
|
||||
if item_type is not None and item_type in self.item_type_to_topic_mapping:
|
||||
data = json.dumps(item).encode('utf-8')
|
||||
logging.debug(data)
|
||||
return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
|
||||
else:
|
||||
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))
|
||||
|
||||
def convert_items(self, items):
|
||||
for item in items:
|
||||
yield self.converter.convert_item(item)
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
def group_by_item_type(items):
|
||||
result = collections.defaultdict(list)
|
||||
for item in items:
|
||||
result[item.get('type')].append(item)
|
||||
|
||||
return result
|
||||
82
blockchainetl/jobs/exporters/kinesis_item_exporter.py
Normal file
82
blockchainetl/jobs/exporters/kinesis_item_exporter.py
Normal file
@@ -0,0 +1,82 @@
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2022 CoinStats LLC
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
import json
|
||||
import typing as t
|
||||
import uuid
|
||||
from itertools import zip_longest
|
||||
|
||||
import boto3
|
||||
|
||||
_KINESIS_BATCH_LIMIT = 500
|
||||
|
||||
|
||||
def _uuid_partition_key(_: dict) -> str:
|
||||
return uuid.uuid4().hex
|
||||
|
||||
|
||||
class KinesisItemExporter:
|
||||
def __init__(
|
||||
self,
|
||||
stream_name: str,
|
||||
partition_key_callable: t.Callable[[dict], str] = _uuid_partition_key,
|
||||
):
|
||||
import boto3
|
||||
self._stream_name = stream_name
|
||||
self._partition_key_callable = partition_key_callable
|
||||
self._kinesis_client = None # initialized in .open
|
||||
|
||||
def open(self) -> None:
|
||||
self._kinesis_client = boto3.client('kinesis')
|
||||
|
||||
def export_items(self, items: t.Iterable[dict]) -> None:
|
||||
sentinel = object()
|
||||
chunks = zip_longest(
|
||||
*(iter(items),) * _KINESIS_BATCH_LIMIT,
|
||||
fillvalue=sentinel,
|
||||
)
|
||||
for chunk in chunks:
|
||||
self._kinesis_client.put_records(
|
||||
StreamName=self._stream_name,
|
||||
Records=[
|
||||
{
|
||||
'Data': _serialize_item(item),
|
||||
'PartitionKey': self._partition_key_callable(item),
|
||||
}
|
||||
for item in chunk
|
||||
if item is not sentinel
|
||||
],
|
||||
)
|
||||
|
||||
def export_item(self, item: dict) -> None:
|
||||
self._kinesis_client.put_record(
|
||||
StreamName=self._stream_name,
|
||||
Data=_serialize_item(item),
|
||||
PartitionKey=self._partition_key_callable(item),
|
||||
)
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
def _serialize_item(item: dict) -> bytes:
|
||||
return json.dumps(item).encode()
|
||||
42
blockchainetl/jobs/exporters/multi_item_exporter.py
Normal file
42
blockchainetl/jobs/exporters/multi_item_exporter.py
Normal file
@@ -0,0 +1,42 @@
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
|
||||
class MultiItemExporter:
|
||||
def __init__(self, item_exporters):
|
||||
self.item_exporters = item_exporters
|
||||
|
||||
def open(self):
|
||||
for exporter in self.item_exporters:
|
||||
exporter.open()
|
||||
|
||||
def export_items(self, items):
|
||||
for exporter in self.item_exporters:
|
||||
exporter.export_items(items)
|
||||
|
||||
def export_item(self, item):
|
||||
for exporter in self.item_exporters:
|
||||
exporter.export_item(item)
|
||||
|
||||
def close(self):
|
||||
for exporter in self.item_exporters:
|
||||
exporter.close()
|
||||
@@ -7,3 +7,5 @@ def logging_basic_config(filename=None):
|
||||
logging.basicConfig(level=logging.INFO, format=format, filename=filename)
|
||||
else:
|
||||
logging.basicConfig(level=logging.INFO, format=format)
|
||||
|
||||
logging.getLogger('ethereum_dasm.evmdasm').setLevel(logging.ERROR)
|
||||
|
||||
@@ -56,7 +56,7 @@ Include `--tokens <token1> --tokens <token2>` to filter only certain tokens, e.g
|
||||
```bash
|
||||
> ethereumetl export_token_transfers --start-block 0 --end-block 500000 \
|
||||
--provider-uri file://$HOME/Library/Ethereum/geth.ipc --output token_transfers.csv \
|
||||
--tokens 0x86fa049857e0209aa7d9e616f7eb3b3b78ecfdb0 --tokens 0x06012c8cf97bead5deae237070f9587f8e7a266d
|
||||
--tokens 0x1F573D6Fb3F13d689FF844B4cE37794d79a7FF1C --tokens 0x80fB784B7eD66730e8b1DBd9820aFD29931aab03
|
||||
```
|
||||
|
||||
You can tune `--batch-size`, `--max-workers` for performance.
|
||||
@@ -165,7 +165,7 @@ You can tune `--batch-size`, `--max-workers` for performance.
|
||||
Read [Differences between geth and parity traces.csv](schema.md#differences-between-geth-and-parity-tracescsv)
|
||||
|
||||
The API used in this command is not supported by Infura,
|
||||
so you will need a local Geth archive node (`geth --gcmode archive --syncmode full --ipcapi debug`).
|
||||
so you will need a local Geth archive node (`geth --gcmode archive --syncmode full --txlookuplimit 0`).
|
||||
When using rpc, add `--rpc --rpcapi debug` options.
|
||||
|
||||
```bash
|
||||
@@ -207,11 +207,15 @@ You can tune `--batch-size`, `--max-workers` for performance.
|
||||
- This command outputs blocks, transactions, logs, token_transfers to the console by default.
|
||||
- Entity types can be specified with the `-e` option,
|
||||
e.g. `-e block,transaction,log,token_transfer,trace,contract,token`.
|
||||
- Use `--output` option to specify the Google Pub/Sub topic or Postgres database where to publish blockchain data,
|
||||
- Use `--output` option to specify the Google Pub/Sub topic, Postgres database or GCS bucket where to publish blockchain data,
|
||||
- For Google PubSub: `--output=projects/<your-project>/topics/crypto_ethereum`.
|
||||
Data will be pushed to `projects/<your-project>/topics/crypto_ethereum.blocks`, `projects/<your-project>/topics/crypto_ethereum.transactions` etc. topics.
|
||||
- For Postgres: `--output=postgresql+pg8000://<user>:<password>@<host>:<port>/<database_name>`,
|
||||
e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
|
||||
e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`.
|
||||
- For GCS: `--output=gs://<bucket_name>`. Make sure to install and initialize `gcloud` cli.
|
||||
- For Kafka: `--output=kafka/<host>:<port>`, e.g. `--output=kafka/127.0.0.1:9092`
|
||||
- Those output types can be combined with a comma e.g. `--output=gs://<bucket_name>,projects/<your-project>/topics/crypto_ethereum`
|
||||
|
||||
The [schema](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema)
|
||||
and [indexes](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/indexes) can be found in this
|
||||
repo [ethereum-etl-postgres](https://github.com/blockchain-etl/ethereum-etl-postgres).
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# Contact
|
||||
|
||||
- [D5 Discord Server](https://discord.gg/wukrezR)
|
||||
- [Telegram Group](https://t.me/joinchat/GsMpbA3mv1OJ6YMp3T5ORQ)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Uploading to Docker Hub
|
||||
|
||||
```bash
|
||||
ETHEREUMETL_VERSION=1.6.3
|
||||
ETHEREUMETL_VERSION=1.11.0
|
||||
docker build -t ethereum-etl:${ETHEREUMETL_VERSION} -f Dockerfile .
|
||||
docker tag ethereum-etl:${ETHEREUMETL_VERSION} blockchainetl/ethereum-etl:${ETHEREUMETL_VERSION}
|
||||
docker push blockchainetl/ethereum-etl:${ETHEREUMETL_VERSION}
|
||||
|
||||
@@ -1,21 +1,21 @@
|
||||
## Exporting the Blockchain
|
||||
|
||||
If you'd like to have blockchain data set up and hosted for you, [get in touch with us at D5](https://d5.ai/?ref=ethereumetl).
|
||||
|
||||
1. Install python 3.5.3+ https://www.python.org/downloads/
|
||||
1. Install python 3.5.3+: [https://www.python.org/downloads/](https://www.python.org/downloads/)
|
||||
|
||||
1. You can use Infura if you don't need ERC20 transfers (Infura doesn't support eth_getFilterLogs JSON RPC method).
|
||||
For that use `-p https://mainnet.infura.io` option for the commands below. If you need ERC20 transfers or want to
|
||||
export the data ~40 times faster, you will need to set up a local Ethereum node:
|
||||
|
||||
1. Install geth https://github.com/ethereum/go-ethereum/wiki/Installing-Geth
|
||||
1. Install geth: [https://github.com/ethereum/go-ethereum/wiki/Installing-Geth](https://github.com/ethereum/go-ethereum/wiki/Installing-Geth)
|
||||
|
||||
1. Start geth.
|
||||
Make sure it downloaded the blocks that you need by executing `eth.syncing` in the JS console.
|
||||
You can export blocks below `currentBlock`,
|
||||
there is no need to wait until the full sync as the state is not needed (unless you also need contracts bytecode
|
||||
and token details; for those you need to wait until the full sync).
|
||||
|
||||
and token details; for those you need to wait until the full sync). Note that you may need to wait for another day or
|
||||
two for the node to download the states. See this issue [https://github.com/blockchain-etl/ethereum-etl/issues/265#issuecomment-970451522](https://github.com/blockchain-etl/ethereum-etl/issues/265#issuecomment-970451522).
|
||||
Make sure to set `--txlookuplimit 0` if you use geth.
|
||||
|
||||
1. Install Ethereum ETL: `> pip3 install ethereum-etl`
|
||||
|
||||
1. Export all:
|
||||
@@ -40,7 +40,7 @@ output/token_transfers/start_block=00000000/end_block=00099999/token_transfers_0
|
||||
|
||||
Should work with geth and parity, on Linux, Mac, Windows.
|
||||
If you use Parity you should disable warp mode with `--no-warp` option because warp mode
|
||||
does not place all of the block or receipt data into the database https://wiki.parity.io/Getting-Synced
|
||||
does not place all of the block or receipt data into the database [https://wiki.parity.io/Getting-Synced](https://wiki.parity.io/Getting-Synced)
|
||||
|
||||
If you see weird behavior, e.g. wrong number of rows in the CSV files or corrupted files,
|
||||
check out this issue: https://github.com/medvedev1088/ethereum-etl/issues/28
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Google BiqQuery
|
||||
# Google BigQuery
|
||||
|
||||
## Querying in BigQuery
|
||||
|
||||
@@ -16,4 +16,4 @@ Read [this article](https://medium.com/google-cloud/building-token-recommender-i
|
||||
|
||||
### Awesome BigQuery Views
|
||||
|
||||
https://github.com/blockchain-etl/awesome-bigquery-views
|
||||
[https://github.com/blockchain-etl/awesome-bigquery-views](https://github.com/blockchain-etl/awesome-bigquery-views)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
Ethereum ETL lets you convert blockchain data into convenient formats like CSVs and relational databases.
|
||||
|
||||
With 700+ likes on Github, Ethereum ETL is the most popular open source project for Ethereum data.
|
||||
With 1,700+ likes on GitHub, Ethereum ETL is the most popular open-source project for Ethereum data.
|
||||
|
||||
Data is available for you to query right away in [Google BigQuery](https://goo.gl/oY5BCQ).
|
||||
|
||||
@@ -17,8 +17,31 @@ Easily export:
|
||||
* Receipts
|
||||
* Logs
|
||||
* Contracts
|
||||
* Internal transactions
|
||||
* Internal transactions (traces)
|
||||
|
||||
## Advanced Features
|
||||
|
||||
* Stream blockchain data to Pub/Sub, Postgres, or other destinations in real-time
|
||||
* Filter and transform data using flexible command-line options
|
||||
* Support for multiple Ethereum node providers (Geth, Parity, Infura, etc.)
|
||||
* Handles chain reorganizations through configurable lag
|
||||
* Export data by block range or by date
|
||||
* Scalable architecture with configurable batch sizes and worker counts
|
||||
|
||||
## Use Cases
|
||||
|
||||
* Data analysis and visualization
|
||||
* Machine learning on blockchain data
|
||||
* Building analytics dashboards
|
||||
* Market research and token analysis
|
||||
* Compliance and audit reporting
|
||||
* Academic research on blockchain economics
|
||||
|
||||
## Projects using Ethereum ETL
|
||||
* [Google](https://goo.gl/oY5BCQ) - Public BigQuery Ethereum datasets
|
||||
* [Nansen](https://nansen.ai/?ref=ethereumetl) - Analytics platform for Ethereum
|
||||
* [Nansen](https://nansen.ai/query?ref=ethereumetl) - Analytics platform for Ethereum
|
||||
* [Ethereum Blockchain ETL on GCP](https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-public-dataset-smart-contract-analytics) - Official Google Cloud reference architecture
|
||||
|
||||
## Getting Started
|
||||
|
||||
Check the [Quickstart](quickstart.md) guide to begin using Ethereum ETL or explore the [Commands](commands.md) page for detailed usage instructions.
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
which means `is_erc20` and `is_erc721` will always be false for proxy contracts and they will be missing in the `tokens`
|
||||
table.
|
||||
- The metadata methods (`symbol`, `name`, `decimals`, `total_supply`) for ERC20 are optional, so around 10% of the
|
||||
contracts are missing this data. Also some contracts (EOS) implement these methods but with wrong return type,
|
||||
contracts are missing this data. Also some contracts (EOS) implement these methods but with the wrong return type,
|
||||
so the metadata columns are missing in this case as well.
|
||||
- `token_transfers.value`, `tokens.decimals` and `tokens.total_supply` have type `STRING` in BigQuery tables,
|
||||
because numeric types there can't handle 32-byte integers. You should use
|
||||
@@ -12,4 +12,4 @@ because numeric types there can't handle 32-byte integers. You should use
|
||||
`safe_cast(value as NUMERIC)` (possible overflow) to convert to numbers.
|
||||
- The contracts that don't implement `decimals()` function but have the
|
||||
[fallback function](https://solidity.readthedocs.io/en/v0.4.21/contracts.html#fallback-function) that returns a `boolean`
|
||||
will have `0` or `1` in the `decimals` column in the CSVs.
|
||||
will have `0` or `1` in the `decimals` column in the CSVs.
|
||||
|
||||
@@ -7,5 +7,4 @@
|
||||
- [Introducing six new cryptocurrencies in BigQuery Public Datasets—and how to analyze them](https://cloud.google.com/blog/products/data-analytics/introducing-six-new-cryptocurrencies-in-bigquery-public-datasets-and-how-to-analyze-them)
|
||||
- [Querying the Ethereum Blockchain in Snowflake](https://community.snowflake.com/s/article/Querying-the-Ethereum-Blockchain-in-Snowflake)
|
||||
- [ConsenSys Grants funds third cohort of projects to benefit the Ethereum ecosystem](https://www.cryptoninjas.net/2020/02/17/consensys-grants-funds-third-cohort-of-projects-to-benefit-the-ethereum-ecosystem/)
|
||||
- [Ivan on Tech overviews crypto datasets in BigQuery](https://youtu.be/2IkJBNhsXNY?t=239)
|
||||
- [Unlocking the Power of Google BigQuery (Cloud Next '19)](https://youtu.be/KL_i5XZIaJg?t=131)
|
||||
|
||||
@@ -22,6 +22,11 @@ gas_limit | bigint |
|
||||
gas_used | bigint |
|
||||
timestamp | bigint |
|
||||
transaction_count | bigint |
|
||||
base_fee_per_gas | bigint |
|
||||
withdrawals_root | string |
|
||||
withdrawals | string |
|
||||
blob_gas_used | bigint |
|
||||
excess_blob_gas | bigint |
|
||||
|
||||
---
|
||||
|
||||
@@ -41,6 +46,11 @@ gas | bigint |
|
||||
gas_price | bigint |
|
||||
input | hex_string |
|
||||
block_timestamp | bigint |
|
||||
max_fee_per_gas | bigint |
|
||||
max_priority_fee_per_gas | bigint |
|
||||
transaction_type | bigint |
|
||||
max_fee_per_blob_gas | bigint |
|
||||
blob_versioned_hashes | string |
|
||||
|
||||
---
|
||||
|
||||
@@ -71,6 +81,9 @@ gas_used | bigint |
|
||||
contract_address | address |
|
||||
root | hex_string |
|
||||
status | bigint |
|
||||
effective_gas_price | bigint |
|
||||
blob_gas_price | bigint |
|
||||
blob_gas_used | bigint |
|
||||
|
||||
---
|
||||
|
||||
@@ -111,6 +124,7 @@ symbol | string |
|
||||
name | string |
|
||||
decimals | bigint |
|
||||
total_supply | numeric |
|
||||
block_number | bigint |
|
||||
|
||||
---
|
||||
|
||||
@@ -139,7 +153,7 @@ trace_id | string |
|
||||
|
||||
### Differences between geth and parity traces.csv
|
||||
|
||||
- `to_address` field differs for `callcode` trace (geth seems to return correct value, as parity value of `to_address` is same as `to_address` of parent call);
|
||||
- `to_address` field differs for `callcode` trace (geth seems to return correct value, as parity value of `to_address` is the same as `to_address` of parent call);
|
||||
- geth output doesn't have `reward` traces;
|
||||
- geth output doesn't have `to_address`, `from_address`, `value` for `suicide` traces;
|
||||
- `error` field contains human readable error message, which might differ in geth/parity output;
|
||||
@@ -150,4 +164,4 @@ trace_id | string |
|
||||
You can find column descriptions in [https://github.com/medvedev1088/ethereum-etl-airflow](https://github.com/medvedev1088/ethereum-etl-airflow/tree/master/dags/resources/stages/raw/schemas)
|
||||
|
||||
Note: for the `address` type all hex characters are lower-cased.
|
||||
`boolean` type can have 2 values: `True` or `False`.
|
||||
`boolean` type can have 2 values: `True` or `False`.
|
||||
|
||||
@@ -19,6 +19,10 @@
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
from blockchainetl.logging_utils import logging_basic_config
|
||||
logging_basic_config()
|
||||
|
||||
import click
|
||||
|
||||
from ethereumetl.cli.export_all import export_all
|
||||
@@ -44,7 +48,7 @@ from ethereumetl.cli.stream import stream
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.version_option(version='1.6.3')
|
||||
@click.version_option(version='2.4.2')
|
||||
@click.pass_context
|
||||
def cli(ctx):
|
||||
pass
|
||||
|
||||
@@ -27,7 +27,7 @@ import re
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from blockchainetl.logging_utils import logging_basic_config
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
from ethereumetl.jobs.export_all_common import export_all_common
|
||||
from ethereumetl.providers.auto import get_provider_from_uri
|
||||
@@ -74,7 +74,7 @@ def get_partitions(start, end, partition_batch_size, provider_uri):
|
||||
day = timedelta(days=1)
|
||||
|
||||
provider = get_provider_from_uri(provider_uri)
|
||||
web3 = Web3(provider)
|
||||
web3 = build_web3(provider)
|
||||
eth_service = EthService(web3)
|
||||
|
||||
while start_date <= end_date:
|
||||
|
||||
@@ -36,7 +36,7 @@ logging_basic_config()
|
||||
|
||||
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
|
||||
@click.option('-b', '--batch-size', default=100, show_default=True, type=int, help='The number of blocks to filter at a time.')
|
||||
@click.option('-c', '--contract-addresses', required=True, type=str,
|
||||
@click.option('-ca', '--contract-addresses', required=True, type=str,
|
||||
help='The file containing contract addresses, one per line.')
|
||||
@click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.')
|
||||
@click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.')
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
|
||||
import click
|
||||
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
from blockchainetl.logging_utils import logging_basic_config
|
||||
from ethereumetl.jobs.export_origin_job import ExportOriginJob
|
||||
@@ -48,7 +48,7 @@ def export_origin(start_block, end_block, batch_size, marketplace_output, shop_o
|
||||
start_block=start_block,
|
||||
end_block=end_block,
|
||||
batch_size=batch_size,
|
||||
web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(provider_uri))),
|
||||
web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))),
|
||||
ipfs_client=get_origin_ipfs_client(),
|
||||
marketplace_listing_exporter=origin_marketplace_listing_item_exporter(marketplace_output),
|
||||
shop_product_exporter=origin_shop_product_item_exporter(shop_output),
|
||||
|
||||
@@ -46,7 +46,7 @@ logging_basic_config()
|
||||
help='The output file for receipts. If not provided receipts will not be exported. Use "-" for stdout')
|
||||
@click.option('--logs-output', default=None, show_default=True, type=str,
|
||||
help='The output file for receipt logs. '
|
||||
'aIf not provided receipt logs will not be exported. Use "-" for stdout')
|
||||
'If not provided receipt logs will not be exported. Use "-" for stdout')
|
||||
@click.option('-c', '--chain', default='ethereum', show_default=True, type=str, help='The chain network to connect to.')
|
||||
def export_receipts_and_logs(batch_size, transaction_hashes, provider_uri, max_workers, receipts_output, logs_output,
|
||||
chain='ethereum'):
|
||||
|
||||
@@ -23,8 +23,9 @@
|
||||
|
||||
import click
|
||||
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
from ethereumetl.csv_utils import set_max_field_size_limit
|
||||
from ethereumetl.jobs.export_token_transfers_job import ExportTokenTransfersJob
|
||||
from ethereumetl.jobs.exporters.token_transfers_item_exporter import token_transfers_item_exporter
|
||||
from blockchainetl.logging_utils import logging_basic_config
|
||||
@@ -45,11 +46,12 @@ logging_basic_config()
|
||||
@click.option('-t', '--tokens', default=None, show_default=True, type=str, multiple=True, help='The list of token addresses to filter by.')
|
||||
def export_token_transfers(start_block, end_block, batch_size, output, max_workers, provider_uri, tokens):
|
||||
"""Exports ERC20/ERC721 transfers."""
|
||||
set_max_field_size_limit()
|
||||
job = ExportTokenTransfersJob(
|
||||
start_block=start_block,
|
||||
end_block=end_block,
|
||||
batch_size=batch_size,
|
||||
web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(provider_uri))),
|
||||
web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))),
|
||||
item_exporter=token_transfers_item_exporter(output),
|
||||
max_workers=max_workers,
|
||||
tokens=tokens)
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
|
||||
import click
|
||||
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
from blockchainetl.file_utils import smart_open
|
||||
from ethereumetl.jobs.export_tokens_job import ExportTokensJob
|
||||
@@ -51,7 +51,7 @@ def export_tokens(token_addresses, output, max_workers, provider_uri, chain='eth
|
||||
with smart_open(token_addresses, 'r') as token_addresses_file:
|
||||
job = ExportTokensJob(
|
||||
token_addresses_iterable=(token_address.strip() for token_address in token_addresses_file),
|
||||
web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(provider_uri))),
|
||||
web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))),
|
||||
item_exporter=tokens_item_exporter(output),
|
||||
max_workers=max_workers)
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
|
||||
import click
|
||||
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
from ethereumetl.jobs.export_traces_job import ExportTracesJob
|
||||
from blockchainetl.logging_utils import logging_basic_config
|
||||
@@ -57,7 +57,7 @@ def export_traces(start_block, end_block, batch_size, output, max_workers, provi
|
||||
start_block=start_block,
|
||||
end_block=end_block,
|
||||
batch_size=batch_size,
|
||||
web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(provider_uri, timeout=timeout))),
|
||||
web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri, timeout=timeout))),
|
||||
item_exporter=traces_item_exporter(output),
|
||||
max_workers=max_workers,
|
||||
include_genesis_traces=genesis_traces,
|
||||
|
||||
@@ -25,30 +25,35 @@ import click
|
||||
import csv
|
||||
import json
|
||||
|
||||
from ethereumetl.csv_utils import set_max_field_size_limit
|
||||
from blockchainetl.file_utils import smart_open
|
||||
from blockchainetl.jobs.exporters.converters.int_to_string_item_converter import IntToStringItemConverter
|
||||
from ethereumetl.jobs.exporters.token_transfers_item_exporter import token_transfers_item_exporter
|
||||
from ethereumetl.jobs.extract_token_transfers_job import ExtractTokenTransfersJob
|
||||
from blockchainetl.logging_utils import logging_basic_config
|
||||
|
||||
logging_basic_config()
|
||||
|
||||
set_max_field_size_limit()
|
||||
|
||||
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
|
||||
@click.option('-l', '--logs', type=str, required=True, help='The CSV file containing receipt logs.')
|
||||
@click.option('-b', '--batch-size', default=100, show_default=True, type=int, help='The number of blocks to filter at a time.')
|
||||
@click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.')
|
||||
@click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.')
|
||||
def extract_token_transfers(logs, batch_size, output, max_workers):
|
||||
@click.option('--values-as-strings', default=False, show_default=True, is_flag=True, help='Whether to convert values to strings.')
|
||||
def extract_token_transfers(logs, batch_size, output, max_workers, values_as_strings=False):
|
||||
"""Extracts ERC20/ERC721 transfers from logs file."""
|
||||
with smart_open(logs, 'r') as logs_file:
|
||||
if logs.endswith('.json'):
|
||||
logs_reader = (json.loads(line) for line in logs_file)
|
||||
else:
|
||||
logs_reader = csv.DictReader(logs_file)
|
||||
converters = [IntToStringItemConverter(keys=['value'])] if values_as_strings else []
|
||||
job = ExtractTokenTransfersJob(
|
||||
logs_iterable=logs_reader,
|
||||
batch_size=batch_size,
|
||||
max_workers=max_workers,
|
||||
item_exporter=token_transfers_item_exporter(output))
|
||||
item_exporter=token_transfers_item_exporter(output, converters=converters))
|
||||
|
||||
job.run()
|
||||
|
||||
@@ -27,12 +27,13 @@ import json
|
||||
import click
|
||||
from blockchainetl.csv_utils import set_max_field_size_limit
|
||||
from blockchainetl.file_utils import smart_open
|
||||
from blockchainetl.jobs.exporters.converters.int_to_string_item_converter import IntToStringItemConverter
|
||||
from ethereumetl.jobs.exporters.tokens_item_exporter import tokens_item_exporter
|
||||
from ethereumetl.jobs.extract_tokens_job import ExtractTokensJob
|
||||
from blockchainetl.logging_utils import logging_basic_config
|
||||
from ethereumetl.providers.auto import get_provider_from_uri
|
||||
from ethereumetl.thread_local_proxy import ThreadLocalProxy
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
logging_basic_config()
|
||||
|
||||
@@ -44,7 +45,8 @@ logging_basic_config()
|
||||
'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io')
|
||||
@click.option('-o', '--output', default='-', show_default=True, type=str, help='The output file. If not specified stdout is used.')
|
||||
@click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The maximum number of workers.')
|
||||
def extract_tokens(contracts, provider_uri, output, max_workers):
|
||||
@click.option('--values-as-strings', default=False, show_default=True, is_flag=True, help='Whether to convert values to strings.')
|
||||
def extract_tokens(contracts, provider_uri, output, max_workers, values_as_strings=False):
|
||||
"""Extracts tokens from contracts file."""
|
||||
|
||||
set_max_field_size_limit()
|
||||
@@ -54,10 +56,11 @@ def extract_tokens(contracts, provider_uri, output, max_workers):
|
||||
contracts_iterable = (json.loads(line) for line in contracts_file)
|
||||
else:
|
||||
contracts_iterable = csv.DictReader(contracts_file)
|
||||
converters = [IntToStringItemConverter(keys=['decimals', 'total_supply'])] if values_as_strings else []
|
||||
job = ExtractTokensJob(
|
||||
contracts_iterable=contracts_iterable,
|
||||
web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(provider_uri))),
|
||||
web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))),
|
||||
max_workers=max_workers,
|
||||
item_exporter=tokens_item_exporter(output))
|
||||
item_exporter=tokens_item_exporter(output, converters))
|
||||
|
||||
job.run()
|
||||
|
||||
@@ -24,7 +24,7 @@
|
||||
import click
|
||||
|
||||
from datetime import datetime
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
from blockchainetl.file_utils import smart_open
|
||||
from blockchainetl.logging_utils import logging_basic_config
|
||||
@@ -47,7 +47,7 @@ def get_block_range_for_date(provider_uri, date, output, chain='ethereum'):
|
||||
"""Outputs start and end blocks for given date."""
|
||||
provider_uri = check_classic_provider_uri(chain, provider_uri)
|
||||
provider = get_provider_from_uri(provider_uri)
|
||||
web3 = Web3(provider)
|
||||
web3 = build_web3(provider)
|
||||
eth_service = EthService(web3)
|
||||
|
||||
start_block, end_block = eth_service.get_block_range_for_date(date)
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
|
||||
import click
|
||||
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
from blockchainetl.file_utils import smart_open
|
||||
from blockchainetl.logging_utils import logging_basic_config
|
||||
@@ -46,7 +46,7 @@ def get_block_range_for_timestamps(provider_uri, start_timestamp, end_timestamp,
|
||||
"""Outputs start and end blocks for given timestamps."""
|
||||
provider_uri = check_classic_provider_uri(chain, provider_uri)
|
||||
provider = get_provider_from_uri(provider_uri)
|
||||
web3 = Web3(provider)
|
||||
web3 = build_web3(provider)
|
||||
eth_service = EthService(web3)
|
||||
|
||||
start_block, end_block = eth_service.get_block_range_for_timestamps(start_timestamp, end_timestamp)
|
||||
|
||||
@@ -27,6 +27,7 @@ from blockchainetl.streaming.streaming_utils import configure_signals, configure
|
||||
from ethereumetl.enumeration.entity_type import EntityType
|
||||
|
||||
from ethereumetl.providers.auto import get_provider_from_uri
|
||||
from ethereumetl.streaming.item_exporter_creator import create_item_exporters
|
||||
from ethereumetl.thread_local_proxy import ThreadLocalProxy
|
||||
|
||||
|
||||
@@ -38,7 +39,10 @@ from ethereumetl.thread_local_proxy import ThreadLocalProxy
|
||||
'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io')
|
||||
@click.option('-o', '--output', type=str,
|
||||
help='Either Google PubSub topic path e.g. projects/your-project/topics/crypto_ethereum; '
|
||||
'or Postgres connection url e.g. postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum. '
|
||||
'or Postgres connection url e.g. postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum; '
|
||||
'or GCS bucket e.g. gs://your-bucket-name; '
|
||||
'or kafka, output name and connection host:port e.g. kafka/127.0.0.1:9092 '
|
||||
'or Kinesis, e.g. kinesis://your-data-stream-name'
|
||||
'If not specified will print to console')
|
||||
@click.option('-s', '--start-block', default=None, show_default=True, type=int, help='Start block')
|
||||
@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), show_default=True, type=str,
|
||||
@@ -55,9 +59,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
|
||||
configure_logging(log_file)
|
||||
configure_signals()
|
||||
entity_types = parse_entity_types(entity_types)
|
||||
validate_entity_types(entity_types, output)
|
||||
|
||||
from ethereumetl.streaming.item_exporter_creator import create_item_exporter
|
||||
from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapter
|
||||
from blockchainetl.streaming.streamer import Streamer
|
||||
|
||||
@@ -67,7 +69,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit
|
||||
|
||||
streamer_adapter = EthStreamerAdapter(
|
||||
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
|
||||
item_exporter=create_item_exporter(output),
|
||||
item_exporter=create_item_exporters(output),
|
||||
batch_size=batch_size,
|
||||
max_workers=max_workers,
|
||||
entity_types=entity_types
|
||||
@@ -97,14 +99,6 @@ def parse_entity_types(entity_types):
|
||||
return entity_types
|
||||
|
||||
|
||||
def validate_entity_types(entity_types, output):
|
||||
from ethereumetl.streaming.item_exporter_creator import determine_item_exporter_type, ItemExporterType
|
||||
item_exporter_type = determine_item_exporter_type(output)
|
||||
if item_exporter_type == ItemExporterType.POSTGRES \
|
||||
and (EntityType.CONTRACT in entity_types or EntityType.TOKEN in entity_types):
|
||||
raise ValueError('contract and token are not yet supported entity types for postgres item exporter.')
|
||||
|
||||
|
||||
def pick_random_provider_uri(provider_uri):
|
||||
provider_uris = [uri.strip() for uri in provider_uri.split(',')]
|
||||
return random.choice(provider_uris)
|
||||
|
||||
@@ -40,6 +40,12 @@ class EthBlock(object):
|
||||
self.gas_limit = None
|
||||
self.gas_used = None
|
||||
self.timestamp = None
|
||||
self.withdrawals_root = None
|
||||
|
||||
self.transactions = []
|
||||
self.transaction_count = 0
|
||||
self.base_fee_per_gas = 0
|
||||
self.withdrawals = []
|
||||
|
||||
self.blob_gas_used = None
|
||||
self.excess_blob_gas = None
|
||||
|
||||
@@ -33,3 +33,10 @@ class EthReceipt(object):
|
||||
self.logs = []
|
||||
self.root = None
|
||||
self.status = None
|
||||
self.effective_gas_price = None
|
||||
self.l1_fee = None
|
||||
self.l1_gas_used = None
|
||||
self.l1_gas_price = None
|
||||
self.l1_fee_scalar = None
|
||||
self.blob_gas_price = None
|
||||
self.blob_gas_used = None
|
||||
|
||||
@@ -41,3 +41,4 @@ class EthTrace(object):
|
||||
self.error = None
|
||||
self.status = None
|
||||
self.trace_id = None
|
||||
self.trace_index = None
|
||||
|
||||
@@ -34,3 +34,8 @@ class EthTransaction(object):
|
||||
self.gas = None
|
||||
self.gas_price = None
|
||||
self.input = None
|
||||
self.max_fee_per_gas = None
|
||||
self.max_priority_fee_per_gas = None
|
||||
self.transaction_type = None
|
||||
self.max_fee_per_blob_gas = None
|
||||
self.blob_versioned_hashes = []
|
||||
|
||||
@@ -284,3 +284,64 @@ ERC20_ABI = json.loads('''
|
||||
}
|
||||
]
|
||||
''')
|
||||
|
||||
ERC20_ABI_ALTERNATIVE_1 = json.loads('''
|
||||
[
|
||||
{
|
||||
"constant": true,
|
||||
"inputs": [],
|
||||
"name": "symbol",
|
||||
"outputs": [
|
||||
{
|
||||
"name": "",
|
||||
"type": "bytes32"
|
||||
}
|
||||
],
|
||||
"payable": false,
|
||||
"stateMutability": "view",
|
||||
"type": "function"
|
||||
},
|
||||
{
|
||||
"constant": true,
|
||||
"inputs": [],
|
||||
"name": "SYMBOL",
|
||||
"outputs": [
|
||||
{
|
||||
"name": "",
|
||||
"type": "bytes32"
|
||||
}
|
||||
],
|
||||
"payable": false,
|
||||
"stateMutability": "view",
|
||||
"type": "function"
|
||||
},
|
||||
{
|
||||
"constant": true,
|
||||
"inputs": [],
|
||||
"name": "name",
|
||||
"outputs": [
|
||||
{
|
||||
"name": "",
|
||||
"type": "bytes32"
|
||||
}
|
||||
],
|
||||
"payable": false,
|
||||
"stateMutability": "view",
|
||||
"type": "function"
|
||||
},
|
||||
{
|
||||
"constant": true,
|
||||
"inputs": [],
|
||||
"name": "NAME",
|
||||
"outputs": [
|
||||
{
|
||||
"name": "",
|
||||
"type": "bytes32"
|
||||
}
|
||||
],
|
||||
"payable": false,
|
||||
"stateMutability": "view",
|
||||
"type": "function"
|
||||
}
|
||||
]
|
||||
''')
|
||||
@@ -24,7 +24,7 @@ import logging
|
||||
import time
|
||||
|
||||
from requests.exceptions import Timeout as RequestsTimeout, HTTPError, TooManyRedirects
|
||||
from web3.utils.threads import Timeout as Web3Timeout
|
||||
from web3._utils.threads import Timeout as Web3Timeout
|
||||
|
||||
from ethereumetl.executors.bounded_executor import BoundedExecutor
|
||||
from ethereumetl.executors.fail_safe_executor import FailSafeExecutor
|
||||
|
||||
@@ -44,7 +44,7 @@ class BaseItemExporter(object):
|
||||
self._configure(kwargs)
|
||||
|
||||
def _configure(self, options, dont_fail=False):
|
||||
"""Configure the exporter by poping options from the ``options`` dict.
|
||||
"""Configure the exporter by popping options from the ``options`` dict.
|
||||
If dont_fail is set, it won't raise an exception on unexpected options
|
||||
(useful for using with keyword arguments in subclasses constructors)
|
||||
"""
|
||||
|
||||
@@ -6,7 +6,7 @@ from ethereumetl.ipfs.client import IpfsClient
|
||||
|
||||
logger = logging.getLogger('origin')
|
||||
|
||||
IPFS_PRIMARY_GATEWAY_URL = 'https://ipfs-prod.ogn.app/ipfs'
|
||||
IPFS_PRIMARY_GATEWAY_URL = 'https://cf-ipfs.com/ipfs'
|
||||
IPFS_SECONDARY_GATEWAY_URL = 'https://gateway.ipfs.io/ipfs'
|
||||
|
||||
# Returns an IPFS client that can be used to fetch Origin Protocol's data.
|
||||
@@ -15,7 +15,7 @@ def get_origin_ipfs_client():
|
||||
|
||||
|
||||
# Parses the shop's HTML index page to extract the name of the IPFS directory under
|
||||
# which all the shops data is located.
|
||||
# which all the shop data is located.
|
||||
def _get_shop_data_dir(shop_index_page):
|
||||
match = re.search('<link rel="data-dir" href="(.+?)"', shop_index_page)
|
||||
return match.group(1) if match else None
|
||||
|
||||
@@ -41,7 +41,7 @@ from ethereumetl.jobs.exporters.token_transfers_item_exporter import token_trans
|
||||
from ethereumetl.jobs.exporters.tokens_item_exporter import tokens_item_exporter
|
||||
from ethereumetl.providers.auto import get_provider_from_uri
|
||||
from ethereumetl.thread_local_proxy import ThreadLocalProxy
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
logger = logging.getLogger('export_all')
|
||||
|
||||
@@ -146,7 +146,7 @@ def export_all_common(partitions, output_dir, provider_uri, max_workers, batch_s
|
||||
start_block=batch_start_block,
|
||||
end_block=batch_end_block,
|
||||
batch_size=batch_size,
|
||||
web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(provider_uri))),
|
||||
web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))),
|
||||
item_exporter=token_transfers_item_exporter(token_transfers_file),
|
||||
max_workers=max_workers)
|
||||
job.run()
|
||||
@@ -272,7 +272,7 @@ def export_all_common(partitions, output_dir, provider_uri, max_workers, batch_s
|
||||
with smart_open(token_addresses_file, 'r') as token_addresses:
|
||||
job = ExportTokensJob(
|
||||
token_addresses_iterable=(token_address.strip() for token_address in token_addresses),
|
||||
web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(provider_uri))),
|
||||
web3=ThreadLocalProxy(lambda: build_web3(get_provider_from_uri(provider_uri))),
|
||||
item_exporter=tokens_item_exporter(tokens_file),
|
||||
max_workers=max_workers)
|
||||
job.run()
|
||||
|
||||
@@ -43,6 +43,7 @@ class ExportOriginJob(BaseJob):
|
||||
self.receipt_log_mapper = EthReceiptLogMapper()
|
||||
self.marketplace_listing_mapper = OriginMarketplaceListingMapper()
|
||||
self.shop_listing_mapper = OriginShopProductMapper()
|
||||
self._supports_eth_newFilter = True
|
||||
|
||||
|
||||
def _start(self):
|
||||
@@ -94,14 +95,24 @@ class ExportOriginJob(BaseJob):
|
||||
})
|
||||
|
||||
for batch in batches:
|
||||
# https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
|
||||
# https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_getfilterlogs
|
||||
filter_params = {
|
||||
'address': batch['contract_address'],
|
||||
'fromBlock': batch['from_block'],
|
||||
'toBlock': batch['to_block']
|
||||
}
|
||||
event_filter = self.web3.eth.filter(filter_params)
|
||||
events = event_filter.get_all_entries()
|
||||
if self._supports_eth_newFilter:
|
||||
try:
|
||||
event_filter = self.web3.eth.filter(filter_params)
|
||||
events = event_filter.get_all_entries()
|
||||
except ValueError as e:
|
||||
if str(e) == "{'code': -32000, 'message': 'the method is currently not implemented: eth_newFilter'}":
|
||||
self._supports_eth_newFilter = False
|
||||
events = self.web3.eth.getLogs(filter_params)
|
||||
else:
|
||||
raise(e)
|
||||
else:
|
||||
events = self.web3.eth.getLogs(filter_params)
|
||||
for event in events:
|
||||
log = self.receipt_log_mapper.web3_dict_to_receipt_log(event)
|
||||
listing, shop_products = self.event_extractor.extract_event_from_log(log, batch['contract_version'])
|
||||
@@ -112,9 +123,10 @@ class ExportOriginJob(BaseJob):
|
||||
item = self.shop_listing_mapper.product_to_dict(product)
|
||||
self.shop_product_exporter.export_item(item)
|
||||
|
||||
self.web3.eth.uninstallFilter(event_filter.filter_id)
|
||||
if self._supports_eth_newFilter:
|
||||
self.web3.eth.uninstallFilter(event_filter.filter_id)
|
||||
|
||||
def _end(self):
|
||||
self.batch_work_executor.shutdown()
|
||||
self.marketplace_listing_exporter.close()
|
||||
self.shop_product_exporter.close()
|
||||
self.shop_product_exporter.close()
|
||||
|
||||
@@ -51,6 +51,7 @@ class ExportTokenTransfersJob(BaseJob):
|
||||
self.receipt_log_mapper = EthReceiptLogMapper()
|
||||
self.token_transfer_mapper = EthTokenTransferMapper()
|
||||
self.token_transfer_extractor = EthTokenTransferExtractor()
|
||||
self._supports_eth_newFilter = True
|
||||
|
||||
def _start(self):
|
||||
self.item_exporter.open()
|
||||
@@ -64,7 +65,7 @@ class ExportTokenTransfersJob(BaseJob):
|
||||
|
||||
def _export_batch(self, block_number_batch):
|
||||
assert len(block_number_batch) > 0
|
||||
# https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
|
||||
# https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_getfilterlogs
|
||||
filter_params = {
|
||||
'fromBlock': block_number_batch[0],
|
||||
'toBlock': block_number_batch[-1],
|
||||
@@ -74,15 +75,23 @@ class ExportTokenTransfersJob(BaseJob):
|
||||
if self.tokens is not None and len(self.tokens) > 0:
|
||||
filter_params['address'] = self.tokens
|
||||
|
||||
event_filter = self.web3.eth.filter(filter_params)
|
||||
events = event_filter.get_all_entries()
|
||||
try:
|
||||
event_filter = self.web3.eth.filter(filter_params)
|
||||
events = event_filter.get_all_entries()
|
||||
except ValueError as e:
|
||||
if str(e) == "{'code': -32000, 'message': 'the method is currently not implemented: eth_newFilter'}":
|
||||
self._supports_eth_newFilter = False
|
||||
events = self.web3.eth.getLogs(filter_params)
|
||||
else:
|
||||
raise(e)
|
||||
for event in events:
|
||||
log = self.receipt_log_mapper.web3_dict_to_receipt_log(event)
|
||||
token_transfer = self.token_transfer_extractor.extract_transfer_from_log(log)
|
||||
if token_transfer is not None:
|
||||
self.item_exporter.export_item(self.token_transfer_mapper.token_transfer_to_dict(token_transfer))
|
||||
|
||||
self.web3.eth.uninstallFilter(event_filter.filter_id)
|
||||
if self._supports_eth_newFilter:
|
||||
self.web3.eth.uninstallFilter(event_filter.filter_id)
|
||||
|
||||
def _end(self):
|
||||
self.batch_work_executor.shutdown()
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
import logging
|
||||
|
||||
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
|
||||
from blockchainetl.jobs.base_job import BaseJob
|
||||
@@ -89,13 +90,14 @@ class ExportTracesJob(BaseJob):
|
||||
json_traces = self.web3.parity.traceBlock(block_number)
|
||||
|
||||
if json_traces is None:
|
||||
raise ValueError('Response from the node is None. Is the node fully synced?')
|
||||
raise ValueError('Response from the node is None. Is the node fully synced? Is the node started with tracing enabled? Is trace_block API enabled?')
|
||||
|
||||
traces = [self.trace_mapper.json_dict_to_trace(json_trace) for json_trace in json_traces]
|
||||
all_traces.extend(traces)
|
||||
|
||||
calculate_trace_statuses(all_traces)
|
||||
calculate_trace_ids(all_traces)
|
||||
calculate_trace_indexes(all_traces)
|
||||
|
||||
for trace in all_traces:
|
||||
self.item_exporter.export_item(self.trace_mapper.trace_to_dict(trace))
|
||||
@@ -103,3 +105,9 @@ class ExportTracesJob(BaseJob):
|
||||
def _end(self):
|
||||
self.batch_work_executor.shutdown()
|
||||
self.item_exporter.close()
|
||||
|
||||
|
||||
def calculate_trace_indexes(traces):
|
||||
# Only works if traces were originally ordered correctly which is the case for Parity traces
|
||||
for ind, trace in enumerate(traces):
|
||||
trace.trace_index = ind
|
||||
|
||||
@@ -41,7 +41,12 @@ BLOCK_FIELDS_TO_EXPORT = [
|
||||
'gas_limit',
|
||||
'gas_used',
|
||||
'timestamp',
|
||||
'transaction_count'
|
||||
'transaction_count',
|
||||
'base_fee_per_gas',
|
||||
'withdrawals_root',
|
||||
'withdrawals',
|
||||
'blob_gas_used',
|
||||
'excess_blob_gas'
|
||||
]
|
||||
|
||||
TRANSACTION_FIELDS_TO_EXPORT = [
|
||||
@@ -56,7 +61,12 @@ TRANSACTION_FIELDS_TO_EXPORT = [
|
||||
'gas',
|
||||
'gas_price',
|
||||
'input',
|
||||
'block_timestamp'
|
||||
'block_timestamp',
|
||||
'max_fee_per_gas',
|
||||
'max_priority_fee_per_gas',
|
||||
'transaction_type',
|
||||
'max_fee_per_blob_gas',
|
||||
'blob_versioned_hashes'
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -32,7 +32,14 @@ RECEIPT_FIELDS_TO_EXPORT = [
|
||||
'gas_used',
|
||||
'contract_address',
|
||||
'root',
|
||||
'status'
|
||||
'status',
|
||||
'effective_gas_price',
|
||||
'l1_fee',
|
||||
'l1_gas_used',
|
||||
'l1_gas_price',
|
||||
'l1_fee_scalar',
|
||||
'blob_gas_price',
|
||||
'blob_gas_used'
|
||||
]
|
||||
|
||||
LOG_FIELDS_TO_EXPORT = [
|
||||
|
||||
@@ -34,12 +34,13 @@ FIELDS_TO_EXPORT = [
|
||||
]
|
||||
|
||||
|
||||
def token_transfers_item_exporter(token_transfer_output):
|
||||
def token_transfers_item_exporter(token_transfer_output, converters=()):
|
||||
return CompositeItemExporter(
|
||||
filename_mapping={
|
||||
'token_transfer': token_transfer_output
|
||||
},
|
||||
field_mapping={
|
||||
'token_transfer': FIELDS_TO_EXPORT
|
||||
}
|
||||
},
|
||||
converters=converters
|
||||
)
|
||||
|
||||
@@ -33,12 +33,13 @@ FIELDS_TO_EXPORT = [
|
||||
]
|
||||
|
||||
|
||||
def tokens_item_exporter(tokens_output):
|
||||
def tokens_item_exporter(tokens_output, converters=()):
|
||||
return CompositeItemExporter(
|
||||
filename_mapping={
|
||||
'token': tokens_output
|
||||
},
|
||||
field_mapping={
|
||||
'token': FIELDS_TO_EXPORT
|
||||
}
|
||||
},
|
||||
converters=converters
|
||||
)
|
||||
|
||||
@@ -52,6 +52,10 @@ class EthBlockMapper(object):
|
||||
block.gas_limit = hex_to_dec(json_dict.get('gasLimit'))
|
||||
block.gas_used = hex_to_dec(json_dict.get('gasUsed'))
|
||||
block.timestamp = hex_to_dec(json_dict.get('timestamp'))
|
||||
block.base_fee_per_gas = hex_to_dec(json_dict.get('baseFeePerGas'))
|
||||
block.withdrawals_root = json_dict.get('withdrawalsRoot')
|
||||
block.blob_gas_used = hex_to_dec(json_dict.get('blobGasUsed'))
|
||||
block.excess_blob_gas = hex_to_dec(json_dict.get('excessBlobGas'))
|
||||
|
||||
if 'transactions' in json_dict:
|
||||
block.transactions = [
|
||||
@@ -62,8 +66,22 @@ class EthBlockMapper(object):
|
||||
|
||||
block.transaction_count = len(json_dict['transactions'])
|
||||
|
||||
if 'withdrawals' in json_dict:
|
||||
block.withdrawals = self.parse_withdrawals(json_dict['withdrawals'])
|
||||
|
||||
return block
|
||||
|
||||
def parse_withdrawals(self, withdrawals):
|
||||
return [
|
||||
{
|
||||
"index": hex_to_dec(withdrawal["index"]),
|
||||
"validator_index": hex_to_dec(withdrawal["validatorIndex"]),
|
||||
"address": withdrawal["address"],
|
||||
"amount": hex_to_dec(withdrawal["amount"]),
|
||||
}
|
||||
for withdrawal in withdrawals
|
||||
]
|
||||
|
||||
def block_to_dict(self, block):
|
||||
return {
|
||||
'type': 'block',
|
||||
@@ -85,4 +103,9 @@ class EthBlockMapper(object):
|
||||
'gas_used': block.gas_used,
|
||||
'timestamp': block.timestamp,
|
||||
'transaction_count': block.transaction_count,
|
||||
'base_fee_per_gas': block.base_fee_per_gas,
|
||||
'withdrawals_root': block.withdrawals_root,
|
||||
'withdrawals': block.withdrawals,
|
||||
'blob_gas_used': block.blob_gas_used,
|
||||
'excess_blob_gas': block.excess_blob_gas,
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
|
||||
from ethereumetl.domain.receipt import EthReceipt
|
||||
from ethereumetl.mappers.receipt_log_mapper import EthReceiptLogMapper
|
||||
from ethereumetl.utils import hex_to_dec, to_normalized_address
|
||||
from ethereumetl.utils import hex_to_dec, to_normalized_address, to_float_or_none
|
||||
|
||||
|
||||
class EthReceiptMapper(object):
|
||||
@@ -48,6 +48,15 @@ class EthReceiptMapper(object):
|
||||
receipt.root = json_dict.get('root')
|
||||
receipt.status = hex_to_dec(json_dict.get('status'))
|
||||
|
||||
receipt.effective_gas_price = hex_to_dec(json_dict.get('effectiveGasPrice'))
|
||||
|
||||
receipt.l1_fee = hex_to_dec(json_dict.get('l1Fee'))
|
||||
receipt.l1_gas_used = hex_to_dec(json_dict.get('l1GasUsed'))
|
||||
receipt.l1_gas_price = hex_to_dec(json_dict.get('l1GasPrice'))
|
||||
receipt.l1_fee_scalar = to_float_or_none(json_dict.get('l1FeeScalar'))
|
||||
receipt.blob_gas_price = hex_to_dec(json_dict.get('blobGasPrice'))
|
||||
receipt.blob_gas_used = hex_to_dec(json_dict.get('blobGasUsed'))
|
||||
|
||||
if 'logs' in json_dict:
|
||||
receipt.logs = [
|
||||
self.receipt_log_mapper.json_dict_to_receipt_log(log) for log in json_dict['logs']
|
||||
@@ -66,5 +75,12 @@ class EthReceiptMapper(object):
|
||||
'gas_used': receipt.gas_used,
|
||||
'contract_address': receipt.contract_address,
|
||||
'root': receipt.root,
|
||||
'status': receipt.status
|
||||
'status': receipt.status,
|
||||
'effective_gas_price': receipt.effective_gas_price,
|
||||
'l1_fee': receipt.l1_fee,
|
||||
'l1_gas_used': receipt.l1_gas_used,
|
||||
'l1_gas_price': receipt.l1_gas_price,
|
||||
'l1_fee_scalar': receipt.l1_fee_scalar,
|
||||
'blob_gas_price': receipt.blob_gas_price,
|
||||
'blob_gas_used': receipt.blob_gas_used
|
||||
}
|
||||
|
||||
@@ -190,4 +190,5 @@ class EthTraceMapper(object):
|
||||
'error': trace.error,
|
||||
'status': trace.status,
|
||||
'trace_id': trace.trace_id,
|
||||
'trace_index': trace.trace_index,
|
||||
}
|
||||
|
||||
@@ -40,6 +40,14 @@ class EthTransactionMapper(object):
|
||||
transaction.gas = hex_to_dec(json_dict.get('gas'))
|
||||
transaction.gas_price = hex_to_dec(json_dict.get('gasPrice'))
|
||||
transaction.input = json_dict.get('input')
|
||||
transaction.max_fee_per_gas = hex_to_dec(json_dict.get('maxFeePerGas'))
|
||||
transaction.max_priority_fee_per_gas = hex_to_dec(json_dict.get('maxPriorityFeePerGas'))
|
||||
transaction.transaction_type = hex_to_dec(json_dict.get('type'))
|
||||
transaction.max_fee_per_blob_gas = hex_to_dec(json_dict.get('maxFeePerBlobGas'))
|
||||
|
||||
if 'blobVersionedHashes' in json_dict and isinstance(json_dict['blobVersionedHashes'], list):
|
||||
transaction.blob_versioned_hashes = json_dict['blobVersionedHashes']
|
||||
|
||||
return transaction
|
||||
|
||||
def transaction_to_dict(self, transaction):
|
||||
@@ -57,4 +65,9 @@ class EthTransactionMapper(object):
|
||||
'gas': transaction.gas,
|
||||
'gas_price': transaction.gas_price,
|
||||
'input': transaction.input,
|
||||
'max_fee_per_gas': transaction.max_fee_per_gas,
|
||||
'max_priority_fee_per_gas': transaction.max_priority_fee_per_gas,
|
||||
'transaction_type': transaction.transaction_type,
|
||||
"max_fee_per_blob_gas": transaction.max_fee_per_blob_gas,
|
||||
"blob_versioned_hashes": transaction.blob_versioned_hashes
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ import json
|
||||
import socket
|
||||
|
||||
from web3.providers.ipc import IPCProvider
|
||||
from web3.utils.threads import (
|
||||
from web3._utils.threads import (
|
||||
Timeout,
|
||||
)
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
|
||||
|
||||
from web3 import HTTPProvider
|
||||
from web3.utils.request import make_post_request
|
||||
from web3._utils.request import make_post_request
|
||||
|
||||
|
||||
# Mostly copied from web3.py/providers/rpc.py. Supports batch requests.
|
||||
|
||||
@@ -54,7 +54,7 @@ class EthContractService:
|
||||
c.implements('allowance(address,address)')
|
||||
|
||||
# https://github.com/ethereum/EIPs/blob/master/EIPS/eip-721.md
|
||||
# https://github.com/OpenZeppelin/openzeppelin-solidity/blob/master/contracts/token/ERC721/ERC721Basic.sol
|
||||
# https://github.com/OpenZeppelin/openzeppelin-contracts/blob/master/contracts/token/ERC721/ERC721.sol
|
||||
# Doesn't check the below ERC721 methods to match CryptoKitties contract
|
||||
# getApproved(uint256)
|
||||
# setApprovalForAll(address,bool)
|
||||
|
||||
@@ -40,7 +40,7 @@ class EthService(object):
|
||||
start_timestamp = int(start_timestamp)
|
||||
end_timestamp = int(end_timestamp)
|
||||
if start_timestamp > end_timestamp:
|
||||
raise ValueError('start_timestamp must be greater or equal to end_timestamp')
|
||||
raise ValueError('start_timestamp must be lesser than end_timestamp')
|
||||
|
||||
try:
|
||||
start_block_bounds = self._graph_operations.get_bounds_for_y_coordinate(start_timestamp)
|
||||
|
||||
@@ -21,10 +21,10 @@
|
||||
# SOFTWARE.
|
||||
import logging
|
||||
|
||||
from web3.exceptions import BadFunctionCallOutput
|
||||
from web3.exceptions import BadFunctionCallOutput, ContractLogicError
|
||||
|
||||
from ethereumetl.domain.token import EthToken
|
||||
from ethereumetl.erc20_abi import ERC20_ABI
|
||||
from ethereumetl.erc20_abi import ERC20_ABI, ERC20_ABI_ALTERNATIVE_1
|
||||
|
||||
logger = logging.getLogger('eth_token_service')
|
||||
|
||||
@@ -37,9 +37,26 @@ class EthTokenService(object):
|
||||
def get_token(self, token_address):
|
||||
checksum_address = self._web3.toChecksumAddress(token_address)
|
||||
contract = self._web3.eth.contract(address=checksum_address, abi=ERC20_ABI)
|
||||
contract_alternative_1 = self._web3.eth.contract(address=checksum_address, abi=ERC20_ABI_ALTERNATIVE_1)
|
||||
|
||||
symbol = self._get_first_result(
|
||||
contract.functions.symbol(),
|
||||
contract.functions.SYMBOL(),
|
||||
contract_alternative_1.functions.symbol(),
|
||||
contract_alternative_1.functions.SYMBOL(),
|
||||
)
|
||||
if isinstance(symbol, bytes):
|
||||
symbol = self._bytes_to_string(symbol)
|
||||
|
||||
name = self._get_first_result(
|
||||
contract.functions.name(),
|
||||
contract.functions.NAME(),
|
||||
contract_alternative_1.functions.name(),
|
||||
contract_alternative_1.functions.NAME(),
|
||||
)
|
||||
if isinstance(name, bytes):
|
||||
name = self._bytes_to_string(name)
|
||||
|
||||
symbol = self._get_first_result(contract.functions.symbol(), contract.functions.SYMBOL())
|
||||
name = self._get_first_result(contract.functions.name(), contract.functions.NAME())
|
||||
decimals = self._get_first_result(contract.functions.decimals(), contract.functions.DECIMALS())
|
||||
total_supply = self._get_first_result(contract.functions.totalSupply())
|
||||
|
||||
@@ -65,7 +82,7 @@ class EthTokenService(object):
|
||||
# OverflowError exception happens if the return type of the function doesn't match the expected type
|
||||
result = call_contract_function(
|
||||
func=func,
|
||||
ignore_errors=(BadFunctionCallOutput, OverflowError, ValueError),
|
||||
ignore_errors=(BadFunctionCallOutput, ContractLogicError, OverflowError, ValueError),
|
||||
default_value=None)
|
||||
|
||||
if self._function_call_result_transformer is not None:
|
||||
@@ -73,6 +90,23 @@ class EthTokenService(object):
|
||||
else:
|
||||
return result
|
||||
|
||||
def _bytes_to_string(self, b, ignore_errors=True):
|
||||
if b is None:
|
||||
return b
|
||||
|
||||
try:
|
||||
b = b.decode('utf-8')
|
||||
except UnicodeDecodeError as e:
|
||||
if ignore_errors:
|
||||
logger.debug('A UnicodeDecodeError exception occurred while trying to decode bytes to string', exc_info=True)
|
||||
b = None
|
||||
else:
|
||||
raise e
|
||||
|
||||
if self._function_call_result_transformer is not None:
|
||||
b = self._function_call_result_transformer(b)
|
||||
return b
|
||||
|
||||
|
||||
def call_contract_function(func, ignore_errors, default_value=None):
|
||||
try:
|
||||
@@ -80,8 +114,8 @@ def call_contract_function(func, ignore_errors, default_value=None):
|
||||
return result
|
||||
except Exception as ex:
|
||||
if type(ex) in ignore_errors:
|
||||
logger.exception('An exception occurred in function {} of contract {}. '.format(func.fn_name, func.address)
|
||||
+ 'This exception can be safely ignored.')
|
||||
logger.debug('An exception occurred in function {} of contract {}. '.format(func.fn_name, func.address)
|
||||
+ 'This exception can be safely ignored.', exc_info=True)
|
||||
return default_value
|
||||
else:
|
||||
raise ex
|
||||
|
||||
@@ -37,11 +37,10 @@ class EthTokenTransferExtractor(object):
|
||||
|
||||
topics = receipt_log.topics
|
||||
if topics is None or len(topics) < 1:
|
||||
logger.warning("Topics are empty in log {} of transaction {}".format(receipt_log.log_index,
|
||||
receipt_log.transaction_hash))
|
||||
# This is normal, topics can be empty for anonymous events
|
||||
return None
|
||||
|
||||
if topics[0] == TRANSFER_EVENT_TOPIC:
|
||||
if (topics[0]).casefold() == TRANSFER_EVENT_TOPIC:
|
||||
# Handle unindexed event fields
|
||||
topics_with_data = topics + split_to_words(receipt_log.data)
|
||||
# if the number of topics and fields in data part != 4, then it's a weird event
|
||||
|
||||
@@ -73,14 +73,26 @@ def enrich_transactions(transactions, receipts):
|
||||
'input',
|
||||
'block_timestamp',
|
||||
'block_number',
|
||||
'block_hash'
|
||||
'block_hash',
|
||||
'max_fee_per_gas',
|
||||
'max_priority_fee_per_gas',
|
||||
'transaction_type',
|
||||
'max_fee_per_blob_gas',
|
||||
'blob_versioned_hashes'
|
||||
],
|
||||
right_fields=[
|
||||
('cumulative_gas_used', 'receipt_cumulative_gas_used'),
|
||||
('gas_used', 'receipt_gas_used'),
|
||||
('contract_address', 'receipt_contract_address'),
|
||||
('root', 'receipt_root'),
|
||||
('status', 'receipt_status')
|
||||
('status', 'receipt_status'),
|
||||
('effective_gas_price', 'receipt_effective_gas_price'),
|
||||
('l1_fee', 'receipt_l1_fee'),
|
||||
('l1_gas_used', 'receipt_l1_gas_used'),
|
||||
('l1_gas_price', 'receipt_l1_gas_price'),
|
||||
('l1_fee_scalar', 'receipt_l1_fee_scalar'),
|
||||
('blob_gas_price', 'receipt_blob_gas_price'),
|
||||
('blob_gas_used', 'receipt_blob_gas_used')
|
||||
]))
|
||||
|
||||
if len(result) != len(transactions):
|
||||
@@ -159,7 +171,8 @@ def enrich_traces(blocks, traces):
|
||||
'status',
|
||||
'transaction_hash',
|
||||
'block_number',
|
||||
'trace_id'
|
||||
'trace_id',
|
||||
'trace_index'
|
||||
],
|
||||
[
|
||||
('timestamp', 'block_timestamp'),
|
||||
|
||||
@@ -14,7 +14,7 @@ from ethereumetl.streaming.enrich import enrich_transactions, enrich_logs, enric
|
||||
from ethereumetl.streaming.eth_item_id_calculator import EthItemIdCalculator
|
||||
from ethereumetl.streaming.eth_item_timestamp_calculator import EthItemTimestampCalculator
|
||||
from ethereumetl.thread_local_proxy import ThreadLocalProxy
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
|
||||
class EthStreamerAdapter:
|
||||
@@ -37,7 +37,8 @@ class EthStreamerAdapter:
|
||||
self.item_exporter.open()
|
||||
|
||||
def get_current_block_number(self):
|
||||
return int(Web3(self.batch_web3_provider).eth.getBlock("latest").number)
|
||||
w3 = build_web3(self.batch_web3_provider)
|
||||
return int(w3.eth.getBlock("latest").number)
|
||||
|
||||
def export_all(self, start_block, end_block):
|
||||
# Export blocks and transactions
|
||||
@@ -87,13 +88,14 @@ class EthStreamerAdapter:
|
||||
|
||||
logging.info('Exporting with ' + type(self.item_exporter).__name__)
|
||||
|
||||
all_items = enriched_blocks + \
|
||||
enriched_transactions + \
|
||||
enriched_logs + \
|
||||
enriched_token_transfers + \
|
||||
enriched_traces + \
|
||||
enriched_contracts + \
|
||||
enriched_tokens
|
||||
all_items = \
|
||||
sort_by(enriched_blocks, 'number') + \
|
||||
sort_by(enriched_transactions, ('block_number', 'transaction_index')) + \
|
||||
sort_by(enriched_logs, ('block_number', 'log_index')) + \
|
||||
sort_by(enriched_token_transfers, ('block_number', 'log_index')) + \
|
||||
sort_by(enriched_traces, ('block_number', 'trace_index')) + \
|
||||
sort_by(enriched_contracts, ('block_number',)) + \
|
||||
sort_by(enriched_tokens, ('block_number',))
|
||||
|
||||
self.calculate_item_ids(all_items)
|
||||
self.calculate_item_timestamps(all_items)
|
||||
@@ -150,7 +152,7 @@ class EthStreamerAdapter:
|
||||
start_block=start_block,
|
||||
end_block=end_block,
|
||||
batch_size=self.batch_size,
|
||||
web3=ThreadLocalProxy(lambda: Web3(self.batch_web3_provider)),
|
||||
web3=ThreadLocalProxy(lambda: build_web3(self.batch_web3_provider)),
|
||||
max_workers=self.max_workers,
|
||||
item_exporter=exporter
|
||||
)
|
||||
@@ -174,7 +176,7 @@ class EthStreamerAdapter:
|
||||
exporter = InMemoryItemExporter(item_types=['token'])
|
||||
job = ExtractTokensJob(
|
||||
contracts_iterable=contracts,
|
||||
web3=ThreadLocalProxy(lambda: Web3(self.batch_web3_provider)),
|
||||
web3=ThreadLocalProxy(lambda: build_web3(self.batch_web3_provider)),
|
||||
max_workers=self.max_workers,
|
||||
item_exporter=exporter
|
||||
)
|
||||
@@ -219,3 +221,9 @@ class EthStreamerAdapter:
|
||||
|
||||
def close(self):
|
||||
self.item_exporter.close()
|
||||
|
||||
|
||||
def sort_by(arr, fields):
|
||||
if isinstance(fields, tuple):
|
||||
fields = tuple(fields)
|
||||
return sorted(arr, key=lambda item: tuple(item.get(f) for f in fields))
|
||||
|
||||
@@ -21,28 +21,52 @@
|
||||
# SOFTWARE.
|
||||
|
||||
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
|
||||
from blockchainetl.jobs.exporters.multi_item_exporter import MultiItemExporter
|
||||
|
||||
|
||||
def create_item_exporters(outputs):
|
||||
split_outputs = [output.strip() for output in outputs.split(',')] if outputs else ['console']
|
||||
|
||||
item_exporters = [create_item_exporter(output) for output in split_outputs]
|
||||
return MultiItemExporter(item_exporters)
|
||||
|
||||
|
||||
def create_item_exporter(output):
|
||||
item_exporter_type = determine_item_exporter_type(output)
|
||||
if item_exporter_type == ItemExporterType.PUBSUB:
|
||||
from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
|
||||
item_exporter = GooglePubSubItemExporter(item_type_to_topic_mapping={
|
||||
'block': output + '.blocks',
|
||||
'transaction': output + '.transactions',
|
||||
'log': output + '.logs',
|
||||
'token_transfer': output + '.token_transfers',
|
||||
'trace': output + '.traces',
|
||||
'contract': output + '.contracts',
|
||||
'token': output + '.tokens',
|
||||
})
|
||||
enable_message_ordering = 'sorted' in output or 'ordered' in output
|
||||
item_exporter = GooglePubSubItemExporter(
|
||||
item_type_to_topic_mapping={
|
||||
'block': output + '.blocks',
|
||||
'transaction': output + '.transactions',
|
||||
'log': output + '.logs',
|
||||
'token_transfer': output + '.token_transfers',
|
||||
'trace': output + '.traces',
|
||||
'contract': output + '.contracts',
|
||||
'token': output + '.tokens',
|
||||
},
|
||||
message_attributes=('item_id', 'item_timestamp'),
|
||||
batch_max_bytes=1024 * 1024 * 5,
|
||||
batch_max_latency=2,
|
||||
batch_max_messages=1000,
|
||||
enable_message_ordering=enable_message_ordering)
|
||||
elif item_exporter_type == ItemExporterType.KINESIS:
|
||||
from blockchainetl.jobs.exporters.kinesis_item_exporter import KinesisItemExporter
|
||||
item_exporter = KinesisItemExporter(
|
||||
stream_name=output[len('kinesis://'):],
|
||||
)
|
||||
elif item_exporter_type == ItemExporterType.POSTGRES:
|
||||
from blockchainetl.jobs.exporters.postgres_item_exporter import PostgresItemExporter
|
||||
from blockchainetl.streaming.postgres_utils import create_insert_statement_for_table
|
||||
from blockchainetl.jobs.exporters.converters.unix_timestamp_item_converter import UnixTimestampItemConverter
|
||||
from blockchainetl.jobs.exporters.converters.int_to_decimal_item_converter import IntToDecimalItemConverter
|
||||
from blockchainetl.jobs.exporters.converters.list_field_item_converter import ListFieldItemConverter
|
||||
from ethereumetl.streaming.postgres_tables import BLOCKS, TRANSACTIONS, LOGS, TOKEN_TRANSFERS, TRACES
|
||||
from blockchainetl.jobs.exporters.converters.simple_item_converter import SimpleItemConverter
|
||||
from ethereumetl.streaming.postgres_tables import BLOCKS, TRANSACTIONS, LOGS, TOKEN_TRANSFERS, TRACES, TOKENS, CONTRACTS
|
||||
|
||||
def array_to_str(val):
|
||||
return ','.join(val) if val is not None else None
|
||||
|
||||
item_exporter = PostgresItemExporter(
|
||||
output, item_type_to_insert_stmt_mapping={
|
||||
@@ -51,22 +75,61 @@ def create_item_exporter(output):
|
||||
'log': create_insert_statement_for_table(LOGS),
|
||||
'token_transfer': create_insert_statement_for_table(TOKEN_TRANSFERS),
|
||||
'trace': create_insert_statement_for_table(TRACES),
|
||||
'token': create_insert_statement_for_table(TOKENS),
|
||||
'contract': create_insert_statement_for_table(CONTRACTS),
|
||||
},
|
||||
converters=[UnixTimestampItemConverter(), IntToDecimalItemConverter(),
|
||||
ListFieldItemConverter('topics', 'topic', fill=4)])
|
||||
converters=[
|
||||
UnixTimestampItemConverter(),
|
||||
IntToDecimalItemConverter(),
|
||||
ListFieldItemConverter('topics', 'topic', fill=4),
|
||||
SimpleItemConverter(field_converters={'blob_versioned_hashes': array_to_str})
|
||||
])
|
||||
elif item_exporter_type == ItemExporterType.GCS:
|
||||
from blockchainetl.jobs.exporters.gcs_item_exporter import GcsItemExporter
|
||||
bucket, path = get_bucket_and_path_from_gcs_output(output)
|
||||
item_exporter = GcsItemExporter(bucket=bucket, path=path)
|
||||
elif item_exporter_type == ItemExporterType.CONSOLE:
|
||||
item_exporter = ConsoleItemExporter()
|
||||
elif item_exporter_type == ItemExporterType.KAFKA:
|
||||
from blockchainetl.jobs.exporters.kafka_exporter import KafkaItemExporter
|
||||
item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={
|
||||
'block': 'blocks',
|
||||
'transaction': 'transactions',
|
||||
'log': 'logs',
|
||||
'token_transfer': 'token_transfers',
|
||||
'trace': 'traces',
|
||||
'contract': 'contracts',
|
||||
'token': 'tokens',
|
||||
})
|
||||
|
||||
else:
|
||||
raise ValueError('Unable to determine item exporter type for output ' + output)
|
||||
|
||||
return item_exporter
|
||||
|
||||
|
||||
def get_bucket_and_path_from_gcs_output(output):
|
||||
output = output.replace('gs://', '')
|
||||
bucket_and_path = output.split('/', 1)
|
||||
bucket = bucket_and_path[0]
|
||||
if len(bucket_and_path) > 1:
|
||||
path = bucket_and_path[1]
|
||||
else:
|
||||
path = ''
|
||||
return bucket, path
|
||||
|
||||
|
||||
def determine_item_exporter_type(output):
|
||||
if output is not None and output.startswith('projects'):
|
||||
return ItemExporterType.PUBSUB
|
||||
if output is not None and output.startswith('kinesis://'):
|
||||
return ItemExporterType.KINESIS
|
||||
if output is not None and output.startswith('kafka'):
|
||||
return ItemExporterType.KAFKA
|
||||
elif output is not None and output.startswith('postgresql'):
|
||||
return ItemExporterType.POSTGRES
|
||||
elif output is not None and output.startswith('gs://'):
|
||||
return ItemExporterType.GCS
|
||||
elif output is None or output == 'console':
|
||||
return ItemExporterType.CONSOLE
|
||||
else:
|
||||
@@ -75,6 +138,9 @@ def determine_item_exporter_type(output):
|
||||
|
||||
class ItemExporterType:
|
||||
PUBSUB = 'pubsub'
|
||||
KINESIS = 'kinesis'
|
||||
POSTGRES = 'postgres'
|
||||
GCS = 'gcs'
|
||||
CONSOLE = 'console'
|
||||
KAFKA = 'kafka'
|
||||
UNKNOWN = 'unknown'
|
||||
|
||||
@@ -20,7 +20,9 @@
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
from sqlalchemy import Table, Column, Integer, BigInteger, String, Numeric, MetaData, TIMESTAMP
|
||||
from sqlalchemy import Table, Column, Integer, BigInteger, Boolean, String, Numeric, \
|
||||
MetaData, PrimaryKeyConstraint, VARCHAR, TIMESTAMP, Float
|
||||
from sqlalchemy.dialects.postgresql import ARRAY
|
||||
|
||||
metadata = MetaData()
|
||||
|
||||
@@ -46,6 +48,10 @@ BLOCKS = Table(
|
||||
Column('gas_limit', BigInteger),
|
||||
Column('gas_used', BigInteger),
|
||||
Column('transaction_count', BigInteger),
|
||||
Column('base_fee_per_gas', BigInteger),
|
||||
Column('withdrawals_root', String),
|
||||
Column('blob_gas_used', BigInteger),
|
||||
Column('excess_blob_gas', BigInteger),
|
||||
)
|
||||
|
||||
TRANSACTIONS = Table(
|
||||
@@ -67,6 +73,18 @@ TRANSACTIONS = Table(
|
||||
Column('block_timestamp', TIMESTAMP),
|
||||
Column('block_number', BigInteger),
|
||||
Column('block_hash', String),
|
||||
Column('max_fee_per_gas', BigInteger),
|
||||
Column('max_priority_fee_per_gas', BigInteger),
|
||||
Column('transaction_type', BigInteger),
|
||||
Column('receipt_effective_gas_price', BigInteger),
|
||||
Column('receipt_l1_fee', BigInteger),
|
||||
Column('receipt_l1_gas_used', BigInteger),
|
||||
Column('receipt_l1_gas_price', BigInteger),
|
||||
Column('receipt_l1_fee_scalar', Float),
|
||||
Column('max_fee_per_blob_gas', BigInteger),
|
||||
Column('blob_versioned_hashes', String),
|
||||
Column('receipt_blob_gas_price', BigInteger),
|
||||
Column('receipt_blob_gas_used', BigInteger),
|
||||
)
|
||||
|
||||
LOGS = Table(
|
||||
@@ -122,4 +140,25 @@ TRACES = Table(
|
||||
Column('trace_id', String, primary_key=True),
|
||||
)
|
||||
|
||||
TOKENS = Table(
|
||||
'tokens', metadata,
|
||||
Column('address', VARCHAR(42)),
|
||||
Column('name', String),
|
||||
Column('symbol', String),
|
||||
Column('decimals', Integer),
|
||||
Column('function_sighashes', ARRAY(String)),
|
||||
Column('total_supply', Numeric(78)),
|
||||
Column('block_number', BigInteger),
|
||||
PrimaryKeyConstraint('address', 'block_number', name='tokens_pk'),
|
||||
)
|
||||
|
||||
CONTRACTS = Table(
|
||||
'contracts', metadata,
|
||||
Column('address', VARCHAR(42)),
|
||||
Column('bytecode', String),
|
||||
Column('function_sighashes', ARRAY(String)),
|
||||
Column('is_erc20', Boolean),
|
||||
Column('is_erc721', Boolean),
|
||||
Column('block_number', BigInteger),
|
||||
PrimaryKeyConstraint('address', 'block_number', name='contracts_pk'),
|
||||
)
|
||||
|
||||
@@ -47,6 +47,16 @@ def to_int_or_none(val):
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def to_float_or_none(val):
|
||||
if isinstance(val, float):
|
||||
return val
|
||||
if val is None or val == "":
|
||||
return None
|
||||
try:
|
||||
return float(val)
|
||||
except ValueError:
|
||||
print("can't cast %s to float" % val)
|
||||
return val
|
||||
|
||||
def chunk_string(string, length):
|
||||
return (string[0 + i:length + i] for i in range(0, len(string), length))
|
||||
|
||||
30
ethereumetl/web3_utils.py
Normal file
30
ethereumetl/web3_utils.py
Normal file
@@ -0,0 +1,30 @@
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
from web3 import Web3
|
||||
from web3.middleware import geth_poa_middleware
|
||||
|
||||
|
||||
def build_web3(provider):
|
||||
w3 = Web3(provider)
|
||||
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
|
||||
return w3
|
||||
@@ -16,3 +16,4 @@ nav:
|
||||
- Citing: citing.md
|
||||
theme: readthedocs
|
||||
repo_url: https://github.com/blockchain-etl/ethereum-etl/
|
||||
edit_uri: edit/develop/docs
|
||||
@@ -16,19 +16,11 @@ CREATE EXTERNAL TABLE IF NOT EXISTS blocks (
|
||||
gas_limit BIGINT,
|
||||
gas_used BIGINT,
|
||||
timestamp BIGINT,
|
||||
transaction_count BIGINT
|
||||
transaction_count BIGINT,
|
||||
base_fee_per_gas BIGINT
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/blocks'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
PARTITIONED BY (block_date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
|
||||
LOCATION 's3://<your_bucket>/export/blocks/';
|
||||
|
||||
MSCK REPAIR TABLE blocks;
|
||||
MSCK REPAIR TABLE blocks;
|
||||
|
||||
@@ -5,7 +5,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS contracts (
|
||||
is_erc20 BOOLEAN,
|
||||
is_erc721 BOOLEAN
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
PARTITIONED BY (date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
|
||||
@@ -6,19 +6,10 @@ CREATE EXTERNAL TABLE IF NOT EXISTS logs (
|
||||
block_number BIGINT,
|
||||
address STRING,
|
||||
data STRING,
|
||||
topics STRING
|
||||
topics ARRAY<STRING>
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/logs'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
PARTITIONED BY (block_date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
|
||||
LOCATION 's3://<your_bucket>/export/logs/';
|
||||
|
||||
MSCK REPAIR TABLE logs;
|
||||
|
||||
@@ -7,19 +7,15 @@ CREATE EXTERNAL TABLE IF NOT EXISTS receipts (
|
||||
gas_used BIGINT,
|
||||
contract_address STRING,
|
||||
root STRING,
|
||||
status BIGINT
|
||||
status BIGINT,
|
||||
effective_gas_price BIGINT,
|
||||
l1_fee BIGINT,
|
||||
l1_gas_used BIGINT,
|
||||
l1_gas_price BIGINT,
|
||||
l1_fee_scalar DECIMAL
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/receipts'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
PARTITIONED BY (block_date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
|
||||
LOCATION 's3://<your_bucket>/export/receipts/';
|
||||
|
||||
MSCK REPAIR TABLE receipts;
|
||||
|
||||
@@ -2,22 +2,13 @@ CREATE EXTERNAL TABLE IF NOT EXISTS token_transfers (
|
||||
token_address STRING,
|
||||
from_address STRING,
|
||||
to_address STRING,
|
||||
value DECIMAL(38,0),
|
||||
value STRING,
|
||||
transaction_hash STRING,
|
||||
log_index BIGINT,
|
||||
block_number BIGINT
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/token_transfers'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
PARTITIONED BY (block_date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
|
||||
LOCATION 's3://<your_bucket>/export/token_transfers/';
|
||||
|
||||
MSCK REPAIR TABLE token_transfers;
|
||||
MSCK REPAIR TABLE token_transfers;
|
||||
|
||||
@@ -5,7 +5,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS tokens (
|
||||
decimals BIGINT,
|
||||
total_supply DECIMAL(38,0)
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
PARTITIONED BY (date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
|
||||
@@ -9,19 +9,13 @@ CREATE EXTERNAL TABLE IF NOT EXISTS transactions (
|
||||
value DECIMAL(38,0),
|
||||
gas BIGINT,
|
||||
gas_price BIGINT,
|
||||
input STRING
|
||||
input STRING,
|
||||
max_fee_per_gas BIGINT,
|
||||
max_priority_fee_per_gas BIGINT,
|
||||
transaction_type BIGINT
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/transactions'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
PARTITIONED BY (block_date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
|
||||
LOCATION 's3://<your_bucket>/export/transactions/';
|
||||
|
||||
MSCK REPAIR TABLE transactions;
|
||||
MSCK REPAIR TABLE transactions;
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS blocks (
|
||||
number BIGINT,
|
||||
hash STRING,
|
||||
parent_hash STRING,
|
||||
nonce STRING,
|
||||
sha3_uncles STRING,
|
||||
logs_bloom STRING,
|
||||
transactions_root STRING,
|
||||
state_root STRING,
|
||||
receipts_root STRING,
|
||||
miner STRING,
|
||||
difficulty DECIMAL(38,0),
|
||||
total_difficulty DECIMAL(38,0),
|
||||
size BIGINT,
|
||||
extra_data STRING,
|
||||
gas_limit BIGINT,
|
||||
gas_used BIGINT,
|
||||
timestamp BIGINT,
|
||||
transaction_count BIGINT
|
||||
)
|
||||
PARTITIONED BY (date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/blocks'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
|
||||
MSCK REPAIR TABLE blocks;
|
||||
@@ -1,21 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS contracts (
|
||||
address STRING,
|
||||
bytecode STRING,
|
||||
function_sighashes STRING,
|
||||
is_erc20 BOOLEAN,
|
||||
is_erc721 BOOLEAN
|
||||
)
|
||||
PARTITIONED BY (date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/contracts'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
|
||||
MSCK REPAIR TABLE contracts;
|
||||
@@ -1,24 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS logs (
|
||||
log_index BIGINT,
|
||||
transaction_hash STRING,
|
||||
transaction_index BIGINT,
|
||||
block_hash STRING,
|
||||
block_number BIGINT,
|
||||
address STRING,
|
||||
data STRING,
|
||||
topics STRING
|
||||
)
|
||||
PARTITIONED BY (date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/logs'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
|
||||
MSCK REPAIR TABLE logs;
|
||||
@@ -1,25 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS parquet_blocks (
|
||||
number BIGINT,
|
||||
hash STRING,
|
||||
parent_hash STRING,
|
||||
nonce STRING,
|
||||
sha3_uncles STRING,
|
||||
logs_bloom STRING,
|
||||
transactions_root STRING,
|
||||
state_root STRING,
|
||||
receipts_root STRING,
|
||||
miner STRING,
|
||||
difficulty DECIMAL(38,0),
|
||||
total_difficulty DECIMAL(38,0),
|
||||
size BIGINT,
|
||||
extra_data STRING,
|
||||
gas_limit BIGINT,
|
||||
gas_used BIGINT,
|
||||
timestamp BIGINT,
|
||||
transaction_count BIGINT
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
STORED AS PARQUET
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/parquet/blocks';
|
||||
|
||||
MSCK REPAIR TABLE parquet_blocks;
|
||||
@@ -1,14 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS parquet_token_transfers (
|
||||
token_address STRING,
|
||||
from_address STRING,
|
||||
to_address STRING,
|
||||
value DECIMAL(38,0),
|
||||
transaction_hash STRING,
|
||||
log_index BIGINT,
|
||||
block_number BIGINT
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
STORED AS PARQUET
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/parquet/token_transfers';
|
||||
|
||||
MSCK REPAIR TABLE parquet_token_transfers;
|
||||
@@ -1,18 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS parquet_transactions (
|
||||
hash STRING,
|
||||
nonce BIGINT,
|
||||
block_hash STRING,
|
||||
block_number BIGINT,
|
||||
transaction_index BIGINT,
|
||||
from_address STRING,
|
||||
to_address STRING,
|
||||
value DECIMAL(38,0),
|
||||
gas BIGINT,
|
||||
gas_price BIGINT,
|
||||
input STRING
|
||||
)
|
||||
PARTITIONED BY (start_block BIGINT, end_block BIGINT)
|
||||
STORED AS PARQUET
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/parquet/transactions';
|
||||
|
||||
MSCK REPAIR TABLE parquet_transactions;
|
||||
@@ -1,25 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS receipts (
|
||||
transaction_hash STRING,
|
||||
transaction_index BIGINT,
|
||||
block_hash STRING,
|
||||
block_number BIGINT,
|
||||
cumulative_gas_used BIGINT,
|
||||
gas_used BIGINT,
|
||||
contract_address STRING,
|
||||
root STRING,
|
||||
status BIGINT
|
||||
)
|
||||
PARTITIONED BY (date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/receipts'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
|
||||
MSCK REPAIR TABLE receipts;
|
||||
@@ -1,23 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS token_transfers (
|
||||
token_address STRING,
|
||||
from_address STRING,
|
||||
to_address STRING,
|
||||
value DECIMAL(38,0),
|
||||
transaction_hash STRING,
|
||||
log_index BIGINT,
|
||||
block_number BIGINT
|
||||
)
|
||||
PARTITIONED BY (date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/token_transfers'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
|
||||
MSCK REPAIR TABLE token_transfers;
|
||||
@@ -1,21 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS tokens (
|
||||
address STRING,
|
||||
symbol STRING,
|
||||
name STRING,
|
||||
decimals BIGINT,
|
||||
total_supply DECIMAL(38,0)
|
||||
)
|
||||
PARTITIONED BY (date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/tokens'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
|
||||
MSCK REPAIR TABLE tokens;
|
||||
@@ -1,27 +0,0 @@
|
||||
CREATE EXTERNAL TABLE IF NOT EXISTS transactions (
|
||||
hash STRING,
|
||||
nonce BIGINT,
|
||||
block_hash STRING,
|
||||
block_number BIGINT,
|
||||
transaction_index BIGINT,
|
||||
from_address STRING,
|
||||
to_address STRING,
|
||||
value DECIMAL(38,0),
|
||||
gas BIGINT,
|
||||
gas_price BIGINT,
|
||||
input STRING
|
||||
)
|
||||
PARTITIONED BY (date STRING)
|
||||
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|
||||
WITH SERDEPROPERTIES (
|
||||
'serialization.format' = ',',
|
||||
'field.delim' = ',',
|
||||
'escape.delim' = '\\'
|
||||
)
|
||||
STORED AS TEXTFILE
|
||||
LOCATION 's3://<your_bucket>/ethereumetl/export/transactions'
|
||||
TBLPROPERTIES (
|
||||
'skip.header.line.count' = '1'
|
||||
);
|
||||
|
||||
MSCK REPAIR TABLE transactions;
|
||||
42
setup.py
42
setup.py
@@ -1,6 +1,6 @@
|
||||
import os
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
|
||||
def read(fname):
|
||||
@@ -11,7 +11,7 @@ long_description = read('README.md') if os.path.isfile("README.md") else ""
|
||||
|
||||
setup(
|
||||
name='ethereum-etl',
|
||||
version='1.6.3',
|
||||
version='2.4.2',
|
||||
author='Evgeny Medvedev',
|
||||
author_email='evge.medvedev@gmail.com',
|
||||
description='Tools for exporting Ethereum blockchain data to CSV or JSON',
|
||||
@@ -24,31 +24,41 @@ setup(
|
||||
'Intended Audience :: Developers',
|
||||
'License :: OSI Approved :: MIT License',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Programming Language :: Python :: 3.7',
|
||||
'Programming Language :: Python :: 3.8'
|
||||
'Programming Language :: Python :: 3.8',
|
||||
'Programming Language :: Python :: 3.9'
|
||||
],
|
||||
keywords='ethereum',
|
||||
# web3.py doesn't work on 3.5.2 and less (https://github.com/ethereum/web3.py/issues/1012)
|
||||
python_requires='>=3.5.3,<4',
|
||||
python_requires='>=3.7.2,<4',
|
||||
install_requires=[
|
||||
'web3==4.7.2',
|
||||
'eth-utils==1.8.4',
|
||||
'eth-abi==1.3.0',
|
||||
'web3>=5.29,<6',
|
||||
'eth-utils==1.10',
|
||||
'eth-abi>=2.2.0,<3.0.0',
|
||||
# TODO: This has to be removed when "ModuleNotFoundError: No module named 'eth_utils.toolz'" is fixed at eth-abi
|
||||
'python-dateutil==2.7.0',
|
||||
'click==7.0',
|
||||
'python-dateutil>=2.8.0,<3',
|
||||
'click>=8.0.4,<9',
|
||||
'ethereum-dasm==0.1.4',
|
||||
'urllib3<2',
|
||||
'base58',
|
||||
'requests',
|
||||
'requests'
|
||||
],
|
||||
extras_require={
|
||||
'streaming': [
|
||||
'timeout-decorator==0.4.1',
|
||||
'google-cloud-pubsub==0.39.1',
|
||||
'sqlalchemy==1.3.13',
|
||||
'pg8000==1.13.2',
|
||||
'google-cloud-pubsub==2.13.0',
|
||||
'google-cloud-storage==1.33.0',
|
||||
'kafka-python==2.0.2',
|
||||
'sqlalchemy==1.4',
|
||||
'pg8000==1.16.6',
|
||||
# This library is a dependency for google-cloud-pubsub, starting from 0.3.22 it requires Rust,
|
||||
# that's why we lock the version here
|
||||
'libcst==0.3.21',
|
||||
# Later versions break the build in Travis CI for Python 3.7.2
|
||||
'grpcio==1.46.3'
|
||||
],
|
||||
'streaming-kinesis': [
|
||||
'boto3==1.24.11',
|
||||
'botocore==1.27.11',
|
||||
],
|
||||
'dev': [
|
||||
'pytest~=4.3.0'
|
||||
|
||||
@@ -16,7 +16,13 @@ def get_web3_provider(provider_type, read_resource_lambda=None, batch=False):
|
||||
else:
|
||||
provider = MockWeb3Provider(read_resource_lambda)
|
||||
elif provider_type == 'infura':
|
||||
provider_url = os.environ.get('PROVIDER_URL', 'https://mainnet.infura.io')
|
||||
provider_url = os.environ.get('PROVIDER_URL', 'https://mainnet.infura.io/v3/7aef3f0cd1f64408b163814b22cc643c')
|
||||
if batch:
|
||||
provider = BatchHTTPProvider(provider_url)
|
||||
else:
|
||||
provider = HTTPProvider(provider_url)
|
||||
elif provider_type == 'goerli':
|
||||
provider_url = os.environ.get('GOERLI_PROVIDER_URL', 'https://goerli.infura.io/v3/7aef3f0cd1f64408b163814b22cc643c')
|
||||
if batch:
|
||||
provider = BatchHTTPProvider(provider_url)
|
||||
else:
|
||||
|
||||
@@ -36,18 +36,21 @@ def read_resource(resource_group, file_name):
|
||||
return tests.resources.read_resource([RESOURCE_GROUP, resource_group], file_name)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("start_block,end_block,batch_size,resource_group,web3_provider_type", [
|
||||
(0, 0, 1, 'block_without_transactions', 'mock'),
|
||||
(483920, 483920, 1, 'block_with_logs', 'mock'),
|
||||
(47218, 47219, 1, 'blocks_with_transactions', 'mock'),
|
||||
(47218, 47219, 2, 'blocks_with_transactions', 'mock'),
|
||||
skip_if_slow_tests_disabled((0, 0, 1, 'block_without_transactions', 'infura')),
|
||||
skip_if_slow_tests_disabled((483920, 483920, 1, 'block_with_logs', 'infura')),
|
||||
skip_if_slow_tests_disabled((47218, 47219, 2, 'blocks_with_transactions', 'infura')),
|
||||
@pytest.mark.parametrize("start_block,end_block,batch_size,resource_group,web3_provider_type,format", [
|
||||
(0, 0, 1, 'block_without_transactions', 'mock', 'csv'),
|
||||
(483920, 483920, 1, 'block_with_logs', 'mock', 'csv'),
|
||||
(47218, 47219, 1, 'blocks_with_transactions', 'mock', 'csv'),
|
||||
(47218, 47219, 2, 'blocks_with_transactions', 'mock', 'csv'),
|
||||
(19537146, 19537146, 1, 'blocks_with_dencun_transactions', 'mock', 'csv'),
|
||||
skip_if_slow_tests_disabled((0, 0, 1, 'block_without_transactions', 'infura', 'csv')),
|
||||
skip_if_slow_tests_disabled((483920, 483920, 1, 'block_with_logs', 'infura', 'csv')),
|
||||
skip_if_slow_tests_disabled((47218, 47219, 2, 'blocks_with_transactions', 'infura', 'csv')),
|
||||
skip_if_slow_tests_disabled((17173049, 17173050, 2, 'blocks_with_transactions_and_withdrawals', 'infura', 'csv')),
|
||||
skip_if_slow_tests_disabled((17173049, 17173050, 2, 'blocks_with_transactions_and_withdrawals', 'infura', 'json')),
|
||||
])
|
||||
def test_export_blocks_job(tmpdir, start_block, end_block, batch_size, resource_group, web3_provider_type):
|
||||
blocks_output_file = str(tmpdir.join('actual_blocks.csv'))
|
||||
transactions_output_file = str(tmpdir.join('actual_transactions.csv'))
|
||||
def test_export_blocks_job(tmpdir, start_block, end_block, batch_size, resource_group, web3_provider_type, format):
|
||||
blocks_output_file = str(tmpdir.join(f'actual_blocks.{format}'))
|
||||
transactions_output_file = str(tmpdir.join(f'actual_transactions.{format}'))
|
||||
|
||||
job = ExportBlocksJob(
|
||||
start_block=start_block, end_block=end_block, batch_size=batch_size,
|
||||
@@ -62,9 +65,9 @@ def test_export_blocks_job(tmpdir, start_block, end_block, batch_size, resource_
|
||||
job.run()
|
||||
|
||||
compare_lines_ignore_order(
|
||||
read_resource(resource_group, 'expected_blocks.csv'), read_file(blocks_output_file)
|
||||
read_resource(resource_group, f'expected_blocks.{format}'), read_file(blocks_output_file)
|
||||
)
|
||||
|
||||
compare_lines_ignore_order(
|
||||
read_resource(resource_group, 'expected_transactions.csv'), read_file(transactions_output_file)
|
||||
read_resource(resource_group, f'expected_transactions.{format}'), read_file(transactions_output_file)
|
||||
)
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
|
||||
import pytest
|
||||
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
import tests.resources
|
||||
from ethereumetl.jobs.export_geth_traces_job import ExportGethTracesJob
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import pytest
|
||||
import tests.resources
|
||||
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
import tests.resources
|
||||
|
||||
@@ -35,7 +35,7 @@ def test_export_origin(tmpdir, start_block, end_block, batch_size, output_format
|
||||
end_block=end_block,
|
||||
batch_size=batch_size,
|
||||
web3=ThreadLocalProxy(
|
||||
lambda: Web3(get_web3_provider(web3_provider_type, lambda file: read_resource(resource_group, file)))
|
||||
lambda: build_web3(get_web3_provider(web3_provider_type, lambda file: read_resource(resource_group, file)))
|
||||
),
|
||||
ipfs_client=ipfs_client,
|
||||
marketplace_listing_exporter=origin_marketplace_listing_item_exporter(marketplace_output_file),
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
|
||||
|
||||
import pytest
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
import tests.resources
|
||||
from ethereumetl.jobs.export_token_transfers_job import ExportTokenTransfersJob
|
||||
@@ -47,7 +47,7 @@ def test_export_token_transfers_job(tmpdir, start_block, end_block, batch_size,
|
||||
job = ExportTokenTransfersJob(
|
||||
start_block=start_block, end_block=end_block, batch_size=batch_size,
|
||||
web3=ThreadLocalProxy(
|
||||
lambda: Web3(get_web3_provider(web3_provider_type, lambda file: read_resource(resource_group, file)))
|
||||
lambda: build_web3(get_web3_provider(web3_provider_type, lambda file: read_resource(resource_group, file)))
|
||||
),
|
||||
item_exporter=token_transfers_item_exporter(output_file),
|
||||
max_workers=5
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
|
||||
|
||||
import pytest
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
import tests.resources
|
||||
from ethereumetl.jobs.export_tokens_job import ExportTokensJob
|
||||
@@ -51,7 +51,7 @@ def test_export_tokens_job(tmpdir, token_addresses, resource_group, web3_provide
|
||||
job = ExportTokensJob(
|
||||
token_addresses_iterable=token_addresses,
|
||||
web3=ThreadLocalProxy(
|
||||
lambda: Web3(get_web3_provider(web3_provider_type, lambda file: read_resource(resource_group, file)))
|
||||
lambda: build_web3(get_web3_provider(web3_provider_type, lambda file: read_resource(resource_group, file)))
|
||||
),
|
||||
item_exporter=tokens_item_exporter(output_file),
|
||||
max_workers=5
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
|
||||
import pytest
|
||||
|
||||
from web3 import Web3
|
||||
from ethereumetl.web3_utils import build_web3
|
||||
|
||||
import tests.resources
|
||||
from ethereumetl.jobs.export_traces_job import ExportTracesJob
|
||||
@@ -51,7 +51,7 @@ def test_export_traces_job(tmpdir, start_block, end_block, resource_group, web3_
|
||||
job = ExportTracesJob(
|
||||
start_block=start_block, end_block=end_block, batch_size=1,
|
||||
web3=ThreadLocalProxy(
|
||||
lambda: Web3(get_web3_provider(web3_provider_type, lambda file: read_resource(resource_group, file)))
|
||||
lambda: build_web3(get_web3_provider(web3_provider_type, lambda file: read_resource(resource_group, file)))
|
||||
),
|
||||
max_workers=5,
|
||||
item_exporter=traces_item_exporter(traces_output_file),
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user