diff --git a/src/khoj/database/adapters/__init__.py b/src/khoj/database/adapters/__init__.py index 048df839..6928f51c 100644 --- a/src/khoj/database/adapters/__init__.py +++ b/src/khoj/database/adapters/__init__.py @@ -16,6 +16,7 @@ from django.contrib.sessions.backends.db import SessionStore from django.db import models from django.db.models import Q from django.db.models.manager import BaseManager +from django_apscheduler.models import DjangoJob, DjangoJobExecution from fastapi import HTTPException from pgvector.django import CosineDistance from torch import Tensor @@ -936,6 +937,14 @@ class AutomationAdapters: if not automation.id.startswith(f"automation_{user.uuid}_"): raise ValueError("Invalid automation id") + django_job = DjangoJob.objects.filter(id=automation.id).first() + execution = DjangoJobExecution.objects.filter(job=django_job) + + last_run_time = None + + if execution.exists(): + last_run_time = execution.latest("run_time").run_time + automation_metadata = json.loads(automation.name) crontime = automation_metadata["crontime"] timezone = automation.next_run_time.strftime("%Z") @@ -948,6 +957,7 @@ class AutomationAdapters: "schedule": schedule, "crontime": crontime, "next": automation.next_run_time.strftime("%Y-%m-%d %I:%M %p %Z"), + "last_run": last_run_time.strftime("%Y-%m-%d %I:%M %p %Z") if last_run_time else None, } @staticmethod diff --git a/src/khoj/database/admin.py b/src/khoj/database/admin.py index b76b0fce..260c4124 100644 --- a/src/khoj/database/admin.py +++ b/src/khoj/database/admin.py @@ -15,6 +15,7 @@ from khoj.database.models import ( KhojUser, NotionConfig, OpenAIProcessorConversationConfig, + ProcessLock, ReflectiveQuestion, SearchModelConfig, SpeechToTextModelOptions, @@ -44,6 +45,7 @@ class KhojUserAdmin(UserAdmin): admin.site.register(KhojUser, KhojUserAdmin) admin.site.register(ChatModelOptions) +admin.site.register(ProcessLock) admin.site.register(SpeechToTextModelOptions) admin.site.register(OpenAIProcessorConversationConfig) admin.site.register(SearchModelConfig) diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py index 6b44253a..c56f1f34 100644 --- a/src/khoj/routers/helpers.py +++ b/src/khoj/routers/helpers.py @@ -35,6 +35,7 @@ from starlette.requests import URL from khoj.database.adapters import ( AgentAdapters, + AutomationAdapters, ConversationAdapters, EntryAdapters, create_khoj_token, @@ -883,7 +884,26 @@ def should_notify(original_query: str, executed_query: str, ai_response: str) -> return True -def scheduled_chat(query_to_run: str, scheduling_request: str, subject: str, user: KhojUser, calling_url: URL): +def scheduled_chat( + query_to_run: str, scheduling_request: str, subject: str, user: KhojUser, calling_url: URL, job_id: str = None +): + logger.info(f"Processing scheduled_chat: {query_to_run}") + if job_id: + # Get the job object and check whether the time is valid for it to run. This helps avoid race conditions that cause the same job to be run multiple times. + job = AutomationAdapters.get_automation(user, job_id) + metadata = AutomationAdapters.get_automation_metadata(user, job) + + last_run_time = metadata.get("last_run", None) + + # Convert last_run_time from %Y-%m-%d %I:%M %p %Z to datetime object + if last_run_time: + last_run_time = datetime.strptime(last_run_time, "%Y-%m-%d %I:%M %p %Z").replace(tzinfo=timezone.utc) + + # If the last run time was within the last 6 hours, don't run it again. This helps avoid multithreading issues and rate limits. + if (datetime.now(timezone.utc) - last_run_time).total_seconds() < 21600: + logger.info(f"Skipping scheduled chat {job_id} as the next run time is in the future.") + return + # Extract relevant params from the original URL scheme = "http" if not calling_url.is_secure else "https" query_dict = parse_qs(calling_url.query) @@ -971,6 +991,7 @@ async def schedule_automation( "subject": subject, "user": user, "calling_url": calling_url, + "job_id": job_id, }, id=job_id, name=job_metadata,