aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r--packages/pipeline/src/data_sources/bloxy/index.ts20
-rw-r--r--packages/pipeline/src/entities/dex_trade.ts2
-rw-r--r--packages/pipeline/src/parsers/bloxy/index.ts1
3 files changed, 18 insertions, 5 deletions
diff --git a/packages/pipeline/src/data_sources/bloxy/index.ts b/packages/pipeline/src/data_sources/bloxy/index.ts
index 94468d25a..22ab195b3 100644
--- a/packages/pipeline/src/data_sources/bloxy/index.ts
+++ b/packages/pipeline/src/data_sources/bloxy/index.ts
@@ -1,6 +1,8 @@
import axios from 'axios';
import * as R from 'ramda';
+import { logUtils } from '@0x/utils';
+
// URL to use for getting dex trades from Bloxy.
export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades';
// Number of trades to get at once. Must be less than or equal to MAX_OFFSET.
@@ -26,6 +28,7 @@ export interface BloxyTrade {
tx_time: string;
tx_date: string;
tx_sender: string;
+ tradeIndex: string;
smart_contract_id: number;
smart_contract_address: string;
contract_type: string;
@@ -73,6 +76,15 @@ export class BloxySource {
* already been seen.
*/
public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> {
+ const allTrades = await this._scrapeAllDexTradesAsync(lastSeenTimestamp);
+ logUtils.log(`Removing duplicates from ${allTrades.length} entries`);
+ const uniqueTrades = R.uniqBy((trade: BloxyTrade) => `${trade.tradeIndex}-${trade.tx_hash}`, allTrades);
+ logUtils.log(`Removed ${allTrades.length - uniqueTrades.length} duplicate entries`);
+ return uniqueTrades;
+ }
+
+ // Potentially returns duplicate trades.
+ private async _scrapeAllDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> {
let allTrades: BloxyTrade[] = [];
// Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive)
@@ -90,7 +102,7 @@ export class BloxySource {
if (trades.length === 0) {
// There are no more trades left for the days we are querying.
// This means we are done.
- return filterDuplicateTrades(allTrades);
+ return allTrades;
}
const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades));
allTrades = allTrades.concat(sortedTrades);
@@ -100,10 +112,10 @@ export class BloxySource {
if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) {
// We are at the point where we have already seen trades for the
// timestamp range that is being returned. We're done.
- return filterDuplicateTrades(allTrades);
+ return allTrades;
}
}
- return filterDuplicateTrades(allTrades);
+ return allTrades;
}
private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> {
@@ -129,5 +141,3 @@ function getDaysSinceTimestamp(timestamp: number): number {
const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay;
return Math.ceil(daysSinceTimestamp);
}
-
-const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash);
diff --git a/packages/pipeline/src/entities/dex_trade.ts b/packages/pipeline/src/entities/dex_trade.ts
index 9d288cb51..93dcaf238 100644
--- a/packages/pipeline/src/entities/dex_trade.ts
+++ b/packages/pipeline/src/entities/dex_trade.ts
@@ -9,6 +9,8 @@ export class DexTrade {
public sourceUrl!: string;
@PrimaryColumn({ name: 'tx_hash' })
public txHash!: string;
+ @PrimaryColumn({ name: 'trade_index' })
+ public tradeIndex!: string;
@Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer })
public txTimestamp!: number;
diff --git a/packages/pipeline/src/parsers/bloxy/index.ts b/packages/pipeline/src/parsers/bloxy/index.ts
index caa55d289..3d797aff0 100644
--- a/packages/pipeline/src/parsers/bloxy/index.ts
+++ b/packages/pipeline/src/parsers/bloxy/index.ts
@@ -21,6 +21,7 @@ export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade {
const dexTrade = new DexTrade();
dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL;
dexTrade.txHash = rawTrade.tx_hash;
+ dexTrade.tradeIndex = rawTrade.tradeIndex;
dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime();
dexTrade.txDate = rawTrade.tx_date;
dexTrade.txSender = rawTrade.tx_sender;