import { AbiDecoder, intervalUtils, logUtils } from '@0xproject/utils';
import { marshaller, Web3Wrapper } from '@0xproject/web3-wrapper';
import {
    BlockParamLiteral,
    ContractAbi,
    ContractArtifact,
    FilterObject,
    LogEntry,
    LogWithDecodedArgs,
    RawLog,
    RawLogEntry,
} from 'ethereum-types';
import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
import * as _ from 'lodash';

import {
    BlockRange,
    ContractEventArgs,
    ContractEvents,
    ContractWrappersError,
    EventCallback,
    IndexedFilterValues,
} from '../types';
import { constants } from '../utils/constants';
import { filterUtils } from '../utils/filter_utils';

// Maps an artifact's `contractName` to the error thrown when that contract cannot be found.
const CONTRACT_NAME_TO_NOT_FOUND_ERROR: {
    [contractName: string]: ContractWrappersError;
} = {
    ZRX: ContractWrappersError.ZRXContractDoesNotExist,
    EtherToken: ContractWrappersError.EtherTokenContractDoesNotExist,
    ERC20Token: ContractWrappersError.ERC20TokenContractDoesNotExist,
    ERC20Proxy: ContractWrappersError.ERC20ProxyContractDoesNotExist,
    ERC721Token: ContractWrappersError.ERC721TokenContractDoesNotExist,
    ERC721Proxy: ContractWrappersError.ERC721ProxyContractDoesNotExist,
    Exchange: ContractWrappersError.ExchangeContractDoesNotExist,
};

/**
 * Base class for contract wrappers. It provides:
 *  - event subscriptions backed by a single shared block-and-log stream that is started on the
 *    first subscription and stopped when the last subscription is removed,
 *  - log fetching and decoding against the subclass-provided `abi`,
 *  - resolution of a contract's ABI/address from an artifact for the configured network id.
 */
export abstract class ContractWrapper {
    public abstract abi: ContractAbi;
    protected _web3Wrapper: Web3Wrapper;
    protected _networkId: number;
    private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined;
    private _blockPollingIntervalMs: number;
    private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer;
    // Active filters and their callbacks, both keyed by the token returned from `_subscribe`.
    private _filters: { [filterToken: string]: FilterObject };
    private _filterCallbacks: {
        [filterToken: string]: EventCallback<ContractEventArgs>;
    };
    private _onLogAddedSubscriptionToken: string | undefined;
    private _onLogRemovedSubscriptionToken: string | undefined;
    /**
     * Error handler passed to the block stream and to the polling interval.
     * @param isVerbose Whether the error should be logged; when false the error is dropped.
     * @param err       The error that was raised.
     */
    private static _onBlockAndLogStreamerError(isVerbose: boolean, err: Error): void {
        // Since Blockstream errors are all recoverable, we simply log them if the verbose
        // config is passed in.
        if (isVerbose) {
            logUtils.warn(err);
        }
    }
    /**
     * @param web3Wrapper            Wrapper used for all node requests.
     * @param networkId              Network id used to look up deployed addresses in artifacts.
     * @param blockPollingIntervalMs Polling period for new blocks; when omitted,
     *                               `constants.DEFAULT_BLOCK_POLLING_INTERVAL` is used.
     */
    constructor(web3Wrapper: Web3Wrapper, networkId: number, blockPollingIntervalMs?: number) {
        this._web3Wrapper = web3Wrapper;
        this._networkId = networkId;
        this._blockPollingIntervalMs = _.isUndefined(blockPollingIntervalMs)
            ? constants.DEFAULT_BLOCK_POLLING_INTERVAL
            : blockPollingIntervalMs;
        this._filters = {};
        this._filterCallbacks = {};
        this._blockAndLogStreamerIfExists = undefined;
        this._onLogAddedSubscriptionToken = undefined;
        this._onLogRemovedSubscriptionToken = undefined;
    }
    /**
     * Removes every active subscription (which also stops the block stream once none remain).
     */
    protected _unsubscribeAll(): void {
        const filterTokens = _.keys(this._filterCallbacks);
        _.each(filterTokens, filterToken => {
            this._unsubscribe(filterToken);
        });
    }
    /**
     * Removes a single subscription and stops the block stream if it was the last one.
     * @param filterToken Token returned by `_subscribe`.
     * @param err         When supplied, the subscription's callback is invoked with this error
     *                    before the subscription is removed.
     * @throws Error(ContractWrappersError.SubscriptionNotFound) if the token is unknown.
     */
    protected _unsubscribe(filterToken: string, err?: Error): void {
        if (_.isUndefined(this._filters[filterToken])) {
            throw new Error(ContractWrappersError.SubscriptionNotFound);
        }
        if (!_.isUndefined(err)) {
            const callback = this._filterCallbacks[filterToken];
            callback(err, undefined);
        }
        delete this._filters[filterToken];
        delete this._filterCallbacks[filterToken];
        if (_.isEmpty(this._filters)) {
            this._stopBlockAndLogStream();
        }
    }
    /**
     * Registers a callback for logs matching the given event filter, starting the block stream
     * if it is not already running.
     * @param address           Contract address to filter on.
     * @param eventName         Event to filter on.
     * @param indexFilterValues Values of indexed event arguments to filter on.
     * @param abi               ABI used to build the filter.
     * @param callback          Invoked with `(null, { log, isRemoved })` for every matching log.
     * @param isVerbose         Whether stream errors are logged. Only takes effect when this call
     *                          is the one that starts the stream.
     * @return A token identifying the subscription, to be passed to `_unsubscribe`.
     */
    protected _subscribe<ArgsType extends ContractEventArgs>(
        address: string,
        eventName: ContractEvents,
        indexFilterValues: IndexedFilterValues,
        abi: ContractAbi,
        callback: EventCallback<ArgsType>,
        isVerbose: boolean = false,
    ): string {
        const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi);
        if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
            this._startBlockAndLogStream(isVerbose);
        }
        const filterToken = filterUtils.generateUUID();
        this._filters[filterToken] = filter;
        this._filterCallbacks[filterToken] = callback as EventCallback<ContractEventArgs>;
        return filterToken;
    }
    /**
     * Fetches the logs matching the given event filter within a block range and attempts to
     * decode each of them with this wrapper's `abi`.
     * @param address           Contract address to filter on.
     * @param eventName         Event to filter on.
     * @param blockRange        Block range to search.
     * @param indexFilterValues Values of indexed event arguments to filter on.
     * @param abi               ABI used to build the filter.
     * @return The fetched logs, each passed through `_tryToDecodeLogOrNoop`.
     */
    protected async _getLogsAsync<ArgsType extends ContractEventArgs>(
        address: string,
        eventName: ContractEvents,
        blockRange: BlockRange,
        indexFilterValues: IndexedFilterValues,
        abi: ContractAbi,
    ): Promise<Array<LogWithDecodedArgs<ArgsType>>> {
        const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi, blockRange);
        const logs = await this._web3Wrapper.getLogsAsync(filter);
        const logsWithDecodedArguments = _.map(logs, this._tryToDecodeLogOrNoop.bind(this));
        return logsWithDecodedArguments;
    }
    /**
     * Attempts to decode a log using an `AbiDecoder` built from this wrapper's `abi`.
     * @param log The log to decode.
     * @return Whatever `AbiDecoder.tryToDecodeLogOrNoop` returns for the log: per the declared
     *         return type, either the log with decoded args or the raw log.
     */
    protected _tryToDecodeLogOrNoop<ArgsType extends ContractEventArgs>(
        log: LogEntry,
    ): LogWithDecodedArgs<ArgsType> | RawLog {
        const abiDecoder = new AbiDecoder([this.abi]);
        const logWithDecodedArgs = abiDecoder.tryToDecodeLogOrNoop(log);
        return logWithDecodedArgs;
    }
    /**
     * Resolves the ABI and address of a contract and verifies that code exists at that address.
     * @param artifact        Contract artifact holding the ABI and per-network deployments.
     * @param addressIfExists Explicit address; when omitted, the (lower-cased) address deployed
     *                        on the current network id is taken from the artifact.
     * @return A tuple of `[abi, contractAddress]`.
     * @throws Error(ContractWrappersError.ContractNotDeployedOnNetwork) if no address was given
     *         and the artifact has no entry for the current network id.
     * @throws Error with the contract-specific "does not exist" code if no contract exists at
     *         the resolved address.
     */
    protected async _getContractAbiAndAddressFromArtifactsAsync(
        artifact: ContractArtifact,
        addressIfExists?: string,
    ): Promise<[ContractAbi, string]> {
        let contractAddress: string;
        if (_.isUndefined(addressIfExists)) {
            if (_.isUndefined(artifact.networks[this._networkId])) {
                throw new Error(ContractWrappersError.ContractNotDeployedOnNetwork);
            }
            contractAddress = artifact.networks[this._networkId].address.toLowerCase();
        } else {
            contractAddress = addressIfExists;
        }
        const doesContractExist = await this._web3Wrapper.doesContractExistAtAddressAsync(contractAddress);
        if (!doesContractExist) {
            throw new Error(CONTRACT_NAME_TO_NOT_FOUND_ERROR[artifact.contractName]);
        }
        const abiAndAddress: [ContractAbi, string] = [artifact.compilerOutput.abi, contractAddress];
        return abiAndAddress;
    }
    /**
     * Synchronously resolves a contract's address without checking that code exists there.
     * @param artifact        Contract artifact holding per-network deployments.
     * @param addressIfExists Explicit address; returned as-is when supplied.
     * @return The supplied address, or the artifact's address for the current network id.
     * @throws Error(ContractWrappersError.ContractNotDeployedOnNetwork) if no address was given
     *         and the artifact has no entry for the current network id.
     * @throws Error with the contract-specific "does not exist" code if that entry has no address.
     */
    protected _getContractAddress(artifact: ContractArtifact, addressIfExists?: string): string {
        if (_.isUndefined(addressIfExists)) {
            if (_.isUndefined(artifact.networks[this._networkId])) {
                throw new Error(ContractWrappersError.ContractNotDeployedOnNetwork);
            }
            const contractAddress = artifact.networks[this._networkId].address;
            if (_.isUndefined(contractAddress)) {
                throw new Error(CONTRACT_NAME_TO_NOT_FOUND_ERROR[artifact.contractName]);
            }
            return contractAddress;
        } else {
            return addressIfExists;
        }
    }
    /**
     * Dispatches a log that was added to / removed from the chain to every subscription whose
     * filter matches it.
     * @param isRemoved Whether the log was removed (true) or added (false).
     * @param rawLog    The raw log reported by the block stream.
     */
    private _onLogStateChanged<ArgsType extends ContractEventArgs>(isRemoved: boolean, rawLog: RawLogEntry): void {
        const log: LogEntry = marshaller.unmarshalLog(rawLog);
        _.forEach(this._filters, (filter: FilterObject, filterToken: string) => {
            // lodash snapshots the keys up front but reads each value at visit time, so a
            // subscription removed by an earlier callback in this same pass shows up here as
            // `undefined`. Skip it instead of handing `undefined` to `matchesFilter`.
            if (_.isUndefined(filter)) {
                return;
            }
            if (filterUtils.matchesFilter(log, filter)) {
                const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs<ArgsType>;
                const logEvent = {
                    log: decodedLog,
                    isRemoved,
                };
                this._filterCallbacks[filterToken](null, logEvent);
            }
        });
    }
    /**
     * Creates the block-and-log streamer, starts polling for new blocks and hooks up the
     * log-added / log-removed handlers.
     * @param isVerbose Whether stream and polling errors are logged.
     * @throws Error(ContractWrappersError.SubscriptionAlreadyPresent) if a stream already exists.
     */
    private _startBlockAndLogStream(isVerbose: boolean): void {
        if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
            throw new Error(ContractWrappersError.SubscriptionAlreadyPresent);
        }
        this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
            this._blockstreamGetBlockOrNullAsync.bind(this),
            this._blockstreamGetLogsAsync.bind(this),
            ContractWrapper._onBlockAndLogStreamerError.bind(this, isVerbose),
        );
        // An empty filter is registered with the streamer; matching against individual
        // subscriptions is done in `_onLogStateChanged`.
        const catchAllLogFilter = {};
        this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter);
        this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval(
            this._reconcileBlockAsync.bind(this),
            this._blockPollingIntervalMs,
            ContractWrapper._onBlockAndLogStreamerError.bind(this, isVerbose),
        );
        let isRemoved = false;
        this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
            this._onLogStateChanged.bind(this, isRemoved),
        );
        isRemoved = true;
        this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved(
            this._onLogStateChanged.bind(this, isRemoved),
        );
    }
    // This method only exists in order to comply with the expected interface of Blockstream's constructor
    private async _blockstreamGetBlockOrNullAsync(hash: string): Promise<Block | null> {
        const shouldIncludeTransactionData = false;
        const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
            method: 'eth_getBlockByHash',
            params: [hash, shouldIncludeTransactionData],
        });
        return blockOrNull;
    }
    // This method only exists in order to comply with the expected interface of Blockstream's constructor
    private async _blockstreamGetLatestBlockOrNullAsync(): Promise<Block | null> {
        const shouldIncludeTransactionData = false;
        const blockOrNull = await this._web3Wrapper.sendRawPayloadAsync<Block | null>({
            method: 'eth_getBlockByNumber',
            params: [BlockParamLiteral.Latest, shouldIncludeTransactionData],
        });
        return blockOrNull;
    }
    // This method only exists in order to comply with the expected interface of Blockstream's constructor
    private async _blockstreamGetLogsAsync(filterOptions: FilterObject): Promise<RawLogEntry[]> {
        const logs = await this._web3Wrapper.sendRawPayloadAsync<RawLogEntry[]>({
            method: 'eth_getLogs',
            params: [filterOptions],
        });
        return logs as RawLogEntry[];
    }
    // HACK: This should be a package-scoped method (which doesn't exist in TS)
    // We don't want this method available in the public interface for all classes
    // who inherit from ContractWrapper, and it is only used by the internal implementation
    // of those higher classes.
    // tslint:disable-next-line:no-unused-variable
    private _setNetworkId(networkId: number): void {
        this._networkId = networkId;
    }
    /**
     * Detaches the log handlers, stops polling and discards the streamer.
     * @throws Error(ContractWrappersError.SubscriptionNotFound) if no stream is running.
     */
    private _stopBlockAndLogStream(): void {
        if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
            throw new Error(ContractWrappersError.SubscriptionNotFound);
        }
        this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
        this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
        intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer);
        // Return every stream-related field to the state the constructor leaves it in, so no
        // stale token or timer handle outlives the stream it belonged to.
        this._onLogAddedSubscriptionToken = undefined;
        this._onLogRemovedSubscriptionToken = undefined;
        this._blockAndLogStreamIntervalIfExists = undefined;
        this._blockAndLogStreamerIfExists = undefined;
    }
    /**
     * Polling tick: fetches the latest block and feeds it to the streamer, if one still exists.
     */
    private async _reconcileBlockAsync(): Promise<void> {
        const latestBlockOrNull = await this._blockstreamGetLatestBlockOrNullAsync();
        if (_.isNull(latestBlockOrNull)) {
            return; // noop
        }
        if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
            // If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
            await this._blockAndLogStreamerIfExists.reconcileNewBlock(latestBlockOrNull);
        }
    }
}