diff --git a/src/Database/Adapter.php b/src/Database/Adapter.php index 1678024ee..079418a54 100644 --- a/src/Database/Adapter.php +++ b/src/Database/Adapter.php @@ -33,7 +33,7 @@ abstract class Adapter protected bool $alterLocks = false; - protected bool $skipDuplicates = false; + protected OnDuplicate $onDuplicate = OnDuplicate::Fail; /** * @var array @@ -395,23 +395,23 @@ public function inTransaction(): bool } /** - * Run a callback with skipDuplicates enabled. - * Duplicate key errors during createDocuments() will be silently skipped - * instead of thrown. Nestable — saves and restores previous state. + * Run a callback scoped to a specific OnDuplicate mode. Create-style + * operations (createDocument, createCollection, createAttribute, createIndex) + * dispatch on this mode. Nestable — saves and restores previous state. * * @template T * @param callable(): T $callback * @return T */ - public function skipDuplicates(callable $callback): mixed + public function withOnDuplicate(OnDuplicate $mode, callable $callback): mixed { - $previous = $this->skipDuplicates; - $this->skipDuplicates = true; + $previous = $this->onDuplicate; + $this->onDuplicate = $mode; try { return $callback(); } finally { - $this->skipDuplicates = $previous; + $this->onDuplicate = $previous; } } diff --git a/src/Database/Adapter/MariaDB.php b/src/Database/Adapter/MariaDB.php index 223f91e71..e881dcb4e 100644 --- a/src/Database/Adapter/MariaDB.php +++ b/src/Database/Adapter/MariaDB.php @@ -16,6 +16,7 @@ use Utopia\Database\Exception\Timeout as TimeoutException; use Utopia\Database\Exception\Truncate as TruncateException; use Utopia\Database\Helpers\ID; +use Utopia\Database\OnDuplicate; use Utopia\Database\Operator; use Utopia\Database\Query; @@ -2376,4 +2377,33 @@ public function getSupportForTTLIndexes(): bool { return false; } + + /** + * MariaDB/MySQL Upsert: append `ON DUPLICATE KEY UPDATE col = VALUES(col), ...` + * so the INSERT replaces the matched row's columns instead of throwing. 
+ * + * @param array $columns + */ + protected function getInsertSuffix(string $table, array $columns = []): string + { + if ($this->onDuplicate !== OnDuplicate::Upsert || empty($columns)) { + return ''; + } + + $assignments = []; + foreach ($columns as $col) { + // Skip the primary unique key (_uid) — no need to "update" what matched. + // Safe to skip _id too since it's auto-increment in INSERT and untouched on update. + if (\in_array($col, ['`_uid`', '`_id`'], true)) { + continue; + } + $assignments[] = "{$col} = VALUES({$col})"; + } + + if (empty($assignments)) { + return ''; + } + + return 'ON DUPLICATE KEY UPDATE ' . \implode(', ', $assignments); + } } diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index 311b60476..89137b5c1 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -17,6 +17,7 @@ use Utopia\Database\Exception\Timeout as TimeoutException; use Utopia\Database\Exception\Transaction as TransactionException; use Utopia\Database\Exception\Type as TypeException; +use Utopia\Database\OnDuplicate; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Mongo\Client; @@ -123,7 +124,8 @@ public function withTransaction(callable $callback): mixed } // upsert + $setOnInsert hits WriteConflict (E112) under txn snapshot isolation. - if ($this->skipDuplicates) { + // Both Skip and Upsert modes use the no-transaction path. + if ($this->onDuplicate !== OnDuplicate::Fail) { return $callback(); } @@ -429,10 +431,12 @@ public function createCollection(string $name, array $attributes = [], array $in { $id = $this->getNamespace() . '_' . $this->filter($name); - // In shared-tables mode or for metadata, the physical collection may - // already exist for another tenant. Return early to avoid a - // "Collection Exists" exception from the client. 
- if (!$this->inTransaction && ($this->getSharedTables() || $name === Database::METADATA) && $this->exists($this->getNamespace(), $name)) { + // In shared-tables mode or for metadata the physical collection may + // already exist. Return early to avoid "Collection Exists" from the + // client. + $tolerateExisting = $this->getSharedTables() || $name === Database::METADATA; + + if (!$this->inTransaction && $tolerateExisting && $this->exists($this->getNamespace(), $name)) { return true; } @@ -443,14 +447,15 @@ public function createCollection(string $name, array $attributes = [], array $in } catch (MongoException $e) { $e = $this->processException($e); if ($e instanceof DuplicateException) { + // Keep existing shared-tables/metadata behavior — no-op there. return true; } // Client throws code-0 "Collection Exists" when its pre-check - // finds the collection. In shared-tables/metadata context this - // is a no-op; otherwise re-throw as DuplicateException so - // Database::createCollection() can run orphan reconciliation. + // finds the collection. Tolerated contexts no-op; otherwise re-throw + // as DuplicateException so Database::createCollection() can run + // orphan reconciliation. if ($e->getCode() === 0 && stripos($e->getMessage(), 'Collection Exists') !== false) { - if ($this->getSharedTables() || $name === Database::METADATA) { + if ($tolerateExisting) { return true; } throw new DuplicateException('Collection already exists', $e->getCode(), $e); @@ -1497,12 +1502,15 @@ public function createDocuments(Document $collection, array $documents): array $records[] = $record; } - // insertMany aborts the txn on any duplicate; upsert + $setOnInsert no-ops instead. - if ($this->skipDuplicates) { + // insertMany aborts the txn on any duplicate; Mongo's upsert path handles + // both Skip ($setOnInsert: insert-only no-op) and Upsert ($set: overwrite). 
+ if ($this->onDuplicate !== OnDuplicate::Fail) { if (empty($records)) { return []; } + $operator = $this->onDuplicate === OnDuplicate::Upsert ? '$set' : '$setOnInsert'; + $operations = []; foreach ($records as $record) { $filter = ['_uid' => $record['_uid'] ?? '']; @@ -1510,17 +1518,17 @@ public function createDocuments(Document $collection, array $documents): array $filter['_tenant'] = $record['_tenant'] ?? $this->getTenant(); } - // Filter fields can't reappear in $setOnInsert (mongo path-conflict error). - $setOnInsert = $record; - unset($setOnInsert['_uid'], $setOnInsert['_tenant']); + // Filter fields can't reappear in $setOnInsert/$set (mongo path-conflict error). + $payload = $record; + unset($payload['_uid'], $payload['_tenant']); - if (empty($setOnInsert)) { + if (empty($payload)) { continue; } $operations[] = [ 'filter' => $filter, - 'update' => ['$setOnInsert' => $setOnInsert], + 'update' => [$operator => $payload], ]; } @@ -3590,8 +3598,8 @@ protected function processException(\Throwable $e): \Throwable return new DuplicateException('Collection already exists', $e->getCode(), $e); } - // Index already exists - if ($e->getCode() === 85) { + // Index already exists (85 = IndexOptionsConflict, 86 = IndexKeySpecsConflict) + if ($e->getCode() === 85 || $e->getCode() === 86) { return new DuplicateException('Index already exists', $e->getCode(), $e); } diff --git a/src/Database/Adapter/Pool.php b/src/Database/Adapter/Pool.php index 7bbfb98f2..e23ffdb4d 100644 --- a/src/Database/Adapter/Pool.php +++ b/src/Database/Adapter/Pool.php @@ -6,6 +6,7 @@ use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Exception as DatabaseException; +use Utopia\Database\OnDuplicate; use Utopia\Database\Validator\Authorization; use Utopia\Pools\Pool as UtopiaPool; @@ -43,8 +44,9 @@ public function __construct(UtopiaPool $pool) public function delegate(string $method, array $args): mixed { if ($this->pinnedAdapter !== null) { - if ($this->skipDuplicates) 
{ - return $this->pinnedAdapter->skipDuplicates( + if ($this->onDuplicate !== OnDuplicate::Fail) { + return $this->pinnedAdapter->withOnDuplicate( + $this->onDuplicate, fn () => $this->pinnedAdapter->{$method}(...$args) ); } @@ -71,8 +73,9 @@ public function delegate(string $method, array $args): mixed $adapter->setMetadata($key, $value); } - if ($this->skipDuplicates) { - return $adapter->skipDuplicates( + if ($this->onDuplicate !== OnDuplicate::Fail) { + return $adapter->withOnDuplicate( + $this->onDuplicate, fn () => $adapter->{$method}(...$args) ); } @@ -156,8 +159,9 @@ public function withTransaction(callable $callback): mixed $this->pinnedAdapter = $adapter; try { - if ($this->skipDuplicates) { - return $adapter->skipDuplicates( + if ($this->onDuplicate !== OnDuplicate::Fail) { + return $adapter->withOnDuplicate( + $this->onDuplicate, fn () => $adapter->withTransaction($callback) ); } diff --git a/src/Database/Adapter/Postgres.php b/src/Database/Adapter/Postgres.php index 2c27e08e7..a8e4f9830 100644 --- a/src/Database/Adapter/Postgres.php +++ b/src/Database/Adapter/Postgres.php @@ -17,6 +17,7 @@ use Utopia\Database\Exception\Transaction as TransactionException; use Utopia\Database\Exception\Truncate as TruncateException; use Utopia\Database\Helpers\ID; +use Utopia\Database\OnDuplicate; use Utopia\Database\Operator; use Utopia\Database\Query; @@ -241,6 +242,7 @@ public function createCollection(string $name, array $attributes = [], array $in } $sqlTenant = $this->sharedTables ? 
'_tenant INTEGER DEFAULT NULL,' : ''; + $collection = " CREATE TABLE {$this->getSQLTable($id)} ( _id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, @@ -291,19 +293,19 @@ public function createCollection(string $name, array $attributes = [], array $in $uniquePermissionIndex = $this->getShortKey("{$namespace}_{$this->tenant}_{$id}_ukey"); $permissionIndex = $this->getShortKey("{$namespace}_{$this->tenant}_{$id}_permission"); $permissions .= " - CREATE UNIQUE INDEX \"{$uniquePermissionIndex}\" + CREATE UNIQUE INDEX \"{$uniquePermissionIndex}\" ON {$this->getSQLTable($id . '_perms')} USING btree (_tenant,_document,_type,_permission); - CREATE INDEX \"{$permissionIndex}\" - ON {$this->getSQLTable($id . '_perms')} USING btree (_tenant,_permission,_type); + CREATE INDEX \"{$permissionIndex}\" + ON {$this->getSQLTable($id . '_perms')} USING btree (_tenant,_permission,_type); "; } else { $uniquePermissionIndex = $this->getShortKey("{$namespace}_{$id}_ukey"); $permissionIndex = $this->getShortKey("{$namespace}_{$id}_permission"); $permissions .= " - CREATE UNIQUE INDEX \"{$uniquePermissionIndex}\" + CREATE UNIQUE INDEX \"{$uniquePermissionIndex}\" ON {$this->getSQLTable($id . '_perms')} USING btree (_document COLLATE utf8_ci_ai,_type,_permission); - CREATE INDEX \"{$permissionIndex}\" - ON {$this->getSQLTable($id . '_perms')} USING btree (_permission,_type); + CREATE INDEX \"{$permissionIndex}\" + ON {$this->getSQLTable($id . '_perms')} USING btree (_permission,_type); "; } @@ -2352,23 +2354,55 @@ public function getSupportForOptionalSpatialAttributeWithExistingRows(): bool protected function getInsertKeyword(): string { + // Postgres doesn't have INSERT IGNORE — Skip/Upsert are realized via + // the ON CONFLICT suffix (see getInsertSuffix). return 'INSERT INTO'; } - protected function getInsertSuffix(string $table): string + /** + * Postgres permissions insert uses ON CONFLICT DO NOTHING via suffix; the + * keyword stays plain INSERT INTO. 
Override MariaDB's INSERT IGNORE (no-op + * in MySQL dialect, a syntax error in Postgres). + */ + protected function getInsertPermissionsKeyword(): string + { + return 'INSERT INTO'; + } + + /** + * @param array $columns + */ + protected function getInsertSuffix(string $table, array $columns = []): string { - if (!$this->skipDuplicates) { + if ($this->onDuplicate === OnDuplicate::Fail) { return ''; } $conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")'; - return "ON CONFLICT {$conflictTarget} DO NOTHING"; + if ($this->onDuplicate === OnDuplicate::Skip) { + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + + // Upsert: DO UPDATE SET col = EXCLUDED.col for every column except the conflict key. + $assignments = []; + foreach ($columns as $col) { + if (\in_array($col, ['"_uid"', '"_id"', '"_tenant"'], true)) { + continue; + } + $assignments[] = "{$col} = EXCLUDED.{$col}"; + } + + if (empty($assignments)) { + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + + return "ON CONFLICT {$conflictTarget} DO UPDATE SET " . \implode(', ', $assignments); } protected function getInsertPermissionsSuffix(): string { - if (!$this->skipDuplicates) { + if ($this->onDuplicate === OnDuplicate::Fail) { return ''; } diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index 3fe2696db..9fd461563 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -15,6 +15,7 @@ use Utopia\Database\Exception\NotFound as NotFoundException; use Utopia\Database\Exception\Timeout as TimeoutException; use Utopia\Database\Exception\Transaction as TransactionException; +use Utopia\Database\OnDuplicate; use Utopia\Database\Operator; use Utopia\Database\Query; @@ -1030,19 +1031,42 @@ public function getSupportForHostname(): bool } /** - * Returns the INSERT keyword, optionally with IGNORE for duplicate handling. - * Override in adapter subclasses for DB-specific syntax. 
+ * Returns the INSERT keyword, varying by the active OnDuplicate mode. + * Override in adapter subclasses for DB-specific syntax (e.g. Postgres uses + * suffix ON CONFLICT instead). */ protected function getInsertKeyword(): string { - return $this->skipDuplicates ? 'INSERT IGNORE INTO' : 'INSERT INTO'; + return match ($this->onDuplicate) { + OnDuplicate::Skip => 'INSERT IGNORE INTO', + OnDuplicate::Upsert => 'INSERT INTO', // Upsert is realized by the ON DUPLICATE KEY UPDATE suffix on MySQL/MariaDB — handled in getInsertSuffix. + OnDuplicate::Fail => 'INSERT INTO', + }; + } + + /** + * Returns the INSERT keyword for the `_perms` side-table. Permissions have + * their own composite unique constraint (_document, _type, _permission), + * so on row Upsert we don't want to ON-DUPLICATE-KEY-UPDATE them — they're + * already there. Both Skip and Upsert modes should just silently ignore + * pre-existing permission rows. + */ + protected function getInsertPermissionsKeyword(): string + { + return $this->onDuplicate === OnDuplicate::Fail + ? 'INSERT INTO' + : 'INSERT IGNORE INTO'; } /** * Returns a suffix appended after VALUES clause for duplicate handling. * Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING). + * + * @param string $table table name (without namespace prefix) + * @param array $columns quoted column names present in the INSERT — needed + * to emit ON DUPLICATE KEY UPDATE / ON CONFLICT DO UPDATE SET clauses */ - protected function getInsertSuffix(string $table): string + protected function getInsertSuffix(string $table, array $columns = []): string { return ''; } @@ -2533,12 +2557,12 @@ public function createDocuments(Document $collection, array $documents): array $attributeKeys[] = '_tenant'; } - $columns = []; + $columnList = []; foreach ($attributeKeys as $key => $attribute) { - $columns[$key] = $this->quote($this->filter($attribute)); + $columnList[$key] = $this->quote($this->filter($attribute)); } - $columns = '(' . 
\implode(', ', $columns) . ')'; + $columns = '(' . \implode(', ', $columnList) . ')'; $bindIndex = 0; $batchKeys = []; @@ -2603,7 +2627,7 @@ public function createDocuments(Document $collection, array $documents): array $stmt = $this->getPDO()->prepare(" {$this->getInsertKeyword()} {$this->getSQLTable($name)} {$columns} VALUES {$batchKeys} - {$this->getInsertSuffix($name)} + {$this->getInsertSuffix($name, $columnList)} "); foreach ($bindValues as $key => $value) { @@ -2617,7 +2641,7 @@ public function createDocuments(Document $collection, array $documents): array $permissions = \implode(', ', $permissions); $sqlPermissions = " - {$this->getInsertKeyword()} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) + {$this->getInsertPermissionsKeyword()} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) VALUES {$permissions} {$this->getInsertPermissionsSuffix()} "; diff --git a/src/Database/Adapter/SQLite.php b/src/Database/Adapter/SQLite.php index 33f370775..411457116 100644 --- a/src/Database/Adapter/SQLite.php +++ b/src/Database/Adapter/SQLite.php @@ -16,6 +16,7 @@ use Utopia\Database\Exception\Timeout as TimeoutException; use Utopia\Database\Exception\Transaction as TransactionException; use Utopia\Database\Helpers\ID; +use Utopia\Database\OnDuplicate; use Utopia\Database\Operator; /** @@ -463,13 +464,14 @@ public function createIndex(string $collection, string $id, string $type, array // Workaround for no support for CREATE INDEX IF NOT EXISTS $stmt = $this->getPDO()->prepare(" - SELECT name - FROM sqlite_master + SELECT name + FROM sqlite_master WHERE type='index' AND name=:_index; "); $stmt->bindValue(':_index', "{$this->getNamespace()}_{$this->tenant}_{$name}_{$id}"); $stmt->execute(); $index = $stmt->fetch(); + $stmt->closeCursor(); if (!empty($index)) { return true; } @@ -1939,6 +1941,29 @@ public function getSupportForTTLIndexes(): bool protected function getInsertKeyword(): string { - 
return $this->skipDuplicates ? 'INSERT OR IGNORE INTO' : 'INSERT INTO'; + return match ($this->onDuplicate) { + OnDuplicate::Skip => 'INSERT OR IGNORE INTO', + OnDuplicate::Upsert => 'INSERT OR REPLACE INTO', + OnDuplicate::Fail => 'INSERT INTO', + }; + } + + protected function getInsertPermissionsKeyword(): string + { + return $this->onDuplicate === OnDuplicate::Fail + ? 'INSERT INTO' + : 'INSERT OR IGNORE INTO'; + } + + /** + * SQLite realizes Upsert via the `INSERT OR REPLACE` keyword, so no SUFFIX + * clause is needed. Override MariaDB's `ON DUPLICATE KEY UPDATE` suffix. + * + * @param array $columns + */ + protected function getInsertSuffix(string $table, array $columns = []): string + { + return ''; } + } diff --git a/src/Database/Database.php b/src/Database/Database.php index 136baad9b..7bca15026 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -417,7 +417,7 @@ class Database protected bool $preserveDates = false; - protected bool $skipDuplicates = false; + protected OnDuplicate $onDuplicate = OnDuplicate::Fail; protected bool $preserveSequence = false; @@ -844,15 +844,28 @@ public function skipRelationshipsExistCheck(callable $callback): mixed } } - public function skipDuplicates(callable $callback): mixed + /** + * Run $callback within a scope where create-style operations apply the + * given OnDuplicate mode. Nestable — previous mode is restored on return. + * + * @template T + * @param callable(): T $callback + * @return T + */ + public function withOnDuplicate(OnDuplicate $mode, callable $callback): mixed { - $previous = $this->skipDuplicates; - $this->skipDuplicates = true; + $previous = $this->onDuplicate; + $this->onDuplicate = $mode; try { - return $callback(); + // Mirror the mode onto the adapter so schema-level operations + // (createAttribute / createIndex / createCollection) that run + // directly against the adapter can observe it. 
createDocuments + // still goes through its own adapter->withOnDuplicate dispatch, + // which is nestable and idempotent with this outer scope. + return $this->adapter->withOnDuplicate($mode, $callback); } finally { - $this->skipDuplicates = $previous; + $this->onDuplicate = $previous; } } @@ -1708,6 +1721,13 @@ public function createCollection(string $id, array $attributes = [], array $inde $collection = $this->silent(fn () => $this->getCollection($id)); if (!$collection->isEmpty() && $id !== self::METADATA) { + // Skip/Upsert: collection data is never destroyed — both modes + // tolerate the existing collection and return its current metadata + // document. Per-attribute / per-index reconciliation happens via + // the dedicated createAttribute / createIndex paths. + if ($this->onDuplicate !== OnDuplicate::Fail) { + return $collection; + } throw new DuplicateException('Collection ' . $id . ' already exists'); } @@ -2145,6 +2165,20 @@ public function createAttribute(string $collection, string $id, string $type, in $filters = array_unique($filters); } + // Skip/Upsert: if the attribute already exists in metadata, tolerate + // and return. Spec reconciliation (drop + recreate on type change) is + // a caller concern — migration consults source vs destination metadata + // _updatedAt and issues deleteAttribute before a re-creation itself, + // so by the time this is called the attribute is either truly new or + // intentionally unchanged. 
+ if ($this->onDuplicate !== OnDuplicate::Fail) { + foreach ($collection->getAttribute('attributes', []) as $existing) { + if (\strtolower($existing->getAttribute('key', $existing->getId())) === \strtolower($id)) { + return true; + } + } + } + $existsInSchema = false; $schemaAttributes = $this->adapter->getSupportForSchemaAttributes() @@ -4515,9 +4549,15 @@ public function createIndex(string $collection, string $id, string $type, array /** @var array $indexes */ foreach ($indexes as $index) { - if (\strtolower($index->getId()) === \strtolower($id)) { - throw new DuplicateException('Index already exists'); + if (\strtolower($index->getId()) !== \strtolower($id)) { + continue; + } + // Skip/Upsert: tolerate the existing index. Caller (e.g. migration) + // is responsible for dropping it first if the spec needs to change. + if ($this->onDuplicate !== OnDuplicate::Fail) { + return true; } + throw new DuplicateException('Index already exists'); } if ($this->adapter->getCountOfIndexes($collection) >= $this->adapter->getLimitForIndexes()) { @@ -5727,8 +5767,8 @@ public function createDocuments( foreach (\array_chunk($documents, $batchSize) as $chunk) { $insert = fn () => $this->withTransaction(fn () => $this->adapter->createDocuments($collection, $chunk)); // Set adapter flag before withTransaction so Mongo can opt out of a real txn. - $batch = $this->skipDuplicates - ? $this->adapter->skipDuplicates($insert) + $batch = $this->onDuplicate !== OnDuplicate::Fail + ? 
$this->adapter->withOnDuplicate($this->onDuplicate, $insert) : $insert(); $batch = $this->adapter->getSequences($collection->getId(), $batch); diff --git a/src/Database/Mirror.php b/src/Database/Mirror.php index 86beb5d0a..7a76af2e2 100644 --- a/src/Database/Mirror.php +++ b/src/Database/Mirror.php @@ -205,12 +205,23 @@ public function delete(?string $database = null): bool public function createCollection(string $id, array $attributes = [], array $indexes = [], ?array $permissions = null, bool $documentSecurity = true): Document { - $result = $this->source->createCollection( - $id, - $attributes, - $indexes, - $permissions, - $documentSecurity + // Forward the OnDuplicate scope to source/destination so their + // Database-layer dispatch observes it. Skip/Upsert tolerate an + // existing collection; Fail rethrows as before. + $forward = fn (Database $target, callable $call) => + $this->onDuplicate !== OnDuplicate::Fail + ? $target->withOnDuplicate($this->onDuplicate, $call) + : $call(); + + $result = $forward( + $this->source, + fn () => $this->source->createCollection( + $id, + $attributes, + $indexes, + $permissions, + $documentSecurity + ) ); if ($this->destination === null) { @@ -227,12 +238,15 @@ public function createCollection(string $id, array $attributes = [], array $inde ); } - $this->destination->createCollection( - $id, - $attributes, - $indexes, - $permissions, - $documentSecurity + $forward( + $this->destination, + fn () => $this->destination->createCollection( + $id, + $attributes, + $indexes, + $permissions, + $documentSecurity + ) ); $this->silent(function () use ($id) { @@ -303,18 +317,26 @@ public function deleteCollection(string $id): bool public function createAttribute(string $collection, string $id, string $type, int $size, bool $required, $default = null, bool $signed = true, bool $array = false, ?string $format = null, array $formatOptions = [], array $filters = []): bool { - $result = $this->source->createAttribute( - $collection, - $id, - 
$type, - $size, - $required, - $default, - $signed, - $array, - $format, - $formatOptions, - $filters + $forward = fn (Database $target, callable $call) => + $this->onDuplicate !== OnDuplicate::Fail + ? $target->withOnDuplicate($this->onDuplicate, $call) + : $call(); + + $result = $forward( + $this->source, + fn () => $this->source->createAttribute( + $collection, + $id, + $type, + $size, + $required, + $default, + $signed, + $array, + $format, + $formatOptions, + $filters + ) ); if ($this->destination === null) { @@ -345,18 +367,21 @@ public function createAttribute(string $collection, string $id, string $type, in ); } - $result = $this->destination->createAttribute( - $collection, - $document->getId(), - $document->getAttribute('type'), - $document->getAttribute('size'), - $document->getAttribute('required'), - $document->getAttribute('default'), - $document->getAttribute('signed'), - $document->getAttribute('array'), - $document->getAttribute('format'), - $document->getAttribute('formatOptions'), - $document->getAttribute('filters'), + $result = $forward( + $this->destination, + fn () => $this->destination->createAttribute( + $collection, + $document->getId(), + $document->getAttribute('type'), + $document->getAttribute('size'), + $document->getAttribute('required'), + $document->getAttribute('default'), + $document->getAttribute('signed'), + $document->getAttribute('array'), + $document->getAttribute('format'), + $document->getAttribute('formatOptions'), + $document->getAttribute('filters'), + ) ); } catch (\Throwable $err) { $this->logError('createAttribute', $err); @@ -480,7 +505,15 @@ public function deleteAttribute(string $collection, string $id): bool public function createIndex(string $collection, string $id, string $type, array $attributes, array $lengths = [], array $orders = [], int $ttl = 1): bool { - $result = $this->source->createIndex($collection, $id, $type, $attributes, $lengths, $orders, $ttl); + $forward = fn (Database $target, callable $call) 
=> + $this->onDuplicate !== OnDuplicate::Fail + ? $target->withOnDuplicate($this->onDuplicate, $call) + : $call(); + + $result = $forward( + $this->source, + fn () => $this->source->createIndex($collection, $id, $type, $attributes, $lengths, $orders, $ttl) + ); if ($this->destination === null) { return $result; @@ -505,14 +538,17 @@ public function createIndex(string $collection, string $id, string $type, array ); } - $result = $this->destination->createIndex( - $collection, - $document->getId(), - $document->getAttribute('type'), - $document->getAttribute('attributes'), - $document->getAttribute('lengths'), - $document->getAttribute('orders'), - $document->getAttribute('ttl', 0) + $result = $forward( + $this->destination, + fn () => $this->destination->createIndex( + $collection, + $document->getId(), + $document->getAttribute('type'), + $document->getAttribute('attributes'), + $document->getAttribute('lengths'), + $document->getAttribute('orders'), + $document->getAttribute('ttl', 0) + ) ); } catch (\Throwable $err) { $this->logError('createIndex', $err); @@ -601,8 +637,9 @@ public function createDocuments( ?callable $onNext = null, ?callable $onError = null, ): int { - $modified = $this->skipDuplicates - ? $this->source->skipDuplicates( + $modified = $this->onDuplicate !== OnDuplicate::Fail + ? $this->source->withOnDuplicate( + $this->onDuplicate, fn () => $this->source->createDocuments($collection, $documents, $batchSize, $onNext, $onError) ) : $this->source->createDocuments($collection, $documents, $batchSize, $onNext, $onError); @@ -621,8 +658,9 @@ public function createDocuments( // Forward every input to destination. "upgraded" status means the schema // is mirrored, not that every row is backfilled, so a row that is a - // duplicate on source may not yet exist on destination. In skipDuplicates - // mode the destination runs its own INSERT IGNORE and decides per-row. + // duplicate on source may not yet exist on destination. 
Under + // OnDuplicate::Skip/Upsert the destination runs its own dialect-specific + // conflict handling and decides per-row. try { $clones = []; foreach ($documents as $document) { @@ -638,8 +676,9 @@ public function createDocuments( $clones[] = $clone; } - if ($this->skipDuplicates) { - $this->destination->skipDuplicates( + if ($this->onDuplicate !== OnDuplicate::Fail) { + $this->destination->withOnDuplicate( + $this->onDuplicate, fn () => $this->destination->withPreserveDates( fn () => $this->destination->createDocuments($collection, $clones, $batchSize) ) diff --git a/src/Database/OnDuplicate.php b/src/Database/OnDuplicate.php new file mode 100644 index 000000000..21984f705 --- /dev/null +++ b/src/Database/OnDuplicate.php @@ -0,0 +1,27 @@ + + */ + public static function values(): array + { + return \array_map(fn (self $case) => $case->value, self::cases()); + } +} diff --git a/tests/e2e/Adapter/MirrorTest.php b/tests/e2e/Adapter/MirrorTest.php index de73d7be8..706fa4ee3 100644 --- a/tests/e2e/Adapter/MirrorTest.php +++ b/tests/e2e/Adapter/MirrorTest.php @@ -17,6 +17,7 @@ use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; use Utopia\Database\Mirror; +use Utopia\Database\OnDuplicate; use Utopia\Database\PDO; class MirrorTest extends Base @@ -331,7 +332,7 @@ public function testCreateDocumentsSkipDuplicatesBackfillsDestination(): void ], documentSecurity: false); // Seed the SOURCE only (bypass the mirror) with the row we want to - // skipDuplicates over later. Destination intentionally does NOT have it — + // apply OnDuplicate::Skip over later. Destination intentionally does NOT have it — // this simulates an in-flight backfill where the collection is marked // 'upgraded' (schema mirrored) but not every row has reached destination. 
$database->getSource()->createDocument($collection, new Document([ @@ -351,7 +352,7 @@ public function testCreateDocumentsSkipDuplicatesBackfillsDestination(): void $database->getDestination()->getDocument($collection, 'dup')->isEmpty() ); - $database->skipDuplicates(fn () => $database->createDocuments($collection, [ + $database->withOnDuplicate(OnDuplicate::Skip, fn () => $database->createDocuments($collection, [ new Document([ '$id' => 'dup', 'name' => 'WouldBe', diff --git a/tests/e2e/Adapter/Scopes/DocumentTests.php b/tests/e2e/Adapter/Scopes/DocumentTests.php index c45610c28..24ca56e70 100644 --- a/tests/e2e/Adapter/Scopes/DocumentTests.php +++ b/tests/e2e/Adapter/Scopes/DocumentTests.php @@ -20,6 +20,7 @@ use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; +use Utopia\Database\OnDuplicate; use Utopia\Database\Query; trait DocumentTests @@ -7901,10 +7902,10 @@ public function testCreateDocumentsIgnoreDuplicates(): void $this->assertNotEmpty($e->getMessage()); } - // With skipDuplicates, duplicates should be silently skipped + // With OnDuplicate::Skip, duplicates should be silently skipped $emittedIds = []; $collection = __FUNCTION__; - $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + $count = $database->withOnDuplicate(OnDuplicate::Skip, function () use ($database, $collection, &$emittedIds) { return $database->createDocuments($collection, [ new Document([ '$id' => 'doc1', @@ -7963,10 +7964,10 @@ public function testCreateDocumentsIgnoreAllDuplicates(): void ]), ]); - // With skipDuplicates, inserting only duplicates should succeed with no new rows + // With OnDuplicate::Skip, inserting only duplicates should succeed with no new rows $emittedIds = []; $collection = __FUNCTION__; - $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + $count = $database->withOnDuplicate(OnDuplicate::Skip, function () use ($database, 
$collection, &$emittedIds) { return $database->createDocuments($collection, [ new Document([ '$id' => 'existing', @@ -8000,7 +8001,7 @@ public function testCreateDocumentsSkipDuplicatesEmptyBatch(): void $database->createCollection($collection); $database->createAttribute($collection, 'name', Database::VAR_STRING, 128, true); - $count = $database->skipDuplicates(fn () => $database->createDocuments($collection, [])); + $count = $database->withOnDuplicate(OnDuplicate::Skip, fn () => $database->createDocuments($collection, [])); $this->assertSame(0, $count); $this->assertCount(0, $database->find($collection)); @@ -8029,9 +8030,9 @@ public function testCreateDocumentsSkipDuplicatesNestedScope(): void // Nested scope — inner scope runs inside outer scope. // After inner exits, outer state should still be "skip enabled". // After outer exits, state should restore to "skip disabled". - $countOuter = $database->skipDuplicates(function () use ($database, $collection, $makeDoc) { + $countOuter = $database->withOnDuplicate(OnDuplicate::Skip, function () use ($database, $collection, $makeDoc) { // Inner scope: add dup + new - $countInner = $database->skipDuplicates(function () use ($database, $collection, $makeDoc) { + $countInner = $database->withOnDuplicate(OnDuplicate::Skip, function () use ($database, $collection, $makeDoc) { return $database->createDocuments($collection, [ $makeDoc('seed', 'Dup'), $makeDoc('innerNew', 'InnerNew'), @@ -8086,7 +8087,7 @@ public function testCreateDocumentsSkipDuplicatesLargeBatch(): void } $database->createDocuments($collection, $seed); - // Now call skipDuplicates with 300 docs: 50 existing (0-49) + 250 new (50-299). + // Now call with OnDuplicate::Skip and 300 docs: 50 existing (0-49) + 250 new (50-299). // 300 > default INSERT_BATCH_SIZE, so this exercises the chunk loop. 
$batch = []; for ($i = 0; $i < 300; $i++) { @@ -8101,7 +8102,7 @@ public function testCreateDocumentsSkipDuplicatesLargeBatch(): void } $emittedIds = []; - $count = $database->skipDuplicates(function () use ($database, $collection, $batch, &$emittedIds) { + $count = $database->withOnDuplicate(OnDuplicate::Skip, function () use ($database, $collection, $batch, &$emittedIds) { return $database->createDocuments($collection, $batch, onNext: function (Document $doc) use (&$emittedIds) { $emittedIds[] = $doc->getId(); }); @@ -8141,13 +8142,14 @@ public function testCreateDocumentsSkipDuplicatesSecondCallSkipsAll(): void ); // First call — all new - $firstCount = $database->skipDuplicates( + $firstCount = $database->withOnDuplicate( + OnDuplicate::Skip, fn () => $database->createDocuments($collection, $makeBatch('First')) ); $this->assertSame(3, $firstCount); $emittedIds = []; - $secondCount = $database->skipDuplicates(function () use ($database, $collection, $makeBatch, &$emittedIds) { + $secondCount = $database->withOnDuplicate(OnDuplicate::Skip, function () use ($database, $collection, $makeBatch, &$emittedIds) { return $database->createDocuments($collection, $makeBatch('Second'), onNext: function (Document $doc) use (&$emittedIds) { $emittedIds[] = $doc->getId(); }); @@ -8237,7 +8239,7 @@ public function testCreateDocumentsSkipDuplicatesRelationships(): void ]), ]; - $database->skipDuplicates(fn () => $database->createDocuments($parent, $batch)); + $database->withOnDuplicate(OnDuplicate::Skip, fn () => $database->createDocuments($parent, $batch)); $existing = $database->getDocument($parent, 'existingParent'); $this->assertFalse($existing->isEmpty()); @@ -8260,4 +8262,199 @@ public function testCreateDocumentsSkipDuplicatesRelationships(): void \sort($allChildIds); $this->assertSame(['existingChild', 'newChild', 'retryChild'], $allChildIds); } + + /** + * OnDuplicate::Upsert — existing rows are overwritten with the incoming + * values; new rows are inserted. 
The returned count reflects every input. + */ + public function testCreateDocsUpsertOverwrites(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + $database->createAttribute(__FUNCTION__, 'tag', Database::VAR_STRING, 128, false); + + $permissions = [ + Permission::read(Role::any()), + Permission::create(Role::any()), + Permission::update(Role::any()), + ]; + + // Seed two docs. + $database->createDocuments(__FUNCTION__, [ + new Document(['$id' => 'a', 'name' => 'original-A', 'tag' => 'keep', '$permissions' => $permissions]), + new Document(['$id' => 'b', 'name' => 'original-B', 'tag' => 'keep', '$permissions' => $permissions]), + ]); + + // Upsert: overwrite 'a', leave 'b' untouched (not in batch), insert 'c'. + $collection = __FUNCTION__; + $count = $database->withOnDuplicate(OnDuplicate::Upsert, function () use ($database, $collection, $permissions) { + return $database->createDocuments($collection, [ + new Document(['$id' => 'a', 'name' => 'replaced-A', 'tag' => 'new', '$permissions' => $permissions]), + new Document(['$id' => 'c', 'name' => 'inserted-C', 'tag' => 'new', '$permissions' => $permissions]), + ]); + }); + $this->assertSame(2, $count); + + $docs = $database->find(__FUNCTION__, [Query::orderAsc('$id')]); + $this->assertCount(3, $docs); + $this->assertSame('replaced-A', $docs[0]->getAttribute('name')); + $this->assertSame('new', $docs[0]->getAttribute('tag')); + $this->assertSame('original-B', $docs[1]->getAttribute('name')); + $this->assertSame('keep', $docs[1]->getAttribute('tag')); + $this->assertSame('inserted-C', $docs[2]->getAttribute('name')); + } + + /** + * OnDuplicate::Upsert — a batch composed entirely of duplicates overwrites + * every existing row; zero rows are skipped. 
+ */ + public function testCreateDocsUpsertAll(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + $permissions = [ + Permission::read(Role::any()), + Permission::create(Role::any()), + Permission::update(Role::any()), + ]; + + $database->createDocuments(__FUNCTION__, [ + new Document(['$id' => 'x', 'name' => 'v1', '$permissions' => $permissions]), + new Document(['$id' => 'y', 'name' => 'v1', '$permissions' => $permissions]), + ]); + + $collection = __FUNCTION__; + $count = $database->withOnDuplicate(OnDuplicate::Upsert, function () use ($database, $collection, $permissions) { + return $database->createDocuments($collection, [ + new Document(['$id' => 'x', 'name' => 'v2', '$permissions' => $permissions]), + new Document(['$id' => 'y', 'name' => 'v2', '$permissions' => $permissions]), + ]); + }); + $this->assertSame(2, $count); + + $docs = $database->find(__FUNCTION__, [Query::orderAsc('$id')]); + $this->assertCount(2, $docs); + $this->assertSame('v2', $docs[0]->getAttribute('name')); + $this->assertSame('v2', $docs[1]->getAttribute('name')); + } + + /** + * OnDuplicate::Skip and Upsert tolerate an existing collection — they + * return the current metadata document instead of throwing. createCollection() + * is never destructive at the library layer; callers that need to + * reconcile schema must drop/recreate the individual attributes / indexes.
+ */ + public function testCreateCollSkipUpsertTolerates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + $database->createDocument(__FUNCTION__, new Document([ + '$id' => 'doc', + 'name' => 'keep', + '$permissions' => [Permission::read(Role::any())], + ])); + + $collection = __FUNCTION__; + $database->withOnDuplicate(OnDuplicate::Skip, fn () => $database->createCollection($collection)); + $this->assertSame('keep', $database->getDocument(__FUNCTION__, 'doc')->getAttribute('name')); + + $database->withOnDuplicate(OnDuplicate::Upsert, fn () => $database->createCollection($collection)); + $this->assertSame('keep', $database->getDocument(__FUNCTION__, 'doc')->getAttribute('name')); + } + + /** + * OnDuplicate::Skip and Upsert tolerate an existing attribute. Caller is + * responsible for dropping first if the spec needs to change (migration + * consults source vs destination _metadata._updatedAt to decide). + */ + public function testCreateAttrSkipUpsertTolerates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + $database->createDocument(__FUNCTION__, new Document([ + '$id' => 'doc', + 'name' => 'keep', + '$permissions' => [Permission::read(Role::any())], + ])); + + $collection = __FUNCTION__; + + // Skip: same-spec re-declare is a no-op. + $this->assertTrue($database->withOnDuplicate( + OnDuplicate::Skip, + fn () => $database->createAttribute($collection, 'name', Database::VAR_STRING, 128, true) + )); + + // Skip: even a wider-spec re-declare is tolerated (not applied). 
+ $this->assertTrue($database->withOnDuplicate( + OnDuplicate::Skip, + fn () => $database->createAttribute($collection, 'name', Database::VAR_STRING, 512, true) + )); + + // Upsert: same — tolerate existing. Migration handles drop+recreate itself. + $this->assertTrue($database->withOnDuplicate( + OnDuplicate::Upsert, + fn () => $database->createAttribute($collection, 'name', Database::VAR_STRING, 512, true) + )); + + // Metadata still reflects the ORIGINAL spec — library didn't touch it. + $nameAttr = null; + foreach ($database->getCollection(__FUNCTION__)->getAttribute('attributes', []) as $attr) { + if ($attr->getAttribute('key') === 'name') { + $nameAttr = $attr; + break; + } + } + $this->assertNotNull($nameAttr); + $this->assertSame(128, (int) $nameAttr->getAttribute('size')); + $this->assertSame('keep', $database->getDocument(__FUNCTION__, 'doc')->getAttribute('name')); + } + + /** + * OnDuplicate::Skip and Upsert tolerate an existing index. End state is + * always the first-declared spec; callers that need a different spec must + * call deleteIndex() first. + */ + public function testCreateIdxSkipUpsertTolerates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'a', Database::VAR_STRING, 64, true); + $database->createAttribute(__FUNCTION__, 'b', Database::VAR_STRING, 64, true); + $database->createIndex(__FUNCTION__, 'idx', Database::INDEX_KEY, ['a']); + + $collection = __FUNCTION__; + + $this->assertTrue($database->withOnDuplicate( + OnDuplicate::Skip, + fn () => $database->createIndex($collection, 'idx', Database::INDEX_KEY, ['b']) + )); + + $this->assertTrue($database->withOnDuplicate( + OnDuplicate::Upsert, + fn () => $database->createIndex($collection, 'idx', Database::INDEX_KEY, ['b']) + )); + + // Metadata still reflects the original column — library tolerates, + // caller must drop first to change spec.
+ $indexes = $database->getCollection(__FUNCTION__)->getAttribute('indexes', []); + $this->assertCount(1, $indexes); + $this->assertSame(['a'], $indexes[0]->getAttribute('attributes')); + } }