mirror of
https://github.com/khoaliber/khoj.git
synced 2026-03-09 05:39:12 +00:00
Add Telemetry to Understand Khoj Usage
### Objective: Use telemetry to better understand Khoj usage. This will motivate and prioritize work for Khoj. Specific questions: - Number of active deployments of khoj server - How regularly is khoj used (hourly, daily, weekly etc)? - How much is which feature used (chat, search)? - Which UI interface is used most (obsidian, emacs, web ui)? ### Details - Expose setting to disable telemetry logging in khoj.yml - Create basic telemetry server to log data to a DB - Log calls to Khoj API /search, /chat, /update endpoints - Batch upload telemetry data to server at ~hourly interval
This commit is contained in:
45
.github/workflows/dockerize_telemetry_server.yml
vendored
Normal file
45
.github/workflows/dockerize_telemetry_server.yml
vendored
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
name: dockerize telemetry server
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
paths:
|
||||||
|
- src/telemetry/**
|
||||||
|
- .github/workflows/dockerize_telemetry_server.yml
|
||||||
|
pull_request:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
paths:
|
||||||
|
- src/telemetry/**
|
||||||
|
- .github/workflows/dockerize_telemetry_server.yml
|
||||||
|
workflow_dispatch:
|
||||||
|
|
||||||
|
env:
|
||||||
|
DOCKER_IMAGE_TAG: ${{ github.ref == 'refs/heads/master' && 'latest' || github.event.pull_request.number }}
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build:
|
||||||
|
name: Build Docker Image, Push to Container Registry
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout Code
|
||||||
|
uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Set up Docker Buildx
|
||||||
|
uses: docker/setup-buildx-action@v2
|
||||||
|
|
||||||
|
- name: Login to GitHub Container Registry
|
||||||
|
uses: docker/login-action@v2
|
||||||
|
with:
|
||||||
|
registry: ghcr.io
|
||||||
|
username: ${{ github.repository_owner }}
|
||||||
|
password: ${{ secrets.PAT }}
|
||||||
|
|
||||||
|
- name: 📦 Build and Push Docker Image
|
||||||
|
uses: docker/build-push-action@v2
|
||||||
|
with:
|
||||||
|
context: src/telemetry
|
||||||
|
file: src/telemetry/Dockerfile
|
||||||
|
push: true
|
||||||
|
tags: ghcr.io/${{ github.repository }}-telemetry:${{ env.DOCKER_IMAGE_TAG }}
|
||||||
@@ -3,6 +3,7 @@ import sys
|
|||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
import requests
|
||||||
|
|
||||||
# External Packages
|
# External Packages
|
||||||
import schedule
|
import schedule
|
||||||
@@ -62,7 +63,7 @@ def configure_routes(app):
|
|||||||
app.include_router(web_client)
|
app.include_router(web_client)
|
||||||
|
|
||||||
|
|
||||||
@schedule.repeat(schedule.every(1).hour)
|
@schedule.repeat(schedule.every(61).minutes)
|
||||||
def update_search_index():
|
def update_search_index():
|
||||||
state.search_index_lock.acquire()
|
state.search_index_lock.acquire()
|
||||||
state.model = configure_search(state.model, state.config, regenerate=False)
|
state.model = configure_search(state.model, state.config, regenerate=False)
|
||||||
@@ -189,7 +190,7 @@ def configure_conversation_processor(conversation_processor_config):
|
|||||||
return conversation_processor
|
return conversation_processor
|
||||||
|
|
||||||
|
|
||||||
@schedule.repeat(schedule.every(15).minutes)
|
@schedule.repeat(schedule.every(17).minutes)
|
||||||
def save_chat_session():
|
def save_chat_session():
|
||||||
# No need to create empty log file
|
# No need to create empty log file
|
||||||
if not (
|
if not (
|
||||||
@@ -223,3 +224,19 @@ def save_chat_session():
|
|||||||
|
|
||||||
state.processor_config.conversation.chat_session = None
|
state.processor_config.conversation.chat_session = None
|
||||||
logger.info("📩 Saved current chat session to conversation logs")
|
logger.info("📩 Saved current chat session to conversation logs")
|
||||||
|
|
||||||
|
|
||||||
|
@schedule.repeat(schedule.every(59).minutes)
def upload_telemetry():
    """Upload batched usage telemetry to the khoj telemetry server ~hourly.

    Skips upload when telemetry logging is disabled in the app config or when
    nothing is queued in state.telemetry. The queue is cleared only after a
    successful upload, so failed batches are retried on the next run.
    """
    if not state.config.app.should_log_telemetry or not state.telemetry:
        message = "📡 No telemetry to upload" if not state.telemetry else "📡 Telemetry logging disabled"
        logger.debug(message)
        return

    try:
        logger.debug(f"📡 Upload usage telemetry to {constants.telemetry_server}:\n{state.telemetry}")
        # Bound the request so a slow or unreachable telemetry server cannot
        # hang this scheduled job's thread indefinitely
        requests.post(constants.telemetry_server, json=state.telemetry, timeout=10)
    except Exception as e:
        logger.error(f"📡 Error uploading telemetry: {e}")
    else:
        # Clear queue only on success; otherwise keep data for the next attempt
        state.telemetry = []
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ from khoj.configure import configure_processor, configure_search
|
|||||||
from khoj.processor.conversation.gpt import converse, extract_questions
|
from khoj.processor.conversation.gpt import converse, extract_questions
|
||||||
from khoj.processor.conversation.utils import message_to_log, message_to_prompt
|
from khoj.processor.conversation.utils import message_to_log, message_to_prompt
|
||||||
from khoj.search_type import image_search, text_search
|
from khoj.search_type import image_search, text_search
|
||||||
from khoj.utils.helpers import timer
|
from khoj.utils.helpers import log_telemetry, timer
|
||||||
from khoj.utils.rawconfig import FullConfig, SearchResponse
|
from khoj.utils.rawconfig import FullConfig, SearchResponse
|
||||||
from khoj.utils.state import SearchType
|
from khoj.utils.state import SearchType
|
||||||
from khoj.utils import state, constants
|
from khoj.utils import state, constants
|
||||||
@@ -168,6 +168,11 @@ def search(
|
|||||||
# Cache results
|
# Cache results
|
||||||
state.query_cache[query_cache_key] = results
|
state.query_cache[query_cache_key] = results
|
||||||
|
|
||||||
|
# Only log telemetry if query is new and not a continuation of previous query
|
||||||
|
if state.previous_query is None or state.previous_query not in user_query:
|
||||||
|
state.telemetry += [log_telemetry(telemetry_type="api", api="search", app_config=state.config.app)]
|
||||||
|
state.previous_query = user_query
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
@@ -191,6 +196,8 @@ def update(t: Optional[SearchType] = None, force: Optional[bool] = False):
|
|||||||
else:
|
else:
|
||||||
logger.info("📬 Processor reconfigured via API")
|
logger.info("📬 Processor reconfigured via API")
|
||||||
|
|
||||||
|
state.telemetry += [log_telemetry(telemetry_type="api", api="update", app_config=state.config.app)]
|
||||||
|
|
||||||
return {"status": "ok", "message": "khoj reloaded"}
|
return {"status": "ok", "message": "khoj reloaded"}
|
||||||
|
|
||||||
|
|
||||||
@@ -251,4 +258,6 @@ def chat(q: Optional[str] = None):
|
|||||||
conversation_log=meta_log.get("chat", []),
|
conversation_log=meta_log.get("chat", []),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
state.telemetry += [log_telemetry(telemetry_type="api", api="chat", app_config=state.config.app)]
|
||||||
|
|
||||||
return {"status": status, "response": gpt_response, "context": compiled_references}
|
return {"status": status, "response": gpt_response, "context": compiled_references}
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ from pathlib import Path
|
|||||||
app_root_directory = Path(__file__).parent.parent.parent
|
app_root_directory = Path(__file__).parent.parent.parent
|
||||||
web_directory = app_root_directory / "khoj/interface/web/"
|
web_directory = app_root_directory / "khoj/interface/web/"
|
||||||
empty_escape_sequences = "\n|\r|\t| "
|
empty_escape_sequences = "\n|\r|\t| "
|
||||||
|
app_env_filepath = "~/.khoj/env"
|
||||||
|
telemetry_server = "https://khoj.beta.haletic.com/v1/telemetry"
|
||||||
|
|
||||||
# default app config to use
|
# default app config to use
|
||||||
default_config = {
|
default_config = {
|
||||||
|
|||||||
@@ -1,14 +1,22 @@
|
|||||||
# Standard Packages
|
# Standard Packages
|
||||||
from __future__ import annotations # to avoid quoting type hints
|
from __future__ import annotations # to avoid quoting type hints
|
||||||
import logging
|
|
||||||
import sys
|
|
||||||
import torch
|
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
import datetime
|
||||||
from importlib import import_module
|
from importlib import import_module
|
||||||
from os.path import join
|
import logging
|
||||||
|
from os import path
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import platform
|
||||||
|
import requests
|
||||||
|
import sys
|
||||||
from time import perf_counter
|
from time import perf_counter
|
||||||
|
import torch
|
||||||
from typing import Optional, Union, TYPE_CHECKING
|
from typing import Optional, Union, TYPE_CHECKING
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
# Internal Packages
|
||||||
|
from khoj.utils import constants
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
# External Packages
|
# External Packages
|
||||||
@@ -16,6 +24,7 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
# Internal Packages
|
# Internal Packages
|
||||||
from khoj.utils.models import BaseEncoder
|
from khoj.utils.models import BaseEncoder
|
||||||
|
from khoj.utils.rawconfig import AppConfig
|
||||||
|
|
||||||
|
|
||||||
def is_none_or_empty(item):
|
def is_none_or_empty(item):
|
||||||
@@ -59,7 +68,7 @@ def load_model(model_name: str, model_type, model_dir=None, device: str = None)
|
|||||||
"Load model from disk or huggingface"
|
"Load model from disk or huggingface"
|
||||||
# Construct model path
|
# Construct model path
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
model_path = join(model_dir, model_name.replace("/", "_")) if model_dir is not None else None
|
model_path = path.join(model_dir, model_name.replace("/", "_")) if model_dir is not None else None
|
||||||
|
|
||||||
# Load model from model_path if it exists there
|
# Load model from model_path if it exists there
|
||||||
model_type_class = get_class_by_name(model_type) if isinstance(model_type, str) else model_type
|
model_type_class = get_class_by_name(model_type) if isinstance(model_type, str) else model_type
|
||||||
@@ -123,3 +132,61 @@ class LRU(OrderedDict):
|
|||||||
if len(self) > self.capacity:
|
if len(self) > self.capacity:
|
||||||
oldest = next(iter(self))
|
oldest = next(iter(self))
|
||||||
del self[oldest]
|
del self[oldest]
|
||||||
|
|
||||||
|
|
||||||
|
def get_server_id():
    """Get, Generate Persistent, Random ID per server install.

    Helps count distinct khoj servers deployed.
    Maintains anonymity by using non-PII random id.

    Returns:
        str: A UUID4 string, read from (or newly persisted to) the khoj env file.
    """
    from os import makedirs

    # Expand path to the khoj env file. It contains persistent internal app data
    app_env_filename = path.expanduser(constants.app_env_filepath)

    server_id = None
    if path.exists(app_env_filename):
        # Read the key=value contents of the env file
        with open(app_env_filename, "r") as f:
            contents = f.readlines()

        # Extract the server_id from the contents.
        # Split on the first "=" only and skip malformed lines, so blank lines
        # or values containing "=" do not raise ValueError.
        for line in contents:
            key, sep, value = line.partition("=")
            if sep and key.strip() == "server_id":
                server_id = value.strip()
                break

    if server_id is None:
        # Generate a new random, anonymous server id
        server_id = str(uuid.uuid4())

        # Persist the new server_id so the same id is reported on every run.
        # Previously, an env file that existed without a server_id entry was
        # never updated, yielding a fresh id on each call.
        # Append to preserve any other entries already in the env file.
        env_dirname = path.dirname(app_env_filename)
        if env_dirname:
            makedirs(env_dirname, exist_ok=True)
        with open(app_env_filename, "a") as f:
            f.write("server_id=" + server_id + "\n")

    return server_id
|
||||||
|
|
||||||
|
|
||||||
|
def log_telemetry(telemetry_type: str, api: str = None, client: str = None, app_config: AppConfig = None):
    """Build one anonymous usage-telemetry event for the given API call.

    Returns an empty list when telemetry is disabled or unconfigured, so
    callers can append the result to the telemetry queue unconditionally.
    """
    # Respect the user's preference: emit nothing when telemetry is off
    if not app_config or not app_config.should_log_telemetry:
        return []

    # Fields captured for every telemetry event
    event = {
        "telemetry_type": telemetry_type,
        "server_id": get_server_id(),
        "os": platform.system(),
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

    # Attach optional context when provided:
    # - api: endpoint on server called by client
    # - client: app from which the API was called, e.g Emacs, Obsidian
    optional_fields = {"api": api, "client": client}
    event.update({key: value for key, value in optional_fields.items() if value})

    return event
|
||||||
|
|||||||
@@ -89,10 +89,15 @@ class ProcessorConfig(ConfigBase):
|
|||||||
conversation: Optional[ConversationProcessorConfig]
|
conversation: Optional[ConversationProcessorConfig]
|
||||||
|
|
||||||
|
|
||||||
|
class AppConfig(ConfigBase):
|
||||||
|
should_log_telemetry: bool
|
||||||
|
|
||||||
|
|
||||||
class FullConfig(ConfigBase):
|
class FullConfig(ConfigBase):
|
||||||
content_type: Optional[ContentConfig]
|
content_type: Optional[ContentConfig]
|
||||||
search_type: Optional[SearchConfig]
|
search_type: Optional[SearchConfig]
|
||||||
processor: Optional[ProcessorConfig]
|
processor: Optional[ProcessorConfig]
|
||||||
|
app: Optional[AppConfig] = AppConfig(should_log_telemetry=True)
|
||||||
|
|
||||||
|
|
||||||
class SearchResponse(ConfigBase):
|
class SearchResponse(ConfigBase):
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
# Standard Packages
|
# Standard Packages
|
||||||
import threading
|
import threading
|
||||||
from typing import List
|
from typing import List, Dict
|
||||||
from packaging import version
|
from packaging import version
|
||||||
|
|
||||||
# External Packages
|
# External Packages
|
||||||
@@ -25,6 +25,8 @@ cli_args: List[str] = None
|
|||||||
query_cache = LRU()
|
query_cache = LRU()
|
||||||
search_index_lock = threading.Lock()
|
search_index_lock = threading.Lock()
|
||||||
SearchType = utils_config.SearchType
|
SearchType = utils_config.SearchType
|
||||||
|
telemetry: List[Dict[str, str]] = []
|
||||||
|
previous_query: str = None
|
||||||
|
|
||||||
if torch.cuda.is_available():
|
if torch.cuda.is_available():
|
||||||
# Use CUDA GPU
|
# Use CUDA GPU
|
||||||
|
|||||||
10
src/telemetry/Dockerfile
Normal file
10
src/telemetry/Dockerfile
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
# Get Base Image
# uvicorn-gunicorn base auto-serves the FastAPI "app" object found in /app/main.py
FROM tiangolo/uvicorn-gunicorn:python3.11-slim
LABEL org.opencontainers.image.source https://github.com/debanjum/khoj

# Install Telemetry Server Dependencies
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt

# Copy Application
# Renamed to main.py so the base image's default entrypoint picks it up
COPY telemetry.py /app/main.py
|
||||||
2
src/telemetry/requirements.txt
Normal file
2
src/telemetry/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
uvicorn
|
||||||
|
fastapi
|
||||||
65
src/telemetry/telemetry.py
Normal file
65
src/telemetry/telemetry.py
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
# Standard Packages
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
from typing import Dict, List
|
||||||
|
|
||||||
|
# External Packages
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi import HTTPException
|
||||||
|
import sqlite3
|
||||||
|
import uvicorn
|
||||||
|
|
||||||
|
|
||||||
|
# Initialize Global App Variables
|
||||||
|
app = FastAPI()
|
||||||
|
sqlfile = "khoj.sqlite"
|
||||||
|
logger = logging.getLogger()
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/v1/telemetry")
def v1_telemetry(telemetry_data: List[Dict[str, str]]):
    """Store a batch of received telemetry events in the local SQLite database.

    Args:
        telemetry_data: Telemetry events uploaded by a khoj server install.

    Raises:
        HTTPException: 400 when the request body contains no telemetry data.
    """
    # Throw exception if no telemetry data received in POST request body.
    # An empty payload is a client error, so respond 400 rather than 500.
    if len(telemetry_data) == 0:
        error_message = "Post body is empty. It should contain some telemetry data"
        logger.error(error_message)
        raise HTTPException(status_code=400, detail=error_message)

    # Insert received telemetry data into SQLite db
    logger.info(f"Insert row into telemetry table: {telemetry_data}")
    with sqlite3.connect(sqlfile) as conn:
        cur = conn.cursor()

        # Create a table if it doesn't exist
        cur.execute(
            """CREATE TABLE IF NOT EXISTS usage (id INTEGER PRIMARY KEY, time TIMESTAMP, type TEXT, server_id TEXT, os TEXT, api TEXT, client TEXT)"""
        )

        # Log telemetry data in one batched, parameterized insert.
        # Placeholders guard against SQL injection from client-supplied values.
        cur.executemany(
            "INSERT INTO usage (time, type, server_id, os, api, client) VALUES (?, ?, ?, ?, ?, ?)",
            [
                (
                    item["timestamp"],
                    item["telemetry_type"],
                    item["server_id"],
                    item["os"],
                    item.get("api"),
                    item.get("client"),
                )
                for item in telemetry_data
            ],
        )
        # Commit the changes
        conn.commit()

    return {"status": "ok", "message": "Logged usage telemetry"}
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Parse command-line options for where the telemetry server should listen
    arg_parser = argparse.ArgumentParser(description="Start Khoj Telemetry Server")
    arg_parser.add_argument("--host", default="127.0.0.1", type=str, help="I.P of telemetry server")
    arg_parser.add_argument("--port", "-p", default=80, type=int, help="Port of telemetry server")
    options = arg_parser.parse_args()

    # Run the FastAPI app under uvicorn on the requested interface and port
    uvicorn.run(app, host=options.host, port=options.port, log_level="debug")
|
||||||
Reference in New Issue
Block a user