diff options
author | Leonid Logvinov <logvinov.leon@gmail.com> | 2017-10-31 00:38:10 +0800 |
---|---|---|
committer | Fabio Berger <me@fabioberger.com> | 2017-10-31 00:49:16 +0800 |
commit | a896904ae7c453f51b1f46de2be3a28416db72d1 (patch) | |
tree | c50c3638b0b9bee982816bb48a4c935de798476a /src/mempool | |
parent | 6bfcd253f8e9689ce787899a42f80914b067a4f1 (diff) | |
download | dexon-0x-contracts-a896904ae7c453f51b1f46de2be3a28416db72d1.tar.gz dexon-0x-contracts-a896904ae7c453f51b1f46de2be3a28416db72d1.tar.zst dexon-0x-contracts-a896904ae7c453f51b1f46de2be3a28416db72d1.zip |
Add naive order state watcher implementation
Revalidate all orders upon event received and emit order states even if
not changed
Diffstat (limited to 'src/mempool')
-rw-r--r-- | src/mempool/event_watcher.ts | 20 | ||||
-rw-r--r-- | src/mempool/order_state_watcher.ts | 55 |
2 files changed, 41 insertions, 34 deletions
diff --git a/src/mempool/event_watcher.ts b/src/mempool/event_watcher.ts index 1ad30b790..27f0c8207 100644 --- a/src/mempool/event_watcher.ts +++ b/src/mempool/event_watcher.ts @@ -12,7 +12,7 @@ export class EventWatcher { private _pollingIntervalMs: number; private _intervalId: NodeJS.Timer; private _lastMempoolEvents: Web3.LogEntry[] = []; - private _callback?: MempoolEventCallback; + private _callbackAsync?: MempoolEventCallback; constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { this._web3Wrapper = web3Wrapper; this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? @@ -20,12 +20,12 @@ export class EventWatcher { pollingIntervalMs; } public subscribe(callback: MempoolEventCallback): void { - this._callback = callback; + this._callbackAsync = callback; this._intervalId = intervalUtils.setAsyncExcludingInterval( this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs); } public unsubscribe(): void { - delete this._callback; + delete this._callbackAsync; this._lastMempoolEvents = []; intervalUtils.clearAsyncExcludingInterval(this._intervalId); } @@ -40,9 +40,9 @@ export class EventWatcher { const removedEvents = _.differenceBy(this._lastMempoolEvents, pendingEvents, JSON.stringify); const newEvents = _.differenceBy(pendingEvents, this._lastMempoolEvents, JSON.stringify); let isRemoved = true; - this._emitDifferences(removedEvents, isRemoved); + await this._emitDifferencesAsync(removedEvents, isRemoved); isRemoved = false; - this._emitDifferences(newEvents, isRemoved); + await this._emitDifferencesAsync(newEvents, isRemoved); this._lastMempoolEvents = pendingEvents; } private async _getMempoolEventsAsync(): Promise<Web3.LogEntry[]> { @@ -53,15 +53,15 @@ export class EventWatcher { const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter); return pendingEvents; } - private _emitDifferences(logs: Web3.LogEntry[], isRemoved: boolean): void { - _.forEach(logs, log => { + private async _emitDifferencesAsync(logs: Web3.LogEntry[], isRemoved: boolean): Promise<void> { + for (const log of logs) { const logEvent = { removed: isRemoved, ...log, }; - if (!_.isUndefined(this._callback)) { - this._callback(logEvent); + if (!_.isUndefined(this._callbackAsync)) { + await this._callbackAsync(logEvent); } - }); + } } } diff --git a/src/mempool/order_state_watcher.ts b/src/mempool/order_state_watcher.ts index 89f84647d..3da48005d 100644 --- a/src/mempool/order_state_watcher.ts +++ b/src/mempool/order_state_watcher.ts @@ -5,13 +5,14 @@ import {EventWatcher} from './event_watcher'; import {assert} from '../utils/assert'; import {artifacts} from '../artifacts'; import {AbiDecoder} from '../utils/abi_decoder'; -import {orderWatcherConfigSchema} from '../schemas/order_watcher_config_schema'; +import {OrderStateUtils} from '../utils/order_state_utils'; import { LogEvent, + OrderState, SignedOrder, Web3Provider, + BlockParamLiteral, LogWithDecodedArgs, - OrderWatcherConfig, OnOrderStateChangeCallback, } from '../types'; import {Web3Wrapper} from '../web3_wrapper'; @@ -19,20 +20,19 @@ import {Web3Wrapper} from '../web3_wrapper'; export class OrderStateWatcher { private _orders = new Map<string, SignedOrder>(); private _web3Wrapper: Web3Wrapper; - private _config: OrderWatcherConfig; - private _callback?: OnOrderStateChangeCallback; - private _eventWatcher?: EventWatcher; + private _callbackAsync?: OnOrderStateChangeCallback; + private _eventWatcher: EventWatcher; private _abiDecoder: AbiDecoder; - constructor(provider: Web3Provider, config?: OrderWatcherConfig) { - assert.isWeb3Provider('provider', provider); - if (!_.isUndefined(config)) { - assert.doesConformToSchema('config', config, orderWatcherConfigSchema); - } - this._web3Wrapper = new Web3Wrapper(provider); - this._config = config || {}; - const artifactJSONs = _.values(artifacts); - const abiArrays = _.map(artifactJSONs, artifact => artifact.abi); - this._abiDecoder = new AbiDecoder(abiArrays); + private _orderStateUtils: OrderStateUtils; + constructor( + web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder, orderStateUtils: OrderStateUtils, + mempoolPollingIntervalMs?: number) { + this._web3Wrapper = web3Wrapper; + this._eventWatcher = new EventWatcher( + this._web3Wrapper, mempoolPollingIntervalMs, + ); + this._abiDecoder = abiDecoder; + this._orderStateUtils = orderStateUtils; } public addOrder(signedOrder: SignedOrder): void { assert.doesConformToSchema('signedOrder', signedOrder, schemas.signedOrderSchema); @@ -46,17 +46,12 @@ export class OrderStateWatcher { } public subscribe(callback: OnOrderStateChangeCallback): void { assert.isFunction('callback', callback); - this._callback = callback; - this._eventWatcher = new EventWatcher( - this._web3Wrapper, this._config.mempoolPollingIntervalMs, - ); + this._callbackAsync = callback; this._eventWatcher.subscribe(this._onMempoolEventCallbackAsync.bind(this)); } public unsubscribe(): void { - delete this._callback; - if (!_.isUndefined(this._eventWatcher)) { - this._eventWatcher.unsubscribe(); - } + delete this._callbackAsync; + this._eventWatcher.unsubscribe(); } private async _onMempoolEventCallbackAsync(log: LogEvent): Promise<void> { const maybeDecodedLog = this._abiDecoder.tryToDecodeLogOrNoop(log); @@ -65,6 +60,18 @@ export class OrderStateWatcher { } } private async _revalidateOrdersAsync(): Promise<void> { - _.noop(); + const methodOpts = { + defaultBlock: BlockParamLiteral.Pending, + }; + const orderHashes = Array.from(this._orders.keys()); + for (const orderHash of orderHashes) { + const signedOrder = this._orders.get(orderHash) as SignedOrder; + const orderState = await this._orderStateUtils.getOrderStateAsync(signedOrder, methodOpts); + if (!_.isUndefined(this._callbackAsync)) { + await this._callbackAsync(orderState); + } else { + break; // Unsubscribe was called + } + } } } |