diff options
Diffstat (limited to 'packages/pipeline/src')
47 files changed, 2436 insertions, 0 deletions
diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts new file mode 100644 index 000000000..31cd5bfd6 --- /dev/null +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -0,0 +1,133 @@ +import axios from 'axios'; +import * as R from 'ramda'; + +// URL to use for getting dex trades from Bloxy. +export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades'; +// Number of trades to get at once. Must be less than or equal to MAX_OFFSET. +const TRADES_PER_QUERY = 10000; +// Maximum offset supported by the Bloxy API. +const MAX_OFFSET = 100000; +// Buffer to subtract from offset. This means we will request some trades twice +// but we have less chance on missing out on any data. +const OFFSET_BUFFER = 1000; +// Maximum number of days supported by the Bloxy API. +const MAX_DAYS = 30; +// Buffer used for comparing the last seen timestamp to the last returned +// timestamp. Increasing this reduces chances of data loss but also creates more +// redundancy and can impact performance. +// tslint:disable-next-line:custom-no-magic-numbers +const LAST_SEEN_TIMESTAMP_BUFFER_MS = 1000 * 60 * 30; // 30 minutes + +// tslint:disable-next-line:custom-no-magic-numbers +const millisecondsPerDay = 1000 * 60 * 60 * 24; // ms/d = ms/s * s/m * m/h * h/d + +export interface BloxyTrade { + tx_hash: string; + tx_time: string; + tx_date: string; + tx_sender: string; + smart_contract_id: number; + smart_contract_address: string; + contract_type: string; + maker: string; + taker: string; + amountBuy: number; + makerFee: number; + buyCurrencyId: number; + buySymbol: string; + amountSell: number; + takerFee: number; + sellCurrencyId: number; + sellSymbol: string; + maker_annotation: string; + taker_annotation: string; + protocol: string; + buyAddress: string | null; + sellAddress: string | null; +} + +interface BloxyError { + error: string; +} + +type BloxyResponse<T> = T | BloxyError; +type BloxyTradeResponse = BloxyResponse<BloxyTrade[]>; + +function isError<T>(response: BloxyResponse<T>): response is BloxyError { + return (response as BloxyError).error !== undefined; +} + +export class BloxySource { + private readonly _apiKey: string; + + constructor(apiKey: string) { + this._apiKey = apiKey; + } + + /** + * Gets all latest trades between the lastSeenTimestamp (minus some buffer) + * and the current time. Note that because the Bloxy API has some hard + * limits it might not always be possible to get *all* the trades in the + * desired time range. + * @param lastSeenTimestamp The latest timestamp for trades that have + * already been seen. + */ + public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> { + let allTrades: BloxyTrade[] = []; + + // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive) + const numberOfDays = R.clamp(1, MAX_DAYS, getDaysSinceTimestamp(lastSeenTimestamp)); + + // Keep getting trades until we hit one of the following conditions: + // + // 1. Offset hits MAX_OFFSET (we can't go back any further). + // 2. There are no more trades in the response. + // 3. We see a tx_time equal to or earlier than lastSeenTimestamp (plus + // some buffer). + // + for (let offset = 0; offset <= MAX_OFFSET; offset += TRADES_PER_QUERY - OFFSET_BUFFER) { + const trades = await this._getTradesWithOffsetAsync(numberOfDays, offset); + if (trades.length === 0) { + // There are no more trades left for the days we are querying. + // This means we are done. + return filterDuplicateTrades(allTrades); + } + const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades)); + allTrades = allTrades.concat(sortedTrades); + + // Check if lastReturnedTimestamp < lastSeenTimestamp + const lastReturnedTimestamp = new Date(sortedTrades[0].tx_time).getTime(); + if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) { + // We are at the point where we have already seen trades for the + // timestamp range that is being returned. We're done. + return filterDuplicateTrades(allTrades); + } + } + return filterDuplicateTrades(allTrades); + } + + private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> { + const resp = await axios.get<BloxyTradeResponse>(BLOXY_DEX_TRADES_URL, { + params: { + key: this._apiKey, + days: numberOfDays, + limit: TRADES_PER_QUERY, + offset, + }, + }); + if (isError(resp.data)) { + throw new Error('Error in Bloxy API response: ' + resp.data.error); + } + return resp.data; + } +} + +// Computes the number of days between the given timestamp and the current +// timestamp (rounded up). +function getDaysSinceTimestamp(timestamp: number): number { + const msSinceTimestamp = Date.now() - timestamp; + const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay; + return Math.ceil(daysSinceTimestamp); +} + +const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash); diff --git a/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts new file mode 100644 index 000000000..1717eb8b3 --- /dev/null +++ b/packages/pipeline/src/data_sources/contract-wrappers/exchange_events.ts @@ -0,0 +1,85 @@ +import { + ContractWrappers, + ExchangeCancelEventArgs, + ExchangeCancelUpToEventArgs, + ExchangeEventArgs, + ExchangeEvents, + ExchangeFillEventArgs, + ExchangeWrapper, +} from '@0x/contract-wrappers'; +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import { LogWithDecodedArgs } from 'ethereum-types'; + +import { EXCHANGE_START_BLOCK } from '../../utils'; + +const BLOCK_FINALITY_THRESHOLD = 10; // When to consider blocks as final. Used to compute default toBlock. +const NUM_BLOCKS_PER_QUERY = 20000; // Number of blocks to query for events at a time. + +export class ExchangeEventsSource { + private readonly _exchangeWrapper: ExchangeWrapper; + private readonly _web3Wrapper: Web3Wrapper; + constructor(provider: Web3ProviderEngine, networkId: number) { + this._web3Wrapper = new Web3Wrapper(provider); + const contractWrappers = new ContractWrappers(provider, { networkId }); + this._exchangeWrapper = contractWrappers.exchange; + } + + public async getFillEventsAsync( + fromBlock?: number, + toBlock?: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeFillEventArgs>>> { + return this._getEventsAsync<ExchangeFillEventArgs>(ExchangeEvents.Fill, fromBlock, toBlock); + } + + public async getCancelEventsAsync( + fromBlock?: number, + toBlock?: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>> { + return this._getEventsAsync<ExchangeCancelEventArgs>(ExchangeEvents.Cancel, fromBlock, toBlock); + } + + public async getCancelUpToEventsAsync( + fromBlock?: number, + toBlock?: number, + ): Promise<Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>> { + return this._getEventsAsync<ExchangeCancelUpToEventArgs>(ExchangeEvents.CancelUpTo, fromBlock, toBlock); + } + + private async _getEventsAsync<ArgsType extends ExchangeEventArgs>( + eventName: ExchangeEvents, + fromBlock: number = EXCHANGE_START_BLOCK, + toBlock?: number, + ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { + const calculatedToBlock = + toBlock === undefined + ? (await this._web3Wrapper.getBlockNumberAsync()) - BLOCK_FINALITY_THRESHOLD + : toBlock; + let events: Array<LogWithDecodedArgs<ArgsType>> = []; + for (let currFromBlock = fromBlock; currFromBlock <= calculatedToBlock; currFromBlock += NUM_BLOCKS_PER_QUERY) { + events = events.concat( + await this._getEventsForRangeAsync<ArgsType>( + eventName, + currFromBlock, + Math.min(currFromBlock + NUM_BLOCKS_PER_QUERY - 1, calculatedToBlock), + ), + ); + } + return events; + } + + private async _getEventsForRangeAsync<ArgsType extends ExchangeEventArgs>( + eventName: ExchangeEvents, + fromBlock: number, + toBlock: number, + ): Promise<Array<LogWithDecodedArgs<ArgsType>>> { + return this._exchangeWrapper.getLogsAsync<ArgsType>( + eventName, + { + fromBlock, + toBlock, + }, + {}, + ); + } +} diff --git a/packages/pipeline/src/data_sources/ddex/index.ts b/packages/pipeline/src/data_sources/ddex/index.ts new file mode 100644 index 000000000..2bbd8c29b --- /dev/null +++ b/packages/pipeline/src/data_sources/ddex/index.ts @@ -0,0 +1,78 @@ +import { fetchAsync, logUtils } from '@0x/utils'; + +const DDEX_BASE_URL = 'https://api.ddex.io/v2'; +const ACTIVE_MARKETS_URL = `${DDEX_BASE_URL}/markets`; +const NO_AGGREGATION_LEVEL = 3; // See https://docs.ddex.io/#get-orderbook +const ORDERBOOK_ENDPOINT = `/orderbook?level=${NO_AGGREGATION_LEVEL}`; +export const DDEX_SOURCE = 'ddex'; + +export interface DdexActiveMarketsResponse { + status: number; + desc: string; + data: { + markets: DdexMarket[]; + }; +} + +export interface DdexMarket { + id: string; + quoteToken: string; + quoteTokenDecimals: number; + quoteTokenAddress: string; + baseToken: string; + baseTokenDecimals: number; + baseTokenAddress: string; + minOrderSize: string; + maxOrderSize: string; + pricePrecision: number; + priceDecimals: number; + amountDecimals: number; +} + +export interface DdexOrderbookResponse { + status: number; + desc: string; + data: { + orderBook: DdexOrderbook; + }; +} + +export interface DdexOrderbook { + marketId: string; + bids: DdexOrder[]; + asks: DdexOrder[]; +} + +export interface DdexOrder { + price: string; + amount: string; + orderId: string; +} + +// tslint:disable:prefer-function-over-method +// ^ Keep consistency with other sources and help logical organization +export class DdexSource { + /** + * Call Ddex API to find out which markets they are maintaining orderbooks for. + */ + public async getActiveMarketsAsync(): Promise<DdexMarket[]> { + logUtils.log('Getting all active DDEX markets'); + const resp = await fetchAsync(ACTIVE_MARKETS_URL); + const respJson: DdexActiveMarketsResponse = await resp.json(); + const markets = respJson.data.markets; + logUtils.log(`Got ${markets.length} markets.`); + return markets; + } + + /** + * Retrieve orderbook from Ddex API for a given market. + * @param marketId String identifying the market we want data for. Eg. 'REP/AUG' + */ + public async getMarketOrderbookAsync(marketId: string): Promise<DdexOrderbook> { + logUtils.log(`${marketId}: Retrieving orderbook.`); + const marketOrderbookUrl = `${ACTIVE_MARKETS_URL}/${marketId}${ORDERBOOK_ENDPOINT}`; + const resp = await fetchAsync(marketOrderbookUrl); + const respJson: DdexOrderbookResponse = await resp.json(); + return respJson.data.orderBook; + } +} diff --git a/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts new file mode 100644 index 000000000..6b10c29c5 --- /dev/null +++ b/packages/pipeline/src/data_sources/ohlcv_external/crypto_compare.ts @@ -0,0 +1,94 @@ +// tslint:disable:no-duplicate-imports +import { fetchAsync } from '@0x/utils'; +import promiseLimit = require('p-limit'); +import { stringify } from 'querystring'; +import * as R from 'ramda'; + +import { TradingPair } from '../../utils/get_ohlcv_trading_pairs'; + +export interface CryptoCompareOHLCVResponse { + Data: Map<string, CryptoCompareOHLCVRecord[]>; + Response: string; + Message: string; +} + +export interface CryptoCompareOHLCVRecord { + time: number; // in seconds, not milliseconds + close: number; + high: number; + low: number; + open: number; + volumefrom: number; + volumeto: number; +} + +export interface CryptoCompareOHLCVParams { + fsym: string; + tsym: string; + e?: string; + aggregate?: string; + aggregatePredictableTimePeriods?: boolean; + limit?: number; + toTs?: number; +} + +const ONE_WEEK = 7 * 24 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_HOUR = 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_SECOND = 1000; +const HTTP_OK_STATUS = 200; + +export class CryptoCompareOHLCVSource { + public readonly interval = ONE_WEEK; // the hourly API returns data for one week at a time + public readonly default_exchange = 'CCCAGG'; + public readonly intervalBetweenRecords = ONE_HOUR; + private readonly _url: string = 'https://min-api.cryptocompare.com/data/histohour?'; + + // rate-limit for all API calls through this class instance + private readonly _promiseLimit: (fetchFn: () => Promise<Response>) => Promise<Response>; + constructor(maxConcurrentRequests: number = 50) { + this._promiseLimit = promiseLimit(maxConcurrentRequests); + } + + // gets OHLCV records starting from pair.latest + public async getHourlyOHLCVAsync(pair: TradingPair): Promise<CryptoCompareOHLCVRecord[]> { + const params = { + e: this.default_exchange, + fsym: pair.fromSymbol, + tsym: pair.toSymbol, + toTs: Math.floor((pair.latestSavedTime + this.interval) / ONE_SECOND), // CryptoCompare uses timestamp in seconds. not ms + }; + const url = this._url + stringify(params); + + // go through the instance-wide rate-limit + const fetchPromise: Promise<Response> = this._promiseLimit(() => { + // tslint:disable-next-line:no-console + console.log(`Scraping Crypto Compare at ${url}`); + return fetchAsync(url); + }); + + const response = await Promise.resolve(fetchPromise); + if (response.status !== HTTP_OK_STATUS) { + // tslint:disable-next-line:no-console + console.log(`Error scraping ${url}`); + return []; + } + const json: CryptoCompareOHLCVResponse = await response.json(); + if (json.Response === 'Error' || Object.keys(json.Data).length === 0) { + // tslint:disable-next-line:no-console + console.log(`Error scraping ${url}: ${json.Message}`); + return []; + } + return Object.values(json.Data).filter(rec => rec.time * ONE_SECOND >= pair.latestSavedTime); + } + public generateBackfillIntervals(pair: TradingPair): TradingPair[] { + const now = new Date().getTime(); + const f = (p: TradingPair): false | [TradingPair, TradingPair] => { + if (p.latestSavedTime > now) { + return false; + } else { + return [p, R.merge(p, { latestSavedTime: p.latestSavedTime + this.interval })]; + } + }; + return R.unfold(f, pair); + } +} diff --git a/packages/pipeline/src/data_sources/paradex/index.ts b/packages/pipeline/src/data_sources/paradex/index.ts new file mode 100644 index 000000000..69a03d553 --- /dev/null +++ b/packages/pipeline/src/data_sources/paradex/index.ts @@ -0,0 +1,92 @@ +import { fetchAsync, logUtils } from '@0x/utils'; + +const PARADEX_BASE_URL = 'https://api.paradex.io/consumer/v0'; +const ACTIVE_MARKETS_URL = PARADEX_BASE_URL + '/markets'; +const ORDERBOOK_ENDPOINT = PARADEX_BASE_URL + '/orderbook'; +const TOKEN_INFO_ENDPOINT = PARADEX_BASE_URL + '/tokens'; +export const PARADEX_SOURCE = 'paradex'; + +export type ParadexActiveMarketsResponse = ParadexMarket[]; + +export interface ParadexMarket { + id: string; + symbol: string; + baseToken: string; + quoteToken: string; + minOrderSize: string; + maxOrderSize: string; + priceMaxDecimals: number; + amountMaxDecimals: number; + // These are not native to the Paradex API response. We tag them on later + // by calling the token endpoint and joining on symbol. + baseTokenAddress?: string; + quoteTokenAddress?: string; +} + +export interface ParadexOrderbookResponse { + marketId: number; + marketSymbol: string; + bids: ParadexOrder[]; + asks: ParadexOrder[]; +} + +export interface ParadexOrder { + amount: string; + price: string; +} + +export type ParadexTokenInfoResponse = ParadexTokenInfo[]; + +export interface ParadexTokenInfo { + name: string; + symbol: string; + address: string; +} + +export class ParadexSource { + private readonly _apiKey: string; + + constructor(apiKey: string) { + this._apiKey = apiKey; + } + + /** + * Call Paradex API to find out which markets they are maintaining orderbooks for. + */ + public async getActiveMarketsAsync(): Promise<ParadexActiveMarketsResponse> { + logUtils.log('Getting all active Paradex markets.'); + const resp = await fetchAsync(ACTIVE_MARKETS_URL, { + headers: { 'API-KEY': this._apiKey }, + }); + const markets: ParadexActiveMarketsResponse = await resp.json(); + logUtils.log(`Got ${markets.length} markets.`); + return markets; + } + + /** + * Call Paradex API to find out their token information. + */ + public async getTokenInfoAsync(): Promise<ParadexTokenInfoResponse> { + logUtils.log('Getting token information from Paradex.'); + const resp = await fetchAsync(TOKEN_INFO_ENDPOINT, { + headers: { 'API-KEY': this._apiKey }, + }); + const tokens: ParadexTokenInfoResponse = await resp.json(); + logUtils.log(`Got information for ${tokens.length} tokens.`); + return tokens; + } + + /** + * Retrieve orderbook from Paradex API for a given market. + * @param marketSymbol String representing the market we want data for. + */ + public async getMarketOrderbookAsync(marketSymbol: string): Promise<ParadexOrderbookResponse> { + logUtils.log(`${marketSymbol}: Retrieving orderbook.`); + const marketOrderbookUrl = `${ORDERBOOK_ENDPOINT}?market=${marketSymbol}`; + const resp = await fetchAsync(marketOrderbookUrl, { + headers: { 'API-KEY': this._apiKey }, + }); + const orderbookResponse: ParadexOrderbookResponse = await resp.json(); + return orderbookResponse; + } +} diff --git a/packages/pipeline/src/data_sources/relayer-registry/index.ts b/packages/pipeline/src/data_sources/relayer-registry/index.ts new file mode 100644 index 000000000..8133f5eae --- /dev/null +++ b/packages/pipeline/src/data_sources/relayer-registry/index.ts @@ -0,0 +1,33 @@ +import axios from 'axios'; + +export interface RelayerResponse { + name: string; + homepage_url: string; + app_url: string; + header_img: string; + logo_img: string; + networks: RelayerResponseNetwork[]; +} + +export interface RelayerResponseNetwork { + networkId: number; + sra_http_endpoint?: string; + sra_ws_endpoint?: string; + static_order_fields?: { + fee_recipient_addresses?: string[]; + taker_addresses?: string[]; + }; +} + +export class RelayerRegistrySource { + private readonly _url: string; + + constructor(url: string) { + this._url = url; + } + + public async getRelayerInfoAsync(): Promise<Map<string, RelayerResponse>> { + const resp = await axios.get<Map<string, RelayerResponse>>(this._url); + return resp.data; + } +} diff --git a/packages/pipeline/src/data_sources/trusted_tokens/index.ts b/packages/pipeline/src/data_sources/trusted_tokens/index.ts new file mode 100644 index 000000000..552739fb9 --- /dev/null +++ b/packages/pipeline/src/data_sources/trusted_tokens/index.ts @@ -0,0 +1,29 @@ +import axios from 'axios'; + +export interface ZeroExTrustedTokenMeta { + address: string; + name: string; + symbol: string; + decimals: number; +} + +export interface MetamaskTrustedTokenMeta { + address: string; + name: string; + erc20: boolean; + symbol: string; + decimals: number; +} + +export class TrustedTokenSource<T> { + private readonly _url: string; + + constructor(url: string) { + this._url = url; + } + + public async getTrustedTokenMetaAsync(): Promise<T> { + const resp = await axios.get<T>(this._url); + return resp.data; + } +} diff --git a/packages/pipeline/src/data_sources/web3/index.ts b/packages/pipeline/src/data_sources/web3/index.ts new file mode 100644 index 000000000..45a9ea161 --- /dev/null +++ b/packages/pipeline/src/data_sources/web3/index.ts @@ -0,0 +1,22 @@ +import { Web3ProviderEngine } from '@0x/subproviders'; +import { Web3Wrapper } from '@0x/web3-wrapper'; +import { BlockWithoutTransactionData, Transaction } from 'ethereum-types'; + +export class Web3Source { + private readonly _web3Wrapper: Web3Wrapper; + constructor(provider: Web3ProviderEngine) { + this._web3Wrapper = new Web3Wrapper(provider); + } + + public async getBlockInfoAsync(blockNumber: number): Promise<BlockWithoutTransactionData> { + const block = await this._web3Wrapper.getBlockIfExistsAsync(blockNumber); + if (block == null) { + return Promise.reject(new Error(`Could not find block for given block number: ${blockNumber}`)); + } + return block; + } + + public async getTransactionInfoAsync(txHash: string): Promise<Transaction> { + return this._web3Wrapper.getTransactionByHashAsync(txHash); + } +} diff --git a/packages/pipeline/src/entities/block.ts b/packages/pipeline/src/entities/block.ts new file mode 100644 index 000000000..398946622 --- /dev/null +++ b/packages/pipeline/src/entities/block.ts @@ -0,0 +1,13 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'blocks', schema: 'raw' }) +export class Block { + @PrimaryColumn() public hash!: string; + @PrimaryColumn({ transformer: numberToBigIntTransformer }) + public number!: number; + + @Column({ name: 'timestamp', transformer: numberToBigIntTransformer }) + public timestamp!: number; +} diff --git a/packages/pipeline/src/entities/dex_trade.ts b/packages/pipeline/src/entities/dex_trade.ts new file mode 100644 index 000000000..9d288cb51 --- /dev/null +++ b/packages/pipeline/src/entities/dex_trade.ts @@ -0,0 +1,54 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'dex_trades', schema: 'raw' }) +export class DexTrade { + @PrimaryColumn({ name: 'source_url' }) + public sourceUrl!: string; + @PrimaryColumn({ name: 'tx_hash' }) + public txHash!: string; + + @Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer }) + public txTimestamp!: number; + @Column({ name: 'tx_date' }) + public txDate!: string; + @Column({ name: 'tx_sender' }) + public txSender!: string; + @Column({ name: 'smart_contract_id', type: 'bigint', transformer: numberToBigIntTransformer }) + public smartContractId!: number; + @Column({ name: 'smart_contract_address' }) + public smartContractAddress!: string; + @Column({ name: 'contract_type' }) + public contractType!: string; + @Column({ type: 'varchar' }) + public maker!: string; + @Column({ type: 'varchar' }) + public taker!: string; + @Column({ name: 'amount_buy', type: 'numeric', transformer: bigNumberTransformer }) + public amountBuy!: BigNumber; + @Column({ name: 'maker_fee_amount', type: 'numeric', transformer: bigNumberTransformer }) + public makerFeeAmount!: BigNumber; + @Column({ name: 'buy_currency_id', type: 'bigint', transformer: numberToBigIntTransformer }) + public buyCurrencyId!: number; + @Column({ name: 'buy_symbol' }) + public buySymbol!: string; + @Column({ name: 'amount_sell', type: 'numeric', transformer: bigNumberTransformer }) + public amountSell!: BigNumber; + @Column({ name: 'taker_fee_amount', type: 'numeric', transformer: bigNumberTransformer }) + public takerFeeAmount!: BigNumber; + @Column({ name: 'sell_currency_id', type: 'bigint', transformer: numberToBigIntTransformer }) + public sellCurrencyId!: number; + @Column({ name: 'sell_symbol' }) + public sellSymbol!: string; + @Column({ name: 'maker_annotation' }) + public makerAnnotation!: string; + @Column({ name: 'taker_annotation' }) + public takerAnnotation!: string; + @Column() public protocol!: string; + @Column({ name: 'buy_address', type: 'varchar', nullable: true }) + public buyAddress!: string | null; + @Column({ name: 'sell_address', type: 'varchar', nullable: true }) + public sellAddress!: string | null; +} diff --git a/packages/pipeline/src/entities/exchange_cancel_event.ts b/packages/pipeline/src/entities/exchange_cancel_event.ts new file mode 100644 index 000000000..38f99c903 --- /dev/null +++ b/packages/pipeline/src/entities/exchange_cancel_event.ts @@ -0,0 +1,51 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { AssetType } from '../types'; +import { numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'exchange_cancel_events', schema: 'raw' }) +export class ExchangeCancelEvent { + @PrimaryColumn({ name: 'contract_address' }) + public contractAddress!: string; + @PrimaryColumn({ name: 'log_index' }) + public logIndex!: number; + @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) + public blockNumber!: number; + + @Column({ name: 'raw_data' }) + public rawData!: string; + + @Column({ name: 'transaction_hash' }) + public transactionHash!: string; + @Column({ name: 'maker_address' }) + public makerAddress!: string; + @Column({ nullable: true, type: String, name: 'taker_address' }) + public takerAddress!: string; + @Column({ name: 'fee_recipient_address' }) + public feeRecipientAddress!: string; + @Column({ name: 'sender_address' }) + public senderAddress!: string; + @Column({ name: 'order_hash' }) + public orderHash!: string; + + @Column({ name: 'raw_maker_asset_data' }) + public rawMakerAssetData!: string; + @Column({ name: 'maker_asset_type' }) + public makerAssetType!: AssetType; + @Column({ name: 'maker_asset_proxy_id' }) + public makerAssetProxyId!: string; + @Column({ name: 'maker_token_address' }) + public makerTokenAddress!: string; + @Column({ nullable: true, type: String, name: 'maker_token_id' }) + public makerTokenId!: string | null; + @Column({ name: 'raw_taker_asset_data' }) + public rawTakerAssetData!: string; + @Column({ name: 'taker_asset_type' }) + public takerAssetType!: AssetType; + @Column({ name: 'taker_asset_proxy_id' }) + public takerAssetProxyId!: string; + @Column({ name: 'taker_token_address' }) + public takerTokenAddress!: string; + @Column({ nullable: true, type: String, name: 'taker_token_id' }) + public takerTokenId!: string | null; +} diff --git a/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts b/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts new file mode 100644 index 000000000..27580305e --- /dev/null +++ b/packages/pipeline/src/entities/exchange_cancel_up_to_event.ts @@ -0,0 +1,26 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'exchange_cancel_up_to_events', schema: 'raw' }) +export class ExchangeCancelUpToEvent { + @PrimaryColumn({ name: 'contract_address' }) + public contractAddress!: string; + @PrimaryColumn({ name: 'log_index' }) + public logIndex!: number; + @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) + public blockNumber!: number; + + @Column({ name: 'raw_data' }) + public rawData!: string; + + @Column({ name: 'transaction_hash' }) + public transactionHash!: string; + @Column({ name: 'maker_address' }) + public makerAddress!: string; + @Column({ name: 'sender_address' }) + public senderAddress!: string; + @Column({ name: 'order_epoch', type: 'numeric', transformer: bigNumberTransformer }) + public orderEpoch!: BigNumber; +} diff --git a/packages/pipeline/src/entities/exchange_fill_event.ts b/packages/pipeline/src/entities/exchange_fill_event.ts new file mode 100644 index 000000000..9b7727615 --- /dev/null +++ b/packages/pipeline/src/entities/exchange_fill_event.ts @@ -0,0 +1,60 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { AssetType } from '../types'; +import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'exchange_fill_events', schema: 'raw' }) +export class ExchangeFillEvent { + @PrimaryColumn({ name: 'contract_address' }) + public contractAddress!: string; + @PrimaryColumn({ name: 'log_index' }) + public logIndex!: number; + @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) + public blockNumber!: number; + + @Column({ name: 'raw_data' }) + public rawData!: string; + + @Column({ name: 'transaction_hash' }) + public transactionHash!: string; + @Column({ name: 'maker_address' }) + public makerAddress!: string; + @Column({ name: 'taker_address' }) + public takerAddress!: string; + @Column({ name: 'fee_recipient_address' }) + public feeRecipientAddress!: string; + @Column({ name: 'sender_address' }) + public senderAddress!: string; + @Column({ name: 'maker_asset_filled_amount', type: 'numeric', transformer: bigNumberTransformer }) + public makerAssetFilledAmount!: BigNumber; + @Column({ name: 'taker_asset_filled_amount', type: 'numeric', transformer: bigNumberTransformer }) + public takerAssetFilledAmount!: BigNumber; + @Column({ name: 'maker_fee_paid', type: 'numeric', transformer: bigNumberTransformer }) + public makerFeePaid!: BigNumber; + @Column({ name: 'taker_fee_paid', type: 'numeric', transformer: bigNumberTransformer }) + public takerFeePaid!: BigNumber; + @Column({ name: 'order_hash' }) + public orderHash!: string; + + @Column({ name: 'raw_maker_asset_data' }) + public rawMakerAssetData!: string; + @Column({ name: 'maker_asset_type' }) + public makerAssetType!: AssetType; + @Column({ name: 'maker_asset_proxy_id' }) + public makerAssetProxyId!: string; + @Column({ name: 'maker_token_address' }) + public makerTokenAddress!: string; + @Column({ nullable: true, type: String, name: 'maker_token_id' }) + public makerTokenId!: string | null; + @Column({ name: 'raw_taker_asset_data' }) + public rawTakerAssetData!: string; + @Column({ name: 'taker_asset_type' }) + public takerAssetType!: AssetType; + @Column({ name: 'taker_asset_proxy_id' }) + public takerAssetProxyId!: string; + @Column({ name: 'taker_token_address' }) + public takerTokenAddress!: string; + @Column({ nullable: true, type: String, name: 'taker_token_id' }) + public takerTokenId!: string | null; +} diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts new file mode 100644 index 000000000..db0814e38 --- /dev/null +++ b/packages/pipeline/src/entities/index.ts @@ -0,0 +1,18 @@ +import { ExchangeCancelEvent } from './exchange_cancel_event'; +import { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event'; +import { ExchangeFillEvent } from './exchange_fill_event'; + +export { Block } from './block'; +export { DexTrade } from './dex_trade'; +export { ExchangeCancelEvent } from './exchange_cancel_event'; +export { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event'; +export { ExchangeFillEvent } from './exchange_fill_event'; +export { OHLCVExternal } from './ohlcv_external'; +export { Relayer } from './relayer'; +export { SraOrder } from './sra_order'; +export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp'; +export { TokenMetadata } from './token_metadata'; +export { TokenOrderbookSnapshot } from './token_order'; +export { Transaction } from './transaction'; + +export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; diff --git a/packages/pipeline/src/entities/ohlcv_external.ts b/packages/pipeline/src/entities/ohlcv_external.ts new file mode 100644 index 000000000..4f55dd930 --- /dev/null +++ b/packages/pipeline/src/entities/ohlcv_external.ts @@ -0,0 +1,30 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'ohlcv_external', schema: 'raw' }) +export class OHLCVExternal { + @PrimaryColumn() public exchange!: string; + + @PrimaryColumn({ name: 'from_symbol', type: 'varchar' }) + public fromSymbol!: string; + @PrimaryColumn({ name: 'to_symbol', type: 'varchar' }) + public toSymbol!: string; + @PrimaryColumn({ name: 'start_time', transformer: numberToBigIntTransformer }) + public startTime!: number; + @PrimaryColumn({ name: 'end_time', transformer: numberToBigIntTransformer }) + public endTime!: number; + + @Column() public open!: number; + @Column() public close!: number; + @Column() public low!: number; + @Column() public high!: number; + @Column({ name: 'volume_from' }) + public volumeFrom!: number; + @Column({ name: 'volume_to' }) + public volumeTo!: number; + + @PrimaryColumn() public source!: string; + @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer }) + public observedTimestamp!: number; +} diff --git a/packages/pipeline/src/entities/relayer.ts b/packages/pipeline/src/entities/relayer.ts new file mode 100644 index 000000000..5af8578b4 --- /dev/null +++ b/packages/pipeline/src/entities/relayer.ts @@ -0,0 +1,21 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +@Entity({ name: 'relayers', schema: 'raw' }) +export class Relayer { + @PrimaryColumn() public uuid!: string; + + @Column() public name!: string; + @Column({ name: 'homepage_url', type: 'varchar' }) + public homepageUrl!: string; + @Column({ name: 'sra_http_endpoint', type: 'varchar', nullable: true }) + public sraHttpEndpoint!: string | null; + @Column({ name: 'sra_ws_endpoint', type: 'varchar', nullable: true }) + public sraWsEndpoint!: string | null; + @Column({ name: 'app_url', type: 'varchar', nullable: true }) + public appUrl!: string | null; + + @Column({ name: 'fee_recipient_addresses', type: 'varchar', array: true }) + public feeRecipientAddresses!: string[]; + @Column({ name: 'taker_addresses', type: 'varchar', array: true }) + public takerAddresses!: string[]; +} diff --git a/packages/pipeline/src/entities/sra_order.ts b/packages/pipeline/src/entities/sra_order.ts new file mode 100644 index 000000000..9c730a0bb --- /dev/null +++ b/packages/pipeline/src/entities/sra_order.ts @@ -0,0 +1,63 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { AssetType } from '../types'; +import { bigNumberTransformer } from '../utils'; + +@Entity({ name: 'sra_orders', schema: 'raw' }) +export class SraOrder { + @PrimaryColumn({ name: 'exchange_address' }) + public exchangeAddress!: string; + @PrimaryColumn({ name: 'order_hash_hex' }) + public orderHashHex!: string; + @PrimaryColumn({ name: 'source_url' }) + public sourceUrl!: string; + + @Column({ name: 'maker_address' }) + public makerAddress!: string; + @Column({ name: 'taker_address' }) + public takerAddress!: string; + @Column({ name: 'fee_recipient_address' }) + public feeRecipientAddress!: string; + @Column({ name: 'sender_address' }) + public senderAddress!: string; + @Column({ name: 'maker_asset_amount', type: 'numeric', transformer: bigNumberTransformer }) + public makerAssetAmount!: BigNumber; + @Column({ name: 'taker_asset_amount', type: 'numeric', transformer: bigNumberTransformer }) + public takerAssetAmount!: BigNumber; + @Column({ name: 'maker_fee', type: 'numeric', transformer: bigNumberTransformer }) + public makerFee!: BigNumber; + @Column({ name: 'taker_fee', type: 'numeric', transformer: bigNumberTransformer }) + public takerFee!: BigNumber; + @Column({ name: 'expiration_time_seconds', type: 'numeric', transformer: bigNumberTransformer }) + public expirationTimeSeconds!: BigNumber; + @Column({ name: 'salt', type: 'numeric', transformer: bigNumberTransformer }) + public salt!: BigNumber; + @Column({ name: 'signature' }) + public signature!: string; + + @Column({ name: 'raw_maker_asset_data' }) + public rawMakerAssetData!: string; + @Column({ name: 'maker_asset_type' }) + public makerAssetType!: AssetType; + @Column({ name: 'maker_asset_proxy_id' }) + public makerAssetProxyId!: string; + @Column({ name: 'maker_token_address' }) + public makerTokenAddress!: string; + @Column({ nullable: true, type: String, name: 'maker_token_id' }) + public makerTokenId!: string | null; + @Column({ name: 'raw_taker_asset_data' }) + public rawTakerAssetData!: string; + @Column({ name: 'taker_asset_type' }) + public takerAssetType!: AssetType; + @Column({ name: 'taker_asset_proxy_id' }) + public takerAssetProxyId!: string; + @Column({ name: 'taker_token_address' }) + public takerTokenAddress!: string; + @Column({ nullable: true, type: String, name: 'taker_token_id' }) + public takerTokenId!: string | null; + + // TODO(albrow): Make this optional? + @Column({ name: 'metadata_json' }) + public metadataJson!: string; +} diff --git a/packages/pipeline/src/entities/sra_order_observed_timestamp.ts b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts new file mode 100644 index 000000000..cd2d41397 --- /dev/null +++ b/packages/pipeline/src/entities/sra_order_observed_timestamp.ts @@ -0,0 +1,32 @@ +import { Entity, PrimaryColumn } from 'typeorm'; + +import { numberToBigIntTransformer } from '../utils'; + +import { SraOrder } from './sra_order'; + +@Entity({ name: 'sra_orders_observed_timestamps', schema: 'raw' }) +export class SraOrdersObservedTimeStamp { + @PrimaryColumn({ name: 'exchange_address' }) + public exchangeAddress!: string; + @PrimaryColumn({ name: 'order_hash_hex' }) + public orderHashHex!: string; + @PrimaryColumn({ name: 'source_url' }) + public sourceUrl!: string; + + @PrimaryColumn({ name: 'observed_timestamp', transformer: numberToBigIntTransformer }) + public observedTimestamp!: number; +} + +/** + * Returns a new SraOrdersObservedTimeStamp for the given order based on the + * current time. + * @param order The order to generate a timestamp for. + */ +export function createObservedTimestampForOrder(order: SraOrder): SraOrdersObservedTimeStamp { + const observed = new SraOrdersObservedTimeStamp(); + observed.exchangeAddress = order.exchangeAddress; + observed.orderHashHex = order.orderHashHex; + observed.sourceUrl = order.sourceUrl; + observed.observedTimestamp = Date.now(); + return observed; +} diff --git a/packages/pipeline/src/entities/token_metadata.ts b/packages/pipeline/src/entities/token_metadata.ts new file mode 100644 index 000000000..911b53972 --- /dev/null +++ b/packages/pipeline/src/entities/token_metadata.ts @@ -0,0 +1,22 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { bigNumberTransformer } from '../utils/transformers'; + +@Entity({ name: 'token_metadata', schema: 'raw' }) +export class TokenMetadata { + @PrimaryColumn({ type: 'varchar', nullable: false }) + public address!: string; + + @PrimaryColumn({ type: 'varchar', nullable: false }) + public authority!: string; + + @Column({ type: 'numeric', transformer: bigNumberTransformer, nullable: true }) + public decimals!: BigNumber | null; + + @Column({ type: 'varchar', nullable: true }) + public symbol!: string | null; + + @Column({ type: 'varchar', nullable: true }) + public name!: string | null; +} diff --git a/packages/pipeline/src/entities/token_order.ts b/packages/pipeline/src/entities/token_order.ts new file mode 100644 index 000000000..557705767 --- /dev/null +++ b/packages/pipeline/src/entities/token_order.ts @@ -0,0 +1,29 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { OrderType } from '../types'; +import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'token_orderbook_snapshots', schema: 'raw' }) +export class TokenOrderbookSnapshot { + @PrimaryColumn({ name: 'observed_timestamp', type: 'bigint', transformer: numberToBigIntTransformer }) + public observedTimestamp!: number; + @PrimaryColumn({ name: 'source' }) + public source!: string; + @Column({ name: 'order_type' }) + public orderType!: OrderType; + @PrimaryColumn({ name: 'price', type: 'numeric', transformer: bigNumberTransformer }) + public price!: BigNumber; + @PrimaryColumn({ name: 'base_asset_symbol' }) + public baseAssetSymbol!: string; + @Column({ name: 'base_asset_address' }) + public baseAssetAddress!: string; + @Column({ name: 'base_volume', type: 'numeric', transformer: bigNumberTransformer }) + public baseVolume!: BigNumber; + @PrimaryColumn({ name: 'quote_asset_symbol' }) + public quoteAssetSymbol!: string; + @Column({ name: 'quote_asset_address' }) + public quoteAssetAddress!: string; + @Column({ name: 'quote_volume', type: 'numeric', transformer: bigNumberTransformer }) + public quoteVolume!: BigNumber; +} diff --git a/packages/pipeline/src/entities/transaction.ts b/packages/pipeline/src/entities/transaction.ts new file mode 100644 index 000000000..742050177 --- /dev/null +++ b/packages/pipeline/src/entities/transaction.ts @@ -0,0 +1,19 @@ +import { BigNumber } from '@0x/utils'; +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'transactions', schema: 'raw' }) +export class Transaction { + @PrimaryColumn({ name: 'transaction_hash' }) + public transactionHash!: string; + @PrimaryColumn({ name: 'block_hash' }) + public blockHash!: string; + @PrimaryColumn({ name: 'block_number', transformer: numberToBigIntTransformer }) + public blockNumber!: number; + + @Column({ type: 'numeric', name: 'gas_used', transformer: bigNumberTransformer }) + public gasUsed!: BigNumber; + @Column({ type: 'numeric', name: 'gas_price', transformer: bigNumberTransformer }) + public gasPrice!: BigNumber; +} diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts new file mode 100644 index 000000000..9f7815b4e --- /dev/null +++ b/packages/pipeline/src/ormconfig.ts @@ -0,0 +1,42 @@ +import { ConnectionOptions } from 'typeorm'; + +import { + Block, + DexTrade, + ExchangeCancelEvent, + ExchangeCancelUpToEvent, + ExchangeFillEvent, + OHLCVExternal, + Relayer, + SraOrder, + SraOrdersObservedTimeStamp, + TokenMetadata, + TokenOrderbookSnapshot, + Transaction, +} from './entities'; + +const entities = [ + Block, + DexTrade, + ExchangeCancelEvent, + ExchangeCancelUpToEvent, + ExchangeFillEvent, + OHLCVExternal, + Relayer, + SraOrder, + SraOrdersObservedTimeStamp, + TokenMetadata, + TokenOrderbookSnapshot, + Transaction, +]; + +const config: ConnectionOptions = { + type: 'postgres', + url: process.env.ZEROEX_DATA_PIPELINE_DB_URL, + synchronize: false, + logging: ['error'], + entities, + migrations: ['./lib/migrations/**/*.js'], +}; + +module.exports = config; diff --git a/packages/pipeline/src/parsers/bloxy/index.ts b/packages/pipeline/src/parsers/bloxy/index.ts new file mode 100644 index 000000000..caa55d289 --- /dev/null +++ b/packages/pipeline/src/parsers/bloxy/index.ts @@ -0,0 +1,53 @@ +import { BigNumber } from '@0x/utils'; +import * as R from 'ramda'; + +import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../data_sources/bloxy'; +import { DexTrade } from '../../entities'; + +/** + * Parses a raw trades response from the Bloxy Dex API and returns an array of + * DexTrade entities. + * @param rawTrades A raw order response from an SRA endpoint. + */ +export function parseBloxyTrades(rawTrades: BloxyTrade[]): DexTrade[] { + return R.map(_parseBloxyTrade, rawTrades); +} + +/** + * Converts a single Bloxy trade into a DexTrade entity. + * @param rawTrade A single trade from the response from the Bloxy API. + */ +export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade { + const dexTrade = new DexTrade(); + dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL; + dexTrade.txHash = rawTrade.tx_hash; + dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime(); + dexTrade.txDate = rawTrade.tx_date; + dexTrade.txSender = rawTrade.tx_sender; + dexTrade.smartContractId = rawTrade.smart_contract_id; + dexTrade.smartContractAddress = rawTrade.smart_contract_address; + dexTrade.contractType = rawTrade.contract_type; + dexTrade.maker = rawTrade.maker; + dexTrade.taker = rawTrade.taker; + // TODO(albrow): The Bloxy API returns amounts and fees as a `number` type + // but some of their values have too many significant digits to be + // represented that way. Ideally they will switch to using strings and then + // we can update this code. + dexTrade.amountBuy = new BigNumber(rawTrade.amountBuy.toString()); + dexTrade.makerFeeAmount = new BigNumber(rawTrade.makerFee.toString()); + dexTrade.buyCurrencyId = rawTrade.buyCurrencyId; + dexTrade.buySymbol = filterNullCharacters(rawTrade.buySymbol); + dexTrade.amountSell = new BigNumber(rawTrade.amountSell.toString()); + dexTrade.takerFeeAmount = new BigNumber(rawTrade.takerFee.toString()); + dexTrade.sellCurrencyId = rawTrade.sellCurrencyId; + dexTrade.sellSymbol = filterNullCharacters(rawTrade.sellSymbol); + dexTrade.makerAnnotation = rawTrade.maker_annotation; + dexTrade.takerAnnotation = rawTrade.taker_annotation; + dexTrade.protocol = rawTrade.protocol; + dexTrade.buyAddress = rawTrade.buyAddress; + dexTrade.sellAddress = rawTrade.sellAddress; + return dexTrade; +} + +// Works with any form of escaped null character (e.g., '\0' and '\u0000'). +const filterNullCharacters = R.replace(/\0/g, ''); diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts new file mode 100644 index 000000000..81132e8f0 --- /dev/null +++ b/packages/pipeline/src/parsers/ddex_orders/index.ts @@ -0,0 +1,77 @@ +import { BigNumber } from '@0x/utils'; +import * as R from 'ramda'; + +import { DdexMarket, DdexOrder, DdexOrderbook } from '../../data_sources/ddex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; +import { OrderType } from '../../types'; + +/** + * Marque function of this file. + * 1) Takes in orders from an orderbook, + * other information attached. + * @param ddexOrderbook A raw orderbook that we pull from the Ddex API. + * @param ddexMarket An object containing market data also directly from the API. + * @param observedTimestamp Time at which the orders for the market were pulled. + * @param source The exchange where these orders are placed. In this case 'ddex'. + */ +export function parseDdexOrders( + ddexOrderbook: DdexOrderbook, + ddexMarket: DdexMarket, + observedTimestamp: number, + source: string, +): TokenOrder[] { + const aggregatedBids = aggregateOrders(ddexOrderbook.bids); + const aggregatedAsks = aggregateOrders(ddexOrderbook.asks); + const parsedBids = aggregatedBids.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'bid', source, order)); + const parsedAsks = aggregatedAsks.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'ask', source, order)); + return parsedBids.concat(parsedAsks); +} + +/** + * Aggregates orders by price point for consistency with other exchanges. + * Querying the Ddex API at level 3 setting returns a breakdown of + * individual orders at each price point. Other exchanges only give total amount + * at each price point. Returns an array of <price, amount> tuples. + * @param ddexOrders A list of Ddex orders awaiting aggregation. + */ +export function aggregateOrders(ddexOrders: DdexOrder[]): Array<[string, BigNumber]> { + const sumAmount = (acc: BigNumber, order: DdexOrder): BigNumber => acc.plus(order.amount); + const aggregatedPricePoints = R.reduceBy(sumAmount, new BigNumber(0), R.prop('price'), ddexOrders); + return Object.entries(aggregatedPricePoints); +} + +/** + * Parse a single aggregated Ddex order in order to form a tokenOrder entity + * which can be saved into the database. + * @param ddexMarket An object containing information about the market where these + * trades have been placed. + * @param observedTimestamp The time when the API response returned back to us. + * @param orderType 'bid' or 'ask' enum. + * @param source Exchange where these orders were placed. + * @param ddexOrder A <price, amount> tuple which we will convert to volume-basis. + */ +export function parseDdexOrder( + ddexMarket: DdexMarket, + observedTimestamp: number, + orderType: OrderType, + source: string, + ddexOrder: [string, BigNumber], +): TokenOrder { + const tokenOrder = new TokenOrder(); + const price = new BigNumber(ddexOrder[0]); + const amount = ddexOrder[1]; + + tokenOrder.source = source; + tokenOrder.observedTimestamp = observedTimestamp; + tokenOrder.orderType = orderType; + tokenOrder.price = price; + + tokenOrder.baseAssetSymbol = ddexMarket.baseToken; + tokenOrder.baseAssetAddress = ddexMarket.baseTokenAddress; + tokenOrder.baseVolume = price.times(amount); + + tokenOrder.quoteAssetSymbol = ddexMarket.quoteToken; + tokenOrder.quoteAssetAddress = ddexMarket.quoteTokenAddress; + tokenOrder.quoteVolume = amount; + return tokenOrder; +} diff --git a/packages/pipeline/src/parsers/events/index.ts b/packages/pipeline/src/parsers/events/index.ts new file mode 100644 index 000000000..e18106c75 --- /dev/null +++ b/packages/pipeline/src/parsers/events/index.ts @@ -0,0 +1,133 @@ +import { ExchangeCancelEventArgs, ExchangeCancelUpToEventArgs, ExchangeFillEventArgs } from '@0x/contract-wrappers'; +import { assetDataUtils } from '@0x/order-utils'; +import { AssetProxyId, ERC721AssetData } from '@0x/types'; +import { LogWithDecodedArgs } from 'ethereum-types'; +import * as R from 'ramda'; + +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities'; +import { bigNumbertoStringOrNull } from '../../utils'; + +/** + * Parses raw event logs for a fill event and returns an array of + * ExchangeFillEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeFillEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeFillEventArgs>>, +) => ExchangeFillEvent[] = R.map(_convertToExchangeFillEvent); + +/** + * Parses raw event logs for a cancel event and returns an array of + * ExchangeCancelEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeCancelEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeCancelEventArgs>>, +) => ExchangeCancelEvent[] = R.map(_convertToExchangeCancelEvent); + +/** + * Parses raw event logs for a CancelUpTo event and returns an array of + * ExchangeCancelUpToEvent entities. + * @param eventLogs Raw event logs (e.g. returned from contract-wrappers). + */ +export const parseExchangeCancelUpToEvents: ( + eventLogs: Array<LogWithDecodedArgs<ExchangeCancelUpToEventArgs>>, +) => ExchangeCancelUpToEvent[] = R.map(_convertToExchangeCancelUpToEvent); + +/** + * Converts a raw event log for a fill event into an ExchangeFillEvent entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent { + const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); + const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); + const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const exchangeFillEvent = new ExchangeFillEvent(); + exchangeFillEvent.contractAddress = eventLog.address as string; + exchangeFillEvent.blockNumber = eventLog.blockNumber as number; + exchangeFillEvent.logIndex = eventLog.logIndex as number; + exchangeFillEvent.rawData = eventLog.data as string; + exchangeFillEvent.transactionHash = eventLog.transactionHash; + exchangeFillEvent.makerAddress = eventLog.args.makerAddress; + exchangeFillEvent.takerAddress = eventLog.args.takerAddress; + exchangeFillEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; + exchangeFillEvent.senderAddress = eventLog.args.senderAddress; + exchangeFillEvent.makerAssetFilledAmount = eventLog.args.makerAssetFilledAmount; + exchangeFillEvent.takerAssetFilledAmount = eventLog.args.takerAssetFilledAmount; + exchangeFillEvent.makerFeePaid = eventLog.args.makerFeePaid; + exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid; + exchangeFillEvent.orderHash = eventLog.args.orderHash; + exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData; + exchangeFillEvent.makerAssetType = makerAssetType; + exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId; + exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress; + // tslint has a false positive here. Type assertion is required. + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); + exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData; + exchangeFillEvent.takerAssetType = takerAssetType; + exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId; + exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); + return exchangeFillEvent; +} + +/** + * Converts a raw event log for a cancel event into an ExchangeCancelEvent + * entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToExchangeCancelEvent( + eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>, +): ExchangeCancelEvent { + const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); + const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); + const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const exchangeCancelEvent = new ExchangeCancelEvent(); + exchangeCancelEvent.contractAddress = eventLog.address as string; + exchangeCancelEvent.blockNumber = eventLog.blockNumber as number; + exchangeCancelEvent.logIndex = eventLog.logIndex as number; + exchangeCancelEvent.rawData = eventLog.data as string; + exchangeCancelEvent.transactionHash = eventLog.transactionHash; + exchangeCancelEvent.makerAddress = eventLog.args.makerAddress; + exchangeCancelEvent.takerAddress = eventLog.args.takerAddress; + exchangeCancelEvent.feeRecipientAddress = eventLog.args.feeRecipientAddress; + exchangeCancelEvent.senderAddress = eventLog.args.senderAddress; + exchangeCancelEvent.orderHash = eventLog.args.orderHash; + exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData; + exchangeCancelEvent.makerAssetType = makerAssetType; + exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId; + exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); + exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData; + exchangeCancelEvent.takerAssetType = takerAssetType; + exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId; + exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); + return exchangeCancelEvent; +} + +/** + * Converts a raw event log for a cancelUpTo event into an + * ExchangeCancelUpToEvent entity. + * @param eventLog Raw event log (e.g. returned from contract-wrappers). + */ +export function _convertToExchangeCancelUpToEvent( + eventLog: LogWithDecodedArgs<ExchangeCancelUpToEventArgs>, +): ExchangeCancelUpToEvent { + const exchangeCancelUpToEvent = new ExchangeCancelUpToEvent(); + exchangeCancelUpToEvent.contractAddress = eventLog.address as string; + exchangeCancelUpToEvent.blockNumber = eventLog.blockNumber as number; + exchangeCancelUpToEvent.logIndex = eventLog.logIndex as number; + exchangeCancelUpToEvent.rawData = eventLog.data as string; + exchangeCancelUpToEvent.transactionHash = eventLog.transactionHash; + exchangeCancelUpToEvent.makerAddress = eventLog.args.makerAddress; + exchangeCancelUpToEvent.senderAddress = eventLog.args.senderAddress; + exchangeCancelUpToEvent.orderEpoch = eventLog.args.orderEpoch; + return exchangeCancelUpToEvent; +} diff --git a/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts new file mode 100644 index 000000000..3efb90384 --- /dev/null +++ b/packages/pipeline/src/parsers/ohlcv_external/crypto_compare.ts @@ -0,0 +1,38 @@ +import { CryptoCompareOHLCVRecord } from '../../data_sources/ohlcv_external/crypto_compare'; +import { OHLCVExternal } from '../../entities'; + +const ONE_SECOND = 1000; // Crypto Compare uses timestamps in seconds instead of milliseconds + +export interface OHLCVMetadata { + exchange: string; + fromSymbol: string; + toSymbol: string; + source: string; + observedTimestamp: number; + interval: number; +} +/** + * Parses OHLCV records from Crypto Compare into an array of OHLCVExternal entities + * @param rawRecords an array of OHLCV records from Crypto Compare (not the full response) + */ +export function parseRecords(rawRecords: CryptoCompareOHLCVRecord[], metadata: OHLCVMetadata): OHLCVExternal[] { + return rawRecords.map(rec => { + const ohlcvRecord = new OHLCVExternal(); + ohlcvRecord.exchange = metadata.exchange; + ohlcvRecord.fromSymbol = metadata.fromSymbol; + ohlcvRecord.toSymbol = metadata.toSymbol; + ohlcvRecord.startTime = rec.time * ONE_SECOND - metadata.interval; + ohlcvRecord.endTime = rec.time * ONE_SECOND; + + ohlcvRecord.open = rec.open; + ohlcvRecord.close = rec.close; + ohlcvRecord.low = rec.low; + ohlcvRecord.high = rec.high; + ohlcvRecord.volumeFrom = rec.volumefrom; + ohlcvRecord.volumeTo = rec.volumeto; + + ohlcvRecord.source = metadata.source; + ohlcvRecord.observedTimestamp = metadata.observedTimestamp; + return ohlcvRecord; + }); +} diff --git a/packages/pipeline/src/parsers/paradex_orders/index.ts b/packages/pipeline/src/parsers/paradex_orders/index.ts new file mode 100644 index 000000000..7966658a7 --- /dev/null +++ b/packages/pipeline/src/parsers/paradex_orders/index.ts @@ -0,0 +1,66 @@ +import { BigNumber } from '@0x/utils'; + +import { ParadexMarket, ParadexOrder, ParadexOrderbookResponse } from '../../data_sources/paradex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; +import { OrderType } from '../../types'; + +/** + * Marque function of this file. + * 1) Takes in orders from an orderbook (orders are already aggregated by price point), + * 2) For each aggregated order, forms a TokenOrder entity with market data and + * other information attached. + * @param paradexOrderbookResponse An orderbook response from the Paradex API. + * @param paradexMarket An object containing market data also directly from the API. + * @param observedTimestamp Time at which the orders for the market were pulled. + * @param source The exchange where these orders are placed. In this case 'paradex'. + */ +export function parseParadexOrders( + paradexOrderbookResponse: ParadexOrderbookResponse, + paradexMarket: ParadexMarket, + observedTimestamp: number, + source: string, +): TokenOrder[] { + const parsedBids = paradexOrderbookResponse.bids.map(order => + parseParadexOrder(paradexMarket, observedTimestamp, 'bid', source, order), + ); + const parsedAsks = paradexOrderbookResponse.asks.map(order => + parseParadexOrder(paradexMarket, observedTimestamp, 'ask', source, order), + ); + return parsedBids.concat(parsedAsks); +} + +/** + * Parse a single aggregated Ddex order in order to form a tokenOrder entity + * which can be saved into the database. + * @param paradexMarket An object containing information about the market where these + * orders have been placed. + * @param observedTimestamp The time when the API response returned back to us. + * @param orderType 'bid' or 'ask' enum. + * @param source Exchange where these orders were placed. + * @param paradexOrder A ParadexOrder object; basically price, amount tuple. + */ +export function parseParadexOrder( + paradexMarket: ParadexMarket, + observedTimestamp: number, + orderType: OrderType, + source: string, + paradexOrder: ParadexOrder, +): TokenOrder { + const tokenOrder = new TokenOrder(); + const price = new BigNumber(paradexOrder.price); + const amount = new BigNumber(paradexOrder.amount); + + tokenOrder.source = source; + tokenOrder.observedTimestamp = observedTimestamp; + tokenOrder.orderType = orderType; + tokenOrder.price = price; + + tokenOrder.baseAssetSymbol = paradexMarket.baseToken; + tokenOrder.baseAssetAddress = paradexMarket.baseTokenAddress as string; + tokenOrder.baseVolume = price.times(amount); + + tokenOrder.quoteAssetSymbol = paradexMarket.quoteToken; + tokenOrder.quoteAssetAddress = paradexMarket.quoteTokenAddress as string; + tokenOrder.quoteVolume = amount; + return tokenOrder; +} diff --git a/packages/pipeline/src/parsers/relayer_registry/index.ts b/packages/pipeline/src/parsers/relayer_registry/index.ts new file mode 100644 index 000000000..9723880a4 --- /dev/null +++ b/packages/pipeline/src/parsers/relayer_registry/index.ts @@ -0,0 +1,37 @@ +import * as R from 'ramda'; + +import { RelayerResponse, RelayerResponseNetwork } from '../../data_sources/relayer-registry'; +import { Relayer } from '../../entities'; + +/** + * Parses a raw relayer registry response into an array of Relayer entities. + * @param rawResp raw response from the relayer-registry json file. + */ +export function parseRelayers(rawResp: Map<string, RelayerResponse>): Relayer[] { + const parsedAsObject = R.mapObjIndexed(parseRelayer, rawResp); + return R.values(parsedAsObject); +} + +function parseRelayer(relayerResp: RelayerResponse, uuid: string): Relayer { + const relayer = new Relayer(); + relayer.uuid = uuid; + relayer.name = relayerResp.name; + relayer.homepageUrl = relayerResp.homepage_url; + relayer.appUrl = relayerResp.app_url; + const mainNetworkRelayerInfo = getMainNetwork(relayerResp); + if (mainNetworkRelayerInfo !== undefined) { + relayer.sraHttpEndpoint = mainNetworkRelayerInfo.sra_http_endpoint || null; + relayer.sraWsEndpoint = mainNetworkRelayerInfo.sra_ws_endpoint || null; + relayer.feeRecipientAddresses = + R.path(['static_order_fields', 'fee_recipient_addresses'], mainNetworkRelayerInfo) || []; + relayer.takerAddresses = R.path(['static_order_fields', 'taker_addresses'], mainNetworkRelayerInfo) || []; + } else { + relayer.feeRecipientAddresses = []; + relayer.takerAddresses = []; + } + return relayer; +} + +function getMainNetwork(relayerResp: RelayerResponse): RelayerResponseNetwork | undefined { + return R.find(network => network.networkId === 1, relayerResp.networks); +} diff --git a/packages/pipeline/src/parsers/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts new file mode 100644 index 000000000..ef8901e40 --- /dev/null +++ b/packages/pipeline/src/parsers/sra_orders/index.ts @@ -0,0 +1,62 @@ +import { APIOrder, OrdersResponse } from '@0x/connect'; +import { assetDataUtils, orderHashUtils } from '@0x/order-utils'; +import { AssetProxyId, ERC721AssetData } from '@0x/types'; +import * as R from 'ramda'; + +import { SraOrder } from '../../entities'; +import { bigNumbertoStringOrNull } from '../../utils'; + +/** + * Parses a raw order response from an SRA endpoint and returns an array of + * SraOrder entities. + * @param rawOrdersResponse A raw order response from an SRA endpoint. + */ +export function parseSraOrders(rawOrdersResponse: OrdersResponse): SraOrder[] { + return R.map(_convertToEntity, rawOrdersResponse.records); +} + +/** + * Converts a single APIOrder into an SraOrder entity. + * @param apiOrder A single order from the response from an SRA endpoint. + */ +export function _convertToEntity(apiOrder: APIOrder): SraOrder { + // TODO(albrow): refactor out common asset data decoding code. + const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.makerAssetData); + const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.takerAssetData); + const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; + + const sraOrder = new SraOrder(); + sraOrder.exchangeAddress = apiOrder.order.exchangeAddress; + sraOrder.orderHashHex = orderHashUtils.getOrderHashHex(apiOrder.order); + + sraOrder.makerAddress = apiOrder.order.makerAddress; + sraOrder.takerAddress = apiOrder.order.takerAddress; + sraOrder.feeRecipientAddress = apiOrder.order.feeRecipientAddress; + sraOrder.senderAddress = apiOrder.order.senderAddress; + sraOrder.makerAssetAmount = apiOrder.order.makerAssetAmount; + sraOrder.takerAssetAmount = apiOrder.order.takerAssetAmount; + sraOrder.makerFee = apiOrder.order.makerFee; + sraOrder.takerFee = apiOrder.order.takerFee; + sraOrder.expirationTimeSeconds = apiOrder.order.expirationTimeSeconds; + sraOrder.salt = apiOrder.order.salt; + sraOrder.signature = apiOrder.order.signature; + + sraOrder.rawMakerAssetData = apiOrder.order.makerAssetData; + sraOrder.makerAssetType = makerAssetType; + sraOrder.makerAssetProxyId = makerAssetData.assetProxyId; + sraOrder.makerTokenAddress = makerAssetData.tokenAddress; + // tslint has a false positive here. Type assertion is required. + // tslint:disable-next-line:no-unnecessary-type-assertion + sraOrder.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); + sraOrder.rawTakerAssetData = apiOrder.order.takerAssetData; + sraOrder.takerAssetType = takerAssetType; + sraOrder.takerAssetProxyId = takerAssetData.assetProxyId; + sraOrder.takerTokenAddress = takerAssetData.tokenAddress; + // tslint:disable-next-line:no-unnecessary-type-assertion + sraOrder.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); + + sraOrder.metadataJson = JSON.stringify(apiOrder.metaData); + + return sraOrder; +} diff --git a/packages/pipeline/src/parsers/token_metadata/index.ts b/packages/pipeline/src/parsers/token_metadata/index.ts new file mode 100644 index 000000000..3b3e05d76 --- /dev/null +++ b/packages/pipeline/src/parsers/token_metadata/index.ts @@ -0,0 +1,47 @@ +import { BigNumber } from '@0x/utils'; +import * as R from 'ramda'; + +import { MetamaskTrustedTokenMeta, ZeroExTrustedTokenMeta } from '../../data_sources/trusted_tokens'; +import { TokenMetadata } from '../../entities'; +import {} from '../../utils'; + +/** + * Parses Metamask's trusted tokens list. + * @param rawResp raw response from the metamask json file. + */ +export function parseMetamaskTrustedTokens(rawResp: Map<string, MetamaskTrustedTokenMeta>): TokenMetadata[] { + const parsedAsObject = R.mapObjIndexed(parseMetamaskTrustedToken, rawResp); + return R.values(parsedAsObject); +} + +/** + * Parses 0x's trusted tokens list. + * @param rawResp raw response from the 0x trusted tokens file. + */ +export function parseZeroExTrustedTokens(rawResp: ZeroExTrustedTokenMeta[]): TokenMetadata[] { + return R.map(parseZeroExTrustedToken, rawResp); +} + +function parseMetamaskTrustedToken(resp: MetamaskTrustedTokenMeta, address: string): TokenMetadata { + const trustedToken = new TokenMetadata(); + + trustedToken.address = address; + trustedToken.decimals = new BigNumber(resp.decimals); + trustedToken.symbol = resp.symbol; + trustedToken.name = resp.name; + trustedToken.authority = 'metamask'; + + return trustedToken; +} + +function parseZeroExTrustedToken(resp: ZeroExTrustedTokenMeta): TokenMetadata { + const trustedToken = new TokenMetadata(); + + trustedToken.address = resp.address; + trustedToken.decimals = new BigNumber(resp.decimals); + trustedToken.symbol = resp.symbol; + trustedToken.name = resp.name; + trustedToken.authority = '0x'; + + return trustedToken; +} diff --git a/packages/pipeline/src/parsers/web3/index.ts b/packages/pipeline/src/parsers/web3/index.ts new file mode 100644 index 000000000..f986efc59 --- /dev/null +++ b/packages/pipeline/src/parsers/web3/index.ts @@ -0,0 +1,49 @@ +import { BigNumber } from '@0x/utils'; +import { BlockWithoutTransactionData, Transaction as EthTransaction } from 'ethereum-types'; + +import { Block, Transaction } from '../../entities'; + +const MILLISECONDS_PER_SECOND = 1000; + +/** + * Parses a raw block and returns a Block entity. + * @param rawBlock a raw block (e.g. returned from web3-wrapper). + */ +export function parseBlock(rawBlock: BlockWithoutTransactionData): Block { + if (rawBlock.hash == null) { + throw new Error('Tried to parse raw block but hash was null'); + } + if (rawBlock.number == null) { + throw new Error('Tried to parse raw block but number was null'); + } + + const block = new Block(); + block.hash = rawBlock.hash; + block.number = rawBlock.number; + // Block timestamps are in seconds, but we use milliseconds everywhere else. + block.timestamp = rawBlock.timestamp * MILLISECONDS_PER_SECOND; + return block; +} + +/** + * Parses a raw transaction and returns a Transaction entity. + * @param rawBlock a raw transaction (e.g. returned from web3-wrapper). + */ +export function parseTransaction(rawTransaction: EthTransaction): Transaction { + if (rawTransaction.blockHash == null) { + throw new Error('Tried to parse raw transaction but blockHash was null'); + } + if (rawTransaction.blockNumber == null) { + throw new Error('Tried to parse raw transaction but blockNumber was null'); + } + + const tx = new Transaction(); + tx.transactionHash = rawTransaction.hash; + tx.blockHash = rawTransaction.blockHash; + tx.blockNumber = rawTransaction.blockNumber; + + tx.gasUsed = new BigNumber(rawTransaction.gas); + tx.gasPrice = rawTransaction.gasPrice; + + return tx; +} diff --git a/packages/pipeline/src/scripts/pull_competing_dex_trades.ts b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts new file mode 100644 index 000000000..4e4c12dd0 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_competing_dex_trades.ts @@ -0,0 +1,51 @@ +// tslint:disable:no-console +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { BloxySource } from '../data_sources/bloxy'; +import { DexTrade } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseBloxyTrades } from '../parsers/bloxy'; +import { handleError } from '../utils'; + +// Number of trades to save at once. +const BATCH_SAVE_SIZE = 1000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + await getAndSaveTrades(); + process.exit(0); +})().catch(handleError); + +async function getAndSaveTrades(): Promise<void> { + const apiKey = process.env.BLOXY_API_KEY; + if (apiKey === undefined) { + throw new Error('Missing required env var: BLOXY_API_KEY'); + } + const bloxySource = new BloxySource(apiKey); + const tradesRepository = connection.getRepository(DexTrade); + const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository); + console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`); + console.log('Getting latest dex trades...'); + const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp); + console.log(`Parsing ${rawTrades.length} trades...`); + const trades = parseBloxyTrades(rawTrades); + console.log(`Saving ${trades.length} trades...`); + await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) }); + console.log('Done saving trades.'); +} + +async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> { + if ((await tradesRepository.count()) === 0) { + return 0; + } + const response = (await connection.query( + 'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1', + )) as Array<{ tx_timestamp: number }>; + if (response.length === 0) { + return 0; + } + return response[0].tx_timestamp; +} diff --git a/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts new file mode 100644 index 000000000..7868e9c5a --- /dev/null +++ b/packages/pipeline/src/scripts/pull_ddex_orderbook_snapshots.ts @@ -0,0 +1,55 @@ +import { logUtils } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseDdexOrders } from '../parsers/ddex_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +// Number of markets to retrieve orderbooks for at once. +const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50; + +// Delay between market orderbook requests. +const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const ddexSource = new DdexSource(); + const markets = await ddexSource.getActiveMarketsAsync(); + for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) { + await Promise.all( + marketsChunk.map(async (market: DdexMarket) => getAndSaveMarketOrderbook(ddexSource, market)), + ); + await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY)); + } + process.exit(0); +})().catch(handleError); + +/** + * Retrieve orderbook from Ddex API for a given market. Parse orders and insert + * them into our database. + * @param ddexSource Data source which can query Ddex API. + * @param market Object from Ddex API containing market data. + */ +async function getAndSaveMarketOrderbook(ddexSource: DdexSource, market: DdexMarket): Promise<void> { + const orderBook = await ddexSource.getMarketOrderbookAsync(market.id); + const observedTimestamp = Date.now(); + + logUtils.log(`${market.id}: Parsing orders.`); + const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${market.id}: Saving ${orders.length} orders.`); + const TokenOrderRepository = connection.getRepository(TokenOrder); + await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${market.id}: 0 orders to save.`); + } +} diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts new file mode 100644 index 000000000..b7bd51f08 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -0,0 +1,80 @@ +// tslint:disable:no-console +import { web3Factory } from '@0x/dev-utils'; +import * as Parallel from 'async-parallel'; +import R = require('ramda'); +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { Web3Source } from '../data_sources/web3'; +import { Block } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseBlock } from '../parsers/web3'; +import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; + +// Number of blocks to save at once. +const BATCH_SAVE_SIZE = 1000; +// Maximum number of requests to send at once. +const MAX_CONCURRENCY = 10; +// Maximum number of blocks to query for at once. This is also the maximum +// number of blocks we will hold in memory prior to being saved to the database. +const MAX_BLOCKS_PER_QUERY = 1000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: `${INFURA_ROOT_URL}/${process.env.INFURA_API_KEY}`, + }); + const web3Source = new Web3Source(provider); + await getAllMissingBlocks(web3Source); + process.exit(0); +})().catch(handleError); + +interface MissingBlocksResponse { + block_number: string; +} + +async function getAllMissingBlocks(web3Source: Web3Source): Promise<void> { + const blocksRepository = connection.getRepository(Block); + let fromBlock = EXCHANGE_START_BLOCK; + while (true) { + const blockNumbers = await getMissingBlockNumbers(fromBlock); + if (blockNumbers.length === 0) { + // There are no more missing blocks. We're done. + break; + } + await getAndSaveBlocks(web3Source, blocksRepository, blockNumbers); + fromBlock = Math.max(...blockNumbers) + 1; + } + const totalBlocks = await blocksRepository.count(); + console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`); +} + +async function getMissingBlockNumbers(fromBlock: number): Promise<number[]> { + console.log(`Checking for missing blocks starting at ${fromBlock}...`); + const response = (await connection.query( + 'SELECT DISTINCT(block_number) FROM raw.exchange_fill_events WHERE block_number NOT IN (SELECT number FROM raw.blocks) AND block_number >= $1 ORDER BY block_number ASC LIMIT $2', + [fromBlock, MAX_BLOCKS_PER_QUERY], + )) as MissingBlocksResponse[]; + const blockNumberStrings = R.pluck('block_number', response); + const blockNumbers = R.map(parseInt, blockNumberStrings); + console.log(`Found ${blockNumbers.length} missing blocks in the given range.`); + return blockNumbers; +} + +async function getAndSaveBlocks( + web3Source: Web3Source, + blocksRepository: Repository<Block>, + blockNumbers: number[], +): Promise<void> { + console.log(`Getting block data for ${blockNumbers.length} blocks...`); + Parallel.setConcurrency(MAX_CONCURRENCY); + const rawBlocks = await Parallel.map(blockNumbers, async (blockNumber: number) => + web3Source.getBlockInfoAsync(blockNumber), + ); + console.log(`Parsing ${rawBlocks.length} blocks...`); + const blocks = R.map(parseBlock, rawBlocks); + console.log(`Saving ${blocks.length} blocks...`); + await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); +} diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts new file mode 100644 index 000000000..80abbb8b0 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -0,0 +1,136 @@ +// tslint:disable:no-console +import { web3Factory } from '@0x/dev-utils'; +import R = require('ramda'); +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; +import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeEvent, ExchangeFillEvent } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseExchangeCancelEvents, parseExchangeCancelUpToEvents, parseExchangeFillEvents } from '../parsers/events'; +import { EXCHANGE_START_BLOCK, handleError, INFURA_ROOT_URL } from '../utils'; + +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const provider = web3Factory.getRpcProvider({ + rpcUrl: INFURA_ROOT_URL, + }); + const eventsSource = new ExchangeEventsSource(provider, 1); + await getFillEventsAsync(eventsSource); + await getCancelEventsAsync(eventsSource); + await getCancelUpToEventsAsync(eventsSource); + process.exit(0); +})().catch(handleError); + +async function getFillEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { + console.log('Checking existing fill events...'); + const repository = connection.getRepository(ExchangeFillEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting fill events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getFillEventsAsync(startBlock); + console.log('Parsing fill events...'); + const events = parseExchangeFillEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total fill events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { + console.log('Checking existing cancel events...'); + const repository = connection.getRepository(ExchangeCancelEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting cancel events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelEventsAsync(startBlock); + console.log('Parsing cancel events...'); + const events = parseExchangeCancelEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total cancel events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +async function getCancelUpToEventsAsync(eventsSource: ExchangeEventsSource): Promise<void> { + console.log('Checking existing CancelUpTo events...'); + const repository = connection.getRepository(ExchangeCancelUpToEvent); + const startBlock = await getStartBlockAsync(repository); + console.log(`Getting CancelUpTo events starting at ${startBlock}...`); + const eventLogs = await eventsSource.getCancelUpToEventsAsync(startBlock); + console.log('Parsing CancelUpTo events...'); + const events = parseExchangeCancelUpToEvents(eventLogs); + console.log(`Retrieved and parsed ${events.length} total CancelUpTo events.`); + await saveEventsAsync(startBlock === EXCHANGE_START_BLOCK, repository, events); +} + +const tableNameRegex = /^[a-zA-Z_]*$/; + +async function getStartBlockAsync<T extends ExchangeEvent>(repository: Repository<T>): Promise<number> { + const fillEventCount = await repository.count(); + if (fillEventCount === 0) { + console.log(`No existing ${repository.metadata.name}s found.`); + return EXCHANGE_START_BLOCK; + } + const tableName = repository.metadata.tableName; + if (!tableNameRegex.test(tableName)) { + throw new Error(`Unexpected special character in table name: ${tableName}`); + } + const queryResult = await connection.query( + `SELECT block_number FROM raw.${tableName} ORDER BY block_number DESC LIMIT 1`, + ); + const lastKnownBlock = queryResult[0].block_number; + return lastKnownBlock - START_BLOCK_OFFSET; +} + +async function saveEventsAsync<T extends ExchangeEvent>( + isInitialPull: boolean, + repository: Repository<T>, + events: T[], +): Promise<void> { + console.log(`Saving ${repository.metadata.name}s...`); + if (isInitialPull) { + // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE + // each. + for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { + await repository.insert(eventsBatch); + } + } else { + // If we possibly have some overlap where we need to update some + // existing events, we need to use our workaround/fallback. + await saveIndividuallyWithFallbackAsync(repository, events); + } + const totalEvents = await repository.count(); + console.log(`Done saving events. There are now ${totalEvents} total ${repository.metadata.name}s.`); +} + +async function saveIndividuallyWithFallbackAsync<T extends ExchangeEvent>( + repository: Repository<T>, + events: T[], +): Promise<void> { + // Note(albrow): This is a temporary hack because `save` is not working as + // documented and is causing a foreign key constraint violation. Hopefully + // can remove later because this "poor man's upsert" implementation operates + // on one event at a time and is therefore much slower. + for (const event of events) { + try { + // First try an insert. + await repository.insert(event); + } catch { + // If it fails, assume it was a foreign key constraint error and try + // doing an update instead. + // Note(albrow): Unfortunately the `as any` hack here seems + // required. I can't figure out how to convince the type-checker + // that the criteria and the entity itself are the correct type for + // the given repository. If we can remove the `save` hack then this + // will probably no longer be necessary. + await repository.update( + { + contractAddress: event.contractAddress, + blockNumber: event.blockNumber, + logIndex: event.logIndex, + } as any, + event as any, + ); + } + } +} diff --git a/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts new file mode 100644 index 000000000..6979cd10e --- /dev/null +++ b/packages/pipeline/src/scripts/pull_ohlcv_cryptocompare.ts @@ -0,0 +1,101 @@ +// tslint:disable:no-console +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { CryptoCompareOHLCVSource } from '../data_sources/ohlcv_external/crypto_compare'; +import { OHLCVExternal } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { OHLCVMetadata, parseRecords } from '../parsers/ohlcv_external/crypto_compare'; +import { handleError } from '../utils'; +import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_trading_pairs'; + +const SOURCE_NAME = 'CryptoCompare'; +const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers +const ONE_SECOND = 1000; + +const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers +const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare +const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime(); + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const repository = connection.getRepository(OHLCVExternal); + const source = new CryptoCompareOHLCVSource(MAX_CONCURRENT_REQUESTS); + + const jobTime = new Date().getTime(); + const tradingPairs = await fetchOHLCVTradingPairsAsync(connection, SOURCE_NAME, EARLIEST_BACKFILL_TIME); + console.log(`Starting ${tradingPairs.length} job(s) to scrape Crypto Compare for OHLCV records...`); + + const fetchAndSavePromises = tradingPairs.map(async pair => { + const pairs = source.generateBackfillIntervals(pair); + return fetchAndSaveAsync(source, repository, jobTime, pairs); + }); + await Promise.all(fetchAndSavePromises); + console.log(`Finished scraping OHLCV records from Crypto Compare, exiting...`); + process.exit(0); +})().catch(handleError); + +async function fetchAndSaveAsync( + source: CryptoCompareOHLCVSource, + repository: Repository<OHLCVExternal>, + jobTime: number, + pairs: TradingPair[], +): Promise<void> { + const sortAscTimestamp = (a: TradingPair, b: TradingPair): number => { + if (a.latestSavedTime < b.latestSavedTime) { + return -1; + } else if (a.latestSavedTime > b.latestSavedTime) { + return 1; + } else { + return 0; + } + }; + pairs.sort(sortAscTimestamp); + + let i = 0; + while (i < pairs.length) { + const pair = pairs[i]; + if (pair.latestSavedTime > TWO_HOURS_AGO) { + break; + } + const rawRecords = await source.getHourlyOHLCVAsync(pair); + const records = rawRecords.filter(rec => { + return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime; + }); // Crypto Compare can take ~30mins to finalise records + if (records.length === 0) { + console.log(`No more records, stopping task for ${JSON.stringify(pair)}`); + break; + } + const metadata: OHLCVMetadata = { + exchange: source.default_exchange, + fromSymbol: pair.fromSymbol, + toSymbol: pair.toSymbol, + source: SOURCE_NAME, + observedTimestamp: jobTime, + interval: source.intervalBetweenRecords, + }; + const parsedRecords = parseRecords(records, metadata); + try { + await saveRecordsAsync(repository, parsedRecords); + i++; + } catch (err) { + console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`); + break; + } + } + return Promise.resolve(); +} + +async function saveRecordsAsync(repository: Repository<OHLCVExternal>, records: OHLCVExternal[]): Promise<void> { + const metadata = [ + records[0].fromSymbol, + records[0].toSymbol, + new Date(records[0].startTime), + new Date(records[records.length - 1].endTime), + ]; + + console.log(`Saving ${records.length} records to ${repository.metadata.name}... ${JSON.stringify(metadata)}`); + await repository.save(records); +} diff --git a/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts new file mode 100644 index 000000000..bae1fbede --- /dev/null +++ b/packages/pipeline/src/scripts/pull_paradex_orderbook_snapshots.ts @@ -0,0 +1,87 @@ +import { logUtils } from '@0x/utils'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { + PARADEX_SOURCE, + ParadexActiveMarketsResponse, + ParadexMarket, + ParadexSource, + ParadexTokenInfoResponse, +} from '../data_sources/paradex'; +import { TokenOrderbookSnapshot as TokenOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseParadexOrders } from '../parsers/paradex_orders'; +import { handleError } from '../utils'; + +// Number of orders to save at once. +const BATCH_SAVE_SIZE = 1000; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + const apiKey = process.env.PARADEX_DATA_PIPELINE_API_KEY; + if (apiKey === undefined) { + throw new Error('Missing required env var: PARADEX_DATA_PIPELINE_API_KEY'); + } + const paradexSource = new ParadexSource(apiKey); + const markets = await paradexSource.getActiveMarketsAsync(); + const tokenInfoResponse = await paradexSource.getTokenInfoAsync(); + const extendedMarkets = addTokenAddresses(markets, tokenInfoResponse); + await Promise.all( + extendedMarkets.map(async (market: ParadexMarket) => getAndSaveMarketOrderbook(paradexSource, market)), + ); + process.exit(0); +})().catch(handleError); + +/** + * Extend the default ParadexMarket objects with token addresses. + * @param markets An array of ParadexMarket objects. + * @param tokenInfoResponse An array of ParadexTokenInfo containing the addresses. + */ +function addTokenAddresses( + markets: ParadexActiveMarketsResponse, + tokenInfoResponse: ParadexTokenInfoResponse, +): ParadexMarket[] { + const symbolAddressMapping = new Map<string, string>(); + tokenInfoResponse.forEach(tokenInfo => symbolAddressMapping.set(tokenInfo.symbol, tokenInfo.address)); + + markets.forEach((market: ParadexMarket) => { + if (symbolAddressMapping.has(market.baseToken)) { + market.baseTokenAddress = symbolAddressMapping.get(market.baseToken); + } else { + market.quoteTokenAddress = ''; + logUtils.warn(`${market.baseToken}: No address found.`); + } + + if (symbolAddressMapping.has(market.quoteToken)) { + market.quoteTokenAddress = symbolAddressMapping.get(market.quoteToken); + } else { + market.quoteTokenAddress = ''; + logUtils.warn(`${market.quoteToken}: No address found.`); + } + }); + return markets; +} + +/** + * Retrieve orderbook from Paradex API for a given market. Parse orders and insert + * them into our database. + * @param paradexSource Data source which can query the Paradex API. + * @param market Object from the Paradex API with information about the market in question. + */ +async function getAndSaveMarketOrderbook(paradexSource: ParadexSource, market: ParadexMarket): Promise<void> { + const paradexOrderbookResponse = await paradexSource.getMarketOrderbookAsync(market.symbol); + const observedTimestamp = Date.now(); + + logUtils.log(`${market.symbol}: Parsing orders.`); + const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE); + + if (orders.length > 0) { + logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`); + const tokenOrderRepository = connection.getRepository(TokenOrder); + await tokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) }); + } else { + logUtils.log(`${market.symbol}: 0 orders to save.`); + } +} diff --git a/packages/pipeline/src/scripts/pull_radar_relay_orders.ts b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts new file mode 100644 index 000000000..6c18bcaef --- /dev/null +++ b/packages/pipeline/src/scripts/pull_radar_relay_orders.ts @@ -0,0 +1,53 @@ +// tslint:disable:no-console +import { HttpClient } from '@0x/connect'; +import * as R from 'ramda'; +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection, EntityManager } from 'typeorm'; + +import { createObservedTimestampForOrder, SraOrder } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseSraOrders } from '../parsers/sra_orders'; +import { handleError } from '../utils'; + +const RADAR_RELAY_URL = 'https://api.radarrelay.com/0x/v2'; +const ORDERS_PER_PAGE = 10000; // Number of orders to get per request. + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + await getOrderbookAsync(); + process.exit(0); +})().catch(handleError); + +async function getOrderbookAsync(): Promise<void> { + console.log('Getting all orders...'); + const connectClient = new HttpClient(RADAR_RELAY_URL); + const rawOrders = await connectClient.getOrdersAsync({ + perPage: ORDERS_PER_PAGE, + }); + console.log(`Got ${rawOrders.records.length} orders.`); + console.log('Parsing orders...'); + // Parse the sra orders, then add source url to each. + const orders = R.pipe(parseSraOrders, R.map(setSourceUrl(RADAR_RELAY_URL)))(rawOrders); + // Save all the orders and update the observed time stamps in a single + // transaction. + console.log('Saving orders and updating timestamps...'); + await connection.transaction(async (manager: EntityManager): Promise<void> => { + for (const order of orders) { + await manager.save(SraOrder, order); + const observedTimestamp = createObservedTimestampForOrder(order); + await manager.save(observedTimestamp); + } + }); +} + +const sourceUrlProp = R.lensProp('sourceUrl'); + +/** + * Sets the source url for a single order. Returns a new order instead of + * mutating the given one. + */ +const setSourceUrl = R.curry((sourceURL: string, order: SraOrder): SraOrder => { + return R.set(sourceUrlProp, sourceURL, order); +}); diff --git a/packages/pipeline/src/scripts/pull_trusted_tokens.ts b/packages/pipeline/src/scripts/pull_trusted_tokens.ts new file mode 100644 index 000000000..1befc4437 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_trusted_tokens.ts @@ -0,0 +1,52 @@ +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { MetamaskTrustedTokenMeta, TrustedTokenSource, ZeroExTrustedTokenMeta } from '../data_sources/trusted_tokens'; +import { TokenMetadata } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseMetamaskTrustedTokens, parseZeroExTrustedTokens } from '../parsers/token_metadata'; +import { handleError } from '../utils'; + +const METAMASK_TRUSTED_TOKENS_URL = + 'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/d45916c533116510cc8e9e048a8b5fc3732a6b6d/contract-map.json'; + +const ZEROEX_TRUSTED_TOKENS_URL = 'https://website-api.0xproject.com/tokens'; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + await getMetamaskTrustedTokens(); + await getZeroExTrustedTokens(); + process.exit(0); +})().catch(handleError); + +async function getMetamaskTrustedTokens(): Promise<void> { + // tslint:disable-next-line:no-console + console.log('Getting latest metamask trusted tokens list ...'); + const trustedTokensRepository = connection.getRepository(TokenMetadata); + const trustedTokensSource = new TrustedTokenSource<Map<string, MetamaskTrustedTokenMeta>>( + METAMASK_TRUSTED_TOKENS_URL, + ); + const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); + const trustedTokens = parseMetamaskTrustedTokens(resp); + // tslint:disable-next-line:no-console + console.log('Saving metamask trusted tokens list'); + await trustedTokensRepository.save(trustedTokens); + // tslint:disable-next-line:no-console + console.log('Done saving metamask trusted tokens.'); +} + +async function getZeroExTrustedTokens(): Promise<void> { + // tslint:disable-next-line:no-console + console.log('Getting latest 0x trusted tokens list ...'); + const trustedTokensRepository = connection.getRepository(TokenMetadata); + const trustedTokensSource = new TrustedTokenSource<ZeroExTrustedTokenMeta[]>(ZEROEX_TRUSTED_TOKENS_URL); + const resp = await trustedTokensSource.getTrustedTokenMetaAsync(); + const trustedTokens = parseZeroExTrustedTokens(resp); + // tslint:disable-next-line:no-console + console.log('Saving metamask trusted tokens list'); + await trustedTokensRepository.save(trustedTokens); + // tslint:disable-next-line:no-console + console.log('Done saving metamask trusted tokens.'); +} diff --git a/packages/pipeline/src/scripts/update_relayer_info.ts b/packages/pipeline/src/scripts/update_relayer_info.ts new file mode 100644 index 000000000..f8918728d --- /dev/null +++ b/packages/pipeline/src/scripts/update_relayer_info.ts @@ -0,0 +1,33 @@ +// tslint:disable:no-console +import 'reflect-metadata'; +import { Connection, ConnectionOptions, createConnection } from 'typeorm'; + +import { RelayerRegistrySource } from '../data_sources/relayer-registry'; +import { Relayer } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { parseRelayers } from '../parsers/relayer_registry'; +import { handleError } from '../utils'; + +// NOTE(albrow): We need to manually update this URL for now. Fix this when we +// have the relayer-registry behind semantic versioning. +const RELAYER_REGISTRY_URL = + 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/4701c85677d161ea729a466aebbc1826c6aa2c0b/relayers.json'; + +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + await getRelayers(); + process.exit(0); +})().catch(handleError); + +async function getRelayers(): Promise<void> { + console.log('Getting latest relayer info...'); + const relayerRepository = connection.getRepository(Relayer); + const relayerSource = new RelayerRegistrySource(RELAYER_REGISTRY_URL); + const relayersResp = await relayerSource.getRelayerInfoAsync(); + const relayers = parseRelayers(relayersResp); + console.log('Saving relayer info...'); + await relayerRepository.save(relayers); + console.log('Done saving relayer info.'); +} diff --git a/packages/pipeline/src/types.ts b/packages/pipeline/src/types.ts new file mode 100644 index 000000000..e02b42a40 --- /dev/null +++ b/packages/pipeline/src/types.ts @@ -0,0 +1,2 @@ +export type AssetType = 'erc20' | 'erc721'; +export type OrderType = 'bid' | 'ask'; diff --git a/packages/pipeline/src/utils/constants.ts b/packages/pipeline/src/utils/constants.ts new file mode 100644 index 000000000..56f3e82d8 --- /dev/null +++ b/packages/pipeline/src/utils/constants.ts @@ -0,0 +1,3 @@ +// Block number when the Exchange contract was deployed to mainnet. +export const EXCHANGE_START_BLOCK = 6271590; +export const INFURA_ROOT_URL = 'https://mainnet.infura.io'; diff --git a/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts new file mode 100644 index 000000000..9d3ef2fba --- /dev/null +++ b/packages/pipeline/src/utils/get_ohlcv_trading_pairs.ts @@ -0,0 +1,92 @@ +import { fetchAsync } from '@0x/utils'; +import * as R from 'ramda'; +import { Connection } from 'typeorm'; + +export interface TradingPair { + fromSymbol: string; + toSymbol: string; + latestSavedTime: number; +} + +const COINLIST_API = 'https://min-api.cryptocompare.com/data/all/coinlist?BuiltOn=7605'; + +interface CryptoCompareCoinListResp { + Data: Map<string, CryptoCompareCoin>; +} + +interface CryptoCompareCoin { + Symbol: string; + BuiltOn: string; + SmartContractAddress: string; +} + +const TO_CURRENCIES = ['USD', 'EUR', 'ETH', 'USDT']; +const ETHEREUM_IDENTIFIER = '7605'; +const HTTP_OK_STATUS = 200; +/** + * Get trading pairs with latest scraped time for OHLCV records + * @param conn a typeorm Connection to postgres + */ +export async function fetchOHLCVTradingPairsAsync( + conn: Connection, + source: string, + earliestBackfillTime: number, +): Promise<TradingPair[]> { + // fetch existing ohlcv records + const latestTradingPairs: Array<{ + from_symbol: string; + to_symbol: string; + latest: string; + }> = await conn.query(`SELECT + MAX(end_time) as latest, + from_symbol, + to_symbol + FROM raw.ohlcv_external + GROUP BY from_symbol, to_symbol;`); + + const latestTradingPairsIndex: { [fromSym: string]: { [toSym: string]: number } } = {}; + latestTradingPairs.forEach(pair => { + const latestIndex: { [toSym: string]: number } = latestTradingPairsIndex[pair.from_symbol] || {}; + latestIndex[pair.to_symbol] = parseInt(pair.latest, 10); // tslint:disable-line:custom-no-magic-numbers + latestTradingPairsIndex[pair.from_symbol] = latestIndex; + }); + + // get token symbols used by Crypto Compare + const allCoinsResp = await fetchAsync(COINLIST_API); + if (allCoinsResp.status !== HTTP_OK_STATUS) { + return []; + } + const allCoins: CryptoCompareCoinListResp = await allCoinsResp.json(); + const erc20CoinsIndex: Map<string, string> = new Map(); + Object.entries(allCoins.Data).forEach(pair => { + const [symbol, coinData] = pair; + if (coinData.BuiltOn === ETHEREUM_IDENTIFIER && coinData.SmartContractAddress !== 'N/A') { + erc20CoinsIndex.set(coinData.SmartContractAddress.toLowerCase(), symbol); + } + }); + + // fetch all tokens that are traded on 0x + const rawTokenAddresses: Array<{ tokenaddress: string }> = await conn.query( + `SELECT DISTINCT(maker_token_address) as tokenaddress FROM raw.exchange_fill_events UNION + SELECT DISTINCT(taker_token_address) as tokenaddress FROM raw.exchange_fill_events`, + ); + const tokenAddresses = R.pluck('tokenaddress', rawTokenAddresses); + + // join token addresses with CC symbols + const allTokenSymbols: string[] = tokenAddresses + .map(tokenAddress => erc20CoinsIndex.get(tokenAddress.toLowerCase()) || '') + .filter(x => x); + + // generate list of all tokens with time of latest existing record OR default earliest time + const allTradingPairCombinations: TradingPair[] = R.chain(sym => { + return TO_CURRENCIES.map(fiat => { + return { + fromSymbol: sym, + toSymbol: fiat, + latestSavedTime: R.path<number>([sym, fiat], latestTradingPairsIndex) || earliestBackfillTime, + }; + }); + }, allTokenSymbols); + + return allTradingPairCombinations; +} diff --git a/packages/pipeline/src/utils/index.ts b/packages/pipeline/src/utils/index.ts new file mode 100644 index 000000000..2096a0a39 --- /dev/null +++ b/packages/pipeline/src/utils/index.ts @@ -0,0 +1,38 @@ +import { BigNumber } from '@0x/utils'; +export * from './transformers'; +export * from './constants'; + +/** + * If the given BigNumber is not null, returns the string representation of that + * number. Otherwise, returns null. + * @param n The number to convert. + */ +export function bigNumbertoStringOrNull(n: BigNumber): string | null { + if (n == null) { + return null; + } + return n.toString(); +} + +/** + * Logs an error by intelligently checking for `message` and `stack` properties. + * Intended for use with top-level immediately invoked asynchronous functions. + * @param e the error to log. + */ +export function handleError(e: any): void { + if (e.message != null) { + // tslint:disable-next-line:no-console + console.error(e.message); + } else { + // tslint:disable-next-line:no-console + console.error('Unknown error'); + } + if (e.stack != null) { + // tslint:disable-next-line:no-console + console.error(e.stack); + } else { + // tslint:disable-next-line:no-console + console.error('(No stack trace)'); + } + process.exit(1); +} diff --git a/packages/pipeline/src/utils/transformers/big_number.ts b/packages/pipeline/src/utils/transformers/big_number.ts new file mode 100644 index 000000000..5f2e4d565 --- /dev/null +++ b/packages/pipeline/src/utils/transformers/big_number.ts @@ -0,0 +1,16 @@ +import { BigNumber } from '@0x/utils'; +import { ValueTransformer } from 'typeorm/decorator/options/ValueTransformer'; + +export class BigNumberTransformer implements ValueTransformer { + // tslint:disable-next-line:prefer-function-over-method + public to(value: BigNumber | null): string | null { + return value === null ? null : value.toString(); + } + + // tslint:disable-next-line:prefer-function-over-method + public from(value: string | null): BigNumber | null { + return value === null ? null : new BigNumber(value); + } +} + +export const bigNumberTransformer = new BigNumberTransformer(); diff --git a/packages/pipeline/src/utils/transformers/index.ts b/packages/pipeline/src/utils/transformers/index.ts new file mode 100644 index 000000000..232c1c5de --- /dev/null +++ b/packages/pipeline/src/utils/transformers/index.ts @@ -0,0 +1,2 @@ +export * from './big_number'; +export * from './number_to_bigint'; diff --git a/packages/pipeline/src/utils/transformers/number_to_bigint.ts b/packages/pipeline/src/utils/transformers/number_to_bigint.ts new file mode 100644 index 000000000..85560c1f0 --- /dev/null +++ b/packages/pipeline/src/utils/transformers/number_to_bigint.ts @@ -0,0 +1,27 @@ +import { BigNumber } from '@0x/utils'; +import { ValueTransformer } from 'typeorm/decorator/options/ValueTransformer'; + +const decimalRadix = 10; + +// Can be used to convert a JavaScript number type to a Postgres bigint type and +// vice versa. By default TypeORM will silently convert number types to string +// if the corresponding Postgres type is bigint. See +// https://github.com/typeorm/typeorm/issues/2400 for more information. +export class NumberToBigIntTransformer implements ValueTransformer { + // tslint:disable-next-line:prefer-function-over-method + public to(value: number): string { + return value.toString(); + } + + // tslint:disable-next-line:prefer-function-over-method + public from(value: string): number { + if (new BigNumber(value).greaterThan(Number.MAX_SAFE_INTEGER)) { + throw new Error( + `Attempted to convert PostgreSQL bigint value (${value}) to JavaScript number type but it is too big to safely convert`, + ); + } + return Number.parseInt(value, decimalRadix); + } +} + +export const numberToBigIntTransformer = new NumberToBigIntTransformer(); |