From ff44734774990fb94178309f734f9cdd3509e6a3 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Sat, 22 Jun 2024 10:09:20 +0530 Subject: [PATCH 1/4] Run online searches in parallel to process multiple queries faster --- src/khoj/processor/tools/online_search.py | 43 +++++++++++++---------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py index 6d0aaace..a37e3c3a 100644 --- a/src/khoj/processor/tools/online_search.py +++ b/src/khoj/processor/tools/online_search.py @@ -6,7 +6,6 @@ from collections import defaultdict from typing import Callable, Dict, List, Optional, Tuple, Union import aiohttp -import requests from bs4 import BeautifulSoup from markdownify import markdownify @@ -61,11 +60,16 @@ async def search_online( subqueries = await generate_online_subqueries(query, conversation_history, location) response_dict = {} - for subquery in subqueries: + if subqueries: + logger.info(f"🌐 Searching the Internet for {list(subqueries)}") if send_status_func: - await send_status_func(f"**🌐 Searching the Internet for**: {subquery}") - logger.info(f"🌐 Searching the Internet for '{subquery}'") - response_dict[subquery] = search_with_google(subquery) + subqueries_str = "\n- " + "\n- ".join(list(subqueries)) + await send_status_func(f"**🌐 Searching the Internet for**: {subqueries_str}") + + with timer(f"Internet searches for {list(subqueries)} took", logger): + search_tasks = [search_with_google(subquery) for subquery in subqueries] + search_results = await asyncio.gather(*search_tasks) + response_dict = {subquery: search_result for subquery, search_result in search_results} # Gather distinct web pages from organic search results of each subquery without an instant answer webpage_links = { @@ -92,23 +96,24 @@ async def search_online( return response_dict -def search_with_google(subquery: str): - payload = json.dumps({"q": subquery}) +async def search_with_google(query: str) -> Tuple[str, Dict[str, List[Dict]]]: + payload = json.dumps({"q": query}) headers = {"X-API-KEY": SERPER_DEV_API_KEY, "Content-Type": "application/json"} - response = requests.request("POST", SERPER_DEV_URL, headers=headers, data=payload) + async with aiohttp.ClientSession() as session: + async with session.post(SERPER_DEV_URL, headers=headers, data=payload) as response: + if response.status != 200: + logger.error(await response.text()) + return query, {} + json_response = await response.json() + extraction_fields = ["organic", "answerBox", "peopleAlsoAsk", "knowledgeGraph"] + extracted_search_result = { + field: json_response[field] + for field in extraction_fields + if not is_none_or_empty(json_response.get(field)) + } - if response.status_code != 200: - logger.error(response.text) - return {} - - json_response = response.json() - extraction_fields = ["organic", "answerBox", "peopleAlsoAsk", "knowledgeGraph"] - extracted_search_result = { - field: json_response[field] for field in extraction_fields if not is_none_or_empty(json_response.get(field)) - } - - return extracted_search_result + return query, extracted_search_result async def read_webpages( From a038e4911b9c96b1c2fe41afe7ab8b51c90959c5 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Sat, 22 Jun 2024 10:42:45 +0530 Subject: [PATCH 2/4] Default to Jina search, reader API when no Serper.dev, Olostep API keys Jina AI provides a search and webpage reader API that doesn't require an API key. This provides a good default to enable online search for self-hosted readers requiring no additional setup. Jina search API also returns webpage contents with the results, so just use those directly when Jina Search API used instead of trying to read webpages separately. The extract relvant content from webpage step using a chat model is still used from the `read_webpage_and_extract_content' func in this case. Parse search results from Jina search API into same format as Serper.dev for accurate rendering of online references by clients --- src/khoj/processor/tools/online_search.py | 72 ++++++++++++++++++----- src/khoj/routers/api_chat.py | 48 +++++---------- 2 files changed, 73 insertions(+), 47 deletions(-) diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py index a37e3c3a..72191077 100644 --- a/src/khoj/processor/tools/online_search.py +++ b/src/khoj/processor/tools/online_search.py @@ -2,6 +2,7 @@ import asyncio import json import logging import os +import urllib.parse from collections import defaultdict from typing import Callable, Dict, List, Optional, Tuple, Union @@ -22,6 +23,10 @@ logger = logging.getLogger(__name__) SERPER_DEV_API_KEY = os.getenv("SERPER_DEV_API_KEY") SERPER_DEV_URL = "https://google.serper.dev/search" +JINA_READER_API_URL = "https://r.jina.ai/" +JINA_SEARCH_API_URL = "https://s.jina.ai/" +JINA_API_KEY = os.getenv("JINA_API_KEY") + OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY") OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI" OLOSTEP_QUERY_PARAMS = { @@ -49,9 +54,6 @@ async def search_online( custom_filters: List[str] = [], ): query += " ".join(custom_filters) - if not online_search_enabled(): - logger.warn("SERPER_DEV_API_KEY is not set") - return {} if not is_internet_connected(): logger.warn("Cannot search online as not connected to internet") return {} @@ -67,25 +69,28 @@ async def search_online( await send_status_func(f"**🌐 Searching the Internet for**: {subqueries_str}") with timer(f"Internet searches for {list(subqueries)} took", logger): - search_tasks = [search_with_google(subquery) for subquery in subqueries] + search_func = search_with_google if SERPER_DEV_API_KEY else search_with_jina + search_tasks = [search_func(subquery) for subquery in subqueries] search_results = await asyncio.gather(*search_tasks) response_dict = {subquery: search_result for subquery, search_result in search_results} - # Gather distinct web pages from organic search results of each subquery without an instant answer - webpage_links = { - organic["link"]: subquery + # Gather distinct web page data from organic results of each subquery without an instant answer. + # Content of web pages is directly available when Jina is used for search. + webpages = { + (organic.get("link"), subquery, organic.get("content")) for subquery in response_dict for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ] if "answerBox" not in response_dict[subquery] } # Read, extract relevant info from the retrieved web pages - if webpage_links: + if webpages: + webpage_links = [link for link, _, _ in webpages] logger.info(f"🌐👀 Reading web pages at: {list(webpage_links)}") if send_status_func: webpage_links_str = "\n- " + "\n- ".join(list(webpage_links)) await send_status_func(f"**📖 Reading web pages**: {webpage_links_str}") - tasks = [read_webpage_and_extract_content(subquery, link) for link, subquery in webpage_links.items()] + tasks = [read_webpage_and_extract_content(subquery, link, content) for link, subquery, content in webpages] results = await asyncio.gather(*tasks) # Collect extracted info from the retrieved web pages @@ -139,10 +144,13 @@ async def read_webpages( return response -async def read_webpage_and_extract_content(subquery: str, url: str) -> Tuple[str, Union[None, str], str]: +async def read_webpage_and_extract_content( + subquery: str, url: str, content: str = None +) -> Tuple[str, Union[None, str], str]: try: - with timer(f"Reading web page at '{url}' took", logger): - content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage_at_url(url) + if is_none_or_empty(content): + with timer(f"Reading web page at '{url}' took", logger): + content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage_with_jina(url) with timer(f"Extracting relevant information from web page at '{url}' took", logger): extracted_info = await extract_relevant_info(subquery, content) return subquery, extracted_info, url @@ -177,5 +185,41 @@ async def read_webpage_with_olostep(web_url: str) -> str: return response_json["markdown_content"] -def online_search_enabled(): - return SERPER_DEV_API_KEY is not None +async def read_webpage_with_jina(web_url: str) -> str: + jina_reader_api_url = f"{JINA_READER_API_URL}/{web_url}" + headers = {"Accept": "application/json", "X-Timeout": "30"} + if JINA_API_KEY: + headers["Authorization"] = f"Bearer {JINA_API_KEY}" + + async with aiohttp.ClientSession() as session: + async with session.get(jina_reader_api_url, headers=headers) as response: + response.raise_for_status() + response_json = await response.json() + return response_json["data"]["content"] + + +async def search_with_jina(query: str) -> Tuple[str, Dict[str, List[Dict]]]: + encoded_query = urllib.parse.quote(query) + jina_search_api_url = f"{JINA_SEARCH_API_URL}/{encoded_query}" + headers = {"Accept": "application/json"} + if JINA_API_KEY: + headers["Authorization"] = f"Bearer {JINA_API_KEY}" + + async with aiohttp.ClientSession() as session: + async with session.get(jina_search_api_url, headers=headers) as response: + if response.status != 200: + logger.error(await response.text()) + return query, {} + response_json = await response.json() + parsed_response = [ + { + "title": item["title"], + "content": item.get("content"), + # rename description -> snippet for consistency + "snippet": item["description"], + # rename url -> link for consistency + "link": item["url"], + } + for item in response_json["data"] + ] + return query, {"organic": parsed_response} diff --git a/src/khoj/routers/api_chat.py b/src/khoj/routers/api_chat.py index a014fca5..6d0cf926 100644 --- a/src/khoj/routers/api_chat.py +++ b/src/khoj/routers/api_chat.py @@ -29,11 +29,7 @@ from khoj.processor.conversation.prompts import ( ) from khoj.processor.conversation.utils import save_to_conversation_log from khoj.processor.speech.text_to_speech import generate_text_to_speech -from khoj.processor.tools.online_search import ( - online_search_enabled, - read_webpages, - search_online, -) +from khoj.processor.tools.online_search import read_webpages, search_online from khoj.routers.api import extract_references_and_questions from khoj.routers.helpers import ( ApiUserRateLimiter, @@ -767,22 +763,16 @@ async def websocket_endpoint( conversation_commands.remove(ConversationCommand.Notes) if ConversationCommand.Online in conversation_commands: - if not online_search_enabled(): - conversation_commands.remove(ConversationCommand.Online) - # If online search is not enabled, try to read webpages directly - if ConversationCommand.Webpage not in conversation_commands: - conversation_commands.append(ConversationCommand.Webpage) - else: - try: - online_results = await search_online( - defiltered_query, meta_log, location, send_status_update, custom_filters - ) - except ValueError as e: - logger.warning(f"Error searching online: {e}. Attempting to respond without online results") - await send_complete_llm_response( - f"Error searching online: {e}. Attempting to respond without online results" - ) - continue + try: + online_results = await search_online( + defiltered_query, meta_log, location, send_status_update, custom_filters + ) + except ValueError as e: + logger.warning(f"Error searching online: {e}. Attempting to respond without online results") + await send_complete_llm_response( + f"Error searching online: {e}. Attempting to respond without online results" + ) + continue if ConversationCommand.Webpage in conversation_commands: try: @@ -1067,18 +1057,10 @@ async def chat( conversation_commands.remove(ConversationCommand.Notes) if ConversationCommand.Online in conversation_commands: - if not online_search_enabled(): - conversation_commands.remove(ConversationCommand.Online) - # If online search is not enabled, try to read webpages directly - if ConversationCommand.Webpage not in conversation_commands: - conversation_commands.append(ConversationCommand.Webpage) - else: - try: - online_results = await search_online( - defiltered_query, meta_log, location, custom_filters=_custom_filters - ) - except ValueError as e: - logger.warning(f"Error searching online: {e}. Attempting to respond without online results") + try: + online_results = await search_online(defiltered_query, meta_log, location, custom_filters=_custom_filters) + except ValueError as e: + logger.warning(f"Error searching online: {e}. Attempting to respond without online results") if ConversationCommand.Webpage in conversation_commands: try: From 553beae8485c7ba20f86b8a454da6c92fa749f43 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Sat, 22 Jun 2024 12:18:43 +0530 Subject: [PATCH 3/4] No need to set OpenAI API key from environment variable explicitly It is unnecessary as the OpenAI client automatically tries to use API key from OPENAI_API_KEY env var when the api_key field is unset --- src/khoj/processor/conversation/openai/utils.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/khoj/processor/conversation/openai/utils.py b/src/khoj/processor/conversation/openai/utils.py index 49f1d6b6..b17e5c3d 100644 --- a/src/khoj/processor/conversation/openai/utils.py +++ b/src/khoj/processor/conversation/openai/utils.py @@ -1,5 +1,4 @@ import logging -import os from threading import Thread from typing import Dict @@ -40,7 +39,7 @@ def completion_with_backoff( client: openai.OpenAI = openai_clients.get(client_key) if not client: client = openai.OpenAI( - api_key=openai_api_key or os.getenv("OPENAI_API_KEY"), + api_key=openai_api_key, base_url=api_base_url, ) openai_clients[client_key] = client @@ -102,7 +101,7 @@ def llm_thread(g, messages, model_name, temperature, openai_api_key=None, api_ba client_key = f"{openai_api_key}--{api_base_url}" if client_key not in openai_clients: client: openai.OpenAI = openai.OpenAI( - api_key=openai_api_key or os.getenv("OPENAI_API_KEY"), + api_key=openai_api_key, base_url=api_base_url, ) openai_clients[client_key] = client From d5ceff269133048726c1e0223e4acebf4c37aa61 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Sat, 22 Jun 2024 13:45:08 +0530 Subject: [PATCH 4/4] Update tests and documentation with Jina reader API usage and info Update offline, openai chat actor, director tests to not require Serper to run the online command tests Update documentation for self-hosted online search to mention no setup is required by default. But improvements can be made by using Serper.dev or Olostep --- documentation/docs/features/online_search.md | 14 +++++++----- tests/test_offline_chat_director.py | 23 ++++++++++---------- tests/test_openai_chat_director.py | 23 ++++++++++---------- 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/documentation/docs/features/online_search.md b/documentation/docs/features/online_search.md index 894d515d..956eac0f 100644 --- a/documentation/docs/features/online_search.md +++ b/documentation/docs/features/online_search.md @@ -1,17 +1,21 @@ # Online Search -By default, Khoj will try to infer which information-sourcing tools are required to answer your question. Sometimes, you'll have a need for outside questions that the LLM's knowledge doesn't cover. In that case, it will use the `online` search feature. +Khoj will research on the internet to ground its responses, when it determines that it would need fresh information outside its existing knowledge to answer the query. It will always show any online references it used to respond to your requests. -For example, these queries would trigger an online search: +By default, Khoj will try to infer which information sources, it needs to read to answer your question. This can include reading your documents or researching information online. You can also explicitly trigger an online search by adding the `/online` prefix to your chat query. + +Example queries that should trigger an online search: - What's the latest news about the Israel-Palestine war? - Where can I find the best pizza in New York City? -- Deadline for filing taxes 2024. +- /online Deadline for filing taxes 2024. - Give me a summary of this article: https://en.wikipedia.org/wiki/Haitian_Revolution Try it out yourself! https://app.khoj.dev ## Self-Hosting -The general online search function currently requires an API key from Serper.dev. You can grab one here: https://serper.dev/, and then add it as an environment variable with the name `SERPER_DEV_API_KEY`. +Online search works out of the box even when self-hosting. Khoj uses [JinaAI's reader API](https://jina.ai/reader/) to search online and read webpages by default. No API key setup is necessary. -Without any API keys, Khoj will use the `requests` library to directly read any webpages you give it a link to. This means that you can use Khoj to read any webpage that you have access in your local network. +To improve online search, set the `SERPER_DEV_API_KEY` environment variable to your [Serper.dev](https://serper.dev/) API key. These search results include additional context like answer box, knowledge graph etc. + +For advanced webpage reading, set the `OLOSTEP_API_KEY` environment variable to your [Olostep](https://www.olostep.com/) API key. This has a higher success rate at reading webpages than the default webpage reader. diff --git a/tests/test_offline_chat_director.py b/tests/test_offline_chat_director.py index 1c4ad3ac..a72dae56 100644 --- a/tests/test_offline_chat_director.py +++ b/tests/test_offline_chat_director.py @@ -62,7 +62,6 @@ def test_offline_chat_with_no_chat_history_or_retrieved_content(client_offline_c # ---------------------------------------------------------------------------------------------------- -@pytest.mark.skipif(os.getenv("SERPER_DEV_API_KEY") is None, reason="requires SERPER_DEV_API_KEY") @pytest.mark.chatquality @pytest.mark.django_db(transaction=True) def test_chat_with_online_content(client_offline_chat): @@ -75,18 +74,18 @@ def test_chat_with_online_content(client_offline_chat): response_message = response_message.split("### compiled references")[0] # Assert - expected_responses = ["http://www.paulgraham.com/greatwork.html"] + expected_responses = [ + "https://paulgraham.com/greatwork.html", + "https://www.paulgraham.com/greatwork.html", + "http://www.paulgraham.com/greatwork.html", + ] assert response.status_code == 200 - assert any([expected_response in response_message for expected_response in expected_responses]), ( - "Expected links or serper not setup in response but got: " + response_message - ) + assert any( + [expected_response in response_message for expected_response in expected_responses] + ), f"Expected links: {expected_responses}. Actual response: {response_message}" # ---------------------------------------------------------------------------------------------------- -@pytest.mark.skipif( - os.getenv("SERPER_DEV_API_KEY") is None or os.getenv("OLOSTEP_API_KEY") is None, - reason="requires SERPER_DEV_API_KEY and OLOSTEP_API_KEY", -) @pytest.mark.chatquality @pytest.mark.django_db(transaction=True) def test_chat_with_online_webpage_content(client_offline_chat): @@ -101,9 +100,9 @@ def test_chat_with_online_webpage_content(client_offline_chat): # Assert expected_responses = ["185", "1871", "horse"] assert response.status_code == 200 - assert any([expected_response in response_message for expected_response in expected_responses]), ( - "Expected links or serper not setup in response but got: " + response_message - ) + assert any( + [expected_response in response_message for expected_response in expected_responses] + ), f"Expected response with {expected_responses}. But actual response had: {response_message}" # ---------------------------------------------------------------------------------------------------- diff --git a/tests/test_openai_chat_director.py b/tests/test_openai_chat_director.py index b547f78e..26d93d31 100644 --- a/tests/test_openai_chat_director.py +++ b/tests/test_openai_chat_director.py @@ -61,7 +61,6 @@ def test_chat_with_no_chat_history_or_retrieved_content(chat_client): # ---------------------------------------------------------------------------------------------------- -@pytest.mark.skipif(os.getenv("SERPER_DEV_API_KEY") is None, reason="requires SERPER_DEV_API_KEY") @pytest.mark.chatquality @pytest.mark.django_db(transaction=True) def test_chat_with_online_content(chat_client): @@ -74,18 +73,18 @@ def test_chat_with_online_content(chat_client): response_message = response_message.split("### compiled references")[0] # Assert - expected_responses = ["http://www.paulgraham.com/greatwork.html"] + expected_responses = [ + "https://paulgraham.com/greatwork.html", + "https://www.paulgraham.com/greatwork.html", + "http://www.paulgraham.com/greatwork.html", + ] assert response.status_code == 200 - assert any([expected_response in response_message for expected_response in expected_responses]), ( - "Expected links or serper not setup in response but got: " + response_message - ) + assert any( + [expected_response in response_message for expected_response in expected_responses] + ), f"Expected links: {expected_responses}. Actual response: {response_message}" # ---------------------------------------------------------------------------------------------------- -@pytest.mark.skipif( - os.getenv("SERPER_DEV_API_KEY") is None or os.getenv("OLOSTEP_API_KEY") is None, - reason="requires SERPER_DEV_API_KEY and OLOSTEP_API_KEY", -) @pytest.mark.chatquality @pytest.mark.django_db(transaction=True) def test_chat_with_online_webpage_content(chat_client): @@ -100,9 +99,9 @@ def test_chat_with_online_webpage_content(chat_client): # Assert expected_responses = ["185", "1871", "horse"] assert response.status_code == 200 - assert any([expected_response in response_message for expected_response in expected_responses]), ( - "Expected links or serper not setup in response but got: " + response_message - ) + assert any( + [expected_response in response_message for expected_response in expected_responses] + ), f"Expected links: {expected_responses}. Actual response: {response_message}" # ----------------------------------------------------------------------------------------------------