From 8e0bae894d4a9b655e465e4c662ff6ea2e6d5cfe Mon Sep 17 00:00:00 2001
From: Debanjum Singh Solanky
Date: Tue, 16 Apr 2024 23:52:16 +0530
Subject: [PATCH] Extract run with process lock logic into func. Use for content reindexing

---
Reviewer notes:
- update_content_index_regularly passes max_duration_in_seconds=60 * 60 * 2 to
  run_with_lock, the lock timeout the scheduled job used before this refactor.
- run_with_lock takes the lock outside the try/finally that guards func, so a
  failed set_process_lock no longer removes a lock this call does not hold.
- update_content_index raises when any configure_content run reports failure,
  not only the last one.
- The commit id and index hashes are those of the original patch and were not
  recomputed for the edited content.

 src/khoj/configure.py                  | 44 +++++++++++++-----------
 src/khoj/database/adapters/__init__.py | 44 ++++++++++++++++++++++--
 2 files changed, 65 insertions(+), 23 deletions(-)

diff --git a/src/khoj/configure.py b/src/khoj/configure.py
index e509532d..2cb3c613 100644
--- a/src/khoj/configure.py
+++ b/src/khoj/configure.py
@@ -307,30 +307,32 @@ def configure_middleware(app):
     app.add_middleware(SessionMiddleware, secret_key=os.environ.get("KHOJ_DJANGO_SECRET_KEY", "!secret"))
 
 
-@schedule.repeat(schedule.every(22).to(25).hours)
 def update_content_index():
-    try:
-        if ProcessLockAdapters.is_process_locked(ProcessLock.Operation.UPDATE_EMBEDDINGS):
-            logger.info("🔒 Skipping update content index due to lock")
-            return
-        ProcessLockAdapters.set_process_lock(
-            ProcessLock.Operation.UPDATE_EMBEDDINGS, max_duration_in_seconds=60 * 60 * 2
-        )
+    """Update the content index of every user, then once more with user=None.
+
+    Raises:
+        RuntimeError: If configure_content reports failure for any of these runs.
+    """
+    # Accumulate the result of every run so an earlier failure is not masked by a later success
+    success = True
+    for user in get_all_users():
+        all_files = collect_files(user=user)
+        success = configure_content(all_files, user=user) and success
+    all_files = collect_files(user=None)
+    success = configure_content(all_files, user=None) and success
+    if not success:
+        raise RuntimeError("Failed to update content index")
+    logger.info("📪 Content index updated via Scheduler")
 
-        with timer("📬 Updating content index via Scheduler"):
-            for user in get_all_users():
-                all_files = collect_files(user=user)
-                success = configure_content(all_files, user=user)
-            all_files = collect_files(user=None)
-            success = configure_content(all_files, user=None)
-            if not success:
-                raise RuntimeError("Failed to update content index")
-            logger.info("📪 Content index updated via Scheduler")
-
-        ProcessLockAdapters.remove_process_lock(ProcessLock.Operation.UPDATE_EMBEDDINGS)
-    except Exception as e:
-        logger.error(f"🚨 Error updating content index via Scheduler: {e}", exc_info=True)
 
+@schedule.repeat(schedule.every(22).to(25).hours)
+def update_content_index_regularly():
+    """Update the content index every 22 to 25 hours while holding the UPDATE_EMBEDDINGS process lock."""
+    # Pass the 2 hour lock timeout the scheduled job used before this refactor.
+    # The 10 minute default of run_with_lock would let the lock be treated as stale during a long reindex
+    ProcessLockAdapters.run_with_lock(
+        update_content_index, ProcessLock.Operation.UPDATE_EMBEDDINGS, max_duration_in_seconds=60 * 60 * 2
+    )
 
 
 def configure_search_types():
diff --git a/src/khoj/database/adapters/__init__.py b/src/khoj/database/adapters/__init__.py
index 9bab8d01..e2b84d17 100644
--- a/src/khoj/database/adapters/__init__.py
+++ b/src/khoj/database/adapters/__init__.py
@@ -5,7 +5,7 @@ import secrets
 import sys
 from datetime import date, datetime, timedelta, timezone
 from enum import Enum
-from typing import List, Optional, Type
+from typing import Callable, List, Optional, Type
 
 from asgiref.sync import sync_to_async
 from django.contrib.sessions.backends.db import SessionStore
@@ -46,7 +46,7 @@ from khoj.search_filter.file_filter import FileFilter
 from khoj.search_filter.word_filter import WordFilter
 from khoj.utils import state
 from khoj.utils.config import OfflineChatProcessorModel
-from khoj.utils.helpers import generate_random_name, is_none_or_empty
+from khoj.utils.helpers import generate_random_name, is_none_or_empty, timer
 
 logger = logging.getLogger(__name__)
 
@@ -421,6 +421,7 @@ class ProcessLockAdapters:
                 tz=timezone.utc
             ):
                 process_lock.delete()
+                logger.info(f"🔓 Deleted stale {process_name} process lock on timeout")
                 return False
             return True
 
@@ -428,6 +429,45 @@ class ProcessLockAdapters:
     def remove_process_lock(process_name: str):
         return ProcessLock.objects.filter(name=process_name).delete()
 
+    @staticmethod
+    def run_with_lock(func: Callable, operation: ProcessLock.Operation, max_duration_in_seconds: int = 600):
+        """Run func while holding the process lock for operation.
+
+        Skips the run when the lock is already taken. Errors raised by func are logged, not re-raised,
+        and the lock taken by this call is removed once func finishes.
+
+        Args:
+            func: Callable to run. It is called without arguments.
+            operation: Process lock to hold while func runs.
+            max_duration_in_seconds: Lock timeout passed to set_process_lock. Defaults to 600 seconds.
+        """
+        # Exit early if process lock is already taken
+        if ProcessLockAdapters.is_process_locked(operation):
+            logger.info(f"🔒 Skip executing {func} as {operation} lock is already taken")
+            return
+
+        # Set process lock. Do this outside the try block guarding func, so a failure to take
+        # the lock never reaches the finally clause and removes a lock this call does not hold
+        try:
+            ProcessLockAdapters.set_process_lock(operation, max_duration_in_seconds)
+        except Exception as e:
+            logger.error(f"🚨 Unable to lock {operation} to execute {func}: {e}", exc_info=True)
+            return
+        logger.info(f"🔐 Locked {operation} to execute {func}")
+
+        success = False
+        try:
+            # Execute Function
+            with timer(f"🔒 Run {func} with {operation} process lock", logger):
+                func()
+            success = True
+        except Exception as e:
+            logger.error(f"🚨 Error executing {func} with {operation} process lock: {e}", exc_info=True)
+        finally:
+            # Remove Process Lock
+            ProcessLockAdapters.remove_process_lock(operation)
+            logger.info(f"🔓 Unlocked {operation} process after executing {func} {'Succeeded' if success else 'Failed'}")
+
 
 class ClientApplicationAdapters:
     @staticmethod