aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline
diff options
context:
space:
mode:
authorxianny <xianny@gmail.com>2018-12-07 04:07:55 +0800
committerxianny <xianny@gmail.com>2018-12-08 01:43:55 +0800
commitac971685b32328fb0e99715c0dde2a10fdf3158a (patch)
treea81a5e89975dda8e7d04f81599d6b836dc614fa3 /packages/pipeline
parentaa4fcebdc71fa58b12dc12ab03d7697444416c09 (diff)
downloaddexon-sol-tools-ac971685b32328fb0e99715c0dde2a10fdf3158a.tar.gz
dexon-sol-tools-ac971685b32328fb0e99715c0dde2a10fdf3158a.tar.zst
dexon-sol-tools-ac971685b32328fb0e99715c0dde2a10fdf3158a.zip
upgrade throttling code
Diffstat (limited to 'packages/pipeline')
-rw-r--r--packages/pipeline/package.json2
-rw-r--r--packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts24
2 files changed, 12 insertions, 14 deletions
diff --git a/packages/pipeline/package.json b/packages/pipeline/package.json
index 0539618d4..4fde906b8 100644
--- a/packages/pipeline/package.json
+++ b/packages/pipeline/package.json
@@ -52,9 +52,9 @@
"@types/p-limit": "^2.0.0",
"async-parallel": "^1.2.3",
"axios": "^0.18.0",
+ "bottleneck": "^2.13.2",
"dockerode": "^2.5.7",
"ethereum-types": "^1.0.6",
- "p-limit": "^2.0.0",
"pg": "^7.5.0",
"prettier": "^1.15.3",
"ramda": "^0.25.0",
diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
index 8804c34d0..e83e3b67d 100644
--- a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
+++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts
@@ -1,6 +1,6 @@
// tslint:disable:no-duplicate-imports
import { fetchAsync } from '@0x/utils';
-import promiseLimit = require('p-limit');
+import Bottleneck from 'bottleneck';
import { stringify } from 'querystring';
import * as R from 'ramda';
@@ -35,6 +35,7 @@ export interface CryptoCompareOHLCVParams {
const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
+const ONE_MINUTE = 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
const ONE_SECOND = 1000;
const ONE_HOUR_AGO = new Date().getTime() - ONE_HOUR;
const HTTP_OK_STATUS = 200;
@@ -47,9 +48,14 @@ export class CryptoCompareOHLCVSource {
private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?';
// rate-limit for all API calls through this class instance
- private readonly _promiseLimit: (fetchFn: () => Promise<Response>) => Promise<Response>;
- constructor(maxConcurrentRequests: number = 50) {
- this._promiseLimit = promiseLimit(maxConcurrentRequests);
+ private readonly _limiter: Bottleneck;
+ constructor(maxReqsPerSecond: number = 40) {
+ this._limiter = new Bottleneck({
+ minTime: Math.ceil(ONE_SECOND / maxReqsPerSecond),
+ reservoir: 2000,
+ reservoirRefreshAmount: 2000,
+ reservoirRefreshInterval: ONE_MINUTE,
+ });
}
// gets OHLCV records starting from pair.latest
@@ -61,15 +67,7 @@ export class CryptoCompareOHLCVSource {
toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms
};
const url = this._url + stringify(params);
-
- // go through the instance-wide rate-limit
- const fetchPromise: Promise<Response> = this._promiseLimit(() => {
- // tslint:disable-next-line:no-console
- console.log(`Scraping Crypto Compare at ${url}`);
- return fetchAsync(url);
- });
-
- const response = await Promise.resolve(fetchPromise);
+ const response = await this._limiter.schedule(() => fetchAsync(url));
if (response.status !== HTTP_OK_STATUS) {
throw new Error(`HTTP error while scraping Crypto Compare: [${response}]`);
}