mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-09 13:25:11 +00:00
Add additional robustness in verifying job execution parameters at run time
This commit is contained in:
@@ -16,6 +16,7 @@ from django.contrib.sessions.backends.db import SessionStore
|
|||||||
from django.db import models
|
from django.db import models
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
from django.db.models.manager import BaseManager
|
from django.db.models.manager import BaseManager
|
||||||
|
from django_apscheduler.models import DjangoJob, DjangoJobExecution
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
from pgvector.django import CosineDistance
|
from pgvector.django import CosineDistance
|
||||||
from torch import Tensor
|
from torch import Tensor
|
||||||
@@ -936,6 +937,14 @@ class AutomationAdapters:
|
|||||||
if not automation.id.startswith(f"automation_{user.uuid}_"):
|
if not automation.id.startswith(f"automation_{user.uuid}_"):
|
||||||
raise ValueError("Invalid automation id")
|
raise ValueError("Invalid automation id")
|
||||||
|
|
||||||
|
django_job = DjangoJob.objects.filter(id=automation.id).first()
|
||||||
|
execution = DjangoJobExecution.objects.filter(job=django_job)
|
||||||
|
|
||||||
|
last_run_time = None
|
||||||
|
|
||||||
|
if execution.exists():
|
||||||
|
last_run_time = execution.latest("run_time").run_time
|
||||||
|
|
||||||
automation_metadata = json.loads(automation.name)
|
automation_metadata = json.loads(automation.name)
|
||||||
crontime = automation_metadata["crontime"]
|
crontime = automation_metadata["crontime"]
|
||||||
timezone = automation.next_run_time.strftime("%Z")
|
timezone = automation.next_run_time.strftime("%Z")
|
||||||
@@ -948,6 +957,7 @@ class AutomationAdapters:
|
|||||||
"schedule": schedule,
|
"schedule": schedule,
|
||||||
"crontime": crontime,
|
"crontime": crontime,
|
||||||
"next": automation.next_run_time.strftime("%Y-%m-%d %I:%M %p %Z"),
|
"next": automation.next_run_time.strftime("%Y-%m-%d %I:%M %p %Z"),
|
||||||
|
"last_run": last_run_time.strftime("%Y-%m-%d %I:%M %p %Z") if last_run_time else None,
|
||||||
}
|
}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ from khoj.database.models import (
|
|||||||
KhojUser,
|
KhojUser,
|
||||||
NotionConfig,
|
NotionConfig,
|
||||||
OpenAIProcessorConversationConfig,
|
OpenAIProcessorConversationConfig,
|
||||||
|
ProcessLock,
|
||||||
ReflectiveQuestion,
|
ReflectiveQuestion,
|
||||||
SearchModelConfig,
|
SearchModelConfig,
|
||||||
SpeechToTextModelOptions,
|
SpeechToTextModelOptions,
|
||||||
@@ -44,6 +45,7 @@ class KhojUserAdmin(UserAdmin):
|
|||||||
admin.site.register(KhojUser, KhojUserAdmin)
|
admin.site.register(KhojUser, KhojUserAdmin)
|
||||||
|
|
||||||
admin.site.register(ChatModelOptions)
|
admin.site.register(ChatModelOptions)
|
||||||
|
admin.site.register(ProcessLock)
|
||||||
admin.site.register(SpeechToTextModelOptions)
|
admin.site.register(SpeechToTextModelOptions)
|
||||||
admin.site.register(OpenAIProcessorConversationConfig)
|
admin.site.register(OpenAIProcessorConversationConfig)
|
||||||
admin.site.register(SearchModelConfig)
|
admin.site.register(SearchModelConfig)
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ from starlette.requests import URL
|
|||||||
|
|
||||||
from khoj.database.adapters import (
|
from khoj.database.adapters import (
|
||||||
AgentAdapters,
|
AgentAdapters,
|
||||||
|
AutomationAdapters,
|
||||||
ConversationAdapters,
|
ConversationAdapters,
|
||||||
EntryAdapters,
|
EntryAdapters,
|
||||||
create_khoj_token,
|
create_khoj_token,
|
||||||
@@ -883,7 +884,26 @@ def should_notify(original_query: str, executed_query: str, ai_response: str) ->
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def scheduled_chat(query_to_run: str, scheduling_request: str, subject: str, user: KhojUser, calling_url: URL):
|
def scheduled_chat(
|
||||||
|
query_to_run: str, scheduling_request: str, subject: str, user: KhojUser, calling_url: URL, job_id: str = None
|
||||||
|
):
|
||||||
|
logger.info(f"Processing scheduled_chat: {query_to_run}")
|
||||||
|
if job_id:
|
||||||
|
# Get the job object and check whether the time is valid for it to run. This helps avoid race conditions that cause the same job to be run multiple times.
|
||||||
|
job = AutomationAdapters.get_automation(user, job_id)
|
||||||
|
metadata = AutomationAdapters.get_automation_metadata(user, job)
|
||||||
|
|
||||||
|
last_run_time = metadata.get("last_run", None)
|
||||||
|
|
||||||
|
# Convert last_run_time from %Y-%m-%d %I:%M %p %Z to datetime object
|
||||||
|
if last_run_time:
|
||||||
|
last_run_time = datetime.strptime(last_run_time, "%Y-%m-%d %I:%M %p %Z").replace(tzinfo=timezone.utc)
|
||||||
|
|
||||||
|
# If the last run time was within the last 6 hours, don't run it again. This helps avoid multithreading issues and rate limits.
|
||||||
|
if (datetime.now(timezone.utc) - last_run_time).total_seconds() < 21600:
|
||||||
|
logger.info(f"Skipping scheduled chat {job_id} as the next run time is in the future.")
|
||||||
|
return
|
||||||
|
|
||||||
# Extract relevant params from the original URL
|
# Extract relevant params from the original URL
|
||||||
scheme = "http" if not calling_url.is_secure else "https"
|
scheme = "http" if not calling_url.is_secure else "https"
|
||||||
query_dict = parse_qs(calling_url.query)
|
query_dict = parse_qs(calling_url.query)
|
||||||
@@ -971,6 +991,7 @@ async def schedule_automation(
|
|||||||
"subject": subject,
|
"subject": subject,
|
||||||
"user": user,
|
"user": user,
|
||||||
"calling_url": calling_url,
|
"calling_url": calling_url,
|
||||||
|
"job_id": job_id,
|
||||||
},
|
},
|
||||||
id=job_id,
|
id=job_id,
|
||||||
name=job_metadata,
|
name=job_metadata,
|
||||||
|
|||||||
Reference in New Issue
Block a user