1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import * as _ from 'lodash';
import * as WebSocket from 'websocket';
import {assert} from '@0xproject/assert';
import {schemas} from '@0xproject/json-schemas';
import {SignedOrder} from '0x.js';
import {
OrderbookChannel,
OrderbookChannelHandler,
OrderbookChannelMessageTypes,
OrderbookChannelSubscriptionOpts,
} from './types';
import {orderbookChannelMessageParsers} from './utils/orderbook_channel_message_parsers';
enum ConnectionEventType {
Close = 'close',
Error = 'error',
Message = 'message',
}
enum ClientEventType {
Connect = 'connect',
ConnectFailed = 'connectFailed',
}
/**
* This class includes all the functionality related to interacting with a websocket endpoint
* that implements the standard relayer API v0
*/
export class WebSocketOrderbookChannel implements OrderbookChannel {
private apiEndpointUrl: string;
private client: WebSocket.client;
private connectionIfExists?: WebSocket.connection;
/**
* Instantiates a new WebSocketOrderbookChannel instance
* @param url The base url for making API calls
* @return An instance of WebSocketOrderbookChannel
*/
constructor(url: string) {
assert.isUri('url', url);
this.apiEndpointUrl = url;
this.client = new WebSocket.client();
}
/**
* Subscribe to orderbook snapshots and updates from the websocket
* @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which
* token pair to subscribe to
* @param handler An OrderbookChannelHandler instance that responds to various
* channel updates
*/
public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
assert.doesConformToSchema(
'subscriptionOpts', subscriptionOpts, schemas.relayerApiOrderbookChannelSubscribePayload);
assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot'));
assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate'));
assert.isFunction('handler.onError', _.get(handler, 'onError'));
assert.isFunction('handler.onClose', _.get(handler, 'onClose'));
const subscribeMessage = {
type: 'subscribe',
channel: 'orderbook',
payload: subscriptionOpts,
};
this._getConnection((error, connection) => {
if (!_.isUndefined(error)) {
handler.onError(this, error);
} else if (!_.isUndefined(connection) && connection.connected) {
connection.on(ConnectionEventType.Error, wsError => {
handler.onError(this, wsError);
});
connection.on(ConnectionEventType.Close, () => {
handler.onClose(this);
});
connection.on(ConnectionEventType.Message, message => {
this._handleWebSocketMessage(message, handler);
});
connection.sendUTF(JSON.stringify(subscribeMessage));
}
});
}
/**
* Close the websocket and stop receiving updates
*/
public close() {
if (!_.isUndefined(this.connectionIfExists)) {
this.connectionIfExists.close();
}
}
private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void) {
if (!_.isUndefined(this.connectionIfExists) && this.connectionIfExists.connected) {
callback(undefined, this.connectionIfExists);
} else {
this.client.on(ClientEventType.Connect, connection => {
this.connectionIfExists = connection;
callback(undefined, this.connectionIfExists);
});
this.client.on(ClientEventType.ConnectFailed, error => {
callback(error, undefined);
});
this.client.connect(this.apiEndpointUrl);
}
}
private _handleWebSocketMessage(message: WebSocket.IMessage, handler: OrderbookChannelHandler): void {
if (!_.isUndefined(message.utf8Data)) {
try {
const utf8Data = message.utf8Data;
const parserResult = orderbookChannelMessageParsers.parser(utf8Data);
const type = parserResult.type;
switch (parserResult.type) {
case (OrderbookChannelMessageTypes.Snapshot): {
handler.onSnapshot(this, parserResult.payload);
break;
}
case (OrderbookChannelMessageTypes.Update): {
handler.onUpdate(this, parserResult.payload);
break;
}
default: {
handler.onError(this, new Error(`Message has missing a type parameter: ${utf8Data}`));
}
}
} catch (error) {
handler.onError(this, error);
}
} else {
handler.onError(this, new Error(`Message does not contain utf8Data`));
}
}
}
|