diff options
Diffstat (limited to 'packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts')
-rw-r--r-- | packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts | 76 |
1 files changed, 25 insertions, 51 deletions
diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts index 1717eb8b3..58691e2ab 100644 --- a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts +++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts @@ -8,78 +8,52 @@ import { ExchangeWrapper, } from '@0x/contract-wrappers'; import { Web3ProviderEngine } from '@0x/subproviders'; -import { Web3Wrapper } from '@0x/web3-wrapper'; import { LogWithDecodedArgs } from 'ethereum-types'; -import { EXCHANGE_START_BLOCK } from '../../utils'; - -const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default toBlock. -const NUM_BLOCKS_PER_QUERY = 20000; // Number of blocks to query for events at a time. +import { GetEventsFunc, getEventsWithPaginationAsync } from './utils'; export class ExchangeEventsSource { private readonly _exchangeWrapper: ExchangeWrapper; - private readonly _web3Wrapper: Web3Wrapper; constructor(provider: Web3ProviderEngine, networkId: number) { - this._web3Wrapper = new Web3Wrapper(provider); const contractWrappers = new ContractWrappers(provider, { networkId }); this._exchangeWrapper = contractWrappers.exchange; } public async getFillEventsAsync( - fromBlock?: number, - toBlock?: number, + startBlock: number, + endBlock: number, ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { - return this._getEventsAsync<ExchangeFillEventArgs>(ExchangeEvents.Fill, fromBlock, toBlock); + const getFillEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeFillEventArgs>(ExchangeEvents.Fill); + return getEventsWithPaginationAsync(getFillEventsForRangeAsync, startBlock, endBlock); } public async getCancelEventsAsync( - fromBlock?: number, - toBlock?: number, + startBlock: number, + endBlock: number, ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> { - return this._getEventsAsync<ExchangeCancelEventArgs>(ExchangeEvents.Cancel, fromBlock, toBlock); + const getCancelEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelEventArgs>( + ExchangeEvents.Cancel, + ); + return getEventsWithPaginationAsync(getCancelEventsForRangeAsync, startBlock, endBlock); } public async getCancelUpToEventsAsync( - fromBlock?: number, - toBlock?: number, + startBlock: number, + endBlock: number, ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> { - return this._getEventsAsync<ExchangeCancelUpToEventArgs>(ExchangeEvents.CancelUpTo, fromBlock, toBlock); - } - - private async _getEventsAsync<ArgsType extends ExchangeEventArgs>( - eventName: ExchangeEvents, - fromBlock: number = EXCHANGE_START_BLOCK, - toBlock?: number, - ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { - const calculatedToBlock = - toBlock === undefined - ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD - : toBlock; - let events: Array<LogWithDecodedArgs<ArgsType>> = []; - for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) { - events = events.concat( - await this._getEventsForRangeAsync<ArgsType>( - eventName, - currFromBlock, - Math.min(currFromBlock + NUM_BLOCKS_PER_QUERY - 1, calculatedToBlock), - ), - ); - } - return events; + const getCancelUpToEventsForRangeAsync = this._makeGetterFuncForEventType<ExchangeCancelUpToEventArgs>( + ExchangeEvents.CancelUpTo, + ); + return getEventsWithPaginationAsync(getCancelUpToEventsForRangeAsync, startBlock, endBlock); } - private async _getEventsForRangeAsync<ArgsType extends ExchangeEventArgs>( - eventName: ExchangeEvents, - fromBlock: number, - toBlock: number, - ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { - return this._exchangeWrapper.getLogsAsync<ArgsType>( - eventName, - { - fromBlock, - toBlock, - }, - {}, - ); + // Returns a getter function which gets all events of a specific type for a + // specific sub-range. This getter function will be called during each step + // of pagination. + private _makeGetterFuncForEventType<ArgsType extends ExchangeEventArgs>( + eventType: ExchangeEvents, + ): GetEventsFunc<ArgsType> { + return async (fromBlock: number, toBlock: number) => + this._exchangeWrapper.getLogsAsync<ArgsType>(eventType, { fromBlock, toBlock }, {}); } } |