1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
import * as _ from 'lodash';
import * as Web3 from 'web3';
import {BlockAndLogStreamer, Block} from 'ethereumjs-blockstream';
import {Web3Wrapper} from '../web3_wrapper';
import {AbiDecoder} from '../utils/abi_decoder';
import {
ZeroExError,
InternalZeroExError,
Artifact,
LogWithDecodedArgs,
RawLog,
ContractEvents,
SubscriptionOpts,
IndexedFilterValues,
EventCallback,
} from '../types';
import {utils} from '../utils/utils';
import {constants} from '../utils/constants';
import {intervalUtils} from '../utils/interval_utils';
import {filterUtils} from '../utils/filter_utils';
/**
 * Shared plumbing for contract wrappers: event-log subscriptions driven by a
 * polling `BlockAndLogStreamer`, one-off log queries, log decoding and contract
 * instantiation. The helpers are `protected`, so they are meant to be called
 * from subclasses.
 */
export class ContractWrapper {
    protected _web3Wrapper: Web3Wrapper;
    // Optional: `_tryToDecodeLogOrNoop` throws when no decoder was supplied.
    private _abiDecoder?: AbiDecoder;
    // Only set while at least one subscription is active
    // (created in `_startBlockAndLogStream`, deleted in `_stopBlockAndLogStream`).
    private _blockAndLogStreamer: BlockAndLogStreamer;
    private _blockAndLogStreamInterval: NodeJS.Timer;
    // Initialised to 0 in the constructor; not read anywhere else in this class.
    private _activeFilters: number;
    // Active subscriptions, keyed by the token returned from `_subscribe`.
    private _filters: {[filterToken: string]: Web3.FilterObject};
    private _filterCallbacks: {[filterToken: string]: EventCallback};
    // Tokens handed back by the streamer; needed to detach the handlers again.
    private _onLogAddedSubscriptionToken: string|undefined;
    private _onLogRemovedSubscriptionToken: string|undefined;
    /**
     * @param web3Wrapper Wrapper used for hashing, block/log retrieval and contract instantiation.
     * @param abiDecoder  Decoder used to turn raw logs into decoded events. Optional, but every
     *                    code path that decodes a log throws without it.
     */
    constructor(web3Wrapper: Web3Wrapper, abiDecoder?: AbiDecoder) {
        this._web3Wrapper = web3Wrapper;
        this._abiDecoder = abiDecoder;
        this._activeFilters = 0;
        this._filters = {};
        this._filterCallbacks = {};
        this._onLogAddedSubscriptionToken = undefined;
        this._onLogRemovedSubscriptionToken = undefined;
    }
    /**
     * Registers `callback` for logs matching the given event on `address`.
     * The first subscription starts the block/log stream and attaches the
     * log-added / log-removed handlers.
     * @param address           Contract address to filter on.
     * @param eventName         Event to filter on.
     * @param indexFilterValues Values for the indexed event arguments to filter on.
     * @param abi               ABI used to build the filter.
     * @param callback          Invoked with each matching log plus a `removed` flag.
     * @returns A token that identifies the subscription; pass it to `_unsubscribe`.
     */
    protected _subscribe(address: string, eventName: ContractEvents,
                         indexFilterValues: IndexedFilterValues, abi: Web3.ContractAbi,
                         callback: EventCallback): string {
        const filter = filterUtils.getFilter(
            this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi,
        );
        if (_.isEmpty(this._filters)) {
            this._startBlockAndLogStream();
            // The same handler serves both streams; the bound first argument tells it
            // whether the log was added (false) or removed (true).
            const isAdded = false;
            const isRemoved = true;
            this._onLogAddedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogAdded(
                this._onLogStateChanged.bind(this, isAdded),
            );
            this._onLogRemovedSubscriptionToken = this._blockAndLogStreamer.subscribeToOnLogRemoved(
                this._onLogStateChanged.bind(this, isRemoved),
            );
        }
        const filterToken = filterUtils.generateUUID();
        this._filters[filterToken] = filter;
        this._filterCallbacks[filterToken] = callback;
        return filterToken;
    }
    /**
     * Cancels a subscription created by `_subscribe`. When the last subscription
     * is removed, the streamer handlers are detached and polling is stopped.
     * @param filterToken Token returned by `_subscribe`.
     * @throws Error with message `ZeroExError.SubscriptionNotFound` if the token is unknown.
     */
    protected _unsubscribe(filterToken: string): void {
        if (_.isUndefined(this._filters[filterToken])) {
            throw new Error(ZeroExError.SubscriptionNotFound);
        }
        delete this._filters[filterToken];
        delete this._filterCallbacks[filterToken];
        if (_.isEmpty(this._filters)) {
            this._blockAndLogStreamer.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
            this._blockAndLogStreamer.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
            this._stopBlockAndLogStream();
        }
    }
    /**
     * Fetches the logs matching the given event/filter options and passes each one
     * through `_tryToDecodeLogOrNoop`.
     * @param address           Contract address to filter on.
     * @param eventName         Event to filter on.
     * @param subscriptionOpts  Extra options forwarded to `filterUtils.getFilter`.
     * @param indexFilterValues Values for the indexed event arguments to filter on.
     * @param abi               ABI used to build the filter.
     * @returns The processed logs, in the order returned by the web3 wrapper.
     * @throws Error with message `InternalZeroExError.NoAbiDecoder` if a log has to be
     *         decoded and no ABI decoder was supplied.
     */
    protected async _getLogsAsync(address: string, eventName: ContractEvents, subscriptionOpts: SubscriptionOpts,
                                  indexFilterValues: IndexedFilterValues,
                                  abi: Web3.ContractAbi): Promise<LogWithDecodedArgs[]> {
        const filter = filterUtils.getFilter(
            this._web3Wrapper.keccak256.bind(this._web3Wrapper), address, eventName, indexFilterValues, abi,
            subscriptionOpts,
        );
        const logs = await this._web3Wrapper.getLogsAsync(filter);
        const logsWithDecodedArguments = _.map(logs, this._tryToDecodeLogOrNoop.bind(this));
        return logsWithDecodedArguments;
    }
    /**
     * Hands the log to the ABI decoder's `tryToDecodeLogOrNoop`.
     * @param log Raw log entry.
     * @returns Whatever the decoder returns (typed as a decoded log or the raw log).
     * @throws Error with message `InternalZeroExError.NoAbiDecoder` if no decoder was supplied.
     */
    protected _tryToDecodeLogOrNoop(log: Web3.LogEntry): LogWithDecodedArgs|RawLog {
        if (_.isUndefined(this._abiDecoder)) {
            throw new Error(InternalZeroExError.NoAbiDecoder);
        }
        const logWithDecodedArgs = this._abiDecoder.tryToDecodeLogOrNoop(log);
        return logWithDecodedArgs;
    }
    /**
     * Obtains a contract instance for `artifact` via the web3 wrapper.
     * @param artifact        Contract artifact to instantiate.
     * @param addressIfExists Optional address, forwarded unchanged to the web3 wrapper.
     * @returns The contract instance produced by the web3 wrapper.
     */
    protected async _instantiateContractIfExistsAsync<A extends Web3.ContractInstance>(artifact: Artifact,
                                                                                       addressIfExists?: string,
                                                                                      ): Promise<A> {
        const contractInstance =
            await this._web3Wrapper.getContractInstanceFromArtifactAsync<A>(artifact, addressIfExists);
        return contractInstance;
    }
    /**
     * Streamer handler: forwards `log` to the callback of every subscription whose
     * filter matches it, decorated with the `removed` flag.
     * @param removed `true` when called from the log-removed stream, `false` from log-added.
     * @param log     Raw log reported by the streamer.
     */
    private _onLogStateChanged(removed: boolean, log: Web3.LogEntry): void {
        _.forEach(this._filters, (filter: Web3.FilterObject, filterToken: string) => {
            if (filterUtils.matchesFilter(log, filter)) {
                const decodedLog = this._tryToDecodeLogOrNoop(log) as LogWithDecodedArgs;
                const logEvent = {
                    ...decodedLog,
                    removed,
                };
                this._filterCallbacks[filterToken](logEvent);
            }
        });
    }
    /**
     * Creates the block/log streamer and starts polling for the latest block
     * every `constants.DEFAULT_BLOCK_POLLING_INTERVAL`.
     */
    private _startBlockAndLogStream(): void {
        this._blockAndLogStreamer = new BlockAndLogStreamer(
            this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
            this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
        );
        // An empty filter is registered with the streamer; per-subscription matching
        // happens in `_onLogStateChanged` (presumably `{}` matches every log - confirm
        // against the streamer documentation).
        this._blockAndLogStreamer.addLogFilter({});
        this._blockAndLogStreamInterval = intervalUtils.setAsyncExcludingInterval(
            this._reconcileBlockAsync.bind(this), constants.DEFAULT_BLOCK_POLLING_INTERVAL,
        );
    }
    /**
     * Stops polling and discards the streamer.
     */
    private _stopBlockAndLogStream(): void {
        intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamInterval);
        delete this._blockAndLogStreamer;
    }
    /**
     * Polling tick: fetches the latest block and feeds it to the streamer.
     * Does nothing if the stream was stopped while the block was being fetched.
     */
    private async _reconcileBlockAsync(): Promise<void> {
        const latestBlock = await this._web3Wrapper.getBlockAsync('latest');
        // Read the streamer only after the await: `_stopBlockAndLogStream` may have
        // deleted it while the block request was in flight, in which case there is
        // nothing left to reconcile.
        const blockAndLogStreamer = this._blockAndLogStreamer;
        if (blockAndLogStreamer === undefined) {
            return;
        }
        // We need to coerce to Block type cause Web3.Block includes types for mempool blocks
        blockAndLogStreamer.reconcileNewBlock(latestBlock as any as Block);
    }
}
|