From ac971685b32328fb0e99715c0dde2a10fdf3158a Mon Sep 17 00:00:00 2001 From: xianny Date: Thu, 6 Dec 2018 12:07:55 -0800 Subject: upgrade throttling code --- packages/pipeline/package.json | 2 +- .../data_sources/ohlcv_external/crypto_compare.ts | 24 ++++++++++------------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json index 0539618d4..4fde906b8 100644 --- a/packages/pipeline/package.json +++ b/packages/pipeline/package.json @@ -52,9 +52,9 @@ "@types/p-limit": "^2.0.0", "async-parallel": "^1.2.3", "axios": "^0.18.0", + "bottleneck": "^2.13.2", "dockerode": "^2.5.7", "ethereum-types": "^1.0.6", - "p-limit": "^2.0.0", "pg": "^7.5.0", "prettier": "^1.15.3", "ramda": "^0.25.0", diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts index 8804c34d0..e83e3b67d 100644 --- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -1,6 +1,6 @@ // tslint:disable:no-duplicate-imports import { fetchAsync } from '@0x/utils'; -import promiseLimit = require('p-limit'); +import Bottleneck from 'bottleneck'; import { stringify } from 'querystring'; import * as R from 'ramda'; @@ -35,6 +35,7 @@ export interface CryptoCompareOHLCVParams { const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_MINUTE = 60 * 1000; // tslint:disable-line:custom-no-magic-numbers const ONE_SECOND = 1000; const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR; const HTTP_OK_STATUS = 200; @@ -47,9 +48,14 @@ export class CryptoCompareOHLCVSource { private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; // rate-limit for all API calls through this class instance - private readonly _promiseLimit: (fetchFn: () => Promise) => Promise; - constructor(maxConcurrentRequests: number = 50) { - this._promiseLimit = promiseLimit(maxConcurrentRequests); + private readonly _limiter: Bottleneck; + constructor(maxReqsPerSecond: number = 40) { + this._limiter = new Bottleneck({ + minTime: Math.ceil(ONE_SECOND / maxReqsPerSecond), + reservoir: 2000, + reservoirRefreshAmount: 2000, + reservoirRefreshInterval: ONE_MINUTE, + }); } // gets OHLCV records starting from pair.latest @@ -61,15 +67,7 @@ export class CryptoCompareOHLCVSource { toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms }; const url = this._url + stringify(params); - - // go through the instance-wide rate-limit - const fetchPromise: Promise = this._promiseLimit(() => { - // tslint:disable-next-line:no-console - console.log(`Scraping Crypto Compare at ${url}`); - return fetchAsync(url); - }); - - const response = await Promise.resolve(fetchPromise); + const response = await this._limiter.schedule(() => fetchAsync(url)); if (response.status !== HTTP_OK_STATUS) { throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`); } -- cgit