#! python3
"""Transform pipeline entry point for Knack scraper data.

Loads uncleaned posts from a SQLite database, runs the transform
pipeline over them, and marks the processed rows as cleaned.
"""
import argparse
import logging
import os
import sqlite3
import sys

from dotenv import load_dotenv

load_dotenv()

# Resolve LOGGING_LEVEL by name (INFO, DEBUG, WARNING, ...); unknown or
# missing values fall back to INFO instead of silently enabling DEBUG.
logging_level = getattr(
    logging, os.environ.get('LOGGING_LEVEL', 'INFO').upper(), logging.INFO
)

logging.basicConfig(
    level=logging_level,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger("knack-transform")


def setup_database_connection(db_path=None):
    """Create a connection to the SQLite database.

    Args:
        db_path: Path to the database file. Defaults to the DB_PATH
            environment variable, or /data/knack.sqlite.

    Returns:
        An open sqlite3.Connection.
    """
    if db_path is None:
        db_path = os.environ.get('DB_PATH', '/data/knack.sqlite')
    logger.info(f"Connecting to database: {db_path}")
    return sqlite3.connect(db_path)


def table_exists(tablename: str, con: sqlite3.Connection) -> bool:
    """Return True if a table named *tablename* exists in the database."""
    query = "SELECT 1 FROM sqlite_master WHERE type='table' AND name=? LIMIT 1"
    return con.execute(query, [tablename]).fetchone() is not None


def run_from_database(db_path=None, device=None, max_workers=None):
    """Run the pipeline using the database as input and output.

    Args:
        db_path: Path to the SQLite database. Defaults to the DB_PATH
            environment variable, or /data/knack.sqlite.
        device: Compute device for pipeline nodes. Defaults to the
            COMPUTE_DEVICE environment variable, or 'cpu'.
        max_workers: Maximum number of parallel workers. Defaults to the
            MAX_WORKERS environment variable, or 4.

    Exits the process with status 1 on any pipeline error.
    """
    logger.info("Starting transform pipeline (database mode)")
    con = None
    try:
        con = setup_database_connection(db_path)
        logger.info("Database connection established")

        # Nothing to do until the scraper has created the posts table.
        if not table_exists('posts', con):
            logger.warning("Posts table does not exist yet. Please run the scraper first to populate the database.")
            logger.info("Transform pipeline skipped - no data available")
            return

        # Imported lazily so this module can load without the transform
        # stack installed (e.g. for --help).
        from pipeline import create_default_pipeline, TransformContext
        import pandas as pd

        logger.info("Loading posts from database")
        sql = (
            "SELECT * FROM posts WHERE author IS NOT NULL "
            "AND (is_cleaned IS NULL OR is_cleaned = 0)"
        )
        df = pd.read_sql(sql, con)
        logger.info(f"Loaded {len(df)} uncleaned posts with authors")

        if df.empty:
            logger.info("No uncleaned posts found. Transform pipeline skipped.")
            return

        context = TransformContext(df)

        # Fall back to the env-driven defaults when the caller did not
        # supply device/worker settings (preserves the old behavior).
        if device is None:
            device = os.environ.get('COMPUTE_DEVICE', 'cpu')
        if max_workers is None:
            max_workers = int(os.environ.get('MAX_WORKERS', 4))
        pipeline = create_default_pipeline(device=device, max_workers=max_workers)

        effective_db_path = db_path or os.environ.get('DB_PATH', '/data/knack.sqlite')
        results = pipeline.run(
            db_path=effective_db_path,
            initial_context=context,
            fail_fast=False,  # Continue even if some nodes fail
        )
        logger.info(f"Pipeline completed. Processed {len(results)} node(s)")

        # Mark all processed posts as cleaned so the next run skips them.
        post_ids = df['id'].tolist()
        if post_ids:
            placeholders = ','.join('?' * len(post_ids))
            con.execute(
                f"UPDATE posts SET is_cleaned = 1 WHERE id IN ({placeholders})",
                post_ids,
            )
            con.commit()
            logger.info(f"Marked {len(post_ids)} posts as cleaned")
    except Exception as e:
        logger.error(f"Error in transform pipeline: {e}", exc_info=True)
        sys.exit(1)
    finally:
        if con is not None:
            con.close()
            logger.info("Database connection closed")


def main():
    """Parse command-line arguments and run the transform pipeline."""
    parser = argparse.ArgumentParser(
        description='Transform pipeline for Knack scraper data',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Run with database (Docker mode)
  python main.py

  # Run with custom device and workers
  python main.py --database /path/to/knack.sqlite --device mps --workers 8

  # Run with specific database file
  python main.py --database /path/to/knack.sqlite
        """
    )
    parser.add_argument(
        '--database',
        help='Path to SQLite database (for database mode). '
             'Defaults to DB_PATH env var or /data/knack.sqlite'
    )
    parser.add_argument(
        '--device',
        default=os.environ.get('COMPUTE_DEVICE', 'cpu'),
        choices=['cpu', 'cuda', 'mps'],
        help='Device to use for compute-intensive operations (default: cpu)'
    )
    parser.add_argument(
        '--workers',
        type=int,
        default=int(os.environ.get('MAX_WORKERS', 4)),
        help='Maximum number of parallel workers (default: 4)'
    )

    args = parser.parse_args()

    # Run even when --database is omitted: run_from_database falls back
    # to the DB_PATH env var / default path, matching the documented
    # "python main.py" Docker usage in the epilog.
    run_from_database(
        db_path=args.database,
        device=args.device,
        max_workers=args.workers,
    )


if __name__ == "__main__":
    main()