From 8bac655dc1c0e9a133492fe816c67f2c08b3e46b Mon Sep 17 00:00:00 2001 From: fragosti Date: Wed, 30 Jan 2019 09:39:59 -0800 Subject: Stop omitting trades because of duplicate tx hashes --- packages/pipeline/src/data_sources/bloxy/index.ts | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) (limited to 'packages') diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts index 94468d25a..3d9147757 100644 --- a/packages/pipeline/src/data_sources/bloxy/index.ts +++ b/packages/pipeline/src/data_sources/bloxy/index.ts @@ -1,6 +1,7 @@ import axios from 'axios'; import * as R from 'ramda'; +import { logUtils } from '@0x/utils'; // URL to use for getting dex trades from Bloxy. export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades'; // Number of trades to get at once. Must be less than or equal to MAX_OFFSET. @@ -73,6 +74,15 @@ export class BloxySource { * already been seen. */ public async getDexTradesAsync(lastSeenTimestamp: number): Promise { + const allTrades = await this._scrapeAllDexTradesAsync(lastSeenTimestamp); + logUtils.log('Removing duplicate entries'); + const uniqueTrades = R.uniqBy(R.toString, allTrades) as BloxyTrade[]; + logUtils.log(`Removed ${allTrades.length - uniqueTrades.length} duplicate entries`); + return uniqueTrades; + } + + // Potentially returns duplicate trades. + private async _scrapeAllDexTradesAsync(lastSeenTimestamp: number): Promise { let allTrades: BloxyTrade[] = []; // Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive) @@ -90,7 +100,7 @@ export class BloxySource { if (trades.length === 0) { // There are no more trades left for the days we are querying. // This means we are done. - return filterDuplicateTrades(allTrades); + return allTrades; } const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades)); allTrades = allTrades.concat(sortedTrades); @@ -100,10 +110,10 @@ export class BloxySource { if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) { // We are at the point where we have already seen trades for the // timestamp range that is being returned. We're done. - return filterDuplicateTrades(allTrades); + return allTrades; } } - return filterDuplicateTrades(allTrades); + return allTrades; } private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise { @@ -129,5 +139,3 @@ function getDaysSinceTimestamp(timestamp: number): number { const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay; return Math.ceil(daysSinceTimestamp); } - -const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash); -- cgit