mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-05 13:21:18 +00:00
Improve Online Search: Parallelize Search, Use Jina Reader API by default (#832)
- Overview Khoj wil be able to do online search out of the box, even for self-hosted users - Default to Jina search, reader API when no Serper.dev, Olostep API keys - Run online searches in parallel to process multiple queries faster - Details - Jina provides a [reader API](https://github.com/jina-ai/reader) for online search and web page reading It requires no API key. This provides a good default to enable online search for self-hosted readers requiring no additional setup. - Jina search API also returns webpage contents with the results, so just use those directly when Jina Search API used instead of trying to read webpages separately. The extract relevant content from webpage step using a chat model is still used from the `read_webpage_and_extract_content' func in this case. - Parse search results from Jina search API into same format as Serper.dev for accurate rendering of online references by clients - Run online searches in parallel with AsyncIO to process multiple queries faster
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
import logging
|
||||
import os
|
||||
from threading import Thread
|
||||
from typing import Dict
|
||||
|
||||
@@ -40,7 +39,7 @@ def completion_with_backoff(
|
||||
client: openai.OpenAI = openai_clients.get(client_key)
|
||||
if not client:
|
||||
client = openai.OpenAI(
|
||||
api_key=openai_api_key or os.getenv("OPENAI_API_KEY"),
|
||||
api_key=openai_api_key,
|
||||
base_url=api_base_url,
|
||||
)
|
||||
openai_clients[client_key] = client
|
||||
@@ -102,7 +101,7 @@ def llm_thread(g, messages, model_name, temperature, openai_api_key=None, api_ba
|
||||
client_key = f"{openai_api_key}--{api_base_url}"
|
||||
if client_key not in openai_clients:
|
||||
client: openai.OpenAI = openai.OpenAI(
|
||||
api_key=openai_api_key or os.getenv("OPENAI_API_KEY"),
|
||||
api_key=openai_api_key,
|
||||
base_url=api_base_url,
|
||||
)
|
||||
openai_clients[client_key] = client
|
||||
|
||||
@@ -2,11 +2,11 @@ import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import urllib.parse
|
||||
from collections import defaultdict
|
||||
from typing import Callable, Dict, List, Optional, Tuple, Union
|
||||
|
||||
import aiohttp
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from markdownify import markdownify
|
||||
|
||||
@@ -23,6 +23,10 @@ logger = logging.getLogger(__name__)
|
||||
SERPER_DEV_API_KEY = os.getenv("SERPER_DEV_API_KEY")
|
||||
SERPER_DEV_URL = "https://google.serper.dev/search"
|
||||
|
||||
JINA_READER_API_URL = "https://r.jina.ai/"
|
||||
JINA_SEARCH_API_URL = "https://s.jina.ai/"
|
||||
JINA_API_KEY = os.getenv("JINA_API_KEY")
|
||||
|
||||
OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY")
|
||||
OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI"
|
||||
OLOSTEP_QUERY_PARAMS = {
|
||||
@@ -50,9 +54,6 @@ async def search_online(
|
||||
custom_filters: List[str] = [],
|
||||
):
|
||||
query += " ".join(custom_filters)
|
||||
if not online_search_enabled():
|
||||
logger.warn("SERPER_DEV_API_KEY is not set")
|
||||
return {}
|
||||
if not is_internet_connected():
|
||||
logger.warn("Cannot search online as not connected to internet")
|
||||
return {}
|
||||
@@ -61,27 +62,35 @@ async def search_online(
|
||||
subqueries = await generate_online_subqueries(query, conversation_history, location)
|
||||
response_dict = {}
|
||||
|
||||
for subquery in subqueries:
|
||||
if subqueries:
|
||||
logger.info(f"🌐 Searching the Internet for {list(subqueries)}")
|
||||
if send_status_func:
|
||||
await send_status_func(f"**🌐 Searching the Internet for**: {subquery}")
|
||||
logger.info(f"🌐 Searching the Internet for '{subquery}'")
|
||||
response_dict[subquery] = search_with_google(subquery)
|
||||
subqueries_str = "\n- " + "\n- ".join(list(subqueries))
|
||||
await send_status_func(f"**🌐 Searching the Internet for**: {subqueries_str}")
|
||||
|
||||
# Gather distinct web pages from organic search results of each subquery without an instant answer
|
||||
webpage_links = {
|
||||
organic["link"]: subquery
|
||||
with timer(f"Internet searches for {list(subqueries)} took", logger):
|
||||
search_func = search_with_google if SERPER_DEV_API_KEY else search_with_jina
|
||||
search_tasks = [search_func(subquery) for subquery in subqueries]
|
||||
search_results = await asyncio.gather(*search_tasks)
|
||||
response_dict = {subquery: search_result for subquery, search_result in search_results}
|
||||
|
||||
# Gather distinct web page data from organic results of each subquery without an instant answer.
|
||||
# Content of web pages is directly available when Jina is used for search.
|
||||
webpages = {
|
||||
(organic.get("link"), subquery, organic.get("content"))
|
||||
for subquery in response_dict
|
||||
for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ]
|
||||
if "answerBox" not in response_dict[subquery]
|
||||
}
|
||||
|
||||
# Read, extract relevant info from the retrieved web pages
|
||||
if webpage_links:
|
||||
if webpages:
|
||||
webpage_links = [link for link, _, _ in webpages]
|
||||
logger.info(f"🌐👀 Reading web pages at: {list(webpage_links)}")
|
||||
if send_status_func:
|
||||
webpage_links_str = "\n- " + "\n- ".join(list(webpage_links))
|
||||
await send_status_func(f"**📖 Reading web pages**: {webpage_links_str}")
|
||||
tasks = [read_webpage_and_extract_content(subquery, link) for link, subquery in webpage_links.items()]
|
||||
tasks = [read_webpage_and_extract_content(subquery, link, content) for link, subquery, content in webpages]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
# Collect extracted info from the retrieved web pages
|
||||
@@ -92,23 +101,24 @@ async def search_online(
|
||||
return response_dict
|
||||
|
||||
|
||||
def search_with_google(subquery: str):
|
||||
payload = json.dumps({"q": subquery})
|
||||
async def search_with_google(query: str) -> Tuple[str, Dict[str, List[Dict]]]:
|
||||
payload = json.dumps({"q": query})
|
||||
headers = {"X-API-KEY": SERPER_DEV_API_KEY, "Content-Type": "application/json"}
|
||||
|
||||
response = requests.request("POST", SERPER_DEV_URL, headers=headers, data=payload)
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(SERPER_DEV_URL, headers=headers, data=payload) as response:
|
||||
if response.status != 200:
|
||||
logger.error(await response.text())
|
||||
return query, {}
|
||||
json_response = await response.json()
|
||||
extraction_fields = ["organic", "answerBox", "peopleAlsoAsk", "knowledgeGraph"]
|
||||
extracted_search_result = {
|
||||
field: json_response[field]
|
||||
for field in extraction_fields
|
||||
if not is_none_or_empty(json_response.get(field))
|
||||
}
|
||||
|
||||
if response.status_code != 200:
|
||||
logger.error(response.text)
|
||||
return {}
|
||||
|
||||
json_response = response.json()
|
||||
extraction_fields = ["organic", "answerBox", "peopleAlsoAsk", "knowledgeGraph"]
|
||||
extracted_search_result = {
|
||||
field: json_response[field] for field in extraction_fields if not is_none_or_empty(json_response.get(field))
|
||||
}
|
||||
|
||||
return extracted_search_result
|
||||
return query, extracted_search_result
|
||||
|
||||
|
||||
async def read_webpages(
|
||||
@@ -134,10 +144,13 @@ async def read_webpages(
|
||||
return response
|
||||
|
||||
|
||||
async def read_webpage_and_extract_content(subquery: str, url: str) -> Tuple[str, Union[None, str], str]:
|
||||
async def read_webpage_and_extract_content(
|
||||
subquery: str, url: str, content: str = None
|
||||
) -> Tuple[str, Union[None, str], str]:
|
||||
try:
|
||||
with timer(f"Reading web page at '{url}' took", logger):
|
||||
content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage_at_url(url)
|
||||
if is_none_or_empty(content):
|
||||
with timer(f"Reading web page at '{url}' took", logger):
|
||||
content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage_with_jina(url)
|
||||
with timer(f"Extracting relevant information from web page at '{url}' took", logger):
|
||||
extracted_info = await extract_relevant_info(subquery, content)
|
||||
return subquery, extracted_info, url
|
||||
@@ -172,5 +185,41 @@ async def read_webpage_with_olostep(web_url: str) -> str:
|
||||
return response_json["markdown_content"]
|
||||
|
||||
|
||||
def online_search_enabled():
|
||||
return SERPER_DEV_API_KEY is not None
|
||||
async def read_webpage_with_jina(web_url: str) -> str:
|
||||
jina_reader_api_url = f"{JINA_READER_API_URL}/{web_url}"
|
||||
headers = {"Accept": "application/json", "X-Timeout": "30"}
|
||||
if JINA_API_KEY:
|
||||
headers["Authorization"] = f"Bearer {JINA_API_KEY}"
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(jina_reader_api_url, headers=headers) as response:
|
||||
response.raise_for_status()
|
||||
response_json = await response.json()
|
||||
return response_json["data"]["content"]
|
||||
|
||||
|
||||
async def search_with_jina(query: str) -> Tuple[str, Dict[str, List[Dict]]]:
|
||||
encoded_query = urllib.parse.quote(query)
|
||||
jina_search_api_url = f"{JINA_SEARCH_API_URL}/{encoded_query}"
|
||||
headers = {"Accept": "application/json"}
|
||||
if JINA_API_KEY:
|
||||
headers["Authorization"] = f"Bearer {JINA_API_KEY}"
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(jina_search_api_url, headers=headers) as response:
|
||||
if response.status != 200:
|
||||
logger.error(await response.text())
|
||||
return query, {}
|
||||
response_json = await response.json()
|
||||
parsed_response = [
|
||||
{
|
||||
"title": item["title"],
|
||||
"content": item.get("content"),
|
||||
# rename description -> snippet for consistency
|
||||
"snippet": item["description"],
|
||||
# rename url -> link for consistency
|
||||
"link": item["url"],
|
||||
}
|
||||
for item in response_json["data"]
|
||||
]
|
||||
return query, {"organic": parsed_response}
|
||||
|
||||
@@ -29,11 +29,7 @@ from khoj.processor.conversation.prompts import (
|
||||
)
|
||||
from khoj.processor.conversation.utils import save_to_conversation_log
|
||||
from khoj.processor.speech.text_to_speech import generate_text_to_speech
|
||||
from khoj.processor.tools.online_search import (
|
||||
online_search_enabled,
|
||||
read_webpages,
|
||||
search_online,
|
||||
)
|
||||
from khoj.processor.tools.online_search import read_webpages, search_online
|
||||
from khoj.routers.api import extract_references_and_questions
|
||||
from khoj.routers.helpers import (
|
||||
ApiUserRateLimiter,
|
||||
@@ -767,22 +763,16 @@ async def websocket_endpoint(
|
||||
conversation_commands.remove(ConversationCommand.Notes)
|
||||
|
||||
if ConversationCommand.Online in conversation_commands:
|
||||
if not online_search_enabled():
|
||||
conversation_commands.remove(ConversationCommand.Online)
|
||||
# If online search is not enabled, try to read webpages directly
|
||||
if ConversationCommand.Webpage not in conversation_commands:
|
||||
conversation_commands.append(ConversationCommand.Webpage)
|
||||
else:
|
||||
try:
|
||||
online_results = await search_online(
|
||||
defiltered_query, meta_log, location, send_status_update, custom_filters
|
||||
)
|
||||
except ValueError as e:
|
||||
logger.warning(f"Error searching online: {e}. Attempting to respond without online results")
|
||||
await send_complete_llm_response(
|
||||
f"Error searching online: {e}. Attempting to respond without online results"
|
||||
)
|
||||
continue
|
||||
try:
|
||||
online_results = await search_online(
|
||||
defiltered_query, meta_log, location, send_status_update, custom_filters
|
||||
)
|
||||
except ValueError as e:
|
||||
logger.warning(f"Error searching online: {e}. Attempting to respond without online results")
|
||||
await send_complete_llm_response(
|
||||
f"Error searching online: {e}. Attempting to respond without online results"
|
||||
)
|
||||
continue
|
||||
|
||||
if ConversationCommand.Webpage in conversation_commands:
|
||||
try:
|
||||
@@ -1067,18 +1057,10 @@ async def chat(
|
||||
conversation_commands.remove(ConversationCommand.Notes)
|
||||
|
||||
if ConversationCommand.Online in conversation_commands:
|
||||
if not online_search_enabled():
|
||||
conversation_commands.remove(ConversationCommand.Online)
|
||||
# If online search is not enabled, try to read webpages directly
|
||||
if ConversationCommand.Webpage not in conversation_commands:
|
||||
conversation_commands.append(ConversationCommand.Webpage)
|
||||
else:
|
||||
try:
|
||||
online_results = await search_online(
|
||||
defiltered_query, meta_log, location, custom_filters=_custom_filters
|
||||
)
|
||||
except ValueError as e:
|
||||
logger.warning(f"Error searching online: {e}. Attempting to respond without online results")
|
||||
try:
|
||||
online_results = await search_online(defiltered_query, meta_log, location, custom_filters=_custom_filters)
|
||||
except ValueError as e:
|
||||
logger.warning(f"Error searching online: {e}. Attempting to respond without online results")
|
||||
|
||||
if ConversationCommand.Webpage in conversation_commands:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user