Init Swamp#3
Conversation
|
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
| RUN npm run build | ||
|
|
||
| # Expose the port the app runs on | ||
| EXPOSE 80 |
There was a problem hiding this comment.
This just has to match ENV PORT=... above; can add comment
| "main": "src/index.ts", | ||
| "scripts": { | ||
| "build": "tsc", | ||
| "HACK-comment": "TZ=UTC is necessary because SQLite will not respect system timezone", |
There was a problem hiding this comment.
It seems "//" is "idiomatic" https://stackoverflow.com/questions/14221579/how-do-i-add-comments-to-package-json-for-npm-install
| const sqlEngine = new InMemoryDuckDb(store); | ||
| const swamp = new Swamp(store, metaStore, sqlEngine); | ||
| swamp.initialize(); | ||
| swamp.addLoader(runPostgres, "postgres").withCadence(60 * 60); |
There was a problem hiding this comment.
(Adding naive questions from my first reading to reveal non-obvious names) What's cadence?
| @@ -0,0 +1,74 @@ | |||
| import axios from 'axios'; | |||
There was a problem hiding this comment.
Just the name of this demo API; can kill it https://jsonplaceholder.typicode.com/
| return response.data; | ||
| } catch (error) { | ||
| throw error; | ||
| } |
There was a problem hiding this comment.
What's the purpose of catching and rethrowing the error? Lots of theses try catch blocks
| } | ||
| } | ||
|
|
||
| // eslint-disable-next-line @typescript-eslint/no-unused-vars |
There was a problem hiding this comment.
This lint seems unnecessary. Doesn't it have a setting where you can exclude things that start with _?
| if (cursor.next) { | ||
| return { | ||
| 'page[cursor]': cursor.next, | ||
| 'filter': `greater-than(datetime,${cursor.since})`, |
There was a problem hiding this comment.
Is there an id cursor available? This could fail if there are two records with the same datetime
| cursor: nextCursor, | ||
| }; | ||
| } catch (e) { | ||
| console.log((e as any).response?.data); |
There was a problem hiding this comment.
cmd+f remove all the console.logs
|
|
||
| async function getRecords(apiKey: string, _cursor: Cursor): Promise<{ cursor: Cursor, inserts: Inserts, hasMore: boolean }> { | ||
| const { data, included, cursor } = await getKlaviyoEvents(apiKey, _cursor); | ||
| const events = data.map((event: any) => { |
There was a problem hiding this comment.
Have you tried unknown instead of any?
| const flow = event.attributes.event_properties.$flow; | ||
| const messageId = event.attributes.event_properties.$message; | ||
|
|
||
| const campaignId = flow ? null : messageId; |
| }; | ||
| } | ||
|
|
||
| const INITIAL_CURSOR = { since: '2024-07-30T21:48:59Z' }; |
| export async function runKlaviyo(_secrets: LoaderSecrets, cursor: Cursor | null): Promise<LoaderResponse<Cursor>> { | ||
| const secrets = Secrets.safeParse(_secrets); | ||
| if (secrets.error) { | ||
| console.error(`could not parse secrets: ${secrets.error}`); |
| console.error(`could not parse secrets: ${secrets.error}`); | ||
| throw new Error('failed to parse secrets'); | ||
| } | ||
| const { inserts, hasMore, cursor: newCursor } = await getRecords(secrets.data.apiKey, cursor || INITIAL_CURSOR); |
There was a problem hiding this comment.
| const { inserts, hasMore, cursor: newCursor } = await getRecords(secrets.data.apiKey, cursor || INITIAL_CURSOR); | |
| const { inserts, hasMore, cursor: newCursor } = await getRecords(secrets.data.apiKey, cursor ?? INITIAL_CURSOR); |
Could probably cmd+f the codebase for this
|
|
||
| export type Inserts = Record<string, RecordBatch>; | ||
|
|
||
| export const DuckDBRawType = z.enum(['VARCHAR', 'DOUBLE', 'DATE', 'TIMESTAMP', 'JSON', 'BOOLEAN', 'BIGINT', 'BLOB']); |
There was a problem hiding this comment.
why do we have duckdb things in core
| export class LoaderExecutor<SecretsType, CursorType> extends BaseTransformer { | ||
| loader: Loader<SecretsType, CursorType>; | ||
|
|
||
| rateLimitMs?: number; |
There was a problem hiding this comment.
| rateLimitMs?: number; | |
| rateLimitMillis?: number; |
| while (true) { | ||
| await this.maybeRun(false); | ||
| await sleep(1000 * 60 * 5); | ||
| continue; |
|
|
||
| export function sleep(ms: number) { | ||
| return new Promise((resolve) => { | ||
| setTimeout(resolve, ms); |
| initialize(messageBroker: MessageBroker): Promise<void>; | ||
| } | ||
|
|
||
| type DuckDbCredentials = S3Credentials & { type: 's3' }; |
| values.push(value); | ||
| } | ||
| } | ||
| columns.push('_swamp_updated_at'); |
| if (this.uploadQueue.length === 0) { | ||
| return null; | ||
| } | ||
| return this.uploadQueue.shift()!; |
| } | ||
| } | ||
|
|
||
| withS3Credentials(credentials: S3Credentials): InMemoryDuckDb { |
| return filePath; | ||
| } | ||
|
|
||
| export function createS3SecretStatement(credentials: S3Credentials): string { |
There was a problem hiding this comment.
these s3 things seem like they could be in their own file
|
|
||
| type IntermediateType = 'string' | 'number' | 'date' | 'datetime' | 'boolean' | 'bigint' | 'blob' | 'array' | 'object'; | ||
|
|
||
| function duckDBTypeForIntermediate(intermediate: IntermediateType): DuckDBRawType { |
No description provided.