mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-02 21:19:12 +00:00
Extract run with process lock logic into func. Use for content reindexing
This commit is contained in:
@@ -307,30 +307,20 @@ def configure_middleware(app):
|
|||||||
app.add_middleware(SessionMiddleware, secret_key=os.environ.get("KHOJ_DJANGO_SECRET_KEY", "!secret"))
|
app.add_middleware(SessionMiddleware, secret_key=os.environ.get("KHOJ_DJANGO_SECRET_KEY", "!secret"))
|
||||||
|
|
||||||
|
|
||||||
@schedule.repeat(schedule.every(22).to(25).hours)
|
|
||||||
def update_content_index():
|
def update_content_index():
|
||||||
try:
|
for user in get_all_users():
|
||||||
if ProcessLockAdapters.is_process_locked(ProcessLock.Operation.UPDATE_EMBEDDINGS):
|
all_files = collect_files(user=user)
|
||||||
logger.info("🔒 Skipping update content index due to lock")
|
success = configure_content(all_files, user=user)
|
||||||
return
|
all_files = collect_files(user=None)
|
||||||
ProcessLockAdapters.set_process_lock(
|
success = configure_content(all_files, user=None)
|
||||||
ProcessLock.Operation.UPDATE_EMBEDDINGS, max_duration_in_seconds=60 * 60 * 2
|
if not success:
|
||||||
)
|
raise RuntimeError("Failed to update content index")
|
||||||
|
logger.info("📪 Content index updated via Scheduler")
|
||||||
|
|
||||||
with timer("📬 Updating content index via Scheduler"):
|
|
||||||
for user in get_all_users():
|
|
||||||
all_files = collect_files(user=user)
|
|
||||||
success = configure_content(all_files, user=user)
|
|
||||||
all_files = collect_files(user=None)
|
|
||||||
success = configure_content(all_files, user=None)
|
|
||||||
if not success:
|
|
||||||
raise RuntimeError("Failed to update content index")
|
|
||||||
|
|
||||||
logger.info("📪 Content index updated via Scheduler")
|
@schedule.repeat(schedule.every(22).to(25).hours)
|
||||||
|
def update_content_index_regularly():
|
||||||
ProcessLockAdapters.remove_process_lock(ProcessLock.Operation.UPDATE_EMBEDDINGS)
|
ProcessLockAdapters.run_with_lock(update_content_index, ProcessLock.Operation.UPDATE_EMBEDDINGS)
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"🚨 Error updating content index via Scheduler: {e}", exc_info=True)
|
|
||||||
|
|
||||||
|
|
||||||
def configure_search_types():
|
def configure_search_types():
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import secrets
|
|||||||
import sys
|
import sys
|
||||||
from datetime import date, datetime, timedelta, timezone
|
from datetime import date, datetime, timedelta, timezone
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import List, Optional, Type
|
from typing import Callable, List, Optional, Type
|
||||||
|
|
||||||
from asgiref.sync import sync_to_async
|
from asgiref.sync import sync_to_async
|
||||||
from django.contrib.sessions.backends.db import SessionStore
|
from django.contrib.sessions.backends.db import SessionStore
|
||||||
@@ -46,7 +46,7 @@ from khoj.search_filter.file_filter import FileFilter
|
|||||||
from khoj.search_filter.word_filter import WordFilter
|
from khoj.search_filter.word_filter import WordFilter
|
||||||
from khoj.utils import state
|
from khoj.utils import state
|
||||||
from khoj.utils.config import OfflineChatProcessorModel
|
from khoj.utils.config import OfflineChatProcessorModel
|
||||||
from khoj.utils.helpers import generate_random_name, is_none_or_empty
|
from khoj.utils.helpers import generate_random_name, is_none_or_empty, timer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -421,6 +421,7 @@ class ProcessLockAdapters:
|
|||||||
tz=timezone.utc
|
tz=timezone.utc
|
||||||
):
|
):
|
||||||
process_lock.delete()
|
process_lock.delete()
|
||||||
|
logger.info(f"🔓 Deleted stale {process_name} process lock on timeout")
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -428,6 +429,31 @@ class ProcessLockAdapters:
|
|||||||
def remove_process_lock(process_name: str):
|
def remove_process_lock(process_name: str):
|
||||||
return ProcessLock.objects.filter(name=process_name).delete()
|
return ProcessLock.objects.filter(name=process_name).delete()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def run_with_lock(func: Callable, operation: ProcessLock.Operation, max_duration_in_seconds: int = 600):
|
||||||
|
# Exit early if process lock is already taken
|
||||||
|
if ProcessLockAdapters.is_process_locked(operation):
|
||||||
|
logger.info(f"🔒 Skip executing {func} as {operation} lock is already taken")
|
||||||
|
return
|
||||||
|
|
||||||
|
success = False
|
||||||
|
try:
|
||||||
|
# Set process lock
|
||||||
|
ProcessLockAdapters.set_process_lock(operation, max_duration_in_seconds)
|
||||||
|
logger.info(f"🔐 Locked {operation} to execute {func}")
|
||||||
|
|
||||||
|
# Execute Function
|
||||||
|
with timer(f"🔒 Run {func} with {operation} process lock", logger):
|
||||||
|
func()
|
||||||
|
success = True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"🚨 Error executing {func} with {operation} process lock: {e}", exc_info=True)
|
||||||
|
success = False
|
||||||
|
finally:
|
||||||
|
# Remove Process Lock
|
||||||
|
ProcessLockAdapters.remove_process_lock(operation)
|
||||||
|
logger.info(f"🔓 Unlocked {operation} process after executing {func} {'Succeeded' if success else 'Failed'}")
|
||||||
|
|
||||||
|
|
||||||
class ClientApplicationAdapters:
|
class ClientApplicationAdapters:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
Reference in New Issue
Block a user