ML Notes

#!/bin/bash
#
# Pull the Docker model named by LLM_MODEL_NAME in .env, then build and start
# the application with Docker Compose.
#
# Requires: a .env file containing LLM_MODEL_NAME=<model>, docker with the
# "model" plugin, and docker compose.

set -euo pipefail

# Get the LLM_MODEL_NAME from .env, skipping commented-out lines.
# -f2- keeps the whole value even if it contains '=' characters.
LLM_MODEL_NAME=$(grep -v '^#' .env | grep 'LLM_MODEL_NAME' | cut -d '=' -f2-)

# Fail fast if the variable is missing or empty.
if [ -z "$LLM_MODEL_NAME" ]; then
    echo "Error: LLM_MODEL_NAME not found in .env file or it's commented out." >&2
    exit 1
fi

echo "Using LLM model: $LLM_MODEL_NAME"

# Pull the Docker model (quoted so names with special characters survive).
echo "Pulling Docker model..."
docker model pull "$LLM_MODEL_NAME"

# Build and run the Docker container(s).
echo "Running Docker Compose..."
docker compose up --build




# Compose definition for the GenAI chat service.
services:


  python-genai:

    # Build the image from the py-genai directory's Dockerfile.
    build:

      context: ./py-genai

      dockerfile: Dockerfile

    # Publish the Flask app on host port 8081.
    ports:

      - "8081:8081"

    # PORT is read by app.py (which defaults to 8080 when unset).
    environment:

      - PORT=8081

    # Loads LLM_BASE_URL / LLM_MODEL_NAME etc. from .env.
    env_file:

      - .env

    restart: unless-stopped

    # Lets the container reach services running on the Docker host.
    extra_hosts:

      - "host.docker.internal:host-gateway"






# Minimal Python runtime image for the Flask GenAI app (app.py).
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer is cached when only code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Make sure templates directory exists
RUN mkdir -p templates

# Copy the chat UI template explicitly so a missing template fails the build
COPY templates/index.html templates/

# Expose port 8080 — the app's default (app.py reads PORT, defaulting to
# 8080; docker-compose overrides this with PORT=8081).
# FIX: previously EXPOSE 9090, which matched neither the comment nor any
# port the application actually listens on.
EXPOSE 8080

# Run the application
CMD ["python", "app.py"]






import os

import json

import requests

from flask import Flask, render_template, request, jsonify

from werkzeug.utils import secure_filename

from PyPDF2 import PdfReader  # For PDF extraction

from docx import Document  # For DOCX extraction

from backend.query_engine import QueryEngine

# Flask application serving the chat UI and the JSON API endpoints.
app = Flask(__name__)


# Allowed file extensions
# NOTE(review): QueryEngine.ALLOWED_EXTENSIONS duplicates this set — keep in sync.
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'docx'}


@app.route('/')
def index():
    """Serve the single-page chat web interface."""
    # render_template resolves templates/index.html relative to the app root.
    page = render_template('index.html')
    return page


@app.route('/api/chat', methods=['POST'])
def chat_api():
    """Processes chat API requests.

    Expects JSON {"message": str}. Returns {"response": str} on success,
    {"model": str} for the special "!modelinfo" command, or
    {"error": str} with HTTP 500 when the LLM call fails.
    """
    # get_json(silent=True) returns None instead of raising on a missing or
    # malformed JSON body (request.json would error out); fall back to an
    # empty dict so .get() is always safe.
    data = request.get_json(silent=True) or {}
    message = data.get('message', '')
    qe = QueryEngine()

    # Special command used by the frontend footer to display the model name.
    if message == "!modelinfo":
        return jsonify({'model': qe.get_model_name()})

    # Forward the user message to the LLM backend.
    try:
        response = qe._call_llm(message)
        return jsonify({'response': response})
    except Exception as e:
        app.logger.error(f"Error calling LLM API: {e}")
        return jsonify({'error': 'Failed to get response from LLM'}), 500


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Handles file upload and text extraction.

    Accepts a multipart form with a 'file' field, saves the file, extracts
    its text, and stores it in app.config['EXTRACTED_CONTEXT'].
    Returns 400 for missing/invalid files, 500 if extraction fails.
    """
    qe = QueryEngine()

    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400

    file = request.files['file']

    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if file and qe.allowed_file(file.filename):
        # secure_filename strips path separators so the upload cannot escape
        # the working directory.
        filename = secure_filename(file.filename)
        # NOTE(review): saves into the process's current working directory;
        # a dedicated upload folder would be safer.
        file_path = os.path.join('', filename)
        file.save(file_path)

        # Extract text from the uploaded file. allowed_file() guarantees the
        # name contains a '.', so rsplit is safe here.
        file_extension = filename.rsplit('.', 1)[1].lower()
        try:
            extracted_text = qe.extract_text_from_file(file_path, file_extension)
        except Exception as e:
            # Previously an extraction failure crashed the request with an
            # HTML 500 page; return a JSON error instead.
            app.logger.error(f"Error extracting text from {filename}: {e}")
            return jsonify({'error': 'Failed to extract text from file'}), 500

        # Store the context from the extracted text for future use.
        # NOTE(review): app.config is process-global, so concurrent uploads
        # overwrite each other; a session or database would be safer.
        app.config['EXTRACTED_CONTEXT'] = extracted_text

        return jsonify({'message': 'File uploaded and text extracted successfully.'})

    return jsonify({'error': 'Invalid file type'}), 400



if __name__ == '__main__':
    # PORT comes from the environment (docker-compose sets 8081); default 8080.
    port = int(os.getenv("PORT", 8080))

    qe = QueryEngine()

    

    print(f"Server starting on http://localhost:{port}")

    print(f"Using LLM endpoint: {qe.get_llm_endpoint()}")

    print(f"Using model: {qe.get_model_name()}")

    

    # Bind to 0.0.0.0 so the server is reachable from outside the container;
    # debug mode is enabled only when DEBUG=true in the environment.
    app.run(host='0.0.0.0', port=port, debug=os.getenv("DEBUG", "false").lower() == "true")




<!DOCTYPE html>

<html lang="en">

<head>

    <meta charset="UTF-8">

    <meta name="viewport" content="width=device-width, initial-scale=1.0">

    <title>Hello-GenAI in Python</title>

    <style>
        /* Centered single-column layout with a system font stack. */

        body {

            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;

            line-height: 1.6;

            color: #333;

            max-width: 800px;

            margin: 0 auto;

            padding: 20px;

        }

        h1 {

            color: #0078D7;

            text-align: center;

        }

        /* Chat column fills most of the viewport height. */
        .container {

            display: flex;

            flex-direction: column;

            height: 80vh;

        }

        /* Scrollable transcript area. */
        #chat-box {

            flex-grow: 1;

            border: 1px solid #ddd;

            border-radius: 8px;

            padding: 15px;

            margin-bottom: 15px;

            overflow-y: auto;

            background-color: #f9f9f9;

        }

        .input-container {

            display: flex;

            gap: 10px;

        }

        #message-input {

            flex-grow: 1;

            padding: 10px;

            border: 1px solid #ddd;

            border-radius: 4px;

            font-size: 16px;

        }

        button {

            padding: 10px 20px;

            background-color: #0078D7;

            color: white;

            border: none;

            border-radius: 4px;

            cursor: pointer;

            font-size: 16px;

        }

        button:hover {

            background-color: #0056a3;

        }

        /* Chat bubbles: blue right-aligned for the user, grey for the bot. */
        .message {

            margin-bottom: 15px;

            padding: 10px;

            border-radius: 4px;

        }

        .user-message {

            background-color: #e3f2fd;

            border-left: 4px solid #2196F3;

            text-align: right;

        }

        .bot-message {

            background-color: #f1f1f1;

            border-left: 4px solid #9e9e9e;

        }

        /* Transient "Thinking..." / "Uploading..." indicator. */
        .loading {

            text-align: center;

            margin: 10px 0;

            font-style: italic;

            color: #666;

        }

        .footer {

            margin-top: 20px;

            text-align: center;

            font-size: 0.8rem;

            color: #666;

        }

    </style>

</head>

<body>

    <h1>Hello-GenAI in Python</h1>

    <div class="container">

        <!-- Transcript; bubbles are appended here by the script below. -->
        <div id="chat-box">

            <div class="message bot-message">

                Hello! I'm your GenAI assistant. How can I help you today?

            </div>

        </div>


        <!-- Message entry row -->
        <div class="input-container">

            <input type="text" id="message-input" placeholder="Type your message here..." autofocus>

            <button id="send-button">Send</button>

        </div>


        <!-- File upload section -->

        <div class="input-container">

            <input type="file" id="file-input">

            <button id="upload-button">Upload File</button>

        </div>

    </div>


    <!-- Footer; the model name is filled in at runtime via /api/chat. -->
    <div class="footer">

        © 2025 hello-genai | Powered by <span id="model-name">Loading model info...</span>

    </div>


    <script>

        document.addEventListener('DOMContentLoaded', function() {

            const chatBox = document.getElementById('chat-box');

            const messageInput = document.getElementById('message-input');

            const sendButton = document.getElementById('send-button');

            const uploadButton = document.getElementById('upload-button');

            const fileInput = document.getElementById('file-input');

            const modelNameSpan = document.getElementById('model-name');


            // Get model info

            fetch('/api/chat', {

                method: 'POST',

                headers: { 'Content-Type': 'application/json' },

                body: JSON.stringify({ message: "!modelinfo" }),

            })

            .then(res => res.json())

            .then(data => {

                modelNameSpan.textContent = data.model || "AI Language Model";

            })

            .catch(() => {

                modelNameSpan.textContent = "AI Language Model";

            });


            function addMessageToChat(role, content) {

                const messageDiv = document.createElement('div');

                messageDiv.className = role === 'user' ? 'message user-message' : 'message bot-message';

                messageDiv.innerHTML = formatMarkdown(decodeHtml(content));

                chatBox.appendChild(messageDiv);

                chatBox.scrollTop = chatBox.scrollHeight;

            }


            function formatMarkdown(text) {

                return text.replace(/\*\*(.*?)\*\*/g, '<strong>$1</strong>').replace(/\n/g, '<br>');

            }


            function decodeHtml(html) {

                const txt = document.createElement('textarea');

                txt.innerHTML = html;

                return txt.value;

            }


            function sendMessage() {

                const message = messageInput.value.trim();

                if (!message) return;


                addMessageToChat('user', message);

                messageInput.value = '';


                const loadingDiv = document.createElement('div');

                loadingDiv.className = 'loading';

                loadingDiv.textContent = 'Thinking...';

                chatBox.appendChild(loadingDiv);

                chatBox.scrollTop = chatBox.scrollHeight;


                fetch('/api/chat', {

                    method: 'POST',

                    headers: { 'Content-Type': 'application/json' },

                    body: JSON.stringify({ message }),

                })

                .then(res => res.json())

                .then(data => {

                    chatBox.removeChild(loadingDiv);

                    addMessageToChat('bot', data.response || 'Sorry, I encountered an error.');

                })

                .catch(error => {

                    chatBox.removeChild(loadingDiv);

                    addMessageToChat('bot', 'Sorry, I encountered an error. Please try again.');

                    console.error('Error:', error);

                });

            }


            function uploadFile() {

                const file = fileInput.files[0];

                if (!file) return alert('Please select a file.');


                const formData = new FormData();

                formData.append('file', file);


                const loadingDiv = document.createElement('div');

                loadingDiv.className = 'loading';

                loadingDiv.textContent = 'Uploading...';

                chatBox.appendChild(loadingDiv);

                chatBox.scrollTop = chatBox.scrollHeight;


                fetch('/api/upload', {

                    method: 'POST',

                    body: formData,

                })

                .then(res => res.json())

                .then(data => {

                    chatBox.removeChild(loadingDiv);

                    addMessageToChat('bot', data.message || 'File uploaded and processed.');

                })

                .catch(error => {

                    chatBox.removeChild(loadingDiv);

                    addMessageToChat('bot', 'Upload failed. Please try again.');

                    console.error('Upload error:', error);

                });

            }


            sendButton.addEventListener('click', sendMessage);

            messageInput.addEventListener('keypress', function(e) {

                if (e.key === 'Enter') sendMessage();

            });


            uploadButton.addEventListener('click', uploadFile);

        });

    </script>

</body>


</html>



import os

import chromadb

from typing import List, Dict, Any

import hashlib


class ChromaVectorDB:
    """Thin wrapper around a persistent ChromaDB collection of document chunks."""

    def __init__(self, db_path: str = "./data/chroma_db"):
        """Initialize ChromaDB for vector storage.

        Args:
            db_path: Directory for the ChromaDB files; created if missing.
        """
        os.makedirs(db_path, exist_ok=True)
        self.client = chromadb.PersistentClient(path=db_path)
        self.collection = self.client.get_or_create_collection("documents")

    def add_document(self, file_path: str, text_chunks: List[str], metadata: Dict[str, Any] = None):
        """Add document chunks to the vector database.

        Args:
            file_path: Source path; used for chunk IDs and per-chunk metadata.
            text_chunks: Text chunks to embed and store.
            metadata: Optional extra metadata merged into every chunk's metadata.

        Returns:
            The list of chunk IDs added (empty when text_chunks is empty).
        """
        # Guard: Chroma rejects add() calls with empty lists, and an empty
        # document has nothing to index anyway.
        if not text_chunks:
            return []

        # Deterministic per-chunk IDs derived from path + index (md5 used
        # purely as an identifier, not for security).
        ids = [hashlib.md5(f"{file_path}_{i}".encode()).hexdigest() for i in range(len(text_chunks))]

        # Per-chunk metadata: source path + chunk index, plus caller extras.
        metadatas = []
        for i in range(len(text_chunks)):
            chunk_metadata = {"source": file_path, "chunk_id": i}
            if metadata:
                chunk_metadata.update(metadata)
            metadatas.append(chunk_metadata)

        self.collection.add(
            documents=text_chunks,
            metadatas=metadatas,
            ids=ids
        )

        return ids

    def search(self, query: str, n_results: int = 5):
        """Return the n_results chunks most relevant to the query."""
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )

        return results

    def delete_document(self, file_path: str):
        """Delete all chunks originating from a specific document."""
        # Fetch the IDs of every chunk whose metadata points at this source.
        results = self.collection.get(
            where={"source": file_path}
        )

        if results and results['ids']:
            self.collection.delete(ids=results['ids'])

    def reset_collection(self):
        """Clear all documents from the collection.

        Returns:
            True on success, False when both deletion and recreation fail.
        """
        try:
            try:
                all_ids = self.collection.get()["ids"]
                if all_ids:
                    self.collection.delete(ids=all_ids)
                    print(f"Deleted {len(all_ids)} documents from collection")
                else:
                    print("Collection is already empty")
                return True
            except Exception as e:
                print(f"Error getting or deleting documents: {str(e)}")

                # Fallback: drop and recreate the whole collection.
                try:
                    self.client.delete_collection("documents")
                    self.collection = self.client.get_or_create_collection("documents")
                    print("Collection recreated successfully")
                    return True
                except Exception as e2:
                    print(f"Error recreating collection: {str(e2)}")
                    return False
        except Exception as e:
            print(f"Error resetting collection: {str(e)}")
            return False



from typing import List, Dict, Any

import json

import os

import requests

from flask import Flask, render_template, request, jsonify

from werkzeug.utils import secure_filename

from PyPDF2 import PdfReader  # For PDF extraction

from docx import Document  # For DOCX extraction


class QueryEngine:
    """LLM access layer: endpoint/model config, file text extraction, prompting."""

    # File extensions accepted for upload/extraction.
    ALLOWED_EXTENSIONS = {'txt', 'pdf', 'docx'}

    def __init__(self):
        pass  # Stateless: configuration is read from the environment per call.

    def get_llm_endpoint(self):
        """Return the complete chat-completions endpoint URL.

        Strips a trailing '/' from LLM_BASE_URL so the result never
        contains a double slash.
        """
        base_url = os.getenv("LLM_BASE_URL", "")
        return f"{base_url.rstrip('/')}/chat/completions"

    def get_model_name(self):
        """Return the model name to use for API requests."""
        return os.getenv("LLM_MODEL_NAME", "")

    def allowed_file(self, filename):
        """Return True when the filename has an allowed extension."""
        return '.' in filename and filename.rsplit('.', 1)[1].lower() in self.ALLOWED_EXTENSIONS

    def extract_text_from_pdf(self, file_path):
        """Extract text from a PDF file.

        Pages with no extractable text (e.g. scanned images) contribute
        nothing instead of crashing.
        """
        with open(file_path, 'rb') as file:
            reader = PdfReader(file)
            text = ''
            for page in reader.pages:
                # BUG FIX: extract_text() can return None; the bare '+='
                # previously raised TypeError on such pages.
                text += page.extract_text() or ''
            return text

    def extract_text_from_docx(self, file_path):
        """Extract text from a DOCX file, one line per paragraph.

        Paragraphs are joined with newlines — previously they were
        concatenated directly, fusing adjacent words together.
        """
        doc = Document(file_path)
        return '\n'.join(para.text for para in doc.paragraphs)

    def extract_text_from_file(self, file_path, file_extension):
        """Extract text based on extension; returns '' for unsupported types."""
        if file_extension == 'pdf':
            return self.extract_text_from_pdf(file_path)
        elif file_extension == 'docx':
            return self.extract_text_from_docx(file_path)
        elif file_extension == 'txt':
            with open(file_path, 'r') as file:
                return file.read()
        return ""

    def _call_llm(self, prompt: str, system_prompt: str = "You are a helpful assistant."):
        """POST a chat-completions request and return the first choice's content.

        Raises:
            Exception: on a non-200 status or when no choices are returned.
        """
        chat_request = {
            "model": self.get_model_name(),
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}
            ]
        }

        headers = {"Content-Type": "application/json"}
        response = requests.post(
            self.get_llm_endpoint(),
            headers=headers,
            json=chat_request,
            timeout=30
        )

        if response.status_code != 200:
            raise Exception(f"API returned status code {response.status_code}: {response.text}")

        chat_response = response.json()
        if chat_response.get('choices') and len(chat_response['choices']) > 0:
            return chat_response['choices'][0]['message']['content'].strip()

        raise Exception("No response choices returned from API")

    def generate_response(self, query: str, context_docs: Dict[str, Any] = None):
        """Answer a question grounded in retrieved document context.

        Args:
            query: The user's question.
            context_docs: A Chroma query() result dict ({'documents': [[...]], ...})
                or None. (The previous List[...] annotation was wrong — the
                body indexes it as a dict.)
        """
        context = ""
        if context_docs:
            for doc in context_docs.get('documents', [[]])[0]:
                context += f"{doc}\n\n"

        prompt = f"""You are an AI assistant that helps with document analysis and answering questions.

Context information:
{context}
User question: {query}
Please provide a helpful, accurate, and concise answer based on the context information provided. If the context doesn't contain relevant information, say so instead of making up an answer."""

        return self._call_llm(prompt)

    def generate_sql_query(self, question: str, table_info: str):
        """Generate a SQLite SQL query answering the question over table_info."""
        prompt = f"""You are an SQL and data analysis expert. Generate an appropriate SQL query using SQLite syntax for the question provided, without any explanations or code comments.

Table Information:
{table_info}
User Question: {question}
Generate only the SQL query, nothing else."""

        return self._call_llm(prompt, system_prompt="You are an SQL expert.")

    def analyze_sql_results(self, question: str, sql_query: str, results: str):
        """Interpret SQL query results in plain language for the user."""
        prompt = f"""You are a data analyst. Analyze the following SQL query results and provide a clear, concise interpretation.
User Question: {question}
SQL Query: {sql_query}
Query Results:
{results}
Provide a clear analysis of these results that directly answers the user's question."""

        return self._call_llm(prompt, system_prompt="You are a data analyst.")




import os

from datetime import datetime

from dotenv import load_dotenv

from typing import List, Dict, Any


# Import local modules

from .db import SimpleDB

from .vector_db import ChromaVectorDB

from .query_engine import QueryEngine

from .document_parser import SimpleDocumentParser


# Load environment variables

load_dotenv()


class DocumentAssistant:
    """Orchestrates document storage, vector indexing, and LLM querying."""

    def __init__(self):
        """Initialize the document assistant and its storage backends."""
        self.db = SimpleDB()
        self.vector_db = ChromaVectorDB(os.getenv("CHROMA_DB_PATH", "./data/chroma_db"))
        self.document_parser = SimpleDocumentParser()
        # BUG FIX: process_query() uses self.query_engine, but it was never
        # created here, so every query raised AttributeError.
        self.query_engine = QueryEngine()

    def process_query(self, query: str):
        """Process a query and return the generated response."""
        # Get relevant document chunks from the vector store.
        relevant_docs = self.vector_db.search(query)

        # Generate a response grounded in the retrieved context.
        response = self.query_engine.generate_response(query, relevant_docs)

        # Log the query once, together with its response. (Previously the
        # query was logged twice: once bare before the call and once with
        # the response, creating duplicate entries.)
        self.db.log_query(query, response)

        return response

    def upload_document(self, file_path: str):
        """Parse, store, and vector-index a document; returns a status dict."""
        # Get file metadata
        filename = os.path.basename(file_path)
        file_type = os.path.splitext(filename)[1].lower()

        # Parse the document into text chunks.
        text_chunks = self.document_parser.parse_document(file_path)

        # Record the document in the metadata store.
        doc_id = self.db.add_document(filename, file_path, file_type)

        # Index the chunks in the vector store, tagged with the doc ID.
        self.vector_db.add_document(file_path, text_chunks, {"doc_id": doc_id})

        return {
            "status": "success",
            # BUG FIX: the message previously contained the literal text
            # "(unknown)" instead of interpolating the uploaded file's name.
            "message": f"Document {filename} indexed successfully",
            "chunks": len(text_chunks)
        }

    def get_all_documents(self):
        """Return all document records from the metadata store."""
        return self.db.get_all_documents()

    def reset_database(self):
        """Reset the vector database and the metadata store.

        Returns:
            True when the vector collection was reset, False otherwise.
        """
        try:
            if hasattr(self, 'vector_db') and self.vector_db is not None:
                # Try to reset the vector collection.
                success = self.vector_db.reset_collection()

                # Also clear the SimpleDB metadata store (best-effort; any
                # failure here is logged but does not affect the result).
                if hasattr(self, 'db') and self.db is not None:
                    try:
                        self.db.clear_all()
                        print("SimpleDB cleared successfully")
                    except Exception as db_error:
                        print(f"Error clearing SimpleDB: {str(db_error)}")

                return success
            else:
                print("Vector database not initialized")
                return False
        except Exception as e:
            print(f"Error resetting database: {str(e)}")
            return False




import os

import fitz  # PyMuPDF

import pandas as pd

from typing import List

import docx


class SimpleDocumentParser:
    """Best-effort parser turning PDF/TXT/DOCX/tabular files into text chunks."""

    def __init__(self):
        """Initialize simple document parser for various file types."""
        pass

    def parse_document(self, file_path: str) -> List[str]:
        """Parse a document and return text chunks.

        Dispatches on the file extension; unknown extensions fall back to
        plain-text parsing.
        """
        file_ext = os.path.splitext(file_path)[1].lower()

        if file_ext == '.pdf':
            return self.parse_pdf(file_path)
        elif file_ext == '.txt':
            return self.parse_text(file_path)
        elif file_ext == '.docx':
            return self.parse_docx(file_path)
        elif file_ext in ['.csv', '.xlsx', '.xls']:
            return self.parse_tabular(file_path)
        else:
            # Unknown type: try to read it as plain text.
            return self.parse_text(file_path)

    def parse_pdf(self, file_path: str) -> List[str]:
        """Parse PDF using PyMuPDF; returns per-paragraph chunks."""
        chunks = []
        try:
            doc = fitz.open(file_path)
            try:
                # Extract text from each page.
                for page_num in range(len(doc)):
                    page = doc.load_page(page_num)
                    text = page.get_text()

                    # Simple chunking by paragraphs.
                    for para in text.split('\n\n'):
                        if len(para.strip()) > 0:
                            chunks.append(para.strip())
            finally:
                # FIX: close in a finally block — previously the document
                # handle leaked when a page raised mid-loop.
                doc.close()
        except Exception as e:
            print(f"Error parsing PDF {file_path}: {e}")
            chunks = [f"Error parsing PDF: {str(e)}"]

        return chunks

    def parse_text(self, file_path: str) -> List[str]:
        """Parse a plain text file into per-paragraph chunks."""
        chunks = []
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text = f.read()

            # Splitting by paragraphs (blank-line separated).
            for para in text.split('\n\n'):
                if len(para.strip()) > 0:
                    chunks.append(para.strip())
        except Exception as e:
            print(f"Error parsing text file {file_path}: {e}")
            chunks = [f"Error parsing text file: {str(e)}"]

        return chunks

    def parse_docx(self, file_path: str) -> List[str]:
        """Parse DOCX using python-docx; one chunk per non-empty paragraph."""
        chunks = []
        try:
            doc = docx.Document(file_path)

            for para in doc.paragraphs:
                if len(para.text.strip()) > 0:
                    chunks.append(para.text.strip())
        except Exception as e:
            print(f"Error parsing DOCX {file_path}: {e}")
            chunks = [f"Error parsing DOCX: {str(e)}"]

        return chunks

    def parse_tabular(self, file_path: str) -> List[str]:
        """Parse CSV or Excel files using pandas into summary + row chunks."""
        chunks = []
        try:
            file_ext = os.path.splitext(file_path)[1].lower()

            if file_ext == '.csv':
                df = pd.read_csv(file_path)
            else:  # Excel files
                df = pd.read_excel(file_path)

            # Table summary chunk.
            summary = f"Table with {len(df)} rows and {len(df.columns)} columns. "
            summary += f"Columns: {', '.join(df.columns.tolist())}"
            chunks.append(summary)

            # Column descriptions with data types and sample values.
            col_types = df.dtypes.to_dict()
            col_desc = "Column details:\n"
            for col, dtype in col_types.items():
                # First 3 unique non-null values per column.
                sample_values = df[col].dropna().unique()[:3]
                sample_str = ", ".join([str(v) for v in sample_values])
                col_desc += f"- {col} (Type: {dtype}): Sample values: {sample_str}\n"
            chunks.append(col_desc)

            # One chunk per row (first 50 rows only, to bound index size).
            for index, row in df.head(50).iterrows():
                row_text = " | ".join([f"{col}: {val}" for col, val in row.items()])
                chunks.append(row_text)

        except Exception as e:
            print(f"Error parsing tabular file {file_path}: {e}")
            chunks = [f"Error parsing tabular file: {str(e)}"]

        return chunks




import os

import json

from datetime import datetime

from typing import List, Dict, Any


class SimpleDB:
    """Tiny JSON-file store for document records and query logs.

    No file locking: intended for single-process use only.
    """

    def __init__(self, db_path: str = "./data/documents.json"):
        """Initialize a simple JSON-based document store.

        Args:
            db_path: Path to the JSON file; parent dirs are created as needed.
        """
        self.db_path = db_path
        # FIX: guard against a bare filename — os.makedirs('') raises.
        parent = os.path.dirname(db_path)
        if parent:
            os.makedirs(parent, exist_ok=True)

        # Create the DB file if it doesn't exist.
        if not os.path.exists(db_path):
            with open(db_path, 'w') as f:
                json.dump({"documents": [], "queries": []}, f)

    def _read_db(self):
        """Read and return the whole database dict."""
        with open(self.db_path, 'r') as f:
            return json.load(f)

    def _write_db(self, data):
        """Write the whole database dict back to disk."""
        with open(self.db_path, 'w') as f:
            json.dump(data, f, indent=2)

    def add_document(self, filename: str, file_path: str, file_type: str):
        """Add a document record; returns its integer ID."""
        db = self._read_db()

        # Simple sequential ID (length-based; records are never removed
        # individually, only cleared wholesale).
        doc_id = len(db["documents"]) + 1

        db["documents"].append({
            "id": doc_id,
            "filename": filename,
            "file_path": file_path,
            "file_type": file_type,
            "upload_date": str(datetime.now())
        })

        self._write_db(db)
        return doc_id

    def get_document(self, doc_id: int):
        """Get a document record by ID, or None if absent."""
        db = self._read_db()
        for doc in db["documents"]:
            if doc["id"] == doc_id:
                return doc
        return None

    def get_all_documents(self):
        """Return all document records."""
        db = self._read_db()
        return db["documents"]

    def log_query(self, query_text: str, response: str = None):
        """Log a user query and (optionally) its response; returns query ID."""
        db = self._read_db()

        query_id = len(db["queries"]) + 1

        db["queries"].append({
            "id": query_id,
            "query_text": query_text,
            "response": response,
            "timestamp": str(datetime.now())
        })

        self._write_db(db)
        return query_id

    def clear_all(self):
        """Reset the store to empty.

        FIX: added because DocumentAssistant.reset_database() calls
        db.clear_all(), which did not previously exist and raised
        AttributeError.
        """
        self._write_db({"documents": [], "queries": []})


Comments

Popular posts from this blog

DDOS (Distributed Denial of Service) Attack in Java

Java 23 Is Here, but Stream API’s filter Still Lags Behind: A Fresh Take

Maven + Struts2 + Spring + Hibernate + Struts2-Convention-Plugin