diff --git a/pyproject.toml b/pyproject.toml index 8bab7876..b4b6c123 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ classifiers = [ "Topic :: Text Processing :: Linguistic", ] dependencies = [ - "bs4 >= 0.0.1", + "beautifulsoup4 ~= 4.12.3", "dateparser >= 1.1.1", "defusedxml == 0.7.1", "fastapi >= 0.104.1", @@ -58,7 +58,6 @@ dependencies = [ "langchain <= 0.2.0", "langchain-openai >= 0.0.5", "requests >= 2.26.0", - "bs4 >= 0.0.1", "anyio == 3.7.1", "pymupdf >= 1.23.5", "django == 4.2.10", @@ -76,6 +75,7 @@ dependencies = [ "openai-whisper >= 20231117", "django-phonenumber-field == 7.3.0", "phonenumbers == 8.13.27", + "markdownify ~= 0.11.6", ] dynamic = ["version"] diff --git a/src/khoj/processor/conversation/offline/chat_model.py b/src/khoj/processor/conversation/offline/chat_model.py index 799d6d31..437bdd3d 100644 --- a/src/khoj/processor/conversation/offline/chat_model.py +++ b/src/khoj/processor/conversation/offline/chat_model.py @@ -247,7 +247,7 @@ def llm_thread(g, messages: List[ChatMessage], model: Any): def send_message_to_model_offline( message, loaded_model=None, model="mistral-7b-instruct-v0.1.Q4_0.gguf", streaming=False, system_message="" -): +) -> str: try: from gpt4all import GPT4All except ModuleNotFoundError as e: diff --git a/src/khoj/processor/conversation/openai/gpt.py b/src/khoj/processor/conversation/openai/gpt.py index 3ed1ff90..90b636f3 100644 --- a/src/khoj/processor/conversation/openai/gpt.py +++ b/src/khoj/processor/conversation/openai/gpt.py @@ -146,12 +146,9 @@ def converse( return iter([prompts.no_online_results_found.format()]) if ConversationCommand.Online in conversation_commands: - simplified_online_results = online_results.copy() - for result in online_results: - if online_results[result].get("extracted_content"): - simplified_online_results[result] = online_results[result]["extracted_content"] - - conversation_primer = f"{prompts.online_search_conversation.format(online_results=str(simplified_online_results))}\n{conversation_primer}" + conversation_primer = ( + f"{prompts.online_search_conversation.format(online_results=str(online_results))}\n{conversation_primer}" + ) if not is_none_or_empty(compiled_references): conversation_primer = f"{prompts.notes_conversation.format(query=user_query, references=compiled_references)}\n{conversation_primer}" diff --git a/src/khoj/processor/conversation/openai/utils.py b/src/khoj/processor/conversation/openai/utils.py index 00ad74ce..c7c38d46 100644 --- a/src/khoj/processor/conversation/openai/utils.py +++ b/src/khoj/processor/conversation/openai/utils.py @@ -43,7 +43,7 @@ class StreamingChatCallbackHandler(StreamingStdOutCallbackHandler): before_sleep=before_sleep_log(logger, logging.DEBUG), reraise=True, ) -def completion_with_backoff(**kwargs): +def completion_with_backoff(**kwargs) -> str: messages = kwargs.pop("messages") if not "openai_api_key" in kwargs: kwargs["openai_api_key"] = os.getenv("OPENAI_API_KEY") diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py index f1ff8e5d..597f394e 100644 --- a/src/khoj/processor/tools/online_search.py +++ b/src/khoj/processor/tools/online_search.py @@ -1,116 +1,131 @@ +import asyncio import json import logging import os -from typing import Dict, List, Union +from typing import Dict, Tuple, Union +import aiohttp import requests +from bs4 import BeautifulSoup +from markdownify import markdownify from khoj.routers.helpers import extract_relevant_info, generate_online_subqueries -from khoj.utils.helpers import is_none_or_empty +from khoj.utils.helpers import is_none_or_empty, timer from khoj.utils.rawconfig import LocationData logger = logging.getLogger(__name__) SERPER_DEV_API_KEY = os.getenv("SERPER_DEV_API_KEY") -OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY") - SERPER_DEV_URL = "https://google.serper.dev/search" +OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY") OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI" - OLOSTEP_QUERY_PARAMS = { "timeout": 35, # seconds "waitBeforeScraping": 1, # seconds - "saveHtml": False, - "saveMarkdown": True, + "saveHtml": "False", + "saveMarkdown": "True", "removeCSSselectors": "default", "htmlTransformer": "none", - "removeImages": True, - "fastLane": True, + "removeImages": "True", + "fastLane": "True", # Similar to Stripe's API, the expand parameters avoid the need to make a second API call # to retrieve the dataset (from the dataset API) if you only need the markdown or html. - "expandMarkdown": True, - "expandHtml": False, + "expandMarkdown": "True", + "expandHtml": "False", } +MAX_WEBPAGES_TO_READ = 1 -async def search_with_google(query: str, conversation_history: dict, location: LocationData): - def _search_with_google(subquery: str): - payload = json.dumps( - { - "q": subquery, - } - ) - - headers = {"X-API-KEY": SERPER_DEV_API_KEY, "Content-Type": "application/json"} - - response = requests.request("POST", SERPER_DEV_URL, headers=headers, data=payload) - - if response.status_code != 200: - logger.error(response.text) - return {} - - json_response = response.json() - sub_response_dict = {} - sub_response_dict["knowledgeGraph"] = json_response.get("knowledgeGraph", {}) - sub_response_dict["organic"] = json_response.get("organic", []) - sub_response_dict["answerBox"] = json_response.get("answerBox", []) - sub_response_dict["peopleAlsoAsk"] = json_response.get("peopleAlsoAsk", []) - - return sub_response_dict - +async def search_online(query: str, conversation_history: dict, location: LocationData): if SERPER_DEV_API_KEY is None: logger.warn("SERPER_DEV_API_KEY is not set") return {} # Breakdown the query into subqueries to get the correct answer subqueries = await generate_online_subqueries(query, conversation_history, location) - response_dict = {} for subquery in subqueries: logger.info(f"Searching with Google for '{subquery}'") - response_dict[subquery] = _search_with_google(subquery) + response_dict[subquery] = search_with_google(subquery) - extracted_content: Dict[str, List] = {} - if is_none_or_empty(OLOSTEP_API_KEY): - logger.warning("OLOSTEP_API_KEY is not set. Skipping web scraping.") - return response_dict + # Gather distinct web pages from organic search results of each subquery without an instant answer + webpage_links = { + result["link"] + for subquery in response_dict + for result in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ] + if "answerBox" not in response_dict[subquery] + } - for subquery in response_dict: - # If a high quality answer is not found, search the web pages of the first 3 organic results - if is_none_or_empty(response_dict[subquery].get("answerBox")): - extracted_content[subquery] = [] - for result in response_dict[subquery].get("organic")[:1]: - logger.info(f"Searching web page of '{result['link']}'") - try: - extracted_content[subquery].append(search_with_olostep(result["link"]).strip()) - except Exception as e: - logger.error(f"Error while searching web page of '{result['link']}': {e}", exc_info=True) - continue - extracted_relevant_content = await extract_relevant_info(subquery, extracted_content) - response_dict[subquery]["extracted_content"] = extracted_relevant_content + # Read, extract relevant info from the retrieved web pages + tasks = [] + for webpage_link in webpage_links: + logger.info(f"Reading web page at '{webpage_link}'") + task = read_webpage_and_extract_content(subquery, webpage_link) + tasks.append(task) + results = await asyncio.gather(*tasks) + + # Collect extracted info from the retrieved web pages + for subquery, extracted_webpage_content in results: + if extracted_webpage_content is not None: + response_dict[subquery]["extracted_content"] = extracted_webpage_content return response_dict -def search_with_olostep(web_url: str) -> str: - if OLOSTEP_API_KEY is None: - raise ValueError("OLOSTEP_API_KEY is not set") +def search_with_google(subquery: str): + payload = json.dumps({"q": subquery}) + headers = {"X-API-KEY": SERPER_DEV_API_KEY, "Content-Type": "application/json"} + response = requests.request("POST", SERPER_DEV_URL, headers=headers, data=payload) + + if response.status_code != 200: + logger.error(response.text) + return {} + + json_response = response.json() + extraction_fields = ["organic", "answerBox", "peopleAlsoAsk", "knowledgeGraph"] + extracted_search_result = { + field: json_response[field] for field in extraction_fields if not is_none_or_empty(json_response.get(field)) + } + + return extracted_search_result + + +async def read_webpage_and_extract_content(subquery: str, url: str) -> Tuple[str, Union[None, str]]: + try: + with timer(f"Reading web page at '{url}' took", logger): + content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage(url) + with timer(f"Extracting relevant information from web page at '{url}' took", logger): + extracted_info = await extract_relevant_info(subquery, content) + return subquery, extracted_info + except Exception as e: + logger.error(f"Failed to read web page at '{url}' with {e}") + return subquery, None + + +async def read_webpage(web_url: str) -> str: + headers = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36", + } + + async with aiohttp.ClientSession() as session: + async with session.get(web_url, headers=headers, timeout=30) as response: + response.raise_for_status() + html = await response.text() + parsed_html = BeautifulSoup(html, "html.parser") + body = parsed_html.body.get_text(separator="\n", strip=True) + return markdownify(body) + + +async def read_webpage_with_olostep(web_url: str) -> str: headers = {"Authorization": f"Bearer {OLOSTEP_API_KEY}"} - web_scraping_params: Dict[str, Union[str, int, bool]] = OLOSTEP_QUERY_PARAMS.copy() # type: ignore web_scraping_params["url"] = web_url - try: - response = requests.request("GET", OLOSTEP_API_URL, params=web_scraping_params, headers=headers) - - if response.status_code != 200: - logger.error(response, exc_info=True) - return None - except Exception as e: - logger.error(f"Error while searching with Olostep: {e}", exc_info=True) - return None - - return response.json()["markdown_content"] + async with aiohttp.ClientSession() as session: + async with session.get(OLOSTEP_API_URL, params=web_scraping_params, headers=headers) as response: + response.raise_for_status() + response_json = await response.json() + return response_json["markdown_content"] diff --git a/src/khoj/routers/api_chat.py b/src/khoj/routers/api_chat.py index 0fc31b3b..7a99869c 100644 --- a/src/khoj/routers/api_chat.py +++ b/src/khoj/routers/api_chat.py @@ -14,7 +14,7 @@ from khoj.database.adapters import ConversationAdapters, EntryAdapters, aget_use from khoj.database.models import KhojUser from khoj.processor.conversation.prompts import help_message, no_entries_found from khoj.processor.conversation.utils import save_to_conversation_log -from khoj.processor.tools.online_search import search_with_google +from khoj.processor.tools.online_search import search_online from khoj.routers.api import extract_references_and_questions from khoj.routers.helpers import ( ApiUserRateLimiter, @@ -289,7 +289,7 @@ async def chat( if ConversationCommand.Online in conversation_commands: try: - online_results = await search_with_google(defiltered_query, meta_log, location) + online_results = await search_online(defiltered_query, meta_log, location) except ValueError as e: return StreamingResponse( iter(["Please set your SERPER_DEV_API_KEY to get started with online searches 🌐"]), diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py index f8c355fa..85cf9a55 100644 --- a/src/khoj/routers/helpers.py +++ b/src/khoj/routers/helpers.py @@ -256,15 +256,17 @@ async def generate_online_subqueries(q: str, conversation_history: dict, locatio return [q] -async def extract_relevant_info(q: str, corpus: dict) -> List[str]: +async def extract_relevant_info(q: str, corpus: str) -> Union[str, None]: """ - Given a target corpus, extract the most relevant info given a query + Extract relevant information for a given query from the target corpus """ - key = list(corpus.keys())[0] + if is_none_or_empty(corpus) or is_none_or_empty(q): + return None + extract_relevant_information = prompts.extract_relevant_information.format( query=q, - corpus=corpus[key], + corpus=corpus.strip(), ) response = await send_message_to_model_wrapper( diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 215c1430..086e4895 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -7,7 +7,7 @@ import pytest from scipy.stats import linregress from khoj.processor.embeddings import EmbeddingsModel -from khoj.processor.tools.online_search import search_with_olostep +from khoj.processor.tools.online_search import read_webpage, read_webpage_with_olostep from khoj.utils import helpers @@ -84,13 +84,29 @@ def test_encode_docs_memory_leak(): assert slope < 2, f"Memory leak suspected on {device}. Memory usage increased at ~{slope:.2f} MB per iteration" -@pytest.mark.skipif(os.getenv("OLOSTEP_API_KEY") is None, reason="OLOSTEP_API_KEY is not set") -def test_olostep_api(): +@pytest.mark.asyncio +async def test_reading_webpage(): # Arrange website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire" # Act - response = search_with_olostep(website) + response = await read_webpage(website) + + # Assert + assert ( + "An alarm sent from the area near the fire also failed to register at the courthouse where the fire watchmen were" + in response + ) + + +@pytest.mark.skipif(os.getenv("OLOSTEP_API_KEY") is None, reason="OLOSTEP_API_KEY is not set") +@pytest.mark.asyncio +async def test_reading_webpage_with_olostep(): + # Arrange + website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire" + + # Act + response = await read_webpage_with_olostep(website) # Assert assert (