1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
// tslint:disable:no-console
import * as R from 'ramda';
import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper';
import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities';
import * as ormConfig from '../ormconfig';
import {
CopperSearchResponse,
parseActivities,
parseActivityTypes,
parseCustomFields,
parseLeads,
parseOpportunities,
} from '../parsers/copper';
import { handleError } from '../utils';
/** Factor used in this file to convert between second-based and millisecond-based timestamps. */
const ONE_SECOND = 1000;
/** First constructor argument of CopperSource (presumably the allowed request rate; confirm against CopperSource). */
const COPPER_RATE_LIMIT = 10;

/** Database connection shared by every fetch helper below; assigned once at the start of the entry point. */
let connection: Connection;

/**
 * Script entry point.
 *
 * Opens the database connection, validates the required Copper credentials from the environment,
 * then runs every Copper fetch-and-save job. Any failure is routed to `handleError`.
 */
(async () => {
    connection = await createConnection(ormConfig as ConnectionOptions);
    const accessToken = process.env.COPPER_ACCESS_TOKEN;
    const userEmail = process.env.COPPER_USER_EMAIL;
    if (accessToken === undefined || userEmail === undefined) {
        throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL');
    }
    const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail);
    // Every job is started here; they all share the same source instance.
    const fetchPromises = [
        fetchAndSaveLeadsAsync(source),
        fetchAndSaveOpportunitiesAsync(source),
        fetchAndSaveActivitiesAsync(source),
        fetchAndSaveCustomFieldsAsync(source),
        fetchAndSaveActivityTypesAsync(source),
    ];
    // Await the whole batch: an async callback passed to `forEach` is never awaited, which would let this
    // function resolve early and turn job failures into unhandled rejections that bypass `handleError`.
    await Promise.all(fetchPromises);
})().catch(handleError);
/**
 * Fetches Copper leads and saves those modified after the newest `date_modified`
 * already present in `raw.copper_leads`.
 *
 * @param source Copper API client used to fetch the lead pages.
 */
async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> {
    const leadRepository = connection.getRepository(CopperLead);
    const latestModified = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads');
    console.log(`Fetching Copper leads starting from ${latestModified}...`);
    // Leads need no search parameters beyond the page number added by fetchAndSaveAsync.
    const leadSearchParams: CopperSearchParams = {};
    await fetchAndSaveAsync(
        CopperEndpoint.Leads,
        source,
        latestModified,
        leadSearchParams,
        parseLeads,
        leadRepository,
    );
}
/**
 * Fetches Copper opportunities (requested with `sort_by: 'name'`) and saves those modified after
 * the newest `date_modified` already present in `raw.copper_opportunities`.
 *
 * @param source Copper API client used to fetch the opportunity pages.
 */
async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> {
    const opportunityRepository = connection.getRepository(CopperOpportunity);
    const latestModified = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities');
    console.log(`Fetching Copper opportunities starting from ${latestModified}...`);
    const opportunitySearchParams: CopperSearchParams = { sort_by: 'name' };
    await fetchAndSaveAsync(
        CopperEndpoint.Opportunities,
        source,
        latestModified,
        opportunitySearchParams,
        parseOpportunities,
        opportunityRepository,
    );
}
/**
 * Fetches Copper activities and saves those modified after the newest `date_modified`
 * already present in `raw.copper_activities`.
 *
 * The stored maximum is divided by ONE_SECOND and floored before being sent to Copper
 * as `minimum_activity_date`.
 *
 * @param source Copper API client used to fetch the activity pages.
 */
async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> {
    const activityRepository = connection.getRepository(CopperActivity);
    const latestModified = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities');
    const minimumActivityDate = Math.floor(latestModified / ONE_SECOND);
    console.log(`Fetching Copper activities starting from ${latestModified}...`);
    await fetchAndSaveAsync(
        CopperEndpoint.Activities,
        source,
        latestModified,
        { minimum_activity_date: minimumActivityDate },
        parseActivities,
        activityRepository,
    );
}
/**
 * Returns the largest value of `sortColumn` in `tableName`, or 0 when there is none.
 *
 * `SELECT MAX(...)` over an empty table still yields one row whose value is NULL, so both
 * "no rows" and "NULL maximum" map to 0. The value is coerced with `Number` because a driver
 * may hand back large integer aggregates as strings.
 *
 * NOTE: `sortColumn` and `tableName` are interpolated directly into the SQL text; only pass
 * trusted, hard-coded identifiers.
 *
 * @param conn Open database connection used to run the query.
 * @param sortColumn Column whose maximum is requested.
 * @param tableName Table (optionally schema-qualified) to query.
 * @returns The maximum as a number, or 0 if the table has no non-NULL values in that column.
 */
async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> {
    const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`);
    if (!Array.isArray(queryResult) || queryResult.length === 0) {
        return 0;
    }
    const max = queryResult[0]._max;
    if (max === null || max === undefined) {
        // Empty table: the aggregate is NULL rather than the result set being empty.
        return 0;
    }
    return Number(max);
}
// (Xianny): Copper API doesn't allow queries to filter by date. To ensure that we are filling in ascending chronological
// order and not missing any records, we are scraping all available pages. If Copper data gets larger,
// it would make sense to search for and start filling from the first page that contains a new record.
// This search would increase our network calls and is not efficient to implement with our current small volume
// of Copper records.
/**
 * Walks every page of a Copper search endpoint, from the last page down to the first, and saves
 * the records that are newer than `startTime`.
 *
 * A record is kept when its `date_modified` multiplied by ONE_SECOND is greater than `startTime`.
 * Pages are fetched and saved one at a time. If fetching, parsing or saving a page throws, the
 * error is logged, the loop stops, and the function still resolves. A failure while fetching the
 * page count is not caught here. The number of saved records is always logged.
 *
 * @param endpoint Copper endpoint to query.
 * @param source Copper API client.
 * @param startTime Threshold that a record's scaled `date_modified` must exceed to be saved.
 * @param searchParams Extra search parameters sent with every page request.
 * @param parseFn Converts raw API records into entities.
 * @param repository Repository the parsed entities are saved to.
 */
async function fetchAndSaveAsync<T extends CopperSearchResponse, E>(
    endpoint: CopperEndpoint,
    source: CopperSource,
    startTime: number,
    searchParams: CopperSearchParams,
    parseFn: (recs: T[]) => E[],
    repository: Repository<E>,
): Promise<void> {
    const totalPages = await source.fetchNumberOfPagesAsync(endpoint);
    const isNewerThanStart = (rec: T): boolean => rec.date_modified * ONE_SECOND > startTime;
    let savedCount = 0;
    try {
        let page = totalPages;
        while (page > 0) {
            console.log(`Fetching page ${page}/${totalPages} of ${endpoint}...`);
            const pageParams = { ...searchParams, page_number: page };
            const fetched = await source.fetchSearchResultsAsync<T>(endpoint, pageParams);
            const fresh = fetched.filter(rec => isNewerThanStart(rec));
            const entities = parseFn(fresh);
            await repository.save<any>(entities);
            savedCount += fresh.length;
            page -= 1;
        }
    } catch (err) {
        console.log(`Error fetching ${endpoint}, stopping: ${err.stack}`);
    } finally {
        console.log(`Saved ${savedCount} items from ${endpoint}, done.`);
    }
}
/**
 * Fetches all Copper activity types, parses them, and saves them through the
 * CopperActivityType repository.
 *
 * @param source Copper API client used to fetch the activity types.
 */
async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> {
    console.log(`Fetching Copper activity types...`);
    const rawActivityTypes = await source.fetchActivityTypesAsync();
    await connection.getRepository(CopperActivityType).save(parseActivityTypes(rawActivityTypes));
}
/**
 * Fetches all Copper custom fields, parses them, and saves them through the
 * CopperCustomField repository.
 *
 * @param source Copper API client used to fetch the custom fields.
 */
async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> {
    console.log(`Fetching Copper custom fields...`);
    const rawCustomFields = await source.fetchCustomFieldsAsync();
    await connection.getRepository(CopperCustomField).save(parseCustomFields(rawCustomFields));
}
|