ClickHouse usage analytics: events/gauges tables with daily MV #3
lohanidamodar wants to merge 100 commits into main from
Conversation
- Database adapter
- ClickHouse adapter

- Removed hardcoded column definitions in the Usage class, replacing them with a dynamic schema derived from the SQL adapter.
- Introduced a new Query class for building ClickHouse queries with a fluent interface.
- Added support for advanced query operations, including find and count methods.
- Enhanced error handling and SQL injection prevention mechanisms.
- Created a comprehensive usage guide for the ClickHouse adapter.
- Added unit tests for the Query class to ensure functionality and robustness.
- Maintained backward compatibility with existing methods while improving the overall architecture.
…metric logging with deterministic IDs
…ed tags in ClickHouse and Database adapters
…pdate tests for new behavior
Split the single MergeTree table into two separate tables:

- Events table with dedicated columns for path, method, status, resource, resourceId
- Gauges table with a simple metric/value/time/tags schema

Event-specific columns are automatically extracted from tags during addBatch. The daily SummingMergeTree MV now aggregates by metric, resource, resourceId. All read methods accept an optional $type parameter to target a specific table, with null querying both tables transparently.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
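Under the split described above, the two-table layout might look roughly like the following ClickHouse DDL. This is a sketch only: table names, column types, and the ORDER BY keys are assumptions based on the commit description, not the schema shipped in this PR.

```sql
-- Sketch: events get dedicated columns so filters on path/status hit
-- real columns instead of JSON extraction from tags.
CREATE TABLE usage_events (
    metric     LowCardinality(String),
    value      Int64,
    time       DateTime,
    path       Nullable(String),
    method     LowCardinality(Nullable(String)),
    status     Nullable(UInt16),
    resource   Nullable(String),
    resourceId Nullable(String),
    tenant     UInt64,
    tags       String  -- residual tags kept as JSON
) ENGINE = MergeTree
ORDER BY (metric, time);

-- Gauges stay minimal: metric/value/time/tags plus tenant.
CREATE TABLE usage_gauges (
    metric LowCardinality(String),
    value  Float64,
    time   DateTime,
    tenant UInt64,
    tags   String
) ENGINE = MergeTree
ORDER BY (metric, time);
```

Keeping gauges in a separate table lets each table use an engine and sort key suited to its access pattern, at the cost of the dual-table dispatch the $type parameter handles.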
Query the pre-aggregated daily SummingMergeTree table for fast billing/analytics instead of scanning raw events.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- country: LowCardinality(Nullable(String)) for efficient low-cardinality storage
- userAgent: Nullable(String) with a bloom filter index
- Both extracted from tags into dedicated columns like the other event fields
- Added getCountry() and getUserAgent() getters on Metric

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Daily table only has metric, value, time, resource, resourceId, tenant. No path/status/userAgent/country/tags — those don't aggregate meaningfully. The MV groups by metric, resource, resourceId, tenant, day. ORDER BY includes resource and resourceId for efficient billing queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Use Metric::EVENT_COLUMNS to extract all event columns from tags instead of hardcoding the list. Now country and userAgent are properly stored in dedicated columns instead of being left in the tags JSON.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Daily table now only has metric, value, time, tenant. One row per metric per project per day — optimal for billing. Resource-level breakdown queries the raw events table directly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
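The slimmed-down rollup described in this commit could be sketched as follows. Names are assumptions (elsewhere in the review the daily table is referred to as _events_daily); only the column set (metric, value, time, tenant) comes from the commit message.

```sql
-- Assumed names and types; one row per (tenant, metric, day) after merges.
CREATE TABLE _events_daily (
    metric LowCardinality(String),
    tenant UInt64,
    time   Date,
    value  Int64
) ENGINE = SummingMergeTree(value)
ORDER BY (tenant, metric, time);

-- MV fed by the raw events table; rolls each insert block up to day level.
CREATE MATERIALIZED VIEW _events_daily_mv
TO _events_daily
AS SELECT
    metric,
    tenant,
    toDate(time) AS time,
    sum(value)   AS value
FROM usage_events
GROUP BY metric, tenant, time;
```

Note that SummingMergeTree only collapses duplicate-key rows during background merges, so readers must still aggregate with sum() at query time rather than assuming one physical row per key.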
Single query with GROUP BY metric for summing multiple metrics from the daily table. Returns array<string, int>.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
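The batched sum described above maps onto a single GROUP BY query against the daily table. A sketch, with table, column, and parameter names assumed rather than taken from the PR:

```sql
-- One round trip: one summed row per requested metric.
-- sum() is required at read time because SummingMergeTree only collapses
-- rows during background merges.
SELECT metric, sum(value) AS total
FROM _events_daily
WHERE tenant = {tenant:UInt64}
  AND metric IN ({metrics:Array(String)})
  AND time >= {from:Date}
  AND time <  {to:Date}
GROUP BY metric;
```

The adapter would then fold the result rows into the array<string, int> shape, keyed by metric.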
Updated for the events/gauges split, event-specific columns, daily MV, query-time aggregation, billing methods, and a complete API reference with examples.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…country/userAgent

- Remove stale 'type' key from addBatch() @param array shape in Usage.php, Adapter.php, Database.php
- Fix mixed-to-string cast in ClickHouse.php event column extraction with type-safe checks
- Reduce path size from 1024 to 255 and userAgent size from 512 to 255 in Metric::getEventSchema() to stay within MySQL's 768-byte index limit
- Update MetricTest assertions: 11 attributes, 9 indexes, 7 EVENT_COLUMNS
- Update ClickHouseTest: userAgent/country are now event columns, not tags

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add a UsageQuery class extending Query with a custom groupByInterval method that enables time-bucketed aggregated queries. When present in the queries array, the ClickHouse adapter switches from raw row returns to aggregated results grouped by time bucket (SUM for events, argMax for gauges). Supported intervals: 1m, 5m, 15m, 1h, 1d, 1w, 1M.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
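Time-bucketed aggregation of this kind typically maps onto ClickHouse's toStartOfInterval. A sketch of the query shapes the adapter might generate for a 1h bucket (table and parameter names are assumptions; only the SUM-for-events / argMax-for-gauges split comes from the commit message):

```sql
-- Events: total per bucket.
SELECT
    toStartOfInterval(time, INTERVAL 1 HOUR) AS bucket,
    sum(value) AS agg_value
FROM usage_events
WHERE metric = {metric:String} AND tenant = {tenant:UInt64}
GROUP BY bucket
ORDER BY bucket;

-- Gauges: last observed value per bucket, picked by argMax over time.
SELECT
    toStartOfInterval(time, INTERVAL 1 HOUR) AS bucket,
    argMax(value, time) AS agg_value
FROM usage_gauges
WHERE metric = {metric:String} AND tenant = {tenant:UInt64}
GROUP BY bucket
ORDER BY bucket;
```

argMax(value, time) returns the value from the row with the maximum time in each group, which is why it models a gauge snapshot rather than a sum.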
Query::parse() uses static::isMethod() which allows UsageQuery
to extend the valid method list. Without this override, parsing
'groupByInterval("time","1h")' throws "Invalid query method".
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add array shape type to the findAggregatedFromTable $parsed parameter so PHPStan recognizes typed keys (filters, params, orderBy)
- Provide a default for the optional groupByInterval key access
- Split the compound type check for $interval to satisfy PHPStan string narrowing in exception message interpolation
- Remove an extra trailing blank line in UsageBase.php (PSR-12)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The Database adapter silently cast string tenant IDs to int via (int), which truncates non-numeric strings (e.g. UUIDs) to 0 — effectively disabling tenant isolation. Now throws InvalidArgumentException for non-numeric tenants so the mismatch is caught immediately.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…on, getTotal ambiguity

- Buffer key now includes a tag hash, so events with the same metric but different tags (e.g. different paths) stay as separate entries instead of silently discarding the second call's tags
- Daily table queries (findDaily, sumDaily, sumDailyBatch) now validate attributes against the daily schema (metric, value, time, tenant) instead of the full event schema. Querying path/method/status on the daily table now throws immediately instead of causing a ClickHouse "No such column" runtime error
- Changed the (int) cast to (float) for agg_value in getTimeSeries to avoid truncating fractional gauge values or large event sums
- getTotal() now throws when a metric exists in both the event and gauge tables instead of silently adding incompatible aggregations

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
```php
/**
 * Enable or disable shared tables mode (multi-tenant with tenant column).
 *
 * @param bool $sharedTables
 * @return self
 */
abstract public function setSharedTables(bool $sharedTables): self;
```
This seems specific to Database adapter, should we move it to Databases.php?
```php
 * - path: API endpoint path (events only)
 * - method: HTTP method (events only)
 * - status: HTTP status code (events only)
```
We're going to need a separate HttpLog.php type anyways, do we need these properties?
PHPStan level max flagged getTimeSeries() annotations as int while the ClickHouse adapter emits floats via the agg_value cast. Updates the abstract, both adapters, the Usage facade, and zeroFillTimeSeries to float. Also throws on json_encode failure in Usage::collect so the md5() input is guaranteed string instead of string|false.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
```php
// Also check the other type's attributes for cross-table queries
$otherType = $type === 'event' ? 'gauge' : 'event';
foreach ($this->getAttributes($otherType) as $attribute) {
    if ($attribute['$id'] === $attributeName) {
        return true;
    }
}

throw new Exception("Invalid attribute name: {$attributeName}");
```
Cross-type attribute fallback passes invalid columns to gauge queries
validateAttributeName() falls back to checking the opposite type's attributes. When find($queries, null) (or count/sum with $type = null) is called with event-specific filters like Query::equal('path', '/v1/'), parseQueries forwards the same $queries to both tables. For the gauge call, validateAttributeName('path', 'gauge') passes because it finds path in the event schema, generating ``WHERE `path` = {param:String}`` against the gauges table, which has no such column, causing a ClickHouse `UNKNOWN_IDENTIFIER` server error at runtime.
The cross-type fallback should be removed so that an attribute is only valid if it exists in the target table's own schema:
```php
// Remove lines 1138-1144 (the "Also check the other type's attributes" block)
throw new Exception("Invalid attribute name: {$attributeName}");
```

Any cross-table query with type-specific filters should require the caller to pass an explicit $type.
```php
public function purge(array $queries = [], ?string $type = null): bool
{
    $this->setOperationContext('purge()');

    $typesToPurge = [];
    if ($type === Usage::TYPE_EVENT || $type === null) {
        $typesToPurge[] = Usage::TYPE_EVENT;
    }
    if ($type === Usage::TYPE_GAUGE || $type === null) {
        $typesToPurge[] = Usage::TYPE_GAUGE;
    }

    foreach ($typesToPurge as $purgeType) {
        $tableName = $this->getTableForType($purgeType);
        $escapedTable = $this->escapeIdentifier($this->database) . '.' . $this->escapeIdentifier($tableName);

        $parsed = $this->parseQueries($queries, $purgeType);
        $whereData = $this->buildWhereClause($parsed['filters'], $parsed['params']);
        $whereClause = $whereData['clause'];
        $params = $whereData['params'];

        if (empty($whereClause)) {
            $whereClause = ' WHERE 1=1';
        }

        $sql = "DELETE FROM {$escapedTable}{$whereClause}";
        $this->query($sql, $params);
    }

    return true;
}
```
purge() leaves stale aggregates in the daily MV table
DELETE FROM on the events and gauges tables does not cascade to the _events_daily SummingMergeTree table populated by the materialized view. After purge(), the daily table still contains all previously aggregated rows. Immediate re-insertion of the same metrics will double-count in the billing table, and any billing query against _events_daily will return inflated totals. This also means every test teardown (which calls purge()) leaves residual daily-table state that accumulates across runs.
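One way to address this, sketched below, is to mirror the purge into the rollup table in the same loop. This assumes the daily table is named _events_daily and that lightweight deletes (`DELETE FROM` on MergeTree, available since ClickHouse 22.8) are enabled on the deployment:

```sql
-- Purge the raw events for a tenant ...
DELETE FROM usage.events WHERE tenant = 42;

-- ... and mirror the purge into the daily rollup; the MV only runs on
-- INSERT, so deletes never propagate to it on their own.
DELETE FROM usage._events_daily WHERE tenant = 42;
```

The caveat is that filters on columns that exist only in the raw table (path, status, and so on) cannot be mirrored; a purge using those filters would require rebuilding the rollup from the surviving raw rows instead.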
```php
public function purge(array $queries = [], ?string $type = null): bool
{
    $this->db->getAuthorization()->skip(function () use ($queries) {
        $dbQueries = $this->convertQueriesToDatabase($queries);
        $dbQueries[] = DatabaseQuery::limit(100);

        do {
            $documents = $this->db->find(
                collection: $this->collection,
                queries: $dbQueries,
            );

            foreach ($documents as $document) {
                $this->db->deleteDocument($this->collection, $document->getId());
            }
        } while (! empty($documents));
    });

    return true;
}
```
purge() ignores $type parameter, deletes all records regardless of type
The $type argument is not forwarded into convertQueriesToDatabase($queries) and no type discriminator is injected into $dbQueries. A call to purge([], Usage::TYPE_EVENT) will delete both events and gauges indiscriminately. Since addBatch() does not persist a type field in each document, the fix requires both write and read/delete paths to be aligned.
…ts-only sum

- extractGroupByInterval: match by method string, not instanceof (parsed queries are base Query)
- flush(): selectively clear the buffer on per-batch success (retry preserved on failure)
- collect(): use the TYPE_EVENT constant instead of a string literal
- addBatch(): require an explicit $type param (no default)
- sum(): events-only by default (summing gauges is meaningless)
- sumDaily*: document as events-only (the daily MV has only events)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
```php
$this->executeWithRetry(
    function (int $attempt) use ($table, $data): void {
        $startTime = microtime(true);
        $scheme = $this->secure ? 'https' : 'http';
        $escapedTable = $this->escapeIdentifier($table);

        $queryParams = ['query' => "INSERT INTO {$escapedTable} FORMAT JSONEachRow"];
        if ($this->asyncInserts) {
            $queryParams['async_insert'] = '1';
            $queryParams['wait_for_async_insert'] = $this->asyncInsertWait ? '1' : '0';
        }
        $url = "{$scheme}://{$this->host}:{$this->port}/?" . http_build_query($queryParams);

        $this->client->addHeader('X-ClickHouse-Database', $this->database);
        $this->client->addHeader('Content-Type', 'application/x-ndjson');

        if ($this->enableKeepAlive) {
            $this->client->addHeader('Connection', 'keep-alive');
        } else {
            $this->client->addHeader('Connection', 'close');
        }

        if ($this->enableCompression) {
            $this->client->addHeader('Accept-Encoding', 'gzip');
        }

        if ($attempt === 0) {
            $this->requestCount++;
        }

        $body = implode("\n", $data);

        $sql = "INSERT INTO {$escapedTable} FORMAT JSONEachRow";
        $params = ['rows' => count($data), 'bytes' => strlen($body)];

        try {
            $response = $this->client->fetch(
                url: $url,
                method: Client::METHOD_POST,
                body: $body
            );

            $httpCode = $response->getStatusCode();

            if ($httpCode !== 200) {
                $bodyStr = $response->getBody();
                $bodyStr = is_string($bodyStr) ? $bodyStr : '';
                $duration = microtime(true) - $startTime;
                $rowCount = count($data);
                $baseError = "ClickHouse insert failed with HTTP {$httpCode}: {$bodyStr}";
                $errorMsg = $this->buildErrorMessage($baseError, $table, "INSERT INTO {$table} ({$rowCount} rows)");
                $this->logQuery($sql, $params, $duration, false, $errorMsg, $attempt);

                throw new Exception($errorMsg . '|HTTP_CODE:' . $httpCode);
            }

            $duration = microtime(true) - $startTime;
            $this->logQuery($sql, $params, $duration, true, null, $attempt);
        } finally {
            $this->client->removeHeader('Content-Type');
        }
    },
    function (Exception $e, ?int $httpCode): bool {
        $exceptionHttpCode = null;
        if (preg_match('/\|HTTP_CODE:(\d+)$/', $e->getMessage(), $matches)) {
            $exceptionHttpCode = (int) $matches[1];
        }
        return $this->isRetryableError($exceptionHttpCode, $e->getMessage());
    },
    function (Exception $e, int $attempt) use ($table, $data): Exception {
        $cleanMessage = preg_replace('/\|HTTP_CODE:\d+$/', '', $e->getMessage());
        $cleanMessage = is_string($cleanMessage) ? $cleanMessage : $e->getMessage();

        if (strpos($cleanMessage, '[Operation:') !== false) {
            return new Exception($cleanMessage, 0, $e);
        }

        $rowCount = count($data);
        $baseError = "ClickHouse insert execution failed after " . ($attempt + 1) . " attempt(s): {$cleanMessage}";
        $errorMsg = $this->buildErrorMessage($baseError, $table, "INSERT INTO {$table} ({$rowCount} rows)");
        return new Exception($errorMsg, 0, $e);
    }
);
```
INSERT retry creates duplicate rows in MergeTree
insert() wraps ClickHouse HTTP inserts in executeWithRetry(), which retries on HTTP 408/503/504 and on messages containing "timeout" or "connection". When ClickHouse commits the first attempt but the HTTP response is lost in transit (e.g. a proxy timeout), the retry re-sends the same payload. Since plain MergeTree does not deduplicate, every successful-but-unacknowledged attempt produces a permanent duplicate row — inflating event counts, gauge snapshots, and the _events_daily SummingMergeTree aggregates.
Consider one of the following mitigations:
- Set ClickHouse `insert_deduplicate = 1` and use deterministic block checksums (requires ReplicatedMergeTree).
- Use `async_insert` mode with `wait_for_async_insert = 1`, which deduplicates at the server side.
- Or disable retries for INSERT operations and let the caller decide on retry semantics:
```php
// Only retry SELECT/DDL; never retry INSERT
$this->executeWithRetry(
    $operation,
    fn ($e, $code) => false, // never retry inserts
    $buildException
);
```

Push the count cap down into the DB layer so callers that only need a capped total (e.g. rendering "5000+") can stop ClickHouse early instead of scanning the full filtered set. ClickHouse wraps the count in a LIMIT-bounded subquery; Database delegates to utopia-php/database's existing $max arg.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
```php
// Also check the other type's attributes for cross-table queries
$otherType = $type === 'event' ? 'gauge' : 'event';
foreach ($this->getAttributes($otherType) as $attribute) {
    if ($attribute['$id'] === $attributeName) {
        return true;
    }
}
```
Cross-type attribute fallback sends invalid columns to gauge queries
validateAttributeName falls back to checking the opposite type's schema (lines 1139–1144). When find/count/sum is called with $type = null, parseQueries passes the same queries to both tables. A caller passing Query::equal('path', '/v1/') passes validation for the gauge table because path is found in the event schema, generating ``WHERE `path` = {param:String}`` against the gauges table, which has no such column, and causing a ClickHouse `UNKNOWN_IDENTIFIER` server error at runtime.
Remove the fallback block so each attribute is only valid for its own table's schema:
```diff
-        // Also check the other type's attributes for cross-table queries
-        $otherType = $type === 'event' ? 'gauge' : 'event';
-        foreach ($this->getAttributes($otherType) as $attribute) {
-            if ($attribute['$id'] === $attributeName) {
-                return true;
-            }
-        }
         throw new Exception("Invalid attribute name: {$attributeName}");
     }
```
Summary
Complete rewrite of the usage analytics library with a two-table architecture optimized for both real-time analytics and billing.
Architecture
Key Changes
API
Write
Read
Billing (Daily MV)
Test Plan