refactor filters for better abstraction + events + tests to enable logs subs

This commit is contained in:
kumavis
2018-10-08 11:09:17 -04:00
parent 882dfef42e
commit c583ba9d94
10 changed files with 118 additions and 80 deletions

View File

@@ -1,6 +1,9 @@
class BaseFilter {
const SafeEventEmitter = require('safe-event-emitter')
class BaseFilter extends SafeEventEmitter {
constructor () {
super()
this.updates = []
this.allResults = []
}
@@ -14,6 +17,7 @@ class BaseFilter {
addResults (newResults) {
this.updates = this.updates.concat(newResults)
this.allResults = this.allResults.concat(newResults)
newResults.forEach(result => this.emit('update', result))
}
addInitialResults (newResults) {
@@ -32,4 +36,4 @@ class BaseFilter {
}
module.exports = BaseFilter
module.exports = BaseFilter

View File

@@ -11,13 +11,11 @@ class BlockFilter extends BaseFilter {
}
async update ({ oldBlock, newBlock }) {
console.log('filter - adding results start')
const toBlock = newBlock
const fromBlock = incrementHexInt(oldBlock)
const blockBodies = await getBlocksForRange({ provider: this.provider, fromBlock, toBlock })
const blockHashes = blockBodies.map((block) => block.hash)
this.addResults(blockHashes)
console.log('filter - adding results done', blockHashes)
}
}

View File

@@ -22,9 +22,9 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
const middleware = createJsonRpcMiddleware({
// install filters
eth_newFilter: waitForFree(toAsyncRpcMiddleware(newLogFilter)),
eth_newBlockFilter: waitForFree(toAsyncRpcMiddleware(newBlockFilter)),
eth_newPendingTransactionFilter: waitForFree(toAsyncRpcMiddleware(newPendingTransactionFilter)),
eth_newFilter: waitForFree(toFilterCreationMiddleware(newLogFilter)),
eth_newBlockFilter: waitForFree(toFilterCreationMiddleware(newBlockFilter)),
eth_newPendingTransactionFilter: waitForFree(toFilterCreationMiddleware(newPendingTransactionFilter)),
// uninstall filters
eth_uninstallFilter: waitForFree(toAsyncRpcMiddleware(uninstallFilterHandler)),
// checking filter changes
@@ -59,7 +59,7 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
middleware.newLogFilter = newLogFilter
middleware.newBlockFilter = newBlockFilter
middleware.newPendingTransactionFilter = newPendingTransactionFilter
middleware.uninstallFilter = uninstallFilter
middleware.uninstallFilter = uninstallFilterHandler
middleware.getFilterChanges = getFilterChanges
middleware.getFilterLogs = getFilterLogs
@@ -74,34 +74,29 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
// new filters
//
async function newLogFilter(req) {
const params = req.params[0]
async function newLogFilter(params) {
const filter = new LogFilter({ provider, ethQuery, params })
const filterIndex = await installFilter(filter)
const result = intToHex(filterIndex)
return result
return filter
}
async function newBlockFilter(req) {
async function newBlockFilter() {
const filter = new BlockFilter({ provider, ethQuery })
const filterIndex = await installFilter(filter)
const result = intToHex(filterIndex)
return result
return filter
}
async function newPendingTransactionFilter(req) {
async function newPendingTransactionFilter() {
const filter = new TxFilter({ provider, ethQuery })
const filterIndex = await installFilter(filter)
const result = intToHex(filterIndex)
return result
return filter
}
//
// get filter changes
//
async function getFilterChanges(req) {
const filterIndexHex = req.params[0]
async function getFilterChanges(filterIndexHex) {
const filterIndex = hexToInt(filterIndexHex)
const filter = filters[filterIndex]
if (!filter) {
@@ -111,8 +106,7 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
return results
}
async function getFilterLogs(req) {
const filterIndexHex = req.params[0]
async function getFilterLogs(filterIndexHex) {
const filterIndex = hexToInt(filterIndexHex)
const filter = filters[filterIndex]
if (!filter) {
@@ -128,8 +122,7 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
//
async function uninstallFilterHandler(req) {
const filterIndexHex = req.params[0]
async function uninstallFilterHandler(filterIndexHex) {
// check filter exists
const filterIndex = hexToInt(filterIndexHex)
const filter = filters[filterIndex]
@@ -148,12 +141,12 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
async function installFilter(filter) {
const prevFilterCount = objValues(filters).length
// install filter
console.log('install start')
const currentBlock = await blockTracker.getLatestBlock()
await filter.initialize({ currentBlock })
console.log('install done')
filterIndex++
filters[filterIndex] = filter
filter.id = filterIndex
filter.idHex = intToHex(filterIndex)
// update block tracker subs
const newFilterCount = objValues(filters).length
updateBlockTrackerSubs({ prevFilterCount, newFilterCount })
@@ -190,10 +183,19 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
}
// helper for turning filter constructors into rpc middleware
function toFilterCreationMiddleware(createFilterFn) {
return toAsyncRpcMiddleware(async (...args) => {
const filter = await createFilterFn(...args)
const result = intToHex(filter.id)
return result
})
}
// helper for pulling out req.params and setting res.result
function toAsyncRpcMiddleware(asyncFn) {
// set result on response + convert to middleware
return createAsyncMiddleware(async (req, res) => {
const result = await asyncFn(req)
const result = await asyncFn.apply(null, req.params)
res.result = result
})
}

View File

@@ -15,7 +15,6 @@ class LogFilter extends BaseFilter {
}, params)
// normalize address
if (this.params.address) this.params.address = this.params.address.toLowerCase()
// console.log('LogFilter - constructor - params', this.params)
}
async initialize({ currentBlock }) {
@@ -64,23 +63,17 @@ class LogFilter extends BaseFilter {
}
matchLog(log) {
// console.log('LogFilter - validateLog:', log)
// check if block number in bounds:
// console.log('LogFilter - validateLog - blockNumber', this.fromBlock, this.toBlock)
if (hexToInt(this.params.fromBlock) >= hexToInt(log.blockNumber)) return false
if (blockRefIsNumber(this.params.toBlock) && hexToInt(this.params.toBlock) <= hexToInt(log.blockNumber)) return false
// address is correct:
// console.log('LogFilter - validateLog - address', this.params.address)
if (this.params.address && this.params.address !== log.address) return false
// topics match:
// topics are position-dependant
// topics can be nested to represent `or` [[a || b], c]
// topics can be null, representing a wild card for that position
// console.log('LogFilter - validateLog - topics', log.topics)
// console.log('LogFilter - validateLog - against topics', this.params.topics)
const topicsMatch = this.params.topics.every((topicPattern, index) => {
// pattern is longer than actual topics
const logTopic = log.topics[index]
@@ -94,7 +87,6 @@ class LogFilter extends BaseFilter {
return topicDoesMatch
})
// console.log('LogFilter - validateLog - '+(topicsMatch ? 'approved!' : 'denied!')+' ==============')
return topicsMatch
}

18
package-lock.json generated
View File

@@ -7522,7 +7522,7 @@
},
"is-builtin-module": {
"version": "1.0.0",
"resolved": "http://registry.npmjs.org/is-builtin-module/-/is-builtin-module-1.0.0.tgz",
"resolved": "https://registry.npmjs.org/is-builtin-module/-/is-builtin-module-1.0.0.tgz",
"integrity": "sha1-VAVy0096wxGfj3bDDLwbHgN6/74=",
"requires": {
"builtin-modules": "^1.0.0"
@@ -7740,7 +7740,7 @@
},
"jsonfile": {
"version": "2.4.0",
"resolved": "http://registry.npmjs.org/jsonfile/-/jsonfile-2.4.0.tgz",
"resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-2.4.0.tgz",
"integrity": "sha1-NzaitCi4e72gzIO1P6PWM6NcKug=",
"requires": {
"graceful-fs": "^4.1.6"
@@ -7820,7 +7820,7 @@
},
"readable-stream": {
"version": "1.1.14",
"resolved": "http://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz",
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz",
"integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=",
"requires": {
"core-util-is": "~1.0.0",
@@ -7857,7 +7857,7 @@
},
"readable-stream": {
"version": "1.0.34",
"resolved": "http://registry.npmjs.org/readable-stream/-/readable-stream-1.0.34.tgz",
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.0.34.tgz",
"integrity": "sha1-Elgg40vIQtLyqq+v5MKRbuMsFXw=",
"requires": {
"core-util-is": "~1.0.0",
@@ -7897,7 +7897,7 @@
},
"load-json-file": {
"version": "1.1.0",
"resolved": "http://registry.npmjs.org/load-json-file/-/load-json-file-1.1.0.tgz",
"resolved": "https://registry.npmjs.org/load-json-file/-/load-json-file-1.1.0.tgz",
"integrity": "sha1-lWkFcI1YtLq0wiYbBPWfMcmTdMA=",
"requires": {
"graceful-fs": "^4.1.2",
@@ -7997,7 +7997,7 @@
"dependencies": {
"async": {
"version": "1.5.2",
"resolved": "http://registry.npmjs.org/async/-/async-1.5.2.tgz",
"resolved": "https://registry.npmjs.org/async/-/async-1.5.2.tgz",
"integrity": "sha1-7GphrlZIDAw8skHJVhjiCJL5Zyo="
}
}
@@ -8133,7 +8133,7 @@
},
"os-locale": {
"version": "1.4.0",
"resolved": "http://registry.npmjs.org/os-locale/-/os-locale-1.4.0.tgz",
"resolved": "https://registry.npmjs.org/os-locale/-/os-locale-1.4.0.tgz",
"integrity": "sha1-IPnxeuKe00XoveWDsT0gCYA8FNk=",
"requires": {
"lcid": "^1.0.0"
@@ -8763,7 +8763,7 @@
},
"web3-provider-engine": {
"version": "13.8.0",
"resolved": "http://registry.npmjs.org/web3-provider-engine/-/web3-provider-engine-13.8.0.tgz",
"resolved": "https://registry.npmjs.org/web3-provider-engine/-/web3-provider-engine-13.8.0.tgz",
"integrity": "sha512-fZXhX5VWwWpoFfrfocslyg6P7cN3YWPG/ASaevNfeO80R+nzgoPUBXcWQekSGSsNDkeRTis4aMmpmofYf1TNtQ==",
"requires": {
"async": "^2.5.0",
@@ -8834,7 +8834,7 @@
},
"wrap-ansi": {
"version": "2.1.0",
"resolved": "http://registry.npmjs.org/wrap-ansi/-/wrap-ansi-2.1.0.tgz",
"resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-2.1.0.tgz",
"integrity": "sha1-2Pw9KE3QV5T+hJc8rs3Rz4JP3YU=",
"requires": {
"string-width": "^1.0.1",

View File

@@ -13,7 +13,8 @@
"eth-json-rpc-middleware": "^2.6.0",
"ethjs-query": "^0.3.8",
"json-rpc-engine": "^3.8.0",
"lodash.flatmap": "^4.5.0"
"lodash.flatmap": "^4.5.0",
"safe-event-emitter": "^1.0.1"
},
"devDependencies": {
"deep-clone": "^3.0.3",

View File

@@ -33,8 +33,9 @@ function createSubscriptionMiddleware({ blockTracker, provider }) {
sub = createSubNewHeads({ subId })
break
case 'logs':
const filterIdHex = await filterManager.newLogFilter(req)
sub = createSubFromFilter({ subId, filterIdHex })
const filterParams = req.params[1]
const filter = await filterManager.newLogFilter(filterParams)
sub = createSubFromFilter({ subId, filter })
break
default:
throw new Error(`SubscriptionManager - unsupported subscription type "${subscriptionType}"`)
@@ -42,16 +43,13 @@ function createSubscriptionMiddleware({ blockTracker, provider }) {
}
subscriptions[subId] = sub
// check for subscription updates on new block
blockTracker.on('sync', sub.update)
res.result = subId
return
function createSubNewHeads({ subId }) {
const sub = {
type: subscriptionType,
destroy: () => {
destroy: async () => {
blockTracker.removeListener('sync', sub.update)
},
update: async ({ oldBlock, newBlock }) => {
@@ -65,23 +63,18 @@ function createSubscriptionMiddleware({ blockTracker, provider }) {
})
}
}
// check for subscription updates on new block
blockTracker.on('sync', sub.update)
return sub
}
function createSubFromFilter({ subId, filterIdHex }){
function createSubFromFilter({ subId, filter }){
filter.on('update', result => _emitSubscriptionResult(subId, result))
const sub = {
type: subscriptionType,
destroy: () => {
blockTracker.removeListener('sync', sub.update)
destroy: async () => {
return await filterManager.uninstallFilter(filter.idHex)
},
update: async () => {
// check filter for updates
const results = await filterManager.getFilterChanges({ params: [filterIdHex] })
// emit updates
results.forEach(async (result) => {
_emitSubscriptionResult(subId, result)
})
}
}
return sub
}
@@ -97,7 +90,7 @@ function createSubscriptionMiddleware({ blockTracker, provider }) {
}
// cleanup subscription
delete subscriptions[id]
subscription.destroy()
await subscription.destroy()
res.result = true
}

View File

@@ -9,6 +9,7 @@ const {
createTestSetup,
createPayload,
asyncTest,
deployLogEchoContract,
} = require('./util')
test('LogFilter - basic', asyncTest(async (t) => {
@@ -136,17 +137,3 @@ test('BlockFilter - basic', asyncTest(async (t) => {
await eth.uninstallFilter(filterId)
}))
async function deployLogEchoContract({ tools, from }){
// https://github.com/kumavis/eth-needlepoint/blob/master/examples/emit-log.js
const eth = tools.query
const deployTxHash = await eth.sendTransaction({ from, data: '0x600e600c600039600e6000f336600060003760005160206000a1' })
await tools.trackNextBlock()
const deployTxRx = await eth.getTransactionReceipt(deployTxHash)
const contractAddress = deployTxRx.contractAddress
return {
deployTxHash,
deployTxRx,
contractAddress,
}
}

View File

@@ -4,9 +4,10 @@ const {
createPayload,
asyncTest,
timeout,
deployLogEchoContract,
} = require('./util')
test('BlockFilter - basic', asyncTest(async (t) => {
test('subscriptions - newHeads', asyncTest(async (t) => {
const tools = createTestSetup()
const eth = tools.query
@@ -26,7 +27,7 @@ test('BlockFilter - basic', asyncTest(async (t) => {
})
const subId = sub.id
t.ok(subId, `got sub id: ${subId} (${typeof subId})`)
t.equal(typeof subId, 'string', `got sub id as number (${typeof subId})`)
t.equal(typeof subId, 'string', `got sub id as hex string (${typeof subId})`)
// check sub
t.equal(subResults.length, 0, 'no sub results yet')
@@ -52,3 +53,47 @@ test('BlockFilter - basic', asyncTest(async (t) => {
// uninstall subscription
await sub.uninstall()
}))
test('subscriptions - log', asyncTest(async (t) => {
const tools = createTestSetup()
const eth = tools.query
const { query, subs, blockTracker } = tools
// deploy log-echo contract
const coinbase = await query.coinbase()
const { contractAddress } = await deployLogEchoContract({ tools, from: coinbase })
t.ok(contractAddress, 'got deployed contract address')
// create subscription
const subResults = []
const blockNumber = await blockTracker.getLatestBlock()
const targetTopic = '0xaabbcce106361d4f6cd9098051596d565c1dbf7bc20b4c3acb3aaa4204aabbcc'
const filterParams = { address: contractAddress, topics: [targetTopic], fromBlock: blockNumber, toBlock: 'latest' }
const sub = await subs.logs(filterParams)
sub.events.on('notification', (value) => {
subResults.push(value)
})
// verify subId
const subId = sub.id
t.ok(subId, `got filter id: ${subId} (${typeof subId})`)
t.equal(typeof subId, 'string', `got sub id as hex string (${typeof subId})`)
// trigger matching log
const triggeringTxHash = await query.sendTransaction({ from: coinbase, to: contractAddress, data: targetTopic })
await tools.trackNextBlock()
// wait for subscription results to update
await timeout(200)
// check subscription results
t.equal(subResults.length, 1, 'only one matched filter')
const matchingResults = subResults[0]
t.equal(matchingResults.transactionHash, triggeringTxHash, 'tx hash should match')
t.equal(matchingResults.topics.length, 1, 'emitted a single log topic')
const matchedTopic = matchingResults.topics[0]
t.equal(matchedTopic, targetTopic, 'topic matches expected')
await sub.uninstall()
}))

View File

@@ -16,6 +16,7 @@ module.exports = {
createTestSetup,
asyncTest,
timeout,
deployLogEchoContract,
}
function createTestSetup () {
@@ -132,3 +133,18 @@ function asyncTest(asyncTestFn){
function timeout(duration) {
return new Promise(resolve => setTimeout(resolve, duration))
}
async function deployLogEchoContract({ tools, from }){
// https://github.com/kumavis/eth-needlepoint/blob/master/examples/emit-log.js
const eth = tools.query
const deployTxHash = await eth.sendTransaction({ from, data: '0x600e600c600039600e6000f336600060003760005160206000a1' })
await tools.trackNextBlock()
const deployTxRx = await eth.getTransactionReceipt(deployTxHash)
const contractAddress = deployTxRx.contractAddress
return {
deployTxHash,
deployTxRx,
contractAddress,
}
}