Collections: one batch processing per task #786
Conversation

Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
📝 Walkthrough

Refactors collection creation into a two-phase workflow: a setup job uploads documents and computes batches; batch jobs process each batch until final collection creation. Provider APIs split upload from create, the DB adds batch-tracking fields, Celery tasks/starters are updated, routes are simplified, and tests are expanded to cover these flows.

Changes

Batch-Driven Collection Creation
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant API
    participant DB
    participant Scheduler
    participant SetupWorker
    participant Provider
    participant BatchWorker

    Client->>API: POST /collections (CreationRequest)
    API->>DB: create CollectionJob (save trace_id)
    API->>Scheduler: start_collection_setup_job(request, job_id, trace_id)
    Scheduler->>SetupWorker: run setup task
    SetupWorker->>Provider: upload_files(storage, docs)
    Provider->>DB: persist openai_file_id and file_size_kb
    SetupWorker->>DB: update job to PROCESSING with batches
    SetupWorker->>Scheduler: start_collection_batch_job(first_batch_args)
    Scheduler->>BatchWorker: run batch task
    BatchWorker->>Provider: create(docs, vector_store_id)
    Provider->>DB: return vector_store_id / collection metadata
    BatchWorker->>DB: checkpoint progress / enqueue next batch or finalize
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
backend/app/services/collections/helpers.py (1)
84-99: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Replace the implicit `TypeError` with an explicit validation error.

Removing the `or 0` fallback means a document with `file_size_kb=None` now crashes inside the batching loop with an opaque `unsupported operand type(s) for +: 'int' and 'NoneType'` mid-iteration. Callers cannot tell which document is invalid, and any batches accumulated up to that point are discarded. A pre-loop validation (or explicit per-doc check) yields a clear message and a deterministic failure point.

🛡️ Proposed fix

```diff
     for doc in documents:
-        doc_size_kb = doc.file_size_kb
+        if doc.file_size_kb is None:
+            raise ValueError(
+                f"[batch_documents] Document {doc.id} has no file_size_kb; "
+                "sizes must be backfilled before batching."
+            )
+        doc_size_kb = doc.file_size_kb
```

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/helpers.py` around lines 84 - 99: The batching loop in batch_documents (the for doc in documents loop using current_batch and current_batch_size_kb) can raise an opaque TypeError when doc.file_size_kb is None; add explicit validation for each doc before using it (either a pre-loop scan or a per-doc check) that verifies file_size_kb is not None and is a numeric type, and if invalid raise a clear ValueError that includes an identifier (e.g., doc.id or doc.name) so callers know which document failed; perform this validation before updating current_batch_size_kb so existing batches are preserved and add a short logger.warning or logger.error with the same diagnostic information when raising.

backend/app/crud/rag/open_ai.py (1)
119-151: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove the unused `update` method from `OpenAIVectorStoreCrud`.

This method is not called anywhere in the codebase and has been replaced by `update_batch`. Additionally, it's missing a return type hint, which violates the coding guideline requiring type hints on all function return values. Removing it eliminates redundant code and the maintenance burden of two divergent upload flows.

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/crud/rag/open_ai.py` around lines 119 - 151: Delete the unused OpenAIVectorStoreCrud.update method (the entire function) since upload logic is now handled by update_batch; after removal, run a quick search for any remaining references to OpenAIVectorStoreCrud.update and remove them, and clean up any now-unused imports or symbols used only by that method (e.g., BytesIO, Document, CloudStorage) to avoid lints and type-hint violations.

backend/app/services/collections/create_collection.py (2)
174-303: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add type hints for `task_instance` (and tighten helper hints).

Per the coding guidelines, all function parameters and return values must have type hints. The following are missing/loose:

- `execute_setup_job(... task_instance, ...) -> None` — `task_instance` lacks a type
- `execute_batch_job(... task_instance, ...) -> None` — same
- `_persist_succeeded_docs(succeeded: list, ...)` — should be `list[Document]`
- `_retry_failed_uploads(vector_store_crud, ..., failed_docs: list, ...)` — `vector_store_crud` lacks a type, `failed_docs` should be `list[Document]`

`task_instance` can be typed as `celery.Task` (or kept as `Any` from `typing` if you want to avoid the dependency leak).

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/create_collection.py` around lines 174 - 303, The functions are missing/loose type hints: add an explicit type for task_instance in both execute_setup_job and execute_batch_job (use celery.Task or typing.Any if you want to avoid importing Celery), and tighten helper signatures so _persist_succeeded_docs uses succeeded: list[Document] and _retry_failed_uploads uses failed_docs: list[Document] and type-hint vector_store_crud to the actual CRUD class (e.g., VectorStoreCrud) or typing.Any if that class isn't accessible; also import any needed names (Document, Any, celery.Task) and update return annotations if necessary.
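A minimal sketch of the hinted signatures this comment asks for, assuming `Document` lives in the app's models package and that `celery.Task` is acceptable for the bound task instance; every name not quoted in the comment above is illustrative:

```python
from typing import Any

from celery import Task

from app.models.document import Document  # assumed import path


def execute_setup_job(task_instance: Task, *args: Any, **kwargs: Any) -> None:
    ...


def execute_batch_job(task_instance: Task, *args: Any, **kwargs: Any) -> None:
    ...


def _persist_succeeded_docs(succeeded: list[Document], *args: Any) -> None:
    ...


def _retry_failed_uploads(
    vector_store_crud: Any,  # or the concrete OpenAIVectorStoreCrud type
    failed_docs: list[Document],
    *args: Any,
) -> None:
    ...
```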
39-66: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Return type mismatch: declared `-> str` but returns a `UUID`.

`collection_job_id` is a `UUID` (per the parameter annotation on line 43); returning it directly violates the declared `-> str`. Cast or change the annotation.

🐛 Proposed fix

```diff
-    return collection_job_id
+    return str(collection_job_id)
```

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/create_collection.py` around lines 39 - 66: The function start_job currently declares a return type of -> str but returns collection_job_id which is a UUID; fix by either changing the function signature to return -> UUID or converting the returned value to a string with return str(collection_job_id). Update any imports/annotations if you choose UUID (e.g., ensure UUID is imported) and keep the rest of the logic (calls to CollectionJobCrud.update and start_create_collection_job) unchanged.

backend/app/services/collections/providers/openai.py (1)
23-28: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Update test calls to match the new `create` signature.

The test suite in `backend/app/tests/services/collections/providers/test_openai_provider.py` has three test functions that call `provider.create()` with the old three-argument signature:

- `test_create_openai_vector_store_only()` (line 40)
- `test_create_openai_with_assistant()` (line 79)
- `test_create_propagates_exception()` (line 143)

All three pass `storage` as the second argument and a documents list as the third, but the updated signature is `create(collection_request, docs, vector_store_id=None, is_final=False)`. The tests need to pass the documents list as the second argument, not `storage`:

- Change from: `provider.create(collection_request, storage, documents)`
- Change to: `provider.create(collection_request, documents)` (with `vector_store_id` as a named argument if needed)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/providers/openai.py` around lines 23 - 28, Update the three failing tests so they call the new create signature: replace calls to provider.create(collection_request, storage, documents) with provider.create(collection_request, documents) and, if a vector_store_id or is_final was intended, pass those as named args (e.g. provider.create(collection_request, documents, vector_store_id=..., is_final=...)); modify the three test functions in backend/app/tests/services/collections/providers/test_openai_provider.py (test_create_openai_vector_store_only, test_create_openai_with_assistant, test_create_propagates_exception) to pass the documents list as the second parameter and remove the positional storage argument.
🧹 Nitpick comments (4)
backend/app/models/document.py (1)
49-53: ⚡ Quick win

Align column comment between model and migration.

Migration `055` sets the column comment to "File ID assigned by the LLM provider (e.g. OpenAI file ID) to avoid re-uploading", but the model declares it as "File ID assigned by OpenAI (avoid re-uploading)". Future `alembic revision --autogenerate` runs may flag this drift as an unintended schema change. Pick one wording and keep both in sync.

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/models/document.py` around lines 49 - 53: The model field openai_file_id's sa_column_kwargs comment string mismatches the migration; update the Field definition for openai_file_id in the Document model to use the exact comment used in migration 055 ("File ID assigned by the LLM provider (e.g. OpenAI file ID) to avoid re-uploading") so the sa_column_kwargs comment and the migration stay in sync and prevent autogenerate diffs.

backend/app/alembic/versions/055_add_batch_tracking_to_collections_jobs.py (1)
47-55: 💤 Low value

Migration name only mentions `collection_jobs`, but it also alters `document`.

The filename and revision message refer to `collection_jobs` only, while the upgrade also adds `document.openai_file_id`. Consider splitting into two migrations or renaming/updating the message so the change scope is discoverable from the migration filename and history.

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/alembic/versions/055_add_batch_tracking_to_collections_jobs.py` around lines 47 - 55: The migration '055_add_batch_tracking_to_collections_jobs.py' declares changes for collection_jobs but also adds a column to document (op.add_column adding document.openai_file_id); either split the document change into a separate migration or rename/update this migration's filename and revision message to reflect both changes (and update the upgrade/revision docstring) so the history accurately describes the addition of document.openai_file_id alongside the collection_jobs alterations.

backend/app/services/collections/providers/openai.py (1)
47-52: ⚡ Quick win

Open one DB session for the whole batch, not one per document.

The current code opens a fresh `Session(engine)` and constructs a `DocumentCrud` for every successful upload. For a collection with hundreds/thousands of docs this multiplies connection overhead unnecessarily. A single session outside the loop with per-doc commits (or a single commit at the end if you don't need partial-progress durability) is cleaner.

♻️ Proposed refactor

```diff
-    def upload_files(
+    def upload_files(
         self,
         storage: CloudStorage,
         docs: list[Document],
         project_id: int,
     ) -> None:
-        for doc in docs:
-            if self.get_existing_file_id(doc):
-                continue
-            try:
-                content = storage.get(doc.object_store_url)
-                if doc.file_size_kb is None:
-                    doc.file_size_kb = round(len(content) / 1024, 2)
-                f_obj = BytesIO(content)
-                f_obj.name = doc.fname
-                uploaded = self.client.files.create(file=f_obj, purpose="assistants")
-                doc.openai_file_id = uploaded.id
-                with Session(engine) as session:
-                    document_crud = DocumentCrud(session, project_id)
-                    db_doc = document_crud.read_one(doc.id)
-                    db_doc.openai_file_id = uploaded.id
-                    db_doc.file_size_kb = doc.file_size_kb
-                    document_crud.update(db_doc)
-            except Exception as err:
-                ...
+        with Session(engine) as session:
+            document_crud = DocumentCrud(session, project_id)
+            for doc in docs:
+                if self.get_existing_file_id(doc):
+                    continue
+                content = storage.get(doc.object_store_url)
+                if doc.file_size_kb is None:
+                    doc.file_size_kb = round(len(content) / 1024, 2)
+                f_obj = BytesIO(content)
+                f_obj.name = doc.fname
+                uploaded = self.client.files.create(file=f_obj, purpose="assistants")
+                doc.openai_file_id = uploaded.id
+                db_doc = document_crud.read_one(doc.id)
+                db_doc.openai_file_id = uploaded.id
+                db_doc.file_size_kb = doc.file_size_kb
+                document_crud.update(db_doc)
```

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/providers/openai.py` around lines 47 - 52: The code currently creates a new Session(engine) and DocumentCrud for every uploaded document; instead open a single Session(engine) outside the upload loop and reuse it (and a DocumentCrud instance per project_id) for each doc, calling document_crud.read_one(doc.id), updating db_doc.openai_file_id and db_doc.file_size_kb, and then document_crud.update(db_doc) inside the loop; perform either a session.commit() per document for partial durability or one commit after the loop, and ensure the session is closed once after processing the entire batch.

backend/app/services/collections/create_collection.py (1)
475-491: ⚡ Quick win

Change `except BaseException` to `except Exception`.

`BaseException` catches `KeyboardInterrupt`, `SystemExit`, and `GeneratorExit`, which should normally be allowed to propagate. Additionally, gevent's `Timeout` deliberately inherits from `BaseException` (not `Exception`), so this generic handler will swallow timeouts that escape the dedicated `except Timeout` handler above and incorrectly mark the job as failed. Use `except Exception` instead.

♻️ Proposed change

```diff
-    except BaseException as err:
+    except Exception as err:
         logger.error(
             "[create_collection.execute_batch_job] Batch %d failed | job_id=%s, error=%s",
             ...
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/create_collection.py` around lines 475 - 491, The catch-all in create_collection.execute_batch_job currently uses "except BaseException as err" which improperly catches KeyboardInterrupt/SystemExit and gevent Timeouts; change that handler to "except Exception as err" so only regular exceptions are caught (leaving the earlier "except Timeout" and system-exiting signals to propagate), and keep the existing logging, _mark_job_failed, and callback logic unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 74-105: The gevent_timeout decorator currently raises TimeoutError
unconditionally in its finally block causing tasks like
run_create_collection_job and run_collection_batch_job to always fail; modify
gevent_timeout (the decorator implementation) so that the Timeout exception is
raised only inside the except Timeout: handler and the finally: block only calls
timeout.cancel() (no raise), ensuring timeout.cancel() is reachable and
successful task completions do not raise TimeoutError.
In `@backend/app/celery/utils.py`:
- Around line 185-208: gevent_timeout currently always raises TimeoutError and
never cancels the gevent Timeout; fix wrapper in gevent_timeout by tracking
whether the gevent Timeout fired (e.g., timed_out flag and optionally store
result/exception), don't unconditionally raise in finally, always call
timeout.cancel() in the finally block, and only raise TimeoutError (or re-raise
the stored Timeout) after timeout.cancel() if timed_out is true; reference
wrapper, Timeout, timeout.cancel(), task_name and func.__name__ to locate where
to apply the change.
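A minimal sketch of the corrected control flow described in the two items above: raise only when the gevent `Timeout` actually fired, and always cancel the timer in `finally`. The decorator's parameters and messages are illustrative, not the repo's exact implementation.

```python
import functools
from typing import Any, Callable

from gevent import Timeout


def gevent_timeout(seconds: float, task_name: str) -> Callable:
    """Fail the wrapped callable only if it runs longer than `seconds`."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            timeout = Timeout(seconds)
            timeout.start()
            try:
                return func(*args, **kwargs)
            except Timeout as err:
                # Raise only here, when the gevent Timeout actually fired.
                raise TimeoutError(
                    f"{task_name}: {func.__name__} exceeded {seconds}s"
                ) from err
            finally:
                # Always cancel the pending timer; never raise from finally,
                # otherwise successful runs would also fail.
                timeout.cancel()

        return wrapper

    return decorator
```

With this shape, a task that finishes before the deadline returns normally because nothing is raised from the `finally` block.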
In `@backend/app/crud/rag/open_ai.py`:
- Around line 158-163: The docstring for the batch upload method incorrectly
refers to provider_file_id; update it to reference the actual Document attribute
used in the code (doc.openai_file_id) so the docstring matches the
implementation (see the method that calls upload_and_poll / the loop that reads
doc.openai_file_id). Ensure the sentence now states that all docs must have
openai_file_id set before calling this method and return description remains
unchanged.
- Around line 182-190: In OpenAIVectorStoreCrud.update_batch, when
batch.file_counts.failed > 0, don't mark all docs for retry; call the OpenAI
helper client.beta.vector_stores.file_batches.list_files(batch_id=batch.id,
vector_store_id=vector_store_id, filter="failed") to get only failed file
entries, map those failed file identifiers back to the input docs list (using
the same file id/key used when building docs), and extend the failed list with
only those docs so upload_and_poll() is retried only for genuinely failed files
instead of the entire batch.
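A hedged sketch of retrying only the genuinely failed files. The `list_files(..., filter="failed")` helper is the one named in the item above; the exact attribute path (`client.beta.vector_stores` vs. `client.vector_stores`) and parameter spelling depend on the installed `openai` SDK version, so treat them as assumptions to verify.

```python
from typing import Any

from openai import OpenAI


def collect_failed_docs(
    client: OpenAI, vector_store_id: str, batch: Any, docs: list[Any]
) -> list[Any]:
    """Return only the docs whose files failed to attach in this batch."""
    if batch.file_counts.failed == 0:
        return []

    failed_files = client.vector_stores.file_batches.list_files(
        batch_id=batch.id,
        vector_store_id=vector_store_id,
        filter="failed",  # only entries that did not attach
    )
    failed_ids = {f.id for f in failed_files}

    # Map provider file ids back to the input Document objects.
    return [doc for doc in docs if doc.openai_file_id in failed_ids]
```

The retry (via `upload_and_poll()` or a plain re-attach) can then run against just this subset instead of the whole batch.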
In `@backend/app/services/collections/create_collection.py`:
- Around line 122-172: The two helper functions _persist_succeeded_docs and
_retry_failed_uploads (and the stale docstring reference to
_upload_batch_with_retry) are dead code and OpenAIVectorStoreCrud is unused;
either wire them into the batch path (execute_setup_job / execute_batch_job) or
remove them. Fix by removing the unused helpers _persist_succeeded_docs and
_retry_failed_uploads and the OpenAIVectorStoreCrud import, and update the
execute_batch_job docstring to not reference _upload_batch_with_retry;
alternatively, if you intend to keep retry logic, add calls from
execute_batch_job/execute_setup_job to _retry_failed_uploads (and ensure
vector_store_crud is passed) and implement or rename _upload_batch_with_retry
accordingly so the docstring matches the implemented function.
- Around line 304-311: Update the Phase 2 docstring to remove the reference to
the non-existent _upload_batch_with_retry and instead describe the actual
behavior: that the code calls provider.create(...) which delegates to
vector_store_crud.update_batch, and that inline retries are handled by
_retry_failed_uploads (if used) or by the underlying vector_store_crud; ensure
the docstring accurately states that failed items are retried via
_retry_failed_uploads or the vector_store_crud retry semantics, and that the
function still checkpoints progress, queues next batch, and finalizes the
collection on the last batch.
- Around line 215-220: The log call in create_collection.execute_setup_job uses
four format specifiers but only passes job_id and len(flat_docs), causing a
runtime TypeError; update the logger.info call to either (A) reduce the format
string to match the two provided args (e.g., remove failed and duration_s
placeholders) or (B) compute and supply the missing values by timing the
upload_files call and getting a failed count (modify upload_files to return a
result struct with failed_count and have execute_setup_job measure duration_s
and pass job_id, len(flat_docs), failed_count, duration_s into logger.info).
Ensure the change references logger.info and the upload_files/flat_docs
variables so the log formatting and values are consistent (a combined sketch follows after this list).
- Around line 243-253: The first batch enqueue call to
start_collection_batch_job is missing the required vector_store_id expected by
execute_batch_job, causing a TypeError; fix it by passing vector_store_id=None
in the start_collection_batch_job invocation (where project_id/job_id/trace_id
are passed) so execute_batch_job receives the argument, or alternatively add a
default vector_store_id: Optional[...] = None to execute_batch_job's signature;
reference start_collection_batch_job and execute_batch_job when making the
change.
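A hedged sketch combining the last two items: matched `logger.info` arguments and an explicit `vector_store_id=None` on the first batch enqueue. The keyword names on `start_collection_batch_job` and the shape of the upload result are assumptions taken from the review text, not verified against the code.

```python
import logging
import time
from typing import Any

logger = logging.getLogger(__name__)


def finish_setup(
    provider: Any,
    storage: Any,
    flat_docs: list[Any],
    project_id: int,
    job_id: str,
    trace_id: str,
    start_collection_batch_job: Any,
) -> None:
    """Illustrative tail of execute_setup_job: matched log args, explicit first-batch kwargs."""
    start = time.monotonic()
    upload_result = provider.upload_files(storage, flat_docs, project_id)
    duration_s = time.monotonic() - start
    failed = len(getattr(upload_result, "failed", []) or [])

    # Four placeholders, four arguments: no TypeError at log time.
    logger.info(
        "[create_collection.execute_setup_job] Uploaded documents | "
        "job_id=%s, total=%d, failed=%d, duration_s=%.2f",
        job_id,
        len(flat_docs),
        failed,
        duration_s,
    )

    # The first batch has no vector store yet, so pass vector_store_id=None
    # explicitly to satisfy execute_batch_job's required argument.
    start_collection_batch_job(
        project_id=project_id,
        job_id=job_id,
        trace_id=trace_id,
        vector_store_id=None,
    )
```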
In `@backend/app/services/collections/providers/openai.py`:
- Around line 30-59: The upload_files loop in OpenAIProvider.upload_files
currently logs per-document exceptions and continues, leaving docs with None
file_size_kb/openai_file_id and causing downstream TypeError or silent failures;
modify upload_files to either (A) fail-fast by re-raising the caught exception
after logging so callers (e.g., create_collection.execute_setup_job) can stop
and surface the real error, or (B) accumulate per-doc failures into a structured
result (e.g., list of successes and failures) and return that to callers so they
can decide (and avoid passing docs without openai_file_id to
vector_store_crud.update_batch); update the function signature and callers
accordingly (refer to upload_files, create_collection.execute_setup_job, and
vector_store_crud.update_batch) so callers handle the returned error info or the
propagated exception.
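A hedged sketch of option (B): `upload_files` returns a structured per-document result so callers such as `execute_setup_job` can decide whether to abort or to pass only the successes on to `vector_store_crud.update_batch`. The `UploadReport` dataclass and its field names are hypothetical.

```python
import logging
from dataclasses import dataclass, field
from typing import Any

logger = logging.getLogger(__name__)


@dataclass
class UploadReport:
    """Per-document outcome of an upload pass (hypothetical shape)."""

    succeeded: list[Any] = field(default_factory=list)
    failed: list[tuple[Any, Exception]] = field(default_factory=list)


def upload_files(storage: Any, docs: list[Any], project_id: int) -> UploadReport:
    """Option (B): accumulate failures instead of swallowing them."""
    report = UploadReport()
    for doc in docs:
        try:
            # ... provider upload + DB persistence for `doc` goes here ...
            report.succeeded.append(doc)
        except Exception as err:
            logger.error("[upload_files] doc %s failed: %s", doc.id, err)
            report.failed.append((doc, err))
    return report
```

With option (A) instead, the `except` block would simply log and re-raise, and no return type change is needed.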
---
Outside diff comments:
In `@backend/app/crud/rag/open_ai.py`:
- Around line 119-151: Delete the unused OpenAIVectorStoreCrud.update method
(the entire function) since upload logic is now handled by update_batch; after
removal, run a quick search for any remaining references to
OpenAIVectorStoreCrud.update and remove them, and clean up any now-unused
imports or symbols used only by that method (e.g., BytesIO, Document,
CloudStorage) to avoid lints and type-hint violations.
In `@backend/app/services/collections/create_collection.py`:
- Around line 174-303: The functions are missing/loose type hints: add an
explicit type for task_instance in both execute_setup_job and execute_batch_job
(use celery.Task or typing.Any if you want to avoid importing Celery), and
tighten helper signatures so _persist_succeeded_docs uses succeeded:
list[Document] and _retry_failed_uploads uses failed_docs: list[Document] and
type-hint vector_store_crud to the actual CRUD class (e.g., VectorStoreCrud) or
typing.Any if that class isn't accessible; also import any needed names
(Document, Any, celery.Task) and update return annotations if necessary.
- Around line 39-66: The function start_job currently declares a return type of
-> str but returns collection_job_id which is a UUID; fix by either changing the
function signature to return -> UUID or converting the returned value to a
string with return str(collection_job_id). Update any imports/annotations if you
choose UUID (e.g., ensure UUID is imported) and keep the rest of the logic
(calls to CollectionJobCrud.update and start_create_collection_job) unchanged.
In `@backend/app/services/collections/helpers.py`:
- Around line 84-99: The batching loop in batch_documents (the for doc in
documents loop using current_batch and current_batch_size_kb) can raise an
opaque TypeError when doc.file_size_kb is None; add explicit validation for each
doc before using it (either a pre-loop scan or a per-doc check) that verifies
file_size_kb is not None and is a numeric type, and if invalid raise a clear
ValueError that includes an identifier (e.g., doc.id or doc.name) so callers
know which document failed; perform this validation before updating
current_batch_size_kb so existing batches are preserved and add a short
logger.warning or logger.error with the same diagnostic information when
raising.
In `@backend/app/services/collections/providers/openai.py`:
- Around line 23-28: Update the three failing tests so they call the new create
signature: replace calls to provider.create(collection_request, storage,
documents) with provider.create(collection_request, documents) and, if a
vector_store_id or is_final was intended, pass those as named args (e.g.
provider.create(collection_request, documents, vector_store_id=...,
is_final=...)); modify the three test functions in
backend/app/tests/services/collections/providers/test_openai_provider.py
(test_create_openai_vector_store_only, test_create_openai_with_assistant,
test_create_propagates_exception) to pass the documents list as the second
parameter and remove the positional storage argument.
---
Nitpick comments:
In `@backend/app/alembic/versions/055_add_batch_tracking_to_collections_jobs.py`:
- Around line 47-55: The migration
'055_add_batch_tracking_to_collections_jobs.py' declares changes for
collection_jobs but also adds a column to document (op.add_column adding
document.openai_file_id); either split the document change into a separate
migration or rename/update this migration's filename and revision message to
reflect both changes (and update the upgrade/revision docstring) so the history
accurately describes the addition of document.openai_file_id alongside the
collection_jobs alterations.
In `@backend/app/models/document.py`:
- Around line 49-53: The model field openai_file_id's sa_column_kwargs comment
string mismatches the migration; update the Field definition for openai_file_id
in the Document model to use the exact comment used in migration 055 ("File ID
assigned by the LLM provider (e.g. OpenAI file ID) to avoid re-uploading") so
the sa_column_kwargs comment and the migration stay in sync and prevent
autogenerate diffs.
In `@backend/app/services/collections/create_collection.py`:
- Around line 475-491: The catch-all in create_collection.execute_batch_job
currently uses "except BaseException as err" which improperly catches
KeyboardInterrupt/SystemExit and gevent Timeouts; change that handler to "except
Exception as err" so only regular exceptions are caught (leaving the earlier
"except Timeout" and system-exiting signals to propagate), and keep the existing
logging, _mark_job_failed, and callback logic unchanged.
In `@backend/app/services/collections/providers/openai.py`:
- Around line 47-52: The code currently creates a new Session(engine) and
DocumentCrud for every uploaded document; instead open a single Session(engine)
outside the upload loop and reuse it (and a DocumentCrud instance per
project_id) for each doc, calling document_crud.read_one(doc.id), updating
db_doc.openai_file_id and db_doc.file_size_kb, and then
document_crud.update(db_doc) inside the loop; perform either a session.commit()
per document for partial durability or one commit after the loop, and ensure the
session is closed once after processing the entire batch.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 14146497-9eeb-46d5-94d9-fe7751afed6a
📒 Files selected for processing (12)
- backend/app/alembic/versions/055_add_batch_tracking_to_collections_jobs.py
- backend/app/api/docs/documents/upload.md
- backend/app/celery/tasks/job_execution.py
- backend/app/celery/utils.py
- backend/app/crud/rag/open_ai.py
- backend/app/models/collection_job.py
- backend/app/models/document.py
- backend/app/services/collections/create_collection.py
- backend/app/services/collections/helpers.py
- backend/app/services/collections/providers/base.py
- backend/app/services/collections/providers/openai.py
- backend/app/tests/services/collections/test_helpers.py
Codecov Report: ❌ Patch coverage is …
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/collections/create_collection.py (1)
42-69: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Align `start_job`'s return value with its signature.

The function is annotated as returning `str`, but it currently returns a `UUID`. That mismatch leaks into callers at the service boundary and is easy to fix by either returning `str(collection_job_id)` or changing the annotation to `UUID`.

Suggested change

```diff
-    return collection_job_id
+    return str(collection_job_id)
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/create_collection.py` around lines 42 - 69, The function start_job currently has a return type annotation of str but returns a UUID (collection_job_id); update the function so the return type matches the actual value by either converting collection_job_id to a string before returning (return str(collection_job_id)) or changing the return annotation to UUID; adjust the signature and any callers if needed to keep types consistent (refer to start_job and collection_job_id to locate the change).
🧹 Nitpick comments (2)
backend/app/alembic/versions/058_add_batch_tracking_to_collections_jobs.py (1)
19-19: ⚡ Quick win

Add return annotations to the migration hooks.

`upgrade` and `downgrade` are new functions, but both are missing return types. Please add `-> None` so the migration stays consistent with the repo-wide typing rule.

Suggested change

```diff
-def upgrade():
+def upgrade() -> None:
     op.add_column(
         "collection_jobs",
         sa.Column(
@@
-def downgrade():
+def downgrade() -> None:
     op.drop_column("collection_jobs", "total_batches")
     op.drop_column("collection_jobs", "current_batch_number")
     op.drop_column("collection_jobs", "documents_uploaded")
     op.drop_column("document", "openai_file_id")
```

As per coding guidelines, `**/*.py`: Always add type hints to all function parameters and return values in Python code.

Also applies to: 58-58
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/alembic/versions/058_add_batch_tracking_to_collections_jobs.py` at line 19: The migration hooks upgrade and downgrade are missing return type annotations; update the function signatures for both upgrade() and downgrade() to include -> None (e.g., def upgrade() -> None:) so they conform to the repo-wide typing rule and maintain consistency across migrations.

backend/app/celery/tasks/job_execution.py (1)
133-150: ⚡ Quick win

Annotate the new Celery task entrypoints.

`run_collection_setup_job` and `run_collection_batch_job` are newly added without return annotations. These wrappers currently return `None`, so please add `-> None` on both.

Suggested change

```diff
 def run_collection_setup_job(
     self, project_id: int, job_id: str, trace_id: str, **kwargs
-):
+) -> None:
@@
 def run_collection_batch_job(
     self, project_id: int, job_id: str, trace_id: str, **kwargs
-):
+) -> None:
```

As per coding guidelines, `**/*.py`: Always add type hints to all function parameters and return values in Python code.

Also applies to: 153-170
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/celery/tasks/job_execution.py` around lines 133 - 150, The new Celery task wrappers run_collection_setup_job and run_collection_batch_job currently lack return type annotations and by design return None; update both function signatures to include an explicit return type of -> None (e.g., def run_collection_setup_job(...) -> None:) so they conform to the project's typing guideline; locate the two task definitions (run_collection_setup_job and run_collection_batch_job) and add the -> None annotation to each signature without changing the function bodies.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/celery/utils.py`:
- Around line 123-125: The log message in start_collection_batch_job is using
the wrong prefix ("[start_collection_setup_job]"); update the logger.info call
inside the start_collection_batch_job function to use the correct prefix
"[start_collection_batch_job]" and follow the project's logging guideline format
(e.g., logger.info(f"[start_collection_batch_job] Started job
{mask_string(job_id)} with Celery task {mask_string(task_id)}")) so batch
enqueues are distinguishable in worker logs.
In `@backend/app/crud/rag/open_ai.py`:
- Around line 119-140: The update currently swallows OpenAIError and logs batch
failures instead of stopping the flow; change OpenAIVectorStoreCrud.update so
that any exception from self.client.vector_stores.file_batches.upload_and_poll
(and any case where batch.file_counts.failed > 0 after upload_and_poll) is
propagated as an exception instead of just logging: remove/replace the
logger.error handling that swallows OpenAIError (re-raise or raise a new
descriptive exception including the original err) and add a check after
upload_and_poll that raises a descriptive error when batch.file_counts.failed >
0 (including vector_store_id and failed count) so callers of
OpenAIVectorStoreCrud.update (e.g., OpenAIProvider.create and execute_batch_job)
will abort on upload failures.
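A hedged sketch of the propagate-instead-of-log behaviour. The `upload_and_poll` call and the `file_counts` attributes mirror the snippets quoted elsewhere in this review; the wrapper function and its error messages are illustrative.

```python
from typing import Any

from openai import OpenAI, OpenAIError


def attach_batch_or_raise(client: OpenAI, vector_store_id: str, files: list[Any]) -> Any:
    """Upload a file batch and raise (rather than log) on any failure."""
    try:
        batch = client.vector_stores.file_batches.upload_and_poll(
            vector_store_id=vector_store_id,
            files=files,
        )
    except OpenAIError as err:
        # Propagate instead of swallowing, so callers such as execute_batch_job abort.
        raise RuntimeError(
            f"Vector store {vector_store_id}: file batch upload failed"
        ) from err

    if batch.file_counts.failed > 0:
        raise RuntimeError(
            f"Vector store {vector_store_id}: {batch.file_counts.failed} of "
            f"{batch.file_counts.total} files failed to attach"
        )
    return batch
```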
In `@backend/app/services/collections/providers/openai.py`:
- Around line 45-60: After files.create succeeds but DocumentCrud.update fails,
we must delete the orphaned provider file to avoid quota/storage leaks: in the
upload_files flow where uploaded = self.client.files.create(...) and
db_doc.openai_file_id/db_doc.file_size_kb are set before calling
document_crud.update(db_doc), catch exceptions from DocumentCrud.update and call
self.client.files.delete(uploaded.id) (wrap delete in its own try/except and log
any deletion failure using logger.error) before re-raising the original error so
the provider file is rolled back; reference the uploaded variable,
DocumentCrud.update, and self.client.files.delete when adding this cleanup.
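A hedged sketch of the rollback step only; `client.files.delete` is the standard OpenAI file-deletion call, while the helper name and the surrounding CRUD objects stand in for the real code inside `upload_files`.

```python
import logging
from typing import Any

from openai import OpenAI

logger = logging.getLogger(__name__)


def persist_or_rollback(client: OpenAI, document_crud: Any, db_doc: Any, uploaded_id: str) -> None:
    """Write the provider file id to the DB; delete the provider file if that fails."""
    try:
        db_doc.openai_file_id = uploaded_id
        document_crud.update(db_doc)
    except Exception:
        try:
            # Best-effort rollback so the uploaded file does not leak storage quota.
            client.files.delete(uploaded_id)
        except Exception as cleanup_err:
            logger.error(
                "[upload_files] could not delete orphaned file %s: %s",
                uploaded_id,
                cleanup_err,
            )
        raise  # surface the original DB error to the caller
```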
In `@backend/app/tests/services/collections/test_create_collection.py`:
- Around line 37-56: Add explicit type hints to the test helpers: annotate
_mock_provider_with_size(llm_service_id: str, llm_service_name: str) -> Mock
(import Mock from unittest.mock), annotate the nested helper def
_set_file_size(storage: Any, docs: Iterable[Any], project_id: Any) -> None
(import Any, Iterable from typing), and annotate _patch_session(db: Session) ->
Any (or unittest.mock._patch) to reflect the patcher return; also add the
necessary imports (typing and unittest.mock) and keep references to
get_mock_provider and the patched create_collection.Session to locate the
functions.
- Line 7: The tests import SoftTimeLimitExceeded but never exercise it; add
mirror test cases that raise celery.exceptions.SoftTimeLimitExceeded where the
current tests raise gevent.Timeout so the code paths handled in production (the
except tuple (Timeout, SoftTimeLimitExceeded)) are covered. Locate the existing
tests in test_create_collection.py that currently use gevent.Timeout (the same
test functions/assertions) and add equivalent subtests or parametrize them to
also raise SoftTimeLimitExceeded, asserting the same behavior and cleanup as for
gevent.Timeout.
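A hedged sketch of how that mirroring could be done with `pytest.mark.parametrize`; the test body and fixtures are elided because they should stay identical to the existing `gevent.Timeout` tests.

```python
import pytest
from celery.exceptions import SoftTimeLimitExceeded
from gevent import Timeout


# Mirror the existing gevent.Timeout test for both exception types handled in
# production by `except (Timeout, SoftTimeLimitExceeded)`.
@pytest.mark.parametrize("timeout_exc", [Timeout, SoftTimeLimitExceeded])
def test_setup_job_marks_failed_on_timeouts(timeout_exc) -> None:
    provider = _mock_provider_with_size("llm-id", "llm-name")  # helper from this test module
    provider.upload_files.side_effect = timeout_exc()

    with pytest.raises((Timeout, SoftTimeLimitExceeded)):
        ...  # invoke execute_setup_job exactly as the existing Timeout test does

    # then assert the job row is FAILED, as the existing test already does
```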
---
Outside diff comments:
In `@backend/app/services/collections/create_collection.py`:
- Around line 42-69: The function start_job currently has a return type
annotation of str but returns a UUID (collection_job_id); update the function so
the return type matches the actual value by either converting collection_job_id
to a string before returning (return str(collection_job_id)) or changing the
return annotation to UUID; adjust the signature and any callers if needed to
keep types consistent (refer to start_job and collection_job_id to locate the
change).
---
Nitpick comments:
In `@backend/app/alembic/versions/058_add_batch_tracking_to_collections_jobs.py`:
- Line 19: The migration hooks upgrade and downgrade are missing return type
annotations; update the function signatures for both upgrade() and downgrade()
to include -> None (e.g., def upgrade() -> None:) so they conform to the
repo-wide typing rule and maintain consistency across migrations.
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 133-150: The new Celery task wrappers run_collection_setup_job and
run_collection_batch_job currently lack return type annotations and by design
return None; update both function signatures to include an explicit return type
of -> None (e.g., def run_collection_setup_job(...) -> None:) so they conform to
the project's typing guideline; locate the two task definitions
(run_collection_setup_job and run_collection_batch_job) and add the -> None
annotation to each signature without changing the function bodies.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 2f5f5939-4016-41b6-bcbb-d8df33c6244f
📒 Files selected for processing (9)
- backend/app/alembic/versions/058_add_batch_tracking_to_collections_jobs.py
- backend/app/celery/tasks/job_execution.py
- backend/app/celery/utils.py
- backend/app/crud/rag/open_ai.py
- backend/app/services/collections/create_collection.py
- backend/app/services/collections/providers/openai.py
- backend/app/tests/services/collections/providers/test_openai_provider.py
- backend/app/tests/services/collections/test_create_collection.py
- backend/app/tests/services/collections/test_helpers.py
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/app/tests/services/collections/providers/test_openai_provider.py (1)
132-146: ⚡ Quick win

Annotate the new test helpers.

`_make_doc()` and `_patch_session_and_crud()` were added without full type annotations, so this new test code is out of sync with the repo's Python typing rule.

Suggested patch

```diff
+from typing import Any
+from types import SimpleNamespace
+
-def _make_doc(*, openai_file_id=None, file_size_kb=None):
+def _make_doc(
+    *, openai_file_id: str | None = None, file_size_kb: float | None = None
+) -> SimpleNamespace:
     return SimpleNamespace(
         id=uuid4(),
         fname="test.md",
         object_store_url="s3://bucket/test.md",
         openai_file_id=openai_file_id,
         file_size_kb=file_size_kb,
     )

-def _patch_session_and_crud():
+def _patch_session_and_crud() -> tuple[Any, Any]:
     """Patches Session and DocumentCrud used inside upload_files."""
     session_patcher = patch("app.services.collections.providers.openai.Session")
     crud_patcher = patch("app.services.collections.providers.openai.DocumentCrud")
     return session_patcher, crud_patcher
```

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code" and "Use Python 3.11+ with type hints throughout the codebase".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/services/collections/providers/test_openai_provider.py` around lines 132 - 146, The two test helper functions lack type annotations; add Python 3.11+ type hints: annotate _make_doc(openai_file_id: Optional[str] = None, file_size_kb: Optional[int] = None) -> SimpleNamespace (import Optional from typing and SimpleNamespace from types) and annotate _patch_session_and_crud() -> tuple[unittest.mock._patch, unittest.mock._patch] (or -> tuple[Any, Any] with Any imported from typing if you prefer public types), and update imports accordingly so the new helpers (_make_doc and _patch_session_and_crud) comply with the repo's typing rules.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/crud/rag/open_ai.py`:
- Around line 116-123: The code calls
self.client.vector_stores.file_batches.upload_and_poll with file_ids built from
[doc.openai_file_id for doc in docs] without validating openai_file_id; change
the logic in the method that constructs batch/upload (the block using docs,
openai_file_id, vector_store_id, and upload_and_poll) to filter out any docs
where doc.openai_file_id is falsy before building file_ids, and if any docs were
dropped either log a warning (including identifiers like doc.id or index) or
raise a clear error; after filtering, if the resulting file_ids list is empty,
return early instead of calling upload_and_poll.
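A hedged sketch of the filtering step; the helper name is illustrative, and the log message simply follows the bracketed-prefix convention used elsewhere in this review.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def build_file_ids(docs: list[Any]) -> list[str]:
    """Keep only docs that already have a provider file id; warn about the rest."""
    skipped = [doc for doc in docs if not doc.openai_file_id]
    if skipped:
        logger.warning(
            "[update_batch] skipping %d doc(s) without openai_file_id: %s",
            len(skipped),
            [str(doc.id) for doc in skipped],
        )
    return [doc.openai_file_id for doc in docs if doc.openai_file_id]
```

If the returned list is empty, the caller should return early instead of calling `upload_and_poll`.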
---
Nitpick comments:
In `@backend/app/tests/services/collections/providers/test_openai_provider.py`:
- Around line 132-146: The two test helper functions lack type annotations; add
Python 3.11+ type hints: annotate _make_doc(openai_file_id: Optional[str] =
None, file_size_kb: Optional[int] = None) -> SimpleNamespace (import Optional
from typing and SimpleNamespace from types) and annotate
_patch_session_and_crud() -> tuple[unittest.mock._patch, unittest.mock._patch]
(or -> tuple[Any, Any] with Any imported from typing if you prefer public
types), and update imports accordingly so the new helpers (_make_doc and
_patch_session_and_crud) comply with the repo's typing rules.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 9446da95-b118-41f1-a29c-08d27ece525b
📒 Files selected for processing (5)
- backend/app/celery/utils.py
- backend/app/crud/rag/open_ai.py
- backend/app/services/collections/providers/openai.py
- backend/app/tests/services/collections/providers/test_openai_provider.py
- backend/app/tests/services/collections/test_create_collection.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/celery/utils.py
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
backend/app/api/routes/documents.py (1)
76-99: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add return type hints to route functions.

All route functions in this file are missing return type annotations, which violates the coding guideline. As per coding guidelines, Python functions should always have type hints for all parameters and return values.

Proposed fix to add return type hints

```diff
 def list_docs(
     session: SessionDep,
     current_user: AuthContextDep,
     skip: int = Query(0, ge=0),
     limit: int = Query(100, gt=0, le=100),
     include_url: bool = Query(...),
-):
+) -> APIResponse[list[Union[DocumentPublic, TransformedDocumentPublic]]]:

 async def upload_doc(
     session: SessionDep,
     current_user: AuthContextDep,
     src: UploadFile = File(...),
     target_format: str | None = Form(...),
     transformer: str | None = Form(...),
     callback_url: str | None = Form(None, ...),
-):
+) -> APIResponse[DocumentUploadResponse]:

 def remove_doc(
     session: SessionDep,
     current_user: AuthContextDep,
     doc_id: UUID = FastPath(description="Document to delete"),
-):
+) -> APIResponse[Message]:

 def permanent_delete_doc(
     session: SessionDep,
     current_user: AuthContextDep,
     doc_id: UUID = FastPath(description="Document to permanently delete"),
-):
+) -> APIResponse[Message]:

 def doc_info(
     session: SessionDep,
     current_user: AuthContextDep,
     doc_id: UUID = FastPath(description="Document to retrieve"),
     include_url: bool = Query(...),
-):
+) -> APIResponse[Union[DocumentPublic, TransformedDocumentPublic]]:
```

Also applies to: 109-182, 191-210, 219-241, 250-273
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/api/routes/documents.py` around lines 76 - 99: The route function list_docs (and other route handlers in this file) lack return type annotations; update list_docs to include an explicit return type (e.g., -> APIResponse or the appropriate response type/Union/ResponseModel) and add similar return type hints to the other handlers listed (lines covering functions around 109-182, 191-210, 219-241, 250-273). Locate each route function (e.g., list_docs) that constructs and returns APIResponse.success_response (and functions that call DocumentCrud, build_document_schemas, get_cloud_storage) and append the correct return type annotation to the function signature to satisfy the project typing guidelines.

backend/app/api/routes/collections.py (1)
68-78: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add return type hints to route functions.

All route functions in this file are missing return type annotations, which violates the coding guideline. As per coding guidelines, Python functions should always have type hints for all parameters and return values.

Proposed fix to add return type hints

```diff
 def list_collections(
     session: SessionDep,
     current_user: AuthContextDep,
-):
+) -> APIResponse[List[CollectionPublic]]:

 def create_collection(
     session: SessionDep,
     current_user: AuthContextDep,
     request: CreationRequest,
-):
+) -> APIResponse[CollectionJobImmediatePublic]:

 def delete_collection(
     session: SessionDep,
     current_user: AuthContextDep,
     collection_id: UUID = FastPath(description="Collection to delete"),
     request: CallbackRequest | None = Body(default=None),
-):
+) -> APIResponse[CollectionJobImmediatePublic]:

 def collection_info(
     session: SessionDep,
     current_user: AuthContextDep,
     collection_id: UUID = FastPath(description="Collection to retrieve"),
     include_docs: bool = Query(...),
     include_url: bool = Query(...),
     limit: int | None = Query(...),
-):
+) -> APIResponse[CollectionWithDocsPublic]:
```

Also applies to: 88-130, 140-187, 196-236
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/api/routes/collections.py` around lines 68 - 78: The route functions (e.g., list_collections) lack return type annotations; add explicit return type hints (use -> APIResponse) to all route handlers in this file (including the functions in the other ranges noted: lines ~88-130, 140-187, 196-236) so signatures read like def list_collections(...) -> APIResponse:; also update imports if necessary so the APIResponse type is available for annotations and ensure any helper conversion functions (e.g., to_collection_public) keep their own annotations consistent.

backend/app/services/collections/create_collection.py (1)
42-67: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Return type annotation mismatch.

`start_job` is annotated to return `str` but returns `collection_job_id` (a `UUID`) at line 67. Either cast to `str(...)` to match the annotation, or change the annotation to `UUID`. Tests pass because they don't type-check the return, but downstream callers/typing tools will be misled.

🐛 Proposed fix

```diff
-) -> str:
+) -> UUID:
     trace_id = correlation_id.get() or "N/A"
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/create_collection.py` around lines 42 - 67, The function start_job currently declares a return type of str but returns collection_job_id (a UUID); update the implementation so the signature and return match—either cast the return to str by returning str(collection_job_id) or change the return annotation to UUID and keep returning collection_job_id as-is; update the function signature and any related type imports (if choosing UUID) and ensure callers expect the chosen type.
♻️ Duplicate comments (2)
backend/app/crud/rag/open_ai.py (1)
110-122: ⚠️ Potential issue | 🟡 Minor | 💤 Low value

Validate `openai_file_id` is set before passing to `upload_and_poll`.

Any doc with `openai_file_id=None` will be forwarded straight into `file_ids` and surface as an obscure OpenAI provider error rather than a clear local contract violation. While `OpenAIProvider.upload_files` is expected to populate this field, a fast-fail check at this boundary makes debugging contract violations trivial.

🛡️ Suggested guard

```diff
         if not docs:
             return

+        missing = [str(doc.id) for doc in docs if not doc.openai_file_id]
+        if missing:
+            raise ValueError(
+                f"All documents must have openai_file_id before vector store attach: {missing}"
+            )
+
         try:
             batch = self.client.vector_stores.file_batches.upload_and_poll(
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/crud/rag/open_ai.py` around lines 110 - 122, The code passes doc.openai_file_id values (which may be None) into self.client.vector_stores.file_batches.upload_and_poll causing ambiguous provider errors; before calling upload_and_poll in the OpenAIVectorStoreCrud update logic, validate the docs list so that you either filter out docs with missing openai_file_id or raise a clear ValueError listing offending doc ids; adjust the file_ids argument to be a list comprehension like [d.openai_file_id for d in docs if d.openai_file_id], and if any docs were filtered raise/log a descriptive error referencing the docs variable and the openai_file_id field so callers get an immediate contract violation instead of an OpenAIError.backend/app/tests/services/collections/test_create_collection.py (1)
37-55: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Test helpers still missing type annotations.

`_mock_provider_with_size`, the nested `_set_file_size`, and `_patch_session` still lack parameter and return type annotations. As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".

🐛 Proposed fix

```diff
-def _mock_provider_with_size(llm_service_id: str, llm_service_name: str):
+def _mock_provider_with_size(
+    llm_service_id: str, llm_service_name: str
+) -> MagicMock:
     """Returns a mock provider whose upload_files sets file_size_kb=10.0 on each doc."""
     mock_provider = get_mock_provider(llm_service_id, llm_service_name)

-    def _set_file_size(storage, docs, project_id):
+    def _set_file_size(storage: Any, docs: list[Any], project_id: int) -> None:
         for doc in docs:
             doc.file_size_kb = 10.0

     mock_provider.upload_files.side_effect = _set_file_size
     return mock_provider

-def _patch_session(db: Session):
+def _patch_session(db: Session) -> Any:
     """Context manager that routes all Session(engine) calls to the test db."""
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/services/collections/test_create_collection.py` around lines 37 - 55, Add missing type hints: annotate _mock_provider_with_size(llm_service_id: str, llm_service_name: str) -> Mock (import Mock from unittest.mock), annotate the nested _set_file_size(storage: Any, docs: Sequence[Any], project_id: str) -> None (import Any, Sequence from typing), and annotate _patch_session(db: Session) -> Any (or the concrete patcher type) (import Session from sqlalchemy.orm and Any from typing); update imports accordingly so all three functions and the nested helper have full parameter and return type annotations.
🧹 Nitpick comments (9)
backend/app/services/collections/providers/openai.py (1)
3-3: 💤 Low value

Use builtin `list` instead of deprecated `typing.List`.

Ruff (UP035) flags `typing.List` as deprecated, and the base class already uses `list[Document]` — aligning here removes the inconsistency and lets you drop the import.

♻️ Proposed fix

```diff
 import logging
 from io import BytesIO
-from typing import List

 from openai import OpenAI
 from sqlmodel import Session
@@
     def create(
         self,
-        docs: List[Document],
+        docs: list[Document],
         vector_store_id: str | None = None,
     ) -> Collection:
```

As per coding guidelines: "Use Python 3.11+ with type hints throughout the codebase".
Also applies to: 92-96
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/providers/openai.py` at line 3: Replace usages of the deprecated typing.List import with the built-in list type: remove the "from typing import List" import and update any annotations that use List (e.g., function signatures or variables referenced around lines where Document lists are used — look for symbols like Document, return types or parameters annotated as List[Document]) to use built-in list[Document] instead; ensure all occurrences (including the block mentioned around lines 92-96) are updated and that any unused typing import is removed.

backend/app/tests/services/collections/providers/test_openai_provider.py (1)
65-79: 💤 Low value

Add type hints to helper functions.
`_make_doc` and `_patch_session_and_crud` lack parameter/return type annotations.

♻️ Proposed fix
```diff
-def _make_doc(*, openai_file_id=None, file_size_kb=None):
+def _make_doc(
+    *,
+    openai_file_id: str | None = None,
+    file_size_kb: float | None = None,
+) -> SimpleNamespace:
     return SimpleNamespace(
         id=uuid4(),
         fname="test.md",
         object_store_url="s3://bucket/test.md",
         openai_file_id=openai_file_id,
         file_size_kb=file_size_kb,
     )

-def _patch_session_and_crud():
+def _patch_session_and_crud() -> tuple[Any, Any]:
     """Patches Session and DocumentCrud used inside upload_files."""
     session_patcher = patch("app.services.collections.providers.openai.Session")
     crud_patcher = patch("app.services.collections.providers.openai.DocumentCrud")
     return session_patcher, crud_patcher
```

You will also need to import
`Any` from `typing`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/services/collections/providers/test_openai_provider.py` around lines 65 - 79, Add type hints for both helpers: annotate _make_doc(openai_file_id: Optional[str] = None, file_size_kb: Optional[int] = None) -> SimpleNamespace and annotate _patch_session_and_crud() -> Tuple[Any, Any] (or Tuple[object, object]) so the return types and optional params are explicit; import Any, Optional, Tuple from typing at the top (and keep SimpleNamespace and patch usage unchanged) to satisfy the typing requirement.backend/app/tests/services/collections/test_create_collection.py (2)
108-156: 💤 Low value

Consider asserting batch metadata persistence on the job row.
The test verifies job status, task_id, and the queued batch kwargs, but doesn't assert that
`current_batch_number=0`, `total_batches`, or `documents_uploaded=[]` (lines 244–247 in production code) were actually written to the job row. Adding those checks would catch regressions in the second-update block of `execute_setup_job`.

```python
assert updated_job.total_batches == 1
assert updated_job.current_batch_number == 0
assert updated_job.documents_uploaded == []
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/services/collections/test_create_collection.py` around lines 108 - 156, The test for execute_setup_job is missing assertions that the job row persisted the batch metadata written in the second-update block; update the test (after reading updated_job via CollectionJobCrud.read_one(job.id)) to assert updated_job.total_batches == 1, updated_job.current_batch_number == 0, and updated_job.documents_uploaded == [] so the persistence of those fields is verified alongside status/task_id and queued batch assertions.
258-340: 💤 Low value

Timeout test fidelity: triggering via
`upload_files` doesn't exercise the surrounding code.
test_execute_setup_job_timeout_marks_failed_and_reraisesandtest_execute_setup_job_soft_time_limit_marks_failed_and_reraisesraise the timeout fromupload_files.side_effect. That validates theexcept (Timeout, SoftTimeLimitExceeded)handler but doesn't cover timeouts that occur after upload (e.g., duringbatch_documentsor the second job update). Consider parameterizing the injection point or adding one case where the timeout surfaces afterupload_files()returns to harden coverage of the rest of thetryblock.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/services/collections/test_create_collection.py` around lines 258 - 340, The tests only inject the timeout via mock_provider.upload_files.side_effect which exercises the upload exception path but not timeouts occurring later in execute_setup_job; add a new case (or parameterize the existing tests) where upload_files returns normally and instead raise Timeout or SoftTimeLimitExceeded from the subsequent step (e.g., set start_collection_batch_job.side_effect or mock_queue_batch.side_effect or have a mock for batch_documents raise) so execute_setup_job experiences the timeout after upload, then assert CollectionJobStatus.FAILED and that "soft time limit" appears in updated_job.error_message as in test_execute_setup_job_*.backend/app/services/collections/helpers.py (2)
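One possible shape for the extra case suggested above. This is a sketch only: it reuses the module's existing imports and fixtures (mock_provider, db, job are assumed to exist), the patch target for start_collection_batch_job is illustrative, and the execute_setup_job arguments should be copied from the existing timeout tests.

`@patch("app.services.collections.create_collection.start_collection_batch_job")  # path is an assumption`
def test_execute_setup_job_timeout_after_upload_marks_failed(
    mock_start_batch, db, mock_provider, job
):
    # Upload succeeds; the timeout only fires when the first batch is queued,
    # so the batching and the second job update inside the try block run first.
    mock_provider.upload_files.return_value = None
    mock_start_batch.side_effect = SoftTimeLimitExceeded()

    with pytest.raises(SoftTimeLimitExceeded):
        execute_setup_job(...)  # same arguments as the existing timeout tests

    # Read the job back the same way the existing tests do (job_crud is a stand-in
    # for however CollectionJobCrud is constructed in this module).
    updated_job = job_crud.read_one(job.id)
    assert updated_job.status == CollectionJobStatus.FAILED
    assert "soft time limit" in updated_job.error_message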
82-97: 💤 Low value Single document exceeding
MAX_BATCH_SIZE_KB produces an oversized batch. If
doc.file_size_kb is greater than MAX_BATCH_SIZE_KB (e.g. an upstream check was bypassed or a future change raises MAX_DOC_SIZE_MB), the would_exceed_size check still places the document in a batch by itself, and that batch silently exceeds the 30 MB limit downstream. Today this is bounded by MAX_DOC_SIZE_MB = 25, so it is not an active bug; flagging only as a defensive guard for future maintenance. A simple safeguard:
♻️ Optional defensive guard
 for doc in documents:
     doc_size_kb = doc.file_size_kb
+    if doc_size_kb > MAX_BATCH_SIZE_KB:
+        raise ValueError(
+            f"[batch_documents] Document {doc.id} size {doc_size_kb}KB "
+            f"exceeds MAX_BATCH_SIZE_KB={MAX_BATCH_SIZE_KB}"
+        )
     would_exceed_size = (current_batch_size_kb + doc_size_kb) > MAX_BATCH_SIZE_KB

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/helpers.py` around lines 82 - 97, The loop that batches documents can create a batch larger than MAX_BATCH_SIZE_KB if a single doc.file_size_kb > MAX_BATCH_SIZE_KB; add a defensive guard inside the documents loop: check if doc.file_size_kb > MAX_BATCH_SIZE_KB and if so log an error/warning (including the doc identifier), skip adding that document to current_batch (or raise/return an explicit error depending on desired behavior), and continue to the next doc so you never append a single oversized document into docs_batches; reference the variables/documents loop, doc.file_size_kb, MAX_BATCH_SIZE_KB, current_batch, current_batch_size_kb and docs_batches when making the change.
131-140: 💤 Low value Clarify
knowledge_base_provider field semantics in the schema description.
knowledge_base_provider is correctly populated from collection.llm_service_name (e.g., "openai vector store"), and tests confirm this is intentional. However, the field name suggests a provider identifier (e.g., "openai"), while it actually holds the full service name/description. The schema description "Knowledge base provider name" is ambiguous and could be clearer about what downstream consumers should expect from this field.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/helpers.py` around lines 131 - 140, The schema description for the CollectionPublic field knowledge_base_provider is ambiguous: it currently reads like a provider identifier but the code (to_collection_public) sets it from collection.llm_service_name (e.g., "openai vector store"). Update the schema/field docstring/description for CollectionPublic::knowledge_base_provider to explicitly state it contains the full LLM service name/description (the llm_service_name value), not just a short provider id; locate references to CollectionPublic and to_collection_public to ensure the description matches usage and update any API docs or comments that mention "Knowledge base provider name" accordingly.backend/app/services/collections/create_collection.py (1)
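If the field stays a plain str, one possible rewording of the description (illustrative only) would be:

knowledge_base_provider: str = Field(
    description=(
        "Full LLM service name backing the knowledge base "
        "(taken from Collection.llm_service_name, e.g. 'openai vector store')"
    ),
)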
211-248: 💤 Low value Redundant first job update before upload; can be consolidated.
The job is updated twice in
execute_setup_job: once at lines 211–218 (setting task_id + PROCESSING), and again at lines 236–248 (re-setting the same two fields plus batch metadata). The first transaction is largely wasted because the second one immediately overwrites the same fields after upload_files() returns. If you want to mark PROCESSING early so a frontend can observe upload-in-progress, that's fine, but otherwise consider collapsing into a single post-upload update to reduce DB churn and lock contention.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/collections/create_collection.py` around lines 211 - 248, The code performs two successive CollectionJobCrud.update calls in execute_setup_job—one before provider.upload_files and another after—causing redundant DB churn; either remove the pre-upload update (the call to CollectionJobCrud.update(job_uuid, CollectionJobUpdate(task_id=task_id, status=CollectionJobStatus.PROCESSING))) or merge its intent into the later update so only a single update occurs after provider.upload_files completes (ensuring the later update still sets task_id, status, total_size_mb, current_batch_number, total_batches, and documents_uploaded); if you need an early "upload in progress" state for the frontend instead, keep the pre-upload update but avoid overwriting identical fields in the second call by only updating batch metadata there.backend/app/alembic/versions/061_add_batch_tracking_to_collections_jobs.py (1)
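A rough sketch of the consolidated variant, assuming the same CollectionJobUpdate fields the two current updates already set; collection_job_crud, storage, flat_docs and project_id are stand-ins for whatever execute_setup_job already has in scope:

# Single post-upload update: task_id/status land together with the batch metadata.
provider.upload_files(storage, flat_docs, project_id)

total_size_kb = sum(doc.file_size_kb or 0 for doc in flat_docs)  # 0-fallback guards missing sizes
batches = batch_documents(flat_docs)

collection_job_crud.update(
    job_uuid,
    CollectionJobUpdate(
        task_id=task_id,
        status=CollectionJobStatus.PROCESSING,
        total_size_mb=total_size_kb / 1024,  # KB -> MB
        current_batch_number=0,
        total_batches=len(batches),
        documents_uploaded=[],
    ),
)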
19-19: 💤 Low value Add return type hints to migration functions.
Per the project coding guideline ("Always add type hints to all function parameters and return values in Python code"), the migration functions should declare
-> None.
♻️ Proposed change
-def upgrade():
+def upgrade() -> None:
     op.add_column(
         ...

-def downgrade():
+def downgrade() -> None:
     op.drop_column("collection_jobs", "total_batches")

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code".
Also applies to: 58-58
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/alembic/versions/061_add_batch_tracking_to_collections_jobs.py` at line 19, The migration functions lack return type hints; update the function signatures in this migration (add "-> None") for upgrade() and the corresponding downgrade() function referenced around line 58 so both read e.g. def upgrade() -> None: and def downgrade() -> None:, keeping existing bodies unchanged.backend/app/models/collection.py (1)
145-157: 💤 Low value Consider using
ProviderType for knowledge_base_provider. The DB-side
Collection.provider is typed as ProviderType (enum), but the public response exposes knowledge_base_provider: str. Returning the enum type here would surface the supported provider set in the OpenAPI schema and keep the public contract aligned with the internal type. This is optional given it's only a response model.
♻️ Proposed change
     knowledge_base_id: str = Field(
         description="Knowledge base ID (e.g., Vector Store ID)",
     )
-    knowledge_base_provider: str = Field(
+    knowledge_base_provider: ProviderType = Field(
         description="Knowledge base provider name",
     )

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/models/collection.py` around lines 145 - 157, Change the CollectionPublic response model to use the ProviderType enum instead of a plain string for the knowledge_base_provider field so the OpenAPI schema and public contract match the DB model; update the type annotation on CollectionPublic.knowledge_base_provider to ProviderType and add the necessary import for ProviderType (referencing the CollectionPublic class and the knowledge_base_provider field) so the enum values are exposed in the generated schema.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/services/collections/create_collection.py`:
- Line 119: The log prefix in the warning inside the _mark_job_failed function
is incorrect (it uses "[create_collection]"); update that logger.warning to use
the function name prefix "[_mark_job_failed]". Likewise, any log messages in the
_handle_job_failure function that currently use
"[create_collection.execute_job]" should be changed to the correct prefix
"[_handle_job_failure]". Locate logger.warning / logger.error calls in the
_mark_job_failed and _handle_job_failure functions and replace the
square-bracketed prefix strings accordingly so all logs follow the
"[function_name]" guideline.
- Around line 161-168: The functions execute_setup_job and execute_batch_job
declare the parameter task_instance without a type hint; update both signatures
to annotate task_instance with an explicit type (prefer celery.Task, or
typing.Any if intentionally untyped) and rename it to _task_instance if the
parameter is unused to silence Ruff ARG001; ensure the return type remains ->
None and update any imports (e.g., from celery import Task or from typing import
Any) accordingly.
- Around line 220-234: The code relies on OpenAIProvider.upload_files mutating
flat_docs, causing fragile aggregation at total_size_kb; change upload_files
(base class and OpenAIProvider.upload_files) to return explicit size information
(e.g., total_size_kb or a list of per-doc sizes) and update the caller in
create_collection.execute_setup_job to use that returned value for aggregation
instead of reading doc.file_size_kb from potentially detached objects; also
update the base class docstring to document the new return contract and ensure
OpenAIProvider populates/returns sizes even when get_existing_file_id(doc)
short-circuits (handle None safely, convert to 0 if missing) so sum() never
encounters None.
In
`@backend/app/tests/crud/collections/collection/test_crud_collection_delete.py`:
- Around line 12-14: The helper function get_vector_store_collection currently
declares an unused db: Session parameter—remove db from the function signature
and any type hints so it becomes get_vector_store_collection(client: OpenAI,
project_id: int) -> Collection, and then update every call site in this test
module that passes a db to instead call get_vector_store_collection(client,
project_id); ensure imports/type annotations align with the new signature and
run tests to confirm no remaining references to the removed parameter.
In
`@backend/app/tests/crud/collections/collection/test_crud_collection_read_all.py`:
- Around line 10-21: The function create_collections currently declares a return
type of Collection but actually returns an int (crud.project_id); change the
return type annotation to int to match the returned value. Locate the
create_collections definition (uses get_project, get_vector_store_collection,
DocumentStore and CollectionCrud) and update its signature to return int,
keeping existing parameter types (db: Session, n: int) so the type hint reflects
the actual return value.
---
Outside diff comments:
In `@backend/app/api/routes/collections.py`:
- Around line 68-78: The route functions (e.g., list_collections) lack return
type annotations; add explicit return type hints (use -> APIResponse) to all
route handlers in this file (including the functions in the other ranges noted:
lines ~88-130, 140-187, 196-236) so signatures read like def
list_collections(... ) -> APIResponse:; also update imports if necessary so the
APIResponse type is available for annotations and ensure any helper conversion
functions (e.g., to_collection_public) keep their own annotations consistent.
In `@backend/app/api/routes/documents.py`:
- Around line 76-99: The route function list_docs (and other route handlers in
this file) lack return type annotations; update list_docs to include an explicit
return type (e.g., -> APIResponse or the appropriate response
type/Union/ResponseModel) and add similar return type hints to the other
handlers listed (lines covering functions around 109-182, 191-210, 219-241,
250-273). Locate each route function (e.g., list_docs) that constructs and
returns APIResponse.success_response (and functions that call DocumentCrud,
build_document_schemas, get_cloud_storage) and append the correct return type
annotation to the function signature to satisfy the project typing guidelines.
In `@backend/app/services/collections/create_collection.py`:
- Around line 42-67: The function start_job currently declares a return type of
str but returns collection_job_id (a UUID); update the implementation so the
signature and return match—either cast the return to str by returning
str(collection_job_id) or change the return annotation to UUID and keep
returning collection_job_id as-is; update the function signature and any related
type imports (if choosing UUID) and ensure callers expect the chosen type.
---
Duplicate comments:
In `@backend/app/crud/rag/open_ai.py`:
- Around line 110-122: The code passes doc.openai_file_id values (which may be
None) into self.client.vector_stores.file_batches.upload_and_poll causing
ambiguous provider errors; before calling upload_and_poll in the
OpenAIVectorStoreCrud update logic, validate the docs list so that you either
filter out docs with missing openai_file_id or raise a clear ValueError listing
offending doc ids; adjust the file_ids argument to be a list comprehension like
[d.openai_file_id for d in docs if d.openai_file_id], and if any docs were
filtered raise/log a descriptive error referencing the docs variable and the
openai_file_id field so callers get an immediate contract violation instead of
an OpenAIError.
In `@backend/app/tests/services/collections/test_create_collection.py`:
- Around line 37-55: Add missing type hints: annotate
_mock_provider_with_size(llm_service_id: str, llm_service_name: str) -> Mock
(import Mock from unittest.mock), annotate the nested _set_file_size(storage:
Any, docs: Sequence[Any], project_id: str) -> None (import Any, Sequence from
typing), and annotate _patch_session(db: Session) -> Any (or the concrete
patcher type) (import Session from sqlalchemy.orm and Any from typing); update
imports accordingly so all three functions and the nested helper have full
parameter and return type annotations.
---
Nitpick comments:
In `@backend/app/alembic/versions/061_add_batch_tracking_to_collections_jobs.py`:
- Line 19: The migration functions lack return type hints; update the function
signatures in this migration (add "-> None") for upgrade() and the corresponding
downgrade() function referenced around line 58 so both read e.g. def upgrade()
-> None: and def downgrade() -> None:, keeping existing bodies unchanged.
In `@backend/app/models/collection.py`:
- Around line 145-157: Change the CollectionPublic response model to use the
ProviderType enum instead of a plain string for the knowledge_base_provider
field so the OpenAPI schema and public contract match the DB model; update the
type annotation on CollectionPublic.knowledge_base_provider to ProviderType and
add the necessary import for ProviderType (referencing the CollectionPublic
class and the knowledge_base_provider field) so the enum values are exposed in
the generated schema.
In `@backend/app/services/collections/create_collection.py`:
- Around line 211-248: The code performs two successive CollectionJobCrud.update
calls in execute_setup_job—one before provider.upload_files and another
after—causing redundant DB churn; either remove the pre-upload update (the call
to CollectionJobCrud.update(job_uuid, CollectionJobUpdate(task_id=task_id,
status=CollectionJobStatus.PROCESSING))) or merge its intent into the later
update so only a single update occurs after provider.upload_files completes
(ensuring the later update still sets task_id, status, total_size_mb,
current_batch_number, total_batches, and documents_uploaded); if you need an
early "upload in progress" state for the frontend instead, keep the pre-upload
update but avoid overwriting identical fields in the second call by only
updating batch metadata there.
In `@backend/app/services/collections/helpers.py`:
- Around line 82-97: The loop that batches documents can create a batch larger
than MAX_BATCH_SIZE_KB if a single doc.file_size_kb > MAX_BATCH_SIZE_KB; add a
defensive guard inside the documents loop: check if doc.file_size_kb >
MAX_BATCH_SIZE_KB and if so log an error/warning (including the doc identifier),
skip adding that document to current_batch (or raise/return an explicit error
depending on desired behavior), and continue to the next doc so you never append
a single oversized document into docs_batches; reference the variables/documents
loop, doc.file_size_kb, MAX_BATCH_SIZE_KB, current_batch, current_batch_size_kb
and docs_batches when making the change.
- Around line 131-140: The schema description for the CollectionPublic field
knowledge_base_provider is ambiguous: it currently reads like a provider
identifier but the code (to_collection_public) sets it from
collection.llm_service_name (e.g., "openai vector store"). Update the
schema/field docstring/description for CollectionPublic::knowledge_base_provider
to explicitly state it contains the full LLM service name/description (the
llm_service_name value), not just a short provider id; locate references to
CollectionPublic and to_collection_public to ensure the description matches
usage and update any API docs or comments that mention "Knowledge base provider
name" accordingly.
In `@backend/app/services/collections/providers/openai.py`:
- Line 3: Replace usages of the deprecated typing.List import with the built-in
list type: remove the "from typing import List" import and update any
annotations that use List (e.g., function signatures or variables referenced
around lines where Document lists are used — look for symbols like Document,
return types or parameters annotated as List[Document]) to use built-in
list[Document] instead; ensure all occurrences (including the block mentioned
around lines 92-96) are updated and that any unused typing import is removed.
In `@backend/app/tests/services/collections/providers/test_openai_provider.py`:
- Around line 65-79: Add type hints for both helpers: annotate
_make_doc(openai_file_id: Optional[str] = None, file_size_kb: Optional[int] =
None) -> SimpleNamespace and annotate _patch_session_and_crud() -> Tuple[Any,
Any] (or Tuple[object, object]) so the return types and optional params are
explicit; import Any, Optional, Tuple from typing at the top (and keep
SimpleNamespace and patch usage unchanged) to satisfy the typing requirement.
In `@backend/app/tests/services/collections/test_create_collection.py`:
- Around line 108-156: The test for execute_setup_job is missing assertions that
the job row persisted the batch metadata written in the second-update block;
update the test (after reading updated_job via
CollectionJobCrud.read_one(job.id)) to assert updated_job.total_batches == 1,
updated_job.current_batch_number == 0, and updated_job.documents_uploaded == []
so the persistence of those fields is verified alongside status/task_id and
queued batch assertions.
- Around line 258-340: The tests only inject the timeout via
mock_provider.upload_files.side_effect which exercises the upload exception path
but not timeouts occurring later in execute_setup_job; add a new case (or
parameterize the existing tests) where upload_files returns normally and instead
raise Timeout or SoftTimeLimitExceeded from the subsequent step (e.g., set
start_collection_batch_job.side_effect or mock_queue_batch.side_effect or have a
mock for batch_documents raise) so execute_setup_job experiences the timeout
after upload, then assert CollectionJobStatus.FAILED and that "soft time limit"
appears in updated_job.error_message as in test_execute_setup_job_*.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 066cca6e-1cdd-4427-b053-5148c6a3117b
📒 Files selected for processing (24)
- backend/app/alembic/versions/061_add_batch_tracking_to_collections_jobs.py
- backend/app/api/routes/collections.py
- backend/app/api/routes/documents.py
- backend/app/crud/collection/collection.py
- backend/app/crud/rag/__init__.py
- backend/app/crud/rag/open_ai.py
- backend/app/models/collection.py
- backend/app/models/document.py
- backend/app/services/collections/create_collection.py
- backend/app/services/collections/helpers.py
- backend/app/services/collections/providers/base.py
- backend/app/services/collections/providers/openai.py
- backend/app/tests/api/routes/collections/test_collection_delete.py
- backend/app/tests/api/routes/collections/test_collection_info.py
- backend/app/tests/api/routes/collections/test_collection_job_info.py
- backend/app/tests/api/routes/collections/test_collection_list.py
- backend/app/tests/api/routes/collections/test_create_collections.py
- backend/app/tests/crud/collections/collection/test_crud_collection_delete.py
- backend/app/tests/crud/collections/collection/test_crud_collection_read_all.py
- backend/app/tests/crud/collections/collection/test_crud_collection_read_one.py
- backend/app/tests/services/collections/providers/test_openai_provider.py
- backend/app/tests/services/collections/test_create_collection.py
- backend/app/tests/services/collections/test_helpers.py
- backend/app/tests/utils/collection.py
💤 Files with no reviewable changes (1)
- backend/app/tests/utils/collection.py
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/models/document.py
- backend/app/tests/services/collections/test_helpers.py
Summary
Target issue is #798 and #768
Notes
New Features
Bug Fixes
Documentation
Tests
Summary by CodeRabbit
New Features
Breaking Changes
Improvements