@@ -1163,6 +1189,7 @@ export default function Automations() {
isLoggedIn={authenticatedData ? true : false}
setShowLoginPrompt={setShowLoginPrompt}
suggestedCard={true}
+ setToastMessage={setToastMessage}
/>
))}
diff --git a/src/khoj/database/adapters/__init__.py b/src/khoj/database/adapters/__init__.py
index c707a08f..a5be3086 100644
--- a/src/khoj/database/adapters/__init__.py
+++ b/src/khoj/database/adapters/__init__.py
@@ -1741,7 +1741,7 @@ class AutomationAdapters:
return {
"id": automation.id,
"subject": automation_metadata["subject"],
- "query_to_run": re.sub(r"^/automated_task\s*", "", automation_metadata["query_to_run"]),
+ "query_to_run": automation_metadata["query_to_run"],
"scheduling_request": automation_metadata["scheduling_request"],
"schedule": schedule,
"crontime": crontime,
@@ -1783,6 +1783,19 @@ class AutomationAdapters:
return automation
+ @staticmethod
+ async def aget_automation(user: KhojUser, automation_id: str) -> Job:
+ # Perform validation checks
+ # Check if user is allowed to access this automation id (ids are prefixed with the owner's uuid)
+ if not automation_id.startswith(f"automation_{user.uuid}_"):
+ raise ValueError("Invalid automation id")
+ # Check if automation with this id exists in the scheduler
+ automation: Job = await sync_to_async(state.scheduler.get_job)(job_id=automation_id)
+ if not automation:
+ raise ValueError("Invalid automation id")
+
+ return automation
+
@staticmethod
def delete_automation(user: KhojUser, automation_id: str):
# Get valid, user-owned automation
diff --git a/src/khoj/processor/conversation/prompts.py b/src/khoj/processor/conversation/prompts.py
index 034209d4..77ac4214 100644
--- a/src/khoj/processor/conversation/prompts.py
+++ b/src/khoj/processor/conversation/prompts.py
@@ -935,7 +935,7 @@ AI: Here is one I found: "It's not denial. I'm just selective about the reality
User: Hahah, nice! Show a new one every morning.
Khoj: {{
"crontime": "0 9 * * *",
- "query": "/automated_task Share a funny Calvin and Hobbes or Bill Watterson quote from my notes",
+ "query": "Share a funny Calvin and Hobbes or Bill Watterson quote from my notes",
"subject": "Your Calvin and Hobbes Quote for the Day"
}}
@@ -955,7 +955,7 @@ AI: The latest released Khoj python package version is 1.5.0.
User: Notify me when version 2.0.0 is released
Khoj: {{
"crontime": "0 10 * * *",
- "query": "/automated_task What is the latest released version of the Khoj python package?",
+ "query": "/automated_task /research What is the latest released version of the Khoj python package?",
"subject": "Khoj Python Package Version 2.0.0 Release"
}}
@@ -1101,33 +1101,33 @@ to_notify_or_not = PromptTemplate.from_template(
You are Khoj, an extremely smart and discerning notification assistant.
- Decide whether the user should be notified of the AI's response using the Original User Query, Executed User Query and AI Response triplet.
- Notify the user only if the AI's response satisfies the user specified requirements.
-- You should only respond with a "Yes" or "No". Do not say anything else.
+- You should return a response with your reason and "Yes" or "No" decision in JSON format. Do not say anything else.
# Examples:
Original User Query: Hahah, nice! Show a new one every morning at 9am. My Current Location: Shanghai, China
Executed User Query: Could you share a funny Calvin and Hobbes quote from my notes?
AI Reponse: Here is one I found: "It's not denial. I'm just selective about the reality I accept."
-Khoj: Yes
+Khoj: {{ "reason": "The AI has shared a funny Calvin and Hobbes quote." , "decision": "Yes" }}
Original User Query: Every evening check if it's going to rain tomorrow. Notify me only if I'll need an umbrella. My Current Location: Nairobi, Kenya
Executed User Query: Is it going to rain tomorrow in Nairobi, Kenya
AI Response: Tomorrow's forecast is sunny with a high of 28°C and a low of 18°C
-Khoj: No
+Khoj: {{ "reason": "It is not expected to rain tomorrow.", "decision": "No" }}
-Original User Query: Tell me when version 2.0.0 is released. My Current Location: Mexico City, Mexico
-Executed User Query: Check if version 2.0.0 of the Khoj python package is released
-AI Response: The latest released Khoj python package version is 1.5.0.
-Khoj: No
-
-Original User Query: Paint me a sunset every evening. My Current Location: Shanghai, China
-Executed User Query: Paint me a sunset in Shanghai, China
+Original User Query: Paint a sunset for me every evening. My Current Location: Shanghai, China
+Executed User Query: Paint a sunset in Shanghai, China
AI Response: https://khoj-generated-images.khoj.dev/user110/image78124.webp
-Khoj: Yes
+Khoj: {{ "reason": "The AI has created an image.", "decision": "Yes" }}
-Original User Query: Share a summary of the tasks I've completed at the end of the day. My Current Location: Oslo, Norway
-Executed User Query: Share a summary of the tasks I've completed today.
-AI Response: I'm sorry, I couldn't find any relevant notes to respond to your message.
-Khoj: No
+Original User Query: Notify me when Khoj version 2.0.0 is released
+Executed User Query: What is the latest released version of the Khoj python package
+AI Response: The latest released Khoj python package version is 1.5.0.
+Khoj: {{ "reason": "Version 2.0.0 of Khoj has not been released yet." , "decision": "No" }}
+
+Original User Query: Share a summary of the tasks I've completed at the end of the day.
+Executed User Query: Generate a summary of the tasks I've completed today.
+AI Response: You have completed the following tasks today: 1. Meeting with the team 2. Submit travel expense report
+Khoj: {{ "reason": "The AI has provided a summary of completed tasks.", "decision": "Yes" }}
Original User Query: {original_query}
Executed User Query: {executed_query}
@@ -1137,6 +1137,26 @@ Khoj:
)
+automation_format_prompt = PromptTemplate.from_template(
+ """
+You are Khoj, a smart and creative researcher and writer with a knack for creating engaging content.
+- You *CAN REMEMBER ALL NOTES and PERSONAL INFORMATION FOREVER* that the user ever shares with you.
+- You *CAN* look up real-time information from the internet, send notifications and answer questions based on the user's notes.
+
+Convert the AI response into a clear, structured markdown report with section headings to improve readability.
+Your response will be sent in the body of an email to the user.
+Do not add an email subject. Never add disclaimers in your final response.
+
+You are provided the following details for context.
+
+{username}
+Original User Query: {original_query}
+Executed Chat Request: {executed_query}
+AI Response: {response}
+Khoj:
+""".strip()
+)
+
# System messages to user
# --
help_message = PromptTemplate.from_template(
diff --git a/src/khoj/routers/api.py b/src/khoj/routers/api.py
index 3f58ca1a..a29e993a 100644
--- a/src/khoj/routers/api.py
+++ b/src/khoj/routers/api.py
@@ -38,15 +38,15 @@ from khoj.processor.conversation.offline.chat_model import extract_questions_off
from khoj.processor.conversation.offline.whisper import transcribe_audio_offline
from khoj.processor.conversation.openai.gpt import extract_questions
from khoj.processor.conversation.openai.whisper import transcribe_audio
-from khoj.processor.conversation.utils import defilter_query
+from khoj.processor.conversation.utils import clean_json, defilter_query
from khoj.routers.helpers import (
ApiUserRateLimiter,
ChatEvent,
CommonQueryParams,
ConversationCommandRateLimiter,
- acreate_title_from_query,
get_user_config,
schedule_automation,
+ schedule_query,
update_telemetry_state,
)
from khoj.search_filter.date_filter import DateFilter
@@ -566,7 +566,7 @@ def delete_automation(request: Request, automation_id: str) -> Response:
@api.post("/automation", response_class=Response)
@requires(["authenticated"])
-async def post_automation(
+def post_automation(
request: Request,
q: str,
crontime: str,
@@ -584,11 +584,15 @@ async def post_automation(
if not cron_descriptor.get_description(crontime):
return Response(content="Invalid crontime", status_code=400)
+ # Infer subject, query to run
+ _, query_to_run, generated_subject = schedule_query(q, conversation_history={}, user=user)
+ subject = subject or generated_subject
+
# Normalize query parameters
# Add /automated_task prefix to query if not present
- q = q.strip()
- if not q.startswith("/automated_task"):
- query_to_run = f"/automated_task {q}"
+ query_to_run = query_to_run.strip()
+ if not query_to_run.startswith("/automated_task"):
+ query_to_run = f"/automated_task {query_to_run}"
# Normalize crontime for AP Scheduler CronTrigger
crontime = crontime.strip()
@@ -603,24 +607,19 @@ async def post_automation(
minute_value = crontime.split(" ")[0]
if not minute_value.isdigit():
return Response(
- content="Recurrence of every X minutes is unsupported. Please create a less frequent schedule.",
+ content="Minute level recurrence is unsupported. Please create a less frequent schedule.",
status_code=400,
)
- if not subject:
- subject = await acreate_title_from_query(q, user)
-
- title = f"Automation: {subject}"
-
# Create new Conversation Session associated with this new task
- conversation = await ConversationAdapters.acreate_conversation_session(user, request.user.client_app, title=title)
-
- calling_url = request.url.replace(query=f"{request.url.query}")
+ title = f"Automation: {subject}"
+ conversation = ConversationAdapters.create_conversation_session(user, request.user.client_app, title=title)
# Schedule automation with query_to_run, timezone, subject directly provided by user
try:
# Use the query to run as the scheduling request if the scheduling request is unset
- automation = await schedule_automation(
+ calling_url = request.url.replace(query=f"{request.url.query}")
+ automation = schedule_automation(
query_to_run, subject, crontime, timezone, q, user, calling_url, str(conversation.id)
)
except Exception as e:
@@ -691,11 +690,15 @@ def edit_job(
logger.error(f"Error editing automation {automation_id} for {user.email}: {e}", exc_info=True)
return Response(content="Invalid automation", status_code=403)
+ # Infer subject, query to run
+ _, query_to_run, _ = schedule_query(q, conversation_history={}, user=user)
+ subject = subject
+
# Normalize query parameters
# Add /automated_task prefix to query if not present
- q = q.strip()
- if not q.startswith("/automated_task"):
- query_to_run = f"/automated_task {q}"
+ query_to_run = query_to_run.strip()
+ if not query_to_run.startswith("/automated_task"):
+ query_to_run = f"/automated_task {query_to_run}"
# Normalize crontime for AP Scheduler CronTrigger
crontime = crontime.strip()
if len(crontime.split(" ")) > 5:
@@ -713,7 +716,7 @@ def edit_job(
)
# Construct updated automation metadata
- automation_metadata = json.loads(automation.name)
+ automation_metadata: dict[str, str] = json.loads(clean_json(automation.name))
automation_metadata["scheduling_request"] = q
automation_metadata["query_to_run"] = query_to_run
automation_metadata["subject"] = subject.strip()
diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py
index 4e458651..d6afddb3 100644
--- a/src/khoj/routers/helpers.py
+++ b/src/khoj/routers/helpers.py
@@ -551,9 +551,37 @@ async def generate_online_subqueries(
return {q}
-async def schedule_query(
+def schedule_query(
q: str, conversation_history: dict, user: KhojUser, query_images: List[str] = None, tracer: dict = {}
-) -> Tuple[str, ...]:
+) -> Tuple[str, str, str]:
+ """
+ Schedule the date, time to run the query. Assume the server timezone is UTC.
+ """
+ chat_history = construct_chat_history(conversation_history)
+
+ crontime_prompt = prompts.crontime_prompt.format(
+ query=q,
+ chat_history=chat_history,
+ )
+
+ raw_response = send_message_to_model_wrapper_sync(
+ crontime_prompt, query_images=query_images, response_type="json_object", user=user, tracer=tracer
+ )
+
+ # Validate that the response is a non-empty JSON object with exactly three fields
+ try:
+ raw_response = raw_response.strip()
+ response: Dict[str, str] = json.loads(clean_json(raw_response))
+ if not response or not isinstance(response, Dict) or len(response) != 3:
+ raise AssertionError(f"Invalid response for scheduling query : {response}")
+ return response.get("crontime"), response.get("query"), response.get("subject")
+ except Exception:
+ raise AssertionError(f"Invalid response for scheduling query: {raw_response}")
+
+
+async def aschedule_query(
+ q: str, conversation_history: dict, user: KhojUser, query_images: List[str] = None, tracer: dict = {}
+) -> Tuple[str, str, str]:
"""
Schedule the date, time to run the query. Assume the server timezone is UTC.
"""
@@ -571,7 +599,7 @@ async def schedule_query(
# Validate that the response is a non-empty, JSON-serializable list
try:
raw_response = raw_response.strip()
- response: Dict[str, str] = json.loads(raw_response)
+ response: Dict[str, str] = json.loads(clean_json(raw_response))
if not response or not isinstance(response, Dict) or len(response) != 3:
raise AssertionError(f"Invalid response for scheduling query : {response}")
return response.get("crontime"), response.get("query"), response.get("subject")
@@ -1065,6 +1093,7 @@ def send_message_to_model_wrapper_sync(
system_message: str = "",
response_type: str = "text",
user: KhojUser = None,
+ query_images: List[str] = None,
query_files: str = "",
tracer: dict = {},
):
@@ -1090,6 +1119,7 @@ def send_message_to_model_wrapper_sync(
max_prompt_size=max_tokens,
vision_enabled=vision_available,
model_type=chat_model.model_type,
+ query_images=query_images,
query_files=query_files,
)
@@ -1112,6 +1142,7 @@ def send_message_to_model_wrapper_sync(
max_prompt_size=max_tokens,
vision_enabled=vision_available,
model_type=chat_model.model_type,
+ query_images=query_images,
query_files=query_files,
)
@@ -1134,6 +1165,7 @@ def send_message_to_model_wrapper_sync(
max_prompt_size=max_tokens,
vision_enabled=vision_available,
model_type=chat_model.model_type,
+ query_images=query_images,
query_files=query_files,
)
@@ -1154,6 +1186,7 @@ def send_message_to_model_wrapper_sync(
max_prompt_size=max_tokens,
vision_enabled=vision_available,
model_type=chat_model.model_type,
+ query_images=query_images,
query_files=query_files,
)
@@ -1634,6 +1667,25 @@ class CommonQueryParamsClass:
CommonQueryParams = Annotated[CommonQueryParamsClass, Depends()]
+def format_automation_response(scheduling_request: str, executed_query: str, ai_response: str, user: KhojUser) -> str:
+    """
+    Format the AI response as a markdown report to send in the automation email. Returns the formatted text.
+    """
+    name = get_user_name(user)
+    # Fall back to an empty username line; leaving it unset raised UnboundLocalError for users without a name
+    username = prompts.user_name.format(name=name) if name else ""
+
+    automation_format_prompt = prompts.automation_format_prompt.format(
+        original_query=scheduling_request,
+        executed_query=executed_query,
+        response=ai_response,
+        username=username,
+    )
+
+    with timer("Chat actor: Format automation response", logger):
+        return send_message_to_model_wrapper_sync(automation_format_prompt, user=user)
+
+
def should_notify(original_query: str, executed_query: str, ai_response: str, user: KhojUser) -> bool:
"""
Decide whether to notify the user of the AI response.
@@ -1651,12 +1703,19 @@ def should_notify(original_query: str, executed_query: str, ai_response: str, us
with timer("Chat actor: Decide to notify user of automation response", logger):
try:
# TODO Replace with async call so we don't have to maintain a sync version
- response = send_message_to_model_wrapper_sync(to_notify_or_not, user)
- should_notify_result = "no" not in response.lower()
- logger.info(f'Decided to {"not " if not should_notify_result else ""}notify user of automation response.')
+ raw_response = send_message_to_model_wrapper_sync(to_notify_or_not, user=user, response_type="json_object")
+ response = json.loads(raw_response)
+ should_notify_result = response["decision"] == "Yes"
+ reason = response.get("reason", "unknown")
+ logger.info(
+ f'Decided to {"not " if not should_notify_result else ""}notify user of automation response because of reason: {reason}.'
+ )
return should_notify_result
- except:
- logger.warning(f"Fallback to notify user of automation response as failed to infer should notify or not.")
+ except Exception as e:
+ logger.warning(
+ f"Fallback to notify user of automation response as failed to infer should notify or not. {e}",
+ exc_info=True,
+ )
return True
@@ -1751,10 +1810,12 @@ def scheduled_chat(
if should_notify(
original_query=scheduling_request, executed_query=cleaned_query, ai_response=ai_response, user=user
):
+ formatted_response = format_automation_response(scheduling_request, cleaned_query, ai_response, user)
+
if is_resend_enabled():
- send_task_email(user.get_short_name(), user.email, cleaned_query, ai_response, subject, is_image)
+ send_task_email(user.get_short_name(), user.email, cleaned_query, formatted_response, subject, is_image)
else:
- return raw_response
+ return formatted_response
async def create_automation(
@@ -1766,12 +1827,66 @@ async def create_automation(
conversation_id: str = None,
tracer: dict = {},
):
- crontime, query_to_run, subject = await schedule_query(q, meta_log, user, tracer=tracer)
- job = await schedule_automation(query_to_run, subject, crontime, timezone, q, user, calling_url, conversation_id)
+ crontime, query_to_run, subject = await aschedule_query(q, meta_log, user, tracer=tracer)
+ job = await aschedule_automation(query_to_run, subject, crontime, timezone, q, user, calling_url, conversation_id)
return job, crontime, query_to_run, subject
-async def schedule_automation(
+def schedule_automation(
+ query_to_run: str,
+ subject: str,
+ crontime: str,
+ timezone: str,
+ scheduling_request: str,
+ user: KhojUser,
+ calling_url: URL,
+ conversation_id: str,
+):
+ # Disable minute level automation recurrence (i.e. a non-numeric minute field in the crontime)
+ minute_value = crontime.split(" ")[0]
+ if not minute_value.isdigit():
+ # Run automation at some random minute (to distribute request load) instead of running every X minutes
+ crontime = " ".join([str(math.floor(random() * 60))] + crontime.split(" ")[1:])
+
+ user_timezone = pytz.timezone(timezone)
+ trigger = CronTrigger.from_crontab(crontime, user_timezone)
+ trigger.jitter = 60  # Randomly delay each run by up to 60 seconds
+ # Generate id and metadata used by task scheduler and process locks for the task runs
+ job_metadata = json.dumps(
+ {
+ "query_to_run": query_to_run,
+ "scheduling_request": scheduling_request,
+ "subject": subject,
+ "crontime": crontime,
+ "conversation_id": str(conversation_id),
+ }
+ )
+ query_id = hashlib.md5(f"{query_to_run}_{crontime}".encode("utf-8")).hexdigest()  # Same query and crontime give the same id
+ job_id = f"automation_{user.uuid}_{query_id}"
+ job = state.scheduler.add_job(
+ run_with_process_lock,
+ trigger=trigger,
+ args=(
+ scheduled_chat,
+ f"{ProcessLock.Operation.SCHEDULED_JOB}_{user.uuid}_{query_id}",
+ ),
+ kwargs={
+ "query_to_run": query_to_run,
+ "scheduling_request": scheduling_request,
+ "subject": subject,
+ "user": user,
+ "calling_url": calling_url,
+ "job_id": job_id,
+ "conversation_id": conversation_id,
+ },
+ id=job_id,
+ name=job_metadata,  # Job name carries the JSON metadata that is parsed back when listing or editing automations
+ max_instances=2, # Allow second instance to kill any previous instance with stale lock
+ )
+ return job
+
+
+async def aschedule_automation(
query_to_run: str,
subject: str,
crontime: str,
diff --git a/tests/test_online_chat_actors.py b/tests/test_online_chat_actors.py
index f873be45..2b08fc74 100644
--- a/tests/test_online_chat_actors.py
+++ b/tests/test_online_chat_actors.py
@@ -636,11 +636,11 @@ async def test_infer_webpage_urls_actor_extracts_correct_links(chat_client, defa
),
],
)
-async def test_infer_task_scheduling_request(
+def test_infer_task_scheduling_request(
chat_client, user_query, expected_crontime, expected_qs, unexpected_qs, default_user2
):
# Act
- crontime, inferred_query, _ = await schedule_query(user_query, {}, default_user2)
+ crontime, inferred_query, _ = schedule_query(user_query, {}, default_user2)
inferred_query = inferred_query.lower()
# Assert