Compare commits

...

120 Commits

Author SHA1 Message Date
medvedev1088
6710e6b894 Parse logs PoC 2020-02-04 14:53:35 +07:00
medvedev1088
e8b6fe742e Update FUNDING.yaml 2020-01-09 16:22:57 +07:00
medvedev1088
d1e2f83071 Update README 2019-12-16 11:04:57 +07:00
Evgeny Medvedev
69c64e048e Update README.md 2019-12-15 00:03:35 +07:00
medvedev1088
1d4aa94d81 Add link to Discord 2019-12-14 23:52:32 +07:00
Evgeny Medvedev
2b23e08a64 Merge pull request #189 from blockchain-etl/feature/log_token_address_on_exception
Add function name and contract address in log message
2019-10-23 19:05:37 +07:00
medvedev1088
7434d149bb Add function name and contract address in log message when function call failed. Related to https://github.com/blockchain-etl/ethereum-etl/issues/159#issuecomment-526910436 2019-09-01 20:01:01 +07:00
Evgeny Medvedev
eab288d507 Add link to Kaggle dataset 2019-08-09 19:42:56 +07:00
Evgeny Medvedev
091c7edd60 Add link to Snowflake tutorial 2019-08-09 19:35:43 +07:00
Evgeny Medvedev
0373f48956 Update link to useful queries 2019-07-27 20:34:33 +07:00
Evgeny Medvedev
32eae84170 Add link to useful queries 2019-07-27 20:24:27 +07:00
Evgeny Medvedev
359fe17ac3 Merge pull request #176 from blockchain-etl/funding
Create FUNDING.yml
2019-07-25 19:24:12 +07:00
Evgeny Medvedev
e428bead6d Update FUNDING.yml 2019-07-25 19:21:38 +07:00
Evgeny Medvedev
ee5de4b465 Merge branch 'develop' into funding 2019-07-25 19:20:17 +07:00
Evgeny Medvedev
ee8c68d215 Merge branch 'master' into develop 2019-07-25 19:18:42 +07:00
Evgeny Medvedev
76cdec4a5c Merge branch 'develop' into funding 2019-07-25 19:14:56 +07:00
Evgeny Medvedev
7d9892de85 Merge pull request #184 from blockchain-etl/feature/item_id
Add possibility to specify multiple provider uris for streaming
2019-07-25 19:09:20 +07:00
Evgeny Medvedev
faffca21ef Create FUNDING.yml 2019-06-13 16:41:18 +07:00
Evgeny Medvedev
a74ab02563 Merge pull request #170 from blockchain-etl/bug/153
Fix https://github.com/blockchain-etl/ethereum-etl/issues/153
2019-06-05 19:02:29 +07:00
Evgeny Medvedev
8daa06d007 Rename constant in batch_work_executor.py 2019-06-05 19:01:56 +07:00
Evgeny Medvedev
2ab3b7e9bf Fix https://github.com/blockchain-etl/ethereum-etl/issues/153 2019-06-05 18:57:26 +07:00
Evgeny Medvedev
3234f64c45 Add possibility to specify multiple provider uris for streaming 2019-05-10 19:18:37 +07:00
Evgeny Medvedev
437718083e Merge pull request #172 from blockchain-etl/feature/item_id
Add item_id to stream output
2019-05-08 20:20:01 +07:00
Evgeny Medvedev
0f28aee915 Bump version 2019-05-08 20:14:48 +07:00
Evgeny Medvedev
5e311b87da Fix google_pubsub_item_exporter.py 2019-05-08 20:12:16 +07:00
Evgeny Medvedev
fdea8ca36e Add item_id to streamer messages 2019-05-07 21:05:08 +07:00
Evgeny Medvedev
ca8cd55223 Fix https://github.com/blockchain-etl/ethereum-etl/issues/153 2019-04-25 21:06:23 +07:00
Evgeny Medvedev
f4586b1501 Update README 2019-04-22 22:14:07 +07:00
Evgeny Medvedev
f49b46363e Update README 2019-04-22 22:11:37 +07:00
Evgeny Medvedev
40d4cf374c Fix variable name 2019-04-19 20:09:56 +07:00
Evgeny Medvedev
031c5acedf Update README 2019-04-17 18:47:18 +07:00
Evgeny Medvedev
f4718a6cb9 Added link to D5 2019-04-17 18:36:35 +07:00
Evgeny Medvedev
f35b4ecde4 Update README 2019-04-16 01:12:15 +07:00
Evgeny Medvedev
8257c4bde5 Update README 2019-04-16 00:57:48 +07:00
Evgeny Medvedev
8b21e34250 Update README 2019-04-16 00:29:04 +07:00
Evgeny Medvedev
e8ea43067a Update README 2019-04-16 00:22:35 +07:00
Evgeny Medvedev
e695c55704 Merge pull request #160 from blockchain-etl/feature/streaming
Feature/streaming
2019-04-15 20:18:02 +07:00
Evgeny Medvedev
5c941a403e Bump version 2019-04-15 20:10:57 +07:00
Evgeny Medvedev
67b9ef1728 Refactor dockerhub.md 2019-04-15 20:10:44 +07:00
Evgeny Medvedev
3d5c5a3c73 Update README 2019-04-15 20:10:32 +07:00
Evgeny Medvedev
fa81a41ae5 Refactoring 2019-04-15 19:02:30 +07:00
Evgeny Medvedev
fcd963ced6 Update README 2019-04-15 18:38:34 +07:00
Evgeny Medvedev
e69148ca9e Update README 2019-04-15 18:08:45 +07:00
Evgeny Medvedev
143f59018f Merge branch 'develop' into feature/streaming
# Conflicts:
#	tests/ethereumetl/job/mock_web3_provider.py
2019-04-13 21:57:11 +07:00
Evgeny Medvedev
b46717bf2b Revert changing test file names 2019-04-13 21:56:53 +07:00
Evgeny Medvedev
66971c82e8 Revert using traceFilter https://github.com/blockchain-etl/ethereum-etl/pull/164#issuecomment-482814833 2019-04-13 21:55:48 +07:00
Evgeny Medvedev
040a42dba5 Change block enrichment in eth_streamer_adapter.py 2019-04-13 21:18:48 +07:00
Evgeny Medvedev
2e0b59553c Fix test file names 2019-04-13 21:15:41 +07:00
Evgeny Medvedev
26bcb6c9d8 Merge branch 'develop' into feature/streaming
# Conflicts:
#	tests/ethereumetl/job/mock_web3_provider.py
2019-04-13 21:10:14 +07:00
Evgeny Medvedev
e82618d1c2 Change default value for --batch-size in export_traces.py 2019-04-13 21:09:28 +07:00
Evgeny Medvedev
e6c055c3fa Merge pull request #164 from t2y/use-trace-filter
Use traceFilter instead of traceBlock
2019-04-13 21:07:08 +07:00
Evgeny Medvedev
925471b064 Change default value for block_timestamp in transaction_mapper.py 2019-04-13 21:02:52 +07:00
Evgeny Medvedev
af72640c37 Merge pull request #163 from t2y/add-block-timestamp-to-transaction
Add block timestamp to transactions.csv
2019-04-13 20:55:29 +07:00
Tetsuya Morimoto
a44637f430 change block_timestamp column position to last column to minimize breaking compatibility 2019-04-13 19:14:48 +09:00
Tetsuya Morimoto
a446b55453 add block timestamp to transactions.csv 2019-04-12 20:48:15 +09:00
Evgeny Medvedev
9072abf55d Fix filename capitalization 2 2019-04-09 22:35:18 +07:00
Evgeny Medvedev
c6118be5a5 Fix filename capitalization 1 2019-04-09 22:33:18 +07:00
Evgeny Medvedev
4ed17d4980 Refactor mock file naming 2019-04-09 22:25:22 +07:00
Evgeny Medvedev
1bf2553aed Fix tests 2019-04-09 21:58:18 +07:00
Evgeny Medvedev
04b34c5dd5 Add link to stackoverflow question 2019-04-09 19:13:00 +07:00
Evgeny Medvedev
9614aeba7f Fix exception when only log specified for -e option 2019-04-09 14:55:34 +07:00
Tetsuya Morimoto
eba4e4e58e applied a reverse patch from 0b3f4d6 since it seems paritytech/parity-ethereum/issues/9822 was fixed 2019-04-09 08:59:54 +09:00
Evgeny Medvedev
c5d155b617 Fix trace status calculation 2019-04-07 21:58:18 +07:00
Evgeny Medvedev
418b7a83d3 Fix timeout error handling 2019-04-07 12:04:46 +07:00
Evgeny Medvedev
4fccd2c181 Fix trace status 2019-04-07 12:04:25 +07:00
Evgeny Medvedev
f07752907a Add extract_tokens command 2019-04-06 20:32:26 +07:00
Evgeny Medvedev
140af3e649 Fix csv max field size in extract_contracts 2019-04-06 15:09:19 +07:00
Evgeny Medvedev
c9fa2a1873 Add pid file to streamer 2019-04-05 14:20:32 +07:00
Evgeny Medvedev
7214d771b9 Increase timeout 2019-04-02 18:08:30 +07:00
Evgeny Medvedev
a2a48f9642 Fix timeout in pubsub exporter 2019-04-02 17:58:08 +07:00
Evgeny Medvedev
ad8fda002e Merge branch 'develop' into feature/streaming 2019-04-02 14:02:55 +07:00
Evgeny Medvedev
99803a772e Disable slow tests in tox 2019-04-01 17:31:29 +07:00
Evgeny Medvedev
1defa289e5 Use comma-separated list for --entity-types option 2019-04-01 14:08:06 +07:00
Evgeny Medvedev
7f725182aa Merge pull request #161 from SteveVitali/patch-1
Update export_all.sh with ethereumetl commands
2019-04-01 14:06:40 +07:00
Steven Vitali
7afe6093b0 Update export_all.sh with ethereumetl commands 2019-04-01 02:58:04 -04:00
Evgeny Medvedev
4465222622 Refactor blockchainetl package 2019-03-30 15:20:26 +07:00
Evgeny Medvedev
2f8d901829 Refactor streamer 2019-03-30 15:12:34 +07:00
Evgeny Medvedev
e27b5c28fd Fix the tests 2019-03-28 17:35:01 +07:00
Evgeny Medvedev
47bd5957d4 Fix tests 2019-03-28 01:12:52 +07:00
Evgeny Medvedev
edc3211544 Fix extract_contracts job 2019-03-28 01:07:45 +07:00
Evgeny Medvedev
a9ee19f871 Update README 2019-03-27 23:33:35 +07:00
Evgeny Medvedev
c5ea25a200 Add timeout for sync cycle 2019-03-27 23:20:15 +07:00
Evgeny Medvedev
81033022b9 Update pubsub exporter 2019-03-27 22:21:28 +07:00
Evgeny Medvedev
ac60502f72 Configure logging 2019-03-27 22:07:16 +07:00
Evgeny Medvedev
9dfff1261d Add extract_contracts command 2019-03-27 21:23:21 +07:00
Evgeny Medvedev
69cc8a70c0 Add trace status calculation 2019-03-27 21:11:54 +07:00
Evgeny Medvedev
ba60c906f5 Add tests for streaming traces 2019-03-27 16:00:28 +07:00
Evgeny Medvedev
751f9b57ac Add entity types 2019-03-27 13:36:46 +07:00
Evgeny Medvedev
a9672ac9c1 Refactor Streamer 2019-03-26 22:09:05 +07:00
Evgeny Medvedev
ea6d0e87da Add streaming tests 2019-03-26 18:05:48 +07:00
Evgeny Medvedev
22e6795789 Remove unused file 2019-03-26 14:33:37 +07:00
Evgeny Medvedev
302fbc9947 Update dependencies versions 2019-03-26 14:30:46 +07:00
Evgeny Medvedev
3483d77aa4 Merge branch 'develop' into feature/streaming 2019-03-26 13:48:21 +07:00
Evgeny Medvedev
871af57840 Update README 2019-03-22 00:35:51 +07:00
Evgeny Medvedev
c76d25bf3f Update README 2019-03-12 21:34:35 +07:00
Evgeny Medvedev
2c3ece7010 Merge pull request #158 from blockchain-etl/develop
Updates to README, fix dependencies conflict, add timeout to export_traces, refactor file utils
2019-03-03 14:54:26 +07:00
Evgeny Medvedev
d63713ece1 Update Docker image tag for streaming 2019-02-18 17:45:59 +07:00
Evgeny Medvedev
ed2466d16d Update Docker image tag for streaming 2019-02-18 17:45:09 +07:00
Evgeny Medvedev
aab657da9b Add comments 2019-02-15 21:55:04 +07:00
Evgeny Medvedev
79b9a46bae Remove unused file 2019-02-15 18:03:28 +07:00
Evgeny Medvedev
cac7305f53 Refactor streaming 2019-02-15 17:10:06 +07:00
Evgeny Medvedev
80cd37bdde Remove requirements.txt 2019-02-15 16:15:42 +07:00
Evgeny Medvedev
ff4218c0b8 Merge branch 'develop' into feature/streaming 2019-02-15 16:15:15 +07:00
Evgeny Medvedev
f50cc7253b Merge branch 'master' into feature/streaming
# Conflicts:
#	.dockerignore
#	Dockerfile
#	requirements.txt
2019-02-15 16:11:22 +07:00
medvedev1088
e30e58f032 Merge pull request #144 from blockchain-etl/develop
Ethereum Classic Support, Python 3.5 support, Bug fixes
2019-01-19 01:00:53 +07:00
Evgeny Medvedev
061f131919 Retry requests when node is not synced 2018-11-18 23:43:23 +07:00
medvedev1088
eb69307ddb Merge pull request #128 from blockchain-etl/develop
Traces
2018-11-14 13:10:12 +07:00
Evgeny Medvedev
10e95f19d0 Update .dockerignore 2018-10-14 20:38:59 +07:00
Evgeny Medvedev
da68fe948b Upload last_synced_block.txt 2018-10-10 22:32:20 +07:00
Evgeny Medvedev
cc3ed86f3b Download last_synced_block_file.txt from GCS bucket 2018-10-10 21:26:25 +07:00
Evgeny Medvedev
60017a5abe Add initialization with start block 2018-10-10 20:22:52 +07:00
Evgeny Medvedev
8cc869694d Update kube.yml 2018-10-10 20:22:39 +07:00
Evgeny Medvedev
3fbf70fb4f Add type when joining 2018-09-28 13:12:04 +07:00
Evgeny Medvedev
f7e7e55441 Fix if condition 2018-09-28 00:05:21 +07:00
Evgeny Medvedev
d677d442bd Add enrichment to streaming.py 2018-09-27 23:49:36 +07:00
Evgeny Medvedev
7a47d93d9e Add docker configs 2018-09-27 16:54:08 +07:00
Evgeny Medvedev
e102f76631 Add pubsub_publish_test.py 2018-09-27 16:54:01 +07:00
Evgeny Medvedev
9bd9d4347b Improve logging 2018-09-13 00:15:59 +07:00
Evgeny Medvedev
54494aef6c Optimize publishing to PubSub 2018-09-13 00:13:17 +07:00
Evgeny Medvedev
c4c3ccc79a Add streaming with Google PubSub 2018-09-12 23:50:49 +07:00
165 changed files with 2843 additions and 227 deletions


@@ -1,3 +1,4 @@
.*
last_synced_block.txt
output
pid.txt
output

.github/FUNDING.yml vendored Normal file

@@ -0,0 +1,4 @@
# These are supported funding model platforms
custom: https://gitcoin.co/grants/233/ethereumetl

Dockerfile_with_streaming Normal file

@@ -0,0 +1,15 @@
FROM python:3.6
MAINTAINER Evgeny Medvedev <evge.medvedev@gmail.com>
ENV PROJECT_DIR=ethereum-etl
RUN mkdir /$PROJECT_DIR
WORKDIR /$PROJECT_DIR
COPY . .
RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/[streaming]
# Add Tini
ENV TINI_VERSION v0.18.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini
ENTRYPOINT ["/tini", "--", "python", "ethereumetl"]

README.md

@@ -1,8 +1,9 @@
# Ethereum ETL
[![Join the chat at https://gitter.im/ethereum-eth](https://badges.gitter.im/ethereum-etl.svg)](https://gitter.im/ethereum-etl/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Build Status](https://travis-ci.org/blockchain-etl/ethereum-etl.png)](https://travis-ci.org/blockchain-etl/ethereum-etl)
[Join Telegram Group](https://t.me/joinchat/GsMpbA3mv1OJ6YMp3T5ORQ)
[![Join the chat at https://gitter.im/ethereum-eth](https://badges.gitter.im/ethereum-etl.svg)](https://gitter.im/ethereum-etl/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Telegram](https://img.shields.io/badge/telegram-join%20chat-blue.svg)](https://t.me/joinchat/GsMpbA3mv1OJ6YMp3T5ORQ)
[![Discord](https://img.shields.io/badge/discord-join%20chat-blue.svg)](https://discord.gg/wukrezR)
Install Ethereum ETL:
@@ -24,20 +25,6 @@ Export ERC20 and ERC721 transfers ([Schema](#token_transferscsv), [Reference](#e
--provider-uri file://$HOME/Library/Ethereum/geth.ipc --output token_transfers.csv
```
Export receipts and logs ([Schema](#receiptscsv), [Reference](#export_receipts_and_logs)):
```bash
> ethereumetl export_receipts_and_logs --transaction-hashes transaction_hashes.txt \
--provider-uri https://mainnet.infura.io --receipts-output receipts.csv --logs-output logs.csv
```
Export ERC20 and ERC721 token details ([Schema](#tokenscsv), [Reference](#export_tokens)):
```bash
> ethereumetl export_tokens --token-addresses token_addresses.csv \
--provider-uri https://mainnet.infura.io --output tokens.csv
```
Export traces ([Schema](#tracescsv), [Reference](#export_traces)):
```bash
@@ -45,6 +32,17 @@ Export traces ([Schema](#tracescsv), [Reference](#export_traces)):
--provider-uri file://$HOME/Library/Ethereum/parity.ipc --output traces.csv
```
---
Stream blocks, transactions, logs, token_transfers continually to console ([Reference](#stream)):
```bash
> pip3 install ethereum-etl[streaming]
> ethereumetl stream --start-block 500000 -e block,transaction,log,token_transfer --log-file log.txt
```
Find other commands [here](#command-reference).
For the latest version, check out the repo and call
```bash
> pip3 install -e .
@@ -71,8 +69,11 @@ For the latest version, check out the repo and call
- [Querying in Amazon Athena](#querying-in-amazon-athena)
- [Querying in Google BigQuery](#querying-in-google-bigquery)
- [Public Dataset](#public-dataset)
- [Useful Queries](#useful-queries)
- [How to Query Balances for all Ethereum Addresses](#how-to-query-balances-for-all-ethereum-addresses)
- [Building Token Recommender in Google Cloud Platform](#building-token-recommender-in-google-cloud-platform)
- [Querying in Kaggle](#querying-in-kaggle)
- [Blockchain ETL in Media](#blockchain-etl-in-media)
## Schema
@@ -115,6 +116,7 @@ value | numeric |
gas | bigint |
gas_price | bigint |
input | hex_string |
block_timestamp | bigint |
### token_transfers.csv
@@ -164,6 +166,7 @@ bytecode | hex_string |
function_sighashes | string |
is_erc20 | boolean |
is_erc721 | boolean |
block_number | bigint |
### tokens.csv
@@ -195,6 +198,7 @@ gas_used | bigint |
subtraces | bigint |
trace_address | string |
error | string |
status | bigint |
You can find column descriptions in [https://github.com/medvedev1088/ethereum-etl-airflow](https://github.com/medvedev1088/ethereum-etl-airflow/tree/master/dags/resources/stages/raw/schemas)
@@ -203,10 +207,9 @@ Note: for the `address` type all hex characters are lower-cased.
## LIMITATIONS
- `contracts.csv` and `tokens.csv` files don't include contracts created by message calls (a.k.a. internal transactions).
We are working on adding support for those.
- In case the contract is a proxy, which forwards all calls to a delegate, interface detection doesn't work,
which means `is_erc20` and `is_erc721` will always be false for proxy contracts.
which means `is_erc20` and `is_erc721` will always be false for proxy contracts and they will be missing in the `tokens`
table.
- The metadata methods (`symbol`, `name`, `decimals`, `total_supply`) for ERC20 are optional, so around 10% of the
contracts are missing this data. Also some contracts (EOS) implement these methods but with wrong return type,
so the metadata columns are missing in this case as well.
@@ -220,6 +223,10 @@ will have `0` or `1` in the `decimals` column in the CSVs.
## Exporting the Blockchain
If you'd like to have the blockchain data platform
set up and hosted for you in AWS or GCP, get in touch with us
[here](https://d5ai.typeform.com/to/cmOoLe).
1. Install python 3.5.3+ https://www.python.org/downloads/
1. You can use Infura if you don't need ERC20 transfers (Infura doesn't support eth_getFilterLogs JSON RPC method).
@@ -289,6 +296,15 @@ Read this article for details https://medium.com/@medvedev1088/how-to-export-the
> docker run -v $HOME/output:/ethereum-etl/output ethereum-etl:latest export_all -s 2018-01-01 -e 2018-01-01 -p https://mainnet.infura.io
```
1. Run streaming to console or Pub/Sub
```bash
> docker build -t ethereum-etl:latest-streaming -f Dockerfile_with_streaming .
> echo "Stream to console"
> docker run ethereum-etl:latest-streaming stream --start-block 500000 --log-file log.txt
> echo "Stream to Pub/Sub"
> docker run -v /path_to_credentials_file/:/ethereum-etl/ --env GOOGLE_APPLICATION_CREDENTIALS=/ethereum-etl/credentials_file.json ethereum-etl:latest-streaming stream --start-block 500000 --output projects/<your-project>/topics/crypto_ethereum
```
### Command Reference
- [export_blocks_and_transactions](#export_blocks_and_transactions)
@@ -302,6 +318,7 @@ Read this article for details https://medium.com/@medvedev1088/how-to-export-the
- [extract_geth_traces](#extract_geth_traces)
- [get_block_range_for_date](#get_block_range_for_date)
- [get_keccak_hash](#get_keccak_hash)
- [stream](#stream)
All the commands accept `-h` parameter for help, e.g.:
@@ -342,6 +359,8 @@ Omit `--blocks-output` or `--transactions-output` options if you want to export
You can tune `--batch-size`, `--max-workers` for performance.
[Blocks and transactions schema](#blockscsv).
#### export_token_transfers
The API used in this command is not supported by Infura, so you will need a local node.
@@ -362,6 +381,8 @@ Include `--tokens <token1> --tokens <token2>` to filter only certain tokens, e.g
You can tune `--batch-size`, `--max-workers` for performance.
[Token transfers schema](#token_transferscsv).
#### export_receipts_and_logs
First extract transaction hashes from `transactions.csv`
@@ -385,6 +406,8 @@ You can tune `--batch-size`, `--max-workers` for performance.
Upvote this feature request https://github.com/paritytech/parity/issues/9075,
it will make receipts and logs export much faster.
[Receipts and logs schema](#receiptscsv).
#### extract_token_transfers
First export receipt logs with [export_receipts_and_logs](#export_receipts_and_logs).
@@ -397,6 +420,8 @@ Then extract transfers from the logs.csv file:
You can tune `--batch-size`, `--max-workers` for performance.
[Token transfers schema](#token_transferscsv).
#### export_contracts
First extract contract addresses from `receipts.csv`
@@ -415,6 +440,8 @@ Then export contracts:
You can tune `--batch-size`, `--max-workers` for performance.
[Contracts schema](#contractscsv).
#### export_tokens
First extract token addresses from `contracts.json`
@@ -434,6 +461,8 @@ Then export ERC20 / ERC721 tokens:
You can tune `--max-workers` for performance.
[Tokens schema](#tokenscsv).
#### export_traces
Also called internal transactions.
@@ -449,6 +478,8 @@ See [this issue](https://github.com/blockchain-etl/ethereum-etl/issues/137)
You can tune `--batch-size`, `--max-workers` for performance.
[Traces schema](#tracescsv).
#### export_geth_traces
Read [Differences between geth and parity traces.csv](#differences-between-geth-and-parity-tracescsv)
@@ -486,10 +517,40 @@ You can tune `--batch-size`, `--max-workers` for performance.
0xa9059cbb2ab09eb219583f4a59a5d0623ade346d962bcd4e46b11da047c9049b
```
#### stream
```bash
> pip3 install ethereum-etl[streaming]
> ethereumetl stream --provider-uri https://mainnet.infura.io --start-block 500000
```
- This command outputs blocks, transactions, logs, token_transfers to the console by default.
- Entity types can be specified with the `-e` option,
e.g. `-e block,transaction,log,token_transfer,trace,contract,token`.
- Use the `--output` option to specify the Google Pub/Sub topic where blockchain data should be published,
e.g. `projects/<your-project>/topics/crypto_ethereum`. Data will be pushed to the
`projects/<your-project>/topics/crypto_ethereum.blocks`, `projects/<your-project>/topics/crypto_ethereum.transactions`,
etc. topics.
- The command saves its state to the `last_synced_block.txt` file, where the last synced block number is updated periodically.
- Specify either the `--start-block` or the `--last-synced-block-file` option. `--last-synced-block-file` should point to the
file that stores the block number from which to start streaming the blockchain data.
- Use the `--lag` option to specify how many blocks to lag behind the head of the blockchain. This is the simplest way to
handle chain reorganizations: they are less likely the further a block is from the head.
- You can tune `--period-seconds`, `--batch-size`, `--block-batch-size`, `--max-workers` for performance.
- Refer to [blockchain-etl-streaming](https://github.com/blockchain-etl/blockchain-etl-streaming) for
instructions on deploying it to Kubernetes.
Stream blockchain data continually to Google Pub/Sub:
```bash
> export GOOGLE_APPLICATION_CREDENTIALS=/path_to_credentials_file.json
> ethereumetl stream --start-block 500000 --output projects/<your-project>/topics/crypto_ethereum
```
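The interplay of `--lag` and `--block-batch-size` described above can be sketched as follows (function and parameter names here are illustrative, not the actual ethereum-etl internals):

```python
def next_block_range(last_synced_block, head_block, lag, block_batch_size):
    # Illustrative sketch: each sync cycle exports at most block_batch_size
    # blocks, and never goes past head - lag, so blocks close to the head
    # (the ones most likely to be reorganized away) are left alone.
    target_block = head_block - lag
    end_block = min(target_block, last_synced_block + block_batch_size)
    if end_block <= last_synced_block:
        return None  # nothing to sync this cycle
    return last_synced_block + 1, end_block

print(next_block_range(last_synced_block=499999, head_block=500110,
                       lag=10, block_batch_size=100))  # (500000, 500099)
```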
### Running Tests
```bash
> pip3 install -e .[dev]
> pip3 install -e .[dev,streaming]
> export ETHEREUM_ETL_RUN_SLOW_TESTS=True
> pytest -vv
```
@@ -542,6 +603,10 @@ CREATE DATABASE ethereumetl;
- logs: [schemas/aws/logs.sql](schemas/aws/logs.sql)
- tokens: [schemas/aws/tokens.sql](schemas/aws/tokens.sql)
### Airflow DAGs
Refer to https://github.com/medvedev1088/ethereum-etl-airflow for the instructions.
### Tables for Parquet Files
Read this article on how to convert CSVs to Parquet https://medium.com/@medvedev1088/converting-ethereum-etl-files-to-parquet-399e048ddd30
@@ -556,13 +621,15 @@ so values greater than 38 decimals will be null.
## Querying in Google BigQuery
Refer to https://github.com/medvedev1088/ethereum-etl-airflow for the instructions.
### Public Dataset
You can query the data that's updated daily in the public BigQuery dataset
https://medium.com/@medvedev1088/ethereum-blockchain-on-google-bigquery-283fb300f579
### Useful Queries
https://github.com/blockchain-etl/awesome-bigquery-views
### How to Query Balances for all Ethereum Addresses
Read this article
@@ -572,3 +639,14 @@ https://medium.com/google-cloud/how-to-query-balances-for-all-ethereum-addresses
Read this article
https://medium.com/google-cloud/building-token-recommender-in-google-cloud-platform-1be5a54698eb
### Querying in Kaggle
You can access the Ethereum dataset in Kaggle https://www.kaggle.com/bigquery/ethereum-blockchain.
## Blockchain ETL in Media
- A Technical Breakdown Of Google's New Blockchain Search Tools: https://www.forbes.com/sites/michaeldelcastillo/2019/02/05/google-launches-search-for-bitcoin-ethereum-bitcoin-cash-dash-dogecoin-ethereum-classic-litecoin-and-zcash/#394fc868c789
- Navigating Bitcoin, Ethereum, XRP: How Google Is Quietly Making Blockchains Searchable: https://www.forbes.com/sites/michaeldelcastillo/2019/02/04/navigating-bitcoin-ethereum-xrp-how-google-is-quietly-making-blockchains-searchable/?ss=crypto-blockchain#49e111da4248
- Querying the Ethereum Blockchain in Snowflake: https://community.snowflake.com/s/article/Querying-the-Ethereum-Blockchain-in-Snowflake

@@ -0,0 +1,35 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import itertools


# https://stackoverflow.com/a/27062830/1580227
class AtomicCounter:
    def __init__(self):
        self._counter = itertools.count()
        # init to 0
        next(self._counter)

    def increment(self, increment=1):
        assert increment > 0
        return [next(self._counter) for _ in range(0, increment)][-1]
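A quick usage sketch of the counter above. It relies on `itertools.count()` being safe to share across threads in CPython, since advancing it is a single atomic operation:

```python
import itertools


class AtomicCounter:
    # Same implementation as above, repeated here so the example is runnable.
    def __init__(self):
        self._counter = itertools.count()
        next(self._counter)  # init to 0

    def increment(self, increment=1):
        assert increment > 0
        return [next(self._counter) for _ in range(0, increment)][-1]


counter = AtomicCounter()
print(counter.increment())   # 1
print(counter.increment(5))  # 6
```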


@@ -0,0 +1,42 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# https://stackoverflow.com/questions/15063936/csv-error-field-larger-than-field-limit-131072
import sys
import csv


def set_max_field_size_limit():
    max_int = sys.maxsize
    decrement = True
    while decrement:
        # decrease the maxInt value by factor 10
        # as long as the OverflowError occurs.
        decrement = False
        try:
            csv.field_size_limit(max_int)
        except OverflowError:
            max_int = int(max_int / 10)
            decrement = True

blockchainetl/exporters.py Normal file

@@ -0,0 +1,213 @@
# Copyright (c) Scrapy developers.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of Scrapy nor the names of its contributors may be used
# to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Item Exporters are used to export/serialize items into different formats.
"""
import csv
import io
import threading
from json import JSONEncoder
import decimal
import six
class BaseItemExporter(object):

    def __init__(self, **kwargs):
        self._configure(kwargs)

    def _configure(self, options, dont_fail=False):
        """Configure the exporter by popping options from the ``options`` dict.
        If dont_fail is set, it won't raise an exception on unexpected options
        (useful for using with keyword arguments in subclasses constructors)
        """
        self.encoding = options.pop('encoding', None)
        self.fields_to_export = options.pop('fields_to_export', None)
        self.export_empty_fields = options.pop('export_empty_fields', False)
        self.indent = options.pop('indent', None)
        if not dont_fail and options:
            raise TypeError("Unexpected options: %s" % ', '.join(options.keys()))

    def export_item(self, item):
        raise NotImplementedError

    def serialize_field(self, field, name, value):
        serializer = field.get('serializer', lambda x: x)
        return serializer(value)

    def start_exporting(self):
        pass

    def finish_exporting(self):
        pass

    def _get_serialized_fields(self, item, default_value=None, include_empty=None):
        """Return the fields to export as an iterable of tuples
        (name, serialized_value)
        """
        if include_empty is None:
            include_empty = self.export_empty_fields

        if self.fields_to_export is None:
            if include_empty and not isinstance(item, dict):
                field_iter = six.iterkeys(item.fields)
            else:
                field_iter = six.iterkeys(item)
        else:
            if include_empty:
                field_iter = self.fields_to_export
            else:
                field_iter = (x for x in self.fields_to_export if x in item)

        for field_name in field_iter:
            if field_name in item:
                field = {} if isinstance(item, dict) else item.fields[field_name]
                value = self.serialize_field(field, field_name, item[field_name])
            else:
                value = default_value
            yield field_name, value
class CsvItemExporter(BaseItemExporter):

    def __init__(self, file, include_headers_line=True, join_multivalued=',', **kwargs):
        self._configure(kwargs, dont_fail=True)
        if not self.encoding:
            self.encoding = 'utf-8'
        self.include_headers_line = include_headers_line
        self.stream = io.TextIOWrapper(
            file,
            line_buffering=False,
            write_through=True,
            encoding=self.encoding
        ) if six.PY3 else file
        self.csv_writer = csv.writer(self.stream, **kwargs)
        self._headers_not_written = True
        self._join_multivalued = join_multivalued
        self._write_headers_lock = threading.Lock()

    def serialize_field(self, field, name, value):
        serializer = field.get('serializer', self._join_if_needed)
        return serializer(value)

    def _join_if_needed(self, value):
        if isinstance(value, (list, tuple)):
            try:
                return self._join_multivalued.join(str(x) for x in value)
            except TypeError:  # list in value may not contain strings
                pass
        return value

    def export_item(self, item):
        # Double-checked locking (safe in Python because of GIL) https://en.wikipedia.org/wiki/Double-checked_locking
        if self._headers_not_written:
            with self._write_headers_lock:
                if self._headers_not_written:
                    self._write_headers_and_set_fields_to_export(item)
                    self._headers_not_written = False

        fields = self._get_serialized_fields(item, default_value='',
                                             include_empty=True)
        values = list(self._build_row(x for _, x in fields))
        self.csv_writer.writerow(values)

    def _build_row(self, values):
        for s in values:
            try:
                yield to_native_str(s, self.encoding)
            except TypeError:
                yield s

    def _write_headers_and_set_fields_to_export(self, item):
        if self.include_headers_line:
            if not self.fields_to_export:
                if isinstance(item, dict):
                    # for dicts try using fields of the first item
                    self.fields_to_export = list(item.keys())
                else:
                    # use fields declared in Item
                    self.fields_to_export = list(item.fields.keys())
            row = list(self._build_row(self.fields_to_export))
            self.csv_writer.writerow(row)
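The multivalued-cell handling in `_join_if_needed` is easy to check in isolation; a standalone sketch, where `join_if_needed` is a hypothetical free-function mirror of the method above:

```python
import csv
import io


def join_if_needed(value, delimiter=','):
    # Mirrors CsvItemExporter._join_if_needed: lists and tuples are collapsed
    # into a single delimited cell; everything else passes through unchanged.
    if isinstance(value, (list, tuple)):
        try:
            return delimiter.join(str(x) for x in value)
        except TypeError:
            pass
    return value


buf = io.StringIO()
csv.writer(buf).writerow([join_if_needed(v) for v in ['0xabc', ['t1', 't2'], 42]])
print(buf.getvalue().strip())  # 0xabc,"t1,t2",42
```

Note that the `csv` writer quotes the joined cell because it now contains the delimiter, so the list survives a round trip through a single CSV column.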
def EncodeDecimal(o):
    if isinstance(o, decimal.Decimal):
        return float(round(o, 8))
    raise TypeError(repr(o) + " is not JSON serializable")
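The `EncodeDecimal` hook plugs into the standard `json` machinery through the `default` parameter; a minimal standalone check (`encode_decimal` re-declares the function so the snippet runs on its own):

```python
import decimal
import json


def encode_decimal(o):
    # Same logic as EncodeDecimal above: Decimals are rounded to 8 places
    # and emitted as floats; anything else non-serializable still raises.
    if isinstance(o, decimal.Decimal):
        return float(round(o, 8))
    raise TypeError(repr(o) + " is not JSON serializable")


# Without the default hook, json.dumps raises TypeError on Decimal values.
print(json.dumps({'value': decimal.Decimal('1.123456789')}, default=encode_decimal))
```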
class JsonLinesItemExporter(BaseItemExporter):

    def __init__(self, file, **kwargs):
        self._configure(kwargs, dont_fail=True)
        self.file = file
        kwargs.setdefault('ensure_ascii', not self.encoding)
        # kwargs.setdefault('default', EncodeDecimal)
        self.encoder = JSONEncoder(default=EncodeDecimal, **kwargs)

    def export_item(self, item):
        itemdict = dict(self._get_serialized_fields(item))
        data = self.encoder.encode(itemdict) + '\n'
        self.file.write(to_bytes(data, self.encoding))
def to_native_str(text, encoding=None, errors='strict'):
    """Return str representation of `text`
    (bytes in Python 2.x and unicode in Python 3.x)."""
    if six.PY2:
        return to_bytes(text, encoding, errors)
    else:
        return to_unicode(text, encoding, errors)


def to_bytes(text, encoding=None, errors='strict'):
    """Return the binary representation of `text`. If `text`
    is already a bytes object, return it as-is."""
    if isinstance(text, bytes):
        return text
    if not isinstance(text, six.string_types):
        raise TypeError('to_bytes must receive a unicode, str or bytes '
                        'object, got %s' % type(text).__name__)
    if encoding is None:
        encoding = 'utf-8'
    return text.encode(encoding, errors)


def to_unicode(text, encoding=None, errors='strict'):
    """Return the unicode representation of a bytes object `text`. If `text`
    is already a unicode object, return it as-is."""
    if isinstance(text, six.text_type):
        return text
    if not isinstance(text, (bytes, six.text_type)):
        raise TypeError('to_unicode must receive a bytes, str or unicode '
                        'object, got %s' % type(text).__name__)
    if encoding is None:
        encoding = 'utf-8'
    return text.decode(encoding, errors)
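On Python 3 the `six` shims above reduce to plain encode/decode round trips; a Python 3-only sketch of the same helpers:

```python
def to_bytes(text, encoding='utf-8', errors='strict'):
    # bytes pass through unchanged; str is encoded.
    if isinstance(text, bytes):
        return text
    if not isinstance(text, str):
        raise TypeError('to_bytes must receive a str or bytes object, '
                        'got %s' % type(text).__name__)
    return text.encode(encoding, errors)


def to_unicode(text, encoding='utf-8', errors='strict'):
    # str passes through unchanged; bytes are decoded.
    if isinstance(text, str):
        return text
    if not isinstance(text, (bytes, str)):
        raise TypeError('to_unicode must receive a bytes or str object, '
                        'got %s' % type(text).__name__)
    return text.decode(encoding, errors)


assert to_unicode(to_bytes('0xdeadbeef')) == '0xdeadbeef'
```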


@@ -21,15 +21,15 @@
# SOFTWARE.
import logging
-from ethereumetl.atomic_counter import AtomicCounter
-from ethereumetl.exporters import CsvItemExporter, JsonLinesItemExporter
-from ethereumetl.file_utils import get_file_handle, close_silently
+from blockchainetl.atomic_counter import AtomicCounter
+from blockchainetl.exporters import CsvItemExporter, JsonLinesItemExporter
+from blockchainetl.file_utils import get_file_handle, close_silently
 class CompositeItemExporter:
-    def __init__(self, filename_mapping, field_mapping):
+    def __init__(self, filename_mapping, field_mapping=None):
         self.filename_mapping = filename_mapping
-        self.field_mapping = field_mapping
+        self.field_mapping = field_mapping or {}

         self.file_mapping = {}
         self.exporter_mapping = {}
@@ -40,7 +40,7 @@ class CompositeItemExporter:
     def open(self):
         for item_type, filename in self.filename_mapping.items():
             file = get_file_handle(filename, binary=True)
-            fields = self.field_mapping[item_type]
+            fields = self.field_mapping.get(item_type)

             self.file_mapping[item_type] = file
             if str(filename).endswith('.json'):
                 item_exporter = JsonLinesItemExporter(file, fields_to_export=fields)
@@ -50,12 +50,16 @@ class CompositeItemExporter:
             self.counter_mapping[item_type] = AtomicCounter()

+    def export_items(self, items):
+        for item in items:
+            self.export_item(item)
+
     def export_item(self, item):
         item_type = item.get('type')
         if item_type is None:
-            raise ValueError('type key is not found in item {}'.format(repr(item)))
+            raise ValueError('"type" key is not found in item {}'.format(repr(item)))

-        exporter = self.exporter_mapping[item_type]
+        exporter = self.exporter_mapping.get(item_type)
+        if exporter is None:
+            raise ValueError('Exporter for item type {} not found'.format(item_type))
         exporter.export_item(item)


@@ -0,0 +1,38 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import json
class ConsoleItemExporter:
    def open(self):
        pass

    def export_items(self, items):
        for item in items:
            self.export_item(item)

    def export_item(self, item):
        print(json.dumps(item))

    def close(self):
        pass


@@ -0,0 +1,94 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import json
import logging
from google.cloud import pubsub_v1
from timeout_decorator import timeout_decorator
class GooglePubSubItemExporter:
    def __init__(self, item_type_to_topic_mapping, message_attributes=('item_id',)):
        self.item_type_to_topic_mapping = item_type_to_topic_mapping
        self.publisher = create_publisher()
        self.message_attributes = message_attributes

    def open(self):
        pass

    def export_items(self, items):
        try:
            self._export_items_with_timeout(items)
        except timeout_decorator.TimeoutError as e:
            # A bug in the PubSub publisher makes it stall after running for some time.
            # Exception in thread Thread-CommitBatchPublisher:
            # details = "channel is in state TRANSIENT_FAILURE"
            # https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606
            logging.info('Recreating Pub/Sub publisher.')
            self.publisher = create_publisher()
            raise e

    @timeout_decorator.timeout(300)
    def _export_items_with_timeout(self, items):
        futures = []
        for item in items:
            message_future = self.export_item(item)
            futures.append(message_future)

        for future in futures:
            # result() blocks until the message is published.
            future.result()

    def export_item(self, item):
        item_type = item.get('type')
        if item_type is not None and item_type in self.item_type_to_topic_mapping:
            topic_path = self.item_type_to_topic_mapping.get(item_type)
            data = json.dumps(item).encode('utf-8')

            message_future = self.publisher.publish(topic_path, data=data, **self.get_message_attributes(item))

            return message_future
        else:
            logging.warning('Topic for item type "{}" is not configured.'.format(item_type))

    def get_message_attributes(self, item):
        attributes = {}

        for attr_name in self.message_attributes:
            if item.get(attr_name) is not None:
                attributes[attr_name] = item.get(attr_name)

        return attributes

    def close(self):
        pass


def create_publisher():
    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024 * 5,  # 5 kilobytes
        max_latency=1,  # 1 second
        max_messages=1000,
    )

    return pubsub_v1.PublisherClient(batch_settings)


@@ -0,0 +1,44 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
class InMemoryItemExporter:
    def __init__(self, item_types):
        self.item_types = item_types
        self.items = {}

    def open(self):
        for item_type in self.item_types:
            self.items[item_type] = []

    def export_item(self, item):
        item_type = item.get('type', None)
        if item_type is None:
            raise ValueError('type key is not found in item {}'.format(repr(item)))

        self.items[item_type].append(item)

    def close(self):
        pass

    def get_items(self, item_type):
        return self.items[item_type]
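A minimal usage sketch of the in-memory exporter (the class is re-declared inline so the snippet runs standalone):

```python
class InMemoryItemExporter:
    # Re-declaration of the exporter above so this sketch is self-contained.
    def __init__(self, item_types):
        self.item_types = item_types
        self.items = {}

    def open(self):
        for item_type in self.item_types:
            self.items[item_type] = []

    def export_item(self, item):
        item_type = item.get('type', None)
        if item_type is None:
            raise ValueError('type key is not found in item {}'.format(repr(item)))
        self.items[item_type].append(item)

    def get_items(self, item_type):
        return self.items[item_type]


exporter = InMemoryItemExporter(item_types=['block', 'transaction'])
exporter.open()
exporter.export_item({'type': 'block', 'number': 1})
print(exporter.get_items('block'))  # [{'type': 'block', 'number': 1}]
```

This exporter is what test code typically wants: items are grouped by their `type` key and can be inspected after the job runs.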


@@ -0,0 +1,9 @@
import logging
def logging_basic_config(filename=None):
    format = '%(asctime)s - %(name)s [%(levelname)s] - %(message)s'
    if filename is not None:
        logging.basicConfig(level=logging.INFO, format=format, filename=filename)
    else:
        logging.basicConfig(level=logging.INFO, format=format)


@@ -0,0 +1,23 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


@@ -0,0 +1,139 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging
import os
import time
from blockchainetl.streaming.streamer_adapter_stub import StreamerAdapterStub
from blockchainetl.file_utils import smart_open
class Streamer:
    def __init__(
            self,
            blockchain_streamer_adapter=StreamerAdapterStub(),
            last_synced_block_file='last_synced_block.txt',
            lag=0,
            start_block=None,
            end_block=None,
            period_seconds=10,
            block_batch_size=10,
            retry_errors=True,
            pid_file=None):
        self.blockchain_streamer_adapter = blockchain_streamer_adapter
        self.last_synced_block_file = last_synced_block_file
        self.lag = lag
        self.start_block = start_block
        self.end_block = end_block
        self.period_seconds = period_seconds
        self.block_batch_size = block_batch_size
        self.retry_errors = retry_errors
        self.pid_file = pid_file

        if self.start_block is not None or not os.path.isfile(self.last_synced_block_file):
            init_last_synced_block_file((self.start_block or 0) - 1, self.last_synced_block_file)

        self.last_synced_block = read_last_synced_block(self.last_synced_block_file)

    def stream(self):
        try:
            if self.pid_file is not None:
                logging.info('Creating pid file {}'.format(self.pid_file))
                write_to_file(self.pid_file, str(os.getpid()))
            self.blockchain_streamer_adapter.open()
            self._do_stream()
        finally:
            self.blockchain_streamer_adapter.close()
            if self.pid_file is not None:
                logging.info('Deleting pid file {}'.format(self.pid_file))
                delete_file(self.pid_file)

    def _do_stream(self):
        while True and (self.end_block is None or self.last_synced_block < self.end_block):
            synced_blocks = 0

            try:
                synced_blocks = self._sync_cycle()
            except Exception as e:
                # https://stackoverflow.com/a/4992124/1580227
                logging.exception('An exception occurred while syncing block data.')
                if not self.retry_errors:
                    raise e

            if synced_blocks <= 0:
                logging.info('Nothing to sync. Sleeping for {} seconds...'.format(self.period_seconds))
                time.sleep(self.period_seconds)

    def _sync_cycle(self):
        current_block = self.blockchain_streamer_adapter.get_current_block_number()
        target_block = self._calculate_target_block(current_block, self.last_synced_block)
        blocks_to_sync = max(target_block - self.last_synced_block, 0)

        logging.info('Current block {}, target block {}, last synced block {}, blocks to sync {}'.format(
            current_block, target_block, self.last_synced_block, blocks_to_sync))

        if blocks_to_sync != 0:
            self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block)
            logging.info('Writing last synced block {}'.format(target_block))
            write_last_synced_block(self.last_synced_block_file, target_block)
            self.last_synced_block = target_block

        return blocks_to_sync

    def _calculate_target_block(self, current_block, last_synced_block):
        target_block = current_block - self.lag
        target_block = min(target_block, last_synced_block + self.block_batch_size)
        target_block = min(target_block, self.end_block) if self.end_block is not None else target_block
        return target_block
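The target-block arithmetic in `_calculate_target_block` is worth sanity-checking in isolation; a standalone mirror of the same logic, with made-up numbers (head at block 100, lag 2, batch size 10):

```python
def calculate_target_block(current_block, lag, last_synced_block,
                           block_batch_size, end_block=None):
    # Mirrors Streamer._calculate_target_block: stay `lag` blocks behind the
    # chain head, advance at most `block_batch_size` blocks per cycle, and
    # never pass `end_block` when one is set.
    target_block = current_block - lag
    target_block = min(target_block, last_synced_block + block_batch_size)
    if end_block is not None:
        target_block = min(target_block, end_block)
    return target_block


# Head at 100, lag 2, last synced 50, batch 10 -> capped by the batch size at 60.
print(calculate_target_block(100, 2, 50, 10))  # 60
# Head at 55: the lag wins -> 53.
print(calculate_target_block(55, 2, 50, 10))   # 53
```

The lag keeps the streamer away from blocks that may still be reorged, while the batch cap bounds how much work a single sync cycle can do.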
def delete_file(file):
    try:
        os.remove(file)
    except OSError:
        pass


def write_last_synced_block(file, last_synced_block):
    write_to_file(file, str(last_synced_block) + '\n')


def init_last_synced_block_file(start_block, last_synced_block_file):
    if os.path.isfile(last_synced_block_file):
        raise ValueError(
            '{} should not exist if --start-block option is specified. '
            'Either remove the {} file or the --start-block option.'
            .format(last_synced_block_file, last_synced_block_file))
    write_last_synced_block(last_synced_block_file, start_block)


def read_last_synced_block(file):
    with smart_open(file, 'r') as last_synced_block_file:
        return int(last_synced_block_file.read())


def write_to_file(file, content):
    with smart_open(file, 'w') as file_handle:
        file_handle.write(content)
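The last-synced-block bookkeeping above is just a plain-text round trip; a minimal sketch using a temp file and plain `open` in place of `smart_open`:

```python
import os
import tempfile


def write_last_synced_block(path, last_synced_block):
    # Same file contract as above: the block number plus a trailing newline.
    with open(path, 'w') as f:
        f.write(str(last_synced_block) + '\n')


def read_last_synced_block(path):
    # int() tolerates the trailing newline.
    with open(path) as f:
        return int(f.read())


path = os.path.join(tempfile.mkdtemp(), 'last_synced_block.txt')
write_last_synced_block(path, 9123456)
print(read_last_synced_block(path))  # 9123456
```

Keeping the checkpoint in a human-readable file means a crashed streamer resumes from the last committed block, and an operator can rewind it by editing one number.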


@@ -0,0 +1,13 @@
class StreamerAdapterStub:
    def open(self):
        pass

    def get_current_block_number(self):
        return 0

    def export_all(self, start_block, end_block):
        pass

    def close(self):
        pass


@@ -0,0 +1,38 @@
import logging
import signal
import sys
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
from blockchainetl.logging_utils import logging_basic_config
def get_item_exporter(output):
    if output is not None:
        from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter
        item_exporter = GooglePubSubItemExporter(item_type_to_topic_mapping={
            'block': output + '.blocks',
            'transaction': output + '.transactions',
            'log': output + '.logs',
            'token_transfer': output + '.token_transfers',
            'trace': output + '.traces',
            'contract': output + '.contracts',
            'token': output + '.tokens',
        })
    else:
        item_exporter = ConsoleItemExporter()

    return item_exporter


def configure_signals():
    def sigterm_handler(_signo, _stack_frame):
        # Raises SystemExit(0):
        sys.exit(0)

    signal.signal(signal.SIGTERM, sigterm_handler)


def configure_logging(filename):
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    logging_basic_config(filename=filename)

dockerhub.md

@@ -0,0 +1,11 @@
# Uploading to Docker Hub
```bash
> ETHEREUMETL_STREAMING_VERSION=1.3.2-streaming
> docker build -t ethereum-etl:${ETHEREUMETL_STREAMING_VERSION} -f Dockerfile_with_streaming .
> docker tag ethereum-etl:${ETHEREUMETL_STREAMING_VERSION} blockchainetl/ethereum-etl:${ETHEREUMETL_STREAMING_VERSION}
> docker push blockchainetl/ethereum-etl:${ETHEREUMETL_STREAMING_VERSION}
> docker tag ethereum-etl:${ETHEREUMETL_STREAMING_VERSION} blockchainetl/ethereum-etl:latest-streaming
> docker push blockchainetl/ethereum-etl:latest-streaming
```


@@ -29,18 +29,21 @@ from ethereumetl.cli.export_receipts_and_logs import export_receipts_and_logs
from ethereumetl.cli.export_token_transfers import export_token_transfers
from ethereumetl.cli.export_tokens import export_tokens
from ethereumetl.cli.export_traces import export_traces
from ethereumetl.cli.extract_contracts import extract_contracts
from ethereumetl.cli.extract_csv_column import extract_csv_column
from ethereumetl.cli.extract_field import extract_field
from ethereumetl.cli.extract_geth_traces import extract_geth_traces
from ethereumetl.cli.extract_token_transfers import extract_token_transfers
from ethereumetl.cli.extract_tokens import extract_tokens
from ethereumetl.cli.filter_items import filter_items
from ethereumetl.cli.get_block_range_for_date import get_block_range_for_date
from ethereumetl.cli.get_block_range_for_timestamps import get_block_range_for_timestamps
from ethereumetl.cli.get_keccak_hash import get_keccak_hash
from ethereumetl.cli.stream import stream
@click.group()
-@click.version_option(version='1.2.4')
+@click.version_option(version='1.3.1')
@click.pass_context
def cli(ctx):
pass
@@ -57,6 +60,11 @@ cli.add_command(export_tokens, "export_tokens")
cli.add_command(export_traces, "export_traces")
cli.add_command(export_geth_traces, "export_geth_traces")
cli.add_command(extract_geth_traces, "extract_geth_traces")
+cli.add_command(extract_contracts, "extract_contracts")
+cli.add_command(extract_tokens, "extract_tokens")

+# streaming
+cli.add_command(stream, "stream")

# utils
cli.add_command(get_block_range_for_date, "get_block_range_for_date")


@@ -25,6 +25,8 @@ import click
import re
from datetime import datetime, timedelta
+from blockchainetl.logging_utils import logging_basic_config
from web3 import Web3
from ethereumetl.jobs.export_all_common import export_all_common
@@ -32,6 +34,8 @@ from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.service.eth_service import EthService
from ethereumetl.utils import check_classic_provider_uri
+logging_basic_config()
def is_date_range(start, end):
"""Checks for YYYY-MM-DD date format."""


@@ -25,7 +25,7 @@ import click
from ethereumetl.jobs.export_blocks_job import ExportBlocksJob
from ethereumetl.jobs.exporters.blocks_and_transactions_item_exporter import blocks_and_transactions_item_exporter
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.utils import check_classic_provider_uri


@@ -23,10 +23,10 @@
import click
-from ethereumetl.file_utils import smart_open
+from blockchainetl.file_utils import smart_open
from ethereumetl.jobs.export_contracts_job import ExportContractsJob
from ethereumetl.jobs.exporters.contracts_item_exporter import contracts_item_exporter
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.utils import check_classic_provider_uri


@@ -25,7 +25,7 @@ import click
from ethereumetl.jobs.export_geth_traces_job import ExportGethTracesJob
from ethereumetl.jobs.exporters.geth_traces_item_exporter import geth_traces_item_exporter
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.thread_local_proxy import ThreadLocalProxy


@@ -23,10 +23,10 @@
import click
from ethereumetl.file_utils import smart_open
from blockchainetl.file_utils import smart_open
from ethereumetl.jobs.export_receipts_job import ExportReceiptsJob
from ethereumetl.jobs.exporters.receipts_and_logs_item_exporter import receipts_and_logs_item_exporter
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.utils import check_classic_provider_uri


@@ -27,7 +27,7 @@ from web3 import Web3
from ethereumetl.jobs.export_token_transfers_job import ExportTokenTransfersJob
from ethereumetl.jobs.exporters.token_transfers_item_exporter import token_transfers_item_exporter
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.thread_local_proxy import ThreadLocalProxy


@@ -25,10 +25,10 @@ import click
from web3 import Web3
-from ethereumetl.file_utils import smart_open
+from blockchainetl.file_utils import smart_open
from ethereumetl.jobs.export_tokens_job import ExportTokensJob
from ethereumetl.jobs.exporters.tokens_item_exporter import tokens_item_exporter
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.utils import check_classic_provider_uri


@@ -26,7 +26,7 @@ import click
from web3 import Web3
from ethereumetl.jobs.export_traces_job import ExportTracesJob
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from ethereumetl.jobs.exporters.traces_item_exporter import traces_item_exporter
@@ -37,7 +37,7 @@ logging_basic_config()
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option('-s', '--start-block', default=0, type=int, help='Start block')
@click.option('-e', '--end-block', required=True, type=int, help='End block')
-@click.option('-b', '--batch-size', default=100, type=int, help='The number of blocks to filter at a time.')
+@click.option('-b', '--batch-size', default=5, type=int, help='The number of blocks to filter at a time.')
@click.option('-o', '--output', default='-', type=str, help='The output file. If not specified stdout is used.')
@click.option('-w', '--max-workers', default=5, type=int, help='The maximum number of workers.')
@click.option('-p', '--provider-uri', required=True, type=str,


@@ -0,0 +1,58 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import csv
import json
import click
from blockchainetl.csv_utils import set_max_field_size_limit
from blockchainetl.file_utils import smart_open
from ethereumetl.jobs.exporters.contracts_item_exporter import contracts_item_exporter
from ethereumetl.jobs.extract_contracts_job import ExtractContractsJob
from blockchainetl.logging_utils import logging_basic_config
logging_basic_config()
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option('-t', '--traces', type=str, required=True, help='The CSV file containing traces.')
@click.option('-b', '--batch-size', default=100, type=int, help='The number of blocks to filter at a time.')
@click.option('-o', '--output', default='-', type=str, help='The output file. If not specified stdout is used.')
@click.option('-w', '--max-workers', default=5, type=int, help='The maximum number of workers.')
def extract_contracts(traces, batch_size, output, max_workers):
    """Extracts contracts from traces file."""
    set_max_field_size_limit()

    with smart_open(traces, 'r') as traces_file:
        if traces.endswith('.json'):
            traces_iterable = (json.loads(line) for line in traces_file)
        else:
            traces_iterable = csv.DictReader(traces_file)

        job = ExtractContractsJob(
            traces_iterable=traces_iterable,
            batch_size=batch_size,
            max_workers=max_workers,
            item_exporter=contracts_item_exporter(output))

        job.run()


@@ -25,7 +25,7 @@ import click
import csv
from ethereumetl.csv_utils import set_max_field_size_limit
-from ethereumetl.file_utils import smart_open
+from blockchainetl.file_utils import smart_open
@click.command(context_settings=dict(help_option_names=['-h', '--help']))


@@ -24,10 +24,10 @@ import json
import click
-from ethereumetl.file_utils import smart_open
+from blockchainetl.file_utils import smart_open
from ethereumetl.jobs.exporters.traces_item_exporter import traces_item_exporter
from ethereumetl.jobs.extract_geth_traces_job import ExtractGethTracesJob
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.logging_utils import logging_basic_config
logging_basic_config()


@@ -25,10 +25,10 @@ import click
import csv
import json
-from ethereumetl.file_utils import smart_open
+from blockchainetl.file_utils import smart_open
from ethereumetl.jobs.exporters.token_transfers_item_exporter import token_transfers_item_exporter
from ethereumetl.jobs.extract_token_transfers_job import ExtractTokenTransfersJob
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.logging_utils import logging_basic_config
logging_basic_config()


@@ -0,0 +1,63 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import csv
import json
import click
from blockchainetl.csv_utils import set_max_field_size_limit
from blockchainetl.file_utils import smart_open
from ethereumetl.jobs.exporters.tokens_item_exporter import tokens_item_exporter
from ethereumetl.jobs.extract_tokens_job import ExtractTokensJob
from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from web3 import Web3
logging_basic_config()
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option('-c', '--contracts', type=str, required=True, help='The JSON file containing contracts.')
@click.option('-p', '--provider-uri', default='https://mainnet.infura.io', type=str,
              help='The URI of the web3 provider e.g. '
                   'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io')
@click.option('-o', '--output', default='-', type=str, help='The output file. If not specified stdout is used.')
@click.option('-w', '--max-workers', default=5, type=int, help='The maximum number of workers.')
def extract_tokens(contracts, provider_uri, output, max_workers):
    """Extracts tokens from contracts file."""
    set_max_field_size_limit()

    with smart_open(contracts, 'r') as contracts_file:
        if contracts.endswith('.json'):
            contracts_iterable = (json.loads(line) for line in contracts_file)
        else:
            contracts_iterable = csv.DictReader(contracts_file)

        job = ExtractTokensJob(
            contracts_iterable=contracts_iterable,
            web3=ThreadLocalProxy(lambda: Web3(get_provider_from_uri(provider_uri))),
            max_workers=max_workers,
            item_exporter=tokens_item_exporter(output))

        job.run()


@@ -26,8 +26,8 @@ import click
from datetime import datetime
from web3 import Web3
-from ethereumetl.file_utils import smart_open
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.file_utils import smart_open
+from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.service.eth_service import EthService
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.utils import check_classic_provider_uri


@@ -25,8 +25,8 @@ import click
from web3 import Web3
-from ethereumetl.file_utils import smart_open
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.file_utils import smart_open
+from blockchainetl.logging_utils import logging_basic_config
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.service.eth_service import EthService
from ethereumetl.utils import check_classic_provider_uri

View File

@@ -25,8 +25,8 @@ import click
from eth_utils import keccak
-from ethereumetl.file_utils import smart_open
-from ethereumetl.logging_utils import logging_basic_config
+from blockchainetl.file_utils import smart_open
+from blockchainetl.logging_utils import logging_basic_config
@click.command(context_settings=dict(help_option_names=['-h', '--help']))

100
ethereumetl/cli/stream.py Normal file
View File

@@ -0,0 +1,100 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging
import random
import click
from blockchainetl.streaming.streaming_utils import configure_signals, configure_logging
from ethereumetl.enumeration.entity_type import EntityType
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.thread_local_proxy import ThreadLocalProxy
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option('-l', '--last-synced-block-file', default='last_synced_block.txt', type=str, help='')
@click.option('--lag', default=0, type=int, help='The number of blocks to lag behind the network.')
@click.option('-p', '--provider-uri', default='https://mainnet.infura.io', type=str,
help='The URI of the web3 provider e.g. '
'file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io')
@click.option('-o', '--output', type=str,
help='Google PubSub topic path e.g. projects/your-project/topics/ethereum_blockchain. '
'If not specified will print to console')
@click.option('-s', '--start-block', default=None, type=int, help='Start block')
@click.option('-e', '--entity-types', default=','.join(EntityType.ALL_FOR_INFURA), type=str,
help='The list of entity types to export.')
@click.option('--period-seconds', default=10, type=int, help='How many seconds to sleep between syncs')
@click.option('-b', '--batch-size', default=10, type=int, help='How many blocks to batch in single request')
@click.option('-B', '--block-batch-size', default=1, type=int, help='How many blocks to batch in single sync round')
@click.option('-w', '--max-workers', default=5, type=int, help='The number of workers')
@click.option('--log-file', default=None, type=str, help='Log file')
@click.option('--pid-file', default=None, type=str, help='pid file')
def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types,
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
"""Streams all data types to console or Google Pub/Sub."""
configure_logging(log_file)
configure_signals()
entity_types = parse_entity_types(entity_types)
from blockchainetl.streaming.streaming_utils import get_item_exporter
from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapter
from blockchainetl.streaming.streamer import Streamer
# TODO: Implement fallback mechanism for provider uris instead of picking randomly
provider_uri = pick_random_provider_uri(provider_uri)
logging.info('Using ' + provider_uri)
streamer_adapter = EthStreamerAdapter(
batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=True)),
item_exporter=get_item_exporter(output),
batch_size=batch_size,
max_workers=max_workers,
entity_types=entity_types
)
streamer = Streamer(
blockchain_streamer_adapter=streamer_adapter,
last_synced_block_file=last_synced_block_file,
lag=lag,
start_block=start_block,
period_seconds=period_seconds,
block_batch_size=block_batch_size,
pid_file=pid_file
)
streamer.stream()
def parse_entity_types(entity_types):
entity_types = [c.strip() for c in entity_types.split(',')]
# validate passed types
for entity_type in entity_types:
if entity_type not in EntityType.ALL_FOR_STREAMING:
raise click.BadOptionUsage(
'--entity-type', '{} is not an available entity type. Supply a comma separated list of types from {}'
.format(entity_type, ','.join(EntityType.ALL_FOR_STREAMING)))
return entity_types
def pick_random_provider_uri(provider_uri):
provider_uris = [uri.strip() for uri in provider_uri.split(',')]
return random.choice(provider_uris)
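The TODO above suggests replacing random selection with a fallback mechanism. A minimal sketch of what that could look like (the function name and `failed_uris` parameter are hypothetical, not part of the codebase):

```python
import random


def pick_provider_uri_with_fallback(provider_uri, failed_uris=None):
    # Hypothetical alternative to pick_random_provider_uri: prefer URIs that
    # have not failed recently, falling back to any URI if all have failed.
    provider_uris = [uri.strip() for uri in provider_uri.split(',')]
    failed = set(failed_uris or [])
    healthy = [uri for uri in provider_uris if uri not in failed]
    return random.choice(healthy if healthy else provider_uris)
```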

View File

@@ -28,3 +28,4 @@ class EthContract(object):
self.function_sighashes = []
self.is_erc20 = False
self.is_erc721 = False
+self.block_number = None

View File

@@ -28,3 +28,4 @@ class EthToken(object):
self.name = None
self.decimals = None
self.total_supply = None
+self.block_number = None

View File

@@ -39,3 +39,5 @@ class EthTrace(object):
self.subtraces = 0
self.trace_address = None
self.error = None
+self.status = None
+self.trace_id = None

View File

@@ -0,0 +1,12 @@
class EntityType:
BLOCK = 'block'
TRANSACTION = 'transaction'
RECEIPT = 'receipt'
LOG = 'log'
TOKEN_TRANSFER = 'token_transfer'
TRACE = 'trace'
CONTRACT = 'contract'
TOKEN = 'token'
ALL_FOR_STREAMING = [BLOCK, TRANSACTION, LOG, TOKEN_TRANSFER, TRACE, CONTRACT, TOKEN]
ALL_FOR_INFURA = [BLOCK, TRANSACTION, LOG, TOKEN_TRANSFER]

View File

@@ -20,47 +20,93 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging
import time
from requests.exceptions import Timeout as RequestsTimeout, HTTPError, TooManyRedirects
from web3.utils.threads import Timeout as Web3Timeout
from ethereumetl.executors.bounded_executor import BoundedExecutor
from ethereumetl.executors.fail_safe_executor import FailSafeExecutor
from ethereumetl.misc.retriable_value_error import RetriableValueError
from ethereumetl.progress_logger import ProgressLogger
from ethereumetl.utils import dynamic_batch_iterator
-RETRY_EXCEPTIONS = (ConnectionError, HTTPError, RequestsTimeout, TooManyRedirects, Web3Timeout, OSError)
+RETRY_EXCEPTIONS = (ConnectionError, HTTPError, RequestsTimeout, TooManyRedirects, Web3Timeout, OSError,
+RetriableValueError)
BATCH_CHANGE_COOLDOWN_PERIOD_SECONDS = 2 * 60
# Executes the given work in batches, reducing the batch size exponentially in case of errors.
class BatchWorkExecutor:
-def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCEPTIONS):
+def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCEPTIONS, max_retries=5):
self.batch_size = starting_batch_size
self.max_batch_size = starting_batch_size
self.latest_batch_size_change_time = None
self.max_workers = max_workers
# Using bounded executor prevents unlimited queue growth
# and allows monitoring in-progress futures and failing fast in case of errors.
self.executor = FailSafeExecutor(BoundedExecutor(1, self.max_workers))
self.retry_exceptions = retry_exceptions
self.max_retries = max_retries
self.progress_logger = ProgressLogger()
self.logger = logging.getLogger('BatchWorkExecutor')
def execute(self, work_iterable, work_handler, total_items=None):
self.progress_logger.start(total_items=total_items)
for batch in dynamic_batch_iterator(work_iterable, lambda: self.batch_size):
self.executor.submit(self._fail_safe_execute, work_handler, batch)
# Check race conditions
def _fail_safe_execute(self, work_handler, batch):
try:
work_handler(batch)
self._try_increase_batch_size(len(batch))
except self.retry_exceptions:
-batch_size = self.batch_size
-# Reduce the batch size. Subsequent batches will be 2 times smaller
-if batch_size == len(batch) and batch_size > 1:
-self.batch_size = int(batch_size / 2)
-# For the failed batch try handling items one by one
+self.logger.exception('An exception occurred while executing work_handler.')
+self._try_decrease_batch_size(len(batch))
+self.logger.info('The batch of size {} will be retried one item at a time.'.format(len(batch)))
for item in batch:
-work_handler([item])
+execute_with_retries(work_handler, [item],
+max_retries=self.max_retries, retry_exceptions=self.retry_exceptions)
self.progress_logger.track(len(batch))
# Some acceptable race conditions are possible
def _try_decrease_batch_size(self, current_batch_size):
batch_size = self.batch_size
if batch_size == current_batch_size and batch_size > 1:
new_batch_size = int(current_batch_size / 2)
self.logger.info('Reducing batch size to {}.'.format(new_batch_size))
self.batch_size = new_batch_size
self.latest_batch_size_change_time = time.time()
def _try_increase_batch_size(self, current_batch_size):
if current_batch_size * 2 <= self.max_batch_size:
current_time = time.time()
latest_batch_size_change_time = self.latest_batch_size_change_time
seconds_since_last_change = current_time - latest_batch_size_change_time \
if latest_batch_size_change_time is not None else 0
if seconds_since_last_change > BATCH_CHANGE_COOLDOWN_PERIOD_SECONDS:
new_batch_size = current_batch_size * 2
self.logger.info('Increasing batch size to {}.'.format(new_batch_size))
self.batch_size = new_batch_size
self.latest_batch_size_change_time = current_time
def shutdown(self):
self.executor.shutdown()
self.progress_logger.finish()
def execute_with_retries(func, *args, max_retries=5, retry_exceptions=RETRY_EXCEPTIONS, sleep_seconds=1):
for i in range(max_retries):
try:
return func(*args)
except retry_exceptions:
logging.exception('An exception occurred while executing execute_with_retries. Retry #{}'.format(i))
if i < max_retries - 1:
logging.info('The request will be retried after {} seconds. Retry #{}'.format(sleep_seconds, i))
time.sleep(sleep_seconds)
continue
else:
raise
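The batch-size policy in `BatchWorkExecutor` can be illustrated in isolation. A simplified standalone sketch of the halve-on-error / double-after-cooldown logic (the `BatchSizePolicy` class and its method names are illustrative, not part of the codebase; the real class also manages a bounded thread pool):

```python
import time


class BatchSizePolicy:
    # Simplified model of BatchWorkExecutor's sizing logic: halve the batch
    # size on failure, double it back after a cooldown with no recent changes,
    # never exceeding the starting batch size.
    def __init__(self, starting_batch_size, cooldown_seconds=2 * 60):
        self.batch_size = starting_batch_size
        self.max_batch_size = starting_batch_size
        self.cooldown_seconds = cooldown_seconds
        self.last_change_time = None

    def on_failure(self, failed_batch_size):
        # Only shrink if no concurrent worker already shrank the batch size.
        if self.batch_size == failed_batch_size and self.batch_size > 1:
            self.batch_size = self.batch_size // 2
            self.last_change_time = time.time()

    def on_success(self, current_batch_size):
        if current_batch_size * 2 <= self.max_batch_size:
            since_change = (time.time() - self.last_change_time
                            if self.last_change_time is not None else 0)
            if since_change > self.cooldown_seconds:
                self.batch_size = current_batch_size * 2
                self.last_change_time = time.time()
```

As in the original, a successful batch only grows the size after the cooldown has elapsed since the last change, which dampens oscillation between growing and shrinking.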

View File

@@ -25,13 +25,10 @@ import csv
import logging
import os
import shutil
from time import time
from web3 import Web3
from ethereumetl.csv_utils import set_max_field_size_limit
from ethereumetl.file_utils import smart_open
from blockchainetl.file_utils import smart_open
from ethereumetl.jobs.export_blocks_job import ExportBlocksJob
from ethereumetl.jobs.export_contracts_job import ExportContractsJob
from ethereumetl.jobs.export_receipts_job import ExportReceiptsJob
@@ -42,11 +39,10 @@ from ethereumetl.jobs.exporters.contracts_item_exporter import contracts_item_ex
from ethereumetl.jobs.exporters.receipts_and_logs_item_exporter import receipts_and_logs_item_exporter
from ethereumetl.jobs.exporters.token_transfers_item_exporter import token_transfers_item_exporter
from ethereumetl.jobs.exporters.tokens_item_exporter import tokens_item_exporter
from ethereumetl.logging_utils import logging_basic_config
from ethereumetl.providers.auto import get_provider_from_uri
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from web3 import Web3
logging_basic_config()
logger = logging.getLogger('export_all')

View File

@@ -24,7 +24,7 @@
import json
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
-from ethereumetl.jobs.base_job import BaseJob
+from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.json_rpc_requests import generate_get_block_by_number_json_rpc
from ethereumetl.mappers.block_mapper import EthBlockMapper
from ethereumetl.mappers.transaction_mapper import EthTransactionMapper
@@ -72,7 +72,7 @@ class ExportBlocksJob(BaseJob):
def _export_batch(self, block_number_batch):
blocks_rpc = list(generate_get_block_by_number_json_rpc(block_number_batch, self.export_transactions))
-response = self.batch_web3_provider.make_request(json.dumps(blocks_rpc))
+response = self.batch_web3_provider.make_batch_request(json.dumps(blocks_rpc))
results = rpc_response_batch_to_results(response)
blocks = [self.block_mapper.json_dict_to_block(result) for result in results]

View File

@@ -24,7 +24,7 @@
import json
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
-from ethereumetl.jobs.base_job import BaseJob
+from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.json_rpc_requests import generate_get_code_json_rpc
from ethereumetl.mappers.contract_mapper import EthContractMapper
@@ -58,7 +58,7 @@ class ExportContractsJob(BaseJob):
def _export_contracts(self, contract_addresses):
contracts_code_rpc = list(generate_get_code_json_rpc(contract_addresses))
-response_batch = self.batch_web3_provider.make_request(json.dumps(contracts_code_rpc))
+response_batch = self.batch_web3_provider.make_batch_request(json.dumps(contracts_code_rpc))
contracts = []
for response in response_batch:

View File

@@ -24,7 +24,7 @@ import json
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
from ethereumetl.json_rpc_requests import generate_trace_block_by_number_json_rpc
-from ethereumetl.jobs.base_job import BaseJob
+from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.mappers.geth_trace_mapper import EthGethTraceMapper
from ethereumetl.utils import validate_range, rpc_response_to_result
@@ -62,7 +62,7 @@ class ExportGethTracesJob(BaseJob):
def _export_batch(self, block_number_batch):
trace_block_rpc = list(generate_trace_block_by_number_json_rpc(block_number_batch))
-response = self.batch_web3_provider.make_request(json.dumps(trace_block_rpc))
+response = self.batch_web3_provider.make_batch_request(json.dumps(trace_block_rpc))
for response_item in response:
block_number = response_item.get('id')

View File

@@ -23,7 +23,7 @@
import json
-from ethereumetl.jobs.base_job import BaseJob
+from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
from ethereumetl.json_rpc_requests import generate_get_receipt_json_rpc
from ethereumetl.mappers.receipt_log_mapper import EthReceiptLogMapper
@@ -64,7 +64,7 @@ class ExportReceiptsJob(BaseJob):
def _export_receipts(self, transaction_hashes):
receipts_rpc = list(generate_get_receipt_json_rpc(transaction_hashes))
-response = self.batch_web3_provider.make_request(json.dumps(receipts_rpc))
+response = self.batch_web3_provider.make_batch_request(json.dumps(receipts_rpc))
results = rpc_response_batch_to_results(response)
receipts = [self.receipt_mapper.json_dict_to_receipt(result) for result in results]
for receipt in receipts:

View File

@@ -21,7 +21,7 @@
# SOFTWARE.
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
-from ethereumetl.jobs.base_job import BaseJob
+from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.mappers.token_transfer_mapper import EthTokenTransferMapper
from ethereumetl.mappers.receipt_log_mapper import EthReceiptLogMapper
from ethereumetl.service.token_transfer_extractor import EthTokenTransferExtractor, TRANSFER_EVENT_TOPIC

View File

@@ -22,7 +22,7 @@
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
-from ethereumetl.jobs.base_job import BaseJob
+from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.mappers.token_mapper import EthTokenMapper
from ethereumetl.service.eth_token_service import EthTokenService
@@ -46,8 +46,9 @@ class ExportTokensJob(BaseJob):
for token_address in token_addresses:
self._export_token(token_address)
-def _export_token(self, token_address):
+def _export_token(self, token_address, block_number=None):
token = self.token_service.get_token(token_address)
+token.block_number = block_number
token_dict = self.token_mapper.token_to_dict(token)
self.item_exporter.export_item(token_dict)

View File

@@ -21,10 +21,13 @@
# SOFTWARE.
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
-from ethereumetl.jobs.base_job import BaseJob
+from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.mainnet_daofork_state_changes import DAOFORK_BLOCK_NUMBER
from ethereumetl.mappers.trace_mapper import EthTraceMapper
from ethereumetl.service.eth_special_trace_service import EthSpecialTraceService
+from ethereumetl.service.trace_id_calculator import calculate_trace_ids
+from ethereumetl.service.trace_status_calculator import calculate_trace_statuses
from ethereumetl.utils import validate_range
@@ -71,15 +74,15 @@ class ExportTracesJob(BaseJob):
assert len(block_number_batch) == 1
block_number = block_number_batch[0]
+all_traces = []
if self.include_genesis_traces and 0 in block_number_batch:
genesis_traces = self.special_trace_service.get_genesis_traces()
-for trace in genesis_traces:
-self.item_exporter.export_item(self.trace_mapper.trace_to_dict(trace))
+all_traces.extend(genesis_traces)
if self.include_daofork_traces and DAOFORK_BLOCK_NUMBER in block_number_batch:
daofork_traces = self.special_trace_service.get_daofork_traces()
-for trace in daofork_traces:
-self.item_exporter.export_item(self.trace_mapper.trace_to_dict(trace))
+all_traces.extend(daofork_traces)
# TODO: Change to traceFilter when this issue is fixed
# https://github.com/paritytech/parity-ethereum/issues/9822
@@ -88,8 +91,13 @@ class ExportTracesJob(BaseJob):
if json_traces is None:
raise ValueError('Response from the node is None. Is the node fully synced?')
-for json_trace in json_traces:
-trace = self.trace_mapper.json_dict_to_trace(json_trace)
+traces = [self.trace_mapper.json_dict_to_trace(json_trace) for json_trace in json_traces]
+all_traces.extend(traces)
+calculate_trace_statuses(all_traces)
+calculate_trace_ids(all_traces)
+for trace in all_traces:
self.item_exporter.export_item(self.trace_mapper.trace_to_dict(trace))
def _end(self):

View File

@@ -21,7 +21,7 @@
# SOFTWARE.
-from ethereumetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
+from blockchainetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
BLOCK_FIELDS_TO_EXPORT = [
'number',
@@ -55,7 +55,8 @@ TRANSACTION_FIELDS_TO_EXPORT = [
'value',
'gas',
'gas_price',
-'input'
+'input',
+'block_timestamp'
]

View File

@@ -21,14 +21,15 @@
# SOFTWARE.
-from ethereumetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
+from blockchainetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
FIELDS_TO_EXPORT = [
'address',
'bytecode',
'function_sighashes',
'is_erc20',
-'is_erc721'
+'is_erc721',
+'block_number',
]

View File

@@ -21,7 +21,7 @@
# SOFTWARE.
-from ethereumetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
+from blockchainetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
FIELDS_TO_EXPORT = [
'block_number',

View File

@@ -21,7 +21,7 @@
# SOFTWARE.
-from ethereumetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
+from blockchainetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
RECEIPT_FIELDS_TO_EXPORT = [
'transaction_hash',

View File

@@ -21,7 +21,7 @@
# SOFTWARE.
-from ethereumetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
+from blockchainetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
FIELDS_TO_EXPORT = [
'token_address',

View File

@@ -21,14 +21,15 @@
# SOFTWARE.
-from ethereumetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
+from blockchainetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
FIELDS_TO_EXPORT = [
'address',
'symbol',
'name',
'decimals',
-'total_supply'
+'total_supply',
+'block_number'
]

View File

@@ -21,7 +21,7 @@
# SOFTWARE.
-from ethereumetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
+from blockchainetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
FIELDS_TO_EXPORT = [
'block_number',
@@ -40,6 +40,8 @@ FIELDS_TO_EXPORT = [
'subtraces',
'trace_address',
'error',
+'status',
+'trace_id',
]

View File

@@ -0,0 +1,85 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from ethereumetl.domain.contract import EthContract
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.mappers.contract_mapper import EthContractMapper
from ethereumetl.service.eth_contract_service import EthContractService
from ethereumetl.utils import to_int_or_none
# Extract contracts
class ExtractContractsJob(BaseJob):
def __init__(
self,
traces_iterable,
batch_size,
max_workers,
item_exporter):
self.traces_iterable = traces_iterable
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.item_exporter = item_exporter
self.contract_service = EthContractService()
self.contract_mapper = EthContractMapper()
def _start(self):
self.item_exporter.open()
def _export(self):
self.batch_work_executor.execute(self.traces_iterable, self._extract_contracts)
def _extract_contracts(self, traces):
for trace in traces:
trace['status'] = to_int_or_none(trace.get('status'))
trace['block_number'] = to_int_or_none(trace.get('block_number'))
contract_creation_traces = [trace for trace in traces
if trace.get('trace_type') == 'create' and trace.get('to_address') is not None
and len(trace.get('to_address')) > 0 and trace.get('status') == 1]
contracts = []
for trace in contract_creation_traces:
contract = EthContract()
contract.address = trace.get('to_address')
bytecode = trace.get('output')
contract.bytecode = bytecode
contract.block_number = trace.get('block_number')
function_sighashes = self.contract_service.get_function_sighashes(bytecode)
contract.function_sighashes = function_sighashes
contract.is_erc20 = self.contract_service.is_erc20_contract(function_sighashes)
contract.is_erc721 = self.contract_service.is_erc721_contract(function_sighashes)
contracts.append(contract)
for contract in contracts:
self.item_exporter.export_item(self.contract_mapper.contract_to_dict(contract))
def _end(self):
self.batch_work_executor.shutdown()
self.item_exporter.close()

View File

@@ -21,7 +21,7 @@
# SOFTWARE.
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
from ethereumetl.jobs.base_job import BaseJob
from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.mappers.trace_mapper import EthTraceMapper
from ethereumetl.mappers.geth_trace_mapper import EthGethTraceMapper

View File

@@ -21,7 +21,7 @@
# SOFTWARE.
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
from ethereumetl.jobs.base_job import BaseJob
from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.mappers.token_transfer_mapper import EthTokenTransferMapper
from ethereumetl.mappers.receipt_log_mapper import EthReceiptLogMapper
from ethereumetl.service.token_transfer_extractor import EthTokenTransferExtractor

View File

@@ -0,0 +1,42 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from ethereumetl.jobs.export_tokens_job import ExportTokensJob
class ExtractTokensJob(ExportTokensJob):
def __init__(self, web3, item_exporter, contracts_iterable, max_workers):
super().__init__(web3, item_exporter, [], max_workers)
self.contracts_iterable = contracts_iterable
def _export(self):
self.batch_work_executor.execute(self.contracts_iterable, self._export_tokens_from_contracts)
def _export_tokens_from_contracts(self, contracts):
tokens = [contract for contract in contracts if contract.get('is_erc20') or contract.get('is_erc721')]
for token in tokens:
self._export_token(token_address=token['address'], block_number=token['block_number'])

View File

@@ -0,0 +1,77 @@
# MIT License
#
# Copyright (c) 2020 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from web3.utils.events import (
get_event_data,
)
log_entry = {
'topics': ['0x1f9c649fe47e58bb60f4e52f0d90e4c47a526c9f90c5113df842c025970b66ad',
'0xd8321837f963bcc931d1ac71557689dcf6b35ea541a11bad4907ac76f0525d37',
'0xd8321837f963bcc931d1ac71557689dcf6b35ea541a11bad4907ac76f0525d37'],
'data': '0x000000000000000000000000000000000000000000000000002386f26fc10000000000000000000000000000000000000000000000000000000000005a7c42f9',
'name': None,
'logIndex': None,
'transactionIndex': None,
'transactionHash': None,
'address': None,
'blockHash': None,
'blockNumber': None,
}
# Convert hex strings to bytes
log_entry['topics'] = [bytes.fromhex(topic.replace('0x', '')) for topic in log_entry['topics']]
log_entry['data'] = bytes.fromhex(log_entry['data'].replace('0x', ''))
event_abi = {
"anonymous": False,
"inputs": [
{
"indexed": True,
"name": "hash",
"type": "bytes32"
},
{
"indexed": True,
"name": "name",
"type": "string"
},
{
"indexed": False,
"name": "value",
"type": "uint256"
},
{
"indexed": False,
"name": "registrationDate",
"type": "uint256"
}
],
"name": "HashInvalidated",
"type": "event"
}
rich_log = get_event_data(event_abi, log_entry)
print(rich_log.args)
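The two non-indexed values packed in the PoC's `data` field can also be decoded by hand: each ABI-encoded word is 32 bytes (64 hex characters), so `value` and `registrationDate` are successive big-endian slices. A sketch, reusing the `data` hex from the log entry above:

```python
# Manual decode of the log's `data` field: two 32-byte big-endian words.
data = ('0x000000000000000000000000000000000000000000000000002386f26fc10000'
        '000000000000000000000000000000000000000000000000000000005a7c42f9')
words = data.replace('0x', '')
value = int(words[0:64], 16)                # uint256 `value` (in wei)
registration_date = int(words[64:128], 16)  # uint256 `registrationDate` (unix time)
print(value, registration_date)
```

The indexed `string name` parameter, by contrast, appears in the topics only as its keccak hash, so the original string cannot be recovered from the log alone.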

View File

@@ -1,5 +0,0 @@
import logging
def logging_basic_config():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s [%(levelname)s] - %(message)s')

View File

@@ -55,7 +55,8 @@ class EthBlockMapper(object):
if 'transactions' in json_dict:
block.transactions = [
-self.transaction_mapper.json_dict_to_transaction(tx) for tx in json_dict['transactions']
+self.transaction_mapper.json_dict_to_transaction(tx, block_timestamp=block.timestamp)
+for tx in json_dict['transactions']
if isinstance(tx, dict)
]

View File

@@ -40,5 +40,6 @@ class EthContractMapper(object):
'bytecode': contract.bytecode,
'function_sighashes': contract.function_sighashes,
'is_erc20': contract.is_erc20,
-'is_erc721': contract.is_erc721
+'is_erc721': contract.is_erc721,
+'block_number': contract.block_number
}

View File

@@ -29,5 +29,6 @@ class EthTokenMapper(object):
'symbol': token.symbol,
'name': token.name,
'decimals': token.decimals,
-'total_supply': token.total_supply
+'total_supply': token.total_supply,
+'block_number': token.block_number
}

View File

@@ -104,6 +104,7 @@ class EthTraceMapper(object):
trace.to_address = address
trace.value = value
trace.trace_type = 'genesis'
+trace.status = 1
return trace
@@ -119,6 +120,7 @@ class EthTraceMapper(object):
trace.to_address = to_address
trace.value = value
trace.trace_type = 'daofork'
+trace.status = 1
return trace
@@ -186,4 +188,6 @@ class EthTraceMapper(object):
'subtraces': trace.subtraces,
'trace_address': trace.trace_address,
'error': trace.error,
+'status': trace.status,
+'trace_id': trace.trace_id,
}

View File

@@ -26,12 +26,13 @@ from ethereumetl.utils import hex_to_dec, to_normalized_address
class EthTransactionMapper(object):
-def json_dict_to_transaction(self, json_dict):
+def json_dict_to_transaction(self, json_dict, **kwargs):
transaction = EthTransaction()
transaction.hash = json_dict.get('hash')
transaction.nonce = hex_to_dec(json_dict.get('nonce'))
transaction.block_hash = json_dict.get('blockHash')
transaction.block_number = hex_to_dec(json_dict.get('blockNumber'))
+transaction.block_timestamp = kwargs.get('block_timestamp')
transaction.transaction_index = hex_to_dec(json_dict.get('transactionIndex'))
transaction.from_address = to_normalized_address(json_dict.get('from'))
transaction.to_address = to_normalized_address(json_dict.get('to'))
@@ -48,6 +49,7 @@ class EthTransactionMapper(object):
'nonce': transaction.nonce,
'block_hash': transaction.block_hash,
'block_number': transaction.block_number,
+'block_timestamp': transaction.block_timestamp,
'transaction_index': transaction.transaction_index,
'from_address': transaction.from_address,
'to_address': transaction.to_address,

View File

@@ -0,0 +1,21 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

View File

@@ -0,0 +1,2 @@
class RetriableValueError(ValueError):
pass

View File

@@ -27,7 +27,7 @@ import json
import six
from ethereumetl.csv_utils import set_max_field_size_limit
from ethereumetl.file_utils import get_file_handle, smart_open
from blockchainetl.file_utils import get_file_handle, smart_open
@contextlib.contextmanager

View File

@@ -23,9 +23,8 @@
import json
import socket
import threading
from web3.providers.ipc import get_default_ipc_path, PersistantSocket
from web3.providers.ipc import IPCProvider
from web3.utils.threads import (
Timeout,
)
@@ -39,20 +38,10 @@ except ImportError:
# Mostly copied from web3.py/providers/ipc.py. Supports batch requests.
# Will be removed once batch feature is added to web3.py https://github.com/ethereum/web3.py/issues/832
# Also see this optimization https://github.com/ethereum/web3.py/pull/849
class BatchIPCProvider:
class BatchIPCProvider(IPCProvider):
_socket = None
def __init__(self, ipc_path=None, testnet=False, timeout=10):
if ipc_path is None:
self.ipc_path = get_default_ipc_path(testnet)
else:
self.ipc_path = ipc_path
self.timeout = timeout
self._lock = threading.Lock()
self._socket = PersistantSocket(self.ipc_path)
def make_request(self, text):
def make_batch_request(self, text):
request = text.encode('utf-8')
with self._lock, self._socket as sock:
try:

View File
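The batch providers above take a pre-serialized JSON-RPC batch as text. A minimal sketch of building such a payload (the helper name and the `id` numbering scheme here are illustrative assumptions, not taken from the diff):

```python
import json

def generate_batch_request(method, params_list):
    # A JSON-RPC 2.0 batch is simply a JSON array of request objects;
    # make_batch_request() receives it already serialized to text.
    return json.dumps([
        {'jsonrpc': '2.0', 'method': method, 'params': params, 'id': request_id}
        for request_id, params in enumerate(params_list)
    ])

# Request three blocks in a single round trip.
text = generate_batch_request(
    'eth_getBlockByNumber',
    [[hex(n), True] for n in range(100, 103)],
)
batch = json.loads(text)
```

The node (or mock provider) then answers with a JSON array of response objects in one round trip, which is what makes batching cheaper than issuing one HTTP/IPC request per block.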

@@ -29,7 +29,7 @@ from web3.utils.request import make_post_request
# Will be removed once batch feature is added to web3.py https://github.com/ethereum/web3.py/issues/832
class BatchHTTPProvider(HTTPProvider):
def make_request(self, text):
def make_batch_request(self, text):
self.logger.debug("Making request HTTP. URI: %s, Request: %s",
self.endpoint_uri, text)
request_data = text.encode('utf-8')

View File

@@ -73,7 +73,8 @@ def call_contract_function(func, ignore_errors, default_value=None):
return result
except Exception as ex:
if type(ex) in ignore_errors:
logger.exception('An exception occurred. This exception can be safely ignored.')
logger.exception('An exception occurred in function {} of contract {}. '.format(func.fn_name, func.address)
+ 'This exception can be safely ignored.')
return default_value
else:
raise ex

View File

@@ -142,3 +142,6 @@ class Point(object):
def __str__(self):
return '({},{})'.format(self.x, self.y)
def __repr__(self):
return 'Point({},{})'.format(self.x, self.y)

View File

@@ -0,0 +1,75 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from collections import defaultdict
def calculate_trace_ids(traces):
# group by block
traces_grouped_by_block = defaultdict(list)
for trace in traces:
traces_grouped_by_block[trace.block_number].append(trace)
# calculate ids for each block number
for block_traces in traces_grouped_by_block.values():
transaction_scoped_traces = [trace for trace in block_traces if trace.transaction_hash]
calculate_transaction_scoped_trace_ids(transaction_scoped_traces)
block_scoped_traces = [trace for trace in block_traces if not trace.transaction_hash]
calculate_block_scoped_trace_ids(block_scoped_traces)
return traces
def calculate_transaction_scoped_trace_ids(traces):
for trace in traces:
trace.trace_id = concat(trace.trace_type, trace.transaction_hash, trace_address_to_str(trace.trace_address))
def calculate_block_scoped_trace_ids(traces):
# group by trace_type
grouped_traces = defaultdict(list)
for trace in traces:
grouped_traces[trace.trace_type].append(trace)
# calculate ids
for type_traces in grouped_traces.values():
calculate_trace_indexes_for_single_type(type_traces)
def calculate_trace_indexes_for_single_type(traces):
sorted_traces = sorted(traces,
key=lambda trace: (trace.reward_type, trace.from_address, trace.to_address, trace.value))
for index, trace in enumerate(sorted_traces):
trace.trace_id = concat(trace.trace_type, trace.block_number, index)
def trace_address_to_str(trace_address):
if trace_address is None or len(trace_address) == 0:
return ''
return '_'.join([str(address_point) for address_point in trace_address])
def concat(*elements):
return '_'.join([str(elem) for elem in elements])

View File
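To illustrate the resulting id scheme, here is a minimal sketch that mirrors `concat` and `trace_address_to_str` from the file above (the sample trace values are made up):

```python
def trace_address_to_str(trace_address):
    if trace_address is None or len(trace_address) == 0:
        return ''
    return '_'.join([str(address_point) for address_point in trace_address])

def concat(*elements):
    return '_'.join([str(elem) for elem in elements])

# A transaction-scoped trace gets <trace_type>_<transaction_hash>_<trace_address>.
tx_trace_id = concat('call', '0xabc', trace_address_to_str([0, 1]))

# A block-scoped trace (e.g. a block reward) gets <trace_type>_<block_number>_<index>,
# where index comes from the deterministic sort in calculate_trace_indexes_for_single_type.
block_trace_id = concat('reward', 123, 0)
```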

@@ -0,0 +1,68 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from collections import defaultdict
def calculate_trace_statuses(traces):
# set default values
for trace in traces:
if trace.error is not None and len(trace.error) > 0:
trace.status = 0
else:
trace.status = 1
# group by transaction
grouped_transaction_traces = defaultdict(list)
for trace in traces:
if trace.transaction_hash is not None and len(trace.transaction_hash) > 0:
grouped_transaction_traces[trace.transaction_hash].append(trace)
# calculate statuses for each transaction
for transaction_traces in grouped_transaction_traces.values():
calculate_trace_statuses_for_single_transaction(transaction_traces)
return traces
def calculate_trace_statuses_for_single_transaction(all_traces):
"""O(n * log(n))"""
sorted_traces = sorted(all_traces, key=lambda trace: len(trace.trace_address or []))
indexed_traces = {trace_address_to_str(trace.trace_address): trace for trace in sorted_traces}
# If a parent trace failed, mark the child trace as failed too. Because of the sort order all parent trace
# statuses are calculated before child trace statuses.

for trace in sorted_traces:
if len(trace.trace_address) > 0:
parent_trace = indexed_traces.get(trace_address_to_str(trace.trace_address[:-1]))
if parent_trace is None:
raise ValueError('A parent trace for trace with trace_address {} in transaction {} is not found'
.format(trace.trace_address, trace.transaction_hash))
if parent_trace.status == 0:
trace.status = 0
def trace_address_to_str(trace_address):
if trace_address is None or len(trace_address) == 0:
return ''
return ','.join([str(address_point) for address_point in trace_address])

View File
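The propagation rule can be exercised with a couple of hand-made traces. This is a minimal sketch of the same logic; the `Trace` class here is an illustration, not the project's domain model:

```python
class Trace:
    def __init__(self, trace_address, error=None):
        self.trace_address = trace_address
        self.error = error
        self.transaction_hash = '0xabc'
        self.status = None

def trace_address_to_str(trace_address):
    return ','.join(str(p) for p in trace_address or [])

def calculate_statuses(traces):
    # Default: status 0 if the trace itself errored, else 1.
    for trace in traces:
        trace.status = 0 if trace.error else 1
    # Sorting by depth guarantees parents are processed before children.
    sorted_traces = sorted(traces, key=lambda t: len(t.trace_address or []))
    indexed = {trace_address_to_str(t.trace_address): t for t in sorted_traces}
    for trace in sorted_traces:
        if trace.trace_address:
            parent = indexed[trace_address_to_str(trace.trace_address[:-1])]
            if parent.status == 0:
                trace.status = 0
    return traces

root = Trace([], error='Out of gas')   # failed root call
child = Trace([0])                     # succeeded on its own, but its parent failed
calculate_statuses([root, child])
```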

@@ -0,0 +1,23 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

View File

@@ -0,0 +1,218 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import itertools
from collections import defaultdict
def join(left, right, join_fields, left_fields, right_fields):
left_join_field, right_join_field = join_fields
def field_list_to_dict(field_list):
result_dict = {}
for field in field_list:
if isinstance(field, tuple):
result_dict[field[0]] = field[1]
else:
result_dict[field] = field
return result_dict
left_fields_as_dict = field_list_to_dict(left_fields)
right_fields_as_dict = field_list_to_dict(right_fields)
left_map = defaultdict(list)
for item in left: left_map[item[left_join_field]].append(item)
right_map = defaultdict(list)
for item in right: right_map[item[right_join_field]].append(item)
for key in left_map.keys():
for left_item, right_item in itertools.product(left_map[key], right_map[key]):
result_item = {}
for src_field, dst_field in left_fields_as_dict.items():
result_item[dst_field] = left_item.get(src_field)
for src_field, dst_field in right_fields_as_dict.items():
result_item[dst_field] = right_item.get(src_field)
yield result_item
def enrich_transactions(transactions, receipts):
result = list(join(
transactions, receipts, ('hash', 'transaction_hash'),
left_fields=[
'type',
'hash',
'nonce',
'transaction_index',
'from_address',
'to_address',
'value',
'gas',
'gas_price',
'input',
'block_timestamp',
'block_number',
'block_hash'
],
right_fields=[
('cumulative_gas_used', 'receipt_cumulative_gas_used'),
('gas_used', 'receipt_gas_used'),
('contract_address', 'receipt_contract_address'),
('root', 'receipt_root'),
('status', 'receipt_status')
]))
if len(result) != len(transactions):
raise ValueError('The number of transactions is wrong ' + str(result))
return result
def enrich_logs(blocks, logs):
result = list(join(
logs, blocks, ('block_number', 'number'),
[
'type',
'log_index',
'transaction_hash',
'transaction_index',
'address',
'data',
'topics',
'block_number'
],
[
('timestamp', 'block_timestamp'),
('hash', 'block_hash'),
]))
if len(result) != len(logs):
raise ValueError('The number of logs is wrong ' + str(result))
return result
def enrich_token_transfers(blocks, token_transfers):
result = list(join(
token_transfers, blocks, ('block_number', 'number'),
[
'type',
'token_address',
'from_address',
'to_address',
'value',
'transaction_hash',
'log_index',
'block_number'
],
[
('timestamp', 'block_timestamp'),
('hash', 'block_hash'),
]))
if len(result) != len(token_transfers):
raise ValueError('The number of token transfers is wrong ' + str(result))
return result
def enrich_traces(blocks, traces):
result = list(join(
traces, blocks, ('block_number', 'number'),
[
'type',
'transaction_index',
'from_address',
'to_address',
'value',
'input',
'output',
'trace_type',
'call_type',
'reward_type',
'gas',
'gas_used',
'subtraces',
'trace_address',
'error',
'status',
'transaction_hash',
'block_number',
'trace_id'
],
[
('timestamp', 'block_timestamp'),
('hash', 'block_hash'),
]))
if len(result) != len(traces):
raise ValueError('The number of traces is wrong ' + str(result))
return result
def enrich_contracts(blocks, contracts):
result = list(join(
contracts, blocks, ('block_number', 'number'),
[
'type',
'address',
'bytecode',
'function_sighashes',
'is_erc20',
'is_erc721',
'block_number'
],
[
('timestamp', 'block_timestamp'),
('hash', 'block_hash'),
]))
if len(result) != len(contracts):
raise ValueError('The number of contracts is wrong ' + str(result))
return result
def enrich_tokens(blocks, tokens):
result = list(join(
tokens, blocks, ('block_number', 'number'),
[
'type',
'address',
'symbol',
'name',
'decimals',
'total_supply',
'block_number'
],
[
('timestamp', 'block_timestamp'),
('hash', 'block_hash'),
]))
if len(result) != len(tokens):
raise ValueError('The number of tokens is wrong ' + str(result))
return result

View File
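The in-memory hash join above drives all the `enrich_*` helpers: plain field names are copied as-is, while `(src, dst)` tuples rename a field on the way through. A usage sketch with the same `join` logic and toy data:

```python
import itertools
from collections import defaultdict

def join(left, right, join_fields, left_fields, right_fields):
    left_join_field, right_join_field = join_fields

    def as_dict(fields):
        # Plain strings keep their name; (src, dst) tuples rename the field.
        return {f[0] if isinstance(f, tuple) else f:
                f[1] if isinstance(f, tuple) else f for f in fields}

    left_fields, right_fields = as_dict(left_fields), as_dict(right_fields)
    left_map, right_map = defaultdict(list), defaultdict(list)
    for item in left:
        left_map[item[left_join_field]].append(item)
    for item in right:
        right_map[item[right_join_field]].append(item)
    for key in left_map:
        for left_item, right_item in itertools.product(left_map[key], right_map[key]):
            row = {dst: left_item.get(src) for src, dst in left_fields.items()}
            row.update({dst: right_item.get(src) for src, dst in right_fields.items()})
            yield row

logs = [{'log_index': 0, 'block_number': 100}]
blocks = [{'number': 100, 'timestamp': 1550000000, 'hash': '0xdef'}]
enriched = list(join(logs, blocks, ('block_number', 'number'),
                     ['log_index', 'block_number'],
                     [('timestamp', 'block_timestamp'), ('hash', 'block_hash')]))
```

Note the length checks after each `enrich_*` call above: a left item without a matching right item yields no row, so a size mismatch signals missing blocks or receipts.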

@@ -0,0 +1,57 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import json
import logging
class EthItemIdCalculator:
def calculate(self, item):
if item is None or not isinstance(item, dict):
return None
item_type = item.get('type')
if item_type == 'block' and item.get('hash') is not None:
return concat(item_type, item.get('hash'))
elif item_type == 'transaction' and item.get('hash') is not None:
return concat(item_type, item.get('hash'))
elif item_type == 'log' and item.get('transaction_hash') is not None and item.get('log_index') is not None:
return concat(item_type, item.get('transaction_hash'), item.get('log_index'))
elif item_type == 'token_transfer' and item.get('transaction_hash') is not None \
and item.get('log_index') is not None:
return concat(item_type, item.get('transaction_hash'), item.get('log_index'))
elif item_type == 'trace' and item.get('trace_id') is not None:
return concat(item_type, item.get('trace_id'))
elif item_type == 'contract' and item.get('block_number') is not None and item.get('address') is not None:
return concat(item_type, item.get('block_number'), item.get('address'))
elif item_type == 'token' and item.get('block_number') is not None and item.get('address') is not None:
return concat(item_type, item.get('block_number'), item.get('address'))
logging.warning('item_id for item {} is None'.format(json.dumps(item)))
return None
def concat(*elements):
return '_'.join([str(elem) for elem in elements])

View File
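The resulting item ids are readable underscore-joined strings. A sketch using `concat` as defined above (the hashes and addresses are made-up sample values):

```python
def concat(*elements):
    return '_'.join([str(elem) for elem in elements])

# item_id formats produced by EthItemIdCalculator, per entity type:
block_id = concat('block', '0xd4e56740f876aef8')            # block_<hash>
log_id = concat('log', '0x88df016429689c07', 5)             # log_<transaction_hash>_<log_index>
token_id = concat('token', 4000000, '0xdac17f958d2ee523')   # token_<block_number>_<address>
```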

@@ -0,0 +1,214 @@
import logging
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
from blockchainetl.jobs.exporters.in_memory_item_exporter import InMemoryItemExporter
from ethereumetl.enumeration.entity_type import EntityType
from ethereumetl.jobs.export_blocks_job import ExportBlocksJob
from ethereumetl.jobs.export_receipts_job import ExportReceiptsJob
from ethereumetl.jobs.export_traces_job import ExportTracesJob
from ethereumetl.jobs.extract_contracts_job import ExtractContractsJob
from ethereumetl.jobs.extract_token_transfers_job import ExtractTokenTransfersJob
from ethereumetl.jobs.extract_tokens_job import ExtractTokensJob
from ethereumetl.streaming.enrich import enrich_transactions, enrich_logs, enrich_token_transfers, enrich_traces, \
enrich_contracts, enrich_tokens
from ethereumetl.streaming.eth_item_id_calculator import EthItemIdCalculator
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from web3 import Web3
class EthStreamerAdapter:
def __init__(
self,
batch_web3_provider,
item_exporter=ConsoleItemExporter(),
batch_size=100,
max_workers=5,
entity_types=tuple(EntityType.ALL_FOR_STREAMING)):
self.batch_web3_provider = batch_web3_provider
self.item_exporter = item_exporter
self.batch_size = batch_size
self.max_workers = max_workers
self.entity_types = entity_types
self.item_id_calculator = EthItemIdCalculator()
def open(self):
self.item_exporter.open()
def get_current_block_number(self):
return int(Web3(self.batch_web3_provider).eth.getBlock("latest").number)
def export_all(self, start_block, end_block):
# Export blocks and transactions
blocks, transactions = [], []
if self._should_export(EntityType.BLOCK) or self._should_export(EntityType.TRANSACTION):
blocks, transactions = self._export_blocks_and_transactions(start_block, end_block)
# Export receipts and logs
receipts, logs = [], []
if self._should_export(EntityType.RECEIPT) or self._should_export(EntityType.LOG):
receipts, logs = self._export_receipts_and_logs(transactions)
# Extract token transfers
token_transfers = []
if self._should_export(EntityType.TOKEN_TRANSFER):
token_transfers = self._extract_token_transfers(logs)
# Export traces
traces = []
if self._should_export(EntityType.TRACE):
traces = self._export_traces(start_block, end_block)
# Export contracts
contracts = []
if self._should_export(EntityType.CONTRACT):
contracts = self._export_contracts(traces)
# Export tokens
tokens = []
if self._should_export(EntityType.TOKEN):
tokens = self._extract_tokens(contracts)
enriched_blocks = blocks \
if EntityType.BLOCK in self.entity_types else []
enriched_transactions = enrich_transactions(transactions, receipts) \
if EntityType.TRANSACTION in self.entity_types else []
enriched_logs = enrich_logs(blocks, logs) \
if EntityType.LOG in self.entity_types else []
enriched_token_transfers = enrich_token_transfers(blocks, token_transfers) \
if EntityType.TOKEN_TRANSFER in self.entity_types else []
enriched_traces = enrich_traces(blocks, traces) \
if EntityType.TRACE in self.entity_types else []
enriched_contracts = enrich_contracts(blocks, contracts) \
if EntityType.CONTRACT in self.entity_types else []
enriched_tokens = enrich_tokens(blocks, tokens) \
if EntityType.TOKEN in self.entity_types else []
logging.info('Exporting with ' + type(self.item_exporter).__name__)
all_items = enriched_blocks + \
enriched_transactions + \
enriched_logs + \
enriched_token_transfers + \
enriched_traces + \
enriched_contracts + \
enriched_tokens
self.calculate_item_ids(all_items)
self.item_exporter.export_items(all_items)
def _export_blocks_and_transactions(self, start_block, end_block):
blocks_and_transactions_item_exporter = InMemoryItemExporter(item_types=['block', 'transaction'])
blocks_and_transactions_job = ExportBlocksJob(
start_block=start_block,
end_block=end_block,
batch_size=self.batch_size,
batch_web3_provider=self.batch_web3_provider,
max_workers=self.max_workers,
item_exporter=blocks_and_transactions_item_exporter,
export_blocks=self._should_export(EntityType.BLOCK),
export_transactions=self._should_export(EntityType.TRANSACTION)
)
blocks_and_transactions_job.run()
blocks = blocks_and_transactions_item_exporter.get_items('block')
transactions = blocks_and_transactions_item_exporter.get_items('transaction')
return blocks, transactions
def _export_receipts_and_logs(self, transactions):
exporter = InMemoryItemExporter(item_types=['receipt', 'log'])
job = ExportReceiptsJob(
transaction_hashes_iterable=(transaction['hash'] for transaction in transactions),
batch_size=self.batch_size,
batch_web3_provider=self.batch_web3_provider,
max_workers=self.max_workers,
item_exporter=exporter,
export_receipts=self._should_export(EntityType.RECEIPT),
export_logs=self._should_export(EntityType.LOG)
)
job.run()
receipts = exporter.get_items('receipt')
logs = exporter.get_items('log')
return receipts, logs
def _extract_token_transfers(self, logs):
exporter = InMemoryItemExporter(item_types=['token_transfer'])
job = ExtractTokenTransfersJob(
logs_iterable=logs,
batch_size=self.batch_size,
max_workers=self.max_workers,
item_exporter=exporter)
job.run()
token_transfers = exporter.get_items('token_transfer')
return token_transfers
def _export_traces(self, start_block, end_block):
exporter = InMemoryItemExporter(item_types=['trace'])
job = ExportTracesJob(
start_block=start_block,
end_block=end_block,
batch_size=self.batch_size,
web3=ThreadLocalProxy(lambda: Web3(self.batch_web3_provider)),
max_workers=self.max_workers,
item_exporter=exporter
)
job.run()
traces = exporter.get_items('trace')
return traces
def _export_contracts(self, traces):
exporter = InMemoryItemExporter(item_types=['contract'])
job = ExtractContractsJob(
traces_iterable=traces,
batch_size=self.batch_size,
max_workers=self.max_workers,
item_exporter=exporter
)
job.run()
contracts = exporter.get_items('contract')
return contracts
def _extract_tokens(self, contracts):
exporter = InMemoryItemExporter(item_types=['token'])
job = ExtractTokensJob(
contracts_iterable=contracts,
web3=ThreadLocalProxy(lambda: Web3(self.batch_web3_provider)),
max_workers=self.max_workers,
item_exporter=exporter
)
job.run()
tokens = exporter.get_items('token')
return tokens
def _should_export(self, entity_type):
if entity_type == EntityType.BLOCK:
return True
if entity_type == EntityType.TRANSACTION:
return EntityType.TRANSACTION in self.entity_types or self._should_export(EntityType.LOG)
if entity_type == EntityType.RECEIPT:
return EntityType.TRANSACTION in self.entity_types or self._should_export(EntityType.TOKEN_TRANSFER)
if entity_type == EntityType.LOG:
return EntityType.LOG in self.entity_types or self._should_export(EntityType.TOKEN_TRANSFER)
if entity_type == EntityType.TOKEN_TRANSFER:
return EntityType.TOKEN_TRANSFER in self.entity_types
if entity_type == EntityType.TRACE:
return EntityType.TRACE in self.entity_types or self._should_export(EntityType.CONTRACT)
if entity_type == EntityType.CONTRACT:
return EntityType.CONTRACT in self.entity_types or self._should_export(EntityType.TOKEN)
if entity_type == EntityType.TOKEN:
return EntityType.TOKEN in self.entity_types
raise ValueError('Unexpected entity type ' + entity_type)
def calculate_item_ids(self, items):
for item in items:
item['item_id'] = self.item_id_calculator.calculate(item)
def close(self):
self.item_exporter.close()

View File
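The `_should_export` chain encodes the dependencies between entity types: for example, streaming only `token_transfer` still forces the export of transactions, receipts and logs, because transfers are extracted from logs. A standalone sketch of the same rules (the `EntityType` string constants are assumed values for illustration):

```python
class EntityType:
    BLOCK, TRANSACTION, RECEIPT, LOG = 'block', 'transaction', 'receipt', 'log'
    TOKEN_TRANSFER, TRACE, CONTRACT, TOKEN = 'token_transfer', 'trace', 'contract', 'token'

def should_export(entity_type, requested):
    # Mirrors EthStreamerAdapter._should_export: an entity is exported if it was
    # requested directly or if a requested downstream entity is derived from it.
    if entity_type == EntityType.BLOCK:
        return True  # blocks are always fetched
    if entity_type == EntityType.TRANSACTION:
        return EntityType.TRANSACTION in requested or should_export(EntityType.LOG, requested)
    if entity_type == EntityType.RECEIPT:
        return EntityType.TRANSACTION in requested or should_export(EntityType.TOKEN_TRANSFER, requested)
    if entity_type == EntityType.LOG:
        return EntityType.LOG in requested or should_export(EntityType.TOKEN_TRANSFER, requested)
    if entity_type == EntityType.TOKEN_TRANSFER:
        return EntityType.TOKEN_TRANSFER in requested
    if entity_type == EntityType.TRACE:
        return EntityType.TRACE in requested or should_export(EntityType.CONTRACT, requested)
    if entity_type == EntityType.CONTRACT:
        return EntityType.CONTRACT in requested or should_export(EntityType.TOKEN, requested)
    if entity_type == EntityType.TOKEN:
        return EntityType.TOKEN in requested
    raise ValueError('Unexpected entity type ' + entity_type)

requested = (EntityType.TOKEN_TRANSFER,)
```

With only `token_transfer` requested, blocks, transactions, receipts and logs are exported as prerequisites, while traces, contracts and tokens are skipped.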

@@ -24,6 +24,8 @@
import itertools
import warnings
from ethereumetl.misc.retriable_value_error import RetriableValueError
def hex_to_dec(hex_string):
if hex_string is None:
@@ -35,6 +37,17 @@ def hex_to_dec(hex_string):
return hex_string
def to_int_or_none(val):
if isinstance(val, int):
return val
if val is None or val == '':
return None
try:
return int(val)
except ValueError:
return None
def chunk_string(string, length):
return (string[0 + i:length + i] for i in range(0, len(string), length))
@@ -64,10 +77,29 @@ def rpc_response_to_result(response):
error_message = 'result is None in response {}.'.format(response)
if response.get('error') is None:
error_message = error_message + ' Make sure Ethereum node is synced.'
# When nodes are behind a load balancer it makes sense to retry the request in hopes it will reach another,
# synced node
raise RetriableValueError(error_message)
elif response.get('error') is not None and is_retriable_error(response.get('error').get('code')):
raise RetriableValueError(error_message)
raise ValueError(error_message)
return result
def is_retriable_error(error_code):
if error_code is None:
return False
if not isinstance(error_code, int):
return False
# https://www.jsonrpc.org/specification#error_object
if error_code == -32603 or (-32000 >= error_code >= -32099):
return True
return False
def split_to_batches(start_incl, end_incl, batch_size):
"""start_incl and end_incl are inclusive, the returned batch ranges are also inclusive"""
for batch_start in range(start_incl, end_incl + 1, batch_size):
@@ -94,6 +126,7 @@ def pairwise(iterable):
next(b, None)
return zip(a, b)
def check_classic_provider_uri(chain, provider_uri):
if chain == 'classic' and provider_uri == 'https://mainnet.infura.io':
warnings.warn("ETC Chain not supported on Infura.io. Using https://ethereumclassic.network instead")

View File
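The retry classification above follows the JSON-RPC 2.0 error-code ranges: `-32603` (internal error) and the reserved server-error band `-32000..-32099` are treated as transient, while everything else (e.g. `-32601`, method not found) is permanent. A self-contained sketch of the same check:

```python
def is_retriable_error(error_code):
    # Same logic as in the utils diff above: only integer codes qualify.
    if error_code is None or not isinstance(error_code, int):
        return False
    # https://www.jsonrpc.org/specification#error_object
    # -32603: internal JSON-RPC error; -32000..-32099: server error range.
    return error_code == -32603 or -32000 >= error_code >= -32099
```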

@@ -91,7 +91,7 @@ for (( batch_start_block=$start_block; batch_start_block <= $end_block; batch_st
transactions_file=${transactions_output_dir}/transactions_${file_name_suffix}.csv
log "Exporting blocks ${block_range} to ${blocks_file}"
log "Exporting transactions from blocks ${block_range} to ${transactions_file}"
python3 export_blocks_and_transactions.py --start-block=${batch_start_block} --end-block=${batch_end_block} --provider-uri="${provider_uri}" --blocks-output=${blocks_file} --transactions-output=${transactions_file}
python3 ethereumetl export_blocks_and_transactions --start-block=${batch_start_block} --end-block=${batch_end_block} --provider-uri="${provider_uri}" --blocks-output=${blocks_file} --transactions-output=${transactions_file}
quit_if_returned_error
### token_transfers
@@ -101,7 +101,7 @@ for (( batch_start_block=$start_block; batch_start_block <= $end_block; batch_st
token_transfers_file=${token_transfers_output_dir}/token_transfers_${file_name_suffix}.csv
log "Exporting ERC20 transfers from blocks ${block_range} to ${token_transfers_file}"
python3 export_token_transfers.py --start-block=${batch_start_block} --end-block=${batch_end_block} --provider-uri="${provider_uri}" --output=${token_transfers_file}
python3 ethereumetl export_token_transfers --start-block=${batch_start_block} --end-block=${batch_end_block} --provider-uri="${provider_uri}" --output=${token_transfers_file}
quit_if_returned_error
### receipts_and_logs
@@ -111,7 +111,7 @@ for (( batch_start_block=$start_block; batch_start_block <= $end_block; batch_st
transaction_hashes_file=${transaction_hashes_output_dir}/transaction_hashes_${file_name_suffix}.csv
log "Extracting hash column from transaction file ${transactions_file}"
python3 extract_csv_column.py --input ${transactions_file} --output ${transaction_hashes_file} --column "hash"
python3 ethereumetl extract_csv_column --input ${transactions_file} --output ${transaction_hashes_file} --column "hash"
quit_if_returned_error
receipts_output_dir=${output_dir}/receipts${partition_dir}
@@ -123,7 +123,7 @@ for (( batch_start_block=$start_block; batch_start_block <= $end_block; batch_st
receipts_file=${receipts_output_dir}/receipts_${file_name_suffix}.csv
logs_file=${logs_output_dir}/logs_${file_name_suffix}.csv
log "Exporting receipts and logs from blocks ${block_range} to ${receipts_file} and ${logs_file}"
python3 export_receipts_and_logs.py --transaction-hashes ${transaction_hashes_file} --provider-uri="${provider_uri}" --receipts-output=${receipts_file} --logs-output=${logs_file}
python3 ethereumetl export_receipts_and_logs --transaction-hashes ${transaction_hashes_file} --provider-uri="${provider_uri}" --receipts-output=${receipts_file} --logs-output=${logs_file}
quit_if_returned_error
### contracts
@@ -133,7 +133,7 @@ for (( batch_start_block=$start_block; batch_start_block <= $end_block; batch_st
contract_addresses_file=${contract_addresses_output_dir}/contract_addresses_${file_name_suffix}.csv
log "Extracting contract_address from receipt file ${receipts_file}"
python3 extract_csv_column.py --input ${receipts_file} --column contract_address --output ${contract_addresses_file}
python3 ethereumetl extract_csv_column --input ${receipts_file} --column contract_address --output ${contract_addresses_file}
quit_if_returned_error
contracts_output_dir=${output_dir}/contracts${partition_dir}
@@ -141,7 +141,7 @@ for (( batch_start_block=$start_block; batch_start_block <= $end_block; batch_st
contracts_file=${contracts_output_dir}/contracts_${file_name_suffix}.csv
log "Exporting contracts from blocks ${block_range} to ${contracts_file}"
python3 export_contracts.py --contract-addresses ${contract_addresses_file} --provider-uri="${provider_uri}" --output=${contracts_file}
python3 ethereumetl export_contracts --contract-addresses ${contract_addresses_file} --provider-uri="${provider_uri}" --output=${contracts_file}
quit_if_returned_error
### tokens
@@ -151,7 +151,7 @@ for (( batch_start_block=$start_block; batch_start_block <= $end_block; batch_st
token_addresses_file=${token_addresses_output_dir}/token_addresses_${file_name_suffix}
log "Extracting token_address from token_transfers file ${token_transfers_file}"
python3 extract_csv_column.py -i ${token_transfers_file} -c token_address -o - | sort | uniq > ${token_addresses_file}
python3 ethereumetl extract_csv_column -i ${token_transfers_file} -c token_address -o - | sort | uniq > ${token_addresses_file}
quit_if_returned_error
tokens_output_dir=${output_dir}/tokens${partition_dir}
@@ -159,7 +159,7 @@ for (( batch_start_block=$start_block; batch_start_block <= $end_block; batch_st
tokens_file=${tokens_output_dir}/tokens_${file_name_suffix}.csv
log "Exporting tokens from blocks ${block_range} to ${tokens_file}"
python3 export_tokens.py --token-addresses ${token_addresses_file} --provider-uri="${provider_uri}" --output ${tokens_file}
python3 ethereumetl export_tokens --token-addresses ${token_addresses_file} --provider-uri="${provider_uri}" --output ${tokens_file}
quit_if_returned_error
end_time=$(date +%s)

View File

@@ -11,7 +11,7 @@ long_description = read('README.md') if os.path.isfile("README.md") else ""
setup(
name='ethereum-etl',
version='1.2.4',
version='1.3.1',
author='Evgeny Medvedev',
author_email='evge.medvedev@gmail.com',
description='Tools for exporting Ethereum blockchain data to CSV or JSON',
@@ -37,13 +37,17 @@ setup(
'eth-abi==1.2.0',
# TODO: This has to be removed when "ModuleNotFoundError: No module named 'eth_utils.toolz'" is fixed at eth-abi
'python-dateutil==2.7.0',
'click==6.7',
'click==7.0',
'ethereum-dasm==0.1.4'
],
extras_require={
'dev': [
'pytest~=3.2.0',
'streaming': [
'timeout-decorator==0.4.1',
'google-cloud-pubsub==0.39.1'
],
'dev': [
'pytest~=4.3.0'
]
},
entry_points={
'console_scripts': [

View File

@@ -23,31 +23,22 @@
 import json
 from ethereumetl.utils import hex_to_dec
+from tests.ethereumetl.job.mock_web3_provider import MockWeb3Provider, build_file_name
-class MockBatchWeb3Provider(object):
+class MockBatchWeb3Provider(MockWeb3Provider):
     def __init__(self, read_resource):
+        super().__init__(read_resource)
         self.read_resource = read_resource
-    def make_request(self, text):
+    def make_batch_request(self, text):
         batch = json.loads(text)
         web3_response = []
         for req in batch:
-            if req['method'] == 'eth_getBlockByNumber':
-                block_number = hex_to_dec(req['params'][0])
-                file_name = 'web3_response.block.' + str(block_number) + '.json'
-            elif req['method'] == 'eth_getCode':
-                contract_address = req['params'][0]
-                file_name = 'web3_response.code.' + str(contract_address) + '.json'
-            elif req['method'] == 'eth_getTransactionReceipt':
-                transaction_hash = req['params'][0]
-                file_name = 'web3_response.receipt.' + str(transaction_hash) + '.json'
-            elif req['method'] == 'debug_traceBlockByNumber':
-                block_number = req['params'][0]
-                file_name = 'web3_response.block_trace.' + str(block_number) + '.json'
-            else:
-                raise ValueError('Request method {} is unexpected'.format(req['method']))
+            method = req['method']
+            params = req['params']
+            file_name = build_file_name(method, params)
             file_content = self.read_resource(file_name)
             web3_response.append(json.loads(file_content))
         return web3_response


@@ -31,15 +31,19 @@ class MockWeb3Provider(IPCProvider):
         self.read_resource = read_resource
     def make_request(self, method, params):
-        if method == 'eth_call':
-            to = params[0]['to'].lower()
-            data = params[0]['data']
-            file_name = '{}_{}_{}.json'.format(method, to, data)
-        # TODO: Remove this when this issue is fixed
-        # https://github.com/paritytech/parity-ethereum/issues/9822
-        elif method == 'trace_block':
-            file_name = 'trace_filter.json'
-        else:
-            file_name = method + '.json'
+        file_name = build_file_name(method, params)
         file_content = self.read_resource(file_name)
         return json.loads(file_content)
+def build_file_name(method, params):
+    return 'web3_response.' + method + '_' + '_'.join([param_to_str(param) for param in params]) + '.json'
+def param_to_str(param):
+    if isinstance(param, dict):
+        return '_'.join([str(key) + '_' + param_to_str(param[key]) for key in sorted(param)])
+    elif isinstance(param, list):
+        return '_'.join([param_to_str(param_item) for param_item in param])
+    else:
+        return str(param).lower()
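The two helpers introduced above derive a fixture file name deterministically from any JSON-RPC method and its params, replacing the per-method if/elif chain. They can be exercised standalone; the helper bodies below are copied from the diff, while the example requests (block number, call target) are illustrative values:

```python
def param_to_str(param):
    # Dicts are flattened key-by-key in sorted order, lists item-by-item,
    # and scalars are stringified and lower-cased (so True becomes 'true').
    if isinstance(param, dict):
        return '_'.join([str(key) + '_' + param_to_str(param[key]) for key in sorted(param)])
    elif isinstance(param, list):
        return '_'.join([param_to_str(param_item) for param_item in param])
    else:
        return str(param).lower()

def build_file_name(method, params):
    return 'web3_response.' + method + '_' + '_'.join([param_to_str(param) for param in params]) + '.json'

# An eth_getBlockByNumber request with illustrative params ['0x2dc6c0', True]:
print(build_file_name('eth_getBlockByNumber', ['0x2dc6c0', True]))
# → web3_response.eth_getBlockByNumber_0x2dc6c0_true.json

# An eth_call request with a dict param: keys sorted, values lower-cased.
print(build_file_name('eth_call', [{'to': '0xF4ecEd2f682CE333f96f2D8966C613DeD8fC95DD', 'data': '0x18160ddd'}, 'latest']))
```

Because the mapping is purely a function of `(method, params)`, both the single-request `MockWeb3Provider` and the batch provider can share it, which is exactly what the refactoring above does.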


@@ -59,6 +59,8 @@ def test_export_contracts_job(tmpdir, batch_size, contract_addresses, output_for
     )
     job.run()
+    print('=====================')
+    print(read_file(contracts_output_file))
     compare_lines_ignore_order(
         read_resource(resource_group, 'expected_contracts.' + output_format), read_file(contracts_output_file)
     )


@@ -58,6 +58,8 @@ def test_export_traces_job(tmpdir, start_block, end_block, resource_group, web3_
     )
     job.run()
+    print('=====================')
+    print(read_file(traces_output_file))
     compare_lines_ignore_order(
         read_resource(resource_group, 'expected_traces.csv'), read_file(traces_output_file)
     )


@@ -56,6 +56,8 @@ def test_extract_traces_job(tmpdir, resource_group):
     )
     job.run()
+    print('=====================')
+    print(read_file(output_file))
     compare_lines_ignore_order(
         read_resource(resource_group, 'expected_traces.csv'), read_file(output_file)
     )


@@ -30,13 +30,12 @@ from ethereumetl.service.graph_operations import OutOfBoundsError
 from tests.helpers import skip_if_slow_tests_disabled
-@skip_if_slow_tests_disabled
 @pytest.mark.parametrize("date,expected_start_block,expected_end_block", [
-    ('2015-07-30', 0, 6911),
-    ('2015-07-31', 6912, 13774),
-    ('2017-01-01', 2912407, 2918517),
-    ('2017-01-02', 2918518, 2924575),
-    ('2018-06-10', 5761663, 5767303)
+    skip_if_slow_tests_disabled(['2015-07-30', 0, 6911]),
+    skip_if_slow_tests_disabled(['2015-07-31', 6912, 13774]),
+    skip_if_slow_tests_disabled(['2017-01-01', 2912407, 2918517]),
+    skip_if_slow_tests_disabled(['2017-01-02', 2918518, 2924575]),
+    skip_if_slow_tests_disabled(['2018-06-10', 5761663, 5767303])
 ])
 def test_get_block_range_for_date(date, expected_start_block, expected_end_block):
     eth_service = get_new_eth_service()
@@ -45,10 +44,9 @@ def test_get_block_range_for_date(date, expected_start_block, expected_end_block
     assert blocks == (expected_start_block, expected_end_block)
-@skip_if_slow_tests_disabled
 @pytest.mark.parametrize("date", [
-    '2015-07-29',
-    '2030-01-01'
+    skip_if_slow_tests_disabled(['2015-07-29']),
+    skip_if_slow_tests_disabled(['2030-01-01'])
 ])
 def test_get_block_range_for_date_fail(date):
     eth_service = get_new_eth_service()
@@ -57,10 +55,9 @@ def test_get_block_range_for_date_fail(date):
     eth_service.get_block_range_for_date(parsed_date)
-@skip_if_slow_tests_disabled
 @pytest.mark.parametrize("start_timestamp,end_timestamp,expected_start_block,expected_end_block", [
-    (1438270128, 1438270128, 10, 10),
-    (1438270128, 1438270129, 10, 10)
+    skip_if_slow_tests_disabled([1438270128, 1438270128, 10, 10]),
+    skip_if_slow_tests_disabled([1438270128, 1438270129, 10, 10])
 ])
 def test_get_block_range_for_timestamps(start_timestamp, end_timestamp, expected_start_block, expected_end_block):
     eth_service = get_new_eth_service()
@@ -68,9 +65,8 @@ def test_get_block_range_for_timestamps(start_timestamp, end_timestamp, expected
     assert blocks == (expected_start_block, expected_end_block)
-@skip_if_slow_tests_disabled
 @pytest.mark.parametrize("start_timestamp,end_timestamp", [
-    (1438270129, 1438270131)
+    skip_if_slow_tests_disabled([1438270129, 1438270131])
 ])
 def test_get_block_range_for_timestamps_fail(start_timestamp, end_timestamp):
     eth_service = get_new_eth_service()


@@ -0,0 +1,138 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import pytest
from ethereumetl.streaming.eth_streamer_adapter import EthStreamerAdapter
from ethereumetl.thread_local_proxy import ThreadLocalProxy
import tests.resources
from ethereumetl.enumeration.entity_type import EntityType
from blockchainetl.jobs.exporters.composite_item_exporter import CompositeItemExporter
from blockchainetl.streaming.streamer import Streamer
from tests.ethereumetl.job.helpers import get_web3_provider
from tests.helpers import compare_lines_ignore_order, read_file, skip_if_slow_tests_disabled
RESOURCE_GROUP = 'test_stream'

def read_resource(resource_group, file_name):
    return tests.resources.read_resource([RESOURCE_GROUP, resource_group], file_name)

@pytest.mark.parametrize("start_block, end_block, batch_size, resource_group, entity_types, provider_type", [
    (1755634, 1755635, 1, 'blocks_1755634_1755635', EntityType.ALL_FOR_INFURA, 'mock'),
    skip_if_slow_tests_disabled([1755634, 1755635, 1, 'blocks_1755634_1755635', EntityType.ALL_FOR_INFURA, 'infura']),
    (508110, 508110, 1, 'blocks_508110_508110', ['trace', 'contract', 'token'], 'mock'),
    (2112234, 2112234, 1, 'blocks_2112234_2112234', ['trace', 'contract', 'token'], 'mock'),
])
def test_stream(tmpdir, start_block, end_block, batch_size, resource_group, entity_types, provider_type):
    try:
        os.remove('last_synced_block.txt')
    except OSError:
        pass

    blocks_output_file = str(tmpdir.join('actual_blocks.json'))
    transactions_output_file = str(tmpdir.join('actual_transactions.json'))
    logs_output_file = str(tmpdir.join('actual_logs.json'))
    token_transfers_output_file = str(tmpdir.join('actual_token_transfers.json'))
    traces_output_file = str(tmpdir.join('actual_traces.json'))
    contracts_output_file = str(tmpdir.join('actual_contracts.json'))
    tokens_output_file = str(tmpdir.join('actual_tokens.json'))

    streamer_adapter = EthStreamerAdapter(
        batch_web3_provider=ThreadLocalProxy(
            lambda: get_web3_provider(provider_type,
                                      read_resource_lambda=lambda file: read_resource(resource_group, file),
                                      batch=True)
        ),
        batch_size=batch_size,
        item_exporter=CompositeItemExporter(
            filename_mapping={
                'block': blocks_output_file,
                'transaction': transactions_output_file,
                'log': logs_output_file,
                'token_transfer': token_transfers_output_file,
                'trace': traces_output_file,
                'contract': contracts_output_file,
                'token': tokens_output_file,
            }
        ),
        entity_types=entity_types,
    )
    streamer = Streamer(
        blockchain_streamer_adapter=streamer_adapter,
        start_block=start_block,
        end_block=end_block,
        retry_errors=False
    )
    streamer.stream()

    if 'block' in entity_types:
        print('=====================')
        print(read_file(blocks_output_file))
        compare_lines_ignore_order(
            read_resource(resource_group, 'expected_blocks.json'), read_file(blocks_output_file)
        )
    if 'transaction' in entity_types:
        print('=====================')
        print(read_file(transactions_output_file))
        compare_lines_ignore_order(
            read_resource(resource_group, 'expected_transactions.json'), read_file(transactions_output_file)
        )
    if 'log' in entity_types:
        print('=====================')
        print(read_file(logs_output_file))
        compare_lines_ignore_order(
            read_resource(resource_group, 'expected_logs.json'), read_file(logs_output_file)
        )
    if 'token_transfer' in entity_types:
        print('=====================')
        print(read_file(token_transfers_output_file))
        compare_lines_ignore_order(
            read_resource(resource_group, 'expected_token_transfers.json'), read_file(token_transfers_output_file)
        )
    if 'trace' in entity_types:
        print('=====================')
        print(read_file(traces_output_file))
        compare_lines_ignore_order(
            read_resource(resource_group, 'expected_traces.json'), read_file(traces_output_file)
        )
    if 'contract' in entity_types:
        print('=====================')
        print(read_file(contracts_output_file))
        compare_lines_ignore_order(
            read_resource(resource_group, 'expected_contracts.json'), read_file(contracts_output_file)
        )
    if 'token' in entity_types:
        print('=====================')
        print(read_file(tokens_output_file))
        compare_lines_ignore_order(
            read_resource(resource_group, 'expected_tokens.json'), read_file(tokens_output_file)
        )


@@ -55,4 +55,7 @@ def read_file(path):
 run_slow_tests_variable = os.environ.get('ETHEREUM_ETL_RUN_SLOW_TESTS', 'False')
 run_slow_tests = run_slow_tests_variable.lower() in ['1', 'true', 'yes']
-skip_if_slow_tests_disabled = pytest.mark.skipif(not run_slow_tests, reason='Skipping slow running tests')
+def skip_if_slow_tests_disabled(data):
+    return pytest.param(*data, marks=pytest.mark.skipif(not run_slow_tests, reason='Skipping slow running tests'))
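This change turns the module-level skipif mark into a function that wraps each parameter set in `pytest.param`, so individual parametrize cases can be skipped while the rest of the test still runs (as in the mixed mock/infura cases in test_stream above). A standalone sketch of the same pattern, with `run_slow_tests` hard-coded instead of read from `ETHEREUM_ETL_RUN_SLOW_TESTS`:

```python
import pytest

run_slow_tests = False  # the real helper derives this from the ETHEREUM_ETL_RUN_SLOW_TESTS env var

def skip_if_slow_tests_disabled(data):
    # Each parametrize entry becomes a pytest.param carrying its own skipif mark,
    # so only that case is skipped rather than the whole test function.
    return pytest.param(*data, marks=pytest.mark.skipif(not run_slow_tests, reason='Skipping slow running tests'))

case = skip_if_slow_tests_disabled(['2015-07-30', 0, 6911])
print(case.values)  # ('2015-07-30', 0, 6911)
```

`case` is a `pytest.param` parameter set: its `values` are unpacked into the test arguments and its attached `skipif` mark is evaluated per case at collection time.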


@@ -1,5 +1,5 @@
-hash,nonce,block_hash,block_number,transaction_index,from_address,to_address,value,gas,gas_price,input
-0x04cbcb236043d8fb7839e07bbc7f5eed692fb2ca55d897f1101eac3e3ad4fab8,12,0x246edb4b351d93c27926f4649bcf6c24366e2a7c7c718dc9158eea20c03bc6ae,483920,0,0x1b63142628311395ceafeea5667e7c9026c862ca,0xf4eced2f682ce333f96f2d8966c613ded8fc95dd,0,150853,50000000000,0xa9059cbb000000000000000000000000ac4df82fe37ea2187bc8c011a23d743b4f39019a00000000000000000000000000000000000000000000000000000000000186a0
-0xcea6f89720cc1d2f46cc7a935463ae0b99dd5fad9c91bb7357de5421511cee49,84,0x246edb4b351d93c27926f4649bcf6c24366e2a7c7c718dc9158eea20c03bc6ae,483920,1,0x9b22a80d5c7b3374a05b446081f97d0a34079e7f,0xf4eced2f682ce333f96f2d8966c613ded8fc95dd,0,150853,50000000000,0xa9059cbb00000000000000000000000066f183060253cfbe45beff1e6e7ebbe318c81e560000000000000000000000000000000000000000000000000000000000030d40
-0x463d53f0ad57677a3b430a007c1c31d15d62c37fab5eee598551697c297c235c,88,0x246edb4b351d93c27926f4649bcf6c24366e2a7c7c718dc9158eea20c03bc6ae,483920,2,0x9df428a91ff0f3635c8f0ce752933b9788926804,0x9e669f970ec0f49bb735f20799a7e7c4a1c274e2,11000440000000000,90000,50000000000,0x
-0x05287a561f218418892ab053adfb3d919860988b19458c570c5c30f51c146f02,20085,0x246edb4b351d93c27926f4649bcf6c24366e2a7c7c718dc9158eea20c03bc6ae,483920,3,0x2a65aca4d5fc5b5c859090a6c34d164135398226,0x743b8aeedc163c0e3a0fe9f3910d146c48e70da8,1530219620000000000,90000,50000000000,0x
+hash,nonce,block_hash,block_number,transaction_index,from_address,to_address,value,gas,gas_price,input,block_timestamp
+0x04cbcb236043d8fb7839e07bbc7f5eed692fb2ca55d897f1101eac3e3ad4fab8,12,0x246edb4b351d93c27926f4649bcf6c24366e2a7c7c718dc9158eea20c03bc6ae,483920,0,0x1b63142628311395ceafeea5667e7c9026c862ca,0xf4eced2f682ce333f96f2d8966c613ded8fc95dd,0,150853,50000000000,0xa9059cbb000000000000000000000000ac4df82fe37ea2187bc8c011a23d743b4f39019a00000000000000000000000000000000000000000000000000000000000186a0,1446561880
+0xcea6f89720cc1d2f46cc7a935463ae0b99dd5fad9c91bb7357de5421511cee49,84,0x246edb4b351d93c27926f4649bcf6c24366e2a7c7c718dc9158eea20c03bc6ae,483920,1,0x9b22a80d5c7b3374a05b446081f97d0a34079e7f,0xf4eced2f682ce333f96f2d8966c613ded8fc95dd,0,150853,50000000000,0xa9059cbb00000000000000000000000066f183060253cfbe45beff1e6e7ebbe318c81e560000000000000000000000000000000000000000000000000000000000030d40,1446561880
+0x463d53f0ad57677a3b430a007c1c31d15d62c37fab5eee598551697c297c235c,88,0x246edb4b351d93c27926f4649bcf6c24366e2a7c7c718dc9158eea20c03bc6ae,483920,2,0x9df428a91ff0f3635c8f0ce752933b9788926804,0x9e669f970ec0f49bb735f20799a7e7c4a1c274e2,11000440000000000,90000,50000000000,0x,1446561880
+0x05287a561f218418892ab053adfb3d919860988b19458c570c5c30f51c146f02,20085,0x246edb4b351d93c27926f4649bcf6c24366e2a7c7c718dc9158eea20c03bc6ae,483920,3,0x2a65aca4d5fc5b5c859090a6c34d164135398226,0x743b8aeedc163c0e3a0fe9f3910d146c48e70da8,1530219620000000000,90000,50000000000,0x,1446561880
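The fixture change above appends a `block_timestamp` column to the expected transactions CSV. The header string below is taken from the fixture; the parsing code and the sample timestamp conversion are illustrative:

```python
from datetime import datetime, timezone

# New header from the fixture: block_timestamp is appended as the last column.
header = 'hash,nonce,block_hash,block_number,transaction_index,from_address,to_address,value,gas,gas_price,input,block_timestamp'
fields = header.split(',')
print(fields[-1])   # block_timestamp
print(len(fields))  # 12

# The value is a Unix timestamp; 1446561880 is the value the fixture uses for block 483920.
ts = 1446561880
print(datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'))  # 2015-11-03 14:44:40
```

Carrying the block timestamp on every transaction row saves a join against the blocks table in downstream queries.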


@@ -1,5 +1,5 @@
-hash,nonce,block_hash,block_number,transaction_index,from_address,to_address,value,gas,gas_price,input
-0xbd5ab8937e52a6244209d804471be4878df6c364bca0111dd6d05e0d3edf63cf,79,0x944f09177142833c644c979a83900d8cae1ee67369774b88b3b330bb72825082,47219,0,0xe6a7a1d47ff21b6321162aea7c6cb457d5476bca,0xe25e3a1947405a1f82dd8e3048a9ca471dc782e1,8306052477120672000,21000,61580653163,0x
-0x4bcc1dd0c56c0b767b1ee3cb8bce7df44518f1696205299e34eb53a5e00a863e,1,0x944f09177142833c644c979a83900d8cae1ee67369774b88b3b330bb72825082,47219,1,0xf9a19aea1193d9b9e4ef2f5b8c9ec8df93a22356,0x32be343b94f860124dc4fee278fdcbd38c102d88,1998716170000000000,21000,61134768794,0x
-0x99f1097abd8f33a68f0ed63d60de5f3e7e2a3e0579b90d5f46a4f201c658b46d,9,0x889c421abc62a48641eee140519e6da8c9dc01d85d8f5c4fbc3c13e3c6e4cb3e,47218,0,0x1406854d149e081ac09cb4ca560da463f3123059,0xa0e74ae010d51894734c308d612131056bb721ad,110000000000000000000,40000,62227241854,0x
-0x95844e6c54b4aafc8e1f75784127529280e75c3a980d91f6dfca1c1b0eb078fb,78,0x889c421abc62a48641eee140519e6da8c9dc01d85d8f5c4fbc3c13e3c6e4cb3e,47218,1,0xe6a7a1d47ff21b6321162aea7c6cb457d5476bca,0xee80ef3c49d9465c7fc2b3d7373fdbbbc3fe282f,8140416390630760000,21000,62222792381,0x
+hash,nonce,block_hash,block_number,transaction_index,from_address,to_address,value,gas,gas_price,input,block_timestamp
+0xbd5ab8937e52a6244209d804471be4878df6c364bca0111dd6d05e0d3edf63cf,79,0x944f09177142833c644c979a83900d8cae1ee67369774b88b3b330bb72825082,47219,0,0xe6a7a1d47ff21b6321162aea7c6cb457d5476bca,0xe25e3a1947405a1f82dd8e3048a9ca471dc782e1,8306052477120672000,21000,61580653163,0x,1438936326
+0x4bcc1dd0c56c0b767b1ee3cb8bce7df44518f1696205299e34eb53a5e00a863e,1,0x944f09177142833c644c979a83900d8cae1ee67369774b88b3b330bb72825082,47219,1,0xf9a19aea1193d9b9e4ef2f5b8c9ec8df93a22356,0x32be343b94f860124dc4fee278fdcbd38c102d88,1998716170000000000,21000,61134768794,0x,1438936326
+0x99f1097abd8f33a68f0ed63d60de5f3e7e2a3e0579b90d5f46a4f201c658b46d,9,0x889c421abc62a48641eee140519e6da8c9dc01d85d8f5c4fbc3c13e3c6e4cb3e,47218,0,0x1406854d149e081ac09cb4ca560da463f3123059,0xa0e74ae010d51894734c308d612131056bb721ad,110000000000000000000,40000,62227241854,0x,1438936285
+0x95844e6c54b4aafc8e1f75784127529280e75c3a980d91f6dfca1c1b0eb078fb,78,0x889c421abc62a48641eee140519e6da8c9dc01d85d8f5c4fbc3c13e3c6e4cb3e,47218,1,0xe6a7a1d47ff21b6321162aea7c6cb457d5476bca,0xee80ef3c49d9465c7fc2b3d7373fdbbbc3fe282f,8140416390630760000,21000,62222792381,0x,1438936285
