1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
// tslint:disable:no-console
import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare';
import { OHLCVExternal } from '../entities';
import * as ormConfig from '../ormconfig';
import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare';
import { handleError } from '../utils';
import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs';
const SOURCE_NAME = 'CryptoCompare';
const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01';
const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
let connection: Connection;
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
const repository = connection.getRepository(OHLCVExternal);
const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS);
const jobTime = new Date().getTime();
const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME);
console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`);
const fetchAndSavePromises = tradingPairs.map(async pair => {
const pairs = source.generateBackfillIntervals(pair);
return fetchAndSaveAsync(source, repository, jobTime, pairs);
});
await Promise.all(fetchAndSavePromises);
console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`);
process.exit(0);
})().catch(handleError);
async function fetchAndSaveAsync(
source: CryptoCompareOHLCVSource,
repository: Repository<OHLCVExternal>,
jobTime: number,
pairs: TradingPair[],
): Promise<void> {
const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => {
if (a.latestSavedTime < b.latestSavedTime) {
return -1;
} else if (a.latestSavedTime > b.latestSavedTime) {
return 1;
} else {
return 0;
}
};
pairs.sort(sortAscTimestamp);
let i = 0;
while (i < pairs.length) {
const pair = pairs[i];
if (pair.latestSavedTime > TWO_HOURS_AGO) {
break;
}
try {
const records = await source.getHourlyOHLCVAsync(pair);
console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`);
if (records.length > 0) {
const metadata: OHLCVMetadata = {
exchange: source.default_exchange,
fromSymbol: pair.fromSymbol,
toSymbol: pair.toSymbol,
source: SOURCE_NAME,
observedTimestamp: jobTime,
interval: source.intervalBetweenRecords,
};
const parsedRecords = parseRecords(records, metadata);
await saveRecordsAsync(repository, parsedRecords);
}
i++;
} catch (err) {
console.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
break;
}
}
return Promise.resolve();
}
async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> {
const metadata = [
records[0].fromSymbol,
records[0].toSymbol,
new Date(records[0].startTime),
new Date(records[records.length - 1].endTime),
];
console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`);
await repository.save(records);
}
|