diff --git a/pyproject.toml b/pyproject.toml index 304b3886..06b2da55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,9 +76,14 @@ dependencies = [ "django-phonenumber-field == 7.3.0", "phonenumbers == 8.13.27", "markdownify ~= 0.11.6", + "markdown-it-py ~= 3.0.0", "websockets == 12.0", "psutil >= 5.8.0", "huggingface-hub >= 0.22.2", + "apscheduler ~= 3.10.0", + "pytz ~= 2024.1", + "cron-descriptor == 1.4.3", + "django_apscheduler == 0.6.2", ] dynamic = ["version"] diff --git a/src/interface/desktop/chat.html b/src/interface/desktop/chat.html index aa73df21..c26fe4b5 100644 --- a/src/interface/desktop/chat.html +++ b/src/interface/desktop/chat.html @@ -40,6 +40,7 @@ let region = null; let city = null; let countryName = null; + let timezone = null; fetch("https://ipapi.co/json") .then(response => response.json()) @@ -47,6 +48,7 @@ region = data.region; city = data.city; countryName = data.country_name; + timezone = data.timezone; }) .catch(err => { console.log(err); @@ -463,16 +465,16 @@ } // Generate backend API URL to execute query - let chatApi = `${hostURL}/api/chat?q=${encodeURIComponent(query)}&n=${resultsCount}&client=web&stream=true&conversation_id=${conversationID}®ion=${region}&city=${city}&country=${countryName}`; + let chatApi = `${hostURL}/api/chat?q=${encodeURIComponent(query)}&n=${resultsCount}&client=web&stream=true&conversation_id=${conversationID}®ion=${region}&city=${city}&country=${countryName}&timezone=${timezone}`; - let new_response = document.createElement("div"); - new_response.classList.add("chat-message", "khoj"); - new_response.attributes["data-meta"] = "๐Ÿฎ Khoj at " + formatDate(new Date()); - chat_body.appendChild(new_response); + let newResponseEl = document.createElement("div"); + newResponseEl.classList.add("chat-message", "khoj"); + newResponseEl.attributes["data-meta"] = "๐Ÿฎ Khoj at " + formatDate(new Date()); + chat_body.appendChild(newResponseEl); - let newResponseText = document.createElement("div"); - 
newResponseText.classList.add("chat-message-text", "khoj"); - new_response.appendChild(newResponseText); + let newResponseTextEl = document.createElement("div"); + newResponseTextEl.classList.add("chat-message-text", "khoj"); + newResponseEl.appendChild(newResponseTextEl); // Temporary status message to indicate that Khoj is thinking let loadingEllipsis = document.createElement("div"); @@ -495,7 +497,7 @@ loadingEllipsis.appendChild(thirdEllipsis); loadingEllipsis.appendChild(fourthEllipsis); - newResponseText.appendChild(loadingEllipsis); + newResponseTextEl.appendChild(loadingEllipsis); document.getElementById("chat-body").scrollTop = document.getElementById("chat-body").scrollHeight; let chatTooltip = document.getElementById("chat-tooltip"); @@ -540,11 +542,11 @@ // If the chunk is not a JSON object, just display it as is rawResponse += chunk; } finally { - newResponseText.innerHTML = ""; - newResponseText.appendChild(formatHTMLMessage(rawResponse)); + newResponseTextEl.innerHTML = ""; + newResponseTextEl.appendChild(formatHTMLMessage(rawResponse)); if (references != null) { - newResponseText.appendChild(references); + newResponseTextEl.appendChild(references); } document.getElementById("chat-body").scrollTop = document.getElementById("chat-body").scrollHeight; @@ -563,7 +565,7 @@ if (done) { // Append any references after all the data has been streamed if (references != {}) { - newResponseText.appendChild(createReferenceSection(references)); + newResponseTextEl.appendChild(createReferenceSection(references)); } document.getElementById("chat-body").scrollTop = document.getElementById("chat-body").scrollHeight; document.getElementById("chat-input").removeAttribute("disabled"); @@ -576,8 +578,8 @@ if (chunk.includes("### compiled references:")) { const additionalResponse = chunk.split("### compiled references:")[0]; rawResponse += additionalResponse; - newResponseText.innerHTML = ""; - newResponseText.appendChild(formatHTMLMessage(rawResponse)); + 
newResponseTextEl.innerHTML = ""; + newResponseTextEl.appendChild(formatHTMLMessage(rawResponse)); const rawReference = chunk.split("### compiled references:")[1]; const rawReferenceAsJson = JSON.parse(rawReference); @@ -589,14 +591,14 @@ readStream(); } else { // Display response from Khoj - if (newResponseText.getElementsByClassName("lds-ellipsis").length > 0) { - newResponseText.removeChild(loadingEllipsis); + if (newResponseTextEl.getElementsByClassName("lds-ellipsis").length > 0) { + newResponseTextEl.removeChild(loadingEllipsis); } // If the chunk is not a JSON object, just display it as is rawResponse += chunk; - newResponseText.innerHTML = ""; - newResponseText.appendChild(formatHTMLMessage(rawResponse)); + newResponseTextEl.innerHTML = ""; + newResponseTextEl.appendChild(formatHTMLMessage(rawResponse)); readStream(); } diff --git a/src/interface/obsidian/src/chat_modal.ts b/src/interface/obsidian/src/chat_modal.ts index 504ce4db..31b938a1 100644 --- a/src/interface/obsidian/src/chat_modal.ts +++ b/src/interface/obsidian/src/chat_modal.ts @@ -15,6 +15,7 @@ export class KhojChatModal extends Modal { region: string; city: string; countryName: string; + timezone: string; constructor(app: App, setting: KhojSetting) { super(app); @@ -30,6 +31,7 @@ export class KhojChatModal extends Modal { this.region = data.region; this.city = data.city; this.countryName = data.country_name; + this.timezone = data.timezone; }) .catch(err => { console.log(err); @@ -393,7 +395,7 @@ export class KhojChatModal extends Modal { // Get chat response from Khoj backend let encodedQuery = encodeURIComponent(query); - let chatUrl = `${this.setting.khojUrl}/api/chat?q=${encodedQuery}&n=${this.setting.resultsCount}&client=obsidian&stream=true®ion=${this.region}&city=${this.city}&country=${this.countryName}`; + let chatUrl = 
`${this.setting.khojUrl}/api/chat?q=${encodedQuery}&n=${this.setting.resultsCount}&client=obsidian&stream=true®ion=${this.region}&city=${this.city}&country=${this.countryName}&timezone=${this.timezone}`; let responseElement = this.createKhojResponseDiv(); // Temporary status message to indicate that Khoj is thinking diff --git a/src/khoj/app/settings.py b/src/khoj/app/settings.py index 27be968e..2672f98d 100644 --- a/src/khoj/app/settings.py +++ b/src/khoj/app/settings.py @@ -77,6 +77,7 @@ INSTALLED_APPS = [ "django.contrib.messages", "django.contrib.staticfiles", "phonenumber_field", + "django_apscheduler", ] MIDDLEWARE = [ @@ -169,3 +170,20 @@ STATIC_URL = "/static/" # https://docs.djangoproject.com/en/4.2/ref/settings/#default-auto-field DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" + + +# Format string for displaying run time timestamps in the Django admin site. The default +# just adds seconds to the standard Django format, which is useful for displaying the timestamps +# for jobs that are scheduled to run on intervals of less than one minute. +# +# See https://docs.djangoproject.com/en/dev/ref/settings/#datetime-format for format string +# syntax details. +APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a" + +# Maximum run time allowed for jobs that are triggered manually via the Django admin site, which +# prevents admin site HTTP requests from timing out. +# +# Longer running jobs should probably be handed over to a background task processing library +# that supports multiple background worker processes instead (e.g. Dramatiq, Celery, Django-RQ, +# etc. See: https://djangopackages.org/grids/g/workers-queues-tasks/ for popular options). 
+APSCHEDULER_RUN_NOW_TIMEOUT = 240 # Seconds diff --git a/src/khoj/configure.py b/src/khoj/configure.py index 38b8223f..81e653d4 100644 --- a/src/khoj/configure.py +++ b/src/khoj/configure.py @@ -324,7 +324,7 @@ def update_content_index(): @schedule.repeat(schedule.every(22).to(25).hours) def update_content_index_regularly(): ProcessLockAdapters.run_with_lock( - update_content_index, ProcessLock.Operation.UPDATE_EMBEDDINGS, max_duration_in_seconds=60 * 60 * 2 + update_content_index, ProcessLock.Operation.INDEX_CONTENT, max_duration_in_seconds=60 * 60 * 2 ) diff --git a/src/khoj/database/adapters/__init__.py b/src/khoj/database/adapters/__init__.py index 10fde9e8..048df839 100644 --- a/src/khoj/database/adapters/__init__.py +++ b/src/khoj/database/adapters/__init__.py @@ -1,12 +1,16 @@ +import json import logging import math import random +import re import secrets import sys from datetime import date, datetime, timedelta, timezone from enum import Enum -from typing import Callable, List, Optional, Type +from typing import Callable, Iterable, List, Optional, Type +import cron_descriptor +from apscheduler.job import Job from asgiref.sync import sync_to_async from django.contrib.sessions.backends.db import SessionStore from django.db import models @@ -68,7 +72,14 @@ async def set_notion_config(token: str, user: KhojUser): return notion_config -async def create_khoj_token(user: KhojUser, name=None): +def create_khoj_token(user: KhojUser, name=None): + "Create Khoj API key for user" + token = f"kk-{secrets.token_urlsafe(32)}" + name = name or f"{generate_random_name().title()}" + return KhojApiUser.objects.create(token=token, user=user, name=name) + + +async def acreate_khoj_token(user: KhojUser, name=None): "Create Khoj API key for user" token = f"kk-{secrets.token_urlsafe(32)}" name = name or f"{generate_random_name().title()}" @@ -429,7 +440,7 @@ class ProcessLockAdapters: return ProcessLock.objects.filter(name=process_name).delete() @staticmethod - def 
run_with_lock(func: Callable, operation: ProcessLock.Operation, max_duration_in_seconds: int = 600): + def run_with_lock(func: Callable, operation: ProcessLock.Operation, max_duration_in_seconds: int = 600, **kwargs): # Exit early if process lock is already taken if ProcessLockAdapters.is_process_locked(operation): logger.info(f"🔒 Skip executing {func} as {operation} lock is already taken") @@ -443,7 +454,7 @@ class ProcessLockAdapters: # Execute Function with timer(f"🔒 Run {func} with {operation} process lock", logger): - func() + func(**kwargs) success = True except Exception as e: logger.error(f"🚨 Error executing {func} with {operation} process lock: {e}", exc_info=True) @@ -454,6 +465,13 @@ class ProcessLockAdapters: logger.info(f"🔓 Unlocked {operation} process after executing {func} {'Succeeded' if success else 'Failed'}") +def run_with_process_lock(*args, **kwargs): + """Wrapper function used for scheduling jobs. + Required as APScheduler can't discover the `ProcessLockAdapters.run_with_lock` method on its own. 
+ """ + return ProcessLockAdapters.run_with_lock(*args, **kwargs) + + class ClientApplicationAdapters: @staticmethod async def aget_client_application_by_id(client_id: str, client_secret: str): @@ -901,3 +919,62 @@ class EntryAdapters: @staticmethod def get_unique_file_sources(user: KhojUser): return Entry.objects.filter(user=user).values_list("file_source", flat=True).distinct().all() + + +class AutomationAdapters: + @staticmethod + def get_automations(user: KhojUser) -> Iterable[Job]: + all_automations: Iterable[Job] = state.scheduler.get_jobs() + for automation in all_automations: + if automation.id.startswith(f"automation_{user.uuid}_"): + yield automation + + @staticmethod + def get_automation_metadata(user: KhojUser, automation: Job): + # Perform validation checks + # Check the automation belongs to this user, i.e. its id has the automation_{user.uuid}_ prefix + if not automation.id.startswith(f"automation_{user.uuid}_"): + raise ValueError("Invalid automation id") + + automation_metadata = json.loads(automation.name) + crontime = automation_metadata["crontime"] + timezone = automation.next_run_time.strftime("%Z") + schedule = f"{cron_descriptor.get_description(crontime)} {timezone}" + return { + "id": automation.id, + "subject": automation_metadata["subject"], + "query_to_run": re.sub(r"^/automated_task\s*", "", automation_metadata["query_to_run"]), + "scheduling_request": automation_metadata["scheduling_request"], + "schedule": schedule, + "crontime": crontime, + "next": automation.next_run_time.strftime("%Y-%m-%d %I:%M %p %Z"), + } + + @staticmethod + def get_automations_metadata(user: KhojUser): + for automation in AutomationAdapters.get_automations(user): + yield AutomationAdapters.get_automation_metadata(user, automation) + + @staticmethod + def get_automation(user: KhojUser, automation_id: str) -> Job: + # Perform validation checks + # Check the automation id belongs to this user, i.e. it has the automation_{user.uuid}_ prefix + if not automation_id.startswith(f"automation_{user.uuid}_"): + raise ValueError("Invalid automation 
id") + # Check if automation with this id exist + automation: Job = state.scheduler.get_job(job_id=automation_id) + if not automation: + raise ValueError("Invalid automation id") + + return automation + + @staticmethod + def delete_automation(user: KhojUser, automation_id: str): + # Get valid, user-owned automation + automation: Job = AutomationAdapters.get_automation(user, automation_id) + + # Collate info about user automation to be deleted + automation_metadata = AutomationAdapters.get_automation_metadata(user, automation) + + automation.remove() + return automation_metadata diff --git a/src/khoj/database/migrations/0036_alter_processlock_name.py b/src/khoj/database/migrations/0036_alter_processlock_name.py new file mode 100644 index 00000000..87c9a0d4 --- /dev/null +++ b/src/khoj/database/migrations/0036_alter_processlock_name.py @@ -0,0 +1,19 @@ +# Generated by Django 4.2.10 on 2024-04-16 18:39 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("database", "0035_processlock"), + ] + + operations = [ + migrations.AlterField( + model_name="processlock", + name="name", + field=models.CharField( + choices=[("index_content", "Index Content"), ("scheduled_job", "Scheduled Job")], max_length=200 + ), + ), + ] diff --git a/src/khoj/database/migrations/0038_merge_20240426_1640.py b/src/khoj/database/migrations/0038_merge_20240426_1640.py new file mode 100644 index 00000000..74cabb85 --- /dev/null +++ b/src/khoj/database/migrations/0038_merge_20240426_1640.py @@ -0,0 +1,12 @@ +# Generated by Django 4.2.10 on 2024-04-26 16:40 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("database", "0036_alter_processlock_name"), + ("database", "0037_searchmodelconfig_bi_encoder_docs_encode_config_and_more"), + ] + + operations: list = [] diff --git a/src/khoj/database/migrations/0039_merge_20240501_0301.py b/src/khoj/database/migrations/0039_merge_20240501_0301.py new file 
mode 100644 index 00000000..c2bb1a87 --- /dev/null +++ b/src/khoj/database/migrations/0039_merge_20240501_0301.py @@ -0,0 +1,12 @@ +# Generated by Django 4.2.10 on 2024-05-01 03:01 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("database", "0038_merge_20240425_0857"), + ("database", "0038_merge_20240426_1640"), + ] + + operations: list = [] diff --git a/src/khoj/database/models/__init__.py b/src/khoj/database/models/__init__.py index 4077c35c..ad35be01 100644 --- a/src/khoj/database/models/__init__.py +++ b/src/khoj/database/models/__init__.py @@ -109,7 +109,8 @@ class Agent(BaseModel): class ProcessLock(BaseModel): class Operation(models.TextChoices): - UPDATE_EMBEDDINGS = "update_embeddings" + INDEX_CONTENT = "index_content" + SCHEDULED_JOB = "scheduled_job" # We need to make sure that some operations are thread-safe. To do so, add locks for potentially shared operations. # For example, we need to make sure that only one process is updating the embeddings at a time. diff --git a/src/khoj/interface/email/task.html b/src/khoj/interface/email/task.html new file mode 100644 index 00000000..1e78ce34 --- /dev/null +++ b/src/khoj/interface/email/task.html @@ -0,0 +1,42 @@ + + + + Khoj AI - Automation + + + + + +
+
+

Your Open, Personal AI

+

Hey {{name}}!

+

I've shared your automation results below:

+ +
+
+ +

{{subject}}

+
+

{{result}}

+
+
+

The automation query I ran on your behalf: {{query}}

+

You can view or delete your automations via the settings page

+
+
+

- Khoj

+ + + + + + + + +
DocsGitHubTwitterLinkedInDiscord
+ + + diff --git a/src/khoj/interface/web/assets/icons/automation.svg b/src/khoj/interface/web/assets/icons/automation.svg new file mode 100644 index 00000000..162dd9ba --- /dev/null +++ b/src/khoj/interface/web/assets/icons/automation.svg @@ -0,0 +1,37 @@ + + + + + + + diff --git a/src/khoj/interface/web/assets/icons/copy-solid.svg b/src/khoj/interface/web/assets/icons/copy-solid.svg deleted file mode 100644 index da7020be..00000000 --- a/src/khoj/interface/web/assets/icons/copy-solid.svg +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/khoj/interface/web/assets/icons/delete.svg b/src/khoj/interface/web/assets/icons/delete.svg new file mode 100644 index 00000000..8e078275 --- /dev/null +++ b/src/khoj/interface/web/assets/icons/delete.svg @@ -0,0 +1,26 @@ + + + + + diff --git a/src/khoj/interface/web/assets/icons/edit.svg b/src/khoj/interface/web/assets/icons/edit.svg new file mode 100644 index 00000000..9dd66854 --- /dev/null +++ b/src/khoj/interface/web/assets/icons/edit.svg @@ -0,0 +1,4 @@ + + + + diff --git a/src/khoj/interface/web/assets/icons/microphone-solid.svg b/src/khoj/interface/web/assets/icons/microphone-solid.svg deleted file mode 100644 index 3fc4b91d..00000000 --- a/src/khoj/interface/web/assets/icons/microphone-solid.svg +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/khoj/interface/web/assets/icons/new.svg b/src/khoj/interface/web/assets/icons/new.svg new file mode 100644 index 00000000..f27d95f6 --- /dev/null +++ b/src/khoj/interface/web/assets/icons/new.svg @@ -0,0 +1,23 @@ + + + + + diff --git a/src/khoj/interface/web/assets/icons/trash-solid.svg b/src/khoj/interface/web/assets/icons/trash-solid.svg deleted file mode 100644 index 768d80f8..00000000 --- a/src/khoj/interface/web/assets/icons/trash-solid.svg +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/khoj/interface/web/assets/khoj.css b/src/khoj/interface/web/assets/khoj.css index 3c3536a7..1c57fbce 100644 --- a/src/khoj/interface/web/assets/khoj.css +++ b/src/khoj/interface/web/assets/khoj.css 
@@ -199,7 +199,7 @@ img.khoj-logo { border: 3px solid var(--primary-hover); } -@media screen and (max-width: 700px) { +@media screen and (max-width: 1000px) { .khoj-nav-dropdown-content { display: block; grid-auto-flow: row; @@ -215,7 +215,7 @@ img.khoj-logo { } } -@media only screen and (max-width: 700px) { +@media only screen and (max-width: 1000px) { div.khoj-header { display: grid; grid-auto-flow: column; diff --git a/src/khoj/interface/web/assets/natural-cron.min.js b/src/khoj/interface/web/assets/natural-cron.min.js new file mode 100644 index 00000000..d974dc4f --- /dev/null +++ b/src/khoj/interface/web/assets/natural-cron.min.js @@ -0,0 +1 @@ +!function(e){"object"==typeof exports&&"undefined"!=typeof module?module.exports=e():"function"==typeof define&&define.amd?define([],e):("undefined"!=typeof window?window:"undefined"!=typeof global?global:"undefined"!=typeof self?self:this).getCronString=e()}(function(){return function r(a,o,s){function u(n,e){if(!o[n]){if(!a[n]){var t="function"==typeof require&&require;if(!e&&t)return t(n,!0);if(i)return i(n,!0);throw(t=new Error("Cannot find module '"+n+"'")).code="MODULE_NOT_FOUND",t}t=o[n]={exports:{}},a[n][0].call(t.exports,function(e){return u(a[n][1][e]||e)},t,t.exports,r,a,o,s)}return o[n].exports}for(var i="function"==typeof require&&require,e=0;e -
+
+
{% block content %} {% endblock %} +
+ + +{% endblock %} diff --git a/src/khoj/interface/web/utils.html b/src/khoj/interface/web/utils.html index f9372482..b2d719cb 100644 --- a/src/khoj/interface/web/utils.html +++ b/src/khoj/interface/web/utils.html @@ -10,6 +10,9 @@ Agents Agents + + Automation + Automate {% if has_documents %} Search diff --git a/src/khoj/main.py b/src/khoj/main.py index 745b77fb..4a9593af 100644 --- a/src/khoj/main.py +++ b/src/khoj/main.py @@ -23,6 +23,7 @@ warnings.filterwarnings("ignore", message=r"legacy way to download files from th import uvicorn import django +from apscheduler.schedulers.background import BackgroundScheduler from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles @@ -126,6 +127,19 @@ def run(should_start_server=True): # Setup task scheduler poll_task_scheduler() + # Setup Background Scheduler + from django_apscheduler.jobstores import DjangoJobStore + + state.scheduler = BackgroundScheduler( + { + "apscheduler.timezone": "UTC", + "apscheduler.job_defaults.misfire_grace_time": "60", # Useful to run scheduled jobs even when worker delayed because it was busy or down + "apscheduler.job_defaults.coalesce": "true", # Combine multiple jobs into one if they are scheduled at the same time + } + ) + state.scheduler.add_jobstore(DjangoJobStore(), "default") + state.scheduler.start() + # Start Server configure_routes(app) @@ -145,6 +159,9 @@ def run(should_start_server=True): if should_start_server: start_server(app, host=args.host, port=args.port, socket=args.socket) + # Teardown + state.scheduler.shutdown() + def set_state(args): state.config_file = args.config_file diff --git a/src/khoj/processor/conversation/prompts.py b/src/khoj/processor/conversation/prompts.py index f5700167..a95bbe73 100644 --- a/src/khoj/processor/conversation/prompts.py +++ b/src/khoj/processor/conversation/prompts.py @@ -10,8 +10,7 @@ You were created by Khoj Inc. 
with the following capabilities: - You *CAN REMEMBER ALL NOTES and PERSONAL INFORMATION FOREVER* that the user ever shares with you. - Users can share files and other information with you using the Khoj Desktop, Obsidian or Emacs app. They can also drag and drop their files into the chat window. -- You *CAN* generate images, look-up real-time information from the internet, and answer questions based on the user's notes. -- You cannot set reminders. +- You *CAN* generate images, look-up real-time information from the internet, set reminders and answer questions based on the user's notes. - Say "I don't know" or "I don't understand" if you don't know what to say or if you don't know the answer to a question. - Ask crisp follow-up questions to get additional context, when the answer cannot be inferred from the provided notes or past conversations. - Sometimes the user will share personal information that needs to be remembered, like an account ID or a residential address. These can be acknowledged with a simple "Got it" or "Okay". @@ -301,6 +300,22 @@ AI: I can help with that. I see online that there is a new model of the Dell XPS Q: What are the specs of the new Dell XPS 15? Khoj: default +Example: +Chat History: +User: Where did I go on my last vacation? +AI: You went to Jordan and visited Petra, the Dead Sea, and Wadi Rum. + +Q: Remind me who did I go with on that trip? +Khoj: default + +Example: +Chat History: +User: How's the weather outside? Current Location: Bali, Indonesia +AI: It's currently 28ยฐC and partly cloudy in Bali. + +Q: Share a painting using the weather for Bali every morning. +Khoj: reminder + Now it's your turn to pick the mode you would like to use to answer the user's question. Provide your response as a string. 
Chat History: @@ -492,6 +507,115 @@ Khoj: """.strip() ) +# Automations +# -- +crontime_prompt = PromptTemplate.from_template( + """ +You are Khoj, an extremely smart and helpful task scheduling assistant +- Given a user query, infer the date, time to run the query at as a cronjob time string +- Use an approximate time that makes sense, if it is not specified. +- Also extract the search query to run at the scheduled time. Add any context required from the chat history to improve the query. +- Return a JSON object with the cronjob time, the search query to run and the task subject in it. + +# Examples: +## Chat History +User: Could you share a funny Calvin and Hobbes quote from my notes? +AI: Here is one I found: "It's not denial. I'm just selective about the reality I accept." + +User: Hahah, nice! Show a new one every morning. +Khoj: {{ + "crontime": "0 9 * * *", + "query": "/automated_task Share a funny Calvin and Hobbes or Bill Watterson quote from my notes", + "subject": "Your Calvin and Hobbes Quote for the Day" +}} + +## Chat History + +User: Every monday evening at 6 share the top posts on hacker news from last week. Format it as a newsletter +Khoj: {{ + "crontime": "0 18 * * 1", + "query": "/automated_task Top posts last week on Hacker News", + "subject": "Your Weekly Top Hacker News Posts Newsletter" +}} + +## Chat History +User: What is the latest version of the khoj python package? +AI: The latest released Khoj python package version is 1.5.0. + +User: Notify me when version 2.0.0 is released +Khoj: {{ + "crontime": "0 10 * * *", + "query": "/automated_task What is the latest released version of the Khoj python package?", + "subject": "Khoj Python Package Version 2.0.0 Release" +}} + +## Chat History + +User: Tell me the latest local tech news on the first sunday of every month +Khoj: {{ + "crontime": "0 8 1-7 * 0", + "query": "/automated_task Find the latest local tech, AI and engineering news. 
Format it as a newsletter.", + "subject": "Your Monthly Dose of Local Tech News" +}} + +## Chat History + +User: Inform me when the national election results are declared. Run task at 4pm every thursday. +Khoj: {{ + "crontime": "0 16 * * 4", + "query": "/automated_task Check if the Indian national election results are officially declared", + "subject": "Indian National Election Results Declared" +}} + +# Chat History: +{chat_history} + +User: {query} +Khoj: +""".strip() +) + +to_notify_or_not = PromptTemplate.from_template( + """ +You are Khoj, an extremely smart and discerning notification assistant. +- Decide whether the user should be notified of the AI's response using the Original User Query, Executed User Query and AI Response triplet. +- Notify the user only if the AI's response satisfies the user specified requirements. +- You should only respond with a "Yes" or "No". Do not say anything else. + +# Examples: +Original User Query: Hahah, nice! Show a new one every morning at 9am. My Current Location: Shanghai, China +Executed User Query: Could you share a funny Calvin and Hobbes quote from my notes? +AI Response: Here is one I found: "It's not denial. I'm just selective about the reality I accept." +Khoj: Yes + +Original User Query: Every evening check if it's going to rain tomorrow. Notify me only if I'll need an umbrella. My Current Location: Nairobi, Kenya +Executed User Query: Is it going to rain tomorrow in Nairobi, Kenya +AI Response: Tomorrow's forecast is sunny with a high of 28°C and a low of 18°C +Khoj: No + +Original User Query: Tell me when version 2.0.0 is released. My Current Location: Mexico City, Mexico +Executed User Query: Check if version 2.0.0 of the Khoj python package is released +AI Response: The latest released Khoj python package version is 1.5.0. +Khoj: No + +Original User Query: Paint me a sunset every evening. 
My Current Location: Shanghai, China +Executed User Query: Paint me a sunset in Shanghai, China +AI Response: https://khoj-generated-images.khoj.dev/user110/image78124.webp +Khoj: Yes + +Original User Query: Share a summary of the tasks I've completed at the end of the day. My Current Location: Oslo, Norway +Executed User Query: Share a summary of the tasks I've completed today. +AI Response: I'm sorry, I couldn't find any relevant notes to respond to your message. +Khoj: No + +Original User Query: {original_query} +Executed User Query: {executed_query} +AI Response: {response} +Khoj: +""".strip() +) + + # System messages to user # -- help_message = PromptTemplate.from_template( diff --git a/src/khoj/processor/conversation/utils.py b/src/khoj/processor/conversation/utils.py index c970c421..775848c8 100644 --- a/src/khoj/processor/conversation/utils.py +++ b/src/khoj/processor/conversation/utils.py @@ -102,6 +102,7 @@ def save_to_conversation_log( intent_type: str = "remember", client_application: ClientApplication = None, conversation_id: int = None, + automation_id: str = None, ): user_message_time = user_message_time or datetime.now().strftime("%Y-%m-%d %H:%M:%S") updated_conversation = message_to_log( @@ -112,6 +113,7 @@ def save_to_conversation_log( "context": compiled_references, "intent": {"inferred-queries": inferred_queries, "type": intent_type}, "onlineContext": online_results, + "automationId": automation_id, }, conversation_log=meta_log.get("chat", []), ) diff --git a/src/khoj/routers/api.py b/src/khoj/routers/api.py index fe90698e..625238c1 100644 --- a/src/khoj/routers/api.py +++ b/src/khoj/routers/api.py @@ -7,6 +7,10 @@ import time import uuid from typing import Any, Callable, List, Optional, Union +import cron_descriptor +import pytz +from apscheduler.job import Job +from apscheduler.triggers.cron import CronTrigger from asgiref.sync import sync_to_async from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile from 
fastapi.requests import Request @@ -15,6 +19,7 @@ from starlette.authentication import has_required_scope, requires from khoj.configure import initialize_content from khoj.database.adapters import ( + AutomationAdapters, ConversationAdapters, EntryAdapters, get_user_photo, @@ -29,15 +34,16 @@ from khoj.routers.helpers import ( ApiUserRateLimiter, CommonQueryParams, ConversationCommandRateLimiter, + schedule_automation, update_telemetry_state, ) from khoj.search_filter.date_filter import DateFilter from khoj.search_filter.file_filter import FileFilter from khoj.search_filter.word_filter import WordFilter from khoj.search_type import text_search -from khoj.utils import constants, state +from khoj.utils import state from khoj.utils.config import OfflineChatProcessorModel -from khoj.utils.helpers import ConversationCommand, timer +from khoj.utils.helpers import ConversationCommand, is_none_or_empty, timer from khoj.utils.rawconfig import LocationData, SearchResponse from khoj.utils.state import SearchType @@ -386,3 +392,155 @@ def user_info(request: Request) -> Response: # Return user information as a JSON response return Response(content=json.dumps(user_info), media_type="application/json", status_code=200) + + +@api.get("/automations", response_class=Response) +@requires(["authenticated"]) +def get_automations(request: Request) -> Response: + user: KhojUser = request.user.object + + # Collate all automations created by user that are still active + automations_info = [automation_info for automation_info in AutomationAdapters.get_automations_metadata(user)] + + # Return tasks information as a JSON response + return Response(content=json.dumps(automations_info), media_type="application/json", status_code=200) + + +@api.delete("/automation", response_class=Response) +@requires(["authenticated"]) +def delete_automation(request: Request, automation_id: str) -> Response: + user: KhojUser = request.user.object + + try: + automation_info = 
AutomationAdapters.delete_automation(user, automation_id) + except ValueError: + return Response(status_code=204) + + # Return deleted automation information as a JSON response + return Response(content=json.dumps(automation_info), media_type="application/json", status_code=200) + + +@api.post("/automation", response_class=Response) +@requires(["authenticated"]) +async def post_automation( + request: Request, + q: str, + subject: str, + crontime: str, + city: Optional[str] = None, + region: Optional[str] = None, + country: Optional[str] = None, + timezone: Optional[str] = None, +) -> Response: + user: KhojUser = request.user.object + + # Perform validation checks + if is_none_or_empty(q) or is_none_or_empty(subject) or is_none_or_empty(crontime): + return Response(content="A query, subject and crontime is required", status_code=400) + if not cron_descriptor.get_description(crontime): + return Response(content="Invalid crontime", status_code=400) + + # Normalize query parameters + # Add /automated_task prefix to query if not present, else run the query as given (query_to_run is always set) + q = q.strip() + has_task_prefix = q.startswith("/automated_task") + query_to_run = q if has_task_prefix else f"/automated_task {q}" + # Normalize crontime for AP Scheduler CronTrigger + crontime = crontime.strip() + if len(crontime.split(" ")) > 5: + # Truncate crontime to 5 fields + crontime = " ".join(crontime.split(" ")[:5]) + # Convert crontime to standard unix crontime + crontime = crontime.replace("?", "*") + subject = subject.strip() + + # Schedule automation with query_to_run, timezone, subject directly provided by user + try: + # Use the query to run as the scheduling request if the scheduling request is unset + automation = await schedule_automation(query_to_run, subject, crontime, timezone, q, user, request.url) + except Exception as e: + logger.error(f"Error creating automation {q} for {user.email}: {e}") + return Response( + content=f"Unable to create automation. 
Ensure the automation doesn't already exist.", + media_type="text/plain", + status_code=500, + ) + + # Collate info about the created user automation + automation_info = AutomationAdapters.get_automation_metadata(user, automation) + + # Return information about the created automation as a JSON response + return Response(content=json.dumps(automation_info), media_type="application/json", status_code=200) + + +@api.put("/automation", response_class=Response) +@requires(["authenticated"]) +def edit_job( + request: Request, + automation_id: str, + q: Optional[str], + subject: Optional[str], + crontime: Optional[str], + city: Optional[str] = None, + region: Optional[str] = None, + country: Optional[str] = None, + timezone: Optional[str] = None, +) -> Response: + user: KhojUser = request.user.object + + # Perform validation checks + if is_none_or_empty(q) or is_none_or_empty(subject) or is_none_or_empty(crontime): + return Response(content="A query, subject and crontime is required", status_code=400) + if not cron_descriptor.get_description(crontime): + return Response(content="Invalid crontime", status_code=400) + + # Check, get automation to edit + try: + automation: Job = AutomationAdapters.get_automation(user, automation_id) + except ValueError as e: + return Response(content="Invalid automation", status_code=403) + + # Normalize query parameters + # Add /automated_task prefix to query if not present + q = q.strip() + if not q.startswith("/automated_task"): + query_to_run = f"/automated_task {q}" + # Normalize crontime for AP Scheduler CronTrigger + crontime = crontime.strip() + if len(crontime.split(" ")) > 5: + # Truncate crontime to 5 fields + crontime = " ".join(crontime.split(" ")[:5]) + # Convert crontime to standard unix crontime + crontime = crontime.replace("?", "*") + + # Construct updated automation metadata + automation_metadata = json.loads(automation.name) + automation_metadata["scheduling_request"] = q + automation_metadata["query_to_run"] = 
query_to_run + automation_metadata["subject"] = subject.strip() + automation_metadata["crontime"] = crontime + + # Modify automation with updated query, subject + automation.modify( + name=json.dumps(automation_metadata), + kwargs={ + "query_to_run": query_to_run, + "subject": subject, + "scheduling_request": q, + "user": user, + "calling_url": request.url, + }, + ) + + # Reschedule automation if crontime updated + user_timezone = pytz.timezone(timezone) + trigger = CronTrigger.from_crontab(crontime, user_timezone) + if automation.trigger != trigger: + automation.reschedule(trigger=trigger) + + # Collate info about the updated user automation + automation = AutomationAdapters.get_automation(user, automation.id) + automation_info = AutomationAdapters.get_automation_metadata(user, automation) + + # Return modified automation information as a JSON response + return Response(content=json.dumps(automation_info), media_type="application/json", status_code=200) diff --git a/src/khoj/routers/api_chat.py b/src/khoj/routers/api_chat.py index 2a1bbc5e..79a68969 100644 --- a/src/khoj/routers/api_chat.py +++ b/src/khoj/routers/api_chat.py @@ -1,6 +1,7 @@ import json import logging import math +from datetime import datetime from typing import Dict, Optional from urllib.parse import unquote @@ -29,10 +30,13 @@ from khoj.routers.api import extract_references_and_questions from khoj.routers.helpers import ( ApiUserRateLimiter, CommonQueryParams, + CommonQueryParamsClass, ConversationCommandRateLimiter, agenerate_chat_response, aget_relevant_information_sources, aget_relevant_output_modes, + construct_automation_created_message, + create_automation, get_conversation_command, is_ready_to_chat, text_to_image, @@ -212,7 +216,8 @@ async def chat_options( ) -> Response: cmd_options = {} for cmd in ConversationCommand: - cmd_options[cmd.value] = command_descriptions[cmd] + if cmd in command_descriptions: + cmd_options[cmd.value] = command_descriptions[cmd] update_telemetry_state( 
request=request, @@ -260,6 +265,7 @@ async def websocket_endpoint( city: Optional[str] = None, region: Optional[str] = None, country: Optional[str] = None, + timezone: Optional[str] = None, ): connection_alive = True @@ -352,6 +358,7 @@ async def websocket_endpoint( await send_rate_limit_message(e.detail) break + user_message_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") conversation_commands = [get_conversation_command(query=q, any_references=True)] await send_status_update(f"**๐Ÿ‘€ Understanding Query**: {q}") @@ -366,13 +373,14 @@ async def websocket_endpoint( continue meta_log = conversation.conversation_log + is_automated_task = conversation_commands == [ConversationCommand.AutomatedTask] - if conversation_commands == [ConversationCommand.Default]: - conversation_commands = await aget_relevant_information_sources(q, meta_log) + if conversation_commands == [ConversationCommand.Default] or is_automated_task: + conversation_commands = await aget_relevant_information_sources(q, meta_log, is_automated_task) conversation_commands_str = ", ".join([cmd.value for cmd in conversation_commands]) await send_status_update(f"**๐Ÿ—ƒ๏ธ Chose Data Sources to Search:** {conversation_commands_str}") - mode = await aget_relevant_output_modes(q, meta_log) + mode = await aget_relevant_output_modes(q, meta_log, is_automated_task) await send_status_update(f"**๐Ÿง‘๐Ÿพโ€๐Ÿ’ป Decided Response Mode:** {mode.value}") if mode not in conversation_commands: conversation_commands.append(mode) @@ -381,6 +389,47 @@ async def websocket_endpoint( await conversation_command_rate_limiter.update_and_check_if_valid(websocket, cmd) q = q.replace(f"/{cmd.value}", "").strip() + if ConversationCommand.Automation in conversation_commands: + try: + automation, crontime, query_to_run, subject = await create_automation( + q, timezone, user, websocket.url, meta_log + ) + except Exception as e: + logger.error(f"Error scheduling task {q} for {user.email}: {e}") + await send_complete_llm_response( + 
f"Unable to create automation. Ensure the automation doesn't already exist." + ) + continue + + llm_response = construct_automation_created_message( + automation, crontime, query_to_run, subject, websocket.url + ) + await sync_to_async(save_to_conversation_log)( + q, + llm_response, + user, + meta_log, + user_message_time, + intent_type="automation", + client_application=websocket.user.client_app, + conversation_id=conversation_id, + inferred_queries=[query_to_run], + automation_id=automation.id, + ) + common = CommonQueryParamsClass( + client=websocket.user.client_app, + user_agent=websocket.headers.get("user-agent"), + host=websocket.headers.get("host"), + ) + update_telemetry_state( + request=websocket, + telemetry_type="api", + api="chat", + **common.__dict__, + ) + await send_complete_llm_response(llm_response) + continue + compiled_references, inferred_queries, defiltered_query = await extract_references_and_questions( websocket, meta_log, q, 7, 0.18, conversation_commands, location, send_status_update ) @@ -460,6 +509,7 @@ async def websocket_endpoint( image, user, meta_log, + user_message_time, intent_type=intent_type, inferred_queries=[improved_image_prompt], client_application=websocket.user.client_app, @@ -527,6 +577,7 @@ async def chat( city: Optional[str] = None, region: Optional[str] = None, country: Optional[str] = None, + timezone: Optional[str] = None, rate_limiter_per_minute=Depends( ApiUserRateLimiter(requests=5, subscribed_requests=60, window=60, slug="chat_minute") ), @@ -536,6 +587,7 @@ async def chat( ) -> Response: user: KhojUser = request.user.object q = unquote(q) + user_message_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info(f"Chat request by {user.username}: {q}") await is_ready_to_chat(user) @@ -559,9 +611,11 @@ async def chat( else: meta_log = conversation.conversation_log - if conversation_commands == [ConversationCommand.Default]: - conversation_commands = await aget_relevant_information_sources(q, meta_log) - mode = 
await aget_relevant_output_modes(q, meta_log) + is_automated_task = conversation_commands == [ConversationCommand.AutomatedTask] + + if conversation_commands == [ConversationCommand.Default] or is_automated_task: + conversation_commands = await aget_relevant_information_sources(q, meta_log, is_automated_task) + mode = await aget_relevant_output_modes(q, meta_log, is_automated_task) if mode not in conversation_commands: conversation_commands.append(mode) @@ -576,6 +630,38 @@ async def chat( user_name = await aget_user_name(user) + if ConversationCommand.Automation in conversation_commands: + try: + automation, crontime, query_to_run, subject = await create_automation( + q, timezone, user, request.url, meta_log + ) + except Exception as e: + logger.error(f"Error creating automation {q} for {user.email}: {e}") + return Response( + content=f"Unable to create automation. Ensure the automation doesn't already exist.", + media_type="text/plain", + status_code=500, + ) + + llm_response = construct_automation_created_message(automation, crontime, query_to_run, subject, request.url) + await sync_to_async(save_to_conversation_log)( + q, + llm_response, + user, + meta_log, + user_message_time, + intent_type="automation", + client_application=request.user.client_app, + conversation_id=conversation_id, + inferred_queries=[query_to_run], + automation_id=automation.id, + ) + + if stream: + return StreamingResponse(llm_response, media_type="text/event-stream", status_code=200) + else: + return Response(content=llm_response, media_type="text/plain", status_code=200) + compiled_references, inferred_queries, defiltered_query = await extract_references_and_questions( request, meta_log, q, (n or 5), (d or math.inf), conversation_commands, location ) @@ -640,6 +726,7 @@ async def chat( image, user, meta_log, + user_message_time, intent_type=intent_type, inferred_queries=[improved_image_prompt], client_application=request.user.client_app, diff --git a/src/khoj/routers/auth.py 
def send_task_email(name, email, query, result, subject):
    """
    Email the result of an automation run to the user via Resend.

    The markdown `result` is rendered to HTML and embedded in the task.html template.
    Returns the Resend send response, or None when email sending is disabled.
    """
    if not is_resend_enabled():
        logger.debug("Email sending disabled")
        return

    # Render the markdown result to HTML before placing it in the email template
    rendered_result = markdown_it.MarkdownIt().render(result)
    email_body = env.get_template("task.html").render(
        name=name, subject=subject, query=query, result=rendered_result
    )

    email_payload = {
        "from": "Khoj ",
        "to": email,
        "subject": f"โœจ {subject}",
        "html": email_body,
    }
    return resend.Emails.send(email_payload)
base64 +import hashlib import io import json import logging +import re from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta, timezone from functools import partial @@ -17,18 +19,34 @@ from typing import ( Tuple, Union, ) +from urllib.parse import parse_qs, urlencode +import cron_descriptor import openai +import pytz +import requests +from apscheduler.job import Job +from apscheduler.triggers.cron import CronTrigger +from asgiref.sync import sync_to_async from fastapi import Depends, Header, HTTPException, Request, UploadFile from PIL import Image from starlette.authentication import has_required_scope +from starlette.requests import URL -from khoj.database.adapters import AgentAdapters, ConversationAdapters, EntryAdapters +from khoj.database.adapters import ( + AgentAdapters, + ConversationAdapters, + EntryAdapters, + create_khoj_token, + get_khoj_tokens, + run_with_process_lock, +) from khoj.database.models import ( ChatModelOptions, ClientApplication, Conversation, KhojUser, + ProcessLock, Subscription, TextToImageModelConfig, UserRequests, @@ -44,6 +62,7 @@ from khoj.processor.conversation.utils import ( generate_chatml_messages_with_context, save_to_conversation_log, ) +from khoj.routers.email import is_resend_enabled, send_task_email from khoj.routers.storage import upload_image from khoj.utils import state from khoj.utils.config import OfflineChatProcessorModel @@ -134,7 +153,7 @@ def update_telemetry_state( def construct_chat_history(conversation_history: dict, n: int = 4, agent_name="AI") -> str: chat_history = "" for chat in conversation_history.get("chat", [])[-n:]: - if chat["by"] == "khoj" and chat["intent"].get("type") == "remember": + if chat["by"] == "khoj" and chat["intent"].get("type") in ["remember", "reminder"]: chat_history += f"User: {chat['intent']['query']}\n" chat_history += f"{agent_name}: {chat['message']}\n" elif chat["by"] == "khoj" and ("text-to-image" in chat["intent"].get("type")): @@ -154,6 
+173,8 @@ def get_conversation_command(query: str, any_references: bool = False) -> Conver return ConversationCommand.Online elif query.startswith("/image"): return ConversationCommand.Image + elif query.startswith("/automated_task"): + return ConversationCommand.AutomatedTask # If no relevant notes found for the given query elif not any_references: return ConversationCommand.General @@ -166,7 +187,7 @@ async def agenerate_chat_response(*args): return await loop.run_in_executor(executor, generate_chat_response, *args) -async def aget_relevant_information_sources(query: str, conversation_history: dict): +async def aget_relevant_information_sources(query: str, conversation_history: dict, is_task: bool): """ Given a query, determine which of the available tools the agent should use in order to answer appropriately. """ @@ -197,7 +218,7 @@ async def aget_relevant_information_sources(query: str, conversation_history: di logger.error(f"Invalid response for determining relevant tools: {response}") return tool_options - final_response = [] + final_response = [] if not is_task else [ConversationCommand.AutomatedTask] for llm_suggested_tool in response: if llm_suggested_tool in tool_options.keys(): # Check whether the tool exists as a valid ConversationCommand @@ -211,7 +232,7 @@ async def aget_relevant_information_sources(query: str, conversation_history: di return [ConversationCommand.Default] -async def aget_relevant_output_modes(query: str, conversation_history: dict): +async def aget_relevant_output_modes(query: str, conversation_history: dict, is_task: bool = False): """ Given a query, determine which of the available tools the agent should use in order to answer appropriately. 
async def schedule_query(q: str, conversation_history: dict) -> Tuple[str, ...]:
    """
    Schedule the date, time to run the query. Assume the server timezone is UTC.

    Asks the chat model to turn the scheduling request into a JSON object and
    returns its (crontime, query, subject) values, in that order.

    Raises AssertionError when the model response is not a JSON object with exactly
    the non-empty string fields "crontime", "query" and "subject".
    """
    chat_history = construct_chat_history(conversation_history)

    crontime_prompt = prompts.crontime_prompt.format(
        query=q,
        chat_history=chat_history,
    )

    raw_response = await send_message_to_model_wrapper(crontime_prompt, response_type="json_object")

    # Validate that the response is a JSON object
    try:
        response = json.loads(raw_response.strip())
    except Exception as e:
        raise AssertionError(f"Invalid response for scheduling query: {raw_response}") from e

    # Require exactly the expected fields. A 3 key object with other keys would otherwise
    # yield None values that only fail later, when the cron trigger is built.
    required_fields = ("crontime", "query", "subject")
    if not isinstance(response, dict) or set(response) != set(required_fields):
        raise AssertionError(f"Invalid response for scheduling query: {raw_response}")

    schedule = tuple(response[field] for field in required_fields)
    if not all(isinstance(value, str) and value.strip() for value in schedule):
        raise AssertionError(f"Invalid response for scheduling query: {raw_response}")
    return schedule


def send_message_to_model_wrapper_sync(
    message: str,
    system_message: str = "",
    response_type: str = "text",
):
    """
    Synchronously send a single message to the default chat model and return its response.

    Dispatches on the default conversation config's model type ("offline" or "openai").
    `response_type` is only forwarded on the openai path.
    Raises HTTPException (500) when no default chat model is set or the model type is unsupported.
    """
    conversation_config: ChatModelOptions = ConversationAdapters.get_default_conversation_config()

    if conversation_config is None:
        raise HTTPException(status_code=500, detail="Contact the server administrator to set a default chat model.")

    chat_model = conversation_config.chat_model
    max_tokens = conversation_config.max_prompt_size

    if conversation_config.model_type == "offline":
        # Load the offline chat model into shared state on first use
        if state.offline_chat_processor_config is None or state.offline_chat_processor_config.loaded_model is None:
            state.offline_chat_processor_config = OfflineChatProcessorModel(chat_model, max_tokens)

        loaded_model = state.offline_chat_processor_config.loaded_model
        truncated_messages = generate_chatml_messages_with_context(
            user_message=message, system_message=system_message, model_name=chat_model, loaded_model=loaded_model
        )

        return send_message_to_model_offline(
            messages=truncated_messages,
            loaded_model=loaded_model,
            model=chat_model,
            streaming=False,
        )

    elif conversation_config.model_type == "openai":
        openai_chat_config = ConversationAdapters.get_openai_conversation_config()
        api_key = openai_chat_config.api_key
        truncated_messages = generate_chatml_messages_with_context(
            user_message=message, system_message=system_message, model_name=chat_model
        )

        openai_response = send_message_to_model(
            messages=truncated_messages, api_key=api_key, model=chat_model, response_type=response_type
        )

        return openai_response
    else:
        raise HTTPException(status_code=500, detail="Invalid conversation config")
def should_notify(original_query: str, executed_query: str, ai_response: str) -> bool:
    """
    Decide whether to notify the user of the AI response.
    Default to notifying the user for now.

    Returns False when any input is empty, or when the chat model's decision contains the word "no".
    Returns True otherwise, including when the decision cannot be inferred.
    """
    if any(is_none_or_empty(message) for message in [original_query, executed_query, ai_response]):
        return False

    to_notify_or_not = prompts.to_notify_or_not.format(
        original_query=original_query,
        executed_query=executed_query,
        response=ai_response,
    )

    with timer("Chat actor: Decide to notify user of automation response", logger):
        try:
            response = send_message_to_model_wrapper_sync(to_notify_or_not)
            # Match "no" as a whole word. A substring check also matches "notify", "know", "not" etc.
            decision_words = re.findall(r"[a-z]+", response.lower())
            should_notify_result = "no" not in decision_words
            logger.info(f'Decided to {"not " if not should_notify_result else ""}notify user of automation response.')
            return should_notify_result
        except Exception as e:
            # Catch Exception rather than everything, so KeyboardInterrupt/SystemExit still propagate
            logger.warning(
                f"Fallback to notify user of automation response as failed to infer should notify or not.: {e}"
            )
            return True


def scheduled_chat(query_to_run: str, scheduling_request: str, subject: str, user: KhojUser, calling_url: URL):
    """
    Run a scheduled automation by calling the chat API over HTTP on behalf of the user.

    Rebuilds the chat API URL from the URL of the request that created the automation,
    swaps in `query_to_run` as the query and, if should_notify approves, emails the AI response.

    Returns None if the chat API call fails or an email is sent.
    Returns the raw chat API response when notification is approved but email is disabled.
    """
    # Extract relevant params from the original URL
    scheme = "http" if not calling_url.is_secure else "https"
    query_dict = parse_qs(calling_url.query)

    # Replace the original scheduling query with the scheduled query
    query_dict["q"] = [query_to_run]

    # Construct the URL to call the chat API with the scheduled query string
    encoded_query = urlencode(query_dict, doseq=True)
    url = f"{scheme}://{calling_url.netloc}/api/chat?{encoded_query}"

    # Construct the Headers for the chat API
    headers = {"User-Agent": "Khoj"}
    if not state.anonymous_mode:
        # Add authorization request header in non-anonymous mode
        token = get_khoj_tokens(user)
        if is_none_or_empty(token):
            token = create_khoj_token(user).token
        else:
            token = token[0].token
        headers["Authorization"] = f"Bearer {token}"

    # Call the chat API endpoint with authenticated user token and query
    raw_response = requests.get(url, headers=headers)

    # Stop if the chat API call was not successful
    if raw_response.status_code != 200:
        logger.error(f"Failed to run schedule chat: {raw_response.text}")
        return None

    # Extract the AI response from the chat API response
    cleaned_query = re.sub(r"^/automated_task\s*", "", query_to_run).strip()
    # Compare the media type only, so a "; charset=..." parameter does not hide a JSON response
    media_type = (raw_response.headers.get("Content-Type") or "").split(";")[0].strip().lower()
    if media_type == "application/json":
        response_map = raw_response.json()
        ai_response = response_map.get("response") or response_map.get("image")
    else:
        ai_response = raw_response.text

    # Notify user if the AI response is satisfactory
    if should_notify(original_query=scheduling_request, executed_query=cleaned_query, ai_response=ai_response):
        # NOTE(review): the else is paired with the email check; nesting inferred from a whitespace-collapsed diff - confirm
        if is_resend_enabled():
            send_task_email(user.get_short_name(), user.email, scheduling_request, ai_response, subject)
        else:
            return raw_response


async def create_automation(q: str, timezone: str, user: KhojUser, calling_url: URL, meta_log: dict = {}):
    """
    Infer the schedule, query and subject from the user's scheduling request `q`, then schedule it.

    Returns the scheduled job along with the inferred crontime, query to run and subject.
    `meta_log` is only passed through to schedule_query here.
    """
    crontime, query_to_run, subject = await schedule_query(q, meta_log)
    job = await schedule_automation(query_to_run, subject, crontime, timezone, q, user, calling_url)
    return job, crontime, query_to_run, subject


async def schedule_automation(
    query_to_run: str,
    subject: str,
    crontime: str,
    timezone: str,
    scheduling_request: str,
    user: KhojUser,
    calling_url: URL,
):
    """
    Add a cron job to the shared scheduler that runs `query_to_run` for the user via scheduled_chat.

    The job id is derived from the user and a hash of the query and crontime.
    Falls back to UTC when no timezone is supplied. An unknown timezone name still raises pytz's error.
    """
    # Chat clients may not send a timezone. Use UTC rather than fail with an unrelated looking error
    user_timezone = pytz.timezone("UTC" if is_none_or_empty(timezone) else timezone)
    trigger = CronTrigger.from_crontab(crontime, user_timezone)
    # Generate id and metadata used by task scheduler and process locks for the task runs
    job_metadata = json.dumps(
        {
            "query_to_run": query_to_run,
            "scheduling_request": scheduling_request,
            "subject": subject,
            "crontime": crontime,
        }
    )
    # md5 is used as a short fingerprint of the query and schedule in the job id
    query_id = hashlib.md5(f"{query_to_run}_{crontime}".encode("utf-8")).hexdigest()
    job_id = f"automation_{user.uuid}_{query_id}"
    job = await sync_to_async(state.scheduler.add_job)(
        run_with_process_lock,
        trigger=trigger,
        args=(
            scheduled_chat,
            f"{ProcessLock.Operation.SCHEDULED_JOB}_{user.uuid}_{query_id}",
        ),
        kwargs={
            "query_to_run": query_to_run,
            "scheduling_request": scheduling_request,
            "subject": subject,
            "user": user,
            "calling_url": calling_url,
        },
        id=job_id,
        name=job_metadata,
        max_instances=2,  # Allow second instance to kill any previous instance with stale lock
        jitter=30,
    )
    return job


def construct_automation_created_message(automation: Job, crontime: str, query_to_run: str, subject: str, url: URL):
    """Build the markdown message shown to the user after their automation is created."""
    # Display next run time in user timezone instead of UTC
    schedule = f'{cron_descriptor.get_description(crontime)} {automation.next_run_time.strftime("%Z")}'
    next_run_time = automation.next_run_time.strftime("%Y-%m-%d %I:%M %p %Z")
    # Remove /automated_task prefix from inferred_query
    unprefixed_query_to_run = re.sub(r"^\/automated_task\s*", "", query_to_run)
    # Create the automation response
    scheme = "http" if not url.is_secure else "https"
    automation_icon_url = f"{scheme}://{url.netloc}/static/assets/icons/automation.svg"
    return f"""
    ### ![]({automation_icon_url}) Created Automation
- Subject: **{subject}**
- Query to Run: "{unprefixed_query_to_run}"
- Schedule: `{schedule}`
- Next Run At: {next_run_time}

Manage your automations [here](/automations).
    """.strip()
commands and other metadata.", } @@ -325,7 +328,8 @@ tool_descriptions_for_llm = { } mode_descriptions_for_llm = { - ConversationCommand.Image: "Use this if you think the user is requesting an image or visual response to their query.", + ConversationCommand.Image: "Use this if the user is requesting an image or visual response to their query.", + ConversationCommand.Automation: "Use this if the user is requesting a response at a scheduled date or time.", ConversationCommand.Default: "Use this if the other response modes don't seem to fit the query.", } diff --git a/src/khoj/utils/state.py b/src/khoj/utils/state.py index 8270a70f..7439929f 100644 --- a/src/khoj/utils/state.py +++ b/src/khoj/utils/state.py @@ -4,6 +4,7 @@ from collections import defaultdict from pathlib import Path from typing import Any, Dict, List +from apscheduler.schedulers.background import BackgroundScheduler from openai import OpenAI from whisper import Whisper @@ -29,6 +30,7 @@ cli_args: List[str] = None query_cache: Dict[str, LRU] = defaultdict(LRU) chat_lock = threading.Lock() SearchType = utils_config.SearchType +scheduler: BackgroundScheduler = None telemetry: List[Dict[str, str]] = [] khoj_version: str = None device = get_device() diff --git a/tests/test_openai_chat_actors.py b/tests/test_openai_chat_actors.py index df9d8f07..7c4f5ee3 100644 --- a/tests/test_openai_chat_actors.py +++ b/tests/test_openai_chat_actors.py @@ -12,8 +12,11 @@ from khoj.routers.helpers import ( aget_relevant_output_modes, generate_online_subqueries, infer_webpage_urls, + schedule_query, + should_notify, ) from khoj.utils.helpers import ConversationCommand +from khoj.utils.rawconfig import LocationData # Initialize variables for tests api_key = os.getenv("OPENAI_API_KEY") @@ -490,71 +493,42 @@ async def test_websearch_khoj_website_for_info_about_khoj(chat_client): # ---------------------------------------------------------------------------------------------------- @pytest.mark.anyio 
@pytest.mark.django_db(transaction=True) -async def test_use_default_response_mode(chat_client): - # Arrange - user_query = "What's the latest in the Israel/Palestine conflict?" - +@pytest.mark.parametrize( + "user_query, expected_mode", + [ + ("What's the latest in the Israel/Palestine conflict?", "default"), + ("Summarize the latest tech news every Monday evening", "automation"), + ("Paint a scenery in Timbuktu in the winter", "image"), + ("Remind me, when did I last visit the Serengeti?", "default"), + ], +) +async def test_use_default_response_mode(chat_client, user_query, expected_mode): # Act mode = await aget_relevant_output_modes(user_query, {}) # Assert - assert mode.value == "default" + assert mode.value == expected_mode # ---------------------------------------------------------------------------------------------------- @pytest.mark.anyio @pytest.mark.django_db(transaction=True) -async def test_use_image_response_mode(chat_client): - # Arrange - user_query = "Paint a scenery in Timbuktu in the winter" - - # Act - mode = await aget_relevant_output_modes(user_query, {}) - - # Assert - assert mode.value == "image" - - -# ---------------------------------------------------------------------------------------------------- -@pytest.mark.anyio -@pytest.mark.django_db(transaction=True) -async def test_select_data_sources_actor_chooses_to_search_notes(chat_client): - # Arrange - user_query = "Where did I learn to swim?"
- +@pytest.mark.parametrize( + "user_query, expected_conversation_commands", + [ + ("Where did I learn to swim?", [ConversationCommand.Notes]), + ("Where is the nearest hospital?", [ConversationCommand.Online]), + ("Summarize the wikipedia page on the history of the internet", [ConversationCommand.Webpage]), + ], +) +async def test_select_data_sources_actor_chooses_to_search_notes( + chat_client, user_query, expected_conversation_commands +): # Act conversation_commands = await aget_relevant_information_sources(user_query, {}) # Assert - assert ConversationCommand.Notes in conversation_commands - - -# ---------------------------------------------------------------------------------------------------- -@pytest.mark.anyio -@pytest.mark.django_db(transaction=True) -async def test_select_data_sources_actor_chooses_to_search_online(chat_client): - # Arrange - user_query = "Where is the nearest hospital?" - - # Act - conversation_commands = await aget_relevant_information_sources(user_query, {}) - - # Assert - assert ConversationCommand.Online in conversation_commands - - -# ---------------------------------------------------------------------------------------------------- -@pytest.mark.anyio -@pytest.mark.django_db(transaction=True) -async def test_select_data_sources_actor_chooses_to_read_webpage(chat_client): - # Arrange - user_query = "Summarize the wikipedia page on the history of the internet" - - # Act - conversation_commands = await aget_relevant_information_sources(user_query, {}) - - # Assert - assert ConversationCommand.Webpage in conversation_commands + assert all(command in conversation_commands for command in expected_conversation_commands) # ---------------------------------------------------------------------------------------------------- @@ -571,6 +545,104 @@ async def test_infer_webpage_urls_actor_extracts_correct_links(chat_client): assert "https://en.wikipedia.org/wiki/History_of_the_Internet" in urls +#
---------------------------------------------------------------------------------------------------- +@pytest.mark.anyio +@pytest.mark.django_db(transaction=True) +@pytest.mark.parametrize( + "user_query, location, expected_crontime, expected_qs, unexpected_qs", + [ + ( + "Share the weather forecast for the next day daily at 7:30pm", + ("Ubud", "Bali", "Indonesia"), + "30 11 * * *", # ensure correctly converts to utc + ["weather forecast", "ubud"], + ["7:30"], + ), + ( + "Notify me when the new President of Brazil is announced", + ("Sao Paulo", "Sao Paulo", "Brazil"), + "* *", # crontime is variable + ["brazil", "president"], + ["notify"], # ensure reminder isn't re-triggered on scheduled query run + ), + ( + "Let me know whenever Elon leaves Twitter. Check this every afternoon at 12", + ("Karachi", "Sindh", "Pakistan"), + "0 7 * * *", # ensure correctly converts to utc + ["elon", "twitter"], + ["12"], + ), + ( + "Draw a wallpaper every morning using the current weather", + ("Bogota", "Cundinamarca", "Colombia"), + "* * *", # daily crontime + ["weather", "wallpaper", "bogota"], + ["every"], + ), + ], +) +async def test_infer_task_scheduling_request( + chat_client, user_query, location, expected_crontime, expected_qs, unexpected_qs +): + # Arrange + location_data = LocationData(city=location[0], region=location[1], country=location[2]) + + # Act + crontime, inferred_query = await schedule_query(user_query, location_data, {}) + inferred_query = inferred_query.lower() + + # Assert + assert expected_crontime in crontime + for expected_q in expected_qs: + assert expected_q in inferred_query, f"Expected fragment {expected_q} in query: {inferred_query}" + for unexpected_q in unexpected_qs: + assert ( + unexpected_q not in inferred_query + ), f"Did not expect fragment '{unexpected_q}' in query: '{inferred_query}'" + + +# ---------------------------------------------------------------------------------------------------- +@pytest.mark.anyio 
+@pytest.mark.django_db(transaction=True) +@pytest.mark.parametrize( + "scheduling_query, executing_query, generated_response, expected_should_notify", + [ + ( + "Notify me if it is going to rain tomorrow?", + "What's the weather forecast for tomorrow?", + "It is sunny and warm tomorrow.", + False, + ), + ( + "Summarize the latest news every morning", + "Summarize today's news", + "Today in the news: AI is taking over the world", + True, + ), + ( + "Create a weather wallpaper every morning using the current weather", + "Paint a weather wallpaper using the current weather", + "https://khoj-generated-wallpaper.khoj.dev/user110/weathervane.webp", + True, + ), + ( + "Let me know the election results once they are offically declared", + "What are the results of the elections? Has the winner been declared?", + "The election results has not been declared yet.", + False, + ), + ], +) +def test_decision_on_when_to_notify_scheduled_task_results( + chat_client, scheduling_query, executing_query, generated_response, expected_should_notify +): + # Act + generated_should_notify = should_notify(scheduling_query, executing_query, generated_response) + + # Assert + assert generated_should_notify == expected_should_notify + + # Helpers # ---------------------------------------------------------------------------------------------------- def populate_chat_history(message_list):