aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFabio Berger <me@fabioberger.com>2018-07-06 05:18:55 +0800
committerFabio Berger <me@fabioberger.com>2018-07-06 05:18:55 +0800
commit32ad34d2241c8e23002a4a7fc267a8a2e92fb304 (patch)
tree33630dd1bc634075e8f1abe0da3c4204b657ad8b
parentcfbb1c440ee6659fc4790b71929770bc7e54ea69 (diff)
downloaddexon-0x-contracts-32ad34d2241c8e23002a4a7fc267a8a2e92fb304.tar.gz
dexon-0x-contracts-32ad34d2241c8e23002a4a7fc267a8a2e92fb304.tar.zst
dexon-0x-contracts-32ad34d2241c8e23002a4a7fc267a8a2e92fb304.zip
properly stop blockstream and pass stateLayer into blockstream
-rw-r--r--packages/order-watcher/src/order_watcher/event_watcher.ts25
1 files changed, 19 insertions, 6 deletions
diff --git a/packages/order-watcher/src/order_watcher/event_watcher.ts b/packages/order-watcher/src/order_watcher/event_watcher.ts
index 0e27cd64b..d439d9e5b 100644
--- a/packages/order-watcher/src/order_watcher/event_watcher.ts
+++ b/packages/order-watcher/src/order_watcher/event_watcher.ts
@@ -54,19 +54,22 @@ export class EventWatcher {
this._startBlockAndLogStream(callback);
}
public unsubscribe(): void {
- if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
- intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists);
- delete this._blockAndLogStreamIntervalIfExists;
- delete this._blockAndLogStreamerIfExists;
+ if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionNotFound);
}
+ this._stopBlockAndLogStream();
}
private _startBlockAndLogStream(callback: EventWatcherCallback): void {
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
+ const eventFilter = {
+ fromBlock: this._stateLayer,
+ toBlock: this._stateLayer,
+ };
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
- this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
- this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
+ this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer),
+ this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter),
EventWatcher._onBlockAndLogStreamerError.bind(this, callback),
);
const catchAllLogFilter = {};
@@ -85,6 +88,16 @@ export class EventWatcher {
this._onLogStateChangedAsync.bind(this, callback, isRemoved),
);
}
+ private _stopBlockAndLogStream(): void {
+ if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
+ throw new Error(OrderWatcherError.SubscriptionNotFound);
+ }
+ this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
+ this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
+ intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer);
+ delete this._blockAndLogStreamerIfExists;
+ delete this._blockAndLogStreamIntervalIfExists;
+ }
private async _onLogStateChangedAsync(
callback: EventWatcherCallback,
isRemoved: boolean,