mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-02 13:18:18 +00:00
Fix and Improve Online Search and Webpage Read (#1147)
New - Support Firecrawl as a online search provider Improve - Fallback to other enabled online search providers on failure - Speed up online search with Jina by excluding webpage content in search results Fix - Fix Jina webpage reader. Improve it to include generated alt text to each image on webpage - Truncate online query to Serper if query exceeds max supported length
This commit is contained in:
@@ -38,6 +38,7 @@ SERPER_DEV_URL = "https://google.serper.dev/search"
|
||||
JINA_SEARCH_API_URL = "https://s.jina.ai/"
|
||||
JINA_API_KEY = os.getenv("JINA_API_KEY")
|
||||
|
||||
FIRECRAWL_API_KEY = os.getenv("FIRECRAWL_API_KEY")
|
||||
FIRECRAWL_USE_LLM_EXTRACT = is_env_var_true("FIRECRAWL_USE_LLM_EXTRACT")
|
||||
|
||||
OLOSTEP_QUERY_PARAMS = {
|
||||
@@ -95,29 +96,40 @@ async def search_online(
|
||||
yield response_dict
|
||||
return
|
||||
|
||||
search_engines = []
|
||||
if SERPER_DEV_API_KEY:
|
||||
search_engine = "Serper"
|
||||
search_engines.append((search_engine, search_with_serper))
|
||||
if GOOGLE_SEARCH_API_KEY and GOOGLE_SEARCH_ENGINE_ID:
|
||||
search_engine = "Google"
|
||||
search_func = search_with_google
|
||||
elif SERPER_DEV_API_KEY:
|
||||
search_engine = "Serper"
|
||||
search_func = search_with_serper
|
||||
elif JINA_API_KEY:
|
||||
search_engines.append((search_engine, search_with_google))
|
||||
if FIRECRAWL_API_KEY:
|
||||
search_engine = "Firecrawl"
|
||||
search_engines.append((search_engine, search_with_firecrawl))
|
||||
if JINA_API_KEY:
|
||||
search_engine = "Jina"
|
||||
search_func = search_with_jina
|
||||
else:
|
||||
search_engine = "Searxng"
|
||||
search_func = search_with_searxng
|
||||
search_engines.append((search_engine, search_with_jina))
|
||||
search_engine = "Searxng"
|
||||
search_engines.append((search_engine, search_with_searxng))
|
||||
|
||||
logger.info(f"🌐 Searching the Internet with {search_engine} for {subqueries}")
|
||||
logger.info(f"🌐 Searching the Internet for {subqueries}")
|
||||
if send_status_func:
|
||||
subqueries_str = "\n- " + "\n- ".join(subqueries)
|
||||
async for event in send_status_func(f"**Searching the Internet for**: {subqueries_str}"):
|
||||
yield {ChatEvent.STATUS: event}
|
||||
|
||||
with timer(f"Internet searches for {subqueries} took", logger):
|
||||
search_tasks = [search_func(subquery, location) for subquery in subqueries]
|
||||
search_results = await asyncio.gather(*search_tasks)
|
||||
response_dict = {subquery: search_result for subquery, search_result in search_results}
|
||||
response_dict = {}
|
||||
for search_engine, search_func in search_engines:
|
||||
with timer(f"Internet searches with {search_engine} for {subqueries} took", logger):
|
||||
try:
|
||||
search_tasks = [search_func(subquery, location) for subquery in subqueries]
|
||||
search_results = await asyncio.gather(*search_tasks)
|
||||
response_dict = {subquery: search_result for subquery, search_result in search_results if search_result}
|
||||
if not is_none_or_empty(response_dict):
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error searching with {search_engine}: {e}")
|
||||
response_dict = {}
|
||||
|
||||
# Gather distinct web pages from organic results for subqueries without an instant answer.
|
||||
webpages: Dict[str, Dict] = {}
|
||||
@@ -159,6 +171,69 @@ async def search_online(
|
||||
yield response_dict
|
||||
|
||||
|
||||
async def search_with_firecrawl(query: str, location: LocationData) -> Tuple[str, Dict[str, List[Dict]]]:
|
||||
"""
|
||||
Search using Firecrawl API.
|
||||
|
||||
Args:
|
||||
query: The search query string
|
||||
location: Location data for geolocation-based search
|
||||
|
||||
Returns:
|
||||
Tuple containing the original query and a dictionary of search results
|
||||
"""
|
||||
# Set up API endpoint and headers
|
||||
firecrawl_api_url = "https://api.firecrawl.dev/v1/search"
|
||||
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {FIRECRAWL_API_KEY}"}
|
||||
|
||||
# Prepare request payload
|
||||
country_code = location.country_code.lower() if location and location.country_code else "us"
|
||||
payload = {
|
||||
"query": query,
|
||||
"limit": 10, # Maximum number of results
|
||||
"country": country_code,
|
||||
"lang": "en",
|
||||
"timeout": 10000,
|
||||
"scrapeOptions": {},
|
||||
}
|
||||
|
||||
# Add location parameter if available
|
||||
if location and location.city:
|
||||
payload["location"] = f"{location.city}, {location.region}, {location.country}"
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
try:
|
||||
async with session.post(firecrawl_api_url, headers=headers, json=payload) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
logger.error(f"Firecrawl search failed: {error_text}")
|
||||
return query, {}
|
||||
|
||||
response_json = await response.json()
|
||||
|
||||
if not response_json.get("success", False):
|
||||
logger.error(f"Firecrawl search failed: {response_json.get('warning', 'Unknown error')}")
|
||||
return query, {}
|
||||
|
||||
# Transform Firecrawl response to match the expected format
|
||||
organic_results = []
|
||||
for item in response_json.get("data", []):
|
||||
organic_results.append(
|
||||
{
|
||||
"title": item["title"],
|
||||
"link": item["url"],
|
||||
"snippet": item["description"],
|
||||
"content": item.get("markdown", None),
|
||||
}
|
||||
)
|
||||
|
||||
return query, {"organic": organic_results}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error searching with Firecrawl: {str(e)}")
|
||||
return query, {}
|
||||
|
||||
|
||||
async def search_with_searxng(query: str, location: LocationData) -> Tuple[str, Dict[str, List[Dict]]]:
|
||||
"""Search using local SearXNG instance."""
|
||||
# Use environment variable or default to localhost
|
||||
@@ -251,9 +326,16 @@ async def search_with_google(query: str, location: LocationData) -> Tuple[str, D
|
||||
|
||||
|
||||
async def search_with_serper(query: str, location: LocationData) -> Tuple[str, Dict[str, List[Dict]]]:
|
||||
country_code = location.country_code.lower() if location and location.country_code else "us"
|
||||
payload = json.dumps({"q": query, "gl": country_code})
|
||||
headers = {"X-API-KEY": SERPER_DEV_API_KEY, "Content-Type": "application/json"}
|
||||
country_code = location.country_code.lower() if location and location.country_code else "us"
|
||||
max_query_length = 2048
|
||||
if len(query) > max_query_length:
|
||||
logger.warning(
|
||||
f"Truncate online query. Query length {len(query)} exceeds {max_query_length} supported by Serper. Query: {query}"
|
||||
)
|
||||
query = query[:max_query_length]
|
||||
|
||||
payload = json.dumps({"q": query, "gl": country_code})
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(SERPER_DEV_URL, headers=headers, data=payload) as response:
|
||||
@@ -398,16 +480,16 @@ async def read_webpage_with_olostep(web_url: str, api_key: str, api_url: str) ->
|
||||
|
||||
|
||||
async def read_webpage_with_jina(web_url: str, api_key: str, api_url: str) -> str:
|
||||
jina_reader_api_url = f"{api_url}/{web_url}"
|
||||
headers = {"Accept": "application/json", "X-Timeout": "30"}
|
||||
headers = {"Accept": "application/json", "X-Timeout": "30", "X-With-Generated-Alt": "true"}
|
||||
data = {"url": web_url}
|
||||
if api_key:
|
||||
headers["Authorization"] = f"Bearer {api_key}"
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(jina_reader_api_url, headers=headers) as response:
|
||||
async with session.post(api_url, json=data, headers=headers) as response:
|
||||
response.raise_for_status()
|
||||
response_json = await response.json()
|
||||
return response_json["data"]["content"]
|
||||
content = await response.text()
|
||||
return content
|
||||
|
||||
|
||||
async def read_webpage_with_firecrawl(web_url: str, api_key: str, api_url: str) -> str:
|
||||
@@ -459,10 +541,6 @@ Collate only relevant information from the website to answer the target query an
|
||||
|
||||
|
||||
async def search_with_jina(query: str, location: LocationData) -> Tuple[str, Dict[str, List[Dict]]]:
|
||||
encoded_query = urllib.parse.quote(query)
|
||||
jina_search_api_url = f"{JINA_SEARCH_API_URL}/{encoded_query}"
|
||||
headers = {"Accept": "application/json"}
|
||||
|
||||
# First check for jina scraper configuration in database
|
||||
default_jina_scraper = (
|
||||
await ServerChatSettings.objects.filter()
|
||||
@@ -477,13 +555,15 @@ async def search_with_jina(query: str, location: LocationData) -> Tuple[str, Dic
|
||||
jina_scraper = await WebScraper.objects.filter(type=WebScraper.WebScraperType.JINA).afirst()
|
||||
|
||||
# Get API key from DB scraper config or environment variable
|
||||
data = {"q": query}
|
||||
headers = {"Accept": "application/json", "X-Respond-With": "no-content"}
|
||||
api_key = jina_scraper.api_key if jina_scraper and jina_scraper.api_key else JINA_API_KEY
|
||||
|
||||
if api_key:
|
||||
headers["Authorization"] = f"Bearer {api_key}"
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(jina_search_api_url, headers=headers) as response:
|
||||
async with session.post(JINA_SEARCH_API_URL, json=data, headers=headers) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
logger.error(f"Jina search failed: {error_text}")
|
||||
|
||||
Reference in New Issue
Block a user