aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLeonid Logvinov <logvinov.leon@gmail.com>2017-11-11 05:32:23 +0800
committerLeonid Logvinov <logvinov.leon@gmail.com>2017-11-13 09:06:13 +0800
commit6bcd9adb9e41ccc45ef5e117df243526bbd281ec (patch)
tree2dbaad897aa6d8997fed17350d008c2df67cb882 /src
parent61e7b735dcae37195e4b306cab3e4c50cd9c3ba5 (diff)
downloaddexon-0x-contracts-6bcd9adb9e41ccc45ef5e117df243526bbd281ec.tar.gz
dexon-0x-contracts-6bcd9adb9e41ccc45ef5e117df243526bbd281ec.tar.zst
dexon-0x-contracts-6bcd9adb9e41ccc45ef5e117df243526bbd281ec.zip
Make subscribe function async and make blockStore operational
Diffstat (limited to 'src')
-rw-r--r--src/order_watcher/order_state_watcher.ts51
-rw-r--r--src/stores/block_store.ts29
2 files changed, 59 insertions, 21 deletions
diff --git a/src/order_watcher/order_state_watcher.ts b/src/order_watcher/order_state_watcher.ts
index 8e0b46eb0..affb350a3 100644
--- a/src/order_watcher/order_state_watcher.ts
+++ b/src/order_watcher/order_state_watcher.ts
@@ -50,11 +50,15 @@ export class OrderStateWatcher {
private _dependentOrderHashes: DependentOrderHashes = {};
private _callbackIfExistsAsync?: OnOrderStateChangeCallback;
private _eventWatcher: EventWatcher;
+ private _web3Wrapper: Web3Wrapper;
+ private _token: TokenWrapper;
+ private _exchange: ExchangeWrapper;
private _abiDecoder: AbiDecoder;
- private _orderStateUtils: OrderStateUtils;
+ private _orderStateUtils?: OrderStateUtils;
+ private _blockStore?: BlockStore;
private _numConfirmations: number;
- private _orderFilledCancelledLazyStore: OrderFilledCancelledLazyStore;
- private _balanceAndProxyAllowanceLazyStore: BalanceAndProxyAllowanceLazyStore;
+ private _orderFilledCancelledLazyStore?: OrderFilledCancelledLazyStore;
+ private _balanceAndProxyAllowanceLazyStore?: BalanceAndProxyAllowanceLazyStore;
constructor(
web3Wrapper: Web3Wrapper, abiDecoder: AbiDecoder, token: TokenWrapper, exchange: ExchangeWrapper,
config?: OrderStateWatcherConfig,
@@ -67,16 +71,8 @@ export class OrderStateWatcher {
web3Wrapper, eventPollingIntervalMs, this._numConfirmations,
);
this._abiDecoder = abiDecoder;
- const blockStore = new BlockStore(web3Wrapper);
- this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore(
- token, blockStore, this._numConfirmations,
- );
- this._orderFilledCancelledLazyStore = new OrderFilledCancelledLazyStore(
- exchange, blockStore, this._numConfirmations,
- );
- this._orderStateUtils = new OrderStateUtils(
- this._balanceAndProxyAllowanceLazyStore, this._orderFilledCancelledLazyStore,
- );
+ this._token = token;
+ this._exchange = exchange;
}
/**
* Add an order to the orderStateWatcher. Before the order is added, it's
@@ -109,11 +105,22 @@ export class OrderStateWatcher {
* @param callback Receives the orderHash of the order that should be re-validated, together
* with all the order-relevant blockchain state needed to re-validate the order.
*/
- public subscribe(callback: OnOrderStateChangeCallback): void {
+ public async subscribeAsync(callback: OnOrderStateChangeCallback): Promise<void> {
assert.isFunction('callback', callback);
if (!_.isUndefined(this._callbackIfExistsAsync)) {
throw new Error(ZeroExError.SubscriptionAlreadyPresent);
}
+ this._blockStore = new BlockStore(this._web3Wrapper);
+ await this._blockStore.startAsync();
+ this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore(
+ this._token, this._blockStore, this._numConfirmations,
+ );
+ this._orderFilledCancelledLazyStore = new OrderFilledCancelledLazyStore(
+ this._exchange, this._blockStore, this._numConfirmations,
+ );
+ this._orderStateUtils = new OrderStateUtils(
+ this._balanceAndProxyAllowanceLazyStore, this._orderFilledCancelledLazyStore,
+ );
this._callbackIfExistsAsync = callback;
this._eventWatcher.subscribe(this._onEventWatcherCallbackAsync.bind(this));
}
@@ -121,7 +128,15 @@ export class OrderStateWatcher {
* Ends an orderStateWatcher subscription.
*/
public unsubscribe(): void {
+ if (_.isUndefined(this._blockStore)) {
+ throw new Error(ZeroExError.SubscriptionNotFound);
+ }
+ this._blockStore.stop();
+ delete this._blockStore;
delete this._callbackIfExistsAsync;
+ delete this._balanceAndProxyAllowanceLazyStore;
+ delete this._orderFilledCancelledLazyStore;
+ delete this._orderStateUtils;
this._eventWatcher.unsubscribe();
}
private async _onEventWatcherCallbackAsync(log: LogEvent): Promise<void> {
@@ -174,12 +189,14 @@ export class OrderStateWatcher {
private async _emitRevalidateOrdersAsync(orderHashes: string[]): Promise<void> {
for (const orderHash of orderHashes) {
const signedOrder = this._orderByOrderHash[orderHash] as SignedOrder;
+ if (_.isUndefined(this._orderStateUtils)) {
+ break; // Unsubscribe was called
+ }
const orderState = await this._orderStateUtils.getOrderStateAsync(signedOrder);
- if (!_.isUndefined(this._callbackIfExistsAsync)) {
- await this._callbackIfExistsAsync(orderState);
- } else {
+ if (_.isUndefined(this._callbackIfExistsAsync)) {
break; // Unsubscribe was called
}
+ await this._callbackIfExistsAsync(orderState);
}
}
private addToDependentOrderHashes(signedOrder: SignedOrder, orderHash: string) {
diff --git a/src/stores/block_store.ts b/src/stores/block_store.ts
index d1b4d3c54..70798a999 100644
--- a/src/stores/block_store.ts
+++ b/src/stores/block_store.ts
@@ -1,8 +1,11 @@
import * as _ from 'lodash';
import * as Web3 from 'web3';
import {BigNumber} from 'bignumber.js';
-import {BlockParamLiteral, InternalZeroExError} from '../types';
+import {BlockParamLiteral, InternalZeroExError, ZeroExError} from '../types';
import {Web3Wrapper} from '../web3_wrapper';
+import {intervalUtils} from '../utils/interval_utils';
+
+const POLLING_INTERVAL_MS = 500;
/**
* Store for a current latest block number
@@ -10,9 +13,9 @@ import {Web3Wrapper} from '../web3_wrapper';
export class BlockStore {
private web3Wrapper?: Web3Wrapper;
private latestBlockNumber?: number;
+ private intervalId?: NodeJS.Timer;
constructor(web3Wrapper?: Web3Wrapper) {
this.web3Wrapper = web3Wrapper;
- // TODO start a subscription
}
public getBlockNumberWithNConfirmations(numConfirmations: number): Web3.BlockParam {
let blockNumber;
@@ -32,7 +35,25 @@ export class BlockStore {
}
return blockNumber;
}
- public setLatestBlock(latestBlockNumber: number): void {
- this.latestBlockNumber = latestBlockNumber;
+ public async startAsync(): Promise<void> {
+ await this.updateLatestBlockAsync();
+ this.intervalId = intervalUtils.setAsyncExcludingInterval(
+ this.updateLatestBlockAsync.bind(this), POLLING_INTERVAL_MS,
+ );
+ }
+ public stop(): void {
+ if (!_.isUndefined(this.intervalId)) {
+ intervalUtils.clearAsyncExcludingInterval(this.intervalId);
+ }
+ }
+ private async updateLatestBlockAsync(): Promise<void> {
+ if (_.isUndefined(this.web3Wrapper)) {
+ throw new Error(InternalZeroExError.Web3WrapperRequiredToStartBlockStore);
+ }
+ const block = await this.web3Wrapper.getBlockAsync(BlockParamLiteral.Latest);
+ if (_.isNull(block.number)) {
+ throw new Error(ZeroExError.FailedToFetchLatestBlock);
+ }
+ this.latestBlockNumber = block.number;
}
}