mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-25 01:15:58 +00:00
fix(storage): address code quality comments
This commit is contained in:
@@ -17,20 +17,25 @@ _SEQ_PROCESS_BITS = 9
|
||||
_SEQ_PROCESS_SALT = secrets.randbits(_SEQ_PROCESS_BITS)
|
||||
_SEQ_COUNTER_LIMIT = 1 << _SEQ_COUNTER_BITS
|
||||
_SEQ_TIMESTAMP_SHIFT = _SEQ_COUNTER_BITS + _SEQ_PROCESS_BITS
|
||||
_last_seq_millis = 0
|
||||
_seq_lock = threading.Lock()
|
||||
|
||||
|
||||
def _allocate_sequence_base(batch_size: int) -> int:
|
||||
if batch_size >= _SEQ_COUNTER_LIMIT:
|
||||
raise ValueError(f"Run event batch is too large: {batch_size} >= {_SEQ_COUNTER_LIMIT}")
|
||||
class _SequenceAllocator:
|
||||
def __init__(self) -> None:
|
||||
self._last_millis = 0
|
||||
self._lock = threading.Lock()
|
||||
|
||||
global _last_seq_millis
|
||||
now_ms = time.time_ns() // 1_000_000
|
||||
with _seq_lock:
|
||||
seq_ms = max(now_ms, _last_seq_millis + 1)
|
||||
_last_seq_millis = seq_ms
|
||||
return (seq_ms << _SEQ_TIMESTAMP_SHIFT) | (_SEQ_PROCESS_SALT << _SEQ_COUNTER_BITS)
|
||||
def allocate_base(self, batch_size: int) -> int:
|
||||
if batch_size >= _SEQ_COUNTER_LIMIT:
|
||||
raise ValueError(f"Run event batch is too large: {batch_size} >= {_SEQ_COUNTER_LIMIT}")
|
||||
|
||||
now_ms = time.time_ns() // 1_000_000
|
||||
with self._lock:
|
||||
seq_ms = max(now_ms, self._last_millis + 1)
|
||||
self._last_millis = seq_ms
|
||||
return (seq_ms << _SEQ_TIMESTAMP_SHIFT) | (_SEQ_PROCESS_SALT << _SEQ_COUNTER_BITS)
|
||||
|
||||
|
||||
_sequence_allocator = _SequenceAllocator()
|
||||
|
||||
|
||||
def _serialize_content(content: Any, metadata: dict[str, Any]) -> tuple[str, dict[str, Any]]:
|
||||
@@ -75,7 +80,7 @@ class DbRunEventRepository(RunEventRepositoryProtocol):
|
||||
if not events:
|
||||
return []
|
||||
|
||||
seq_base = _allocate_sequence_base(len(events))
|
||||
seq_base = _sequence_allocator.allocate_base(len(events))
|
||||
|
||||
rows: list[RunEventModel] = []
|
||||
|
||||
|
||||
Reference in New Issue
Block a user