diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..281c35b
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,7 @@
+HOST=0.0.0.0
+PORT=4421
+DATA_DIR=./data
+MODEL_CONFIG=./models.json
+MODEL_DIRECTORY=
+UPDATE_INTERVAL=30
+LOG_LEVEL=WARNING
diff --git a/.gitignore b/.gitignore
index 90772fa..3927059 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,163 +1,29 @@
-# Byte-compiled / optimized / DLL files
-__pycache__/
-*.py[cod]
-*$py.class
+# Ignore everything
+*
 
-# C extensions
-*.so
+# Don't ignore directories, so we can recurse into them
+!*/
 
-# Distribution / packaging
-.Python
-build/
-develop-eggs/
-dist/
-downloads/
-eggs/
-.eggs/
-lib/
-lib64/
-parts/
-sdist/
-var/
-wheels/
-share/python-wheels/
-*.egg-info/
-.installed.cfg
-*.egg
-MANIFEST
+# Don't ignore .gitignore
+!.gitignore
 
-# PyInstaller
-# Usually these files are written by a python script from a template
-# before PyInstaller builds the exe, so as to inject date/other infos into it.
-*.manifest
-*.spec
+# Don't ignore LICENSE
+!LICENSE
 
-# Installer logs
-pip-log.txt
-pip-delete-this-directory.txt
+# Don't ignore these files
+!*.example
+!setup.py
+!.pre-commit-config.yaml
+!run.bat
 
-# Unit test / coverage reports
-htmlcov/
-.tox/
-.nox/
-.coverage
-.coverage.*
-.cache
-nosetests.xml
-coverage.xml
-*.cover
-*.py,cover
-.hypothesis/
-.pytest_cache/
-cover/
+# Don't ignore .github directory
+!.github/**
 
-# Translations
-*.mo
-*.pot
+# Don't ignore Python files in src directory
+!src/**/*.py
 
-# Django stuff:
-*.log
-local_settings.py
-db.sqlite3
-db.sqlite3-journal
+# Don't ignore requirements.txt in root
+!requirements.txt
 
-# Flask stuff:
-instance/
-.webassets-cache
-
-# Scrapy stuff:
-.scrapy
-
-# Sphinx documentation
-docs/_build/
-
-# PyBuilder
-.pybuilder/
-target/
-
-# Jupyter Notebook
-.ipynb_checkpoints
-
-# IPython
-profile_default/
-ipython_config.py
-
-# pyenv
-# For a library or package, you might want to ignore these files since the code is
-# intended to run in multiple environments; otherwise, check them in:
-# .python-version
-
-# pipenv
-# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
-# However, in case of collaboration, if having platform-specific dependencies or dependencies
-# having no cross-platform support, pipenv may install dependencies that don't work, or not
-# install all needed dependencies.
-#Pipfile.lock
-
-# poetry
-# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
-# This is especially recommended for binary packages to ensure reproducibility, and is more
-# commonly ignored for libraries.
-# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
-#poetry.lock
-
-# pdm
-# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
-#pdm.lock
-# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
-# in version control.
-# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
-.pdm.toml
-.pdm-python
-.pdm-build/
-
-# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
-__pypackages__/
-
-# Celery stuff
-celerybeat-schedule
-celerybeat.pid
-
-# SageMath parsed files
-*.sage.py
-
-# Environments
-.env
-.venv
-env/
-venv/
-ENV/
-env.bak/
-venv.bak/
-.idea/
-
-# Spyder project settings
-.spyderproject
-.spyproject
-
-# Rope project settings
-.ropeproject
-
-# mkdocs documentation
-/site
-
-# mypy
-.mypy_cache/
-.dmypy.json
-dmypy.json
-
-# Pyre type checker
-.pyre/
-
-# pytype static type analyzer
-.pytype/
-
-# Cython debug symbols
-cython_debug/
-
-# PyCharm
-# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
-# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
-# and can be added to the global gitignore or merged into this file. For a more nuclear
-# option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/
+# Don't ignore Markdown files in root
+!*.md
\ No newline at end of file
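
Note on the allowlist .gitignore above: `*` ignores everything, `!*/` re-includes directories so git can recurse into them (a negation such as `!src/**/*.py` cannot take effect while a parent directory is still ignored), and the remaining `!` rules whitelist individual files. A quick way to check which rule wins for a given path — illustrative paths, and it assumes git is on PATH:

    import subprocess

    for path in ["src/graph.py", "data/example.sqlite", "README.md"]:
        # check-ignore exits 0 if the path is ignored; -v prints the matching rule
        result = subprocess.run(
            ["git", "check-ignore", "-v", path], capture_output=True, text=True
        )
        status = "ignored" if result.returncode == 0 else "not ignored"
        print(f"{path}: {status}  {result.stdout.strip()}")
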
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..eaf91e2
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/run.bat b/run.bat
new file mode 100644
index 0000000..3f193e8
--- /dev/null
+++ b/run.bat
@@ -0,0 +1,6 @@
+@echo off
+echo Starting monitor...
+start python ./src/monitor.py
+echo Starting web UI...
+start python ./src/graph.py
+pause
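
Note: run.bat relies on the Windows-only `start` command, which launches each script in its own console window. A minimal cross-platform sketch of the same idea (a hypothetical run.py, not part of this patch; assumes it is executed from the repository root so the relative paths and .env resolve):

    import subprocess
    import sys

    # Launch the metrics collector and the web UI as sibling processes.
    processes = [
        subprocess.Popen([sys.executable, "src/monitor.py"]),
        subprocess.Popen([sys.executable, "src/graph.py"]),
    ]
    for process in processes:
        process.wait()
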
diff --git a/src/get_data.py b/src/get_data.py
deleted file mode 100644
index 85fee03..0000000
--- a/src/get_data.py
+++ /dev/null
@@ -1,138 +0,0 @@
-import os
-import json
-import argparse
-import sqlite3
-from datetime import datetime, timedelta
-
-
-def get_latest_rows(db_file, hours=1):
-    now = datetime.now()
-    one_hour_ago = now - timedelta(hours=hours)
-    one_hour_ago_timestamp = int(one_hour_ago.timestamp())
-
-    conn = sqlite3.connect(db_file)
-    cursor = conn.cursor()
-
-    cursor.execute(
-        f"SELECT timestamp, data FROM json_data WHERE timestamp >= {one_hour_ago_timestamp} ORDER BY timestamp DESC"
-    )
-    rows = cursor.fetchall()
-
-    conn.close()
-
-    if not rows:
-        print(
-            f"No rows found in the last {hours} hour(s). Showing info for last 5 rows:"
-        )
-        conn = sqlite3.connect(db_file)
-        cursor = conn.cursor()
-        cursor.execute(
-            f"SELECT timestamp, data FROM json_data ORDER BY timestamp DESC LIMIT 5"
-        )
-        rows = cursor.fetchall()
-        conn.close()
-        for timestamp, _ in rows:
-            print(f"  {datetime.fromtimestamp(timestamp)}")
-
-    return rows
-
-
-def extract_stats(data_json):
-    try:
-        data = json.loads(data_json)
-        total_prompt_tokens = float(data["vllm:prompt_tokens_total"][0]["value"])
-        total_generation_tokens = float(
-            data["vllm:generation_tokens_total"][0]["value"]
-        )
-        total_requests = sum(
-            float(item["value"]) for item in data["vllm:request_success_total"]
-        )
-        avg_prompt_throughput = float(
-            data["vllm:avg_prompt_throughput_toks_per_s"][0]["value"]
-        )
-        avg_generation_throughput = float(
-            data["vllm:avg_generation_throughput_toks_per_s"][0]["value"]
-        )
-        gpu_cache_usage_perc = float(data["vllm:gpu_cache_usage_perc"][0]["value"])
-        num_requests_running = float(data["vllm:num_requests_running"][0]["value"])
-
-    except (KeyError, IndexError, json.JSONDecodeError) as e:
-        print(f"Error extracting stats from data: {str(e)}")
-        return None
-
-    return {
-        "total_prompt_tokens": total_prompt_tokens,
-        "total_generation_tokens": total_generation_tokens,
-        "total_requests": total_requests,
-        "avg_prompt_throughput": avg_prompt_throughput,
-        "avg_generation_throughput": avg_generation_throughput,
-        "gpu_cache_usage_perc": gpu_cache_usage_perc,
-        "num_requests_running": num_requests_running,
-    }
-
-
-def main(db_file, hours):
-    latest_rows = get_latest_rows(db_file, hours)
-
-    if not latest_rows:
-        print(f"No rows found for the last {hours} hour(s).")
-        return
-
-    print(f"Processing {len(latest_rows)} rows.")
-
-    valid_stats = [
-        extract_stats(data)
-        for _, data in latest_rows
-        if extract_stats(data) is not None
-    ]
-
-    if not valid_stats:
-        print("No valid statistics could be extracted from the rows.")
-        return
-
-    first_stats = valid_stats[-1]  # Oldest row
-    last_stats = valid_stats[0]  # Newest row
-
-    tokens_processed = (
-        last_stats["total_prompt_tokens"]
-        - first_stats["total_prompt_tokens"]
-        + last_stats["total_generation_tokens"]
-        - first_stats["total_generation_tokens"]
-    )
-    requests_processed = last_stats["total_requests"] - first_stats["total_requests"]
-
-    avg_prompt_throughput = sum(
-        stat["avg_prompt_throughput"] for stat in valid_stats
-    ) / len(valid_stats)
-    avg_generation_throughput = sum(
-        stat["avg_generation_throughput"] for stat in valid_stats
-    ) / len(valid_stats)
-    avg_num_requests_running = sum(
-        stat["num_requests_running"] for stat in valid_stats
-    ) / len(valid_stats)
-    avg_gpu_cache_usage_perc = sum(
-        stat["gpu_cache_usage_perc"] for stat in valid_stats
-    ) / len(valid_stats)
-
-    print(f"\nStats for the last {hours} hour(s):")
-    print(f"Tokens processed: {tokens_processed:,.0f}")
-    print(f"Requests processed: {requests_processed:,.0f}")
-    print(f"Average prompt throughput: {avg_prompt_throughput:.2f} tokens/s")
-    print(f"Average generation throughput: {avg_generation_throughput:.2f} tokens/s")
-    print(
-        f"Average number of requests running: {avg_num_requests_running:.2f} requests"
-    )
-    print(f"Average GPU cache usage percent: {avg_gpu_cache_usage_perc * 100:.2f}%")
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(
-        description="Extract stats from a SQLite database for a specified time period"
-    )
-    parser.add_argument("db_file", help="Path to the SQLite database file")
-    parser.add_argument(
-        "--hours", type=int, default=1, help="Number of hours to look back (default: 1)"
-    )
-    args = parser.parse_args()
-
-    main(args.db_file, args.hours)
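
Note: the CLI deleted above is not lost — its get_latest_rows/extract_stats/get_data logic is folded into src/graph.py below, and index() now calls get_data() directly instead of shelling out. For ad-hoc use from a REPL, something like the following should still work (hypothetical database name; assumes the repository root is the working directory and the project dependencies are installed, since importing graph also pulls in Flask and Plotly and runs load_env() at module level):

    import sys
    sys.path.insert(0, "src")

    from graph import get_data

    print(get_data("./data/my-model.sqlite", 24))
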
diff --git a/src/globals.py b/src/globals.py
new file mode 100644
index 0000000..f0aeba4
--- /dev/null
+++ b/src/globals.py
@@ -0,0 +1,14 @@
+import os
+
+
+# Load environment variables from .env file
+def load_env():
+    try:
+        with open(".env") as f:
+            for line in f:
+                line = line.strip()
+                if line and not line.startswith("#") and "=" in line:
+                    key, value = line.split("=", 1)
+                    os.environ[key.strip()] = value.strip()
+    except FileNotFoundError:
+        print("No .env file found. Using default environment variables.")
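
Note: load_env() is a deliberately small stand-in for python-dotenv: one KEY=VALUE per line, blank lines and # comments skipped, values kept as plain strings. It resolves ".env" against the current working directory, so both entry points must be started from the repository root (as run.bat does). A quick behavioral check (hypothetical values; run somewhere a real .env will not be clobbered, with src/ importable):

    import os
    import sys

    sys.path.insert(0, "src")
    from globals import load_env

    with open(".env", "w") as f:  # caution: writes ./.env
        f.write("# comments and blank lines are skipped\n")
        f.write("PORT=4421\n")

    load_env()
    print(os.environ["PORT"])  # "4421" -- a string; callers must int() it
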
diff --git a/src/graph.py b/src/graph.py
index 679dce6..d0aa9bf 100644
--- a/src/graph.py
+++ b/src/graph.py
@@ -3,7 +3,6 @@ import json
 import logging
 import os
 import sqlite3
-import subprocess
 import threading
 import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -14,9 +13,21 @@ import plotly.offline as pyo
 from flask import Flask, render_template, request, send_file
 from plotly.subplots import make_subplots
 
-# Set up logging with a higher level
+from globals import load_env
+
+
+load_env()
+
+# Configuration
+HOST = os.getenv("HOST", "0.0.0.0")
+PORT = int(os.getenv("PORT", 4421))
+DATA_DIR = os.getenv("DATA_DIR", "./data")
+UPDATE_INTERVAL = int(os.getenv("UPDATE_INTERVAL", 30))
+LOG_LEVEL = os.getenv("LOG_LEVEL", "WARNING")
+
+# Set up logging
 logging.basicConfig(
-    level=logging.WARNING,  # Changed from DEBUG to WARNING
+    level=getattr(logging, LOG_LEVEL.upper(), logging.WARNING),
     format="%(asctime)s - %(levelname)s - %(message)s",
 )
 
@@ -75,12 +86,10 @@ async def load_data_from_db(filepath):
 
 
 async def load_data():
-    data_dir = "./data"
-
     tasks = []
-    for filename in os.listdir(data_dir):
+    for filename in os.listdir(DATA_DIR):
         if filename.endswith(".sqlite"):
-            filepath = os.path.join(data_dir, filename)
+            filepath = os.path.join(DATA_DIR, filename)
             tasks.append(load_data_from_db(filepath))
 
     await asyncio.gather(*tasks)
@@ -94,7 +103,7 @@ async def load_data():
 async def background_data_loader():
     while True:
         await load_data()
-        await asyncio.sleep(30)  # Check for updates every 30 seconds
+        await asyncio.sleep(UPDATE_INTERVAL)
 
 
 def start_background_loop(loop):
@@ -235,16 +244,136 @@ def create_plots(selected_model):
     return fig
 
 
+def get_latest_rows(db_file, hours=1):
+    now = datetime.now()
+    one_hour_ago = now - timedelta(hours=hours)
+    one_hour_ago_timestamp = int(one_hour_ago.timestamp())
+
+    conn = sqlite3.connect(db_file)
+    cursor = conn.cursor()
+
+    cursor.execute(
+        "SELECT timestamp, data FROM json_data WHERE timestamp >= ? ORDER BY timestamp DESC", (one_hour_ago_timestamp,)
+    )
+    rows = cursor.fetchall()
+
+    conn.close()
+
+    if not rows:
+        print(
+            f"No rows found in the last {hours} hour(s). Showing info for last 5 rows:"
+        )
+        conn = sqlite3.connect(db_file)
+        cursor = conn.cursor()
+        cursor.execute(
+            "SELECT timestamp, data FROM json_data ORDER BY timestamp DESC LIMIT 5"
+        )
+        rows = cursor.fetchall()
+        conn.close()
+        for timestamp, _ in rows:
+            print(f"  {datetime.fromtimestamp(timestamp)}")
+
+    return rows
+
+
+def extract_stats(data_json):
+    try:
+        data = json.loads(data_json)
+        total_prompt_tokens = float(data["vllm:prompt_tokens_total"][0]["value"])
+        total_generation_tokens = float(
+            data["vllm:generation_tokens_total"][0]["value"]
+        )
+        total_requests = sum(
+            float(item["value"]) for item in data["vllm:request_success_total"]
+        )
+        avg_prompt_throughput = float(
+            data["vllm:avg_prompt_throughput_toks_per_s"][0]["value"]
+        )
+        avg_generation_throughput = float(
+            data["vllm:avg_generation_throughput_toks_per_s"][0]["value"]
+        )
+        gpu_cache_usage_perc = float(data["vllm:gpu_cache_usage_perc"][0]["value"])
+        num_requests_running = float(data["vllm:num_requests_running"][0]["value"])
+
+    except (KeyError, IndexError, json.JSONDecodeError) as e:
+        print(f"Error extracting stats from data: {str(e)}")
+        return None
+
+    return {
+        "total_prompt_tokens": total_prompt_tokens,
+        "total_generation_tokens": total_generation_tokens,
+        "total_requests": total_requests,
+        "avg_prompt_throughput": avg_prompt_throughput,
+        "avg_generation_throughput": avg_generation_throughput,
+        "gpu_cache_usage_perc": gpu_cache_usage_perc,
+        "num_requests_running": num_requests_running,
+    }
+
+
+def get_data(db_file, hours):
+    latest_rows = get_latest_rows(db_file, hours)
+
+    if not latest_rows:
+        print(f"No rows found for the last {hours} hour(s).")
+        return
+
+    print(f"Processing {len(latest_rows)} rows.")
+
+    valid_stats = [
+        stats
+        for _, data in latest_rows
+        if (stats := extract_stats(data)) is not None
+    ]
+
+    if not valid_stats:
+        print("No valid statistics could be extracted from the rows.")
+        return
+
+    first_stats = valid_stats[-1]  # Oldest row
+    last_stats = valid_stats[0]  # Newest row
+
+    tokens_processed = (
+        last_stats["total_prompt_tokens"]
+        - first_stats["total_prompt_tokens"]
+        + last_stats["total_generation_tokens"]
+        - first_stats["total_generation_tokens"]
+    )
+    requests_processed = last_stats["total_requests"] - first_stats["total_requests"]
+
+    avg_prompt_throughput = sum(
+        stat["avg_prompt_throughput"] for stat in valid_stats
+    ) / len(valid_stats)
+    avg_generation_throughput = sum(
+        stat["avg_generation_throughput"] for stat in valid_stats
+    ) / len(valid_stats)
+    avg_num_requests_running = sum(
+        stat["num_requests_running"] for stat in valid_stats
+    ) / len(valid_stats)
+    avg_gpu_cache_usage_perc = sum(
+        stat["gpu_cache_usage_perc"] for stat in valid_stats
+    ) / len(valid_stats)
+
+    return (
+        f"\nStats for the last {hours} hour(s):\n"
+        f"Tokens processed: {tokens_processed:,.0f}\n"
+        f"Requests processed: {requests_processed:,.0f}\n"
+        f"Average prompt throughput: {avg_prompt_throughput:.2f} tokens/s\n"
+        f"Average generation throughput: {avg_generation_throughput:.2f} tokens/s\n"
+        f"Average tokens per request: {(tokens_processed / requests_processed if requests_processed else 0):,.2f} tokens\n"
+        f"Average number of requests running: {avg_num_requests_running:.2f} requests\n"
+        f"Average GPU cache usage percent: {avg_gpu_cache_usage_perc * 100:.2f}%"
+    )
+
+
 app = Flask(__name__)
 
 
 @app.route("/", methods=["GET", "POST"])
 def index():
-    data_dir = "./data"
     model_names = [
         name[:-7]
-        for name in os.listdir(data_dir)
-        if name.endswith(".sqlite") and os.path.isfile(os.path.join(data_dir, name))
+        for name in os.listdir(DATA_DIR)
+        if name.endswith(".sqlite") and os.path.isfile(os.path.join(DATA_DIR, name))
     ]
 
     valid_model_names = set(model_names)
@@ -269,14 +398,7 @@
                 "An error occurred while creating the plot. Please try again later."
             )
 
-        command = [
-            "python",
-            "get_data.py",
-            "--hours",
-            "24",
-            f".\\data\\{selected_model}.sqlite",
-        ]
-        result = subprocess.run(command, capture_output=True, text=True)
+        result = get_data(f"{DATA_DIR}/{selected_model}.sqlite", 24)
     else:
         logging.error(f"Invalid model selected: {selected_model}")
         result = None
@@ -286,7 +408,7 @@
         plot_div=plot_div,
         model_name=selected_model,
         model_names=model_names,
-        result=result.stdout if result else None,
+        result=result,
         error_message=error_message,
     )
 
@@ -317,4 +439,4 @@
     t.start()
 
     asgi_app = WsgiToAsgi(app)
-    uvicorn.run(asgi_app, host="0.0.0.0", port=4421)
+    uvicorn.run(asgi_app, host=HOST, port=PORT)
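
Note: the get_latest_rows() added above binds the cutoff timestamp as a query parameter rather than interpolating it into the SQL string, so the statement text stays constant and sqlite3 handles the quoting. The pattern in isolation (in-memory database, made-up values):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE json_data (timestamp INTEGER, data TEXT)")
    conn.execute("INSERT INTO json_data VALUES (?, ?)", (1700000000, "{}"))

    rows = conn.execute(
        "SELECT timestamp, data FROM json_data WHERE timestamp >= ?",
        (1690000000,),
    ).fetchall()
    print(rows)  # [(1700000000, '{}')]
    conn.close()
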
diff --git a/src/monitor.py b/src/monitor.py
index d366809..82df534 100644
--- a/src/monitor.py
+++ b/src/monitor.py
@@ -6,6 +6,8 @@ from datetime import datetime
 import logging
 import sqlite3
 
+from globals import load_env
+
 # Set up basic configuration for logging
 logging.basicConfig(
     level=logging.DEBUG,
@@ -16,8 +18,17 @@ logging.basicConfig(
 
 print("Starting monitor.")
 
+load_env()
+
+# Configuration
+USER_AGENT = os.getenv(
+    "USER_AGENT",
+    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
+)
+MODEL_CONFIG = os.getenv("MODEL_CONFIG", "./models.json")
+
 # Load model info
-with open("models.json", "r") as file:
+with open(MODEL_CONFIG, "r") as file:
     models = json.load(file)
 
 
@@ -25,9 +36,7 @@ def call_metrics_endpoint(model_name, base_url):
     url = f"{base_url}/metrics"
     logging.debug(f"Calling metrics endpoint: {url}")
     try:
-        headers = {
-            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
-        }
+        headers = {"User-Agent": USER_AGENT}
         response = requests.get(url, headers=headers)
         response.raise_for_status()
         logging.debug(f"Received successful response from {url}")
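
Note: one caveat this patch leaves untouched in monitor.py is that requests.get() is called without a timeout, so a single hung /metrics endpoint can stall the polling loop indefinitely. A hedged sketch of a more defensive fetch (not part of this patch; the 10-second value is an assumption, not from the source):

    import requests

    def fetch_metrics(base_url, user_agent, timeout_s=10):
        url = f"{base_url}/metrics"
        # timeout= covers both connect and read; a stalled server raises
        # requests.exceptions.Timeout instead of blocking forever.
        response = requests.get(
            url, headers={"User-Agent": user_agent}, timeout=timeout_s
        )
        response.raise_for_status()
        return response.text
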