From 441c1f9ab77bbbaef7186bf52964cbfdf7c12208 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 9 Nov 2017 16:41:57 -0500 Subject: rename folder to order_watcher --- src/order_watcher/event_watcher.ts | 78 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 src/order_watcher/event_watcher.ts (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts new file mode 100644 index 000000000..205885f96 --- /dev/null +++ b/src/order_watcher/event_watcher.ts @@ -0,0 +1,78 @@ +import * as Web3 from 'web3'; +import * as _ from 'lodash'; +import {Web3Wrapper} from '../web3_wrapper'; +import {BlockParamLiteral, EventCallback, MempoolEventCallback} from '../types'; +import {AbiDecoder} from '../utils/abi_decoder'; +import {intervalUtils} from '../utils/interval_utils'; + +const DEFAULT_EVENT_POLLING_INTERVAL = 200; + +export class EventWatcher { + private _web3Wrapper: Web3Wrapper; + private _pollingIntervalMs: number; + private _intervalId: NodeJS.Timer; + private _lastEvents: Web3.LogEntry[] = []; + private _callbackAsync?: MempoolEventCallback; + constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { + this._web3Wrapper = web3Wrapper; + this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? + DEFAULT_EVENT_POLLING_INTERVAL : + pollingIntervalMs; + } + public subscribe(callback: MempoolEventCallback, numConfirmations: number): void { + this._callbackAsync = callback; + this._intervalId = intervalUtils.setAsyncExcludingInterval( + this._pollForMempoolEventsAsync.bind(this, numConfirmations), this._pollingIntervalMs, + ); + } + public unsubscribe(): void { + delete this._callbackAsync; + this._lastEvents = []; + intervalUtils.clearAsyncExcludingInterval(this._intervalId); + } + private async _pollForMempoolEventsAsync(numConfirmations: number): Promise { + const pendingEvents = await this._getMempoolEventsAsync(numConfirmations); + if (pendingEvents.length === 0) { + // HACK: Sometimes when node rebuilds the pending block we get back the empty result. + // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, + // that's why we just ignore those cases. + return; + } + const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); + const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); + let isRemoved = true; + await this._emitDifferencesAsync(removedEvents, isRemoved); + isRemoved = false; + await this._emitDifferencesAsync(newEvents, isRemoved); + this._lastEvents = pendingEvents; + } + private async _getMempoolEventsAsync(numConfirmations: number): Promise { + let fromBlock: BlockParamLiteral|number; + let toBlock: BlockParamLiteral|number; + if (numConfirmations === 0) { + fromBlock = BlockParamLiteral.Pending; + toBlock = BlockParamLiteral.Pending; + } else { + toBlock = await this._web3Wrapper.getBlockNumberAsync(); + fromBlock = toBlock - numConfirmations; + } + const mempoolFilter = { + fromBlock, + toBlock, + }; + const pendingEvents = await this._web3Wrapper.getLogsAsync(mempoolFilter); + return pendingEvents; + } + // TODO: Let's emit out own LogEntry type that has property isRemoved rather then removed + private async _emitDifferencesAsync(logs: Web3.LogEntry[], isRemoved: boolean): Promise { + for (const log of logs) { + const logEvent = { + removed: isRemoved, + ...log, + }; + if (!_.isUndefined(this._callbackAsync)) { + await this._callbackAsync(logEvent); + } + } + } +} -- cgit From 6f5a55b5fe4c6f0f56303b6ac5dcbd99129ee7bd Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 9 Nov 2017 16:43:19 -0500 Subject: Rename MempoolEventCallback to EventWatcherCallback --- src/order_watcher/event_watcher.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 7d3719282..9ace32b29 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -1,7 +1,7 @@ import * as Web3 from 'web3'; import * as _ from 'lodash'; import {Web3Wrapper} from '../web3_wrapper'; -import {BlockParamLiteral, EventCallback, MempoolEventCallback} from '../types'; +import {BlockParamLiteral, EventCallback, EventWatcherCallback} from '../types'; import {AbiDecoder} from '../utils/abi_decoder'; import {intervalUtils} from '../utils/interval_utils'; @@ -12,14 +12,14 @@ export class EventWatcher { private _pollingIntervalMs: number; private _intervalId: NodeJS.Timer; private _lastEvents: Web3.LogEntry[] = []; - private _callbackAsync?: MempoolEventCallback; + private _callbackAsync?: EventWatcherCallback; constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { this._web3Wrapper = web3Wrapper; this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? DEFAULT_EVENT_POLLING_INTERVAL : pollingIntervalMs; } - public subscribe(callback: MempoolEventCallback, numConfirmations: number): void { + public subscribe(callback: EventWatcherCallback, numConfirmations: number): void { this._callbackAsync = callback; this._intervalId = intervalUtils.setAsyncExcludingInterval( this._pollForMempoolEventsAsync.bind(this, numConfirmations), this._pollingIntervalMs, -- cgit From 50d3a1482567d0e4f572877b50547ae4fbad9942 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 9 Nov 2017 17:39:03 -0500 Subject: Remove finished TODOs --- src/order_watcher/event_watcher.ts | 1 - 1 file changed, 1 deletion(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 9ace32b29..471dcd21a 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -63,7 +63,6 @@ export class EventWatcher { const events = await this._web3Wrapper.getLogsAsync(eventFilter); return events; } - // TODO: Let's emit out own LogEntry type that has property isRemoved rather then removed private async _emitDifferencesAsync(logs: Web3.LogEntry[], isRemoved: boolean): Promise { for (const log of logs) { const logEvent = { -- cgit From c60d7e2db857331f66140c9d9a49f4161a8bf8a4 Mon Sep 17 00:00:00 2001 From: Leonid Logvinov Date: Thu, 9 Nov 2017 17:44:36 -0500 Subject: Fix getting events from non-mempool --- src/order_watcher/event_watcher.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 471dcd21a..f1a2b5729 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -53,8 +53,9 @@ export class EventWatcher { fromBlock = BlockParamLiteral.Pending; toBlock = BlockParamLiteral.Pending; } else { - toBlock = await this._web3Wrapper.getBlockNumberAsync(); - fromBlock = toBlock - numConfirmations; + const currentBlock = await this._web3Wrapper.getBlockNumberAsync(); + toBlock = currentBlock - numConfirmations; + fromBlock = currentBlock - numConfirmations; } const eventFilter = { fromBlock, -- cgit From b0491b0ee28e6d9fd7e3a5647284c99bd79d4930 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 9 Nov 2017 17:48:05 -0500 Subject: Rename _callbackAsync to _callbackIfExistsAsync for clarity --- src/order_watcher/event_watcher.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index f1a2b5729..ce471b58d 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -12,7 +12,7 @@ export class EventWatcher { private _pollingIntervalMs: number; private _intervalId: NodeJS.Timer; private _lastEvents: Web3.LogEntry[] = []; - private _callbackAsync?: EventWatcherCallback; + private _callbackIfExistsAsync?: EventWatcherCallback; constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { this._web3Wrapper = web3Wrapper; this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? @@ -20,13 +20,13 @@ export class EventWatcher { pollingIntervalMs; } public subscribe(callback: EventWatcherCallback, numConfirmations: number): void { - this._callbackAsync = callback; + this._callbackIfExistsAsync = callback; this._intervalId = intervalUtils.setAsyncExcludingInterval( this._pollForMempoolEventsAsync.bind(this, numConfirmations), this._pollingIntervalMs, ); } public unsubscribe(): void { - delete this._callbackAsync; + delete this._callbackIfExistsAsync; this._lastEvents = []; intervalUtils.clearAsyncExcludingInterval(this._intervalId); } @@ -70,8 +70,8 @@ export class EventWatcher { removed: isRemoved, ...log, }; - if (!_.isUndefined(this._callbackAsync)) { - await this._callbackAsync(logEvent); + if (!_.isUndefined(this._callbackIfExistsAsync)) { + await this._callbackIfExistsAsync(logEvent); } } } -- cgit From c0db88168b250ef4db960e9eddced8f5a10ee63f Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 9 Nov 2017 18:29:13 -0500 Subject: Fix bug where we hard-coded using pendingBlock for fetching the orderState. Moved numConfirmations to become a global orderStateWatcher config --- src/order_watcher/event_watcher.ts | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index ce471b58d..f71b14afb 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -13,16 +13,18 @@ export class EventWatcher { private _intervalId: NodeJS.Timer; private _lastEvents: Web3.LogEntry[] = []; private _callbackIfExistsAsync?: EventWatcherCallback; - constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { + private _numConfirmations: number; + constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number, numConfirmations: number) { this._web3Wrapper = web3Wrapper; + this._numConfirmations = numConfirmations; this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? DEFAULT_EVENT_POLLING_INTERVAL : pollingIntervalMs; } - public subscribe(callback: EventWatcherCallback, numConfirmations: number): void { + public subscribe(callback: EventWatcherCallback): void { this._callbackIfExistsAsync = callback; this._intervalId = intervalUtils.setAsyncExcludingInterval( - this._pollForMempoolEventsAsync.bind(this, numConfirmations), this._pollingIntervalMs, + this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs, ); } public unsubscribe(): void { @@ -30,8 +32,8 @@ export class EventWatcher { this._lastEvents = []; intervalUtils.clearAsyncExcludingInterval(this._intervalId); } - private async _pollForMempoolEventsAsync(numConfirmations: number): Promise { - const pendingEvents = await this._getEventsAsync(numConfirmations); + private async _pollForMempoolEventsAsync(): Promise { + const pendingEvents = await this._getEventsAsync(); if (pendingEvents.length === 0) { // HACK: Sometimes when node rebuilds the pending block we get back the empty result. // We don't want to emit a lot of removal events and bring them back after a couple of miliseconds, @@ -46,16 +48,16 @@ export class EventWatcher { await this._emitDifferencesAsync(newEvents, isRemoved); this._lastEvents = pendingEvents; } - private async _getEventsAsync(numConfirmations: number): Promise { + private async _getEventsAsync(): Promise { let fromBlock: BlockParamLiteral|number; let toBlock: BlockParamLiteral|number; - if (numConfirmations === 0) { + if (this._numConfirmations === 0) { fromBlock = BlockParamLiteral.Pending; - toBlock = BlockParamLiteral.Pending; + toBlock = fromBlock; } else { const currentBlock = await this._web3Wrapper.getBlockNumberAsync(); - toBlock = currentBlock - numConfirmations; - fromBlock = currentBlock - numConfirmations; + toBlock = currentBlock - this._numConfirmations; + fromBlock = toBlock; } const eventFilter = { fromBlock, -- cgit From 27519e1dfa882d6b8b8a3febf3bea88be845e48f Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 9 Nov 2017 22:57:38 -0500 Subject: rename intervalId to intervalIdIfExists --- src/order_watcher/event_watcher.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index f71b14afb..4cb741617 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -10,7 +10,7 @@ const DEFAULT_EVENT_POLLING_INTERVAL = 200; export class EventWatcher { private _web3Wrapper: Web3Wrapper; private _pollingIntervalMs: number; - private _intervalId: NodeJS.Timer; + private _intervalIdIfExists?: NodeJS.Timer; private _lastEvents: Web3.LogEntry[] = []; private _callbackIfExistsAsync?: EventWatcherCallback; private _numConfirmations: number; @@ -23,14 +23,16 @@ export class EventWatcher { } public subscribe(callback: EventWatcherCallback): void { this._callbackIfExistsAsync = callback; - this._intervalId = intervalUtils.setAsyncExcludingInterval( + this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs, ); } public unsubscribe(): void { delete this._callbackIfExistsAsync; this._lastEvents = []; - intervalUtils.clearAsyncExcludingInterval(this._intervalId); + if (!_.isUndefined(this._intervalIdIfExists)) { + intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists); + } } private async _pollForMempoolEventsAsync(): Promise { const pendingEvents = await this._getEventsAsync(); -- cgit From 960c83315b60398df7ca7ac0a3f4ae5dde7771f1 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 9 Nov 2017 23:05:46 -0500 Subject: Add assertion --- src/order_watcher/event_watcher.ts | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 4cb741617..786470f1d 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -4,6 +4,7 @@ import {Web3Wrapper} from '../web3_wrapper'; import {BlockParamLiteral, EventCallback, EventWatcherCallback} from '../types'; import {AbiDecoder} from '../utils/abi_decoder'; import {intervalUtils} from '../utils/interval_utils'; +import {assert} from '../utils/assert'; const DEFAULT_EVENT_POLLING_INTERVAL = 200; @@ -22,6 +23,7 @@ export class EventWatcher { pollingIntervalMs; } public subscribe(callback: EventWatcherCallback): void { + assert.isFunction('callback', callback); this._callbackIfExistsAsync = callback; this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs, -- cgit From 0205f9ede337e5574245b45e1ee940b8ace89456 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Thu, 9 Nov 2017 23:16:26 -0500 Subject: Simplify to/from block code --- src/order_watcher/event_watcher.ts | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 786470f1d..2a1b6dacf 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -53,19 +53,16 @@ export class EventWatcher { this._lastEvents = pendingEvents; } private async _getEventsAsync(): Promise { - let fromBlock: BlockParamLiteral|number; - let toBlock: BlockParamLiteral|number; + let latestBlock: BlockParamLiteral|number; if (this._numConfirmations === 0) { - fromBlock = BlockParamLiteral.Pending; - toBlock = fromBlock; + latestBlock = BlockParamLiteral.Pending; } else { const currentBlock = await this._web3Wrapper.getBlockNumberAsync(); - toBlock = currentBlock - this._numConfirmations; - fromBlock = toBlock; + latestBlock = currentBlock - this._numConfirmations; } const eventFilter = { - fromBlock, - toBlock, + fromBlock: latestBlock, + toBlock: latestBlock, }; const events = await this._web3Wrapper.getLogsAsync(eventFilter); return events; -- cgit From 697926641ff2996e561811f077f0f51afb543484 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Fri, 10 Nov 2017 17:47:30 -0500 Subject: Rename method since it's not more then just mempool --- src/order_watcher/event_watcher.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 2a1b6dacf..0174288cc 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -26,7 +26,7 @@ export class EventWatcher { assert.isFunction('callback', callback); this._callbackIfExistsAsync = callback; this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - this._pollForMempoolEventsAsync.bind(this), this._pollingIntervalMs, + this._pollForBlockchainEventsAsync.bind(this), this._pollingIntervalMs, ); } public unsubscribe(): void { @@ -36,7 +36,7 @@ export class EventWatcher { intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists); } } - private async _pollForMempoolEventsAsync(): Promise { + private async _pollForBlockchainEventsAsync(): Promise { const pendingEvents = await this._getEventsAsync(); if (pendingEvents.length === 0) { // HACK: Sometimes when node rebuilds the pending block we get back the empty result. -- cgit From 0d957ea71d9323445add682d7ee98d9b3cb1a973 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Fri, 10 Nov 2017 17:47:41 -0500 Subject: Add comment above the eventWatcher class --- src/order_watcher/event_watcher.ts | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 0174288cc..304b3e994 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -8,6 +8,10 @@ import {assert} from '../utils/assert'; const DEFAULT_EVENT_POLLING_INTERVAL = 200; +/* + * The EventWatcher watches for blockchain events at the specified block confirmation + * depth. + */ export class EventWatcher { private _web3Wrapper: Web3Wrapper; private _pollingIntervalMs: number; -- cgit From ca9c1bca4a694026981a804c58138f08c8b23321 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Fri, 10 Nov 2017 18:06:57 -0500 Subject: Fix alignment --- src/order_watcher/event_watcher.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 304b3e994..286872a6f 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -23,8 +23,8 @@ export class EventWatcher { this._web3Wrapper = web3Wrapper; this._numConfirmations = numConfirmations; this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? - DEFAULT_EVENT_POLLING_INTERVAL : - pollingIntervalMs; + DEFAULT_EVENT_POLLING_INTERVAL : + pollingIntervalMs; } public subscribe(callback: EventWatcherCallback): void { assert.isFunction('callback', callback); -- cgit From 6012926e8278bed55b173a8e8380231c75a1a48c Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Fri, 10 Nov 2017 18:07:11 -0500 Subject: Throw if trying to subscribe multiple times --- src/order_watcher/event_watcher.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 286872a6f..c11b78f2e 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -1,7 +1,12 @@ import * as Web3 from 'web3'; import * as _ from 'lodash'; import {Web3Wrapper} from '../web3_wrapper'; -import {BlockParamLiteral, EventCallback, EventWatcherCallback} from '../types'; +import { + BlockParamLiteral, + EventCallback, + EventWatcherCallback, + ZeroExError, +} from '../types'; import {AbiDecoder} from '../utils/abi_decoder'; import {intervalUtils} from '../utils/interval_utils'; import {assert} from '../utils/assert'; @@ -28,6 +33,9 @@ export class EventWatcher { } public subscribe(callback: EventWatcherCallback): void { assert.isFunction('callback', callback); + if (!_.isUndefined(this._callbackIfExistsAsync)) { + throw new Error(ZeroExError.SubscriptionAlreadyPresent); + } this._callbackIfExistsAsync = callback; this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( this._pollForBlockchainEventsAsync.bind(this), this._pollingIntervalMs, -- cgit From 0ec51b124bb6030cd59a57ab53444af452e0e674 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sat, 11 Nov 2017 08:57:49 -0500 Subject: Feather the callback down to _emitDifferencesAsync and don't store it as a class instance. This will make supporting multiple subscriptions easier later on and reduces the amount of unsubscription cleanup --- src/order_watcher/event_watcher.ts | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index c11b78f2e..f86d1f59f 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -22,7 +22,6 @@ export class EventWatcher { private _pollingIntervalMs: number; private _intervalIdIfExists?: NodeJS.Timer; private _lastEvents: Web3.LogEntry[] = []; - private _callbackIfExistsAsync?: EventWatcherCallback; private _numConfirmations: number; constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number, numConfirmations: number) { this._web3Wrapper = web3Wrapper; @@ -33,22 +32,21 @@ export class EventWatcher { } public subscribe(callback: EventWatcherCallback): void { assert.isFunction('callback', callback); - if (!_.isUndefined(this._callbackIfExistsAsync)) { + if (!_.isUndefined(this._intervalIdIfExists)) { throw new Error(ZeroExError.SubscriptionAlreadyPresent); } - this._callbackIfExistsAsync = callback; this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval( - this._pollForBlockchainEventsAsync.bind(this), this._pollingIntervalMs, + this._pollForBlockchainEventsAsync.bind(this, callback), this._pollingIntervalMs, ); } public unsubscribe(): void { - delete this._callbackIfExistsAsync; this._lastEvents = []; if (!_.isUndefined(this._intervalIdIfExists)) { intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists); + delete this._intervalIdIfExists; } } - private async _pollForBlockchainEventsAsync(): Promise { + private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise { const pendingEvents = await this._getEventsAsync(); if (pendingEvents.length === 0) { // HACK: Sometimes when node rebuilds the pending block we get back the empty result. @@ -59,9 +57,9 @@ export class EventWatcher { const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); let isRemoved = true; - await this._emitDifferencesAsync(removedEvents, isRemoved); + await this._emitDifferencesAsync(removedEvents, isRemoved, callback); isRemoved = false; - await this._emitDifferencesAsync(newEvents, isRemoved); + await this._emitDifferencesAsync(newEvents, isRemoved, callback); this._lastEvents = pendingEvents; } private async _getEventsAsync(): Promise { @@ -79,14 +77,16 @@ export class EventWatcher { const events = await this._web3Wrapper.getLogsAsync(eventFilter); return events; } - private async _emitDifferencesAsync(logs: Web3.LogEntry[], isRemoved: boolean): Promise { + private async _emitDifferencesAsync( + logs: Web3.LogEntry[], isRemoved: boolean, callback: EventWatcherCallback, + ): Promise { for (const log of logs) { const logEvent = { removed: isRemoved, ...log, }; - if (!_.isUndefined(this._callbackIfExistsAsync)) { - await this._callbackIfExistsAsync(logEvent); + if (!_.isUndefined(this._intervalIdIfExists)) { + await callback(logEvent); } } } -- cgit From 12023073f43ad431a9acb5f28bb0a9abea4ab089 Mon Sep 17 00:00:00 2001 From: Fabio Berger Date: Sat, 11 Nov 2017 09:54:27 -0500 Subject: Use enum instead of boolean to avoid potential bugs from isRemoved incorrectly being set to true --- src/order_watcher/event_watcher.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index f86d1f59f..c9e72281c 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -10,9 +10,15 @@ import { import {AbiDecoder} from '../utils/abi_decoder'; import {intervalUtils} from '../utils/interval_utils'; import {assert} from '../utils/assert'; +import {utils} from '../utils/utils'; const DEFAULT_EVENT_POLLING_INTERVAL = 200; +enum LogEventState { + Removed, + Added, +} + /* * The EventWatcher watches for blockchain events at the specified block confirmation * depth. @@ -56,10 +62,8 @@ export class EventWatcher { } const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify); const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify); - let isRemoved = true; - await this._emitDifferencesAsync(removedEvents, isRemoved, callback); - isRemoved = false; - await this._emitDifferencesAsync(newEvents, isRemoved, callback); + await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback); + await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback); this._lastEvents = pendingEvents; } private async _getEventsAsync(): Promise { @@ -78,11 +82,11 @@ export class EventWatcher { return events; } private async _emitDifferencesAsync( - logs: Web3.LogEntry[], isRemoved: boolean, callback: EventWatcherCallback, + logs: Web3.LogEntry[], logEventState: LogEventState, callback: EventWatcherCallback, ): Promise { for (const log of logs) { const logEvent = { - removed: isRemoved, + removed: logEventState === LogEventState.Removed, ...log, }; if (!_.isUndefined(this._intervalIdIfExists)) { -- cgit From f5608d2c94bcee05a76ef102f235f5e860820567 Mon Sep 17 00:00:00 2001 From: Leonid Logvinov Date: Sun, 12 Nov 2017 12:53:03 -0500 Subject: Pass blockStore to eventWatcher --- src/order_watcher/event_watcher.ts | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index c9e72281c..5303bb651 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -11,6 +11,7 @@ import {AbiDecoder} from '../utils/abi_decoder'; import {intervalUtils} from '../utils/interval_utils'; import {assert} from '../utils/assert'; import {utils} from '../utils/utils'; +import {BlockStore} from '../stores/block_store'; const DEFAULT_EVENT_POLLING_INTERVAL = 200; @@ -29,8 +30,11 @@ export class EventWatcher { private _intervalIdIfExists?: NodeJS.Timer; private _lastEvents: Web3.LogEntry[] = []; private _numConfirmations: number; - constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number, numConfirmations: number) { + private _blockStore: BlockStore; + constructor(web3Wrapper: Web3Wrapper, blockStore: BlockStore, pollingIntervalMs: undefined|number, + numConfirmations: number) { this._web3Wrapper = web3Wrapper; + this._blockStore = blockStore; this._numConfirmations = numConfirmations; this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? DEFAULT_EVENT_POLLING_INTERVAL : @@ -67,16 +71,10 @@ export class EventWatcher { this._lastEvents = pendingEvents; } private async _getEventsAsync(): Promise { - let latestBlock: BlockParamLiteral|number; - if (this._numConfirmations === 0) { - latestBlock = BlockParamLiteral.Pending; - } else { - const currentBlock = await this._web3Wrapper.getBlockNumberAsync(); - latestBlock = currentBlock - this._numConfirmations; - } + const blockNumber = this._blockStore.getBlockNumberWithNConfirmations(this._numConfirmations); const eventFilter = { - fromBlock: latestBlock, - toBlock: latestBlock, + fromBlock: blockNumber, + toBlock: blockNumber, }; const events = await this._web3Wrapper.getLogsAsync(eventFilter); return events; -- cgit From a9ae555b88cc36ff2cbd92fdd37a339702860c01 Mon Sep 17 00:00:00 2001 From: Leonid Logvinov Date: Sun, 12 Nov 2017 13:06:25 -0500 Subject: Store number of confirmations in a blockStore --- src/order_watcher/event_watcher.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 5303bb651..7e27d5e79 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -29,13 +29,10 @@ export class EventWatcher { private _pollingIntervalMs: number; private _intervalIdIfExists?: NodeJS.Timer; private _lastEvents: Web3.LogEntry[] = []; - private _numConfirmations: number; private _blockStore: BlockStore; - constructor(web3Wrapper: Web3Wrapper, blockStore: BlockStore, pollingIntervalMs: undefined|number, - numConfirmations: number) { + constructor(web3Wrapper: Web3Wrapper, blockStore: BlockStore, pollingIntervalMs: undefined|number) { this._web3Wrapper = web3Wrapper; this._blockStore = blockStore; - this._numConfirmations = numConfirmations; this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? DEFAULT_EVENT_POLLING_INTERVAL : pollingIntervalMs; @@ -71,7 +68,7 @@ export class EventWatcher { this._lastEvents = pendingEvents; } private async _getEventsAsync(): Promise { - const blockNumber = this._blockStore.getBlockNumberWithNConfirmations(this._numConfirmations); + const blockNumber = this._blockStore.getBlockNumber(); const eventFilter = { fromBlock: blockNumber, toBlock: blockNumber, -- cgit From 84c965d459b948eb03cb8a70a66663bd7b35f463 Mon Sep 17 00:00:00 2001 From: Leonid Logvinov Date: Sun, 12 Nov 2017 18:10:47 -0500 Subject: Remove blockStore and default to numConfirmations === 0 --- src/order_watcher/event_watcher.ts | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'src/order_watcher/event_watcher.ts') diff --git a/src/order_watcher/event_watcher.ts b/src/order_watcher/event_watcher.ts index 7e27d5e79..81529a98c 100644 --- a/src/order_watcher/event_watcher.ts +++ b/src/order_watcher/event_watcher.ts @@ -11,7 +11,6 @@ import {AbiDecoder} from '../utils/abi_decoder'; import {intervalUtils} from '../utils/interval_utils'; import {assert} from '../utils/assert'; import {utils} from '../utils/utils'; -import {BlockStore} from '../stores/block_store'; const DEFAULT_EVENT_POLLING_INTERVAL = 200; @@ -29,10 +28,8 @@ export class EventWatcher { private _pollingIntervalMs: number; private _intervalIdIfExists?: NodeJS.Timer; private _lastEvents: Web3.LogEntry[] = []; - private _blockStore: BlockStore; - constructor(web3Wrapper: Web3Wrapper, blockStore: BlockStore, pollingIntervalMs: undefined|number) { + constructor(web3Wrapper: Web3Wrapper, pollingIntervalMs: undefined|number) { this._web3Wrapper = web3Wrapper; - this._blockStore = blockStore; this._pollingIntervalMs = _.isUndefined(pollingIntervalMs) ? DEFAULT_EVENT_POLLING_INTERVAL : pollingIntervalMs; @@ -68,10 +65,9 @@ export class EventWatcher { this._lastEvents = pendingEvents; } private async _getEventsAsync(): Promise { - const blockNumber = this._blockStore.getBlockNumber(); const eventFilter = { - fromBlock: blockNumber, - toBlock: blockNumber, + fromBlock: BlockParamLiteral.Pending, + toBlock: BlockParamLiteral.Pending, }; const events = await this._web3Wrapper.getLogsAsync(eventFilter); return events; -- cgit