This commit is contained in:
kumavis
2018-10-04 10:27:56 -04:00
parent 4ce985702a
commit 5cb9d633a5
11 changed files with 743 additions and 900 deletions

View File

@@ -11,11 +11,13 @@ class BlockFilter extends BaseFilter {
}
async update ({ oldBlock, newBlock }) {
console.log('filter - adding results start')
const toBlock = newBlock
const fromBlock = incrementHexInt(oldBlock)
const blockBodies = await getBlocksForRange({ ethQuery: this.ethQuery, fromBlock, toBlock })
const blockHashes = blockBodies.map((block) => block.hash)
this.addResults(blockHashes)
console.log('filter - adding results done', blockHashes)
}
}

View File

@@ -1,17 +1,18 @@
module.exports = getBlocksForRange
async function getBlocksForRange({ ethQuery, fromBlock, toBlock }) {
async function getBlocksForRange({ provider, fromBlock, toBlock }) {
if (!fromBlock) fromBlock = toBlock
const fromBlockNumber = hexToInt(fromBlock)
const toBlockNumber = hexToInt(toBlock)
const blockCountToQuery = toBlockNumber - fromBlockNumber + 1
// load all blocks from old to new (inclusive)
console.log({ fromBlock, toBlock })
const missingBlockNumbers = Array(blockCountToQuery).fill()
.map((_,index) => fromBlockNumber + index)
.map(intToHex)
const blockBodies = await Promise.all(
missingBlockNumbers.map(blockNum => ethQuery.getBlockByNumber(blockNum, false))
missingBlockNumbers.map(blockNum => query(provider, 'eth_getBlockByNumber', [blockNum, false]))
)
return blockBodies
}
@@ -34,3 +35,12 @@ function intToHex(int) {
if (needsLeftPad) hexString = '0' + hexString
return '0x' + hexString
}
function query(provider, method, params) {
return new Promise((resolve, reject) => {
provider.sendAsync({ id: 1, jsonrpc: '2.0', method, params }, (err, res) => {
if (err) return reject(err)
resolve(res.result)
})
})
}

View File

@@ -8,6 +8,7 @@ module.exports = {
hexToInt,
incrementHexInt,
intToHex,
unsafeRandomBytes,
}
function minBlockRef(...refs) {
@@ -54,3 +55,16 @@ function intToHex(int) {
if (needsLeftPad) hexString = '0' + hexString
return '0x' + hexString
}
function unsafeRandomBytes(byteCount) {
let result = '0x'
for (let i = 0; i < byteCount; i++) {
result += unsafeRandomNibble()
result += unsafeRandomNibble()
}
return result
}
function unsafeRandomNibble() {
return Math.floor(Math.random() * 16).toString(16)
}

View File

@@ -22,14 +22,14 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
const middleware = createJsonRpcMiddleware({
// install filters
eth_newFilter: waitForFree(createAsyncMiddleware(newLogFilter)),
eth_newBlockFilter: waitForFree(createAsyncMiddleware(newBlockFilter)),
eth_newPendingTransactionFilter: waitForFree(createAsyncMiddleware(newPendingTransactionFilter)),
eth_newFilter: waitForFree(toAsyncRpcMiddleware(newLogFilter)),
eth_newBlockFilter: waitForFree(toAsyncRpcMiddleware(newBlockFilter)),
eth_newPendingTransactionFilter: waitForFree(toAsyncRpcMiddleware(newPendingTransactionFilter)),
// uninstall filters
eth_uninstallFilter: waitForFree(createAsyncMiddleware(uninstallFilterHandler)),
eth_uninstallFilter: waitForFree(toAsyncRpcMiddleware(uninstallFilterHandler)),
// checking filter changes
eth_getFilterChanges: waitForFree(createAsyncMiddleware(getFilterChanges)),
eth_getFilterLogs: waitForFree(createAsyncMiddleware(getFilterLogs)),
eth_getFilterChanges: waitForFree(toAsyncRpcMiddleware(getFilterChanges)),
eth_getFilterLogs: waitForFree(toAsyncRpcMiddleware(getFilterLogs)),
})
// setup filter updating and destroy handler
@@ -55,6 +55,15 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
releaseLock()
}
// expose filter methods directly
middleware.newLogFilter = newLogFilter
middleware.newBlockFilter = newBlockFilter
middleware.newPendingTransactionFilter = newPendingTransactionFilter
middleware.uninstallFilter = uninstallFilter
middleware.getFilterChanges = getFilterChanges
middleware.getFilterLogs = getFilterLogs
// expose destroy method for cleanup
middleware.destroy = () => {
uninstallAllFilters()
}
@@ -65,33 +74,33 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
// new filters
//
async function newLogFilter(req, res, next) {
async function newLogFilter(req) {
const params = req.params[0]
const filter = new LogFilter({ ethQuery, params })
const filterIndex = await installFilter(filter)
const result = intToHex(filterIndex)
res.result = result
return result
}
async function newBlockFilter(req, res, next) {
async function newBlockFilter(req) {
const filter = new BlockFilter({ ethQuery })
const filterIndex = await installFilter(filter)
const result = intToHex(filterIndex)
res.result = result
return result
}
async function newPendingTransactionFilter(req, res, next) {
async function newPendingTransactionFilter(req) {
const filter = new TxFilter({ ethQuery })
const filterIndex = await installFilter(filter)
const result = intToHex(filterIndex)
res.result = result
return result
}
//
// get filter changes
//
async function getFilterChanges(req, res, next) {
async function getFilterChanges(req) {
const filterIndexHex = req.params[0]
const filterIndex = hexToInt(filterIndexHex)
const filter = filters[filterIndex]
@@ -99,10 +108,10 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
throw new Error('No filter for index "${filterIndex}"')
}
const results = filter.getChangesAndClear()
res.result = results
return results
}
async function getFilterLogs(req, res, next, end) {
async function getFilterLogs(req) {
const filterIndexHex = req.params[0]
const filterIndex = hexToInt(filterIndexHex)
const filter = filters[filterIndex]
@@ -110,7 +119,7 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
throw new Error('No filter for index "${filterIndex}"')
}
const results = filter.getAllResults()
res.result = results
return results
}
@@ -119,7 +128,7 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
//
async function uninstallFilterHandler(req, res, next) {
async function uninstallFilterHandler(req) {
const filterIndexHex = req.params[0]
// check filter exists
const filterIndex = hexToInt(filterIndexHex)
@@ -129,7 +138,7 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
if (result) {
await uninstallFilter(filterIndex)
}
res.result = result
return result
}
//
@@ -139,8 +148,10 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
async function installFilter(filter) {
const prevFilterCount = objValues(filters).length
// install filter
console.log('install start')
const currentBlock = await blockTracker.getLatestBlock()
await filter.initialize({ currentBlock })
console.log('install done')
filterIndex++
filters[filterIndex] = filter
// update block tracker subs
@@ -179,6 +190,14 @@ function createEthFilterMiddleware({ blockTracker, provider }) {
}
function toAsyncRpcMiddleware(asyncFn) {
// set result on response + convert to middleware
return createAsyncMiddleware(async (req, res) => {
const result = await asyncFn(req)
res.result = result
})
}
function mutexMiddlewareWrapper({ mutex }) {
return (middleware) => {
return async (req, res, next, end) => {

1247
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -10,7 +10,7 @@
"license": "ISC",
"dependencies": {
"await-semaphore": "^0.1.3",
"eth-json-rpc-middleware": "^1.6.0",
"eth-json-rpc-middleware": "^2.6.0",
"ethjs-query": "^0.3.8",
"json-rpc-engine": "^3.8.0",
"lodash.flatmap": "^4.5.0"

161
subscriptionManager.js Normal file
View File

@@ -0,0 +1,161 @@
const SafeEventEmitter = require('safe-event-emitter')
const createScaffoldMiddleware = require('eth-json-rpc-middleware/scaffold')
const createAsyncMiddleware = require('json-rpc-engine/src/createAsyncMiddleware')
const createFilterMiddleware = require('./index.js')
const { unsafeRandomBytes, incrementHexInt } = require('./hexUtils.js')
const getBlocksForRange = require('./getBlocksForRange.js')
module.exports = createSubscriptionMiddleware
function createSubscriptionMiddleware({ blockTracker, provider }) {
const filterManager = createFilterMiddleware({ blockTracker, provider })
const events = new SafeEventEmitter()
const middleware = createScaffoldMiddleware({
eth_subscribe: createAsyncMiddleware(subscribe),
eth_unsubscribe: createAsyncMiddleware(unsubscribe),
})
const subscriptions = {}
return { events, middleware }
async function subscribe(req, res) {
const subscriptionType = req.params[0]
// const filterIdHex = await _createFilter(req)
// const filterId = Number.parseInt(filterIdHex, 16)
// subId is 16 byte hex string
const subId = unsafeRandomBytes(16)
// create sub
let sub
switch (subscriptionType) {
case 'newHeads':
sub = createSubNewHeads({ subId })
break
case 'logs':
const filterIdHex = await _createFilter(req)
sub = createSubFromFilter({ subId, filterIdHex })
break
}
subscriptions[subId] = sub
// check for subscription updates on new block
blockTracker.on('sync', sub.update)
blockTracker.on('sync', ({ oldBlock, newBlock }) => console.log('sub sync', {oldBlock, newBlock}))
res.result = subId
return
function createSubNewHeads({ subId }) {
const sub = {
type: subscriptionType,
// destroy: () => {
// blockTracker.removeListener('latest', sub.update)
// delete subscriptions[filterId]
// },
update: async ({ oldBlock, newBlock }) => {
// for newHeads
const toBlock = newBlock
const fromBlock = incrementHexInt(oldBlock)
console.log('newHeads update', {fromBlock, toBlock})
const rawBlocks = await getBlocksForRange({ provider, fromBlock, toBlock })
const results = rawBlocks.map(normalizeBlock)
results.forEach((value) => {
_emitSubscriptionResult(subId, value)
})
}
}
return sub
}
function createSubFromFilter({ subId, filterIdHex }){
const sub = {
type: subscriptionType,
// destroy: () => {
// blockTracker.removeListener('latest', sub.update)
// delete subscriptions[filterId]
// },
update: async () => {
// check filter for updates
console.log('filter check start', filterIdHex)
const results = await filterManager.getFilterChanges({ params: [filterIdHex] })
console.log('filter check done', filterIdHex, results)
// emit updates
results.forEach(async (result) => {
_emitSubscriptionResult(filterIdHex, subscriptionResult)
})
}
}
return sub
}
// ???
// if (subscriptionType === 'newPendingTransactions') {
// self.checkForPendingBlocks()
// }
}
async function unsubscribe(req) {
}
function _emitSubscriptionResult(filterIdHex, value) {
events.emit('notification', {
jsonrpc: '2.0',
method: 'eth_subscription',
params: {
subscription: filterIdHex,
result: value,
},
})
}
async function _createFilter(req) {
const subscriptionType = req.params[0]
let filterIdHex
// identify filter constructor
switch (subscriptionType) {
case 'logs':
filterIdHex = await filterManager.newLogFilter(req)
break
case 'newPendingTransactions':
filterIdHex = await filterManager.newPendingTransactionFilter(req)
break
case 'newHeads':
filterIdHex = await filterManager.newBlockFilter(req)
break
default:
throw new Error(`SubscriptionManager - unsupported subscription type "${subscriptionType}"`)
return
}
return filterIdHex
}
}
function normalizeBlock(block) {
return {
hash: block.hash,
parentHash: block.parentHash,
sha3Uncles: block.sha3Uncles,
miner: block.miner,
stateRoot: block.stateRoot,
transactionsRoot: block.transactionsRoot,
receiptsRoot: block.receiptsRoot,
logsBloom: block.logsBloom,
difficulty: block.difficulty,
number: block.number,
gasLimit: block.gasLimit,
gasUsed: block.gasUsed,
nonce: block.nonce,
mixHash: block.mixHash,
timestamp: block.timestamp,
extraData: block.extraData,
}
}

View File

@@ -8,6 +8,7 @@ const ethUtil = require('ethereumjs-util')
const {
createTestSetup,
createPayload,
asyncTest,
} = require('./util')
test('LogFilter - basic', asyncTest(async (t) => {
@@ -136,7 +137,6 @@ test('BlockFilter - basic', asyncTest(async (t) => {
await eth.uninstallFilter(filterId)
}))
async function deployLogEchoContract({ tools, from }){
// https://github.com/kumavis/eth-needlepoint/blob/master/examples/emit-log.js
const eth = tools.query
@@ -150,14 +150,3 @@ async function deployLogEchoContract({ tools, from }){
contractAddress,
}
}
function asyncTest(asyncTestFn){
return async function(t) {
try {
await asyncTestFn(t)
t.end()
} catch (err) {
t.end(err)
}
}
}

View File

@@ -4,3 +4,4 @@ process.on('unhandledRejection', function(err){
require('./logs')
require('./ganache')
require('./subscriptions')

66
test/subscriptions.js Normal file
View File

@@ -0,0 +1,66 @@
const test = require('tape')
const {
createTestSetup,
createPayload,
asyncTest,
timeout,
} = require('./util')
test('BlockFilter - basic', asyncTest(async (t) => {
const tools = createTestSetup()
const eth = tools.query
const subs = tools.subs
const { blockTracker } = tools
// if you remove this the test breaks
blockTracker.on('sync', ({ oldBlock, newBlock }) => console.log('test sync', {oldBlock, newBlock}))
// await first block
await tools.trackNextBlock()
await tools.forceNextBlock()
console.log('tracking next block start')
const xyz = await tools.trackNextBlock()
console.log('tracking next block done', xyz)
await timeout(1000)
// create sub
const subResults = []
const sub = await subs.newHeads()
sub.events.on('notification', (value) => {
subResults.push(value)
})
const subId = sub.id
t.ok(subId, `got sub id: ${subId} (${typeof subId})`)
// check sub
t.equal(subResults.length, 0, 'no sub results yet')
// await one block
await tools.forceNextBlock()
console.log('tracking next block start')
await tools.trackNextBlock()
console.log('tracking next block done')
await timeout(1000)
// console.log(subResults)
// check sub
t.equal(subResults.length, 1, 'only one sub result')
// await two blocks
await tools.forceNextBlock()
console.log('tracking next block start')
await tools.trackNextBlock()
console.log('tracking next block done')
await tools.forceNextBlock()
console.log('tracking next block start')
await tools.trackNextBlock()
console.log('tracking next block done')
await timeout(1000)
// console.log(subResults)
// check filter
t.equal(subResults.length, 3, 'three sub results')
// await eth.uninstallFilter(filterId)
}))

View File

@@ -1,3 +1,4 @@
const EventEmitter = require('events')
const EthBlockTracker = require('eth-block-tracker')
const EthQuery = require('ethjs-query')
const JsonRpcEngine = require('json-rpc-engine')
@@ -6,12 +7,15 @@ const providerFromEngine = require('eth-json-rpc-middleware/providerFromEngine')
const GanacheCore = require('ganache-core')
const pify = require('pify')
const createFilterMiddleware = require('../index.js')
const createSubscriptionMiddleware = require('../subscriptionManager.js')
module.exports = {
createPayload,
createEngineFromGanacheCore,
createEngineFromTestBlockMiddleware,
createTestSetup,
asyncTest,
timeout,
}
function createTestSetup () {
@@ -26,19 +30,64 @@ function createTestSetup () {
const engine = new JsonRpcEngine()
const provider = providerFromEngine(engine)
const query = new EthQuery(provider)
// add block ref middleware
// add filter middleware
engine.push(createFilterMiddleware({ blockTracker, provider }))
// add subscription middleware
const subscriptionManager = createSubscriptionMiddleware({ blockTracker, provider })
engine.push(subscriptionManager.middleware)
subscriptionManager.events.on('notification', (message) => engine.emit('notification', message))
// add data source
engine.push(providerAsMiddleware(ganacheProvider))
return { ganacheProvider, forceNextBlock, engine, provider, query, blockTracker, trackNextBlock }
// subs helper
const subs = createSubsHelper({ provider })
return { ganacheProvider, forceNextBlock, engine, provider, query, subs, blockTracker, trackNextBlock }
async function trackNextBlock() {
return new Promise((resolve) => blockTracker.once('latest', resolve))
}
}
function createSubsHelper({ provider }) {
return {
logs: createSubGenerator({ subType: 'logs', provider }),
newPendingTransactions: createSubGenerator({ subType: 'newPendingTransactions', provider }),
newHeads: createSubGenerator({ subType: 'newHeads', provider }),
}
}
function createSubGenerator({ subType, provider }) {
return pify(function() {
const args = [].slice.call(arguments)
const cb = args.pop()
args.unshift(subType)
provider.sendAsync({ method: 'eth_subscribe', params: args }, (err, res) => {
if (err) return cb(err)
const hexId = res.result
const id = Number.parseInt(hexId, 16)
const result = createNewSub({ id, provider })
cb(null, result)
})
})
}
function createNewSub({ id, provider }) {
const events = new EventEmitter()
provider.on('data', (_, message) => {
if (message.method !== 'eth_subscription') return
const subHexId = message.params.subscription
const subId = Number.parseInt(subHexId, 16)
if (subId !== id) return
const value = message.params.result
events.emit('notification', value)
})
return {
id,
events,
}
}
function createEngineFromGanacheCore () {
const ganacheProvider = GanacheCore.provider()
return { ganacheProvider, forceNextBlock }
@@ -59,3 +108,18 @@ function createEngineFromTestBlockMiddleware () {
function createPayload(payload) {
return Object.assign({ id: 1, jsonrpc: '2.0', params: [] }, payload)
}
function asyncTest(asyncTestFn){
return async function(t) {
try {
await asyncTestFn(t)
t.end()
} catch (err) {
t.end(err)
}
}
}
function timeout(duration) {
return new Promise(resolve => setTimeout(resolve, duration))
}