diff options
author | Leonid <logvinov.leon@gmail.com> | 2017-10-06 20:24:42 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-06 20:24:42 +0800 |
commit | f38d2f80a6e3af4ff7e2454d42abdf2a389e4303 (patch) | |
tree | 9e2f94e7d8236f7612628288d7eab3333d240b6b /src | |
parent | cd5327bc31618b4ea268de1902a74cf862987ee6 (diff) | |
parent | 0c112a2a1c1811e1fc7c4b03881f960b952c788c (diff) | |
download | dexon-0x-contracts-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.gz dexon-0x-contracts-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.tar.zst dexon-0x-contracts-f38d2f80a6e3af4ff7e2454d42abdf2a389e4303.zip |
Merge pull request #182 from 0xProject/feature/ethereumjs-blockstream
Rewrite subscriptions
Diffstat (limited to 'src')
-rw-r--r-- | src/0x.ts | 9 | ||||
-rw-r--r-- | src/contract_wrappers/contract_wrapper.ts | 120 | ||||
-rw-r--r-- | src/contract_wrappers/exchange_wrapper.ts | 86 | ||||
-rw-r--r-- | src/contract_wrappers/token_wrapper.ts | 67 | ||||
-rw-r--r-- | src/index.ts | 2 | ||||
-rw-r--r-- | src/subproviders/empty_wallet_subprovider.ts | 1 | ||||
-rw-r--r-- | src/types.ts | 32 | ||||
-rw-r--r-- | src/utils/abi_decoder.ts | 2 | ||||
-rw-r--r-- | src/utils/assert.ts | 3 | ||||
-rw-r--r-- | src/utils/constants.ts | 1 | ||||
-rw-r--r-- | src/utils/event_utils.ts | 41 | ||||
-rw-r--r-- | src/utils/filter_utils.ts | 82 | ||||
-rw-r--r-- | src/web3_wrapper.ts | 12 |
13 files changed, 255 insertions, 203 deletions
@@ -1,12 +1,8 @@ import * as _ from 'lodash'; import * as BigNumber from 'bignumber.js'; -import * as Web3 from 'web3'; -import * as abiDecoder from 'abi-decoder'; import {SchemaValidator, schemas} from '0x-json-schemas'; import {bigNumberConfigs} from './bignumber_config'; import * as ethUtil from 'ethereumjs-util'; -import findVersions = require('find-versions'); -import compareVersions = require('compare-versions'); import {Web3Wrapper} from './web3_wrapper'; import {constants} from './utils/constants'; import {utils} from './utils/utils'; @@ -27,12 +23,7 @@ import { SignedOrder, Web3Provider, ZeroExConfig, - TransactionReceipt, - DecodedLogArgs, TransactionReceiptWithDecodedLogs, - LogWithDecodedArgs, - FilterObject, - RawLog, } from './types'; import {zeroExConfigSchema} from './schemas/zero_ex_config_schema'; diff --git a/src/contract_wrappers/contract_wrapper.ts b/src/contract_wrappers/contract_wrapper.ts index 743dfc9b2..f6ccfdee4 100644 --- a/src/contract_wrappers/contract_wrapper.ts +++ b/src/contract_wrappers/contract_wrapper.ts @@ -1,9 +1,10 @@ import * as _ from 'lodash'; import * as Web3 from 'web3'; -import * as ethUtil from 'ethereumjs-util'; +import {BlockAndLogStreamer, Block} from 'ethereumjs-blockstream'; import {Web3Wrapper} from '../web3_wrapper'; import {AbiDecoder} from '../utils/abi_decoder'; import { + ZeroExError, InternalZeroExError, Artifact, LogWithDecodedArgs, @@ -11,32 +12,57 @@ import { ContractEvents, SubscriptionOpts, IndexedFilterValues, + EventCallback, + BlockParamLiteral, } from '../types'; -import {utils} from '../utils/utils'; - -const TOPIC_LENGTH = 32; +import {constants} from '../utils/constants'; +import {intervalUtils} from '../utils/interval_utils'; +import {filterUtils} from '../utils/filter_utils'; export class ContractWrapper { protected _web3Wrapper: Web3Wrapper; private _abiDecoder?: AbiDecoder; + private _blockAndLogStreamer: BlockAndLogStreamer|undefined; + private _blockAndLogStreamInterval: NodeJS.Timer; + private _filters: {[filterToken: string]: Web3.FilterObject}; + private _filterCallbacks: {[filterToken: string]: EventCallback}; + private _onLogAddedSubscriptionToken: string|undefined; + private _onLogRemovedSubscriptionToken: string|undefined; constructor(web3Wrapper: Web3Wrapper, abiDecoder?: AbiDecoder) { this._web3Wrapper = web3Wrapper; this._abiDecoder = abiDecoder; + this._filters = {}; + this._filterCallbacks = {}; + this._blockAndLogStreamer = undefined; + this._onLogAddedSubscriptionToken = undefined; + this._onLogRemovedSubscriptionToken = undefined; + } + protected _subscribe(address: string, eventName: ContractEvents, + indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi, + callback: EventCallback): string { + const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi); + if (_.isUndefined(this._blockAndLogStreamer)) { + this._startBlockAndLogStream(); + } + const filterToken = filterUtils.generateUUID(); + this._filters[filterToken] = filter; + this._filterCallbacks[filterToken] = callback; + return filterToken; + } + protected _unsubscribe(filterToken: string): void { + if (_.isUndefined(this._filters[filterToken])) { + throw new Error(ZeroExError.SubscriptionNotFound); + } + delete this._filters[filterToken]; + delete this._filterCallbacks[filterToken]; + if (_.isEmpty(this._filters)) { + this._stopBlockAndLogStream(); + } } protected async _getLogsAsync(address: string, eventName: ContractEvents, subscriptionOpts: SubscriptionOpts, indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi): Promise<LogWithDecodedArgs[]> { - const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi; - const eventSignature = this._getEventSignatureFromAbiByName(eventAbi, eventName); - const topicForEventSignature = this._web3Wrapper.keccak256(eventSignature); - const topicsForIndexedArgs = this._getTopicsForIndexedArgs(eventAbi, indexFilterValues); - const topics = [topicForEventSignature, ...topicsForIndexedArgs]; - const filter = { - fromBlock: subscriptionOpts.fromBlock, - toBlock: subscriptionOpts.toBlock, - address, - topics, - }; + const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi, subscriptionOpts); const logs = await this._web3Wrapper.getLogsAsync(filter); const logsWithDecodedArguments = _.map(logs, this._tryToDecodeLogOrNoop.bind(this)); return logsWithDecodedArguments; @@ -55,27 +81,51 @@ export class ContractWrapper { await this._web3Wrapper.getContractInstanceFromArtifactAsync<A>(artifact, addressIfExists); return contractInstance; } - protected _getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string { - const types = _.map(eventAbi.inputs, 'type'); - const signature = `${eventAbi.name}(${types.join(',')})`; - return signature; - } - private _getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> { - const topics: Array<string|null> = []; - for (const eventInput of abi.inputs) { - if (!eventInput.indexed) { - continue; - } - if (_.isUndefined(indexFilterValues[eventInput.name])) { - topics.push(null); - } else { - const value = indexFilterValues[eventInput.name] as string; - const buffer = ethUtil.toBuffer(value); - const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH); - const topic = ethUtil.bufferToHex(paddedBuffer); - topics.push(topic); + private _onLogStateChanged(removed: boolean, log: Web3.LogEntry): void { + _.forEach(this._filters, (filter: Web3.FilterObject, filterToken: string) => { + if (filterUtils.matchesFilter(log, filter)) { + const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs; + const logEvent = { + ...decodedLog, + removed, + }; + this._filterCallbacks[filterToken](logEvent); } + }); + } + private _startBlockAndLogStream(): void { + this._blockAndLogStreamer = new BlockAndLogStreamer( + this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper), + this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper), + ); + const catchAllLogFilter = {}; + this._blockAndLogStreamer.addLogFilter(catchAllLogFilter); + this._blockAndLogStreamInterval = intervalUtils.setAsyncExcludingInterval( + this._reconcileBlockAsync.bind(this), constants.DEFAULT_BLOCK_POLLING_INTERVAL, + ); + let removed = false; + this._onLogAddedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogAdded( + this._onLogStateChanged.bind(this, removed), + ); + removed = true; + this._onLogRemovedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogRemoved( + this._onLogStateChanged.bind(this, removed), + ); + } + private _stopBlockAndLogStream(): void { + (this._blockAndLogStreamer as BlockAndLogStreamer).unsubscribeFromOnLogAdded( + this._onLogAddedSubscriptionToken as string); + (this._blockAndLogStreamer as BlockAndLogStreamer).unsubscribeFromOnLogRemoved( + this._onLogRemovedSubscriptionToken as string); + intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamInterval); + delete this._blockAndLogStreamer; + } + private async _reconcileBlockAsync(): Promise<void> { + const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest); + // We need to coerce to Block type cause Web3.Block includes types for mempool blocks + if (!_.isUndefined(this._blockAndLogStreamer)) { + // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined + this._blockAndLogStreamer.reconcileNewBlock(latestBlock as any as Block); } - return topics; } } diff --git a/src/contract_wrappers/exchange_wrapper.ts b/src/contract_wrappers/exchange_wrapper.ts index 32eaa590c..5f02903ce 100644 --- a/src/contract_wrappers/exchange_wrapper.ts +++ b/src/contract_wrappers/exchange_wrapper.ts @@ -1,7 +1,6 @@ import * as _ from 'lodash'; import * as BigNumber from 'bignumber.js'; -import {SchemaValidator, schemas} from '0x-json-schemas'; -import promisify = require('es6-promisify'); +import {schemas} from '0x-json-schemas'; import {Web3Wrapper} from '../web3_wrapper'; import { ECSignature, @@ -16,11 +15,8 @@ import { SignedOrder, ContractEvent, ExchangeEvents, - ContractEventEmitter, SubscriptionOpts, IndexedFilterValues, - CreateContractEvent, - ContractEventObj, OrderCancellationRequest, OrderFillRequest, LogErrorContractEventArgs, @@ -31,13 +27,12 @@ import { ValidateOrderFillableOpts, OrderTransactionOpts, RawLog, + EventCallback, } from '../types'; import {assert} from '../utils/assert'; import {utils} from '../utils/utils'; -import {eventUtils} from '../utils/event_utils'; import {OrderValidationUtils} from '../utils/order_validation_utils'; import {ContractWrapper} from './contract_wrapper'; -import {constants} from '../utils/constants'; import {TokenWrapper} from './token_wrapper'; import {decorators} from '../utils/decorators'; import {AbiDecoder} from '../utils/abi_decoder'; @@ -51,7 +46,7 @@ const SHOULD_VALIDATE_BY_DEFAULT = true; */ export class ExchangeWrapper extends ContractWrapper { private _exchangeContractIfExists?: ExchangeContract; - private _exchangeLogEventEmitters: ContractEventEmitter[]; + private _activeSubscriptions: string[]; private _orderValidationUtils: OrderValidationUtils; private _tokenWrapper: TokenWrapper; private _exchangeContractErrCodesToMsg = { @@ -86,7 +81,7 @@ export class ExchangeWrapper extends ContractWrapper { super(web3Wrapper, abiDecoder); this._tokenWrapper = tokenWrapper; this._orderValidationUtils = new OrderValidationUtils(tokenWrapper, this); - this._exchangeLogEventEmitters = []; + this._activeSubscriptions = []; this._contractAddressIfExists = contractAddressIfExists; } /** @@ -622,41 +617,32 @@ export class ExchangeWrapper extends ContractWrapper { return txHash; } /** - * Subscribe to an event type emitted by the Exchange smart contract - * @param eventName The exchange contract event you would like to subscribe to. - * @param subscriptionOpts Subscriptions options that let you configure the subscription. - * @param indexFilterValues An object where the keys are indexed args returned by the event and - * the value is the value you are interested in. E.g `{maker: aUserAddressHex}` - * @param exchangeContractAddress The hex encoded address of the Exchange contract to call. - * @return ContractEventEmitter object + * Subscribe to an event type emitted by the Exchange contract. + * @param eventName The exchange contract event you would like to subscribe to. + * @param indexFilterValues An object where the keys are indexed args returned by the event and + * the value is the value you are interested in. E.g `{maker: aUserAddressHex}` + * @param callback Callback that gets called when a log is added/removed + * @return Subscription token used later to unsubscribe */ - public async subscribeAsync(eventName: ExchangeEvents, subscriptionOpts: SubscriptionOpts, - indexFilterValues: IndexedFilterValues, exchangeContractAddress: string): - Promise<ContractEventEmitter> { - assert.isETHAddressHex('exchangeContractAddress', exchangeContractAddress); + public async subscribeAsync(eventName: ExchangeEvents, indexFilterValues: IndexedFilterValues, + callback: EventCallback): Promise<string> { assert.doesBelongToStringEnum('eventName', eventName, ExchangeEvents); - assert.doesConformToSchema('subscriptionOpts', subscriptionOpts, schemas.subscriptionOptsSchema); assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema); - const exchangeContract = await this._getExchangeContractAsync(); - let createLogEvent: CreateContractEvent; - switch (eventName) { - case ExchangeEvents.LogFill: - createLogEvent = exchangeContract.LogFill; - break; - case ExchangeEvents.LogError: - createLogEvent = exchangeContract.LogError; - break; - case ExchangeEvents.LogCancel: - createLogEvent = exchangeContract.LogCancel; - break; - default: - throw utils.spawnSwitchErr('ExchangeEvents', eventName); - } - - const logEventObj: ContractEventObj = createLogEvent(indexFilterValues, subscriptionOpts); - const eventEmitter = eventUtils.wrapEventEmitter(logEventObj); - this._exchangeLogEventEmitters.push(eventEmitter); - return eventEmitter; + assert.isFunction('callback', callback); + const exchangeContractAddress = await this.getContractAddressAsync(); + const subscriptionToken = this._subscribe( + exchangeContractAddress, eventName, indexFilterValues, artifacts.ExchangeArtifact.abi, callback, + ); + this._activeSubscriptions.push(subscriptionToken); + return subscriptionToken; + } + /** + * Cancel a subscription + * @param subscriptionToken Subscription token returned by `subscribe()` + */ + public unsubscribe(subscriptionToken: string): void { + _.pull(this._activeSubscriptions, subscriptionToken); + this._unsubscribe(subscriptionToken); } /** * Gets historical logs without creating a subscription @@ -678,15 +664,6 @@ export class ExchangeWrapper extends ContractWrapper { return logs; } /** - * Stops watching for all exchange events - */ - public async stopWatchingAllEventsAsync(): Promise<void> { - const stopWatchingPromises = _.map(this._exchangeLogEventEmitters, - logEventObj => logEventObj.stopWatchingAsync()); - await Promise.all(stopWatchingPromises); - this._exchangeLogEventEmitters = []; - } - /** * Retrieves the Ethereum address of the Exchange contract deployed on the network * that the user-passed web3 provider is connected to. * @returns The Ethereum address of the Exchange contract being used. @@ -809,8 +786,15 @@ export class ExchangeWrapper extends ContractWrapper { const ZRXtokenAddress = await exchangeInstance.ZRX_TOKEN_CONTRACT.callAsync(); return ZRXtokenAddress; } + /** + * Cancels all existing subscriptions + */ + public unsubscribeAll(): void { + _.forEach(this._activeSubscriptions, this._unsubscribe.bind(this)); + this._activeSubscriptions = []; + } private async _invalidateContractInstancesAsync(): Promise<void> { - await this.stopWatchingAllEventsAsync(); + this.unsubscribeAll(); delete this._exchangeContractIfExists; } private async _isValidSignatureUsingContractCallAsync(dataHex: string, ecSignature: ECSignature, diff --git a/src/contract_wrappers/token_wrapper.ts b/src/contract_wrappers/token_wrapper.ts index f988e6ece..abd090f7e 100644 --- a/src/contract_wrappers/token_wrapper.ts +++ b/src/contract_wrappers/token_wrapper.ts @@ -1,10 +1,8 @@ import * as _ from 'lodash'; import * as BigNumber from 'bignumber.js'; -import {SchemaValidator, schemas} from '0x-json-schemas'; +import {schemas} from '0x-json-schemas'; import {Web3Wrapper} from '../web3_wrapper'; import {assert} from '../utils/assert'; -import {utils} from '../utils/utils'; -import {eventUtils} from '../utils/event_utils'; import {constants} from '../utils/constants'; import {ContractWrapper} from './contract_wrapper'; import {AbiDecoder} from '../utils/abi_decoder'; @@ -15,12 +13,9 @@ import { TokenEvents, IndexedFilterValues, SubscriptionOpts, - CreateContractEvent, - ContractEventEmitter, - ContractEventObj, MethodOpts, LogWithDecodedArgs, - RawLog, + EventCallback, } from '../types'; const ALLOWANCE_TO_ZERO_GAS_AMOUNT = 47155; @@ -33,13 +28,13 @@ const ALLOWANCE_TO_ZERO_GAS_AMOUNT = 47155; export class TokenWrapper extends ContractWrapper { public UNLIMITED_ALLOWANCE_IN_BASE_UNITS = constants.UNLIMITED_ALLOWANCE_IN_BASE_UNITS; private _tokenContractsByAddress: {[address: string]: TokenContract}; - private _tokenLogEventEmitters: ContractEventEmitter[]; + private _activeSubscriptions: string[]; private _tokenTransferProxyContractAddressFetcher: () => Promise<string>; constructor(web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder, tokenTransferProxyContractAddressFetcher: () => Promise<string>) { super(web3Wrapper, abiDecoder); this._tokenContractsByAddress = {}; - this._tokenLogEventEmitters = []; + this._activeSubscriptions = []; this._tokenTransferProxyContractAddressFetcher = tokenTransferProxyContractAddressFetcher; } /** @@ -251,34 +246,30 @@ export class TokenWrapper extends ContractWrapper { * Subscribe to an event type emitted by the Token contract. * @param tokenAddress The hex encoded address where the ERC20 token is deployed. * @param eventName The token contract event you would like to subscribe to. - * @param subscriptionOpts Subscriptions options that let you configure the subscription. * @param indexFilterValues An object where the keys are indexed args returned by the event and * the value is the value you are interested in. E.g `{maker: aUserAddressHex}` - * @return ContractEventEmitter object + * @param callback Callback that gets called when a log is added/removed + * @return Subscription token used later to unsubscribe */ - public async subscribeAsync(tokenAddress: string, eventName: TokenEvents, subscriptionOpts: SubscriptionOpts, - indexFilterValues: IndexedFilterValues): Promise<ContractEventEmitter> { + public subscribe(tokenAddress: string, eventName: TokenEvents, indexFilterValues: IndexedFilterValues, + callback: EventCallback): string { assert.isETHAddressHex('tokenAddress', tokenAddress); assert.doesBelongToStringEnum('eventName', eventName, TokenEvents); - assert.doesConformToSchema('subscriptionOpts', subscriptionOpts, schemas.subscriptionOptsSchema); assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema); - const tokenContract = await this._getTokenContractAsync(tokenAddress); - let createLogEvent: CreateContractEvent; - switch (eventName) { - case TokenEvents.Approval: - createLogEvent = tokenContract.Approval; - break; - case TokenEvents.Transfer: - createLogEvent = tokenContract.Transfer; - break; - default: - throw utils.spawnSwitchErr('TokenEvents', eventName); - } - - const logEventObj: ContractEventObj = createLogEvent(indexFilterValues, subscriptionOpts); - const eventEmitter = eventUtils.wrapEventEmitter(logEventObj); - this._tokenLogEventEmitters.push(eventEmitter); - return eventEmitter; + assert.isFunction('callback', callback); + const subscriptionToken = this._subscribe( + tokenAddress, eventName, indexFilterValues, artifacts.TokenArtifact.abi, callback, + ); + this._activeSubscriptions.push(subscriptionToken); + return subscriptionToken; + } + /** + * Cancel a subscription + * @param subscriptionToken Subscription token returned by `subscribe()` + */ + public unsubscribe(subscriptionToken: string): void { + _.pull(this._activeSubscriptions, subscriptionToken); + this._unsubscribe(subscriptionToken); } /** * Gets historical logs without creating a subscription @@ -301,16 +292,14 @@ export class TokenWrapper extends ContractWrapper { return logs; } /** - * Stops watching for all token events + * Cancels all existing subscriptions */ - public async stopWatchingAllEventsAsync(): Promise<void> { - const stopWatchingPromises = _.map(this._tokenLogEventEmitters, - logEventObj => logEventObj.stopWatchingAsync()); - await Promise.all(stopWatchingPromises); - this._tokenLogEventEmitters = []; + public unsubscribeAll(): void { + _.forEach(this._activeSubscriptions, this._unsubscribe.bind(this)); + this._activeSubscriptions = []; } - private async _invalidateContractInstancesAsync(): Promise<void> { - await this.stopWatchingAllEventsAsync(); + private _invalidateContractInstancesAsync(): void { + this.unsubscribeAll(); this._tokenContractsByAddress = {}; } private async _getTokenContractAsync(tokenAddress: string): Promise<TokenContract> { diff --git a/src/index.ts b/src/index.ts index 3359743e9..97ab084b7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -19,7 +19,6 @@ export { OrderFillOrKillRequest, OrderCancellationRequest, OrderFillRequest, - ContractEventEmitter, LogErrorContractEventArgs, LogCancelContractEventArgs, LogFillContractEventArgs, @@ -36,4 +35,5 @@ export { MethodOpts, OrderTransactionOpts, FilterObject, + LogEvent, } from './types'; diff --git a/src/subproviders/empty_wallet_subprovider.ts b/src/subproviders/empty_wallet_subprovider.ts index 0d037042d..2f260217c 100644 --- a/src/subproviders/empty_wallet_subprovider.ts +++ b/src/subproviders/empty_wallet_subprovider.ts @@ -1,4 +1,3 @@ -import * as Web3 from 'web3'; import {JSONRPCPayload} from '../types'; /* diff --git a/src/types.ts b/src/types.ts index 35bb6af78..44094f442 100644 --- a/src/types.ts +++ b/src/types.ts @@ -14,6 +14,7 @@ export enum ZeroExError { InvalidJump = 'INVALID_JUMP', OutOfGas = 'OUT_OF_GAS', NoNetworkId = 'NO_NETWORK_ID', + SubscriptionNotFound = 'SUBSCRIPTION_NOT_FOUND', } export enum InternalZeroExError { @@ -35,23 +36,17 @@ export type OrderAddresses = [string, string, string, string, string]; export type OrderValues = [BigNumber.BigNumber, BigNumber.BigNumber, BigNumber.BigNumber, BigNumber.BigNumber, BigNumber.BigNumber, BigNumber.BigNumber]; -export type EventCallbackAsync = (err: Error, event: ContractEvent) => Promise<void>; -export type EventCallbackSync = (err: Error, event: ContractEvent) => void; -export type EventCallback = EventCallbackSync|EventCallbackAsync; -export interface ContractEventObj { - watch: (eventWatch: EventCallback) => void; - stopWatching: () => void; +export interface LogEvent extends LogWithDecodedArgs { + removed: boolean; } -export type CreateContractEvent = (indexFilterValues: IndexedFilterValues, - subscriptionOpts: SubscriptionOpts) => ContractEventObj; +export type EventCallbackAsync = (log: LogEvent) => Promise<void>; +export type EventCallbackSync = (log: LogEvent) => void; +export type EventCallback = EventCallbackSync|EventCallbackAsync; export interface ExchangeContract extends Web3.ContractInstance { isValidSignature: { callAsync: (signerAddressHex: string, dataHex: string, v: number, r: string, s: string, txOpts?: TxOpts) => Promise<boolean>; }; - LogFill: CreateContractEvent; - LogCancel: CreateContractEvent; - LogError: CreateContractEvent; ZRX_TOKEN_CONTRACT: { callAsync: () => Promise<string>; }; @@ -137,8 +132,6 @@ export interface ExchangeContract extends Web3.ContractInstance { } export interface TokenContract extends Web3.ContractInstance { - Transfer: CreateContractEvent; - Approval: CreateContractEvent; balanceOf: { callAsync: (address: string, defaultBlock?: Web3.BlockParam) => Promise<BigNumber.BigNumber>; }; @@ -352,7 +345,13 @@ export interface IndexedFilterValues { [index: string]: ContractEventArg; } -export type BlockParam = 'latest'|'earliest'|'pending'|number; +export enum BlockParamLiteral { + Latest = 'latest', + Earliest = 'earliest', + Pending = 'pending', +} + +export type BlockParam = BlockParamLiteral|number; export interface SubscriptionOpts { fromBlock: BlockParam; @@ -378,11 +377,6 @@ export interface OrderFillRequest { export type AsyncMethod = (...args: any[]) => Promise<any>; -export interface ContractEventEmitter { - watch: (eventCallback: EventCallback) => void; - stopWatchingAsync: () => Promise<void>; -} - /** * We re-export the `Web3.Provider` type specified in the Web3 Typescript typings * since it is the type of the `provider` argument to the `ZeroEx` constructor. diff --git a/src/utils/abi_decoder.ts b/src/utils/abi_decoder.ts index 542591251..52b114c12 100644 --- a/src/utils/abi_decoder.ts +++ b/src/utils/abi_decoder.ts @@ -1,7 +1,7 @@ import * as Web3 from 'web3'; import * as _ from 'lodash'; import * as BigNumber from 'bignumber.js'; -import {AbiType, DecodedLogArgs, DecodedArgs, LogWithDecodedArgs, RawLog, SolidityTypes} from '../types'; +import {AbiType, DecodedLogArgs, LogWithDecodedArgs, RawLog, SolidityTypes} from '../types'; import * as SolidityCoder from 'web3/lib/solidity/coder'; export class AbiDecoder { diff --git a/src/utils/assert.ts b/src/utils/assert.ts index eb084129c..0b7a11939 100644 --- a/src/utils/assert.ts +++ b/src/utils/assert.ts @@ -17,6 +17,9 @@ export const assert = { isString(variableName: string, value: string): void { this.assert(_.isString(value), this.typeAssertionMessage(variableName, 'string', value)); }, + isFunction(variableName: string, value: any): void { + this.assert(_.isFunction(value), this.typeAssertionMessage(variableName, 'function', value)); + }, isHexString(variableName: string, value: string): void { this.assert(_.isString(value) && HEX_REGEX.test(value), this.typeAssertionMessage(variableName, 'HexString', value)); diff --git a/src/utils/constants.ts b/src/utils/constants.ts index 1b11d7055..a066fe869 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -7,4 +7,5 @@ export const constants = { INVALID_JUMP_PATTERN: 'invalid JUMP at', OUT_OF_GAS_PATTERN: 'out of gas', UNLIMITED_ALLOWANCE_IN_BASE_UNITS: new BigNumber(2).pow(256).minus(1), + DEFAULT_BLOCK_POLLING_INTERVAL: 1000, }; diff --git a/src/utils/event_utils.ts b/src/utils/event_utils.ts deleted file mode 100644 index e8f30e1a8..000000000 --- a/src/utils/event_utils.ts +++ /dev/null @@ -1,41 +0,0 @@ -import * as _ from 'lodash'; -import {EventCallback, ContractEventArg, ContractEvent, ContractEventObj, ContractEventEmitter} from '../types'; -import * as BigNumber from 'bignumber.js'; -import promisify = require('es6-promisify'); - -export const eventUtils = { - wrapEventEmitter(event: ContractEventObj): ContractEventEmitter { - const watch = (eventCallback: EventCallback) => { - const bignumberWrappingEventCallback = eventUtils._getBigNumberWrappingEventCallback(eventCallback); - event.watch(bignumberWrappingEventCallback); - }; - const zeroExEvent = { - watch, - stopWatchingAsync: async () => { - await promisify(event.stopWatching, event)(); - }, - }; - return zeroExEvent; - }, - /** - * Wraps eventCallback function so that all the BigNumber arguments are wrapped in a newer version of BigNumber. - * @param eventCallback Event callback function to be wrapped - * @return Wrapped event callback function - */ - _getBigNumberWrappingEventCallback(eventCallback: EventCallback): EventCallback { - const bignumberWrappingEventCallback = (err: Error, event: ContractEvent) => { - if (_.isNull(err)) { - const wrapIfBigNumber = (value: ContractEventArg): ContractEventArg => { - // HACK: The old version of BigNumber used by Web3@0.19.0 does not support the `isBigNumber` - // and checking for a BigNumber instance using `instanceof` does not work either. We therefore - // check if the value constructor is a bignumber constructor. - const isWeb3BigNumber = _.startsWith(value.constructor.toString(), 'function BigNumber('); - return isWeb3BigNumber ? new BigNumber(value) : value; - }; - event.args = _.mapValues(event.args, wrapIfBigNumber); - } - eventCallback(err, event); - }; - return bignumberWrappingEventCallback; - }, -}; diff --git a/src/utils/filter_utils.ts b/src/utils/filter_utils.ts new file mode 100644 index 000000000..e09a95a6e --- /dev/null +++ b/src/utils/filter_utils.ts @@ -0,0 +1,82 @@ +import * as _ from 'lodash'; +import * as Web3 from 'web3'; +import * as uuid from 'uuid/v4'; +import * as ethUtil from 'ethereumjs-util'; +import * as jsSHA3 from 'js-sha3'; +import {ContractEvents, IndexedFilterValues, SubscriptionOpts} from '../types'; + +const TOPIC_LENGTH = 32; + +export const filterUtils = { + generateUUID(): string { + return uuid(); + }, + getFilter(address: string, eventName: ContractEvents, + indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi, + subscriptionOpts?: SubscriptionOpts): Web3.FilterObject { + const eventAbi = _.find(abi, {name: eventName}) as Web3.EventAbi; + const eventSignature = filterUtils.getEventSignatureFromAbiByName(eventAbi, eventName); + const topicForEventSignature = ethUtil.addHexPrefix(jsSHA3.keccak256(eventSignature)); + const topicsForIndexedArgs = filterUtils.getTopicsForIndexedArgs(eventAbi, indexFilterValues); + const topics = [topicForEventSignature, ...topicsForIndexedArgs]; + let filter: Web3.FilterObject = { + address, + topics, + }; + if (!_.isUndefined(subscriptionOpts)) { + filter = { + ...subscriptionOpts, + ...filter, + }; + } + return filter; + }, + getEventSignatureFromAbiByName(eventAbi: Web3.EventAbi, eventName: ContractEvents): string { + const types = _.map(eventAbi.inputs, 'type'); + const signature = `${eventAbi.name}(${types.join(',')})`; + return signature; + }, + getTopicsForIndexedArgs(abi: Web3.EventAbi, indexFilterValues: IndexedFilterValues): Array<string|null> { + const topics: Array<string|null> = []; + for (const eventInput of abi.inputs) { + if (!eventInput.indexed) { + continue; + } + if (_.isUndefined(indexFilterValues[eventInput.name])) { + // Null is a wildcard topic in a JSON-RPC call + topics.push(null); + } else { + const value = indexFilterValues[eventInput.name] as string; + const buffer = ethUtil.toBuffer(value); + const paddedBuffer = ethUtil.setLengthLeft(buffer, TOPIC_LENGTH); + const topic = ethUtil.bufferToHex(paddedBuffer); + topics.push(topic); + } + } + return topics; + }, + matchesFilter(log: Web3.LogEntry, filter: Web3.FilterObject): boolean { + if (!_.isUndefined(filter.address) && log.address !== filter.address) { + return false; + } + if (!_.isUndefined(filter.topics)) { + return filterUtils.matchesTopics(log.topics, filter.topics); + } + return true; + }, + matchesTopics(logTopics: string[], filterTopics: Array<string[]|string|null>): boolean { + const matchesTopic = _.zipWith(logTopics, filterTopics, filterUtils.matchesTopic.bind(filterUtils)); + const matchesTopics = _.every(matchesTopic); + return matchesTopics; + }, + matchesTopic(logTopic: string, filterTopic: string[]|string|null): boolean { + if (_.isArray(filterTopic)) { + return _.includes(filterTopic, logTopic); + } + if (_.isString(filterTopic)) { + return filterTopic === logTopic; + } + // null topic is a wildcard + return true; + }, +}; diff --git a/src/web3_wrapper.ts b/src/web3_wrapper.ts index 7576f3d40..9de75c809 100644 --- a/src/web3_wrapper.ts +++ b/src/web3_wrapper.ts @@ -94,8 +94,12 @@ export class Web3Wrapper { const signData = await promisify(this.web3.eth.sign)(address, message); return signData; } - public async getBlockTimestampAsync(blockHash: string): Promise<number> { - const {timestamp} = await promisify(this.web3.eth.getBlock)(blockHash); + public async getBlockAsync(blockParam: string|Web3.BlockParam): Promise<Web3.BlockWithoutTransactionData> { + const block = await promisify(this.web3.eth.getBlock)(blockParam); + return block; + } + public async getBlockTimestampAsync(blockParam: string|Web3.BlockParam): Promise<number> { + const {timestamp} = await this.getBlockAsync(blockParam); return timestamp; } public async getAvailableAddressesAsync(): Promise<string[]> { @@ -112,10 +116,6 @@ export class Web3Wrapper { const logs = await this.sendRawPayloadAsync(payload); return logs; } - public keccak256(data: string): string { - const hash = this.web3.sha3(data); - return hash; - } private getContractInstance<A extends Web3.ContractInstance>(abi: Web3.ContractAbi, address: string): A { const web3ContractInstance = this.web3.eth.contract(abi).at(address); const contractInstance = new Contract(web3ContractInstance, this.defaults) as any as A; |