ML Notes
#!/bin/bash
# Bootstrap script: read LLM_MODEL_NAME from .env, pull the Docker model,
# then build and start the compose stack.
set -euo pipefail

# Fail early with a clear message if there is no .env file to read from.
if [[ ! -f .env ]]; then
  echo "Error: .env file not found." >&2
  exit 1
fi

# Get the LLM_MODEL_NAME from .env file. Anchor on '^LLM_MODEL_NAME=' so
# variables that merely contain the name don't match, and use '-f2-' so
# values containing '=' are not truncated at the second '='.
LLM_MODEL_NAME=$(grep -v '^#' .env | grep '^LLM_MODEL_NAME=' | cut -d '=' -f2-)

# Check if LLM_MODEL_NAME was found
if [[ -z "$LLM_MODEL_NAME" ]]; then
  echo "Error: LLM_MODEL_NAME not found in .env file or it's commented out." >&2
  exit 1
fi
echo "Using LLM model: $LLM_MODEL_NAME"

# Pull the Docker model (quoted so unexpected whitespace can't word-split).
echo "Pulling Docker model..."
docker model pull "$LLM_MODEL_NAME"

# Build and run Docker container
echo "Running Docker Compose..."
docker compose up --build
# Compose service for the Python GenAI web app.
services:
  python-genai:
    build:
      context: ./py-genai
      dockerfile: Dockerfile
    ports:
      # Host 8081 -> container 8081 (the app reads PORT from the environment).
      - "8081:8081"
    environment:
      - PORT=8081
    env_file:
      # Supplies LLM_BASE_URL / LLM_MODEL_NAME etc. to the container.
      - .env
    restart: unless-stopped
    extra_hosts:
      # Lets the container reach services running on the host (e.g. a local LLM).
      - "host.docker.internal:host-gateway"
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer is cached when only app code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Make sure templates directory exists
RUN mkdir -p templates
COPY templates/index.html templates/

# Expose the port the app listens on. Compose maps 8081:8081 and sets
# PORT=8081, so expose 8081 (the previous EXPOSE 9090 matched nothing).
EXPOSE 8081

# Run the application
CMD ["python", "app.py"]
import os
import json
import requests
from flask import Flask, render_template, request, jsonify
from werkzeug.utils import secure_filename
from PyPDF2 import PdfReader # For PDF extraction
from docx import Document # For DOCX extraction
from backend.query_engine import QueryEngine
app = Flask(__name__)

# File extensions accepted by the upload endpoint.
# NOTE(review): QueryEngine declares the same set; keep the two in sync.
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'docx'}
@app.route('/')
def index():
    """Serves the chat web interface (templates/index.html)."""
    return render_template('index.html')
@app.route('/api/chat', methods=['POST'])
def chat_api():
    """Processes chat API requests.

    Expects a JSON body {"message": str}. Returns {"response": str} on
    success, {"model": str} for the special "!modelinfo" command, or
    {"error": str} with HTTP 400/500 on failure.
    """
    # get_json(silent=True) returns None instead of raising when the body is
    # missing, malformed, or has the wrong content-type, letting us reply 400.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({'error': 'Invalid or missing JSON body'}), 400
    message = data.get('message', '')
    qe = QueryEngine()
    # Special command for getting model info
    if message == "!modelinfo":
        return jsonify({'model': qe.get_model_name()})
    # Call the LLM API with the message
    try:
        response = qe._call_llm(message)
        return jsonify({'response': response})
    except Exception as e:
        app.logger.error(f"Error calling LLM API: {e}")
        return jsonify({'error': 'Failed to get response from LLM'}), 500
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Handles file upload and text extraction.

    Accepts a multipart form with a 'file' field; extracts its text and
    stashes it in app.config['EXTRACTED_CONTEXT'] for later use.
    (The original had this docstring after the first statement, where it is
    just a discarded string literal, not a docstring.)
    """
    qe = QueryEngine()
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if file and qe.allowed_file(file.filename):
        filename = secure_filename(file.filename)
        # NOTE(review): saves into the process working directory; a dedicated
        # uploads/ directory would be safer — confirm intended location.
        file_path = os.path.join('', filename)
        file.save(file_path)
        # Extract text from the uploaded file; a corrupt file should yield a
        # clean 500, not an unhandled traceback.
        file_extension = filename.rsplit('.', 1)[1].lower()
        try:
            extracted_text = qe.extract_text_from_file(file_path, file_extension)
        except Exception as e:
            app.logger.error(f"Error extracting text: {e}")
            return jsonify({'error': 'Failed to extract text from file'}), 500
        # Store the context from the extracted text for future use.
        # This could be stored in a session, database, or any storage mechanism.
        app.config['EXTRACTED_CONTEXT'] = extracted_text
        return jsonify({'message': 'File uploaded and text extracted successfully.'})
    return jsonify({'error': 'Invalid file type'}), 400
if __name__ == '__main__':
    # PORT comes from the environment (compose sets PORT=8081); default 8080.
    port = int(os.getenv("PORT", 8080))
    qe = QueryEngine()
    print(f"Server starting on http://localhost:{port}")
    print(f"Using LLM endpoint: {qe.get_llm_endpoint()}")
    print(f"Using model: {qe.get_model_name()}")
    # Bind 0.0.0.0 so the app is reachable from outside the container;
    # DEBUG=true in the environment enables Flask debug mode.
    app.run(host='0.0.0.0', port=port, debug=os.getenv("DEBUG", "false").lower() == "true")
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Hello-GenAI in Python</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
line-height: 1.6;
color: #333;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
h1 {
color: #0078D7;
text-align: center;
}
.container {
display: flex;
flex-direction: column;
height: 80vh;
}
#chat-box {
flex-grow: 1;
border: 1px solid #ddd;
border-radius: 8px;
padding: 15px;
margin-bottom: 15px;
overflow-y: auto;
background-color: #f9f9f9;
}
.input-container {
display: flex;
gap: 10px;
}
#message-input {
flex-grow: 1;
padding: 10px;
border: 1px solid #ddd;
border-radius: 4px;
font-size: 16px;
}
button {
padding: 10px 20px;
background-color: #0078D7;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
}
button:hover {
background-color: #0056a3;
}
.message {
margin-bottom: 15px;
padding: 10px;
border-radius: 4px;
}
.user-message {
background-color: #e3f2fd;
border-left: 4px solid #2196F3;
text-align: right;
}
.bot-message {
background-color: #f1f1f1;
border-left: 4px solid #9e9e9e;
}
.loading {
text-align: center;
margin: 10px 0;
font-style: italic;
color: #666;
}
.footer {
margin-top: 20px;
text-align: center;
font-size: 0.8rem;
color: #666;
}
</style>
</head>
<body>
<h1>Hello-GenAI in Python</h1>
<div class="container">
<div id="chat-box">
<div class="message bot-message">
Hello! I'm your GenAI assistant. How can I help you today?
</div>
</div>
<div class="input-container">
<input type="text" id="message-input" placeholder="Type your message here..." autofocus>
<button id="send-button">Send</button>
</div>
<!-- File upload section -->
<div class="input-container">
<input type="file" id="file-input">
<button id="upload-button">Upload File</button>
</div>
</div>
<div class="footer">
© 2025 hello-genai | Powered by <span id="model-name">Loading model info...</span>
</div>
<script>
document.addEventListener('DOMContentLoaded', function() {
const chatBox = document.getElementById('chat-box');
const messageInput = document.getElementById('message-input');
const sendButton = document.getElementById('send-button');
const uploadButton = document.getElementById('upload-button');
const fileInput = document.getElementById('file-input');
const modelNameSpan = document.getElementById('model-name');
// Get model info
fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: "!modelinfo" }),
})
.then(res => res.json())
.then(data => {
modelNameSpan.textContent = data.model || "AI Language Model";
})
.catch(() => {
modelNameSpan.textContent = "AI Language Model";
});
// Appends a chat bubble for `role` ('user' | 'bot') containing `content`.
// SECURITY FIX: the original decoded HTML entities and then assigned the
// result via innerHTML, so any markup in a model/user message was injected
// into the DOM (XSS). Escape first, then apply the markdown formatting,
// which only ever emits the safe <strong>/<br> tags itself.
function addMessageToChat(role, content) {
    const messageDiv = document.createElement('div');
    messageDiv.className = role === 'user' ? 'message user-message' : 'message bot-message';
    const escaped = String(content)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;');
    messageDiv.innerHTML = formatMarkdown(escaped);
    chatBox.appendChild(messageDiv);
    chatBox.scrollTop = chatBox.scrollHeight;
}
// Minimal markdown renderer: **text** becomes <strong>text</strong> and
// newlines become <br>. Nothing else is interpreted.
function formatMarkdown(text) {
    const withBold = text.replace(/\*\*(.*?)\*\*/g, '<strong>$1</strong>');
    return withBold.replace(/\n/g, '<br>');
}
// Decodes HTML entities (e.g. "&amp;" -> "&") by round-tripping the string
// through a detached <textarea>. NOTE(review): the decoded result is later
// assigned to innerHTML by the caller, so any markup that survives decoding
// will be interpreted by the browser — verify this is intended.
function decodeHtml(html) {
    const txt = document.createElement('textarea');
    txt.innerHTML = html;
    return txt.value;
}
// Sends the current input-box text to /api/chat and renders the reply.
// Shows a transient "Thinking..." row while the request is in flight.
function sendMessage() {
    const message = messageInput.value.trim();
    if (!message) return;
    addMessageToChat('user', message);
    messageInput.value = '';
    // Loading indicator; removed again in both the success and error paths.
    const loadingDiv = document.createElement('div');
    loadingDiv.className = 'loading';
    loadingDiv.textContent = 'Thinking...';
    chatBox.appendChild(loadingDiv);
    chatBox.scrollTop = chatBox.scrollHeight;
    fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message }),
    })
    .then(res => res.json())
    .then(data => {
        chatBox.removeChild(loadingDiv);
        // Server returns {response: ...} on success, {error: ...} otherwise.
        addMessageToChat('bot', data.response || 'Sorry, I encountered an error.');
    })
    .catch(error => {
        chatBox.removeChild(loadingDiv);
        addMessageToChat('bot', 'Sorry, I encountered an error. Please try again.');
        console.error('Error:', error);
    });
}
// Uploads the selected file to /api/upload as multipart form data and
// reports the server's status message in the chat box.
function uploadFile() {
    const file = fileInput.files[0];
    if (!file) return alert('Please select a file.');
    const formData = new FormData();
    formData.append('file', file);
    // Loading indicator; removed again in both the success and error paths.
    const loadingDiv = document.createElement('div');
    loadingDiv.className = 'loading';
    loadingDiv.textContent = 'Uploading...';
    chatBox.appendChild(loadingDiv);
    chatBox.scrollTop = chatBox.scrollHeight;
    fetch('/api/upload', {
        method: 'POST',
        body: formData,
    })
    .then(res => res.json())
    .then(data => {
        chatBox.removeChild(loadingDiv);
        addMessageToChat('bot', data.message || 'File uploaded and processed.');
    })
    .catch(error => {
        chatBox.removeChild(loadingDiv);
        addMessageToChat('bot', 'Upload failed. Please try again.');
        console.error('Upload error:', error);
    });
}
sendButton.addEventListener('click', sendMessage);
messageInput.addEventListener('keypress', function(e) {
if (e.key === 'Enter') sendMessage();
});
uploadButton.addEventListener('click', uploadFile);
});
</script>
</body>
</html>
import os
import chromadb
from typing import List, Dict, Any
import hashlib
class ChromaVectorDB:
    """Persistent vector store for document chunks, backed by ChromaDB."""

    def __init__(self, db_path: str = "./data/chroma_db"):
        """Open (or create) a persistent ChromaDB client and its collection."""
        os.makedirs(db_path, exist_ok=True)
        self.client = chromadb.PersistentClient(path=db_path)
        self.collection = self.client.get_or_create_collection("documents")

    def add_document(self, file_path: str, text_chunks: List[str], metadata: Dict[str, Any] = None):
        """Index every chunk of a document; returns the generated chunk IDs."""
        ids = []
        metadatas = []
        for idx in range(len(text_chunks)):
            # Deterministic per-chunk ID: md5 of "<path>_<index>".
            ids.append(hashlib.md5(f"{file_path}_{idx}".encode()).hexdigest())
            chunk_meta = {"source": file_path, "chunk_id": idx}
            if metadata:
                chunk_meta.update(metadata)
            metadatas.append(chunk_meta)
        self.collection.add(
            documents=text_chunks,
            metadatas=metadatas,
            ids=ids
        )
        return ids

    def search(self, query: str, n_results: int = 5):
        """Return the raw ChromaDB query result for the given text query."""
        return self.collection.query(
            query_texts=[query],
            n_results=n_results
        )

    def delete_document(self, file_path: str):
        """Remove every chunk whose 'source' metadata matches file_path."""
        matches = self.collection.get(
            where={"source": file_path}
        )
        if matches and matches['ids']:
            self.collection.delete(ids=matches['ids'])

    def reset_collection(self):
        """Clear all documents; returns True on success, False otherwise."""
        try:
            try:
                all_ids = self.collection.get()["ids"]
                if all_ids:
                    self.collection.delete(ids=all_ids)
                    print(f"Deleted {len(all_ids)} documents from collection")
                else:
                    print("Collection is already empty")
                return True
            except Exception as e:
                print(f"Error getting or deleting documents: {str(e)}")
                # Fallback: drop and recreate the whole collection.
                try:
                    self.client.delete_collection("documents")
                    self.collection = self.client.get_or_create_collection("documents")
                    print("Collection recreated successfully")
                    return True
                except Exception as e2:
                    print(f"Error recreating collection: {str(e2)}")
                    return False
        except Exception as e:
            print(f"Error resetting collection: {str(e)}")
            return False
from typing import List, Dict, Any
import json
import os
import requests
from flask import Flask, render_template, request, jsonify
from werkzeug.utils import secure_filename
from PyPDF2 import PdfReader # For PDF extraction
from docx import Document # For DOCX extraction
class QueryEngine:
    """Client for an OpenAI-compatible chat-completions endpoint, plus
    helpers for validating and extracting text from uploaded files.

    Endpoint and model come from the LLM_BASE_URL / LLM_MODEL_NAME
    environment variables.
    """

    # File extensions the upload flow accepts.
    ALLOWED_EXTENSIONS = {'txt', 'pdf', 'docx'}

    def __init__(self):
        pass  # If needed, you could pass config or dependency here

    def get_llm_endpoint(self):
        """Returns the complete LLM API endpoint URL (LLM_BASE_URL + path)."""
        base_url = os.getenv("LLM_BASE_URL", "")
        return f"{base_url}/chat/completions"

    def get_model_name(self):
        """Returns the model name to use for API requests (LLM_MODEL_NAME)."""
        return os.getenv("LLM_MODEL_NAME", "")

    def allowed_file(self, filename):
        """True if filename has an extension listed in ALLOWED_EXTENSIONS."""
        return '.' in filename and filename.rsplit('.', 1)[1].lower() in self.ALLOWED_EXTENSIONS

    def extract_text_from_pdf(self, file_path):
        """Extracts text from a PDF file.

        PyPDF2's extract_text() can return None for pages without a text
        layer; guard with `or ''` so concatenation never raises TypeError.
        """
        with open(file_path, 'rb') as file:
            reader = PdfReader(file)
            text = ''
            for page in reader.pages:
                text += page.extract_text() or ''
            return text

    def extract_text_from_docx(self, file_path):
        """Extracts text from a DOCX file (paragraph text concatenated)."""
        doc = Document(file_path)
        text = ''
        for para in doc.paragraphs:
            text += para.text
        return text

    def extract_text_from_file(self, file_path, file_extension):
        """Extracts text from different file types; '' for unknown types."""
        if file_extension == 'pdf':
            return self.extract_text_from_pdf(file_path)
        elif file_extension == 'docx':
            return self.extract_text_from_docx(file_path)
        elif file_extension == 'txt':
            # errors='ignore' keeps extraction working for odd encodings.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
                return file.read()
        return ""

    def _call_llm(self, prompt: str, system_prompt: str = "You are a helpful assistant."):
        """Sends system + user messages to the chat-completions endpoint.

        Returns the stripped text of the first choice.
        Raises Exception on non-200 responses or when no choices come back.
        """
        chat_request = {
            "model": self.get_model_name(),
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}
            ]
        }
        headers = {"Content-Type": "application/json"}
        response = requests.post(
            self.get_llm_endpoint(),
            headers=headers,
            json=chat_request,
            timeout=30
        )
        if response.status_code != 200:
            raise Exception(f"API returned status code {response.status_code}: {response.text}")
        chat_response = response.json()
        if chat_response.get('choices') and len(chat_response['choices']) > 0:
            return chat_response['choices'][0]['message']['content'].strip()
        raise Exception("No response choices returned from API")

    def generate_response(self, query: str, context_docs: Dict[str, Any] = None):
        """Answer a question grounded in retrieved context.

        context_docs is a ChromaDB query result (dict with a 'documents'
        key), not a list — the annotation previously said List[...].
        """
        context = ""
        if context_docs:
            for doc in context_docs.get('documents', [[]])[0]:
                context += f"{doc}\n\n"
        prompt = f"""You are an AI assistant that helps with document analysis and answering questions.
Context information:
{context}
User question: {query}
Please provide a helpful, accurate, and concise answer based on the context information provided. If the context doesn't contain relevant information, say so instead of making up an answer."""
        return self._call_llm(prompt)

    def generate_sql_query(self, question: str, table_info: str):
        """Generate a bare SQLite query for `question` against `table_info`."""
        prompt = f"""You are an SQL and data analysis expert. Generate an appropriate SQL query using SQLite syntax for the question provided, without any explanations or code comments.
Table Information:
{table_info}
User Question: {question}
Generate only the SQL query, nothing else."""
        return self._call_llm(prompt, system_prompt="You are an SQL expert.")

    def analyze_sql_results(self, question: str, sql_query: str, results: str):
        """Summarize SQL query results as a direct answer to the question."""
        prompt = f"""You are a data analyst. Analyze the following SQL query results and provide a clear, concise interpretation.
User Question: {question}
SQL Query: {sql_query}
Query Results:
{results}
Provide a clear analysis of these results that directly answers the user's question."""
        return self._call_llm(prompt, system_prompt="You are a data analyst.")
import os
from datetime import datetime
from dotenv import load_dotenv
from typing import List, Dict, Any
# Import local modules
from .db import SimpleDB
from .vector_db import ChromaVectorDB
from .query_engine import QueryEngine
from .document_parser import SimpleDocumentParser
# Load environment variables
load_dotenv()
class DocumentAssistant:
    """Facade wiring together storage (SimpleDB), vector search (ChromaDB),
    document parsing, and the LLM query engine."""

    def __init__(self):
        """Initialize the document assistant"""
        self.db = SimpleDB()
        self.vector_db = ChromaVectorDB(os.getenv("CHROMA_DB_PATH", "./data/chroma_db"))
        self.document_parser = SimpleDocumentParser()
        # BUG FIX: process_query uses self.query_engine, but it was never
        # created here — every query raised AttributeError.
        self.query_engine = QueryEngine()

    def process_query(self, query: str):
        """Retrieve relevant chunks, ask the LLM, log and return the answer."""
        # Log the incoming query first so it is recorded even if the LLM fails.
        self.db.log_query(query)
        relevant_docs = self.vector_db.search(query)
        response = self.query_engine.generate_response(query, relevant_docs)
        # Log again, this time together with the response.
        self.db.log_query(query, response)
        return response

    def upload_document(self, file_path: str):
        """Parse a document into chunks and index it in both stores."""
        filename = os.path.basename(file_path)
        file_type = os.path.splitext(filename)[1].lower()
        text_chunks = self.document_parser.parse_document(file_path)
        doc_id = self.db.add_document(filename, file_path, file_type)
        self.vector_db.add_document(file_path, text_chunks, {"doc_id": doc_id})
        return {
            "status": "success",
            # FIX: include the actual filename (the f-string had lost its
            # placeholder and printed a literal instead).
            "message": f"Document {filename} indexed successfully",
            "chunks": len(text_chunks)
        }

    def get_all_documents(self):
        """Get all document records from the metadata store."""
        return self.db.get_all_documents()

    def reset_database(self):
        """Reset both the vector store and the metadata store.

        Returns the vector-store reset result; SimpleDB clearing failures are
        logged but do not change the return value.
        """
        try:
            if hasattr(self, 'vector_db') and self.vector_db is not None:
                success = self.vector_db.reset_collection()
                # Also clear the SimpleDB (best effort).
                if hasattr(self, 'db') and self.db is not None:
                    try:
                        self.db.clear_all()
                        print("SimpleDB cleared successfully")
                    except Exception as db_error:
                        print(f"Error clearing SimpleDB: {str(db_error)}")
                return success
            else:
                print("Vector database not initialized")
                return False
        except Exception as e:
            print(f"Error resetting database: {str(e)}")
            return False
import os
import fitz # PyMuPDF
import pandas as pd
from typing import List
import docx
class SimpleDocumentParser:
    """Chunk-oriented parser for PDF, plain-text, DOCX and tabular files."""

    def __init__(self):
        """No configuration needed; parsers are chosen per call."""
        pass

    def parse_document(self, file_path: str) -> List[str]:
        """Dispatch to the right parser based on the file extension.

        Unknown extensions fall back to plain-text parsing.
        """
        extension = os.path.splitext(file_path)[1].lower()
        handlers = {
            '.pdf': self.parse_pdf,
            '.txt': self.parse_text,
            '.docx': self.parse_docx,
            '.csv': self.parse_tabular,
            '.xlsx': self.parse_tabular,
            '.xls': self.parse_tabular,
        }
        return handlers.get(extension, self.parse_text)(file_path)

    def parse_pdf(self, file_path: str) -> List[str]:
        """Extract per-paragraph chunks from a PDF via PyMuPDF."""
        chunks = []
        try:
            doc = fitz.open(file_path)
            for page_num in range(len(doc)):
                page_text = doc.load_page(page_num).get_text()
                # Paragraph-level chunking on blank lines.
                for para in page_text.split('\n\n'):
                    stripped = para.strip()
                    if stripped:
                        chunks.append(stripped)
            doc.close()
        except Exception as e:
            print(f"Error parsing PDF {file_path}: {e}")
            chunks = [f"Error parsing PDF: {str(e)}"]
        return chunks

    def parse_text(self, file_path: str) -> List[str]:
        """Extract per-paragraph chunks from a plain-text file."""
        chunks = []
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as fh:
                content = fh.read()
            for para in content.split('\n\n'):
                stripped = para.strip()
                if stripped:
                    chunks.append(stripped)
        except Exception as e:
            print(f"Error parsing text file {file_path}: {e}")
            chunks = [f"Error parsing text file: {str(e)}"]
        return chunks

    def parse_docx(self, file_path: str) -> List[str]:
        """Extract one chunk per non-empty paragraph via python-docx."""
        chunks = []
        try:
            document = docx.Document(file_path)
            for para in document.paragraphs:
                stripped = para.text.strip()
                if stripped:
                    chunks.append(stripped)
        except Exception as e:
            print(f"Error parsing DOCX {file_path}: {e}")
            chunks = [f"Error parsing DOCX: {str(e)}"]
        return chunks

    def parse_tabular(self, file_path: str) -> List[str]:
        """Summarize CSV/Excel files via pandas: a table summary, a column
        description, then one chunk per row (first 50 rows only)."""
        chunks = []
        try:
            ext = os.path.splitext(file_path)[1].lower()
            df = pd.read_csv(file_path) if ext == '.csv' else pd.read_excel(file_path)
            summary = f"Table with {len(df)} rows and {len(df.columns)} columns. "
            summary += f"Columns: {', '.join(df.columns.tolist())}"
            chunks.append(summary)
            # Column descriptions with dtypes and up to 3 sample values each.
            col_desc = "Column details:\n"
            for col, dtype in df.dtypes.to_dict().items():
                sample_values = df[col].dropna().unique()[:3]
                sample_str = ", ".join([str(v) for v in sample_values])
                col_desc += f"- {col} (Type: {dtype}): Sample values: {sample_str}\n"
            chunks.append(col_desc)
            # Row chunks, capped at 50 to keep the index small.
            for _, row in df.head(50).iterrows():
                chunks.append(" | ".join([f"{c}: {v}" for c, v in row.items()]))
        except Exception as e:
            print(f"Error parsing tabular file {file_path}: {e}")
            chunks = [f"Error parsing tabular file: {str(e)}"]
        return chunks
import os
import json
from datetime import datetime
from typing import List, Dict, Any
class SimpleDB:
    """Tiny JSON-file store for document records and query logs.

    Not safe for concurrent writers — each mutation is a full
    read-modify-write of the JSON file.
    """

    def __init__(self, db_path: str = "./data/documents.json"):
        """Create the backing JSON file (and parent directory) if missing.

        FIX: os.makedirs('') raises when db_path has no directory component
        (e.g. SimpleDB('db.json')), so only create the parent when one exists.
        """
        self.db_path = db_path
        parent = os.path.dirname(db_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        if not os.path.exists(db_path):
            self._write_db({"documents": [], "queries": []})

    def _read_db(self):
        """Read and deserialize the database file."""
        with open(self.db_path, 'r') as f:
            return json.load(f)

    def _write_db(self, data):
        """Serialize and write the database file."""
        with open(self.db_path, 'w') as f:
            json.dump(data, f, indent=2)

    def add_document(self, filename: str, file_path: str, file_type: str):
        """Add a document record; returns its generated integer ID."""
        db = self._read_db()
        # Simple sequential ID based on current count.
        doc_id = len(db["documents"]) + 1
        db["documents"].append({
            "id": doc_id,
            "filename": filename,
            "file_path": file_path,
            "file_type": file_type,
            "upload_date": str(datetime.now())
        })
        self._write_db(db)
        return doc_id

    def get_document(self, doc_id: int):
        """Get a document record by ID, or None if not found."""
        for doc in self._read_db()["documents"]:
            if doc["id"] == doc_id:
                return doc
        return None

    def get_all_documents(self):
        """Get all document records."""
        return self._read_db()["documents"]

    def log_query(self, query_text: str, response: str = None):
        """Log a user query (and optional response); returns the query ID."""
        db = self._read_db()
        query_id = len(db["queries"]) + 1
        db["queries"].append({
            "id": query_id,
            "query_text": query_text,
            "response": response,
            "timestamp": str(datetime.now())
        })
        self._write_db(db)
        return query_id

    def clear_all(self):
        """Reset the store to empty.

        ADDED: DocumentAssistant.reset_database calls db.clear_all(), which
        did not exist — every reset hit the except branch.
        """
        self._write_db({"documents": [], "queries": []})
Comments
Post a Comment