diff options
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r-- | packages/pipeline/src/data_sources/bloxy/index.ts | 20 | ||||
-rw-r--r-- | packages/pipeline/src/entities/dex_trade.ts | 2 | ||||
-rw-r--r-- | packages/pipeline/src/parsers/bloxy/index.ts | 1 |
3 files changed, 18 insertions, 5 deletions
diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts index 94468d25a..22ab195b3 100644 --- a/packages/pipeline/src/data_sources/bloxy/index.ts +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -1,6 +1,8 @@ import axios from 'axios'; import * as R from 'ramda'; +import { logUtils } from '@0x/utils'; + // URL to use for getting dex trades from Bloxy. export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades'; // Number of trades to get at once. Must be less than or equal to MAX_OFFSET. @@ -26,6 +28,7 @@ export interface BloxyTrade { tx_time: string; tx_date: string; tx_sender: string; + tradeIndex: string; smart_contract_id: number; smart_contract_address: string; contract_type: string; @@ -73,6 +76,15 @@ export class BloxySource { * already been seen. */ public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> { + const allTrades = await this._scrapeAllDexTradesAsync(lastSeenTimestamp); + logUtils.log(`Removing duplicates from ${allTrades.length} entries`); + const uniqueTrades = R.uniqBy((trade: BloxyTrade) => `${trade.tradeIndex}-${trade.tx_hash}`, allTrades); + logUtils.log(`Removed ${allTrades.length - uniqueTrades.length} duplicate entries`); + return uniqueTrades; + } + + // Potentially returns duplicate trades. + private async _scrapeAllDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> { let allTrades: BloxyTrade[] = []; // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive) @@ -90,7 +102,7 @@ export class BloxySource { if (trades.length === 0) { // There are no more trades left for the days we are querying. // This means we are done. - return filterDuplicateTrades(allTrades); + return allTrades; } const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades)); allTrades = allTrades.concat(sortedTrades); @@ -100,10 +112,10 @@ export class BloxySource { if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) { // We are at the point where we have already seen trades for the // timestamp range that is being returned. We're done. - return filterDuplicateTrades(allTrades); + return allTrades; } } - return filterDuplicateTrades(allTrades); + return allTrades; } private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> { @@ -129,5 +141,3 @@ function getDaysSinceTimestamp(timestamp: number): number { const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay; return Math.ceil(daysSinceTimestamp); } - -const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash); diff --git a/packages/pipeline/src/entities/dex_trade.ts b/packages/pipeline/src/entities/dex_trade.ts index 9d288cb51..93dcaf238 100644 --- a/packages/pipeline/src/entities/dex_trade.ts +++ b/packages/pipeline/src/entities/dex_trade.ts @@ -9,6 +9,8 @@ export class DexTrade { public sourceUrl!: string; @PrimaryColumn({ name: 'tx_hash' }) public txHash!: string; + @PrimaryColumn({ name: 'trade_index' }) + public tradeIndex!: string; @Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer }) public txTimestamp!: number; diff --git a/packages/pipeline/src/parsers/bloxy/index.ts b/packages/pipeline/src/parsers/bloxy/index.ts index caa55d289..3d797aff0 100644 --- a/packages/pipeline/src/parsers/bloxy/index.ts +++ b/packages/pipeline/src/parsers/bloxy/index.ts @@ -21,6 +21,7 @@ export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade { const dexTrade = new DexTrade(); dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL; dexTrade.txHash = rawTrade.tx_hash; + dexTrade.tradeIndex = rawTrade.tradeIndex; dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime(); dexTrade.txDate = rawTrade.tx_date; dexTrade.txSender = rawTrade.tx_sender; |