Use sha256 user buckets with legacy migration

This commit is contained in:
taohe
2026-06-11 21:08:08 +08:00
parent 0798489734
commit 0268ebbe6a
4 changed files with 118 additions and 4 deletions
@@ -1,4 +1,5 @@
import hashlib
import logging
import os
import re
import shutil
@@ -13,6 +14,9 @@ _SAFE_THREAD_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
_SAFE_USER_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
_UNSAFE_USER_ID_CHAR_RE = re.compile(r"[^A-Za-z0-9_\-]")
_SAFE_USER_ID_DIGEST_HEX_LEN = 16
_SAFE_USER_ID_DIGEST_RE = re.compile(r"^[0-9a-f]{16}$")
logger = logging.getLogger(__name__)
def _default_local_base_dir() -> Path:
@@ -47,10 +51,17 @@ def make_safe_user_id(raw: str) -> str:
sanitized = _UNSAFE_USER_ID_CHAR_RE.sub("-", raw)
if sanitized == raw:
return raw
digest = hashlib.sha1(raw.encode("utf-8")).hexdigest()[:_SAFE_USER_ID_DIGEST_HEX_LEN]
digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:_SAFE_USER_ID_DIGEST_HEX_LEN]
return f"{sanitized}-{digest}"
def _looks_like_digested_user_dir(name: str, prefix: str) -> bool:
if not name.startswith(prefix):
return False
suffix = name[len(prefix) :]
return bool(_SAFE_USER_ID_DIGEST_RE.match(suffix))
def _join_host_path(base: str, *parts: str) -> str:
"""Join host filesystem path segments while preserving native style.
@@ -172,6 +183,45 @@ class Paths:
"""Directory for a specific user: `{base_dir}/users/{user_id}/`."""
return self.base_dir / "users" / _validate_user_id(user_id)
def prepare_user_dir_for_raw_id(self, raw_user_id: str) -> str:
"""Return the safe user ID and migrate a unique legacy unsafe-id bucket.
A previous branch revision used a weak digest for unsafe external user IDs.
New IDs use SHA-256, but if exactly one old-style bucket with the same
sanitized prefix already exists, move it to the current bucket name so
existing local memory/files/threads remain visible.
"""
safe_user_id = make_safe_user_id(raw_user_id)
sanitized = _UNSAFE_USER_ID_CHAR_RE.sub("-", raw_user_id)
if safe_user_id == raw_user_id:
return safe_user_id
users_dir = self.base_dir / "users"
target_dir = users_dir / safe_user_id
if target_dir.exists() or not users_dir.exists():
return safe_user_id
legacy_prefix = f"{sanitized}-"
try:
legacy_candidates = [candidate for candidate in users_dir.iterdir() if candidate.is_dir() and candidate.name != safe_user_id and _looks_like_digested_user_dir(candidate.name, legacy_prefix)]
except OSError:
logger.exception("Failed to inspect user directories for legacy unsafe-id migration")
return safe_user_id
if not legacy_candidates:
return safe_user_id
if len(legacy_candidates) > 1:
logger.warning("Multiple legacy unsafe-id user directories matched; skipping automatic migration")
return safe_user_id
try:
legacy_candidates[0].rename(target_dir)
logger.info("Migrated legacy unsafe-id user directory to the current digest format")
except OSError:
logger.exception("Failed to migrate legacy unsafe-id user directory")
return safe_user_id
def user_memory_file(self, user_id: str) -> Path:
"""Per-user memory file: `{base_dir}/users/{user_id}/memory.json`."""
return self.user_dir(user_id) / "memory.json"