refactor: migrate in-memory token caches to Redis with memory fallback #329

nap-liu wants to merge 1 commit into dataelement:main
Conversation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
Refactors multiple third-party access-token caches to use a shared Redis-backed cache (with in-process memory fallback) to support multi-instance deployments and TTL-based early refresh.
Changes:
- Added a unified token_cache.py helper to read/write/delete tokens via Redis with memory fallback.
- Migrated token caching for DingTalk/WeCom org sync adapters, Feishu service/provider, and Microsoft Teams integration to use the unified cache.
- Added/standardized TTL early-refresh buffers when persisting tokens.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| backend/app/services/org_sync_adapter.py | Moves DingTalk + WeCom org-sync access tokens from in-instance caching to Redis/memory cache keys with TTL. |
| backend/app/services/feishu_service.py | Adds Redis/memory caching for Feishu tenant/app access tokens with early-refresh TTL handling. |
| backend/app/services/auth_provider.py | Adds Redis/memory caching for Feishu auth-provider app token retrieval with TTL handling. |
| backend/app/core/token_cache.py | Introduces unified Redis-backed token cache utilities with in-memory fallback. |
| backend/app/api/wecom.py | Adds a shared WeCom token helper and updates message-processing flows to reuse cached tokens. |
| backend/app/api/teams.py | Replaces in-memory Teams token cache with Redis/memory cache-based implementation and TTL buffers. |
```python
cache_key = f"clawith:token:wecom:{self.corp_id}"
cached = await get_cached_token(cache_key)
if cached:
    self._access_token = cached
    return cached
```
The WeCom access_token is derived from both corp_id and corp_secret, but the cache key only includes corp_id. This can cause token collisions between different WeCom secrets for the same corp (e.g., contact-sync secret vs agent secret), leading to intermittent 48009/permission errors. Include a secret identifier (e.g., hash of corp_secret) or a scope-specific suffix in the cache key to keep tokens distinct.
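One way to scope the key, sketched here as a hypothetical helper (the name `wecom_cache_key` and the 16-character hash truncation are assumptions, not code from this PR): hash the secret so it never appears in Redis key names while still distinguishing tokens minted with different secrets for the same corp.

```python
import hashlib


def wecom_cache_key(corp_id: str, corp_secret: str) -> str:
    """Build a WeCom token cache key scoped to both corp_id and corp_secret.

    Hashing the secret keeps it out of Redis key names while still keeping
    tokens for different secrets (contact-sync vs agent) distinct.
    """
    secret_hash = hashlib.sha256(corp_secret.encode("utf-8")).hexdigest()[:16]
    return f"clawith:token:wecom:{corp_id}:{secret_hash}"
```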
```python
Key: clawith:token:wecom:{corp_id}
TTL: 6900s (7200s validity - 5 min early refresh)
"""
from app.core.token_cache import get_cached_token, set_cached_token
import httpx as _httpx

cache_key = f"clawith:token:wecom:{corp_id}"
```
This WeCom token cache key only uses corp_id, but tokens differ per corp_secret. If multiple ChannelConfig entries share a corp_id with different secrets, they will overwrite each other and break API calls. Consider including a stable secret identifier (e.g., hash) or agent_id/scope in the cache key.
Suggested change:

```diff
-Key: clawith:token:wecom:{corp_id}
-TTL: 6900s (7200s validity - 5 min early refresh)
-"""
-from app.core.token_cache import get_cached_token, set_cached_token
-import httpx as _httpx
-cache_key = f"clawith:token:wecom:{corp_id}"
+Key: clawith:token:wecom:{corp_id}:{sha256(corp_secret)}
+TTL: 6900s (7200s validity - 5 min early refresh)
+"""
+from app.core.token_cache import get_cached_token, set_cached_token
+import httpx as _httpx
+secret_hash = hashlib.sha256(corp_secret.encode("utf-8")).hexdigest()
+cache_key = f"clawith:token:wecom:{corp_id}:{secret_hash}"
```
```python
# In-memory fallback store: {key: (value, expire_at)}
_memory_cache: dict[str, tuple[str, float]] = {}
```
The in-memory fallback cache is an unbounded module-level dict. In multi-tenant deployments with many unique keys, this can grow without limit. Consider adding an LRU/max-size cap to _memory_cache (and evict oldest entries) to keep memory usage bounded.
```python
        if val:
            return val if isinstance(val, str) else val.decode()
    except Exception:
        pass
```
Redis errors are swallowed silently here. If Redis is misconfigured or intermittently failing, this hides the root cause and can add repeated exception overhead on hot paths. Consider adding rate-limited logging and/or a simple backoff/circuit-breaker when Redis operations fail.
```python
from app.core.token_cache import get_cached_token, set_cached_token

target_app_id = app_id or self.app_id
target_app_secret = app_secret or self.app_secret

cache_key = f"clawith:token:feishu_tenant:{target_app_id}"

cached = await get_cached_token(cache_key)
if cached:
```
There are existing unit tests for FeishuService, but none assert the new Redis/memory caching behavior (cache hit avoids HTTP call, TTL buffer logic). Add tests by monkeypatching get_cached_token/set_cached_token and asserting the HTTP client is only invoked on cache miss.
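A runnable sketch of that test pattern. `FakeFeishuService` below is a stand-in that mirrors the caching flow under review; the real service class, module path, and method names in `app.services.feishu_service` are assumptions, so an actual test would use pytest's `monkeypatch.setattr` against that module instead of the global reassignment shown here.

```python
import asyncio


# Stand-ins for the module-level cache helpers from app.core.token_cache.
async def get_cached_token(key: str):
    return None


async def set_cached_token(key: str, value: str, ttl: int) -> None:
    return None


class FakeFeishuService:
    """Minimal stand-in mirroring the cache-then-HTTP flow under review."""

    def __init__(self) -> None:
        self.http_calls = 0

    async def get_tenant_access_token(self) -> str:
        cached = await get_cached_token("clawith:token:feishu_tenant:app")
        if cached:
            return cached
        self.http_calls += 1  # stands in for the real HTTP request
        token = "fresh-token"
        await set_cached_token("clawith:token:feishu_tenant:app", token, 6900)
        return token


async def _run() -> tuple[str, int]:
    global get_cached_token

    async def cache_hit(key: str):  # what monkeypatch.setattr would install
        return "cached-token"

    get_cached_token = cache_hit
    svc = FakeFeishuService()
    token = await svc.get_tenant_access_token()
    return token, svc.http_calls


result = asyncio.run(_run())
```

The assertion of interest is that a cache hit returns the cached value with zero HTTP calls; a companion cache-miss test would assert exactly one call plus one `set_cached_token` write.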
```python
if not app_id:  # only cache default app token
    self._app_access_token = token

expire = data.get("expire", 7200)
```
expire is used in arithmetic (expire - 300) but is not coerced to an int. If the Feishu API returns it as a string, this will raise a TypeError at runtime. Coerce to int (similar to other code paths that int() the expiry) before computing TTL.
Suggested change:

```diff
-expire = data.get("expire", 7200)
+expire = int(data.get("expire", 7200))
```
```python
    self._app_access_token = data.get("app_access_token", "")
    return self._app_access_token
token = data.get("app_access_token", "") or data.get("tenant_access_token", "")
expire = data.get("expire", 7200)
```
expire is used in arithmetic (expire - 300) but is not coerced to an int. If the Feishu API returns it as a string, this will raise a TypeError. Parse/cast it to an int (e.g., int(data.get('expire') or 7200)) before computing TTL.
Suggested change:

```diff
-expire = data.get("expire", 7200)
+expire = int(data.get("expire") or 7200)
```
```python
# Fallback to memory
if key in _memory_cache:
    val, expire_at = _memory_cache[key]
    if time.time() < expire_at:
        return val
```
Expired entries are only removed when the same key is read again, so expired keys can accumulate indefinitely. Consider opportunistic cleanup (e.g., scan/delete a few expired entries on each set) or a periodic purge strategy.
```python
    if redis:
        await redis.setex(key, ttl_seconds, value)
except Exception:
    pass
```
Redis errors are swallowed silently in the write path as well. Consider logging failures (rate-limited) so operators can detect Redis issues instead of silently falling back to per-process memory caches.
```python
if token:
    ttl = max(expires_in - 300, 60)
    await set_cached_token(cache_key, token, ttl)
    self._access_token = token
    self._token_expires_at = datetime.now() + timedelta(seconds=max(expires_in - 60, 60))
```
self._token_expires_at is set here but never read anywhere in this module, and its buffer (expires_in-60) also diverges from the Redis TTL buffer (expires_in-300). Either remove _token_expires_at or use it consistently for the in-process cache semantics.
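If the field is kept, one way to make the two buffers consistent is to derive both from a single constant; the helper name `compute_token_expiry` and the shared 300-second buffer below are an illustrative assumption, not code from this PR.

```python
from datetime import datetime, timedelta

EARLY_REFRESH_SECONDS = 300  # one shared buffer for Redis TTL and local expiry


def compute_token_expiry(expires_in: int) -> tuple[int, datetime]:
    """Derive the Redis TTL and the in-process expiry from one shared buffer.

    A single constant avoids the drift flagged in review, where the local
    check subtracted 60 seconds while the Redis TTL subtracted 300.
    """
    ttl = max(expires_in - EARLY_REFRESH_SECONDS, 60)
    return ttl, datetime.now() + timedelta(seconds=ttl)
```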
Summary

- token_cache.py: Redis-backed cache with automatic in-memory fallback

Test plan