Knack-Scraper/main.py

#! python3
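"""Scraper for knack.news.

Walks post ids sequentially, parses each post page with BeautifulSoup and
appends the results to a SQLite database (tables: posts, tags, posttags,
categories, postcategories). Configuration is read from environment
variables, optionally loaded from a .env file.
"""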
import logging
import os
import sqlite3
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import pandas as pd
import requests
import tqdm
from bs4 import BeautifulSoup
from dotenv import load_dotenv
load_dotenv()
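# Illustrative .env for local runs; the values below are simply the defaults
# this script falls back to when a variable is unset.
#
#   LOGGING_LEVEL=INFO
#   NUM_THREADS=8
#   NUM_SCRAPES=100
#   DATABASE_LOCATION=../data/knack.sqlite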
# any LOGGING_LEVEL value other than INFO enables debug logging
if os.environ.get("LOGGING_LEVEL", "INFO") == "INFO":
    logging_level = logging.INFO
else:
    logging_level = logging.DEBUG
logging.basicConfig(
level=logging_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("app.log"),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger("knack-scraper")
def table_exists(tablename: str, con: sqlite3.Connection):
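    """Return True if a table named `tablename` exists in the database behind `con`."""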
query = "SELECT 1 FROM sqlite_master WHERE type='table' AND name=? LIMIT 1"
return len(con.execute(query, [tablename]).fetchall()) > 0
def download(id: int):
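    """Fetch and parse the post with the given id from knack.news.

    Returns a dict of parsed fields (title, author, date, category, tags,
    text, html, ...) or None if the id is 0, the request does not return a
    2xx status, or the page has no postContent div (i.e. is not a regular post).
    """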
if id == 0:
return
base_url = "https://knack.news/"
url = f"{base_url}{id}"
    # the 30s timeout is an arbitrary safeguard so a stuck request cannot hang a worker
    res = requests.get(url, timeout=30)
    # make sure we don't DoS knack
    time.sleep(2)
    if not (200 <= res.status_code < 300):
        return
logger.debug("Found promising page with id %d!", id)
content = res.content
soup = BeautifulSoup(content, "html.parser")
pC = soup.find("div", {"class": "postContent"})
if pC is None:
# not a normal post
        logger.debug(
            "Page with id %d does not have a .postContent-div. Skipping for now.", id
        )
return
# every post has these fields
title = pC.find("h3", {"class": "postTitle"}).text
postText = pC.find("div", {"class": "postText"})
# these fields are possible but not required
# TODO: cleanup
    try:
        # dates are rendered with German month names, e.g. "3. Oktober 2023"
        date_parts = pC.find("span", {"class": "singledate"}).text.split(" ")
        day = int(date_parts[0][:-1])  # strip the trailing "." from the day
        months = {
            "Januar": 1, "Februar": 2, "März": 3, "April": 4,
            "Mai": 5, "Juni": 6, "Juli": 7, "August": 8,
            "September": 9, "Oktober": 10, "November": 11, "Dezember": 12,
        }
        month = months[date_parts[1]]
        year = int(date_parts[2])
        parsed_date = datetime(year, month, day)
    except Exception:
        parsed_date = None
try:
author = pC.find("span", {"class": "author"}).text
except AttributeError:
author = None
try:
category = pC.find("span", {"class": "categoryInfo"}).find_all()
category = [c.text for c in category if c.text != 'Alle Artikel']
category = ";".join(category)
except AttributeError:
category = None
try:
tags = [x.text for x in pC.find("div", {"class": "tagsInfo"}).find_all("a")]
tags = ";".join(tags)
except AttributeError:
tags = None
img = pC.find("img", {"class": "postImage"})
if img is not None:
img = img["src"]
res_dict = {
"id": id,
"title": title,
"author": author,
"date": parsed_date,
"category": category,
"url": url,
"img_link": img,
"tags": tags,
"text": postText.text,
"html": str(postText),
"scraped_at": datetime.now(),
}
return res_dict
def run_downloads(min_id: int, max_id: int, num_threads: int = 8):
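    """Scrape posts with ids in [min_id, max_id) using a thread pool.

    Returns a tuple of dataframes
    (postdf, tagdf, posttotagdf, categorydf, postcategorydf); the last four
    may be None when the scraped posts contained no tags or categories.
    """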
res = []
logger.info(
"Started parallel scrape of posts from id %d to id %d using %d threads.",
min_id,
max_id - 1,
num_threads,
)
with ThreadPoolExecutor(max_workers=num_threads) as executor:
# Use a list comprehension to create a list of futures
futures = [executor.submit(download, i) for i in range(min_id, max_id)]
for future in tqdm.tqdm(
futures, total=max_id - min_id
): # tqdm to track progress
post = future.result()
if post is not None:
res.append(post)
# sqlite can't handle lists so let's convert them to a single row csv
# TODO: make sure our database is properly normalized
postdf = pd.DataFrame(res)
tagdf = None
posttotagdf = None
categorydf = None
postcategorydf = None
# Extract and create tags dataframe
if not postdf.empty and 'tags' in postdf.columns:
# Collect all unique tags
all_tags = set()
for tags_str in postdf['tags']:
if pd.notna(tags_str):
tags_list = [tag.strip() for tag in tags_str.split(';')]
all_tags.update(tags_list)
# Create tagdf with id and text columns
if all_tags:
all_tags = sorted(list(all_tags))
tagdf = pd.DataFrame({
'id': range(len(all_tags)),
'tag': all_tags
})
# Create posttotagdf mapping table
rows = []
for post_id, tags_str in zip(postdf['id'], postdf['tags']):
if pd.notna(tags_str):
tags_list = [tag.strip() for tag in tags_str.split(';')]
for tag_text in tags_list:
tag_id = tagdf[tagdf['tag'] == tag_text]['id'].values[0]
rows.append({'post_id': post_id, 'tag_id': tag_id})
if rows:
posttotagdf = pd.DataFrame(rows)
# Extract and create categories dataframe
if not postdf.empty and 'category' in postdf.columns:
# Collect all unique categories
all_categories = set()
for category_str in postdf['category']:
if pd.notna(category_str):
category_list = [cat.strip() for cat in category_str.split(';')]
all_categories.update(category_list)
# Create categorydf with id and category columns
if all_categories:
all_categories = sorted(list(all_categories))
categorydf = pd.DataFrame({
'id': range(len(all_categories)),
'category': all_categories
})
# Create postcategorydf mapping table
rows = []
for post_id, category_str in zip(postdf['id'], postdf['category']):
if pd.notna(category_str):
category_list = [cat.strip() for cat in category_str.split(';')]
for category_text in category_list:
category_id = categorydf[categorydf['category'] == category_text]['id'].values[0]
rows.append({'post_id': post_id, 'category_id': category_id})
if rows:
postcategorydf = pd.DataFrame(rows)
return postdf, tagdf, posttotagdf, categorydf, postcategorydf
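
# Rough sketch of the tables main() writes via pandas.to_sql (column types are
# whatever pandas infers, so treat this as documentation rather than a schema
# definition):
#
#   posts          id, title, author, date, category, url, img_link, tags,
#                  text, html, scraped_at (plus the DataFrame index column,
#                  since to_sql is called without index=False)
#   tags           id, tag
#   posttags       post_id, tag_id
#   categories     id, category
#   postcategories post_id, category_id
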
def main():
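    """Scrape the next batch of posts and append the results to the database.

    Reads configuration from the environment, resumes after the highest post
    id already stored (or starts from scratch), then appends posts, tags,
    categories and their mapping tables to the SQLite database at
    DATABASE_LOCATION.
    """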
num_threads = int(os.environ.get("NUM_THREADS", 8))
n_scrapes = int(os.environ.get("NUM_SCRAPES", 100))
database_location = os.environ.get("DATABASE_LOCATION", "../data/knack.sqlite")
logger.debug(f"Started Knack Scraper: \nNUM_THREADS: {num_threads}\nN_SCRAPES: {n_scrapes}\nDATABASE_LOCATION: {database_location}")
con = sqlite3.connect(database_location)
with con:
post_table_exists = table_exists("posts", con)
if post_table_exists:
logger.info("found posts retrieved earlier")
# retrieve max post id from db so
# we can skip retrieving known posts
max_id_in_db = con.execute("SELECT MAX(id) FROM posts").fetchone()[0]
logger.info("Got max id %d!", max_id_in_db)
else:
logger.info("no posts scraped so far - starting from 0")
# retrieve from 0 onwards
max_id_in_db = -1
    # the connection opened above is reused here; no need to reconnect
postdf, tagdf, posttotagdf, categorydf, postcategorydf = run_downloads(
min_id=max_id_in_db + 1,
        max_id=max_id_in_db + 1 + n_scrapes,  # exclusive upper bound, so +1 to fetch n_scrapes new ids
num_threads=num_threads,
)
postdf.to_sql("posts", con, if_exists="append")
# Handle tags dataframe merging and storage
if tagdf is not None and not tagdf.empty:
# Check if tags table already exists
if table_exists("tags", con):
# Read existing tags from database
existing_tagdf = pd.read_sql("SELECT id, tag FROM tags", con)
# Merge new tags with existing tags, avoiding duplicates
merged_tagdf = pd.concat([existing_tagdf, tagdf], ignore_index=False)
merged_tagdf = merged_tagdf.drop_duplicates(subset=['tag'], keep='first')
merged_tagdf = merged_tagdf.reset_index(drop=True)
merged_tagdf['id'] = range(len(merged_tagdf))
# Drop the old table and insert the merged data
con.execute("DROP TABLE tags")
con.commit()
merged_tagdf.to_sql("tags", con, if_exists="append", index=False)
# Update tag_id references in posttotagdf
if posttotagdf is not None and not posttotagdf.empty:
#tag_mapping = dict(zip(tagdf['tag'], tagdf['id']))
posttotagdf['tag_id'] = posttotagdf['tag_id'].map(
lambda old_id: merged_tagdf[merged_tagdf['tag'] == tagdf.loc[old_id, 'tag']]['id'].values[0]
)
else:
# First time creating tags table
tagdf.to_sql("tags", con, if_exists="append", index=False)
# Store posttags (post to tags mapping)
if posttotagdf is not None and not posttotagdf.empty:
posttotagdf.to_sql("posttags", con, if_exists="append", index=False)
# Handle categories dataframe merging and storage
if categorydf is not None and not categorydf.empty:
# Check if categories table already exists
if table_exists("categories", con):
# Read existing categories from database
existing_categorydf = pd.read_sql("SELECT id, category FROM categories", con)
# Merge new categories with existing categories, avoiding duplicates
merged_categorydf = pd.concat([existing_categorydf, categorydf], ignore_index=False)
merged_categorydf = merged_categorydf.drop_duplicates(subset=['category'], keep='first')
merged_categorydf = merged_categorydf.reset_index(drop=True)
merged_categorydf['id'] = range(len(merged_categorydf))
# Drop the old table and insert the merged data
con.execute("DROP TABLE categories")
con.commit()
merged_categorydf.to_sql("categories", con, if_exists="append", index=False)
# Update category_id references in postcategorydf
if postcategorydf is not None and not postcategorydf.empty:
postcategorydf['category_id'] = postcategorydf['category_id'].map(
lambda old_id: merged_categorydf[merged_categorydf['category'] == categorydf.loc[old_id, 'category']]['id'].values[0]
)
else:
# First time creating categories table
categorydf.to_sql("categories", con, if_exists="append", index=False)
# Store postcategories (post to categories mapping)
if postcategorydf is not None and not postcategorydf.empty:
postcategorydf.to_sql("postcategories", con, if_exists="append", index=False)
logger.info(f"scraped new entries. number of new posts: {len(postdf.index)}")
if __name__ == "__main__":
main()
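# Typical invocation, assuming the environment variables above are set in a
# .env file next to the script or exported in the shell:
#
#   python3 main.py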