From 441c1f9ab77bbbaef7186bf52964cbfdf7c12208 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 9 Nov 2017 16:41:57 -0500 Subject: rename folder to order_watcher --- src/0x.ts | 2 +- src/mempool/event_watcher.ts | 78 -------------- src/mempool/order_state_watcher.ts | 175 ------------------------------- src/order_watcher/event_watcher.ts | 78 ++++++++++++++ src/order_watcher/order_state_watcher.ts | 175 +++++++++++++++++++++++++++++++ 5 files changed, 254 insertions(+), 254 deletions(-) delete mode 100644 src/mempool/event_watcher.ts delete mode 100644 src/mempool/order_state_watcher.ts create mode 100644 src/order_watcher/event_watcher.ts create mode 100644 src/order_watcher/order_state_watcher.ts diff --git a/src/0x.ts b/src/0x.ts index 51744d2d6..75a154930 100644 --- a/src/0x.ts +++ b/src/0x.ts @@ -17,7 +17,7 @@ import {TokenRegistryWrapper} from './contract_wrappers/token_registry_wrapper'; import {EtherTokenWrapper} from './contract_wrappers/ether_token_wrapper'; import {TokenWrapper} from './contract_wrappers/token_wrapper'; import {TokenTransferProxyWrapper} from './contract_wrappers/token_transfer_proxy_wrapper'; -import {OrderStateWatcher} from './mempool/order_state_watcher'; +import {OrderStateWatcher} from './order_watcher/order_state_watcher'; import {OrderStateUtils} from './utils/order_state_utils'; import { ECSignature, diff --git a/src/mempool/event_watcher.ts b/src/mempool/event_watcher.ts deleted file mode 100644 index 3f40606e7..000000000 --- a/src/mempool/event_watcher.ts +++ /dev/null @@ -1,78 +0,0 @@ -import * as Web3 from 'web3'; -import * as _ from 'lodash'; -import {Web3Wrapper} from '../web3_wrapper'; -import {BlockParamLiteral, EventCallback, MempoolEventCallback} from '../types'; -import {AbiDecoder} from '../utils/abi_decoder'; -import {intervalUtils} from '../utils/interval_utils'; - -const DEFAULT_MEMPOOL_POLLING_INTERVAL = 200; - -export class EventWatcher { - private _web3Wrapper: Web3Wrapper; - private _pollingIntervalMs: number; - private _intervalId: NodeJS.Timer; - private _lastMempoolEvents: Web3.LogEntry[] = []; - private _callbackAsync?: MempoolEventCallback; - constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { - this._web3Wrapper = web3Wrapper; - this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? - DEFAULT_MEMPOOL_POLLING_INTERVAL : - pollingIntervalMs; - } - public subscribe(callback: MempoolEventCallback, numConfirmations: number): void { - this._callbackAsync = callback; - this._intervalId = intervalUtils.setAsyncExcludingInterval( - this._pollForMempoolEventsAsync.bind(this, numConfirmations), this._pollingIntervalMs, - ); - } - public unsubscribe(): void { - delete this._callbackAsync; - this._lastMempoolEvents = []; - intervalUtils.clearAsyncExcludingInterval(this._intervalId); - } - private async _pollForMempoolEventsAsync(numConfirmations: number): Promise { - const pendingEvents = await this._getMempoolEventsAsync(numConfirmations); - if (pendingEvents.length === 0) { - // HACK: Sometimes when node rebuilds the pending block we get back the empty result. - // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, - // that's why we just ignore those cases. - return; - } - const removedEvents = _.differenceBy(this._lastMempoolEvents, pendingEvents, JSON.stringify); - const newEvents = _.differenceBy(pendingEvents, this._lastMempoolEvents, JSON.stringify); - let isRemoved = true; - await this._emitDifferencesAsync(removedEvents, isRemoved); - isRemoved = false; - await this._emitDifferencesAsync(newEvents, isRemoved); - this._lastMempoolEvents = pendingEvents; - } - private async _getMempoolEventsAsync(numConfirmations: number): Promise { - let fromBlock: BlockParamLiteral|number; - let toBlock: BlockParamLiteral|number; - if (numConfirmations === 0) { - fromBlock = BlockParamLiteral.Pending; - toBlock = BlockParamLiteral.Pending; - } else { - toBlock = await this._web3Wrapper.getBlockNumberAsync(); - fromBlock = toBlock - numConfirmations; - } - const mempoolFilter = { - fromBlock, - toBlock, - }; - const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter); - return pendingEvents; - } - // TODO: Let's emit out own LogEntry type that has property isRemoved rather then removed - private async _emitDifferencesAsync(logs: Web3.LogEntry[], isRemoved: boolean): Promise { - for (const log of logs) { - const logEvent = { - removed: isRemoved, - ...log, - }; - if (!_.isUndefined(this._callbackAsync)) { - await this._callbackAsync(logEvent); - } - } - } -} diff --git a/src/mempool/order_state_watcher.ts b/src/mempool/order_state_watcher.ts deleted file mode 100644 index d0bf5d89c..000000000 --- a/src/mempool/order_state_watcher.ts +++ /dev/null @@ -1,175 +0,0 @@ -import * as _ from 'lodash'; -import {schemas} from '0x-json-schemas'; -import {ZeroEx} from '../'; -import {EventWatcher} from './event_watcher'; -import {assert} from '../utils/assert'; -import {utils} from '../utils/utils'; -import {artifacts} from '../artifacts'; -import {AbiDecoder} from '../utils/abi_decoder'; -import {OrderStateUtils} from '../utils/order_state_utils'; -import { - LogEvent, - OrderState, - SignedOrder, - Web3Provider, - BlockParamLiteral, - LogWithDecodedArgs, - OnOrderStateChangeCallback, - ExchangeEvents, - TokenEvents, - ZeroExError, -} from '../types'; -import {Web3Wrapper} from '../web3_wrapper'; - -interface DependentOrderHashes { - [makerAddress: string]: { - [makerToken: string]: Set, - }; -} - -interface OrderByOrderHash { - [orderHash: string]: SignedOrder; -} - -export class OrderStateWatcher { - private _orders: OrderByOrderHash; - private _dependentOrderHashes: DependentOrderHashes; - private _web3Wrapper: Web3Wrapper; - private _callbackAsync?: OnOrderStateChangeCallback; - private _eventWatcher: EventWatcher; - private _abiDecoder: AbiDecoder; - private _orderStateUtils: OrderStateUtils; - constructor( - web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder, orderStateUtils: OrderStateUtils, - mempoolPollingIntervalMs?: number) { - this._web3Wrapper = web3Wrapper; - this._orders = {}; - this._dependentOrderHashes = {}; - this._eventWatcher = new EventWatcher( - this._web3Wrapper, mempoolPollingIntervalMs, - ); - this._abiDecoder = abiDecoder; - this._orderStateUtils = orderStateUtils; - } - /** - * Add an order to the orderStateWatcher - * @param signedOrder The order you wish to start watching. - */ - public addOrder(signedOrder: SignedOrder): void { - assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema); - const orderHash = ZeroEx.getOrderHashHex(signedOrder); - this._orders[orderHash] = signedOrder; - this.addToDependentOrderHashes(signedOrder, orderHash); - } - /** - * Removes an order from the orderStateWatcher - * @param orderHash The orderHash of the order you wish to stop watching. - */ - public removeOrder(orderHash: string): void { - assert.doesConformToSchema('orderHash', orderHash, schemas.orderHashSchema); - const signedOrder = this._orders[orderHash]; - if (_.isUndefined(signedOrder)) { - return; - } - delete this._orders[orderHash]; - this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress].delete(orderHash); - // We currently do not remove the maker/makerToken keys from the mapping when all orderHashes removed - } - /** - * Starts an orderStateWatcher subscription. The callback will be notified every time a watched order's - * backing blockchain state has changed. This is a call-to-action for the caller to re-validate the order - * @param callback Receives the orderHash of the order that should be re-validated, together. - * with all the order-relevant blockchain state needed to re-validate the order - * @param numConfirmations Number of confirmed blocks deeps you want to run the orderWatcher from. Passing - * is 0 will watch the backing node's mempool, 3 will emit events when blockchain - * state relevant to a watched order changed 3 blocks ago. - */ - public subscribe(callback: OnOrderStateChangeCallback, numConfirmations: number): void { - assert.isFunction('callback', callback); - if (!_.isUndefined(this._callbackAsync)) { - throw new Error(ZeroExError.SubscriptionAlreadyPresent); - } - this._callbackAsync = callback; - this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this), numConfirmations); - } - /** - * Ends an orderStateWatcher subscription. - * @param signedOrder The order you wish to stop watching. - */ - public unsubscribe(): void { - delete this._callbackAsync; - this._eventWatcher.unsubscribe(); - } - private async _onMempoolEventCallbackAsync(log: LogEvent): Promise { - const maybeDecodedLog = this._abiDecoder.tryToDecodeLogOrNoop(log); - const isDecodedLog = !_.isUndefined((maybeDecodedLog as LogWithDecodedArgs).event); - if (!isDecodedLog) { - return; // noop - } - const decodedLog = maybeDecodedLog as LogWithDecodedArgs; - let makerToken: string; - let makerAddress: string; - let orderHashesSet: Set; - switch (decodedLog.event) { - case TokenEvents.Approval: - makerToken = decodedLog.address; - makerAddress = decodedLog.args._owner; - orderHashesSet = _.get(this._dependentOrderHashes, [makerAddress, makerToken]); - if (!_.isUndefined(orderHashesSet)) { - const orderHashes = Array.from(orderHashesSet); - await this._emitRevalidateOrdersAsync(orderHashes); - } - break; - - case TokenEvents.Transfer: - makerToken = decodedLog.address; - makerAddress = decodedLog.args._from; - orderHashesSet = _.get(this._dependentOrderHashes, [makerAddress, makerToken]); - if (!_.isUndefined(orderHashesSet)) { - const orderHashes = Array.from(orderHashesSet); - await this._emitRevalidateOrdersAsync(orderHashes); - } - break; - - case ExchangeEvents.LogFill: - case ExchangeEvents.LogCancel: - const orderHash = decodedLog.args.orderHash; - const isOrderWatched = !_.isUndefined(this._orders[orderHash]); - if (isOrderWatched) { - await this._emitRevalidateOrdersAsync([orderHash]); - } - break; - - case ExchangeEvents.LogError: - return; // noop - - default: - throw utils.spawnSwitchErr('decodedLog.event', decodedLog.event); - } - } - private async _emitRevalidateOrdersAsync(orderHashes: string[]): Promise { - // TODO: Make defaultBlock a passed in option - const methodOpts = { - defaultBlock: BlockParamLiteral.Pending, - }; - - for (const orderHash of orderHashes) { - const signedOrder = this._orders[orderHash] as SignedOrder; - const orderState = await this._orderStateUtils.getOrderStateAsync(signedOrder, methodOpts); - if (!_.isUndefined(this._callbackAsync)) { - await this._callbackAsync(orderState); - } else { - break; // Unsubscribe was called - } - } - } - private addToDependentOrderHashes(signedOrder: SignedOrder, orderHash: string) { - if (_.isUndefined(this._dependentOrderHashes[signedOrder.maker])) { - this._dependentOrderHashes[signedOrder.maker] = {}; - } - if (_.isUndefined(this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress])) { - this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress] = new Set(); - } - this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress].add(orderHash); - } -} diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts new file mode 100644 index 000000000..205885f96 --- /dev/null +++ b/src/order_watcher/event_watcher.ts @@ -0,0 +1,78 @@ +import * as Web3 from 'web3'; +import * as _ from 'lodash'; +import {Web3Wrapper} from '../web3_wrapper'; +import {BlockParamLiteral, EventCallback, MempoolEventCallback} from '../types'; +import {AbiDecoder} from '../utils/abi_decoder'; +import {intervalUtils} from '../utils/interval_utils'; + +const DEFAULT_EVENT_POLLING_INTERVAL = 200; + +export class EventWatcher { + private _web3Wrapper: Web3Wrapper; + private _pollingIntervalMs: number; + private _intervalId: NodeJS.Timer; + private _lastEvents: Web3.LogEntry[] = []; + private _callbackAsync?: MempoolEventCallback; + constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { + this._web3Wrapper = web3Wrapper; + this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? + DEFAULT_EVENT_POLLING_INTERVAL : + pollingIntervalMs; + } + public subscribe(callback: MempoolEventCallback, numConfirmations: number): void { + this._callbackAsync = callback; + this._intervalId = intervalUtils.setAsyncExcludingInterval( + this._pollForMempoolEventsAsync.bind(this, numConfirmations), this._pollingIntervalMs, + ); + } + public unsubscribe(): void { + delete this._callbackAsync; + this._lastEvents = []; + intervalUtils.clearAsyncExcludingInterval(this._intervalId); + } + private async _pollForMempoolEventsAsync(numConfirmations: number): Promise { + const pendingEvents = await this._getMempoolEventsAsync(numConfirmations); + if (pendingEvents.length === 0) { + // HACK: Sometimes when node rebuilds the pending block we get back the empty result. + // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, + // that's why we just ignore those cases. + return; + } + const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); + const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); + let isRemoved = true; + await this._emitDifferencesAsync(removedEvents, isRemoved); + isRemoved = false; + await this._emitDifferencesAsync(newEvents, isRemoved); + this._lastEvents = pendingEvents; + } + private async _getMempoolEventsAsync(numConfirmations: number): Promise { + let fromBlock: BlockParamLiteral|number; + let toBlock: BlockParamLiteral|number; + if (numConfirmations === 0) { + fromBlock = BlockParamLiteral.Pending; + toBlock = BlockParamLiteral.Pending; + } else { + toBlock = await this._web3Wrapper.getBlockNumberAsync(); + fromBlock = toBlock - numConfirmations; + } + const mempoolFilter = { + fromBlock, + toBlock, + }; + const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter); + return pendingEvents; + } + // TODO: Let's emit out own LogEntry type that has property isRemoved rather then removed + private async _emitDifferencesAsync(logs: Web3.LogEntry[], isRemoved: boolean): Promise { + for (const log of logs) { + const logEvent = { + removed: isRemoved, + ...log, + }; + if (!_.isUndefined(this._callbackAsync)) { + await this._callbackAsync(logEvent); + } + } + } +} diff --git a/src/order_watcher/order_state_watcher.ts b/src/order_watcher/order_state_watcher.ts new file mode 100644 index 000000000..8710c5a84 --- /dev/null +++ b/src/order_watcher/order_state_watcher.ts @@ -0,0 +1,175 @@ +import * as _ from 'lodash'; +import {schemas} from '0x-json-schemas'; +import {ZeroEx} from '../'; +import {EventWatcher} from './event_watcher'; +import {assert} from '../utils/assert'; +import {utils} from '../utils/utils'; +import {artifacts} from '../artifacts'; +import {AbiDecoder} from '../utils/abi_decoder'; +import {OrderStateUtils} from '../utils/order_state_utils'; +import { + LogEvent, + OrderState, + SignedOrder, + Web3Provider, + BlockParamLiteral, + LogWithDecodedArgs, + OnOrderStateChangeCallback, + ExchangeEvents, + TokenEvents, + ZeroExError, +} from '../types'; +import {Web3Wrapper} from '../web3_wrapper'; + +interface DependentOrderHashes { + [makerAddress: string]: { + [makerToken: string]: Set, + }; +} + +interface OrderByOrderHash { + [orderHash: string]: SignedOrder; +} + +export class OrderStateWatcher { + private _orders: OrderByOrderHash; + private _dependentOrderHashes: DependentOrderHashes; + private _web3Wrapper: Web3Wrapper; + private _callbackAsync?: OnOrderStateChangeCallback; + private _eventWatcher: EventWatcher; + private _abiDecoder: AbiDecoder; + private _orderStateUtils: OrderStateUtils; + constructor( + web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder, orderStateUtils: OrderStateUtils, + eventPollingIntervalMs?: number) { + this._web3Wrapper = web3Wrapper; + this._orders = {}; + this._dependentOrderHashes = {}; + this._eventWatcher = new EventWatcher( + this._web3Wrapper, eventPollingIntervalMs, + ); + this._abiDecoder = abiDecoder; + this._orderStateUtils = orderStateUtils; + } + /** + * Add an order to the orderStateWatcher + * @param signedOrder The order you wish to start watching. + */ + public addOrder(signedOrder: SignedOrder): void { + assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema); + const orderHash = ZeroEx.getOrderHashHex(signedOrder); + this._orders[orderHash] = signedOrder; + this.addToDependentOrderHashes(signedOrder, orderHash); + } + /** + * Removes an order from the orderStateWatcher + * @param orderHash The orderHash of the order you wish to stop watching. + */ + public removeOrder(orderHash: string): void { + assert.doesConformToSchema('orderHash', orderHash, schemas.orderHashSchema); + const signedOrder = this._orders[orderHash]; + if (_.isUndefined(signedOrder)) { + return; + } + delete this._orders[orderHash]; + this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress].delete(orderHash); + // We currently do not remove the maker/makerToken keys from the mapping when all orderHashes removed + } + /** + * Starts an orderStateWatcher subscription. The callback will be notified every time a watched order's + * backing blockchain state has changed. This is a call-to-action for the caller to re-validate the order + * @param callback Receives the orderHash of the order that should be re-validated, together. + * with all the order-relevant blockchain state needed to re-validate the order + * @param numConfirmations Number of confirmed blocks deeps you want to run the orderWatcher from. Passing + * is 0 will watch the backing node's mempool, 3 will emit events when blockchain + * state relevant to a watched order changed 3 blocks ago. + */ + public subscribe(callback: OnOrderStateChangeCallback, numConfirmations: number): void { + assert.isFunction('callback', callback); + if (!_.isUndefined(this._callbackAsync)) { + throw new Error(ZeroExError.SubscriptionAlreadyPresent); + } + this._callbackAsync = callback; + this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this), numConfirmations); + } + /** + * Ends an orderStateWatcher subscription. + * @param signedOrder The order you wish to stop watching. + */ + public unsubscribe(): void { + delete this._callbackAsync; + this._eventWatcher.unsubscribe(); + } + private async _onMempoolEventCallbackAsync(log: LogEvent): Promise { + const maybeDecodedLog = this._abiDecoder.tryToDecodeLogOrNoop(log); + const isDecodedLog = !_.isUndefined((maybeDecodedLog as LogWithDecodedArgs).event); + if (!isDecodedLog) { + return; // noop + } + const decodedLog = maybeDecodedLog as LogWithDecodedArgs; + let makerToken: string; + let makerAddress: string; + let orderHashesSet: Set; + switch (decodedLog.event) { + case TokenEvents.Approval: + makerToken = decodedLog.address; + makerAddress = decodedLog.args._owner; + orderHashesSet = _.get(this._dependentOrderHashes, [makerAddress, makerToken]); + if (!_.isUndefined(orderHashesSet)) { + const orderHashes = Array.from(orderHashesSet); + await this._emitRevalidateOrdersAsync(orderHashes); + } + break; + + case TokenEvents.Transfer: + makerToken = decodedLog.address; + makerAddress = decodedLog.args._from; + orderHashesSet = _.get(this._dependentOrderHashes, [makerAddress, makerToken]); + if (!_.isUndefined(orderHashesSet)) { + const orderHashes = Array.from(orderHashesSet); + await this._emitRevalidateOrdersAsync(orderHashes); + } + break; + + case ExchangeEvents.LogFill: + case ExchangeEvents.LogCancel: + const orderHash = decodedLog.args.orderHash; + const isOrderWatched = !_.isUndefined(this._orders[orderHash]); + if (isOrderWatched) { + await this._emitRevalidateOrdersAsync([orderHash]); + } + break; + + case ExchangeEvents.LogError: + return; // noop + + default: + throw utils.spawnSwitchErr('decodedLog.event', decodedLog.event); + } + } + private async _emitRevalidateOrdersAsync(orderHashes: string[]): Promise { + // TODO: Make defaultBlock a passed in option + const methodOpts = { + defaultBlock: BlockParamLiteral.Pending, + }; + + for (const orderHash of orderHashes) { + const signedOrder = this._orders[orderHash] as SignedOrder; + const orderState = await this._orderStateUtils.getOrderStateAsync(signedOrder, methodOpts); + if (!_.isUndefined(this._callbackAsync)) { + await this._callbackAsync(orderState); + } else { + break; // Unsubscribe was called + } + } + } + private addToDependentOrderHashes(signedOrder: SignedOrder, orderHash: string) { + if (_.isUndefined(this._dependentOrderHashes[signedOrder.maker])) { + this._dependentOrderHashes[signedOrder.maker] = {}; + } + if (_.isUndefined(this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress])) { + this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress] = new Set(); + } + this._dependentOrderHashes[signedOrder.maker][signedOrder.makerTokenAddress].add(orderHash); + } +} -- cgit