aboutsummaryrefslogtreecommitdiffstats
path: root/packages/connect/src/ws_orderbook_channel.ts
blob: 399f973191da23467eb6475da1da4b992528fd31 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import { assert } from '@0xproject/assert';
import { schemas } from '@0xproject/json-schemas';
import * as _ from 'lodash';
import * as WebSocket from 'websocket';

import {
    OrderbookChannel,
    OrderbookChannelHandler,
    OrderbookChannelMessageTypes,
    OrderbookChannelSubscriptionOpts,
    WebsocketClientEventType,
    WebsocketConnectionEventType,
} from './types';
import { orderbookChannelMessageParsers } from './utils/orderbook_channel_message_parsers';

/**
 * This class includes all the functionality related to interacting with a websocket endpoint
 * that implements the standard relayer API v0
 */
export class WebSocketOrderbookChannel implements OrderbookChannel {
    private _apiEndpointUrl: string;
    private _client: WebSocket.client;
    private _connectionIfExists?: WebSocket.connection;
    // Callbacks waiting on the connection attempt currently in flight. Non-empty if and only if
    // an attempt has been started and has not yet succeeded or failed.
    private _pendingConnectionCallbacks: Array<(error?: Error, connection?: WebSocket.connection) => void> = [];
    private _subscriptionCounter = 0;
    /**
     * Instantiates a new WebSocketOrderbookChannel instance
     * @param   url                 The relayer API base WS url you would like to interact with
     * @return  An instance of WebSocketOrderbookChannel
     */
    constructor(url: string) {
        assert.isUri('url', url);
        this._apiEndpointUrl = url;
        this._client = new WebSocket.client();
    }
    /**
     * Subscribe to orderbook snapshots and updates from the websocket.
     * The underlying connection is opened lazily on the first subscription and shared by
     * all subsequent subscriptions made through this instance.
     * @param   subscriptionOpts     An OrderbookChannelSubscriptionOpts instance describing which
     *                               token pair to subscribe to
     * @param   handler              An OrderbookChannelHandler instance that responds to various
     *                               channel updates
     */
    public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
        assert.doesConformToSchema(
            'subscriptionOpts',
            subscriptionOpts,
            schemas.relayerApiOrderbookChannelSubscribePayload,
        );
        assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot'));
        assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate'));
        assert.isFunction('handler.onError', _.get(handler, 'onError'));
        assert.isFunction('handler.onClose', _.get(handler, 'onClose'));
        // The counter value doubles as the requestId used to route incoming messages
        // back to this particular subscription.
        this._subscriptionCounter += 1;
        const subscribeMessage = {
            type: 'subscribe',
            channel: 'orderbook',
            requestId: this._subscriptionCounter,
            payload: subscriptionOpts,
        };
        this._getConnection((error, connection) => {
            if (!_.isUndefined(error)) {
                handler.onError(this, subscriptionOpts, error);
            } else if (!_.isUndefined(connection) && connection.connected) {
                connection.on(WebsocketConnectionEventType.Error, wsError => {
                    handler.onError(this, subscriptionOpts, wsError);
                });
                connection.on(WebsocketConnectionEventType.Close, () => {
                    handler.onClose(this, subscriptionOpts);
                });
                connection.on(WebsocketConnectionEventType.Message, message => {
                    this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler);
                });
                connection.sendUTF(JSON.stringify(subscribeMessage));
            } else {
                // Neither an error nor a usable connection: surface it instead of silently
                // dropping the subscription.
                handler.onError(this, subscriptionOpts, new Error('Websocket connection is not open'));
            }
        });
    }
    /**
     * Close the websocket and stop receiving updates
     */
    public close(): void {
        if (!_.isUndefined(this._connectionIfExists)) {
            this._connectionIfExists.close();
        }
    }
    /**
     * Hands the shared connection to `callback`, opening it first if necessary.
     * If a live connection exists the callback is invoked synchronously. Otherwise the callback
     * is queued and, unless an attempt is already in flight, a single connection attempt is started.
     * Every queued callback is invoked exactly once with the outcome of that attempt.
     * @param   callback    Invoked with (undefined, connection) on success or (error, undefined) on failure
     */
    private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void): void {
        if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) {
            callback(undefined, this._connectionIfExists);
            return;
        }
        const isConnectionAttemptInFlight = !_.isEmpty(this._pendingConnectionCallbacks);
        this._pendingConnectionCallbacks.push(callback);
        if (isConnectionAttemptInFlight) {
            // The attempt that is already running will flush this callback too; starting
            // another one would open a second socket.
            return;
        }
        // One-shot listeners that remove each other, so nothing accumulates on the shared
        // client across attempts or reconnects.
        const onConnect = (connection: WebSocket.connection) => {
            this._client.removeListener(WebsocketClientEventType.ConnectFailed, onConnectFailed);
            this._connectionIfExists = connection;
            this._flushPendingConnectionCallbacks(undefined, connection);
        };
        const onConnectFailed = (error: Error) => {
            this._client.removeListener(WebsocketClientEventType.Connect, onConnect);
            this._flushPendingConnectionCallbacks(error, undefined);
        };
        this._client.once(WebsocketClientEventType.Connect, onConnect);
        this._client.once(WebsocketClientEventType.ConnectFailed, onConnectFailed);
        try {
            this._client.connect(this._apiEndpointUrl);
        } catch (connectError) {
            // connect() threw before starting an attempt: undo the bookkeeping so that later
            // calls are not stuck waiting on an attempt that never began, then rethrow.
            this._client.removeListener(WebsocketClientEventType.Connect, onConnect);
            this._client.removeListener(WebsocketClientEventType.ConnectFailed, onConnectFailed);
            this._pendingConnectionCallbacks = [];
            throw connectError;
        }
    }
    /**
     * Invokes and clears every queued connection callback with the given outcome.
     * The queue is swapped out before invoking so that a callback which re-enters
     * `_getConnection` is queued for a fresh attempt rather than lost.
     * @param   error       The connection error, if the attempt failed
     * @param   connection  The established connection, if the attempt succeeded
     */
    private _flushPendingConnectionCallbacks(error?: Error, connection?: WebSocket.connection): void {
        const callbacks = this._pendingConnectionCallbacks;
        this._pendingConnectionCallbacks = [];
        _.each(callbacks, pendingCallback => {
            pendingCallback(error, connection);
        });
    }
    /**
     * Parses an incoming websocket message and dispatches it to the handler of the
     * subscription it belongs to. Messages whose requestId differs from `requestId` are
     * ignored, since every subscription listens on the same connection.
     * Parse failures and malformed messages are reported through `handler.onError`.
     * @param   requestId           The requestId that was sent with this subscription's subscribe message
     * @param   subscriptionOpts    The options the subscription was created with, passed through to the handler
     * @param   message             The raw message received from the websocket connection
     * @param   handler             The handler to notify
     */
    private _handleWebSocketMessage(
        requestId: number,
        subscriptionOpts: OrderbookChannelSubscriptionOpts,
        message: WebSocket.IMessage,
        handler: OrderbookChannelHandler,
    ): void {
        if (!_.isUndefined(message.utf8Data)) {
            try {
                const utf8Data = message.utf8Data;
                const parserResult = orderbookChannelMessageParsers.parser(utf8Data);
                if (parserResult.requestId === requestId) {
                    switch (parserResult.type) {
                        case OrderbookChannelMessageTypes.Snapshot: {
                            handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
                            break;
                        }
                        case OrderbookChannelMessageTypes.Update: {
                            handler.onUpdate(this, subscriptionOpts, parserResult.payload);
                            break;
                        }
                        default: {
                            handler.onError(
                                this,
                                subscriptionOpts,
                                new Error(`Message has missing a type parameter: ${utf8Data}`),
                            );
                        }
                    }
                }
            } catch (error) {
                handler.onError(this, subscriptionOpts, error);
            }
        } else {
            handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`));
        }
    }
}