diff --git a/.gitignore b/.gitignore
index 52e71df..2e7a5cf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,3 @@
 data/
 venv/
-experiment/
-__pycache__/
 .DS_STORE
-.env
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..9c94fd6
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,15 @@
+FROM python:slim
+
+RUN mkdir /app
+RUN mkdir /data
+
+WORKDIR /app
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+RUN apt update -y
+RUN apt install -y cron
+COPY crontab .
+RUN crontab crontab
+
+COPY main.py .
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 47a8063..a669090 100644
--- a/Makefile
+++ b/Makefile
@@ -1,12 +1,2 @@
-volume:
-	docker volume create knack_data
-
-stop:
-	docker stop knack-scraper || true
-	docker rm knack-scraper || true
-
-up:
-	docker compose up -d
-
-down:
-	docker compose down
\ No newline at end of file
+build:
+	docker build -t knack-scraper .
\ No newline at end of file
diff --git a/README.md b/README.md
index ab971fc..e69de29 100644
--- a/README.md
+++ b/README.md
@@ -1,18 +0,0 @@
-Knack-Scraper does exacly what its name suggests it does.
-Knack-Scraper scrapes knack.news and writes to an sqlite
-database for later usage.
-
-## Example for .env
-
-```
-NUM_THREADS=8
-NUM_SCRAPES=100
-DATABASE_LOCATION='./data/knack.sqlite'
-```
-
-## Run once
-
-```
-python main.py
-```
-
diff --git a/crontab b/crontab
new file mode 100644
index 0000000..6b6ae11
--- /dev/null
+++ b/crontab
@@ -0,0 +1 @@
+5 4 * * * python /app/main.py
diff --git a/docker-compose.yml b/docker-compose.yml
deleted file mode 100644
index 4ab3b8c..0000000
--- a/docker-compose.yml
+++ /dev/null
@@ -1,43 +0,0 @@
-services:
-  scraper:
-    build:
-      context: ./scrape
-      dockerfile: Dockerfile
-    image: knack-scraper
-    container_name: knack-scraper
-    env_file:
-      - scrape/.env
-    volumes:
-      - knack_data:/data
-    restart: unless-stopped
-
-  transform:
-    build:
-      context: ./transform
-      dockerfile: Dockerfile
-    image: knack-transform
-    container_name: knack-transform
-    env_file:
-      - transform/.env
-    volumes:
-      - knack_data:/data
-      - models:/models
-    restart: unless-stopped
-
-  sqlitebrowser:
-    image: lscr.io/linuxserver/sqlitebrowser:latest
-    container_name: sqlitebrowser
-    environment:
-      - PUID=1000
-      - PGID=1000
-      - TZ=Etc/UTC
-    volumes:
-      - knack_data:/data
-    ports:
-      - "3000:3000"  # noVNC web UI
-      - "3001:3001"  # VNC server
-    restart: unless-stopped
-
-volumes:
-  knack_data:
-  models:
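The new Dockerfile above drops the locale setup that the deleted scrape/Dockerfile further down carries (`apt install -y cron locales`, `LANG`/`LC_ALL=de_DE.UTF-8`), while the new main.py that follows still calls `locale.setlocale(locale.LC_TIME, "de_DE")` and marks it with a TODO as failing inside the container. The deleted scrape/main.py sidesteps the locale entirely with an explicit German month map; below is a minimal, locale-free sketch of that approach (the helper name and sample date are illustrative, not part of the diff):

```python
from datetime import datetime

# German month names -> month numbers, as in the removed scrape/main.py
GERMAN_MONTHS = {
    "Januar": 1, "Februar": 2, "März": 3, "April": 4, "Mai": 5, "Juni": 6,
    "Juli": 7, "August": 8, "September": 9, "Oktober": 10, "November": 11, "Dezember": 12,
}


def parse_german_date(date_string: str) -> datetime:
    """Parse strings like '5. April 2023' without touching the process locale."""
    day_part, month_part, year_part = date_string.split(" ")
    return datetime(int(year_part), GERMAN_MONTHS[month_part], int(day_part.rstrip(".")))


# parse_german_date("5. April 2023") -> datetime(2023, 4, 5)
```

This keeps the image slim and avoids depending on a `de_DE` locale being generated inside the container.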
LIMIT 1" + return len(con.execute(query, [tablename]).fetchall()) > 0 + + +def download(id: int): + if id == 0: + return + base_url = "https://knack.news/" + url = f"{base_url}{id}" + res = requests.get(url) + + # make sure we don't dos knack + time.sleep(2) + + if not (200 <= res.status_code <= 300): + return + + logger.info("Found promising page with id %d!", id) + + content = res.content + soup = BeautifulSoup(content, "html.parser") + date_format = "%d. %B %Y" + + # TODO FIXME: this fails inside the docker container + locale.setlocale(locale.LC_TIME, "de_DE") + pC = soup.find("div", {"class": "postContent"}) + + if pC is None: + # not a normal post + logger.info( + "Page with id %d does not have a .pageContent-div. Skipping for now.", id + ) + return + + # every post has these fields + title = pC.find("h3", {"class": "postTitle"}).text + postText = pC.find("div", {"class": "postText"}) + + # these fields are possible but not required + # TODO: cleanup + try: + date_string = pC.find("span", {"class": "singledate"}).text + parsed_date = datetime.strptime(date_string, date_format) + except AttributeError: + parsed_date = None + + try: + author = pC.find("span", {"class": "author"}).text + except AttributeError: + author = None + + try: + category = pC.find("span", {"class": "categoryInfo"}).find_all() + category = [c.text for c in category] + category = ";".join(category) + except AttributeError: + category = None + + try: + tags = [x.text for x in pC.find("div", {"class": "tagsInfo"}).find_all("a")] + tags = ";".join(tags) + except AttributeError: + tags = None + + img = pC.find("img", {"class": "postImage"}) + if img is not None: + img = img["src"] + + res_dict = { + "id": id, + "title": title, + "author": author, + "date": parsed_date, + "category": category, + "url": url, + "img_link": img, + "tags": tags, + "text": postText.text, + "html": str(postText), + "scraped_at": datetime.now(), + } + + return res_dict + + +def run_downloads(min_id: int, max_id: int, num_threads: int = 8): + res = [] + + logger.info( + "Started parallel scrape of posts from id %d to id %d using %d threads.", + min_id, + max_id - 1, + num_threads, + ) + with ThreadPoolExecutor(max_workers=num_threads) as executor: + # Use a list comprehension to create a list of futures + futures = [executor.submit(download, i) for i in range(min_id, max_id)] + + for future in tqdm.tqdm( + futures, total=max_id - min_id + ): # tqdm to track progress + post = future.result() + if post is not None: + res.append(post) + + # sqlite can't handle lists so let's convert them to a single row csv + # TODO: make sure our database is properly normalized + df = pd.DataFrame(res) + + return df + + +def main(): + num_threads = int(os.environ.get("NUM_THREADS", 8)) + n_scrapes = int(os.environ.get("NUM_SCRAPES", 100)) + database_location = os.environ.get("DATABASE_LOCATION", "/data/knack.sqlite") + + con = sqlite3.connect(database_location) + with con: + post_table_exists = table_exists("posts", con) + + if post_table_exists: + logger.info("found posts retrieved earlier") + # retrieve max post id from db so + # we can skip retrieving known posts + max_id_in_db = con.execute("SELECT MAX(id) FROM posts").fetchone()[0] + logger.info("Got max id %d!", max_id_in_db) + else: + logger.info("no posts scraped so far - starting from 0") + # retrieve from 0 onwards + max_id_in_db = -1 + + con = sqlite3.connect(database_location) + df = run_downloads( + min_id=max_id_in_db + 1, + max_id=max_id_in_db + n_scrapes, + num_threads=num_threads, + ) + 
df.to_sql("posts", con, if_exists="append") + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7792d83 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +beautifulsoup4==4.12.2 +certifi==2023.7.22 +charset-normalizer==3.3.0 +idna==3.4 +numpy==1.26.1 +pandas==2.1.1 +python-dateutil==2.8.2 +pytz==2023.3.post1 +requests==2.31.0 +six==1.16.0 +soupsieve==2.5 +tqdm==4.66.1 +tzdata==2023.3 +urllib3==2.0.7 diff --git a/scrape/Dockerfile b/scrape/Dockerfile deleted file mode 100644 index a2fbe2e..0000000 --- a/scrape/Dockerfile +++ /dev/null @@ -1,29 +0,0 @@ -FROM python:slim - -RUN mkdir /app -RUN mkdir /data - -#COPY /data/knack.sqlite /data - -WORKDIR /app -COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt - -COPY .env . - -RUN apt update -y -RUN apt install -y cron locales - -COPY main.py . - -ENV PYTHONUNBUFFERED=1 -ENV LANG=de_DE.UTF-8 -ENV LC_ALL=de_DE.UTF-8 - -# Create cron job that runs every 15 minutes with environment variables -RUN echo "5 4 * * * cd /app && /usr/local/bin/python main.py >> /proc/1/fd/1 2>&1" > /etc/cron.d/knack-scraper -RUN chmod 0644 /etc/cron.d/knack-scraper -RUN crontab /etc/cron.d/knack-scraper - -# Start cron in foreground -CMD ["cron", "-f"] \ No newline at end of file diff --git a/scrape/main.py b/scrape/main.py deleted file mode 100755 index 10b66dd..0000000 --- a/scrape/main.py +++ /dev/null @@ -1,262 +0,0 @@ -#! python3 -import logging -import os -import sqlite3 -import time -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime -import sys - -from dotenv import load_dotenv -import pandas as pd -import requests -from bs4 import BeautifulSoup - -load_dotenv() - -if (os.environ.get('LOGGING_LEVEL', 'INFO') == 'INFO'): - logging_level = logging.INFO -else: - logging_level = logging.DEBUG - -logging.basicConfig( - level=logging_level, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler("app.log"), - logging.StreamHandler(sys.stdout) - ] -) -logger = logging.getLogger("knack-scraper") - -def table_exists(tablename: str, con: sqlite3.Connection): - query = "SELECT 1 FROM sqlite_master WHERE type='table' AND name=? 
LIMIT 1" - return len(con.execute(query, [tablename]).fetchall()) > 0 - - -def split_semicolon_list(value: str): - if pd.isna(value): - return [] - return [item.strip() for item in str(value).split(';') if item.strip()] - - -def build_dimension_and_mapping(postdf: pd.DataFrame, field_name: str, dim_col: str): - """Extract unique dimension values and post-to-dimension mappings from a column.""" - if postdf.empty or field_name not in postdf.columns: - return None, None - - values = set() - mapping_rows = [] - - for post_id, raw in zip(postdf['id'], postdf[field_name]): - items = split_semicolon_list(raw) - for item in items: - values.add(item) - mapping_rows.append({'post_id': post_id, dim_col: item}) - - if not values: - return None, None - - dim_df = pd.DataFrame({ - 'id': range(len(values)), - dim_col: sorted(values), - }) - map_df = pd.DataFrame(mapping_rows) - return dim_df, map_df - - -def store_dimension_and_mapping( - con: sqlite3.Connection, - dim_df: pd.DataFrame | None, - map_df: pd.DataFrame | None, - table_name: str, - dim_col: str, - mapping_table: str, - mapping_id_col: str, -): - """Persist a dimension table and its mapping table, merging with existing values.""" - if dim_df is None or dim_df.empty: - return - - if table_exists(table_name, con): - existing = pd.read_sql(f"SELECT id, {dim_col} FROM {table_name}", con) - merged = pd.concat([existing, dim_df], ignore_index=True) - merged = merged.drop_duplicates(subset=[dim_col], keep='first').reset_index(drop=True) - merged['id'] = range(len(merged)) - else: - merged = dim_df.copy() - - # Replace table with merged content - merged.to_sql(table_name, con, if_exists="replace", index=False) - - if map_df is None or map_df.empty: - return - - value_to_id = dict(zip(merged[dim_col], merged['id'])) - map_df = map_df.copy() - map_df[mapping_id_col] = map_df[dim_col].map(value_to_id) - map_df = map_df[['post_id', mapping_id_col]].dropna() - map_df.to_sql(mapping_table, con, if_exists="append", index=False) - - -def download(id: int): - if id == 0: - return - base_url = "https://knack.news/" - url = f"{base_url}{id}" - res = requests.get(url) - - # make sure we don't dos knack - time.sleep(2) - - if not (200 <= res.status_code <= 300): - return - - logger.debug("Found promising page with id %d!", id) - - content = res.content - soup = BeautifulSoup(content, "html.parser") - - pC = soup.find("div", {"class": "postContent"}) - - if pC is None: - # not a normal post - logger.debug( - "Page with id %d does not have a .pageContent-div. 
Skipping for now.", id - ) - return - - # every post has these fields - title = pC.find("h3", {"class": "postTitle"}).text - postText = pC.find("div", {"class": "postText"}) - - # these fields are possible but not required - # TODO: cleanup - try: - date_parts = pC.find("span", {"class": "singledate"}).text.split(' ') - day = int(date_parts[0][:-1]) - months = {'Januar': 1, 'Februar': 2, 'März': 3, 'April': 4, 'Mai': 5, 'Juni': 6, 'Juli': 7, 'August': 8, 'September': 9, 'Oktober': 10, 'November': 11, 'Dezember': 12} - month = months[date_parts[1]] - year = int(date_parts[2]) - parsed_date = datetime(year, month, day) - except Exception: - parsed_date = None - - try: - author = pC.find("span", {"class": "author"}).text - except AttributeError: - author = None - - try: - category = pC.find("span", {"class": "categoryInfo"}).find_all() - category = [c.text for c in category if c.text != 'Alle Artikel'] - category = ";".join(category) - except AttributeError: - category = None - - try: - tags = [x.text for x in pC.find("div", {"class": "tagsInfo"}).find_all("a")] - tags = ";".join(tags) - except AttributeError: - tags = None - - img = pC.find("img", {"class": "postImage"}) - if img is not None: - img = img["src"] - - res_dict = { - "id": id, - "title": title, - "author": author, - "date": parsed_date, - "category": category, - "url": url, - "img_link": img, - "tags": tags, - "text": postText.text, - "html": str(postText), - "scraped_at": datetime.now(), - "is_cleaned": False - } - - return res_dict - - -def run_downloads(min_id: int, max_id: int, num_threads: int = 8): - res = [] - - logger.info( - "Started parallel scrape of posts from id %d to id %d using %d threads.", - min_id, - max_id - 1, - num_threads, - ) - with ThreadPoolExecutor(max_workers=num_threads) as executor: - # Use a list comprehension to create a list of futures - futures = [executor.submit(download, i) for i in range(min_id, max_id)] - - for future in futures: - post = future.result() - if post is not None: - res.append(post) - - postdf = pd.DataFrame(res) - return postdf - - -def main(): - num_threads = int(os.environ.get("NUM_THREADS", 8)) - n_scrapes = int(os.environ.get("NUM_SCRAPES", 100)) - database_location = os.environ.get("DATABASE_LOCATION", "../data/knack.sqlite") - - logger.debug(f"Started Knack Scraper: \nNUM_THREADS: {num_threads}\nN_SCRAPES: {n_scrapes}\nDATABASE_LOCATION: {database_location}") - - con = sqlite3.connect(database_location) - with con: - if table_exists("posts", con): - logger.info("found posts retrieved earlier") - max_id_in_db = con.execute("SELECT MAX(id) FROM posts").fetchone()[0] - logger.info("Got max id %d!", max_id_in_db) - else: - logger.info("no posts scraped so far - starting from 0") - max_id_in_db = -1 - - postdf = run_downloads( - min_id=max_id_in_db + 1, - max_id=max_id_in_db + n_scrapes, - num_threads=num_threads, - ) - - # Drop category and tags columns as they're stored in separate tables - postdf = postdf.drop(columns=['category', 'tags']) - postdf.to_sql("posts", con, if_exists="append", index=False) - - # Tags - tag_dim, tag_map = build_dimension_and_mapping(postdf, 'tags', 'tag') - store_dimension_and_mapping( - con, - tag_dim, - tag_map, - table_name="tags", - dim_col="tag", - mapping_table="posttags", - mapping_id_col="tag_id", - ) - - # Categories - category_dim, category_map = build_dimension_and_mapping(postdf, 'category', 'category') - store_dimension_and_mapping( - con, - category_dim, - category_map, - table_name="categories", - dim_col="category", - 
mapping_table="postcategories", - mapping_id_col="category_id", - ) - - logger.info(f"scraped new entries. number of new posts: {len(postdf.index)}") - - -if __name__ == "__main__": - main() diff --git a/scrape/requirements.txt b/scrape/requirements.txt deleted file mode 100644 index 32d5df2..0000000 --- a/scrape/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -pandas -requests -bs4 -dotenv \ No newline at end of file diff --git a/transform/.env.example b/transform/.env.example deleted file mode 100644 index 34cd0e0..0000000 --- a/transform/.env.example +++ /dev/null @@ -1,4 +0,0 @@ -LOGGING_LEVEL=INFO -DB_PATH=/data/knack.sqlite -MAX_CLEANED_POSTS=1000 -COMPUTE_DEVICE=mps \ No newline at end of file diff --git a/transform/Dockerfile b/transform/Dockerfile deleted file mode 100644 index dd847a2..0000000 --- a/transform/Dockerfile +++ /dev/null @@ -1,51 +0,0 @@ -FROM python:3.12-slim - -RUN mkdir -p /app /data /models - -# Install build dependencies -RUN apt-get update && apt-get install -y --no-install-recommends \ - gcc \ - g++ \ - gfortran \ - libopenblas-dev \ - liblapack-dev \ - pkg-config \ - curl \ - jq \ - && rm -rf /var/lib/apt/lists/* - -ENV GLINER_MODEL_ID=urchade/gliner_multi-v2.1 -ENV GLINER_MODEL_PATH=/models/gliner_multi-v2.1 - -ENV GTE_MODEL_ID=thenlper/gte-large -ENV GTE_MODEL_PATH=/models/thenlper/gte-large - -WORKDIR /app -COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt - -COPY .env . - -RUN apt update -y -RUN apt install -y cron locales - -# Ensure GLiNER helper scripts are available -COPY ensure_gliner_model.sh /usr/local/bin/ensure_gliner_model.sh -# Ensure GTE helper scripts are available -COPY ensure_gte_model.sh /usr/local/bin/ensure_gte_model.sh -COPY entrypoint.sh /usr/local/bin/entrypoint.sh -RUN chmod +x /usr/local/bin/ensure_gliner_model.sh /usr/local/bin/ensure_gte_model.sh /usr/local/bin/entrypoint.sh - -COPY *.py . - -# Create cron job that runs every weekend (Sunday at 3 AM) 0 3 * * 0 -# Testing every 30 Minutes */30 * * * * -RUN echo "*/15 * * * * cd /app && /usr/local/bin/python main.py >> /proc/1/fd/1 2>&1" > /etc/cron.d/knack-transform -RUN chmod 0644 /etc/cron.d/knack-transform -RUN crontab /etc/cron.d/knack-transform - -# Persist models between container runs -VOLUME /models - -CMD ["/usr/local/bin/entrypoint.sh"] -#CMD ["python", "main.py"] diff --git a/transform/README.md b/transform/README.md deleted file mode 100644 index 9e3665a..0000000 --- a/transform/README.md +++ /dev/null @@ -1,67 +0,0 @@ -# Knack Transform - -Data transformation pipeline for the Knack scraper project. - -## Overview - -This folder contains the transformation logic that processes data from the SQLite database. It runs on a scheduled basis (every weekend) via cron. - -The pipeline supports **parallel execution** of independent transform nodes, allowing you to leverage multi-core processors for faster data transformation. 
- -## Structure - -- `base.py` - Abstract base class for transform nodes -- `pipeline.py` - Parallel pipeline orchestration system -- `main.py` - Main entry point and pipeline execution -- `author_node.py` - NER-based author classification node -- `example_node.py` - Template for creating new nodes -- `Dockerfile` - Docker image configuration with cron setup -- `requirements.txt` - Python dependencies - -## Transform Nodes - -Transform nodes inherit from `TransformNode` and implement the `run` method: - -```python -from base import TransformNode, TransformContext -import sqlite3 - -class MyTransform(TransformNode): - def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: - df = context.get_dataframe() - - # Transform logic here - transformed_df = df.copy() - # ... your transformations ... - - # Optionally write back to database - transformed_df.to_sql("my_table", con, if_exists="replace", index=False) - - return TransformContext(transformed_df) -``` - -## Configuration - -Copy `.env.example` to `.env` and configure: - -- `LOGGING_LEVEL` - Log level (INFO or DEBUG) -- `DB_PATH` - Path to SQLite database - -## Running - -### With Docker - -```bash -docker build -t knack-transform . -docker run -v $(pwd)/data:/data knack-transform -``` - -### Locally - -```bash -python main.py -``` - -## Cron Schedule - -The Docker container runs the transform pipeline every Sunday at 3 AM. diff --git a/transform/app.log b/transform/app.log deleted file mode 100644 index 551ef70..0000000 --- a/transform/app.log +++ /dev/null @@ -1,303 +0,0 @@ -2026-01-18 15:11:40,253 - knack-transform - INFO - Initialized TextEmbeddingNode with model_name=thenlper/gte-small, model_path=None, device=mps -2026-01-18 15:11:40,254 - knack-transform - INFO - index id title author ... embedding umap_x umap_y row -0 0 41 Über uns None ... 0 0.0 0.0 0.0 -1 1 52 Kontakt None ... 0 0.0 0.0 0.0 -2 2 99 Safety First None ... 0 0.0 0.0 0.0 -3 3 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... chakalaka_161 ... 0 0.0 0.0 0.0 -4 4 115 Feuriger Widerstand bei der Räumung der Tiefe ... anonym ... 0 0.0 0.0 0.0 -.. ... ... ... ... ... ... ... ... ... -95 10 643 Bericht vom 6. Prozesstag im Antifa-Ost Verfah... Soli Antifa Ost ... 0 0.0 0.0 0.0 -96 11 650 #le2310 // Aufruf Ost // Kein Freund – Kein He... anonym ... 0 0.0 0.0 0.0 -97 12 652 Aufruf: Am 23. Oktober von Hamburg nach Leipzi... anonym ... 0 0.0 0.0 0.0 -98 13 654 Nach der Demo ging’s bergab kreuzer online ... 0 0.0 0.0 0.0 -99 14 659 Polizistin unterhält romantische Brieffreundsc... Kira Ayyadi ... 0 0.0 0.0 0.0 - -[100 rows x 17 columns] -2026-01-18 15:11:40,271 - knack-transform - INFO - Starting TextEmbeddingNode transformation -2026-01-18 15:11:40,271 - knack-transform - INFO - Processing 100 rows -2026-01-18 15:11:40,271 - knack-transform - INFO - Loading GTE model from the hub: thenlper/gte-small -2026-01-18 15:11:40,392 - sentence_transformers.SentenceTransformer - INFO - Use pytorch device_name: mps -2026-01-18 15:11:40,392 - sentence_transformers.SentenceTransformer - INFO - Load pretrained SentenceTransformer: thenlper/gte-small -2026-01-18 15:11:54,702 - knack-transform - INFO - Processing complete -2026-01-18 15:11:54,703 - knack-transform - INFO - Storing 100 results -2026-01-18 15:11:55,335 - knack-transform - INFO - Results stored successfully -2026-01-18 15:11:55,335 - knack-transform - INFO - TextEmbeddingNode transformation complete -2026-01-18 15:11:55,335 - knack-transform - INFO - index id title ... 
umap_x umap_y row -0 0 41 Über uns ... 0.0 0.0 0.0 -1 1 52 Kontakt ... 0.0 0.0 0.0 -2 2 99 Safety First ... 0.0 0.0 0.0 -3 3 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... ... 0.0 0.0 0.0 -4 4 115 Feuriger Widerstand bei der Räumung der Tiefe ... ... 0.0 0.0 0.0 -.. ... ... ... ... ... ... ... -95 10 643 Bericht vom 6. Prozesstag im Antifa-Ost Verfah... ... 0.0 0.0 0.0 -96 11 650 #le2310 // Aufruf Ost // Kein Freund – Kein He... ... 0.0 0.0 0.0 -97 12 652 Aufruf: Am 23. Oktober von Hamburg nach Leipzi... ... 0.0 0.0 0.0 -98 13 654 Nach der Demo ging’s bergab ... 0.0 0.0 0.0 -99 14 659 Polizistin unterhält romantische Brieffreundsc... ... 0.0 0.0 0.0 - -[100 rows x 17 columns] -2026-01-18 15:11:55,348 - knack-transform - INFO - Initialized UmapNode with n_neighbors=10, min_dist=0.1, n_components=3, metric=cosine, random_state=42, model_path=None -2026-01-18 15:11:55,348 - knack-transform - INFO - Starting ExampleNode transformation -2026-01-18 15:11:55,349 - knack-transform - INFO - Processing 100 rows -2026-01-18 15:11:55,349 - knack-transform - INFO - Converting embeddings from BLOB to numpy arrays -2026-01-18 15:11:55,349 - knack-transform - INFO - Found 100 valid embeddings out of 100 rows -2026-01-18 15:11:55,349 - knack-transform - INFO - Embeddings matrix shape: (100, 192) -2026-01-18 15:15:27,968 - knack-transform - INFO - Initialized TextEmbeddingNode with model_name=thenlper/gte-small, model_path=None, device=mps -2026-01-18 15:15:27,968 - knack-transform - INFO - index id title author ... embedding umap_x umap_y row -0 15 672 Lina E. als Widerständlerin? CDU fordert Eingr... LVZ ... 0 0.0 0.0 0.0 -1 16 674 Unschuldig verfolgt (4): Lina E., Henry A. und... Michael Freitag ... 0 0.0 0.0 0.0 -2 17 680 Kein Verdacht Konrad Litschko & Andreas Speit ... 0 0.0 0.0 0.0 -3 18 701 Jede Räumung hat ihren Preis – Aufruf von Leip... LeipzigBesetzen ... 0 0.0 0.0 0.0 -4 19 703 From Berlin to Leipzig – TOGETHER IN OUR CITIE... interkiezionale ... 0 0.0 0.0 0.0 -.. ... ... ... ... ... ... ... ... ... -95 32 1131 Nehmt ihr uns die Häuser ab, haun wir euch Gre... G19 und BikeKitchen Freiburg ... 0 0.0 0.0 0.0 -96 33 1136 Interview – Linksextreme aus Leipzig rechtfert... MDR ... 0 0.0 0.0 0.0 -97 34 1147 Polizei-Großaufgebot soll Sachsens Landtag sch... sächsische Zeitung - Annette Binninger ... 0 0.0 0.0 0.0 -98 35 1149 Fackel-Protest: Sachsens Innenminister unter D... sächsische Zeitung - Annette Binninger ... 0 0.0 0.0 0.0 -99 36 1154 23 Thesen über die Revolte – Wie können wir au... anonyme*r Mensch aus Leipzig ... 0 0.0 0.0 0.0 - -[100 rows x 17 columns] -2026-01-18 15:15:27,981 - knack-transform - INFO - Starting TextEmbeddingNode transformation -2026-01-18 15:15:27,981 - knack-transform - INFO - Processing 100 rows -2026-01-18 15:15:27,981 - knack-transform - INFO - Loading GTE model from the hub: thenlper/gte-small -2026-01-18 15:15:28,070 - sentence_transformers.SentenceTransformer - INFO - Use pytorch device_name: mps -2026-01-18 15:15:28,070 - sentence_transformers.SentenceTransformer - INFO - Load pretrained SentenceTransformer: thenlper/gte-small -2026-01-18 15:15:34,292 - knack-transform - INFO - Processing complete -2026-01-18 15:15:34,293 - knack-transform - INFO - Storing 100 results -2026-01-18 15:15:34,885 - knack-transform - INFO - Results stored successfully -2026-01-18 15:15:34,885 - knack-transform - INFO - TextEmbeddingNode transformation complete -2026-01-18 15:15:34,885 - knack-transform - INFO - index id title ... umap_x umap_y row -0 15 672 Lina E. 
als Widerständlerin? CDU fordert Eingr... ... 0.0 0.0 0.0 -1 16 674 Unschuldig verfolgt (4): Lina E., Henry A. und... ... 0.0 0.0 0.0 -2 17 680 Kein Verdacht ... 0.0 0.0 0.0 -3 18 701 Jede Räumung hat ihren Preis – Aufruf von Leip... ... 0.0 0.0 0.0 -4 19 703 From Berlin to Leipzig – TOGETHER IN OUR CITIE... ... 0.0 0.0 0.0 -.. ... ... ... ... ... ... ... -95 32 1131 Nehmt ihr uns die Häuser ab, haun wir euch Gre... ... 0.0 0.0 0.0 -96 33 1136 Interview – Linksextreme aus Leipzig rechtfert... ... 0.0 0.0 0.0 -97 34 1147 Polizei-Großaufgebot soll Sachsens Landtag sch... ... 0.0 0.0 0.0 -98 35 1149 Fackel-Protest: Sachsens Innenminister unter D... ... 0.0 0.0 0.0 -99 36 1154 23 Thesen über die Revolte – Wie können wir au... ... 0.0 0.0 0.0 - -[100 rows x 17 columns] -2026-01-18 15:15:34,905 - knack-transform - INFO - Initialized UmapNode with n_neighbors=10, min_dist=0.1, n_components=3, metric=cosine, random_state=42, model_path=None -2026-01-18 15:15:34,905 - knack-transform - INFO - Starting ExampleNode transformation -2026-01-18 15:15:34,905 - knack-transform - INFO - Processing 100 rows -2026-01-18 15:15:34,905 - knack-transform - INFO - Converting embeddings from BLOB to numpy arrays -2026-01-18 15:15:34,906 - knack-transform - INFO - Found 100 valid embeddings out of 100 rows -2026-01-18 15:15:34,906 - knack-transform - INFO - Embeddings matrix shape: (100, 192) -2026-01-18 15:15:34,906 - knack-transform - INFO - Fitting new UMAP reducer... -2026-01-18 15:15:39,113 - knack-transform - INFO - UMAP transformation complete. Output shape: (100, 3) -2026-01-18 15:15:39,113 - knack-transform - ERROR - Failed to save UMAP model to None: 'NoneType' object has no attribute 'split' -2026-01-18 15:15:39,115 - knack-transform - INFO - Processing complete -2026-01-18 15:15:39,115 - knack-transform - INFO - Storing 100 results -2026-01-18 15:26:34,425 - knack-transform - INFO - Initialized TextEmbeddingNode with model_name=thenlper/gte-small, model_path=None, device=mps -2026-01-18 15:26:34,426 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 201 1160 Unkontrollierte Corona-Demos – Der Sheriff, de... ... 0.0 0.0 0.0 -1 202 1164 AfD in Sachsen – Die gefährliche Methode der AfD ... 0.0 0.0 0.0 -2 203 1190 Wer steckt hinter den Corona-Protesten in Baut... ... 0.0 0.0 0.0 -3 204 1192 Geheimnisverrat durch LKA-Beamten nicht bestätigt ... 0.0 0.0 0.0 -4 205 1196 Hat die Polizei die Lage in Sachsen noch im Gr... ... 0.0 0.0 0.0 -.. ... ... ... ... ... ... ... -95 296 1735 Polizei durchsucht seit dem Morgen in Leipzig ... ... 0.0 0.0 0.0 -96 297 1740 Feuer und Flamme der Repression! Solidarität m... ... 0.0 0.0 0.0 -97 298 1745 Wieder brennendes Auto in Leipzig: SUV in Schl... ... 0.0 0.0 0.0 -98 299 1751 Ausschreitungen bei Corona-Protest im Leipzige... ... 0.0 0.0 0.0 -99 300 1761 Gericht bestätigt Verbot kurdischer Verlage ... 
0.0 0.0 0.0 - -[100 rows x 17 columns] -2026-01-18 15:26:34,439 - knack-transform - INFO - Starting TextEmbeddingNode transformation -2026-01-18 15:26:34,439 - knack-transform - INFO - Processing 100 rows -2026-01-18 15:26:34,439 - knack-transform - INFO - Loading GTE model from the hub: thenlper/gte-small -2026-01-18 15:26:34,497 - sentence_transformers.SentenceTransformer - INFO - Use pytorch device_name: mps -2026-01-18 15:26:34,497 - sentence_transformers.SentenceTransformer - INFO - Load pretrained SentenceTransformer: thenlper/gte-small -2026-01-18 15:26:40,814 - knack-transform - INFO - Processing complete -2026-01-18 15:26:40,814 - knack-transform - INFO - Storing 100 results -2026-01-18 15:26:41,115 - knack-transform - INFO - Results stored successfully -2026-01-18 15:26:41,115 - knack-transform - INFO - TextEmbeddingNode transformation complete -2026-01-18 15:26:41,115 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 201 1160 Unkontrollierte Corona-Demos – Der Sheriff, de... ... 0.0 0.0 0.0 -1 202 1164 AfD in Sachsen – Die gefährliche Methode der AfD ... 0.0 0.0 0.0 -2 203 1190 Wer steckt hinter den Corona-Protesten in Baut... ... 0.0 0.0 0.0 -3 204 1192 Geheimnisverrat durch LKA-Beamten nicht bestätigt ... 0.0 0.0 0.0 -4 205 1196 Hat die Polizei die Lage in Sachsen noch im Gr... ... 0.0 0.0 0.0 -.. ... ... ... ... ... ... ... -95 296 1735 Polizei durchsucht seit dem Morgen in Leipzig ... ... 0.0 0.0 0.0 -96 297 1740 Feuer und Flamme der Repression! Solidarität m... ... 0.0 0.0 0.0 -97 298 1745 Wieder brennendes Auto in Leipzig: SUV in Schl... ... 0.0 0.0 0.0 -98 299 1751 Ausschreitungen bei Corona-Protest im Leipzige... ... 0.0 0.0 0.0 -99 300 1761 Gericht bestätigt Verbot kurdischer Verlage ... 0.0 0.0 0.0 - -[100 rows x 17 columns] -2026-01-18 15:26:41,141 - knack-transform - INFO - Initialized UmapNode with n_neighbors=10, min_dist=0.1, n_components=3, metric=cosine, random_state=42, model_path=None -2026-01-18 15:26:41,141 - knack-transform - INFO - Starting ExampleNode transformation -2026-01-18 15:26:41,141 - knack-transform - INFO - Processing 100 rows -2026-01-18 15:26:41,141 - knack-transform - INFO - Converting embeddings from BLOB to numpy arrays -2026-01-18 15:26:41,142 - knack-transform - INFO - Found 100 valid embeddings out of 100 rows -2026-01-18 15:26:41,142 - knack-transform - INFO - Embeddings matrix shape: (100, 192) -2026-01-18 15:26:41,142 - knack-transform - INFO - Fitting new UMAP reducer... -2026-01-18 15:26:44,105 - knack-transform - INFO - UMAP transformation complete. Output shape: (100, 3) -2026-01-18 15:26:44,105 - knack-transform - ERROR - Failed to save UMAP model to None: 'NoneType' object has no attribute 'split' -2026-01-18 15:26:44,106 - knack-transform - INFO - Processing complete -2026-01-18 15:26:44,106 - knack-transform - INFO - Storing 100 results -2026-01-18 15:26:44,282 - knack-transform - INFO - Stored 100 UMAP coordinate pairs successfully -2026-01-18 15:26:44,282 - knack-transform - INFO - ExampleNode transformation complete -2026-01-18 15:26:44,282 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 201 1160 Unkontrollierte Corona-Demos – Der Sheriff, de... ... 5.537961 3.468988 3.757369 -1 202 1164 AfD in Sachsen – Die gefährliche Methode der AfD ... 4.980662 1.629360 3.269084 -2 203 1190 Wer steckt hinter den Corona-Protesten in Baut... ... 1.055900 2.460792 2.076612 -3 204 1192 Geheimnisverrat durch LKA-Beamten nicht bestätigt ... 
4.128685 5.247468 4.904186 -4 205 1196 Hat die Polizei die Lage in Sachsen noch im Gr... ... 5.383136 2.068369 4.368077 -.. ... ... ... ... ... ... ... -95 296 1735 Polizei durchsucht seit dem Morgen in Leipzig ... ... 5.897925 5.151130 3.241154 -96 297 1740 Feuer und Flamme der Repression! Solidarität m... ... 2.919075 5.341392 4.516587 -97 298 1745 Wieder brennendes Auto in Leipzig: SUV in Schl... ... 4.852142 1.179675 4.241960 -98 299 1751 Ausschreitungen bei Corona-Protest im Leipzige... ... 5.231822 4.983705 3.941314 -99 300 1761 Gericht bestätigt Verbot kurdischer Verlage ... 0.999596 1.613693 2.039646 - -[100 rows x 17 columns] -2026-01-18 15:28:21,676 - knack-transform - INFO - 3D plot displayed -2026-01-18 15:28:43,419 - knack-transform - INFO - Initialized TextEmbeddingNode with model_name=thenlper/gte-small, model_path=None, device=mps -2026-01-18 15:28:43,420 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 1 41 Über uns ... 0.0 0.0 0.0 -1 2 52 Kontakt ... 0.0 0.0 0.0 -2 3 99 Safety First ... 0.0 0.0 0.0 -3 4 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... ... 0.0 0.0 0.0 -4 5 115 Feuriger Widerstand bei der Räumung der Tiefe ... ... 0.0 0.0 0.0 -... ... ... ... ... ... ... ... -3673 3674 14617 „Sturmlokale“ als „Vorposten im Bürgerkrieg“ ... 0.0 0.0 0.0 -3674 3675 14619 „Klassenhass“ reloaded? ... 0.0 0.0 0.0 -3675 3676 14623 Nur Bewährung: Landgericht kann Lok-Fan nach G... ... 0.0 0.0 0.0 -3676 3677 14625 Angesichts der russischen Bedrohung geben eini... ... 0.0 0.0 0.0 -3677 3678 14627 Applaus für die Angeklagten ... 0.0 0.0 0.0 - -[3678 rows x 17 columns] -2026-01-18 15:28:43,432 - knack-transform - INFO - Starting TextEmbeddingNode transformation -2026-01-18 15:28:43,432 - knack-transform - INFO - Processing 3678 rows -2026-01-18 15:28:43,432 - knack-transform - INFO - Loading GTE model from the hub: thenlper/gte-small -2026-01-18 15:28:43,454 - sentence_transformers.SentenceTransformer - INFO - Use pytorch device_name: mps -2026-01-18 15:28:43,454 - sentence_transformers.SentenceTransformer - INFO - Load pretrained SentenceTransformer: thenlper/gte-small -2026-01-18 15:30:35,756 - knack-transform - INFO - Processing complete -2026-01-18 15:30:35,757 - knack-transform - INFO - Storing 3678 results -2026-01-18 15:30:42,373 - knack-transform - INFO - Results stored successfully -2026-01-18 15:30:42,374 - knack-transform - INFO - TextEmbeddingNode transformation complete -2026-01-18 15:30:42,374 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 1 41 Über uns ... 0.0 0.0 0.0 -1 2 52 Kontakt ... 0.0 0.0 0.0 -2 3 99 Safety First ... 0.0 0.0 0.0 -3 4 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... ... 0.0 0.0 0.0 -4 5 115 Feuriger Widerstand bei der Räumung der Tiefe ... ... 0.0 0.0 0.0 -... ... ... ... ... ... ... ... -3673 3674 14617 „Sturmlokale“ als „Vorposten im Bürgerkrieg“ ... 0.0 0.0 0.0 -3674 3675 14619 „Klassenhass“ reloaded? ... 0.0 0.0 0.0 -3675 3676 14623 Nur Bewährung: Landgericht kann Lok-Fan nach G... ... 0.0 0.0 0.0 -3676 3677 14625 Angesichts der russischen Bedrohung geben eini... ... 0.0 0.0 0.0 -3677 3678 14627 Applaus für die Angeklagten ... 
0.0 0.0 0.0 - -[3678 rows x 17 columns] -2026-01-18 15:30:42,415 - knack-transform - INFO - Initialized UmapNode with n_neighbors=10, min_dist=0.1, n_components=3, metric=cosine, random_state=42, model_path=None -2026-01-18 15:30:42,415 - knack-transform - INFO - Starting ExampleNode transformation -2026-01-18 15:30:42,415 - knack-transform - INFO - Processing 3678 rows -2026-01-18 15:30:42,416 - knack-transform - INFO - Converting embeddings from BLOB to numpy arrays -2026-01-18 15:30:42,418 - knack-transform - INFO - Found 3678 valid embeddings out of 3678 rows -2026-01-18 15:30:42,420 - knack-transform - INFO - Embeddings matrix shape: (3678, 192) -2026-01-18 15:30:42,420 - knack-transform - INFO - Fitting new UMAP reducer... -2026-01-18 15:30:53,542 - knack-transform - INFO - UMAP transformation complete. Output shape: (3678, 3) -2026-01-18 15:30:53,542 - knack-transform - ERROR - Failed to save UMAP model to None: 'NoneType' object has no attribute 'split' -2026-01-18 15:30:53,543 - knack-transform - INFO - Processing complete -2026-01-18 15:30:53,543 - knack-transform - INFO - Storing 3678 results -2026-01-18 15:31:00,254 - knack-transform - INFO - Stored 3678 UMAP coordinate pairs successfully -2026-01-18 15:31:00,255 - knack-transform - INFO - ExampleNode transformation complete -2026-01-18 15:31:00,255 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 1 41 Über uns ... 6.138411 7.582617 9.574329 -1 2 52 Kontakt ... 6.801492 5.409409 4.112970 -2 3 99 Safety First ... 9.410303 7.564034 8.076056 -3 4 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... ... 3.972261 5.724514 4.036393 -4 5 115 Feuriger Widerstand bei der Räumung der Tiefe ... ... 5.478312 5.744200 4.765834 -... ... ... ... ... ... ... ... -3673 3674 14617 „Sturmlokale“ als „Vorposten im Bürgerkrieg“ ... 8.468963 5.995162 5.223534 -3674 3675 14619 „Klassenhass“ reloaded? ... 4.677429 8.059127 8.226499 -3675 3676 14623 Nur Bewährung: Landgericht kann Lok-Fan nach G... ... 1.877464 8.582388 8.226753 -3676 3677 14625 Angesichts der russischen Bedrohung geben eini... ... 12.704015 6.178788 8.685699 -3677 3678 14627 Applaus für die Angeklagten ... 9.530050 3.409181 8.588024 - -[3678 rows x 17 columns] -2026-01-18 15:35:27,488 - knack-transform - INFO - 3D plot displayed -2026-01-18 15:35:37,186 - knack-transform - INFO - Initialized TextEmbeddingNode with model_name=thenlper/gte-small, model_path=None, device=mps -2026-01-18 15:35:37,186 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 1 41 Über uns ... 6.138411 7.582617 9.574329 -1 2 52 Kontakt ... 6.801492 5.409409 4.112970 -2 3 99 Safety First ... 9.410303 7.564034 8.076056 -3 4 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... ... 3.972261 5.724514 4.036393 -4 5 115 Feuriger Widerstand bei der Räumung der Tiefe ... ... 5.478312 5.744200 4.765834 -... ... ... ... ... ... ... ... -3673 3674 14617 „Sturmlokale“ als „Vorposten im Bürgerkrieg“ ... 8.468963 5.995162 5.223534 -3674 3675 14619 „Klassenhass“ reloaded? ... 4.677429 8.059127 8.226499 -3675 3676 14623 Nur Bewährung: Landgericht kann Lok-Fan nach G... ... 1.877464 8.582388 8.226753 -3676 3677 14625 Angesichts der russischen Bedrohung geben eini... ... 12.704015 6.178788 8.685699 -3677 3678 14627 Applaus für die Angeklagten ... 
9.530050 3.409181 8.588024 - -[3678 rows x 17 columns] -2026-01-18 15:35:37,196 - knack-transform - INFO - Starting TextEmbeddingNode transformation -2026-01-18 15:35:37,196 - knack-transform - INFO - Processing 3678 rows -2026-01-18 15:35:37,196 - knack-transform - INFO - Loading GTE model from the hub: thenlper/gte-small -2026-01-18 15:35:37,251 - sentence_transformers.SentenceTransformer - INFO - Use pytorch device_name: mps -2026-01-18 15:35:37,251 - sentence_transformers.SentenceTransformer - INFO - Load pretrained SentenceTransformer: thenlper/gte-small -2026-01-18 15:36:25,468 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 1 41 Über uns ... 6.138411 7.582617 9.574329 -1 2 52 Kontakt ... 6.801492 5.409409 4.112970 -2 3 99 Safety First ... 9.410303 7.564034 8.076056 -3 4 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... ... 3.972261 5.724514 4.036393 -4 5 115 Feuriger Widerstand bei der Räumung der Tiefe ... ... 5.478312 5.744200 4.765834 -... ... ... ... ... ... ... ... -3673 3674 14617 „Sturmlokale“ als „Vorposten im Bürgerkrieg“ ... 8.468963 5.995162 5.223534 -3674 3675 14619 „Klassenhass“ reloaded? ... 4.677429 8.059127 8.226499 -3675 3676 14623 Nur Bewährung: Landgericht kann Lok-Fan nach G... ... 1.877464 8.582388 8.226753 -3676 3677 14625 Angesichts der russischen Bedrohung geben eini... ... 12.704015 6.178788 8.685699 -3677 3678 14627 Applaus für die Angeklagten ... 9.530050 3.409181 8.588024 - -[3678 rows x 17 columns] -2026-01-18 15:37:37,881 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 1 41 Über uns ... 6.138411 7.582617 9.574329 -1 2 52 Kontakt ... 6.801492 5.409409 4.112970 -2 3 99 Safety First ... 9.410303 7.564034 8.076056 -3 4 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... ... 3.972261 5.724514 4.036393 -4 5 115 Feuriger Widerstand bei der Räumung der Tiefe ... ... 5.478312 5.744200 4.765834 -... ... ... ... ... ... ... ... -3673 3674 14617 „Sturmlokale“ als „Vorposten im Bürgerkrieg“ ... 8.468963 5.995162 5.223534 -3674 3675 14619 „Klassenhass“ reloaded? ... 4.677429 8.059127 8.226499 -3675 3676 14623 Nur Bewährung: Landgericht kann Lok-Fan nach G... ... 1.877464 8.582388 8.226753 -3676 3677 14625 Angesichts der russischen Bedrohung geben eini... ... 12.704015 6.178788 8.685699 -3677 3678 14627 Applaus für die Angeklagten ... 9.530050 3.409181 8.588024 - -[3678 rows x 17 columns] -2026-01-18 15:38:08,872 - knack-transform - INFO - 3D plot displayed -2026-01-18 15:39:23,498 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 1 41 Über uns ... 6.138411 7.582617 9.574329 -1 2 52 Kontakt ... 6.801492 5.409409 4.112970 -2 3 99 Safety First ... 9.410303 7.564034 8.076056 -3 4 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... ... 3.972261 5.724514 4.036393 -4 5 115 Feuriger Widerstand bei der Räumung der Tiefe ... ... 5.478312 5.744200 4.765834 -... ... ... ... ... ... ... ... -3673 3674 14617 „Sturmlokale“ als „Vorposten im Bürgerkrieg“ ... 8.468963 5.995162 5.223534 -3674 3675 14619 „Klassenhass“ reloaded? ... 4.677429 8.059127 8.226499 -3675 3676 14623 Nur Bewährung: Landgericht kann Lok-Fan nach G... ... 1.877464 8.582388 8.226753 -3676 3677 14625 Angesichts der russischen Bedrohung geben eini... ... 12.704015 6.178788 8.685699 -3677 3678 14627 Applaus für die Angeklagten ... 9.530050 3.409181 8.588024 - -[3678 rows x 17 columns] -2026-01-18 15:39:52,241 - knack-transform - INFO - index id title ... umap_x umap_y umap_z -0 1 41 Über uns ... 6.138411 7.582617 9.574329 -1 2 52 Kontakt ... 
6.801492 5.409409 4.112970 -2 3 99 Safety First ... 9.410303 7.564034 8.076056 -3 4 110 Datenleck bei Polizei Sachsen – Funkmitschnitt... ... 3.972261 5.724514 4.036393 -4 5 115 Feuriger Widerstand bei der Räumung der Tiefe ... ... 5.478312 5.744200 4.765834 -... ... ... ... ... ... ... ... -3673 3674 14617 „Sturmlokale“ als „Vorposten im Bürgerkrieg“ ... 8.468963 5.995162 5.223534 -3674 3675 14619 „Klassenhass“ reloaded? ... 4.677429 8.059127 8.226499 -3675 3676 14623 Nur Bewährung: Landgericht kann Lok-Fan nach G... ... 1.877464 8.582388 8.226753 -3676 3677 14625 Angesichts der russischen Bedrohung geben eini... ... 12.704015 6.178788 8.685699 -3677 3678 14627 Applaus für die Angeklagten ... 9.530050 3.409181 8.588024 - -[3678 rows x 17 columns] -2026-01-18 15:41:23,688 - knack-transform - INFO - 3D plot displayed diff --git a/transform/author_node.py b/transform/author_node.py deleted file mode 100644 index 845e87a..0000000 --- a/transform/author_node.py +++ /dev/null @@ -1,420 +0,0 @@ -"""Author classification transform node using NER.""" -import os -import sqlite3 -import pandas as pd -import logging -import fuzzysearch -from concurrent.futures import ThreadPoolExecutor - -from pipeline import TransformContext -from transform_node import TransformNode - -logger = logging.getLogger("knack-transform") - -try: - from gliner import GLiNER - import torch - GLINER_AVAILABLE = True -except ImportError: - GLINER_AVAILABLE = False - logging.warning("GLiNER not available. Install with: pip install gliner") - -class NerAuthorNode(TransformNode): - """Transform node that extracts and classifies authors using NER. - - Creates two tables: - - authors: stores unique authors with their type (Person, Organisation, etc.) - - post_authors: maps posts to their authors - """ - - def __init__(self, model_name: str = "urchade/gliner_multi-v2.1", - model_path: str = None, - threshold: float = 0.5, - max_workers: int = 64, - device: str = "cpu"): - """Initialize the AuthorNode. - - Args: - model_name: GLiNER model to use - model_path: Optional local path to a downloaded GLiNER model - threshold: Confidence threshold for entity predictions - max_workers: Number of parallel workers for prediction - device: Device to run model on ('cpu', 'cuda', 'mps') - """ - self.model_name = model_name - self.model_path = model_path or os.environ.get('GLINER_MODEL_PATH') - self.threshold = threshold - self.max_workers = max_workers - self.device = device - self.model = None - self.labels = ["Person", "Organisation", "Email", "Newspaper", "NGO"] - - def _setup_model(self): - """Initialize the NER model.""" - if not GLINER_AVAILABLE: - raise ImportError("GLiNER is required for AuthorNode. 
Install with: pip install gliner") - - model_source = None - if self.model_path: - if os.path.exists(self.model_path): - model_source = self.model_path - logger.info(f"Loading GLiNER model from local path: {self.model_path}") - else: - logger.warning(f"GLINER_MODEL_PATH '{self.model_path}' not found; falling back to hub model {self.model_name}") - - if model_source is None: - model_source = self.model_name - logger.info(f"Loading GLiNER model from hub: {self.model_name}") - - if self.device == "cuda" and torch.cuda.is_available(): - self.model = GLiNER.from_pretrained( - model_source, - max_length=255 - ).to('cuda', dtype=torch.float16) - elif self.device == "mps" and torch.backends.mps.is_available(): - self.model = GLiNER.from_pretrained( - model_source, - max_length=255 - ).to('mps', dtype=torch.float16) - else: - self.model = GLiNER.from_pretrained( - model_source, - max_length=255 - ) - - logger.info("Model loaded successfully") - - def _predict(self, text_data: dict): - """Predict entities for a single author text. - - Args: - text_data: Dict with 'author' and 'id' keys - - Returns: - Tuple of (predictions, post_id) or None - """ - if text_data is None or text_data.get('author') is None: - return None - - predictions = self.model.predict_entities( - text_data['author'], - self.labels, - threshold=self.threshold - ) - return predictions, text_data['id'] - - def _classify_authors(self, posts_df: pd.DataFrame): - """Classify all authors in the posts dataframe. - - Args: - posts_df: DataFrame with 'id' and 'author' columns - - Returns: - List of dicts with 'text', 'label', 'id' keys - """ - if self.model is None: - self._setup_model() - - # Prepare input data - authors_data = [] - for idx, row in posts_df.iterrows(): - if pd.notna(row['author']): - authors_data.append({ - 'author': row['author'], - 'id': row['id'] - }) - - logger.info(f"Classifying {len(authors_data)} authors") - - results = [] - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - futures = [executor.submit(self._predict, data) for data in authors_data] - - for future in futures: - result = future.result() - if result is not None: - predictions, post_id = result - for pred in predictions: - results.append({ - 'text': pred['text'], - 'label': pred['label'], - 'id': post_id - }) - - logger.info(f"Classification complete. Found {len(results)} author entities") - return results - - def _create_tables(self, con: sqlite3.Connection): - """Create authors and post_authors tables if they don't exist.""" - logger.info("Creating authors tables") - - con.execute(""" - CREATE TABLE IF NOT EXISTS authors ( - id INTEGER PRIMARY KEY, - name TEXT, - type TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) - - con.execute(""" - CREATE TABLE IF NOT EXISTS post_authors ( - post_id INTEGER, - author_id INTEGER, - PRIMARY KEY (post_id, author_id), - FOREIGN KEY (post_id) REFERENCES posts(id), - FOREIGN KEY (author_id) REFERENCES authors(id) - ) - """) - - con.commit() - - def _store_authors(self, con: sqlite3.Connection, results: list): - """Store classified authors and their mappings. 
- - Args: - con: Database connection - results: List of classification results - """ - if not results: - logger.info("No authors to store") - return - - # Convert results to DataFrame - results_df = pd.DataFrame(results) - - # Get unique authors with their types - unique_authors = results_df[['text', 'label']].drop_duplicates() - unique_authors.columns = ['name', 'type'] - - # Get existing authors - existing_authors = pd.read_sql("SELECT id, name FROM authors", con) - - # Find new authors to insert - if not existing_authors.empty: - new_authors = unique_authors[~unique_authors['name'].isin(existing_authors['name'])] - else: - new_authors = unique_authors - - if not new_authors.empty: - logger.info(f"Inserting {len(new_authors)} new authors") - new_authors.to_sql('authors', con, if_exists='append', index=False) - - # Get all authors with their IDs - all_authors = pd.read_sql("SELECT id, name FROM authors", con) - name_to_id = dict(zip(all_authors['name'], all_authors['id'])) - - # Create post_authors mappings - mappings = [] - for _, row in results_df.iterrows(): - author_id = name_to_id.get(row['text']) - if author_id: - mappings.append({ - 'post_id': row['id'], - 'author_id': author_id - }) - - if mappings: - mappings_df = pd.DataFrame(mappings).drop_duplicates() - - # Clear existing mappings for these posts (optional, depends on your strategy) - # post_ids = tuple(mappings_df['post_id'].unique()) - # con.execute(f"DELETE FROM post_authors WHERE post_id IN ({','.join('?' * len(post_ids))})", post_ids) - - logger.info(f"Creating {len(mappings_df)} post-author mappings") - mappings_df.to_sql('post_authors', con, if_exists='append', index=False) - - con.commit() - logger.info("Authors and mappings stored successfully") - - def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: - """Execute the author classification transformation. - - Args: - con: SQLite database connection - context: TransformContext containing posts dataframe - - Returns: - TransformContext with classified authors dataframe - """ - logger.info("Starting AuthorNode transformation") - - posts_df = context.get_dataframe() - - # Ensure required columns exist - if 'author' not in posts_df.columns: - logger.warning("No 'author' column in dataframe. Skipping AuthorNode.") - return context - - # Create tables - self._create_tables(con) - - # Classify authors - results = self._classify_authors(posts_df) - - # Store results - self._store_authors(con, results) - - # Return context with results - logger.info("AuthorNode transformation complete") - - return TransformContext(posts_df) - - -class FuzzyAuthorNode(TransformNode): - """FuzzyAuthorNode - - This Node takes in data and rules of authornames that have been classified already - and uses those 'rule' to find more similar fields. - """ - - def __init__(self, - max_l_dist: int = 1,): - """Initialize FuzzyAuthorNode. - - Args: - max_l_dist: The number of 'errors' that are allowed by the fuzzy search algorithm - """ - self.max_l_dist = max_l_dist - logger.info(f"Initialized FuzzyAuthorNode with max_l_dist={max_l_dist}") - - def _process_data(self, con: sqlite3.Connection, df: pd.DataFrame) -> pd.DataFrame: - """Process the input dataframe. - - This is where your main transformation logic goes. 
- - Args: - con: Database connection - df: Input dataframe from context - - Returns: - Processed dataframe - """ - logger.info(f"Processing {len(df)} rows") - - # Retrieve all known authors from the authors table as 'rules' - authors_df = pd.read_sql("SELECT id, name FROM authors", con) - - if authors_df.empty: - logger.warning("No authors found in database for fuzzy matching") - return pd.DataFrame(columns=['post_id', 'author_id']) - - # Get existing post-author mappings to avoid duplicates - existing_mappings = pd.read_sql( - "SELECT post_id, author_id FROM post_authors", con - ) - existing_post_ids = set(existing_mappings['post_id'].unique()) - - logger.info(f"Found {len(authors_df)} known authors for fuzzy matching") - logger.info(f"Found {len(existing_post_ids)} posts with existing author mappings") - - # Filter to posts without author mappings and with non-null author field - if 'author' not in df.columns or 'id' not in df.columns: - logger.warning("Missing 'author' or 'id' column in input dataframe") - return pd.DataFrame(columns=['post_id', 'author_id']) - - posts_to_process = df[ - (df['id'].notna()) & - (df['author'].notna()) & - (~df['id'].isin(existing_post_ids)) - ] - - logger.info(f"Processing {len(posts_to_process)} posts for fuzzy matching") - - # Perform fuzzy matching - mappings = [] - for _, post_row in posts_to_process.iterrows(): - post_id = post_row['id'] - post_author = str(post_row['author']) - - # Try to find matches against all known author names - for _, author_row in authors_df.iterrows(): - author_id = author_row['id'] - author_name = str(author_row['name']) - # for author names < than 2 characters I want a fault tolerance of 0! - l_dist = self.max_l_dist if len(author_name) > 2 else 0 - - # Use fuzzysearch to find matches with allowed errors - matches = fuzzysearch.find_near_matches( - author_name, - post_author, - max_l_dist=l_dist, - ) - - if matches: - logger.debug(f"Found fuzzy match: '{author_name}' in '{post_author}' for post {post_id}") - mappings.append({ - 'post_id': post_id, - 'author_id': author_id - }) - # Only take the first match per post to avoid multiple mappings - break - - # Create result dataframe - result_df = pd.DataFrame(mappings, columns=['post_id', 'author_id']) if mappings else pd.DataFrame(columns=['post_id', 'author_id']) - - logger.info(f"Processing complete. Found {len(result_df)} fuzzy matches") - return result_df - - def _store_results(self, con: sqlite3.Connection, df: pd.DataFrame): - """Store results back to the database. - - Uses INSERT OR IGNORE to avoid inserting duplicates. - - Args: - con: Database connection - df: Processed dataframe to store - """ - if df.empty: - logger.info("No results to store") - return - - logger.info(f"Storing {len(df)} results") - - # Use INSERT OR IGNORE to handle duplicates (respects PRIMARY KEY constraint) - cursor = con.cursor() - inserted_count = 0 - - for _, row in df.iterrows(): - cursor.execute( - "INSERT OR IGNORE INTO post_authors (post_id, author_id) VALUES (?, ?)", - (int(row['post_id']), int(row['author_id'])) - ) - if cursor.rowcount > 0: - inserted_count += 1 - - con.commit() - logger.info(f"Results stored successfully. Inserted {inserted_count} new mappings, skipped {len(df) - inserted_count} duplicates") - - def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: - """Execute the transformation. - - This is the main entry point called by the pipeline. 
- - Args: - con: SQLite database connection - context: TransformContext containing input dataframe - - Returns: - TransformContext with processed dataframe - """ - logger.info("Starting FuzzyAuthorNode transformation") - - # Get input dataframe from context - input_df = context.get_dataframe() - - # Validate input - if input_df.empty: - logger.warning("Empty dataframe provided to FuzzyAuthorNode") - return context - - # Process the data - result_df = self._process_data(con, input_df) - - # Store results - self._store_results(con, result_df) - - logger.info("FuzzyAuthorNode transformation complete") - - # Return new context with results - return TransformContext(input_df) diff --git a/transform/embeddings_node.py b/transform/embeddings_node.py deleted file mode 100644 index aca8174..0000000 --- a/transform/embeddings_node.py +++ /dev/null @@ -1,536 +0,0 @@ -"""Classes of Transformernodes that have to do with -text processing. - -- TextEmbeddingNode calculates text embeddings -- UmapNode calculates xy coordinates on those vector embeddings -- SimilarityNode calculates top n similar posts based on those embeddings - using the spectral distance. -""" -from pipeline import TransformContext -from transform_node import TransformNode -import sqlite3 -import pandas as pd -import logging -import os -import numpy as np -import sys -import pickle -import matplotlib.pyplot as plt -from mpl_toolkits.mplot3d import Axes3D - -logger = logging.getLogger("knack-transform") - -try: - from sentence_transformers import SentenceTransformer - import torch - GTE_AVAILABLE = True -except ImportError: - GTE_AVAILABLE = False - logging.warning("GTE not available. Install with pip!") - -try: - import umap - UMAP_AVAILABLE = True -except ImportError: - UMAP_AVAILABLE = False - logging.warning("UMAP not available. Install with pip install umap-learn!") - -class TextEmbeddingNode(TransformNode): - """Calculates vector embeddings based on a dataframe - of posts. - """ - def __init__(self, - model_name: str = "thenlper/gte-small", - model_path: str = None, - device: str = "cpu"): - """Initialize the ExampleNode. - - Args: - model_name: Name of the ML Model to calculate text embeddings - model_path: Optional local path to a downloaded embedding model - device: Device to use for computations ('cpu', 'cuda', 'mps') - """ - self.model_name = model_name - self.model_path = model_path or os.environ.get('GTE_MODEL_PATH') - self.device = device - self.model = None - logger.info(f"Initialized TextEmbeddingNode with model_name={model_name}, model_path={model_path}, device={device}") - - def _setup_model(self): - """Init the Text Embedding Model.""" - if not GTE_AVAILABLE: - raise ImportError("GTE is required for TextEmbeddingNode. 
Please install.") - - model_source = None - if self.model_path: - if os.path.exists(self.model_path): - model_source = self.model_path - logger.info(f"Loading GTE model from local path: {self.model_path}") - else: - logger.warning(f"GTE_MODEL_PATH '{self.model_path}' not found; Falling back to hub model {self.model_name}") - - if model_source is None: - model_source = self.model_name - logger.info(f"Loading GTE model from the hub: {self.model_name}") - - if self.device == "cuda" and torch.cuda.is_available(): - self.model = SentenceTransformer(model_source).to('cuda', dtype=torch.float16) - elif self.device == "mps" and torch.backends.mps.is_available(): - self.model = SentenceTransformer(model_source).to('mps', dtype=torch.float16) - else: - self.model = SentenceTransformer(model_source) - - def _process_data(self, df: pd.DataFrame) -> pd.DataFrame: - """Process the input dataframe. - - Calculates an embedding as a np.array. - Also pickles that array to prepare it to - storage in the database. - - Args: - df: Input dataframe from context - - Returns: - Processed dataframe - """ - logger.info(f"Processing {len(df)} rows") - - if self.model is None: - self._setup_model() - - # Example: Add a new column based on existing data - result_df = df.copy() - - result_df['embedding'] = df['text'].apply(lambda x: self.model.encode(x, convert_to_numpy=True)) - - logger.info("Processing complete") - return result_df - - def _store_results(self, con: sqlite3.Connection, df: pd.DataFrame): - """Store results back to the database using batch updates.""" - if df.empty: - logger.info("No results to store") - return - - logger.info(f"Storing {len(df)} results") - - # Convert numpy arrays to bytes for BLOB storage - updates = [(row['embedding'], row['id']) for _, row in df.iterrows()] - con.executemany( - "UPDATE posts SET embedding = ? WHERE id = ?", - updates - ) - - con.commit() - logger.info("Results stored successfully") - - def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: - """Execute the transformation. - - This is the main entry point called by the pipeline. - - Args: - con: SQLite database connection - context: TransformContext containing input dataframe - - Returns: - TransformContext with processed dataframe - """ - logger.info("Starting TextEmbeddingNode transformation") - - # Get input dataframe from context - input_df = context.get_dataframe() - - # Validate input - if input_df.empty: - logger.warning("Empty dataframe provided to TextEmbeddingNdode") - return context - - if 'text' not in input_df.columns: - logger.warning("No 'text' column in context dataframe. Skipping TextEmbeddingNode") - return context - - # Process the data - result_df = self._process_data(input_df) - - # Store results (optional) - self._store_results(con, result_df) - - logger.info("TextEmbeddingNode transformation complete") - - # Return new context with results - return TransformContext(result_df) - - -class UmapNode(TransformNode): - """Calculates 2D coordinates from embeddings using UMAP dimensionality reduction. - - This node takes text embeddings and reduces them to 2D coordinates - for visualization purposes. - """ - - def __init__(self, - n_neighbors: int = 10, - min_dist: float = 0.1, - n_components: int = 3, - metric: str = "cosine", - random_state: int = 42, - model_path: str = None): - """Initialize the UmapNode. 
- - Args: - n_neighbors: Number of neighbors to consider for UMAP (default: 15) - min_dist: Minimum distance between points in low-dimensional space (default: 0.1) - n_components: Number of dimensions to reduce to (default: 2) - metric: Distance metric to use (default: 'cosine') - random_state: Random seed for reproducibility (default: 42) - model_path: Path to save/load the fitted UMAP model (default: None, uses 'umap_model.pkl') - """ - self.n_neighbors = n_neighbors - self.min_dist = min_dist - self.n_components = n_components - self.metric = metric - self.random_state = random_state - self.model_path = model_path or os.environ.get('UMAP_MODEL_PATH') - self.reducer = None - logger.info(f"Initialized UmapNode with n_neighbors={n_neighbors}, min_dist={min_dist}, " - f"n_components={n_components}, metric={metric}, random_state={random_state}, " - f"model_path={self.model_path}") - - def _process_data(self, df: pd.DataFrame) -> pd.DataFrame: - """Process the input dataframe. - - Retrieves embeddings from BLOB storage, converts them back to numpy arrays, - and applies UMAP dimensionality reduction to create 2D coordinates. - - Args: - df: Input dataframe from context - - Returns: - Processed dataframe with umap_x and umap_y columns - """ - logger.info(f"Processing {len(df)} rows") - - if not UMAP_AVAILABLE: - raise ImportError("UMAP is required for UmapNode. Install with: pip install umap-learn") - - result_df = df.copy() - - # Convert BLOB embeddings back to numpy arrays - if 'embedding' not in result_df.columns: - logger.error("No 'embedding' column found in dataframe") - raise ValueError("Input dataframe must contain 'embedding' column") - - logger.info("Converting embeddings from BLOB to numpy arrays") - result_df['embedding'] = result_df['embedding'].apply( - lambda x: np.frombuffer(x, dtype=np.float32) if x is not None else None - ) - - # Filter out rows with None embeddings - valid_rows = result_df['embedding'].notna() - if not valid_rows.any(): - logger.error("No valid embeddings found in dataframe") - raise ValueError("No valid embeddings to process") - - logger.info(f"Found {valid_rows.sum()} valid embeddings out of {len(result_df)} rows") - - # Stack embeddings into a matrix - embeddings_matrix = np.vstack(result_df.loc[valid_rows, 'embedding'].values) - logger.info(f"Embeddings matrix shape: {embeddings_matrix.shape}") - - # Check if a saved UMAP model exists - if self.model_path and os.path.exists(self.model_path): - logger.info(f"Loading existing UMAP model from {self.model_path}") - try: - with open(self.model_path, 'rb') as f: - self.reducer = pickle.load(f) - logger.info("UMAP model loaded successfully") - umap_coords = self.reducer.transform(embeddings_matrix) - logger.info(f"UMAP transformation complete using existing model. Output shape: {umap_coords.shape}") - except Exception as e: - logger.warning(f"Failed to load UMAP model from {self.model_path}: {e}") - logger.info("Falling back to fitting a new model") - self.reducer = None - - # If no saved model or loading failed, fit a new model - if self.reducer is None: - logger.info("Fitting new UMAP reducer...") - self.reducer = umap.UMAP( - n_neighbors=self.n_neighbors, - min_dist=self.min_dist, - n_components=self.n_components, - metric=self.metric, - random_state=self.random_state - ) - - umap_coords = self.reducer.fit_transform(embeddings_matrix) - logger.info(f"UMAP transformation complete. 
Output shape: {umap_coords.shape}") - - # Save the fitted model - try: - umap_folder = '/'.join(self.model_path.split('/')[:1]) - os.mkdir(umap_folder) - with open(self.model_path, 'wb') as f: - pickle.dump(self.reducer, f) - logger.info(f"UMAP model saved to {self.model_path}") - except Exception as e: - logger.error(f"Failed to save UMAP model to {self.model_path}: {e}") - - # Add UMAP coordinates to dataframe - result_df.loc[valid_rows, 'umap_x'] = umap_coords[:, 0] - result_df.loc[valid_rows, 'umap_y'] = umap_coords[:, 1] - result_df.loc[valid_rows, 'umap_z'] = umap_coords[:, 2] - - # Fill NaN for invalid rows - result_df['umap_x'] = result_df['umap_x'].fillna(value=0) - result_df['umap_y'] = result_df['umap_y'].fillna(value=0) - result_df['umap_z'] = result_df['umap_z'].fillna(value=0) - - logger.info("Processing complete") - return result_df - - def _store_results(self, con: sqlite3.Connection, df: pd.DataFrame): - """Store UMAP coordinates back to the database. - - Args: - con: Database connection - df: Processed dataframe with umap_x and umap_y columns - """ - if df.empty: - logger.info("No results to store") - return - - logger.info(f"Storing {len(df)} results") - - # Batch update UMAP coordinates - updates = [ - (row['umap_x'], row['umap_y'], row['umap_z'], row['id']) - for _, row in df.iterrows() - if pd.notna(row.get('umap_x')) and pd.notna(row.get('umap_y')) and pd.notna(row.get('umap_z')) - ] - - if updates: - con.executemany( - "UPDATE posts SET umap_x = ?, umap_y = ?, umap_z = ? WHERE id = ?", - updates - ) - con.commit() - logger.info(f"Stored {len(updates)} UMAP coordinate pairs successfully") - else: - logger.warning("No valid UMAP coordinates to store") - - def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: - """Execute the transformation. - - This is the main entry point called by the pipeline. - - Args: - con: SQLite database connection - context: TransformContext containing input dataframe - - Returns: - TransformContext with processed dataframe - """ - logger.info("Starting ExampleNode transformation") - - # Get input dataframe from context - input_df = context.get_dataframe() - - # Validate input - if input_df.empty: - logger.warning("Empty dataframe provided to ExampleNode") - return context - - # Process the data - result_df = self._process_data(input_df) - - # Store results (optional) - self._store_results(con, result_df) - - logger.info("ExampleNode transformation complete") - - # Return new context with results - return TransformContext(result_df) - - -class SimilarityNode(TransformNode): - """Example transform node template. - - This node demonstrates the basic structure for creating - new transformation nodes in the pipeline. - """ - - def __init__(self, - param1: str = "default_value", - param2: int = 42, - device: str = "cpu"): - """Initialize the ExampleNode. - - Args: - param1: Example string parameter - param2: Example integer parameter - device: Device to use for computations ('cpu', 'cuda', 'mps') - """ - self.param1 = param1 - self.param2 = param2 - self.device = device - logger.info(f"Initialized ExampleNode with param1={param1}, param2={param2}") - - def _create_tables(self, con: sqlite3.Connection): - """Create any necessary tables in the database. - - This is optional - only needed if your node creates new tables. 
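# [editor's note] The module docstring above promises that SimilarityNode computes the
# top-n most similar posts from the stored embeddings, but the class body in this file is
# still the unmodified ExampleNode template. A minimal, self-contained sketch of one way
# to do that (plain cosine similarity rather than the "spectral distance" the docstring
# mentions; all names below are illustrative and not part of the deleted code):
import numpy as np
import pandas as pd

def top_n_similar(df: pd.DataFrame, n: int = 5) -> pd.DataFrame:
    """For each post, return the ids of the n most cosine-similar other posts."""
    ids = df["id"].to_numpy()
    emb = np.vstack(df["embedding"].to_numpy())             # shape: (num_posts, dim)
    emb = emb / np.linalg.norm(emb, axis=1, keepdims=True)  # L2-normalise rows
    sims = emb @ emb.T                                       # cosine similarity matrix
    np.fill_diagonal(sims, -np.inf)                          # exclude self-matches
    top_idx = np.argsort(-sims, axis=1)[:, :n]               # indices of best matches
    return pd.DataFrame(
        {"id": ids, "similar_ids": [";".join(map(str, ids[row])) for row in top_idx]}
    )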
- """ - logger.info("Creating example tables") - - con.execute(""" - CREATE TABLE IF NOT EXISTS example_results ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - post_id INTEGER, - result_value TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (post_id) REFERENCES posts(id) - ) - """) - - con.commit() - - def _process_data(self, df: pd.DataFrame) -> pd.DataFrame: - """Process the input dataframe. - - This is where your main transformation logic goes. - - Args: - df: Input dataframe from context - - Returns: - Processed dataframe - """ - logger.info(f"Processing {len(df)} rows") - - # Example: Add a new column based on existing data - result_df = df.copy() - result_df['processed'] = True - result_df['example_value'] = result_df['id'].apply(lambda x: f"{self.param1}_{x}") - - logger.info("Processing complete") - return result_df - - def _store_results(self, con: sqlite3.Connection, df: pd.DataFrame): - """Store results back to the database. - - This is optional - only needed if you want to persist results. - - Args: - con: Database connection - df: Processed dataframe to store - """ - if df.empty: - logger.info("No results to store") - return - - logger.info(f"Storing {len(df)} results") - - # Example: Store to database - # df[['post_id', 'result_value']].to_sql( - # 'example_results', - # con, - # if_exists='append', - # index=False - # ) - - con.commit() - logger.info("Results stored successfully") - - def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: - """Execute the transformation. - - This is the main entry point called by the pipeline. - - Args: - con: SQLite database connection - context: TransformContext containing input dataframe - - Returns: - TransformContext with processed dataframe - """ - logger.info("Starting ExampleNode transformation") - - # Get input dataframe from context - input_df = context.get_dataframe() - - # Validate input - if input_df.empty: - logger.warning("Empty dataframe provided to ExampleNode") - return context - - # Create any necessary tables - self._create_tables(con) - - # Process the data - result_df = self._process_data(input_df) - - # Store results (optional) - self._store_results(con, result_df) - - logger.info("ExampleNode transformation complete") - - # Return new context with results - return TransformContext(result_df) - -def main(): - - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler("app.log"), - logging.StreamHandler(sys.stdout) - ] - ) - logger = logging.getLogger("knack-transform") - - con = sqlite3.connect("/Users/linussilberstein/Documents/Knack-Scraper/data/knack.sqlite") - df = pd.read_sql('select * from posts;', con) - #node = TextEmbeddingNode(device='mps') - #context = TransformContext(df) - - logger.info(df) - #new_context = node.run(con, context) - #logger.info(new_context.get_dataframe()) - - #umapNode = UmapNode() - #new_context = umapNode.run(con, new_context) - - #logger.info(new_context.get_dataframe()) - - # Create 3D scatter plot of UMAP coordinates - result_df = df - - fig = plt.figure(figsize=(12, 9)) - ax = fig.add_subplot(111, projection='3d') - - scatter = ax.scatter( - result_df['umap_x'], - result_df['umap_y'], - result_df['umap_z'], - c=result_df['id'], - cmap='viridis', - alpha=0.6, - s=50 - ) - - ax.set_xlabel('UMAP X') - ax.set_ylabel('UMAP Y') - ax.set_zlabel('UMAP Z') - ax.set_title('3D UMAP Visualization of Post Embeddings') - - plt.colorbar(scatter, ax=ax, label='Post 
Index') - plt.tight_layout() - plt.show() - - logger.info("3D plot displayed") - - -if __name__ == '__main__': - main() diff --git a/transform/ensure_gliner_model.sh b/transform/ensure_gliner_model.sh deleted file mode 100644 index 4df8215..0000000 --- a/transform/ensure_gliner_model.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -if [ -d "$GLINER_MODEL_PATH" ] && find "$GLINER_MODEL_PATH" -type f | grep -q .; then - echo "GLiNER model already present at $GLINER_MODEL_PATH" - exit 0 -fi - -echo "Downloading GLiNER model to $GLINER_MODEL_PATH" -mkdir -p "$GLINER_MODEL_PATH" -curl -sL "https://huggingface.co/api/models/${GLINER_MODEL_ID}" | jq -r '.siblings[].rfilename' | while read -r file; do - target="${GLINER_MODEL_PATH}/${file}" - mkdir -p "$(dirname "$target")" - echo "Downloading ${file}" - curl -sL "https://huggingface.co/${GLINER_MODEL_ID}/resolve/main/${file}" -o "$target" -done diff --git a/transform/ensure_gte_model.sh b/transform/ensure_gte_model.sh deleted file mode 100644 index 41addd4..0000000 --- a/transform/ensure_gte_model.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -if [ -d "$GTE_MODEL_PATH" ] && find "$GTE_MODEL_PATH" -type f | grep -q .; then - echo "GTE model already present at $GTE_MODEL_PATH" - exit 0 -fi - -echo "Downloading GTE model to $GTE_MODEL_PATH" -mkdir -p "$GTE_MODEL_PATH" -curl -sL "https://huggingface.co/api/models/${GTE_MODEL_ID}" | jq -r '.siblings[].rfilename' | while read -r file; do - target="${GTE_MODEL_PATH}/${file}" - mkdir -p "$(dirname "$target")" - echo "Downloading ${file}" - curl -sL "https://huggingface.co/${GTE_MODEL_ID}/resolve/main/${file}" -o "$target" -done diff --git a/transform/entrypoint.sh b/transform/entrypoint.sh deleted file mode 100644 index f5f79c4..0000000 --- a/transform/entrypoint.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Run model download with output to stdout/stderr -/usr/local/bin/ensure_gte_model.sh 2>&1 -/usr/local/bin/ensure_gliner_model.sh 2>&1 - -# Start cron in foreground with logging -exec cron -f -L 2 -# cd /app && /usr/local/bin/python main.py >> /proc/1/fd/1 2>&1 \ No newline at end of file diff --git a/transform/example_node.py b/transform/example_node.py deleted file mode 100644 index 69900d1..0000000 --- a/transform/example_node.py +++ /dev/null @@ -1,170 +0,0 @@ -"""Example template node for the transform pipeline. - -This is a template showing how to create new transform nodes. -Copy this file and modify it for your specific transformation needs. -""" -from pipeline import TransformContext -from transform_node import TransformNode -import sqlite3 -import pandas as pd -import logging - -logger = logging.getLogger("knack-transform") - - -class ExampleNode(TransformNode): - """Example transform node template. - - This node demonstrates the basic structure for creating - new transformation nodes in the pipeline. - """ - - def __init__(self, - param1: str = "default_value", - param2: int = 42, - device: str = "cpu"): - """Initialize the ExampleNode. - - Args: - param1: Example string parameter - param2: Example integer parameter - device: Device to use for computations ('cpu', 'cuda', 'mps') - """ - self.param1 = param1 - self.param2 = param2 - self.device = device - logger.info(f"Initialized ExampleNode with param1={param1}, param2={param2}") - - def _create_tables(self, con: sqlite3.Connection): - """Create any necessary tables in the database. - - This is optional - only needed if your node creates new tables. 
- """ - logger.info("Creating example tables") - - con.execute(""" - CREATE TABLE IF NOT EXISTS example_results ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - post_id INTEGER, - result_value TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (post_id) REFERENCES posts(id) - ) - """) - - con.commit() - - def _process_data(self, df: pd.DataFrame) -> pd.DataFrame: - """Process the input dataframe. - - This is where your main transformation logic goes. - - Args: - df: Input dataframe from context - - Returns: - Processed dataframe - """ - logger.info(f"Processing {len(df)} rows") - - # Example: Add a new column based on existing data - result_df = df.copy() - result_df['processed'] = True - result_df['example_value'] = result_df['id'].apply(lambda x: f"{self.param1}_{x}") - - logger.info("Processing complete") - return result_df - - def _store_results(self, con: sqlite3.Connection, df: pd.DataFrame): - """Store results back to the database. - - This is optional - only needed if you want to persist results. - - Args: - con: Database connection - df: Processed dataframe to store - """ - if df.empty: - logger.info("No results to store") - return - - logger.info(f"Storing {len(df)} results") - - # Example: Store to database - # df[['post_id', 'result_value']].to_sql( - # 'example_results', - # con, - # if_exists='append', - # index=False - # ) - - con.commit() - logger.info("Results stored successfully") - - def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: - """Execute the transformation. - - This is the main entry point called by the pipeline. - - Args: - con: SQLite database connection - context: TransformContext containing input dataframe - - Returns: - TransformContext with processed dataframe - """ - logger.info("Starting ExampleNode transformation") - - # Get input dataframe from context - input_df = context.get_dataframe() - - # Validate input - if input_df.empty: - logger.warning("Empty dataframe provided to ExampleNode") - return context - - # Create any necessary tables - self._create_tables(con) - - # Process the data - result_df = self._process_data(input_df) - - # Store results (optional) - self._store_results(con, result_df) - - logger.info("ExampleNode transformation complete") - - # Return new context with results - return TransformContext(result_df) - - -# Example usage: -if __name__ == "__main__": - # This allows you to test your node independently - import os - os.chdir('/Users/linussilberstein/Documents/Knack-Scraper/transform') - - from pipeline import TransformContext - import sqlite3 - - # Create test data - test_df = pd.DataFrame({ - 'id': [1, 2, 3], - 'author': ['Test Author 1', 'Test Author 2', 'Test Author 3'] - }) - - # Create test database connection - test_con = sqlite3.connect(':memory:') - - # Create and run node - node = ExampleNode(param1="test", param2=100) - context = TransformContext(test_df) - result_context = node.run(test_con, context) - - # Check results - result_df = result_context.get_dataframe() - print("\nResult DataFrame:") - print(result_df) - - test_con.close() - print("\n✓ ExampleNode test completed successfully!") diff --git a/transform/main.py b/transform/main.py deleted file mode 100644 index 9922eed..0000000 --- a/transform/main.py +++ /dev/null @@ -1,102 +0,0 @@ -#! 
python3 -import logging -import os -import sqlite3 -import sys -from dotenv import load_dotenv - -load_dotenv() - -if (os.environ.get('LOGGING_LEVEL', 'INFO') == 'INFO'): - logging_level = logging.INFO -else: - logging_level = logging.DEBUG - -logging.basicConfig( - level=logging_level, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler("app.log"), - logging.StreamHandler(sys.stdout) - ] -) -logger = logging.getLogger("knack-transform") - - -def setup_database_connection(): - """Create connection to the SQLite database.""" - db_path = os.environ.get('DB_PATH', '/data/knack.sqlite') - logger.info(f"Connecting to database: {db_path}") - return sqlite3.connect(db_path) - - -def table_exists(tablename: str, con: sqlite3.Connection): - """Check if a table exists in the database.""" - query = "SELECT 1 FROM sqlite_master WHERE type='table' AND name=? LIMIT 1" - return len(con.execute(query, [tablename]).fetchall()) > 0 - - -def main(): - """Main entry point for the transform pipeline.""" - logger.info("Starting transform pipeline") - - try: - con = setup_database_connection() - logger.info("Database connection established") - - # Check if posts table exists - if not table_exists('posts', con): - logger.warning("Posts table does not exist yet. Please run the scraper first to populate the database.") - logger.info("Transform pipeline skipped - no data available") - return - - # Import transform components - from pipeline import create_default_pipeline, TransformContext - import pandas as pd - - # Load posts data - logger.info("Loading posts from database") - sql = "SELECT * FROM posts WHERE author IS NOT NULL AND (is_cleaned IS NULL OR is_cleaned = 0)" - # MAX_CLEANED_POSTS = os.environ.get("MAX_CLEANED_POSTS", 100) - df = pd.read_sql(sql, con) - logger.info(f"Loaded {len(df)} uncleaned posts with authors") - - if df.empty: - logger.info("No uncleaned posts found. Transform pipeline skipped.") - return - - # Create initial context - context = TransformContext(df) - - # Create and run parallel pipeline - device = os.environ.get('COMPUTE_DEVICE', 'cpu') - max_workers = int(os.environ.get('MAX_WORKERS', 4)) - - pipeline = create_default_pipeline(device=device, max_workers=max_workers) - results = pipeline.run( - db_path=os.environ.get('DB_PATH', '/data/knack.sqlite'), - initial_context=context, - fail_fast=False # Continue even if some nodes fail - ) - - logger.info(f"Pipeline completed. Processed {len(results)} node(s)") - - # Mark all processed posts as cleaned - post_ids = df['id'].tolist() - if post_ids: - placeholders = ','.join('?' 
* len(post_ids)) - con.execute(f"UPDATE posts SET is_cleaned = 1 WHERE id IN ({placeholders})", post_ids) - con.commit() - logger.info(f"Marked {len(post_ids)} posts as cleaned") - - except Exception as e: - logger.error(f"Error in transform pipeline: {e}", exc_info=True) - sys.exit(1) - finally: - if 'con' in locals(): - con.close() - logger.info("Database connection closed") - - -if __name__ == "__main__": - main() diff --git a/transform/pipeline.py b/transform/pipeline.py deleted file mode 100644 index 5a499c7..0000000 --- a/transform/pipeline.py +++ /dev/null @@ -1,266 +0,0 @@ -"""Parallel pipeline orchestration for transform nodes.""" -import logging -import os -import sqlite3 -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed -from typing import List, Dict, Optional - -import pandas as pd -import multiprocessing as mp - -logger = logging.getLogger("knack-transform") - -class TransformContext: - """Context object containing the dataframe for transformation.""" - # Possibly add a dict for the context to give more Information - - def __init__(self, df: pd.DataFrame): - self.df = df - - def get_dataframe(self) -> pd.DataFrame: - """Get the pandas dataframe from the context.""" - return self.df - -class NodeConfig: - """Configuration for a transform node.""" - - def __init__(self, - node_class: type, - node_kwargs: Dict = None, - dependencies: List[str] = None, - name: str = None): - """Initialize node configuration. - - Args: - node_class: The TransformNode class to instantiate - node_kwargs: Keyword arguments to pass to node constructor - dependencies: List of node names that must complete before this one - name: Optional name for the node (defaults to class name) - """ - self.node_class = node_class - self.node_kwargs = node_kwargs or {} - self.dependencies = dependencies or [] - self.name = name or node_class.__name__ - -class ParallelPipeline: - """Pipeline for executing transform nodes in parallel where possible. - - The pipeline analyzes dependencies between nodes and executes - independent nodes concurrently using multiprocessing or threading. - """ - - def __init__(self, - max_workers: Optional[int] = None, - use_processes: bool = False): - """Initialize the parallel pipeline. - - Args: - max_workers: Maximum number of parallel workers (defaults to CPU count) - use_processes: If True, use ProcessPoolExecutor; if False, use ThreadPoolExecutor - """ - self.max_workers = max_workers or mp.cpu_count() - self.use_processes = use_processes - self.nodes: Dict[str, NodeConfig] = {} - logger.info(f"Initialized ParallelPipeline with {self.max_workers} workers " - f"({'processes' if use_processes else 'threads'})") - - def add_node(self, config: NodeConfig): - """Add a node to the pipeline. - - Args: - config: NodeConfig with node details and dependencies - """ - self.nodes[config.name] = config - logger.info(f"Added node '{config.name}' with dependencies: {config.dependencies}") - - def _get_execution_stages(self) -> List[List[str]]: - """Determine execution stages based on dependencies. 
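# [editor's note] The staging idea that _get_execution_stages describes (and implements
# below) is: repeatedly collect every node whose dependencies have already completed,
# run that batch in parallel, and fail if no node ever becomes ready (circular or missing
# dependency). A stripped-down, self-contained illustration of the same resolution over a
# plain dependency dict (names are illustrative only):
from typing import Dict, List

def resolve_stages(deps: Dict[str, List[str]]) -> List[List[str]]:
    stages, done, remaining = [], set(), set(deps)
    while remaining:
        ready = sorted(n for n in remaining if all(d in done for d in deps[n]))
        if not ready:
            raise ValueError(f"Cannot resolve dependencies: {remaining}")
        stages.append(ready)
        done.update(ready)
        remaining -= set(ready)
    return stages

# Mirrors the wiring in create_default_pipeline further below:
print(resolve_stages({
    "AuthorNode": [],
    "FuzzyAuthorNode": ["AuthorNode"],
    "TextEmbeddingNode": [],
    "UmapNode": ["TextEmbeddingNode"],
}))
# -> [['AuthorNode', 'TextEmbeddingNode'], ['FuzzyAuthorNode', 'UmapNode']]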
- - Returns: - List of stages, where each stage contains node names that can run in parallel - """ - stages = [] - completed = set() - remaining = set(self.nodes.keys()) - - while remaining: - # Find nodes whose dependencies are all completed - ready = [] - for node_name in remaining: - config = self.nodes[node_name] - if all(dep in completed for dep in config.dependencies): - ready.append(node_name) - - if not ready: - # Circular dependency or missing dependency - raise ValueError(f"Cannot resolve dependencies. Remaining nodes: {remaining}") - - stages.append(ready) - completed.update(ready) - remaining -= set(ready) - - return stages - - def _execute_node(self, - node_name: str, - db_path: str, - context: TransformContext) -> tuple: - """Execute a single node. - - Args: - node_name: Name of the node to execute - db_path: Path to the SQLite database - context: TransformContext for the node - - Returns: - Tuple of (node_name, result_context, error) - """ - try: - # Create fresh database connection (not shared across processes/threads) - con = sqlite3.connect(db_path) - - config = self.nodes[node_name] - node = config.node_class(**config.node_kwargs) - - logger.info(f"Executing node: {node_name}") - result_context = node.run(con, context) - - con.close() - logger.info(f"Node '{node_name}' completed successfully") - - return node_name, result_context, None - - except Exception as e: - logger.error(f"Error executing node '{node_name}': {e}", exc_info=True) - return node_name, None, str(e) - - def run(self, - db_path: str, - initial_context: TransformContext, - fail_fast: bool = False) -> Dict[str, TransformContext]: - """Execute the pipeline. - - Args: - db_path: Path to the SQLite database - initial_context: Initial TransformContext for the pipeline - fail_fast: If True, stop execution on first error - - Returns: - Dict mapping node names to their output TransformContext - """ - logger.info("Starting parallel pipeline execution") - - stages = self._get_execution_stages() - logger.info(f"Pipeline has {len(stages)} execution stage(s)") - - results = {} - errors = [] - - ExecutorClass = ProcessPoolExecutor if self.use_processes else ThreadPoolExecutor - - for stage_num, stage_nodes in enumerate(stages, 1): - logger.info(f"Stage {stage_num}/{len(stages)}: Executing {len(stage_nodes)} node(s) in parallel: {stage_nodes}") - - # For nodes in this stage, use the context from their dependencies - # If multiple dependencies, we'll use the most recent one (or could merge) - stage_futures = {} - - with ExecutorClass(max_workers=min(self.max_workers, len(stage_nodes))) as executor: - for node_name in stage_nodes: - config = self.nodes[node_name] - - # Get context from dependencies (use the last dependency's output) - if config.dependencies: - context = results.get(config.dependencies[-1], initial_context) - else: - context = initial_context - - future = executor.submit(self._execute_node, node_name, db_path, context) - stage_futures[future] = node_name - - # Wait for all nodes in this stage to complete - for future in as_completed(stage_futures): - node_name = stage_futures[future] - name, result_context, error = future.result() - - if error: - errors.append((name, error)) - if fail_fast: - logger.error(f"Pipeline failed at node '{name}': {error}") - raise RuntimeError(f"Node '{name}' failed: {error}") - else: - results[name] = result_context - - if errors: - logger.warning(f"Pipeline completed with {len(errors)} error(s)") - for name, error in errors: - logger.error(f" - {name}: {error}") - else: - 
logger.info("Pipeline completed successfully") - - return results - - -def create_default_pipeline(device: str = "cpu", - max_workers: Optional[int] = None) -> ParallelPipeline: - """Create a pipeline with default transform nodes. - - Args: - device: Device to use for compute-intensive nodes ('cpu', 'cuda', 'mps') - max_workers: Maximum number of parallel workers - - Returns: - Configured ParallelPipeline - """ - from author_node import NerAuthorNode, FuzzyAuthorNode - from embeddings_node import TextEmbeddingNode, UmapNode - - pipeline = ParallelPipeline(max_workers=max_workers, use_processes=False) - - # Add AuthorNode (no dependencies) - pipeline.add_node(NodeConfig( - node_class=NerAuthorNode, - node_kwargs={ - 'device': device, - 'model_path': os.environ.get('GLINER_MODEL_PATH') - }, - dependencies=[], - name='AuthorNode' - )) - - pipeline.add_node(NodeConfig( - node_class=FuzzyAuthorNode, - node_kwargs={ - 'max_l_dist': 1 - }, - dependencies=['AuthorNode'], - name='FuzzyAuthorNode' - )) - - pipeline.add_node(NodeConfig( - node_class=TextEmbeddingNode, - node_kwargs={ - 'device': device, - 'model_path': os.environ.get('GTE_MODEL_PATH') - }, - dependencies=[], - name='TextEmbeddingNode' - )) - - pipeline.add_node(NodeConfig( - node_class=UmapNode, - node_kwargs={}, - dependencies=['TextEmbeddingNode'], - name='UmapNode' - )) - - # TODO: Create Node to compute Text Embeddings and UMAP. - - # pipeline.add_node(NodeConfig( - # node_class=UMAPNode, - # node_kwargs={'device': device}, - # dependencies=['EmbeddingNode'], # Runs after EmbeddingNode - # name='UMAPNode' - # )) - - return pipeline diff --git a/transform/requirements.txt b/transform/requirements.txt deleted file mode 100644 index 023d14f..0000000 --- a/transform/requirements.txt +++ /dev/null @@ -1,7 +0,0 @@ -pandas -python-dotenv -gliner -torch -fuzzysearch -sentence_transformers -umap-learn \ No newline at end of file diff --git a/transform/transform_node.py b/transform/transform_node.py deleted file mode 100644 index 54e6bed..0000000 --- a/transform/transform_node.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Base transform node for data pipeline.""" -from abc import ABC, abstractmethod -import sqlite3 - -from pipeline import TransformContext - -class TransformNode(ABC): - """Abstract base class for transformation nodes. - - Each transform node implements a single transformation step - that takes data from the database, transforms it, and - potentially writes results back. - """ - - @abstractmethod - def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext: - """Execute the transformation. 
- - Args: - con: SQLite database connection - context: TransformContext containing the input dataframe - - Returns: - TransformContext with the transformed dataframe - """ - pass diff --git a/visualisation/environment.yml b/visualisation/environment.yml deleted file mode 100644 index 2af896c..0000000 --- a/visualisation/environment.yml +++ /dev/null @@ -1,13 +0,0 @@ -name: knack-viz -channels: - - conda-forge - - defaults -dependencies: - - python=3.11 - - pandas>=2.0.0 - - altair>=5.0.0 - - notebook - - ipykernel - - pip - - pip: - - vega_datasets diff --git a/visualisation/knack_visualization.ipynb b/visualisation/knack_visualization.ipynb deleted file mode 100644 index 4af5a03..0000000 --- a/visualisation/knack_visualization.ipynb +++ /dev/null @@ -1,1381 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "8495708c", - "metadata": {}, - "source": [ - "# Knack Database Visualization\n", - "\n", - "This notebook explores and visualizes the findings from the `knack.sqlite` database using Altair for interactive data visualizations." - ] - }, - { - "cell_type": "markdown", - "id": "75cdd349", - "metadata": {}, - "source": [ - "## 1. Import Required Libraries\n", - "\n", - "Import necessary libraries for data manipulation and visualization." - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "id": "c99dde85", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Libraries imported successfully!\n" - ] - } - ], - "source": [ - "import sqlite3\n", - "import pandas as pd\n", - "import altair as alt\n", - "from pathlib import Path\n", - "\n", - "# Configure Altair\n", - "alt.data_transformers.disable_max_rows()\n", - "alt.renderers.enable('default')\n", - "\n", - "print(\"Libraries imported successfully!\")" - ] - }, - { - "cell_type": "markdown", - "id": "198121f5", - "metadata": {}, - "source": [ - "## 2. Connect to SQLite Database\n", - "\n", - "Establish connection to the knack.sqlite database and explore its structure." - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "98ddc787", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Tables in the database:\n", - " - posts\n", - " - posttags\n", - " - postcategories\n", - " - tags\n", - " - categories\n", - " - authors\n", - " - post_authors\n" - ] - } - ], - "source": [ - "# Connect to the database\n", - "db_path = Path('../data/knack.transformed.sqlite')\n", - "conn = sqlite3.connect(db_path)\n", - "cursor = conn.cursor()\n", - "\n", - "# Get all table names\n", - "cursor.execute(\"SELECT name FROM sqlite_master WHERE type='table';\")\n", - "tables = cursor.fetchall()\n", - "\n", - "print(\"Tables in the database:\")\n", - "for table in tables:\n", - " print(f\" - {table[0]}\")" - ] - }, - { - "cell_type": "markdown", - "id": "4f216388", - "metadata": {}, - "source": [ - "## 3. Explore Database Schema\n", - "\n", - "Examine the structure of each table to understand the data." 
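# [editor's note] One schema mismatch worth flagging here: the deleted UmapNode writes
# umap_x, umap_y and umap_z, while the posts schema printed below lists only umap_x and
# umap_y, so this dump presumably predates the third component. If the 3-component output
# is kept, the column has to exist before the UPDATE succeeds, e.g. (illustrative sketch,
# guarded so it is safe to re-run):
import sqlite3

con = sqlite3.connect("../data/knack.transformed.sqlite")
cols = {row[1] for row in con.execute("PRAGMA table_info(posts)")}
if "umap_z" not in cols:
    con.execute("ALTER TABLE posts ADD COLUMN umap_z REAL")
    con.commit()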
- ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "e51dd105", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "============================================================\n", - "Table: posts\n", - "============================================================\n", - "\n", - "Columns:\n", - " index INTEGER \n", - " id INTEGER \n", - " title TEXT \n", - " author TEXT \n", - " date TIMESTAMP \n", - " category TEXT \n", - " url TEXT \n", - " img_link TEXT \n", - " tags TEXT \n", - " text TEXT \n", - " html TEXT \n", - " scraped_at TIMESTAMP \n", - " is_cleaned BOOLEAN \n", - " embedding BLOB \n", - " umap_x REAL \n", - " umap_y REAL \n", - "\n", - "Total rows: 3678\n", - "\n", - "============================================================\n", - "Table: posttags\n", - "============================================================\n", - "\n", - "Columns:\n", - " post_id INTEGER \n", - " tag_id INTEGER \n", - "\n", - "Total rows: 14272\n", - "\n", - "============================================================\n", - "Table: postcategories\n", - "============================================================\n", - "\n", - "Columns:\n", - " post_id INTEGER \n", - " category_id INTEGER \n", - "\n", - "Total rows: 3691\n", - "\n", - "============================================================\n", - "Table: tags\n", - "============================================================\n", - "\n", - "Columns:\n", - " id INTEGER \n", - " tag TEXT \n", - "\n", - "Total rows: 64\n", - "\n", - "============================================================\n", - "Table: categories\n", - "============================================================\n", - "\n", - "Columns:\n", - " id INTEGER \n", - " category TEXT \n", - "\n", - "Total rows: 6\n", - "\n", - "============================================================\n", - "Table: authors\n", - "============================================================\n", - "\n", - "Columns:\n", - " id INTEGER \n", - " name TEXT \n", - " type TEXT \n", - " created_at TIMESTAMP \n", - "\n", - "Total rows: 1143\n", - "\n", - "============================================================\n", - "Table: post_authors\n", - "============================================================\n", - "\n", - "Columns:\n", - " post_id INTEGER \n", - " author_id INTEGER \n", - "\n", - "Total rows: 4934\n" - ] - } - ], - "source": [ - "# Examine schema for each table\n", - "for table in tables:\n", - " table_name = table[0]\n", - " print(f\"\\n{'='*60}\")\n", - " print(f\"Table: {table_name}\")\n", - " print('='*60)\n", - " \n", - " # Get column information\n", - " cursor.execute(f\"PRAGMA table_info({table_name})\")\n", - " columns = cursor.fetchall()\n", - " \n", - " print(\"\\nColumns:\")\n", - " for col in columns:\n", - " print(f\" {col[1]:20} {col[2]:15} {'NOT NULL' if col[3] else ''}\")\n", - " \n", - " # Get row count\n", - " cursor.execute(f\"SELECT COUNT(*) FROM {table_name}\")\n", - " count = cursor.fetchone()[0]\n", - " print(f\"\\nTotal rows: {count}\")" - ] - }, - { - "cell_type": "markdown", - "id": "25ffce32", - "metadata": {}, - "source": [ - "## 4. Load Data from Database\n", - "\n", - "Load the data from tables into pandas DataFrames for analysis and visualization." 
- ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "1459d68a", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Loaded posts: 3678 rows, 16 columns\n", - "Loaded posttags: 14272 rows, 2 columns\n", - "Loaded postcategories: 3691 rows, 2 columns\n", - "Loaded tags: 64 rows, 2 columns\n", - "Loaded categories: 6 rows, 2 columns\n", - "Loaded authors: 1143 rows, 4 columns\n", - "Loaded post_authors: 4934 rows, 2 columns\n", - "\n", - "Available dataframes: ['posts', 'posttags', 'postcategories', 'tags', 'categories', 'authors', 'post_authors']\n" - ] - } - ], - "source": [ - "# Load all tables into DataFrames\n", - "dataframes = {}\n", - "\n", - "for table in tables:\n", - " table_name = table[0]\n", - " query = f\"SELECT * FROM {table_name}\"\n", - " df = pd.read_sql_query(query, conn)\n", - " dataframes[table_name] = df\n", - " print(f\"Loaded {table_name}: {df.shape[0]} rows, {df.shape[1]} columns\")\n", - "\n", - "# Display available dataframes\n", - "print(f\"\\nAvailable dataframes: {list(dataframes.keys())}\")" - ] - }, - { - "cell_type": "markdown", - "id": "c34b1bc5", - "metadata": {}, - "source": [ - "## 5. Explore Data Structure\n", - "\n", - "Examine the first dataframe to understand the data better." - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "91616185", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Exploring: posts\n", - "\n", - "Shape: (3678, 16)\n", - "\n", - "Data types:\n", - "index int64\n", - "id int64\n", - "title object\n", - "author object\n", - "date object\n", - "category object\n", - "url object\n", - "img_link object\n", - "tags object\n", - "text object\n", - "html object\n", - "scraped_at object\n", - "is_cleaned int64\n", - "embedding object\n", - "umap_x float64\n", - "umap_y float64\n", - "dtype: object\n", - "\n", - "Missing values:\n", - "index 0\n", - "id 0\n", - "title 0\n", - "author 3\n", - "date 3\n", - "category 3\n", - "url 0\n", - "img_link 148\n", - "tags 4\n", - "text 0\n", - "html 0\n", - "scraped_at 0\n", - "is_cleaned 0\n", - "embedding 0\n", - "umap_x 0\n", - "umap_y 0\n", - "dtype: int64\n" - ] - } - ], - "source": [ - "# Select the first table to explore (or specify a specific table)\n", - "if dataframes:\n", - " first_table = list(dataframes.keys())[0]\n", - " df = dataframes[first_table]\n", - " \n", - " print(f\"Exploring: {first_table}\")\n", - " print(f\"\\nShape: {df.shape}\")\n", - " print(f\"\\nData types:\\n{df.dtypes}\")\n", - " \n", - " print(f\"\\nMissing values:\")\n", - " print(df.isnull().sum())" - ] - }, - { - "cell_type": "markdown", - "id": "f9b0e8d7", - "metadata": {}, - "source": [ - "## 7. Create Time Series Visualizations\n", - "\n", - "If the data contains temporal information, create time series visualizations." - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "2190a06b", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Found potential date columns: ['date']\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/var/folders/j5/hpq7xq6x1p18cds26_lb_3gr0000gn/T/ipykernel_46007/4118830821.py:19: FutureWarning: 'M' is deprecated and will be removed in a future version, please use 'ME' instead.\n", - " time_series = df.groupby(pd.Grouper(key=date_col, freq='M')).size().reset_index(name='count')\n" - ] - }, - { - "data": { - "text/html": [ - "\n", - "\n", - "
\n", - "" - ], - "text/plain": [ - "alt.Chart(...)" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], - "source": [ - "# Check for date/time columns and create time series visualizations\n", - "if dataframes:\n", - " df = dataframes[list(dataframes.keys())[0]]\n", - " \n", - " # Look for columns that might contain dates (check column names)\n", - " date_like_cols = [col for col in df.columns if any(\n", - " keyword in col.lower() for keyword in ['date', 'time', 'created', 'updated', 'timestamp']\n", - " )]\n", - " \n", - " if date_like_cols:\n", - " print(f\"Found potential date columns: {date_like_cols}\")\n", - " \n", - " # Try to convert the first date-like column to datetime\n", - " date_col = date_like_cols[0]\n", - " try:\n", - " df[date_col] = pd.to_datetime(df[date_col], errors='coerce')\n", - " \n", - " # Create a time series chart - count records over time\n", - " time_series = df.groupby(pd.Grouper(key=date_col, freq='M')).size().reset_index(name='count')\n", - " \n", - " chart = alt.Chart(time_series).mark_line(point=True).encode(\n", - " x=alt.X(f'{date_col}:T', title='Date'),\n", - " y=alt.Y('count:Q', title='Count'),\n", - " tooltip=[date_col, 'count']\n", - " ).properties(\n", - " title=f'Records Over Time',\n", - " width=700,\n", - " height=400\n", - " ).interactive()\n", - " \n", - " display(chart)\n", - " except Exception as e:\n", - " print(f\"Could not create time series chart: {e}\")\n", - " else:\n", - " print(\"No date/time columns found\")" - ] - }, - { - "cell_type": "markdown", - "id": "793026df", - "metadata": {}, - "source": [ - "### Articles per Category\n", - "\n", - "Visualize the distribution of articles across different categories." - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "id": "22c47b71", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "dict_keys(['posts', 'posttags', 'postcategories', 'tags', 'categories', 'authors', 'post_authors'])" - ] - }, - "execution_count": 7, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "dataframes.keys()" - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "id": "1ac9fae5", - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "\n", - "\n", - "\n", - "" - ], - "text/plain": [ - "alt.Chart(...)" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "Total categories: 6\n", - "Most articles in a category: 2098\n", - "Average articles per category: 615.17\n" - ] - } - ], - "source": [ - "# Check if categorisation data exists and create histogram\n", - "if 'postcategories' in dataframes and 'categories' in dataframes:\n", - " df_post_cat = dataframes['postcategories']\n", - " df_categories = dataframes['categories']\n", - " \n", - " # Join postcategories with categories to get category names\n", - " if 'category_id' in df_post_cat.columns and 'id' in df_categories.columns and 'category' in df_categories.columns:\n", - " # Merge the two tables\n", - " df_merged = df_post_cat.merge(\n", - " df_categories[['id', 'category']], \n", - " left_on='category_id', \n", - " right_on='id',\n", - " how='left'\n", - " )\n", - " \n", - " # Count articles per category\n", - " category_counts = df_merged['category'].value_counts().reset_index()\n", - " category_counts.columns = ['category', 'article_count']\n", - " \n", - " # Sort by count descending\n", - " category_counts = category_counts.sort_values('article_count', ascending=False)\n", - " \n", - " 
chart = alt.Chart(category_counts).mark_bar().encode(\n", - " x=alt.X('category:N', sort='-y', title='Category', axis=alt.Axis(labelAngle=-45)),\n", - " y=alt.Y('article_count:Q', title='Number of Articles'),\n", - " color=alt.Color('article_count:Q', scale=alt.Scale(scheme='viridis'), legend=None),\n", - " tooltip=['category', alt.Tooltip('article_count:Q', title='Articles')]\n", - " ).properties(\n", - " title='Distribution of Articles per Category',\n", - " width=700,\n", - " height=450\n", - " ).interactive()\n", - " \n", - " display(chart)\n", - " \n", - " # Show summary statistics\n", - " print(f\"\\nTotal categories: {len(category_counts)}\")\n", - " print(f\"Most articles in a category: {category_counts['article_count'].max()}\")\n", - " print(f\"Average articles per category: {category_counts['article_count'].mean():.2f}\")\n", - " else:\n", - " print(\"Could not find required columns for joining tables\")\n", - "else:\n", - " print(\"Need both 'postcategories' and 'categories' tables in database\")" - ] - }, - { - "cell_type": "markdown", - "id": "56c89ec3", - "metadata": {}, - "source": [ - "### Articles per Tag\n", - "\n", - "Visualize the distribution of articles across different tags." - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "id": "95a28c5f", - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "\n", - "\n", - "\n", - "" - ], - "text/plain": [ - "alt.Chart(...)" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "Total tags: 64\n", - "Most articles with a tag: 1954\n", - "Average articles per tag: 223.00\n", - "Median articles per tag: 101.50\n" - ] - } - ], - "source": [ - "# Check if tag data exists and create histogram\n", - "if 'posttags' in dataframes and 'tags' in dataframes:\n", - " df_post_tags = dataframes['posttags']\n", - " df_tags = dataframes['tags']\n", - " \n", - " # Join posttags with tags to get tag names\n", - " if 'tag_id' in df_post_tags.columns and 'id' in df_tags.columns and 'tag' in df_tags.columns:\n", - " # Merge the two tables\n", - " df_merged = df_post_tags.merge(\n", - " df_tags[['id', 'tag']], \n", - " left_on='tag_id', \n", - " right_on='id',\n", - " how='left'\n", - " )\n", - " \n", - " # Count articles per tag\n", - " tag_counts = df_merged['tag'].value_counts().reset_index()\n", - " tag_counts.columns = ['tag', 'article_count']\n", - " \n", - " # Show top 30 tags for readability\n", - " tag_counts_top = tag_counts.head(30).sort_values('article_count', ascending=False)\n", - " \n", - " chart = alt.Chart(tag_counts_top).mark_bar().encode(\n", - " x=alt.X('tag:N', sort='-y', title='Tag', axis=alt.Axis(labelAngle=-45)),\n", - " y=alt.Y('article_count:Q', title='Number of Articles'),\n", - " color=alt.Color('article_count:Q', scale=alt.Scale(scheme='oranges'), legend=None),\n", - " tooltip=['tag', alt.Tooltip('article_count:Q', title='Articles')]\n", - " ).properties(\n", - " title='Distribution of Articles per Tag (Top 30)',\n", - " width=700,\n", - " height=450\n", - " ).interactive()\n", - " \n", - " display(chart)\n", - " \n", - " # Show summary statistics\n", - " print(f\"\\nTotal tags: {len(tag_counts)}\")\n", - " print(f\"Most articles with a tag: {tag_counts['article_count'].max()}\")\n", - " print(f\"Average articles per tag: {tag_counts['article_count'].mean():.2f}\")\n", - " print(f\"Median articles per tag: {tag_counts['article_count'].median():.2f}\")\n", - " else:\n", - " print(\"Could not find required columns 
for joining tables\")\n", - "else:\n", - " print(\"Need both 'posttags' and 'tags' tables in database\")" - ] - }, - { - "cell_type": "markdown", - "id": "549e6f38", - "metadata": {}, - "source": [ - "### Articles per Author\n", - "\n", - "Visualize the distribution of articles across different authors." - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "id": "a49be6f5", - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "\n", - "\n", - "\n", - "" - ], - "text/plain": [ - "alt.Chart(...)" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "Total authors: 1126\n", - "Most articles with a author: 700\n", - "Average articles per author: 4.38\n", - "Median articles per author: 1.00\n" - ] - } - ], - "source": [ - "# Check if author data exists and create histogram\n", - "if 'post_authors' in dataframes and 'authors' in dataframes:\n", - " df_post_tags = dataframes['post_authors']\n", - " df_tags = dataframes['authors']\n", - " \n", - " # Join posttags with tags to get tag names\n", - " if 'author_id' in df_post_tags.columns and 'id' in df_tags.columns and 'name' in df_tags.columns:\n", - " # Merge the two tables\n", - " df_merged = df_post_tags.merge(\n", - " df_tags[['id', 'name']], \n", - " left_on='author_id', \n", - " right_on='id',\n", - " how='left'\n", - " )\n", - " \n", - " # Count articles per tag\n", - " tag_counts = df_merged['name'].value_counts().reset_index()\n", - " tag_counts.columns = ['author', 'article_count']\n", - " \n", - " # Show top 30 tags for readability\n", - " tag_counts_top = tag_counts.head(30).sort_values('article_count', ascending=False)\n", - " \n", - " chart = alt.Chart(tag_counts_top).mark_bar().encode(\n", - " x=alt.X('author:N', sort='-y', title='Author', axis=alt.Axis(labelAngle=-45)),\n", - " y=alt.Y('article_count:Q', title='Number of Articles'),\n", - " color=alt.Color('article_count:Q', scale=alt.Scale(scheme='oranges'), legend=None),\n", - " tooltip=['author', alt.Tooltip('article_count:Q', title='Articles')]\n", - " ).properties(\n", - " title='Distribution of Articles per Author (Top 30)',\n", - " width=700,\n", - " height=450\n", - " ).interactive()\n", - " \n", - " display(chart)\n", - " \n", - " # Show summary statistics\n", - " print(f\"\\nTotal authors: {len(tag_counts)}\")\n", - " print(f\"Most articles with a author: {tag_counts['article_count'].max()}\")\n", - " print(f\"Average articles per author: {tag_counts['article_count'].mean():.2f}\")\n", - " print(f\"Median articles per author: {tag_counts['article_count'].median():.2f}\")\n", - " else:\n", - " print(\"Could not find required columns for joining tables\")\n", - "else:\n", - " print(\"Need both 'post_authors' and 'authors' tables in database\")" - ] - }, - { - "cell_type": "markdown", - "id": "7f6f1539", - "metadata": {}, - "source": [ - "### UMAP Visualization\n", - "\n", - "Visualize the UMAP dimensionality reduction in 2D space." 
- ] - }, - { - "cell_type": "code", - "execution_count": 11, - "id": "196be503", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Found UMAP coordinates in table: posts\n" - ] - }, - { - "data": { - "text/html": [ - "\n", - "\n", - "\n", - "" - ], - "text/plain": [ - "alt.Chart(...)" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "Total points: 5021\n", - "Unique authors: 1127\n", - "Top 15 authors shown in legend (others grouped as 'Other')\n" - ] - } - ], - "source": [ - "# Check for UMAP coordinates and create scatter plot with author coloring\n", - "umap_found = False\n", - "\n", - "# Look for tables with umap_x and umap_y columns\n", - "for table_name, df in dataframes.items():\n", - " if 'umap_x' in df.columns and 'umap_y' in df.columns:\n", - " print(f\"Found UMAP coordinates in table: {table_name}\")\n", - " umap_found = True\n", - " \n", - " # Check if we can join with authors\n", - " if 'posts' in dataframes and 'post_authors' in dataframes and 'authors' in dataframes:\n", - " df_posts = dataframes['posts']\n", - " df_post_authors = dataframes['post_authors']\n", - " df_authors = dataframes['authors']\n", - " \n", - " # Check if the current table has necessary columns for joining\n", - " if 'id' in df.columns or 'post_id' in df.columns:\n", - " post_id_col = 'id' if 'id' in df.columns else 'post_id'\n", - " \n", - " # Start with posts table that has UMAP coordinates\n", - " df_umap = df[[post_id_col, 'umap_x', 'umap_y']].dropna(subset=['umap_x', 'umap_y'])\n", - " \n", - " # Join with post_authors to get author_id\n", - " if 'post_id' in df_post_authors.columns and 'author_id' in df_post_authors.columns:\n", - " df_umap = df_umap.merge(\n", - " df_post_authors[['post_id', 'author_id']],\n", - " left_on=post_id_col,\n", - " right_on='post_id',\n", - " how='left'\n", - " )\n", - " \n", - " # Join with authors to get author name\n", - " if 'id' in df_authors.columns and 'name' in df_authors.columns:\n", - " df_umap = df_umap.merge(\n", - " df_authors[['id', 'name']],\n", - " left_on='author_id',\n", - " right_on='id',\n", - " how='left'\n", - " )\n", - " \n", - " # Rename name column to author for clarity\n", - " df_umap = df_umap.rename(columns={'name': 'author'})\n", - " \n", - " # Fill missing authors with 'Unknown'\n", - " df_umap['author'] = df_umap['author'].fillna('Unknown')\n", - " \n", - " # Get top 15 authors by count for better visualization\n", - " top_authors = df_umap['author'].value_counts().head(15).index.tolist()\n", - " df_umap['author_group'] = df_umap['author'].apply(\n", - " lambda x: x if x in top_authors else 'Other'\n", - " )\n", - " \n", - " # Create scatter plot with author coloring\n", - " scatter = alt.Chart(df_umap).mark_circle(size=40, opacity=0.7).encode(\n", - " x=alt.X('umap_x:Q', title='UMAP Dimension 1'),\n", - " y=alt.Y('umap_y:Q', title='UMAP Dimension 2'),\n", - " color=alt.Color('author_group:N', title='Author', scale=alt.Scale(scheme='tableau20')),\n", - " tooltip=['author', 'umap_x', 'umap_y']\n", - " ).properties(\n", - " title='UMAP 2D Projection by Author',\n", - " width=800,\n", - " height=600\n", - " ).interactive()\n", - " \n", - " display(scatter)\n", - " \n", - " print(f\"\\nTotal points: {len(df_umap)}\")\n", - " print(f\"Unique authors: {df_umap['author'].nunique()}\")\n", - " print(f\"Top 15 authors shown in legend (others grouped as 'Other')\")\n", - " else:\n", - " print(\"Could not find 
required columns in authors table\")\n", - " else:\n", - " print(\"Could not find required columns in post_authors table\")\n", - " else:\n", - " print(f\"Could not find post_id column in {table_name} table\")\n", - " else:\n", - " # Fallback: create plot without author coloring\n", - " df_umap = df[['umap_x', 'umap_y']].dropna()\n", - " \n", - " scatter = alt.Chart(df_umap).mark_circle(size=30, opacity=0.6).encode(\n", - " x=alt.X('umap_x:Q', title='UMAP Dimension 1'),\n", - " y=alt.Y('umap_y:Q', title='UMAP Dimension 2'),\n", - " tooltip=['umap_x', 'umap_y']\n", - " ).properties(\n", - " title='UMAP 2D Projection',\n", - " width=700,\n", - " height=600\n", - " ).interactive()\n", - " \n", - " display(scatter)\n", - " \n", - " print(f\"\\nTotal points: {len(df_umap)}\")\n", - " print(\"Note: Author coloring not available (missing required tables)\")\n", - " \n", - " break\n", - "\n", - "if not umap_found:\n", - " print(\"No UMAP coordinates (umap_x, umap_y) found in any table\")" - ] - }, - { - "cell_type": "markdown", - "id": "c57a57fa", - "metadata": {}, - "source": [ - "### 3D Embedding Visualization\n", - "\n", - "Visualize the high-dimensional embeddings in 3D space using PCA for dimensionality reduction.\n" - ] - }, - { - "cell_type": "code", - "execution_count": 16, - "id": "42352fef", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Found embedding column in posts table\n", - "No valid embeddings found\n" - ] - } - ], - "source": [ - "import numpy as np\n", - "import plotly.graph_objects as go\n", - "import json\n", - "\n", - "# Check if posts table has embedding column\n", - "if 'posts' in dataframes:\n", - " df_posts = dataframes['posts']\n", - " \n", - " if 'embedding' in df_posts.columns:\n", - " print(\"Found embedding column in posts table\")\n", - " \n", - " # Extract embeddings and convert to array\n", - " embeddings_3d = []\n", - " valid_indices = []\n", - " \n", - " for idx, embedding in enumerate(df_posts['embedding']):\n", - " try:\n", - " # Handle different embedding formats (string, list, array, bytes)\n", - " if isinstance(embedding, bytes):\n", - " emb_array = np.array(json.loads(embedding.decode('utf-8')))\n", - " elif isinstance(embedding, str):\n", - " emb_array = np.array(json.loads(embedding))\n", - " elif isinstance(embedding, (list, tuple)):\n", - " emb_array = np.array(embedding)\n", - " else:\n", - " emb_array = embedding\n", - " \n", - " if emb_array is not None and len(emb_array) >= 3:\n", - " # Take only the first 3 dimensions\n", - " embeddings_3d.append(emb_array[:3])\n", - " valid_indices.append(idx)\n", - " except Exception as e:\n", - " continue\n", - " \n", - " if embeddings_3d:\n", - " # Convert to numpy array and ensure it's 2D (n_embeddings, 3)\n", - " embeddings_3d = np.array(embeddings_3d)\n", - " if embeddings_3d.ndim == 1:\n", - " embeddings_3d = embeddings_3d.reshape(-1, 3)\n", - " print(f\"Extracted {len(embeddings_3d)} embeddings with shape {embeddings_3d.shape}\")\n", - " \n", - " # Create a dataframe with 3D coordinates\n", - " df_3d = pd.DataFrame({\n", - " 'dim_1': embeddings_3d[:, 0],\n", - " 'dim_2': embeddings_3d[:, 1],\n", - " 'dim_3': embeddings_3d[:, 2]\n", - " })\n", - " \n", - " # Try to add author information\n", - " if 'post_authors' in dataframes and 'authors' in dataframes:\n", - " try:\n", - " df_post_authors = dataframes['post_authors']\n", - " df_authors = dataframes['authors']\n", - " \n", - " # Get author names for valid indices\n", - " authors = []\n", - " for idx in 
valid_indices:\n", - " post_id = df_posts.iloc[idx]['id'] if 'id' in df_posts.columns else None\n", - " if post_id is not None:\n", - " author_rows = df_post_authors[df_post_authors['post_id'] == post_id]\n", - " if not author_rows.empty:\n", - " author_id = author_rows.iloc[0]['author_id']\n", - " author_name = df_authors[df_authors['id'] == author_id]['name'].values\n", - " authors.append(author_name[0] if len(author_name) > 0 else 'Unknown')\n", - " else:\n", - " authors.append('Unknown')\n", - " else:\n", - " authors.append('Unknown')\n", - " \n", - " df_3d['author'] = authors\n", - " \n", - " # Get top 10 authors for coloring\n", - " top_authors = df_3d['author'].value_counts().head(10).index.tolist()\n", - " df_3d['author_group'] = df_3d['author'].apply(\n", - " lambda x: x if x in top_authors else 'Other'\n", - " )\n", - " \n", - " # Create 3D scatter plot with Plotly\n", - " fig = go.Figure(data=[go.Scatter3d(\n", - " x=df_3d['dim_1'],\n", - " y=df_3d['dim_2'],\n", - " z=df_3d['dim_3'],\n", - " mode='markers',\n", - " marker=dict(\n", - " size=4,\n", - " color=[top_authors.index(author) if author in top_authors else len(top_authors) \n", - " for author in df_3d['author_group']],\n", - " colorscale='Viridis',\n", - " showscale=True,\n", - " colorbar=dict(title=\"Author Group\"),\n", - " opacity=0.7\n", - " ),\n", - " text=df_3d['author'],\n", - " hovertemplate='%{text}
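# [editor's note] The 3D-embedding cell above ends up printing "No valid embeddings
# found": its decoder tries json.loads on the embedding BLOBs, while the deleted pipeline
# reads the same column with np.frombuffer(..., dtype=np.float32), so the stored values
# are presumably raw float32 buffers rather than JSON. The markdown cell also announces
# PCA, but the code only slices the first three raw dimensions. A sketch of the
# decode-plus-PCA step under those assumptions (scikit-learn is an extra dependency here;
# table, column and path names are the notebook's, the rest is illustrative):
import sqlite3
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA  # assumed extra dependency

con = sqlite3.connect('../data/knack.transformed.sqlite')
df_posts = pd.read_sql_query(
    "SELECT id, embedding FROM posts WHERE embedding IS NOT NULL", con
)

# decode raw float32 BLOBs back into full-length vectors
emb = np.vstack(
    df_posts['embedding'].apply(lambda b: np.frombuffer(b, dtype=np.float32)).to_numpy()
)

# project down to three components for the 3D scatter, as the markdown cell describes
coords = PCA(n_components=3).fit_transform(emb)
df_3d = pd.DataFrame(coords, columns=['dim_1', 'dim_2', 'dim_3'])
df_3d['id'] = df_posts['id'].to_numpy()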