From 49239e7e25dbf0b409c8dc964bf536e28ba7af95 Mon Sep 17 00:00:00 2001 From: quorploop <> Date: Wed, 24 Dec 2025 17:58:23 +0100 Subject: [PATCH] Implement Nodes to compute text embeddings --- scrape/main.py | 4 +- transform/Dockerfile | 7 +- transform/author_node.py | 16 +- transform/embeddings_node.py | 445 +++++++++++++++++++++++++++++++ transform/ensure_minilm_model.sh | 16 ++ transform/entrypoint.sh | 2 + transform/main.py | 6 +- transform/pipeline.py | 30 ++- transform/requirements.txt | 4 +- 9 files changed, 505 insertions(+), 25 deletions(-) create mode 100644 transform/embeddings_node.py create mode 100644 transform/ensure_minilm_model.sh diff --git a/scrape/main.py b/scrape/main.py index 15f0e72..10b66dd 100755 --- a/scrape/main.py +++ b/scrape/main.py @@ -227,7 +227,9 @@ def main(): num_threads=num_threads, ) - postdf.to_sql("posts", con, if_exists="append") + # Drop category and tags columns as they're stored in separate tables + postdf = postdf.drop(columns=['category', 'tags']) + postdf.to_sql("posts", con, if_exists="append", index=False) # Tags tag_dim, tag_map = build_dimension_and_mapping(postdf, 'tags', 'tag') diff --git a/transform/Dockerfile b/transform/Dockerfile index 682af4f..6f148bd 100644 --- a/transform/Dockerfile +++ b/transform/Dockerfile @@ -17,6 +17,9 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ ENV GLINER_MODEL_ID=urchade/gliner_multi-v2.1 ENV GLINER_MODEL_PATH=/models/gliner_multi-v2.1 +ENV MINILM_MODEL_ID=sentence-transformers/all-MiniLM-L6-v2 +ENV MINILM_MODEL_PATH=/models/all-MiniLM-L6-v2 + WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt @@ -28,8 +31,10 @@ RUN apt install -y cron locales # Ensure GLiNER helper scripts are available COPY ensure_gliner_model.sh /usr/local/bin/ensure_gliner_model.sh +# Ensure MiniLM helper scripts are available +COPY ensure_minilm_model.sh /usr/local/bin/ensure_minilm_model.sh COPY entrypoint.sh /usr/local/bin/entrypoint.sh -RUN chmod +x /usr/local/bin/ensure_gliner_model.sh /usr/local/bin/entrypoint.sh +RUN chmod +x /usr/local/bin/ensure_gliner_model.sh /usr/local/bin/ensure_minilm_model.sh /usr/local/bin/entrypoint.sh COPY *.py . diff --git a/transform/author_node.py b/transform/author_node.py index 719a191..845e87a 100644 --- a/transform/author_node.py +++ b/transform/author_node.py @@ -9,6 +9,8 @@ from concurrent.futures import ThreadPoolExecutor from pipeline import TransformContext from transform_node import TransformNode +logger = logging.getLogger("knack-transform") + try: from gliner import GLiNER import torch @@ -17,9 +19,6 @@ except ImportError: GLINER_AVAILABLE = False logging.warning("GLiNER not available. Install with: pip install gliner") -logger = logging.getLogger("knack-transform") - - class NerAuthorNode(TransformNode): """Transform node that extracts and classifies authors using NER. 
@@ -257,10 +256,9 @@ class NerAuthorNode(TransformNode):
         self._store_authors(con, results)
 
         # Return context with results
-        results_df = pd.DataFrame(results) if results else pd.DataFrame()
 
         logger.info("AuthorNode transformation complete")
-        return TransformContext(results_df)
+        return TransformContext(posts_df)
 
 
 class FuzzyAuthorNode(TransformNode):
@@ -309,7 +307,7 @@ class FuzzyAuthorNode(TransformNode):
 
         logger.info(f"Found {len(authors_df)} known authors for fuzzy matching")
         logger.info(f"Found {len(existing_post_ids)} posts with existing author mappings")
-        
+
         # Filter to posts without author mappings and with non-null author field
         if 'author' not in df.columns or 'id' not in df.columns:
             logger.warning("Missing 'author' or 'id' column in input dataframe")
@@ -333,12 +331,14 @@ class FuzzyAuthorNode(TransformNode):
         for _, author_row in authors_df.iterrows():
             author_id = author_row['id']
             author_name = str(author_row['name'])
+            # For author names of two characters or fewer, require an exact match (no edit distance)
+            l_dist = self.max_l_dist if len(author_name) > 2 else 0
 
             # Use fuzzysearch to find matches with allowed errors
             matches = fuzzysearch.find_near_matches(
                 author_name,
                 post_author,
-                max_l_dist=self.max_l_dist
+                max_l_dist=l_dist,
             )
 
             if matches:
@@ -417,4 +417,4 @@ class FuzzyAuthorNode(TransformNode):
         logger.info("FuzzyAuthorNode transformation complete")
 
         # Return new context with results
-        return TransformContext(result_df)
+        return TransformContext(input_df)
diff --git a/transform/embeddings_node.py b/transform/embeddings_node.py
new file mode 100644
index 0000000..9821eca
--- /dev/null
+++ b/transform/embeddings_node.py
@@ -0,0 +1,445 @@
+"""Transform nodes for text processing.
+
+- TextEmbeddingNode computes text embeddings
+- UmapNode computes 2D coordinates from those vector embeddings
+- SimilarityNode computes the top n similar posts based on those
+  embeddings using the spectral distance.
+"""
+from pipeline import TransformContext
+from transform_node import TransformNode
+import sqlite3
+import pandas as pd
+import logging
+import os
+import numpy as np
+
+logger = logging.getLogger("knack-transform")
+
+try:
+    from sentence_transformers import SentenceTransformer
+    import torch
+    MINILM_AVAILABLE = True
+except ImportError:
+    MINILM_AVAILABLE = False
+    logging.warning("sentence-transformers not available. Install with: pip install sentence-transformers")
+
+try:
+    import umap
+    UMAP_AVAILABLE = True
+except ImportError:
+    UMAP_AVAILABLE = False
+    logging.warning("UMAP not available. Install with: pip install umap-learn")
+
+
+class TextEmbeddingNode(TransformNode):
+    """Calculates vector embeddings for a dataframe of posts."""
+
+    def __init__(self,
+                 model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
+                 model_path: str = None,
+                 device: str = "cpu"):
+        """Initialize the TextEmbeddingNode.
+
+        Args:
+            model_name: Name of the model used to compute text embeddings
+            model_path: Optional local path to a downloaded embedding model
+            device: Device to use for computations ('cpu', 'cuda', 'mps')
+        """
+        self.model_name = model_name
+        self.model_path = model_path or os.environ.get('MINILM_MODEL_PATH')
+        self.device = device
+        self.model = None
+        logger.info(f"Initialized TextEmbeddingNode with model_name={model_name}, model_path={model_path}, device={device}")
+
+    def _setup_model(self):
+        """Initialize the text embedding model."""
+        if not MINILM_AVAILABLE:
+            raise ImportError("sentence-transformers is required for TextEmbeddingNode. Install with: pip install sentence-transformers")
Please install.") + + model_source = None + if self.model_path: + if os.path.exists(self.model_path): + model_source = self.model_path + logger.info(f"Loading MiniLM model from local path: {self.model_path}") + else: + logger.warning(f"MiniLM_MODEL_PATH '{self.model_path}' not found; Falling back to hub model {self.model_name}") + + if model_source is None: + model_source = self.model_name + logger.info(f"Loading MiniLM model from the hub: {self.model_name}") + + if self.device == "cuda" and torch.cuda.is_available(): + self.model = SentenceTransformer(model_source).to('cuda', dtype=torch.float16) + elif self.device == "mps" and torch.backends.mps.is_available(): + self.model = SentenceTransformer(model_source).to('mps', dtype=torch.float16) + else: + self.model = SentenceTransformer(model_source) + + def _process_data(self, df: pd.DataFrame) -> pd.DataFrame: + """Process the input dataframe. + + Calculates an embedding as a np.array. + Also pickles that array to prepare it to + storage in the database. + + Args: + df: Input dataframe from context + + Returns: + Processed dataframe + """ + logger.info(f"Processing {len(df)} rows") + + if self.model is None: + self._setup_model() + + # Example: Add a new column based on existing data + result_df = df.copy() + + df['embedding'] = df['text'].apply(lambda x: self.model.encode(x, convert_to_numpy=True)) + + logger.info("Processing complete") + return result_df + + def _store_results(self, con: sqlite3.Connection, df: pd.DataFrame): + """Store results back to the database using batch updates.""" + if df.empty: + logger.info("No results to store") + return + + logger.info(f"Storing {len(df)} results") + + # Convert numpy arrays to bytes for BLOB storage + # Use tobytes() to serialize numpy arrays efficiently + updates = [(row['embedding'].tobytes(), row['id']) for _, row in df.iterrows()] + con.executemany( + "UPDATE posts SET embedding = ? WHERE id = ?", + updates + ) + + con.commit() + logger.info("Results stored successfully") + + def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: + """Execute the transformation. + + This is the main entry point called by the pipeline. + + Args: + con: SQLite database connection + context: TransformContext containing input dataframe + + Returns: + TransformContext with processed dataframe + """ + logger.info("Starting TextEmbeddingNode transformation") + + # Get input dataframe from context + input_df = context.get_dataframe() + + # Validate input + if input_df.empty: + logger.warning("Empty dataframe provided to TextEmbeddingNdode") + return context + + if 'text' not in input_df.columns: + logger.warning("No 'text' column in context dataframe. Skipping TextEmbeddingNode") + return context + + # Process the data + result_df = self._process_data(input_df) + + # Store results (optional) + self._store_results(con, result_df) + + logger.info("TextEmbeddingNode transformation complete") + + # Return new context with results + return TransformContext(result_df) + + +class UmapNode(TransformNode): + """Calculates 2D coordinates from embeddings using UMAP dimensionality reduction. + + This node takes text embeddings and reduces them to 2D coordinates + for visualization purposes. + """ + + def __init__(self, + n_neighbors: int = 15, + min_dist: float = 0.1, + n_components: int = 2, + metric: str = "cosine", + random_state: int = 42): + """Initialize the UmapNode. 
+
+        Args:
+            n_neighbors: Number of neighbors to consider for UMAP (default: 15)
+            min_dist: Minimum distance between points in low-dimensional space (default: 0.1)
+            n_components: Number of dimensions to reduce to (default: 2)
+            metric: Distance metric to use (default: 'cosine')
+            random_state: Random seed for reproducibility (default: 42)
+        """
+        self.n_neighbors = n_neighbors
+        self.min_dist = min_dist
+        self.n_components = n_components
+        self.metric = metric
+        self.random_state = random_state
+        self.reducer = None
+        logger.info(f"Initialized UmapNode with n_neighbors={n_neighbors}, min_dist={min_dist}, "
+                    f"n_components={n_components}, metric={metric}, random_state={random_state}")
+
+    def _process_data(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Process the input dataframe.
+
+        Converts BLOB embeddings back to numpy arrays where necessary
+        and applies UMAP dimensionality reduction to create 2D coordinates.
+
+        Args:
+            df: Input dataframe from context
+
+        Returns:
+            Processed dataframe with umap_x and umap_y columns
+        """
+        logger.info(f"Processing {len(df)} rows")
+
+        if not UMAP_AVAILABLE:
+            raise ImportError("UMAP is required for UmapNode. Install with: pip install umap-learn")
+
+        result_df = df.copy()
+
+        if 'embedding' not in result_df.columns:
+            logger.error("No 'embedding' column found in dataframe")
+            raise ValueError("Input dataframe must contain 'embedding' column")
+
+        # Embeddings loaded from the database arrive as BLOB bytes; embeddings
+        # passed along from TextEmbeddingNode are already float32 numpy arrays.
+        logger.info("Converting embeddings from BLOB to numpy arrays")
+        result_df['embedding'] = result_df['embedding'].apply(
+            lambda x: np.frombuffer(x, dtype=np.float32) if isinstance(x, (bytes, bytearray)) else x
+        )
+
+        # Filter out rows with missing embeddings
+        valid_rows = result_df['embedding'].notna()
+        if not valid_rows.any():
+            logger.error("No valid embeddings found in dataframe")
+            raise ValueError("No valid embeddings to process")
+
+        logger.info(f"Found {valid_rows.sum()} valid embeddings out of {len(result_df)} rows")
+
+        # Stack embeddings into a matrix
+        embeddings_matrix = np.vstack(result_df.loc[valid_rows, 'embedding'].values)
+        logger.info(f"Embeddings matrix shape: {embeddings_matrix.shape}")
+
+        # Apply UMAP
+        logger.info("Fitting UMAP reducer...")
+        self.reducer = umap.UMAP(
+            n_neighbors=self.n_neighbors,
+            min_dist=self.min_dist,
+            n_components=self.n_components,
+            metric=self.metric,
+            random_state=self.random_state
+        )
+
+        umap_coords = self.reducer.fit_transform(embeddings_matrix)
+        logger.info(f"UMAP transformation complete. Output shape: {umap_coords.shape}")
+
+        # Add UMAP coordinates to dataframe; rows without a valid embedding
+        # keep NaN coordinates and are skipped by _store_results
+        result_df.loc[valid_rows, 'umap_x'] = umap_coords[:, 0]
+        result_df.loc[valid_rows, 'umap_y'] = umap_coords[:, 1]
+
+        logger.info("Processing complete")
+        return result_df
+
+    def _store_results(self, con: sqlite3.Connection, df: pd.DataFrame):
+        """Store UMAP coordinates back to the database.
+
+        Args:
+            con: Database connection
+            df: Processed dataframe with umap_x and umap_y columns
+        """
+        if df.empty:
+            logger.info("No results to store")
+            return
+
+        logger.info(f"Storing {len(df)} results")
+
+        # Batch update UMAP coordinates
+        updates = [
+            (row['umap_x'], row['umap_y'], row['id'])
+            for _, row in df.iterrows()
+            if pd.notna(row.get('umap_x')) and pd.notna(row.get('umap_y'))
+        ]
+
+        if updates:
+            con.executemany(
+                "UPDATE posts SET umap_x = ?, umap_y = ? 
WHERE id = ?", + updates + ) + con.commit() + logger.info(f"Stored {len(updates)} UMAP coordinate pairs successfully") + else: + logger.warning("No valid UMAP coordinates to store") + + def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: + """Execute the transformation. + + This is the main entry point called by the pipeline. + + Args: + con: SQLite database connection + context: TransformContext containing input dataframe + + Returns: + TransformContext with processed dataframe + """ + logger.info("Starting ExampleNode transformation") + + # Get input dataframe from context + input_df = context.get_dataframe() + + # Validate input + if input_df.empty: + logger.warning("Empty dataframe provided to ExampleNode") + return context + + # Process the data + result_df = self._process_data(input_df) + + # Store results (optional) + self._store_results(con, result_df) + + logger.info("ExampleNode transformation complete") + + # Return new context with results + return TransformContext(result_df) + + +class SimilarityNode(TransformNode): + """Example transform node template. + + This node demonstrates the basic structure for creating + new transformation nodes in the pipeline. + """ + + def __init__(self, + param1: str = "default_value", + param2: int = 42, + device: str = "cpu"): + """Initialize the ExampleNode. + + Args: + param1: Example string parameter + param2: Example integer parameter + device: Device to use for computations ('cpu', 'cuda', 'mps') + """ + self.param1 = param1 + self.param2 = param2 + self.device = device + logger.info(f"Initialized ExampleNode with param1={param1}, param2={param2}") + + def _create_tables(self, con: sqlite3.Connection): + """Create any necessary tables in the database. + + This is optional - only needed if your node creates new tables. + """ + logger.info("Creating example tables") + + con.execute(""" + CREATE TABLE IF NOT EXISTS example_results ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + post_id INTEGER, + result_value TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (post_id) REFERENCES posts(id) + ) + """) + + con.commit() + + def _process_data(self, df: pd.DataFrame) -> pd.DataFrame: + """Process the input dataframe. + + This is where your main transformation logic goes. + + Args: + df: Input dataframe from context + + Returns: + Processed dataframe + """ + logger.info(f"Processing {len(df)} rows") + + # Example: Add a new column based on existing data + result_df = df.copy() + result_df['processed'] = True + result_df['example_value'] = result_df['id'].apply(lambda x: f"{self.param1}_{x}") + + logger.info("Processing complete") + return result_df + + def _store_results(self, con: sqlite3.Connection, df: pd.DataFrame): + """Store results back to the database. + + This is optional - only needed if you want to persist results. + + Args: + con: Database connection + df: Processed dataframe to store + """ + if df.empty: + logger.info("No results to store") + return + + logger.info(f"Storing {len(df)} results") + + # Example: Store to database + # df[['post_id', 'result_value']].to_sql( + # 'example_results', + # con, + # if_exists='append', + # index=False + # ) + + con.commit() + logger.info("Results stored successfully") + + def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: + """Execute the transformation. + + This is the main entry point called by the pipeline. 
+ + Args: + con: SQLite database connection + context: TransformContext containing input dataframe + + Returns: + TransformContext with processed dataframe + """ + logger.info("Starting ExampleNode transformation") + + # Get input dataframe from context + input_df = context.get_dataframe() + + # Validate input + if input_df.empty: + logger.warning("Empty dataframe provided to ExampleNode") + return context + + # Create any necessary tables + self._create_tables(con) + + # Process the data + result_df = self._process_data(input_df) + + # Store results (optional) + self._store_results(con, result_df) + + logger.info("ExampleNode transformation complete") + + # Return new context with results + return TransformContext(result_df) diff --git a/transform/ensure_minilm_model.sh b/transform/ensure_minilm_model.sh new file mode 100644 index 0000000..2d58f24 --- /dev/null +++ b/transform/ensure_minilm_model.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [ -d "$MINILM_MODEL_PATH" ] && find "$MINILM_MODEL_PATH" -type f | grep -q .; then + echo "MiniLM model already present at $MINILM_MODEL_PATH" + exit 0 +fi + +echo "Downloading MiniLM model to $MINILM_MODEL_PATH" +mkdir -p "$MINILM_MODEL_PATH" +curl -sL "https://huggingface.co/api/models/${MINILM_MODEL_ID}" | jq -r '.siblings[].rfilename' | while read -r file; do + target="${MINILM_MODEL_PATH}/${file}" + mkdir -p "$(dirname "$target")" + echo "Downloading ${file}" + curl -sL "https://huggingface.co/${MINILM_MODEL_ID}/resolve/main/${file}" -o "$target" +done diff --git a/transform/entrypoint.sh b/transform/entrypoint.sh index 8beab84..96f5932 100644 --- a/transform/entrypoint.sh +++ b/transform/entrypoint.sh @@ -2,7 +2,9 @@ set -euo pipefail # Run model download with output to stdout/stderr +/usr/local/bin/ensure_minilm_model.sh 2>&1 /usr/local/bin/ensure_gliner_model.sh 2>&1 # Start cron in foreground with logging exec cron -f -L 2 +# cd /app && /usr/local/bin/python main.py >> /proc/1/fd/1 2>&1 \ No newline at end of file diff --git a/transform/main.py b/transform/main.py index d07d905..9922eed 100644 --- a/transform/main.py +++ b/transform/main.py @@ -56,9 +56,9 @@ def main(): # Load posts data logger.info("Loading posts from database") - sql = "SELECT id, author FROM posts WHERE author IS NOT NULL AND (is_cleaned IS NULL OR is_cleaned = 0) LIMIT ?" 
- MAX_CLEANED_POSTS = os.environ.get("MAX_CLEANED_POSTS", 100) - df = pd.read_sql(sql, con, params=[MAX_CLEANED_POSTS]) + sql = "SELECT * FROM posts WHERE author IS NOT NULL AND (is_cleaned IS NULL OR is_cleaned = 0)" + # MAX_CLEANED_POSTS = os.environ.get("MAX_CLEANED_POSTS", 100) + df = pd.read_sql(sql, con) logger.info(f"Loaded {len(df)} uncleaned posts with authors") if df.empty: diff --git a/transform/pipeline.py b/transform/pipeline.py index 1a97f1f..e1f4e9c 100644 --- a/transform/pipeline.py +++ b/transform/pipeline.py @@ -12,6 +12,7 @@ logger = logging.getLogger("knack-transform") class TransformContext: """Context object containing the dataframe for transformation.""" + # Possibly add a dict for the context to give more Information def __init__(self, df: pd.DataFrame): self.df = df @@ -153,7 +154,6 @@ class ParallelPipeline: logger.info(f"Pipeline has {len(stages)} execution stage(s)") results = {} - contexts = {None: initial_context} # Track contexts from each node errors = [] ExecutorClass = ProcessPoolExecutor if self.use_processes else ThreadPoolExecutor @@ -213,6 +213,7 @@ def create_default_pipeline(device: str = "cpu", Configured ParallelPipeline """ from author_node import NerAuthorNode, FuzzyAuthorNode + from embeddings_node import TextEmbeddingNode, UmapNode pipeline = ParallelPipeline(max_workers=max_workers, use_processes=False) @@ -236,17 +237,24 @@ def create_default_pipeline(device: str = "cpu", name='FuzzyAuthorNode' )) + pipeline.add_node(NodeConfig( + node_class=TextEmbeddingNode, + node_kwargs={ + 'device': device, + 'model_path': os.environ.get('MINILM_MODEL_PATH') + }, + dependencies=[], + name='TextEmbeddingNode' + )) + + pipeline.add_node(NodeConfig( + node_class=UmapNode, + node_kwargs={}, + dependencies=['TextEmbeddingNode'], + name='UmapNode' + )) + # TODO: Create Node to compute Text Embeddings and UMAP. - # TODO: Create Node to pre-compute data based on visuals to reduce load time. - - # TODO: Add more nodes here as they are implemented - # Example: - # pipeline.add_node(NodeConfig( - # node_class=EmbeddingNode, - # node_kwargs={'device': device}, - # dependencies=[], # Runs after AuthorNode - # name='EmbeddingNode' - # )) # pipeline.add_node(NodeConfig( # node_class=UMAPNode, diff --git a/transform/requirements.txt b/transform/requirements.txt index e210d05..023d14f 100644 --- a/transform/requirements.txt +++ b/transform/requirements.txt @@ -2,4 +2,6 @@ pandas python-dotenv gliner torch -fuzzysearch \ No newline at end of file +fuzzysearch +sentence_transformers +umap-learn \ No newline at end of file
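--
Note (not part of the patch): a minimal sketch of how a downstream consumer, or a future SimilarityNode, might read back the embeddings this patch stores and rank similar posts. It assumes the posts table has an embedding BLOB column holding float32 vectors written with ndarray.tobytes() (as in TextEmbeddingNode._store_results); the "knack.db" path and the cosine-similarity metric are illustrative assumptions, not something the patch specifies.

    # Sketch only: load stored embeddings and rank posts by cosine similarity.
    import sqlite3

    import numpy as np
    import pandas as pd


    def load_embeddings(db_path: str = "knack.db") -> pd.DataFrame:
        """Load post ids and their stored embedding vectors from SQLite."""
        con = sqlite3.connect(db_path)
        df = pd.read_sql("SELECT id, embedding FROM posts WHERE embedding IS NOT NULL", con)
        con.close()
        # Undo the tobytes() serialization used in TextEmbeddingNode._store_results
        df["embedding"] = df["embedding"].apply(lambda b: np.frombuffer(b, dtype=np.float32))
        return df


    def top_n_similar(df: pd.DataFrame, post_id: int, n: int = 5) -> pd.DataFrame:
        """Return the n posts whose embeddings are closest to post_id (cosine similarity)."""
        matrix = np.vstack(df["embedding"].values)
        # Normalize rows so the dot product equals cosine similarity
        matrix = matrix / np.linalg.norm(matrix, axis=1, keepdims=True)
        # Assumes post_id is present; raises IndexError otherwise
        query = matrix[df.index[df["id"] == post_id][0]]
        result = df[["id"]].copy()
        result["similarity"] = matrix @ query
        # Drop the query post itself and return the best n matches
        return result[result["id"] != post_id].nlargest(n, "similarity")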