From 16ddd1edfccdd7768447bfff9afec1f4a1ce014e Mon Sep 17 00:00:00 2001 From: Brandon Millman Date: Wed, 16 May 2018 11:15:02 -0700 Subject: Implement web browser socket --- .../connect/src/browser_ws_orderbook_channel.ts | 140 +++++++++++++++++ packages/connect/src/index.ts | 5 +- packages/connect/src/node_ws_orderbook_channel.ts | 158 ++++++++++++++++++++ ...de_websocket_orderbook_channel_config_schema.ts | 10 ++ packages/connect/src/schemas/schemas.ts | 4 +- .../websocket_orderbook_channel_config_schema.ts | 10 -- packages/connect/src/types.ts | 2 +- packages/connect/src/utils/assert.ts | 25 ++++ .../src/utils/orderbook_channel_message_parser.ts | 8 +- packages/connect/src/ws_orderbook_channel.ts | 166 --------------------- 10 files changed, 346 insertions(+), 182 deletions(-) create mode 100644 packages/connect/src/browser_ws_orderbook_channel.ts create mode 100644 packages/connect/src/node_ws_orderbook_channel.ts create mode 100644 packages/connect/src/schemas/node_websocket_orderbook_channel_config_schema.ts delete mode 100644 packages/connect/src/schemas/websocket_orderbook_channel_config_schema.ts create mode 100644 packages/connect/src/utils/assert.ts delete mode 100644 packages/connect/src/ws_orderbook_channel.ts (limited to 'packages/connect/src') diff --git a/packages/connect/src/browser_ws_orderbook_channel.ts b/packages/connect/src/browser_ws_orderbook_channel.ts new file mode 100644 index 000000000..b97a82ec9 --- /dev/null +++ b/packages/connect/src/browser_ws_orderbook_channel.ts @@ -0,0 +1,140 @@ +import * as _ from 'lodash'; +import * as WebSocket from 'websocket'; + +import { + OrderbookChannel, + OrderbookChannelHandler, + OrderbookChannelMessageTypes, + OrderbookChannelSubscriptionOpts, + WebsocketClientEventType, + WebsocketConnectionEventType, +} from './types'; +import { assert } from './utils/assert'; +import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser'; + +interface Subscription { + subscriptionOpts: OrderbookChannelSubscriptionOpts; + handler: OrderbookChannelHandler; +} + +/** + * This class includes all the functionality related to interacting with a websocket endpoint + * that implements the standard relayer API v0 in a browser environment + */ +export class BrowserWebSocketOrderbookChannel implements OrderbookChannel { + private _apiEndpointUrl: string; + private _clientIfExists?: WebSocket.w3cwebsocket; + private _subscriptions: Subscription[] = []; + /** + * Instantiates a new WebSocketOrderbookChannel instance + * @param url The relayer API base WS url you would like to interact with + * @return An instance of WebSocketOrderbookChannel + */ + constructor(url: string) { + assert.isUri('url', url); + this._apiEndpointUrl = url; + } + /** + * Subscribe to orderbook snapshots and updates from the websocket + * @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which + * token pair to subscribe to + * @param handler An OrderbookChannelHandler instance that responds to various + * channel updates + */ + public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void { + assert.isOrderbookChannelSubscriptionOpts('subscriptionOpts', subscriptionOpts); + assert.isOrderbookChannelHandler('handler', handler); + const newSubscription: Subscription = { + subscriptionOpts, + handler, + }; + this._subscriptions.push(newSubscription); + const subscribeMessage = { + type: 'subscribe', + channel: 'orderbook', + requestId: this._subscriptions.length - 1, + payload: subscriptionOpts, + }; + if (_.isUndefined(this._clientIfExists)) { + this._clientIfExists = new WebSocket.w3cwebsocket(this._apiEndpointUrl); + this._clientIfExists.onopen = () => { + this._sendMessage(subscribeMessage); + }; + this._clientIfExists.onerror = error => { + this._alertAllHandlersToError(error); + }; + this._clientIfExists.onclose = () => { + _.forEach(this._subscriptions, subscription => { + subscription.handler.onClose(this, subscription.subscriptionOpts); + }); + }; + this._clientIfExists.onmessage = message => { + this._handleWebSocketMessage(message); + }; + } else { + this._sendMessage(subscribeMessage); + } + } + /** + * Close the websocket and stop receiving updates + */ + public close(): void { + if (!_.isUndefined(this._clientIfExists)) { + this._clientIfExists.close(); + } + } + /** + * Send a message to the client if it has been instantiated and it is open + */ + private _sendMessage(message: any): void { + if (!_.isUndefined(this._clientIfExists) && this._clientIfExists.readyState === WebSocket.w3cwebsocket.OPEN) { + this._clientIfExists.send(JSON.stringify(message)); + } + } + /** + * For use in cases where we need to alert all handlers of an error + */ + private _alertAllHandlersToError(error: Error): void { + _.forEach(this._subscriptions, subscription => { + subscription.handler.onError(this, subscription.subscriptionOpts, error); + }); + } + private _handleWebSocketMessage(message: any): void { + // if we get a message with no data, alert all handlers and return + if (_.isUndefined(message.data)) { + this._alertAllHandlersToError(new Error(`Message does not contain utf8Data`)); + return; + } + // try to parse the message data and route it to the correct handler + try { + const utf8Data = message.data; + const parserResult = orderbookChannelMessageParser.parse(utf8Data); + const subscription = this._subscriptions[parserResult.requestId]; + if (_.isUndefined(subscription)) { + this._alertAllHandlersToError(new Error(`Message has unknown requestId: ${utf8Data}`)); + return; + } + const handler = subscription.handler; + const subscriptionOpts = subscription.subscriptionOpts; + switch (parserResult.type) { + case OrderbookChannelMessageTypes.Snapshot: { + handler.onSnapshot(this, subscriptionOpts, parserResult.payload); + break; + } + case OrderbookChannelMessageTypes.Update: { + handler.onUpdate(this, subscriptionOpts, parserResult.payload); + break; + } + default: { + handler.onError( + this, + subscriptionOpts, + new Error(`Message has unknown type parameter: ${utf8Data}`), + ); + } + } + } catch (error) { + this._alertAllHandlersToError(error); + } + } +} diff --git a/packages/connect/src/index.ts b/packages/connect/src/index.ts index ef5d8683e..88b09506c 100644 --- a/packages/connect/src/index.ts +++ b/packages/connect/src/index.ts @@ -1,9 +1,11 @@ export { HttpClient } from './http_client'; -export { WebSocketOrderbookChannel } from './ws_orderbook_channel'; +export { BrowserWebSocketOrderbookChannel } from './browser_ws_orderbook_channel'; +export { NodeWebSocketOrderbookChannel } from './node_ws_orderbook_channel'; export { Client, FeesRequest, FeesResponse, + NodeWebSocketOrderbookChannelConfig, OrderbookChannel, OrderbookChannelHandler, OrderbookChannelSubscriptionOpts, @@ -14,7 +16,6 @@ export { TokenPairsItem, TokenPairsRequestOpts, TokenTradeInfo, - WebSocketOrderbookChannelConfig, } from './types'; export { Order, SignedOrder } from '@0xproject/types'; diff --git a/packages/connect/src/node_ws_orderbook_channel.ts b/packages/connect/src/node_ws_orderbook_channel.ts new file mode 100644 index 000000000..5f61ac4c8 --- /dev/null +++ b/packages/connect/src/node_ws_orderbook_channel.ts @@ -0,0 +1,158 @@ +import * as _ from 'lodash'; +import * as WebSocket from 'websocket'; + +import { schemas as clientSchemas } from './schemas/schemas'; +import { + NodeWebSocketOrderbookChannelConfig, + OrderbookChannel, + OrderbookChannelHandler, + OrderbookChannelMessageTypes, + OrderbookChannelSubscriptionOpts, + WebsocketClientEventType, + WebsocketConnectionEventType, +} from './types'; +import { assert } from './utils/assert'; +import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser'; + +const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000; +const MINIMUM_HEARTBEAT_INTERVAL_MS = 10; + +/** + * This class includes all the functionality related to interacting with a websocket endpoint + * that implements the standard relayer API v0 in a node environment + */ +export class NodeWebSocketOrderbookChannel implements OrderbookChannel { + private _apiEndpointUrl: string; + private _client: WebSocket.client; + private _connectionIfExists?: WebSocket.connection; + private _heartbeatTimerIfExists?: NodeJS.Timer; + private _subscriptionCounter = 0; + private _heartbeatIntervalMs: number; + /** + * Instantiates a new NodeWebSocketOrderbookChannelConfig instance + * @param url The relayer API base WS url you would like to interact with + * @param config The configuration object. Look up the type for the description. + * @return An instance of NodeWebSocketOrderbookChannelConfig + */ + constructor(url: string, config?: NodeWebSocketOrderbookChannelConfig) { + assert.isUri('url', url); + if (!_.isUndefined(config)) { + assert.doesConformToSchema('config', config, clientSchemas.nodeWebSocketOrderbookChannelConfigSchema); + } + this._apiEndpointUrl = url; + this._heartbeatIntervalMs = + _.isUndefined(config) || _.isUndefined(config.heartbeatIntervalMs) + ? DEFAULT_HEARTBEAT_INTERVAL_MS + : config.heartbeatIntervalMs; + this._client = new WebSocket.client(); + } + /** + * Subscribe to orderbook snapshots and updates from the websocket + * @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which + * token pair to subscribe to + * @param handler An OrderbookChannelHandler instance that responds to various + * channel updates + */ + public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void { + assert.isOrderbookChannelSubscriptionOpts('subscriptionOpts', subscriptionOpts); + assert.isOrderbookChannelHandler('handler', handler); + this._subscriptionCounter += 1; + const subscribeMessage = { + type: 'subscribe', + channel: 'orderbook', + requestId: this._subscriptionCounter, + payload: subscriptionOpts, + }; + this._getConnection((error, connection) => { + if (!_.isUndefined(error)) { + handler.onError(this, subscriptionOpts, error); + } else if (!_.isUndefined(connection) && connection.connected) { + connection.on(WebsocketConnectionEventType.Error, wsError => { + handler.onError(this, subscriptionOpts, wsError); + }); + connection.on(WebsocketConnectionEventType.Close, (_code: number, _desc: string) => { + handler.onClose(this, subscriptionOpts); + }); + connection.on(WebsocketConnectionEventType.Message, message => { + this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler); + }); + connection.sendUTF(JSON.stringify(subscribeMessage)); + } + }); + } + /** + * Close the websocket and stop receiving updates + */ + public close(): void { + if (!_.isUndefined(this._connectionIfExists)) { + this._connectionIfExists.close(); + } + if (!_.isUndefined(this._heartbeatTimerIfExists)) { + clearInterval(this._heartbeatTimerIfExists); + } + } + private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void): void { + if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) { + callback(undefined, this._connectionIfExists); + } else { + this._client.on(WebsocketClientEventType.Connect, connection => { + this._connectionIfExists = connection; + if (this._heartbeatIntervalMs >= MINIMUM_HEARTBEAT_INTERVAL_MS) { + this._heartbeatTimerIfExists = setInterval(() => { + connection.ping(''); + }, this._heartbeatIntervalMs); + } else { + callback( + new Error( + `Heartbeat interval is ${ + this._heartbeatIntervalMs + }ms which is less than the required minimum of ${MINIMUM_HEARTBEAT_INTERVAL_MS}ms`, + ), + undefined, + ); + } + callback(undefined, this._connectionIfExists); + }); + this._client.on(WebsocketClientEventType.ConnectFailed, error => { + callback(error, undefined); + }); + this._client.connect(this._apiEndpointUrl); + } + } + private _handleWebSocketMessage( + requestId: number, + subscriptionOpts: OrderbookChannelSubscriptionOpts, + message: WebSocket.IMessage, + handler: OrderbookChannelHandler, + ): void { + if (!_.isUndefined(message.utf8Data)) { + try { + const utf8Data = message.utf8Data; + const parserResult = orderbookChannelMessageParser.parse(utf8Data); + if (parserResult.requestId === requestId) { + switch (parserResult.type) { + case OrderbookChannelMessageTypes.Snapshot: { + handler.onSnapshot(this, subscriptionOpts, parserResult.payload); + break; + } + case OrderbookChannelMessageTypes.Update: { + handler.onUpdate(this, subscriptionOpts, parserResult.payload); + break; + } + default: { + handler.onError( + this, + subscriptionOpts, + new Error(`Message has missing a type parameter: ${utf8Data}`), + ); + } + } + } + } catch (error) { + handler.onError(this, subscriptionOpts, error); + } + } else { + handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`)); + } + } +} diff --git a/packages/connect/src/schemas/node_websocket_orderbook_channel_config_schema.ts b/packages/connect/src/schemas/node_websocket_orderbook_channel_config_schema.ts new file mode 100644 index 000000000..c745d0b82 --- /dev/null +++ b/packages/connect/src/schemas/node_websocket_orderbook_channel_config_schema.ts @@ -0,0 +1,10 @@ +export const nodeWebSocketOrderbookChannelConfigSchema = { + id: '/NodeWebSocketOrderbookChannelConfig', + type: 'object', + properties: { + heartbeatIntervalMs: { + type: 'number', + minimum: 10, + }, + }, +}; diff --git a/packages/connect/src/schemas/schemas.ts b/packages/connect/src/schemas/schemas.ts index b9a8472fb..835fc7b4f 100644 --- a/packages/connect/src/schemas/schemas.ts +++ b/packages/connect/src/schemas/schemas.ts @@ -1,15 +1,15 @@ import { feesRequestSchema } from './fees_request_schema'; +import { nodeWebSocketOrderbookChannelConfigSchema } from './node_websocket_orderbook_channel_config_schema'; import { orderBookRequestSchema } from './orderbook_request_schema'; import { ordersRequestOptsSchema } from './orders_request_opts_schema'; import { pagedRequestOptsSchema } from './paged_request_opts_schema'; import { tokenPairsRequestOptsSchema } from './token_pairs_request_opts_schema'; -import { webSocketOrderbookChannelConfigSchema } from './websocket_orderbook_channel_config_schema'; export const schemas = { feesRequestSchema, + nodeWebSocketOrderbookChannelConfigSchema, orderBookRequestSchema, ordersRequestOptsSchema, pagedRequestOptsSchema, tokenPairsRequestOptsSchema, - webSocketOrderbookChannelConfigSchema, }; diff --git a/packages/connect/src/schemas/websocket_orderbook_channel_config_schema.ts b/packages/connect/src/schemas/websocket_orderbook_channel_config_schema.ts deleted file mode 100644 index 81c0cac9c..000000000 --- a/packages/connect/src/schemas/websocket_orderbook_channel_config_schema.ts +++ /dev/null @@ -1,10 +0,0 @@ -export const webSocketOrderbookChannelConfigSchema = { - id: '/WebSocketOrderbookChannelConfig', - type: 'object', - properties: { - heartbeatIntervalMs: { - type: 'number', - minimum: 10, - }, - }, -}; diff --git a/packages/connect/src/types.ts b/packages/connect/src/types.ts index f5e52f50d..5657942ee 100644 --- a/packages/connect/src/types.ts +++ b/packages/connect/src/types.ts @@ -18,7 +18,7 @@ export interface OrderbookChannel { /** * heartbeatInterval: Interval in milliseconds that the orderbook channel should ping the underlying websocket. Default: 15000 */ -export interface WebSocketOrderbookChannelConfig { +export interface NodeWebSocketOrderbookChannelConfig { heartbeatIntervalMs?: number; } diff --git a/packages/connect/src/utils/assert.ts b/packages/connect/src/utils/assert.ts new file mode 100644 index 000000000..f8241aacb --- /dev/null +++ b/packages/connect/src/utils/assert.ts @@ -0,0 +1,25 @@ +import { assert as sharedAssert } from '@0xproject/assert'; +// We need those two unused imports because they're actually used by sharedAssert which gets injected here +// tslint:disable-next-line:no-unused-variable +import { Schema, schemas } from '@0xproject/json-schemas'; +// tslint:disable-next-line:no-unused-variable +import { ECSignature } from '@0xproject/types'; +import { BigNumber } from '@0xproject/utils'; +import * as _ from 'lodash'; + +export const assert = { + ...sharedAssert, + isOrderbookChannelSubscriptionOpts(variableName: string, subscriptionOpts: any): void { + sharedAssert.doesConformToSchema( + 'subscriptionOpts', + subscriptionOpts, + schemas.relayerApiOrderbookChannelSubscribePayload, + ); + }, + isOrderbookChannelHandler(variableName: string, handler: any): void { + sharedAssert.isFunction(`${variableName}.onSnapshot`, _.get(handler, 'onSnapshot')); + sharedAssert.isFunction(`${variableName}.onUpdate`, _.get(handler, 'onUpdate')); + sharedAssert.isFunction(`${variableName}.onError`, _.get(handler, 'onError')); + sharedAssert.isFunction(`${variableName}.onClose`, _.get(handler, 'onClose')); + }, +}; diff --git a/packages/connect/src/utils/orderbook_channel_message_parser.ts b/packages/connect/src/utils/orderbook_channel_message_parser.ts index 9a9ca8901..593288078 100644 --- a/packages/connect/src/utils/orderbook_channel_message_parser.ts +++ b/packages/connect/src/utils/orderbook_channel_message_parser.ts @@ -8,10 +8,16 @@ import { relayerResponseJsonParsers } from './relayer_response_json_parsers'; export const orderbookChannelMessageParser = { parse(utf8Data: string): OrderbookChannelMessage { + // parse the message const messageObj = JSON.parse(utf8Data); + // ensure we have a type parameter to switch on const type: string = _.get(messageObj, 'type'); assert.assert(!_.isUndefined(type), `Message is missing a type parameter: ${utf8Data}`); assert.isString('type', type); + // ensure we have a request id for the resulting message + const requestId: number = _.get(messageObj, 'requestId'); + assert.assert(!_.isUndefined(requestId), `Message is missing a requestId parameter: ${utf8Data}`); + assert.isNumber('requestId', requestId); switch (type) { case OrderbookChannelMessageTypes.Snapshot: { assert.doesConformToSchema('message', messageObj, schemas.relayerApiOrderbookChannelSnapshotSchema); @@ -28,7 +34,7 @@ export const orderbookChannelMessageParser = { default: { return { type: OrderbookChannelMessageTypes.Unknown, - requestId: 0, + requestId, payload: undefined, }; } diff --git a/packages/connect/src/ws_orderbook_channel.ts b/packages/connect/src/ws_orderbook_channel.ts deleted file mode 100644 index bdcc8a75d..000000000 --- a/packages/connect/src/ws_orderbook_channel.ts +++ /dev/null @@ -1,166 +0,0 @@ -import { assert } from '@0xproject/assert'; -import { schemas } from '@0xproject/json-schemas'; -import * as _ from 'lodash'; -import * as WebSocket from 'websocket'; - -import { schemas as clientSchemas } from './schemas/schemas'; -import { - OrderbookChannel, - OrderbookChannelHandler, - OrderbookChannelMessageTypes, - OrderbookChannelSubscriptionOpts, - WebsocketClientEventType, - WebsocketConnectionEventType, - WebSocketOrderbookChannelConfig, -} from './types'; -import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser'; - -const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000; -const MINIMUM_HEARTBEAT_INTERVAL_MS = 10; - -/** - * This class includes all the functionality related to interacting with a websocket endpoint - * that implements the standard relayer API v0 - */ -export class WebSocketOrderbookChannel implements OrderbookChannel { - private _apiEndpointUrl: string; - private _client: WebSocket.client; - private _connectionIfExists?: WebSocket.connection; - private _heartbeatTimerIfExists?: NodeJS.Timer; - private _subscriptionCounter = 0; - private _heartbeatIntervalMs: number; - /** - * Instantiates a new WebSocketOrderbookChannel instance - * @param url The relayer API base WS url you would like to interact with - * @param config The configuration object. Look up the type for the description. - * @return An instance of WebSocketOrderbookChannel - */ - constructor(url: string, config?: WebSocketOrderbookChannelConfig) { - assert.isUri('url', url); - if (!_.isUndefined(config)) { - assert.doesConformToSchema('config', config, clientSchemas.webSocketOrderbookChannelConfigSchema); - } - this._apiEndpointUrl = url; - this._heartbeatIntervalMs = - _.isUndefined(config) || _.isUndefined(config.heartbeatIntervalMs) - ? DEFAULT_HEARTBEAT_INTERVAL_MS - : config.heartbeatIntervalMs; - this._client = new WebSocket.client(); - } - /** - * Subscribe to orderbook snapshots and updates from the websocket - * @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which - * token pair to subscribe to - * @param handler An OrderbookChannelHandler instance that responds to various - * channel updates - */ - public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void { - assert.doesConformToSchema( - 'subscriptionOpts', - subscriptionOpts, - schemas.relayerApiOrderbookChannelSubscribePayload, - ); - assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot')); - assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate')); - assert.isFunction('handler.onError', _.get(handler, 'onError')); - assert.isFunction('handler.onClose', _.get(handler, 'onClose')); - this._subscriptionCounter += 1; - const subscribeMessage = { - type: 'subscribe', - channel: 'orderbook', - requestId: this._subscriptionCounter, - payload: subscriptionOpts, - }; - this._getConnection((error, connection) => { - if (!_.isUndefined(error)) { - handler.onError(this, subscriptionOpts, error); - } else if (!_.isUndefined(connection) && connection.connected) { - connection.on(WebsocketConnectionEventType.Error, wsError => { - handler.onError(this, subscriptionOpts, wsError); - }); - connection.on(WebsocketConnectionEventType.Close, (_code: number, _desc: string) => { - handler.onClose(this, subscriptionOpts); - }); - connection.on(WebsocketConnectionEventType.Message, message => { - this._handleWebSocketMessage(subscribeMessage.requestId, subscriptionOpts, message, handler); - }); - connection.sendUTF(JSON.stringify(subscribeMessage)); - } - }); - } - /** - * Close the websocket and stop receiving updates - */ - public close(): void { - if (!_.isUndefined(this._connectionIfExists)) { - this._connectionIfExists.close(); - } - if (!_.isUndefined(this._heartbeatTimerIfExists)) { - clearInterval(this._heartbeatTimerIfExists); - } - } - private _getConnection(callback: (error?: Error, connection?: WebSocket.connection) => void): void { - if (!_.isUndefined(this._connectionIfExists) && this._connectionIfExists.connected) { - callback(undefined, this._connectionIfExists); - } else { - this._client.on(WebsocketClientEventType.Connect, connection => { - this._connectionIfExists = connection; - if (this._heartbeatIntervalMs >= MINIMUM_HEARTBEAT_INTERVAL_MS) { - this._heartbeatTimerIfExists = setInterval(() => { - connection.ping(''); - }, this._heartbeatIntervalMs); - } else { - callback( - new Error( - `Heartbeat interval is ${ - this._heartbeatIntervalMs - }ms which is less than the required minimum of ${MINIMUM_HEARTBEAT_INTERVAL_MS}ms`, - ), - undefined, - ); - } - callback(undefined, this._connectionIfExists); - }); - this._client.on(WebsocketClientEventType.ConnectFailed, error => { - callback(error, undefined); - }); - this._client.connect(this._apiEndpointUrl); - } - } - private _handleWebSocketMessage( - requestId: number, - subscriptionOpts: OrderbookChannelSubscriptionOpts, - message: WebSocket.IMessage, - handler: OrderbookChannelHandler, - ): void { - if (!_.isUndefined(message.utf8Data)) { - try { - const utf8Data = message.utf8Data; - const parserResult = orderbookChannelMessageParser.parse(utf8Data); - if (parserResult.requestId === requestId) { - switch (parserResult.type) { - case OrderbookChannelMessageTypes.Snapshot: { - handler.onSnapshot(this, subscriptionOpts, parserResult.payload); - break; - } - case OrderbookChannelMessageTypes.Update: { - handler.onUpdate(this, subscriptionOpts, parserResult.payload); - break; - } - default: { - handler.onError( - this, - subscriptionOpts, - new Error(`Message has missing a type parameter: ${utf8Data}`), - ); - } - } - } - } catch (error) { - handler.onError(this, subscriptionOpts, error); - } - } else { - handler.onError(this, subscriptionOpts, new Error(`Message does not contain utf8Data`)); - } - } -} -- cgit