diff options
Diffstat (limited to 'packages/pipeline/src/scripts/pull_copper.ts')
-rw-r--r-- | packages/pipeline/src/scripts/pull_copper.ts | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/packages/pipeline/src/scripts/pull_copper.ts b/packages/pipeline/src/scripts/pull_copper.ts new file mode 100644 index 000000000..69814f209 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_copper.ts @@ -0,0 +1,129 @@ +// tslint:disable:no-console +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper'; +import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { + CopperSearchResponse, + parseActivities, + parseActivityTypes, + parseCustomFields, + parseLeads, + parseOpportunities, +} from '../parsers/copper'; +import { handleError } from '../utils'; +const ONE_SECOND = 1000; +const COPPER_RATE_LIMIT = 10; +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + + const accessToken = process.env.COPPER_ACCESS_TOKEN; + const userEmail = process.env.COPPER_USER_EMAIL; + if (accessToken === undefined || userEmail === undefined) { + throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL'); + } + const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail); + + const fetchPromises = [ + fetchAndSaveLeadsAsync(source), + fetchAndSaveOpportunitiesAsync(source), + fetchAndSaveActivitiesAsync(source), + fetchAndSaveCustomFieldsAsync(source), + fetchAndSaveActivityTypesAsync(source), + ]; + fetchPromises.forEach(async fn => { + await fn; + }); +})().catch(handleError); + +async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> { + const repository = connection.getRepository(CopperLead); + const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads'); + console.log(`Fetching Copper leads starting from ${startTime}...`); + await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository); +} + +async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> { + const repository = connection.getRepository(CopperOpportunity); + const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities'); + console.log(`Fetching Copper opportunities starting from ${startTime}...`); + await fetchAndSaveAsync( + CopperEndpoint.Opportunities, + source, + startTime, + { sort_by: 'name' }, + parseOpportunities, + repository, + ); +} + +async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> { + const repository = connection.getRepository(CopperActivity); + const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities'); + const searchParams = { + minimum_activity_date: Math.floor(startTime / ONE_SECOND), + }; + console.log(`Fetching Copper activities starting from ${startTime}...`); + await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository); +} + +async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> { + const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`); + if (R.isEmpty(queryResult)) { + return 0; + } else { + return queryResult[0]._max; + } +} + +// (Xianny): Copper API doesn't allow queries to filter by date. To ensure that we are filling in ascending chronological +// order and not missing any records, we are scraping all available pages. If Copper data gets larger, +// it would make sense to search for and start filling from the first page that contains a new record. +// This search would increase our network calls and is not efficient to implement with our current small volume +// of Copper records. +async function fetchAndSaveAsync<T extends CopperSearchResponse, E>( + endpoint: CopperEndpoint, + source: CopperSource, + startTime: number, + searchParams: CopperSearchParams, + parseFn: (recs: T[]) => E[], + repository: Repository<E>, +): Promise<void> { + let saved = 0; + const numPages = await source.fetchNumberOfPagesAsync(endpoint); + try { + for (let i = numPages; i > 0; i--) { + console.log(`Fetching page ${i}/${numPages} of ${endpoint}...`); + const raw = await source.fetchSearchResultsAsync<T>(endpoint, { + ...searchParams, + page_number: i, + }); + const newRecords = raw.filter(rec => rec.date_modified * ONE_SECOND > startTime); + const parsed = parseFn(newRecords); + await repository.save<any>(parsed); + saved += newRecords.length; + } + } catch (err) { + console.log(`Error fetching ${endpoint}, stopping: ${err.stack}`); + } finally { + console.log(`Saved ${saved} items from ${endpoint}, done.`); + } +} + +async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> { + console.log(`Fetching Copper activity types...`); + const activityTypes = await source.fetchActivityTypesAsync(); + const repository = connection.getRepository(CopperActivityType); + await repository.save(parseActivityTypes(activityTypes)); +} + +async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> { + console.log(`Fetching Copper custom fields...`); + const customFields = await source.fetchCustomFieldsAsync(); + const repository = connection.getRepository(CopperCustomField); + await repository.save(parseCustomFields(customFields)); +} |