mirror of
https://github.com/blockchain-etl/ethereum-etl.git
synced 2026-01-09 13:57:54 -05:00
Merge pull request #302 from blockjoe/develop
Fallback to `web3.eth.getLogs` when calling to nodes without `eth_newFilter`
This commit is contained in:
@@ -43,6 +43,7 @@ class ExportOriginJob(BaseJob):
|
||||
self.receipt_log_mapper = EthReceiptLogMapper()
|
||||
self.marketplace_listing_mapper = OriginMarketplaceListingMapper()
|
||||
self.shop_listing_mapper = OriginShopProductMapper()
|
||||
self._supports_eth_newFilter = True
|
||||
|
||||
|
||||
def _start(self):
|
||||
@@ -100,8 +101,18 @@ class ExportOriginJob(BaseJob):
|
||||
'fromBlock': batch['from_block'],
|
||||
'toBlock': batch['to_block']
|
||||
}
|
||||
event_filter = self.web3.eth.filter(filter_params)
|
||||
events = event_filter.get_all_entries()
|
||||
if self._supports_eth_newFilter:
|
||||
try:
|
||||
event_filter = self.web3.eth.filter(filter_params)
|
||||
events = event_filter.get_all_entries()
|
||||
except ValueError as e:
|
||||
if str(e) == "{'code': -32000, 'message': 'the method is currently not implemented: eth_newFilter'}":
|
||||
self._supports_eth_newFilter = False
|
||||
events = self.web3.eth.getLogs(filter_params)
|
||||
else:
|
||||
raise(e)
|
||||
else:
|
||||
events = self.web3.eth.getLogs(filter_params)
|
||||
for event in events:
|
||||
log = self.receipt_log_mapper.web3_dict_to_receipt_log(event)
|
||||
listing, shop_products = self.event_extractor.extract_event_from_log(log, batch['contract_version'])
|
||||
@@ -112,9 +123,10 @@ class ExportOriginJob(BaseJob):
|
||||
item = self.shop_listing_mapper.product_to_dict(product)
|
||||
self.shop_product_exporter.export_item(item)
|
||||
|
||||
self.web3.eth.uninstallFilter(event_filter.filter_id)
|
||||
if self._supports_eth_newFilter:
|
||||
self.web3.eth.uninstallFilter(event_filter.filter_id)
|
||||
|
||||
def _end(self):
|
||||
self.batch_work_executor.shutdown()
|
||||
self.marketplace_listing_exporter.close()
|
||||
self.shop_product_exporter.close()
|
||||
self.shop_product_exporter.close()
|
||||
|
||||
@@ -51,6 +51,7 @@ class ExportTokenTransfersJob(BaseJob):
|
||||
self.receipt_log_mapper = EthReceiptLogMapper()
|
||||
self.token_transfer_mapper = EthTokenTransferMapper()
|
||||
self.token_transfer_extractor = EthTokenTransferExtractor()
|
||||
self._supports_eth_newFilter = True
|
||||
|
||||
def _start(self):
|
||||
self.item_exporter.open()
|
||||
@@ -74,15 +75,23 @@ class ExportTokenTransfersJob(BaseJob):
|
||||
if self.tokens is not None and len(self.tokens) > 0:
|
||||
filter_params['address'] = self.tokens
|
||||
|
||||
event_filter = self.web3.eth.filter(filter_params)
|
||||
events = event_filter.get_all_entries()
|
||||
try:
|
||||
event_filter = self.web3.eth.filter(filter_params)
|
||||
events = event_filter.get_all_entries()
|
||||
except ValueError as e:
|
||||
if str(e) == "{'code': -32000, 'message': 'the method is currently not implemented: eth_newFilter'}":
|
||||
self._supports_eth_newFilter = False
|
||||
events = self.web3.eth.getLogs(filter_params)
|
||||
else:
|
||||
raise(e)
|
||||
for event in events:
|
||||
log = self.receipt_log_mapper.web3_dict_to_receipt_log(event)
|
||||
token_transfer = self.token_transfer_extractor.extract_transfer_from_log(log)
|
||||
if token_transfer is not None:
|
||||
self.item_exporter.export_item(self.token_transfer_mapper.token_transfer_to_dict(token_transfer))
|
||||
|
||||
self.web3.eth.uninstallFilter(event_filter.filter_id)
|
||||
if self._supports_eth_newFilter:
|
||||
self.web3.eth.uninstallFilter(event_filter.filter_id)
|
||||
|
||||
def _end(self):
|
||||
self.batch_work_executor.shutdown()
|
||||
|
||||
Reference in New Issue
Block a user