Skip to content

Init Swamp#3

Open
wpride wants to merge 14 commits into
mainfrom
wsp/swampdk
Open

Init Swamp#3
wpride wants to merge 14 commits into
mainfrom
wsp/swampdk

Conversation

@wpride

@wpride wpride commented Aug 6, 2024

Copy link
Copy Markdown

No description provided.

@vercel

vercel Bot commented Aug 6, 2024

Copy link
Copy Markdown

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
sdk ✅ Ready (Inspect) Visit Preview 💬 Add feedback Sep 14, 2024 0:27am

RUN npm run build

# Expose the port the app runs on
EXPOSE 80

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

80?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just has to match ENV PORT=... above; can add comment

"main": "src/index.ts",
"scripts": {
"build": "tsc",
"HACK-comment": "TZ=UTC is necessary because SQLite will not respect system timezone",

@drx drx Aug 6, 2024

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clever

const sqlEngine = new InMemoryDuckDb(store);
const swamp = new Swamp(store, metaStore, sqlEngine);
swamp.initialize();
swamp.addLoader(runPostgres, "postgres").withCadence(60 * 60);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Adding naive questions from my first reading to reveal non-obvious names) What's cadence?

@@ -0,0 +1,74 @@
import axios from 'axios';

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is jsonplaceholder?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the name of this demo API; can kill it https://jsonplaceholder.typicode.com/

return response.data;
} catch (error) {
throw error;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of catching and rethrowing the error? Lots of theses try catch blocks

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many such catches

}
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lint seems unnecessary. Doesn't it have a setting where you can exclude things that start with _?

if (cursor.next) {
return {
'page[cursor]': cursor.next,
'filter': `greater-than(datetime,${cursor.since})`,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an id cursor available? This could fail if there are two records with the same datetime

cursor: nextCursor,
};
} catch (e) {
console.log((e as any).response?.data);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cmd+f remove all the console.logs


async function getRecords(apiKey: string, _cursor: Cursor): Promise<{ cursor: Cursor, inserts: Inserts, hasMore: boolean }> {
const { data, included, cursor } = await getKlaviyoEvents(apiKey, _cursor);
const events = data.map((event: any) => {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried unknown instead of any?

const flow = event.attributes.event_properties.$flow;
const messageId = event.attributes.event_properties.$message;

const campaignId = flow ? null : messageId;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's flow?

};
}

const INITIAL_CURSOR = { since: '2024-07-30T21:48:59Z' };

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kill

export async function runKlaviyo(_secrets: LoaderSecrets, cursor: Cursor | null): Promise<LoaderResponse<Cursor>> {
const secrets = Secrets.safeParse(_secrets);
if (secrets.error) {
console.error(`could not parse secrets: ${secrets.error}`);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strange formatting here

console.error(`could not parse secrets: ${secrets.error}`);
throw new Error('failed to parse secrets');
}
const { inserts, hasMore, cursor: newCursor } = await getRecords(secrets.data.apiKey, cursor || INITIAL_CURSOR);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const { inserts, hasMore, cursor: newCursor } = await getRecords(secrets.data.apiKey, cursor || INITIAL_CURSOR);
const { inserts, hasMore, cursor: newCursor } = await getRecords(secrets.data.apiKey, cursor ?? INITIAL_CURSOR);

Could probably cmd+f the codebase for this

Comment thread packages/swamp/src/core/types.ts Outdated

export type Inserts = Record<string, RecordBatch>;

export const DuckDBRawType = z.enum(['VARCHAR', 'DOUBLE', 'DATE', 'TIMESTAMP', 'JSON', 'BOOLEAN', 'BIGINT', 'BLOB']);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we have duckdb things in core

export class LoaderExecutor<SecretsType, CursorType> extends BaseTransformer {
loader: Loader<SecretsType, CursorType>;

rateLimitMs?: number;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
rateLimitMs?: number;
rateLimitMillis?: number;

while (true) {
await this.maybeRun(false);
await sleep(1000 * 60 * 5);
continue;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
continue;


export function sleep(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node has a built-in for this

Comment thread packages/swamp/src/query/query.ts Outdated
initialize(messageBroker: MessageBroker): Promise<void>;
}

type DuckDbCredentials = S3Credentials & { type: 's3' };

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duck duck

Comment thread packages/swamp/src/query/query.ts Outdated
values.push(value);
}
}
columns.push('_swamp_updated_at');

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_updated_at?

Comment thread packages/swamp/src/query/query.ts Outdated
if (this.uploadQueue.length === 0) {
return null;
}
return this.uploadQueue.shift()!;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's up with this ! guy?

Comment thread packages/swamp/src/query/query.ts Outdated
}
}

withS3Credentials(credentials: S3Credentials): InMemoryDuckDb {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/s3/storage/g?

return filePath;
}

export function createS3SecretStatement(credentials: S3Credentials): string {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these s3 things seem like they could be in their own file

Comment thread packages/swamp/src/util/record.util.ts Outdated

type IntermediateType = 'string' | 'number' | 'date' | 'datetime' | 'boolean' | 'bigint' | 'blob' | 'array' | 'object';

function duckDBTypeForIntermediate(intermediate: IntermediateType): DuckDBRawType {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quack quack

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants