forked from lukaszett/Knack-Scraper
Adds a node that precalculates JSON files for visualisations
This commit is contained in:
parent
7c2e34906e
commit
d9d0441ddd
3 changed files with 194 additions and 18 deletions
|
|
@ -215,6 +215,7 @@ def create_default_pipeline(device: str = "cpu",
|
|||
from author_node import NerAuthorNode, FuzzyAuthorNode
|
||||
from embeddings_node import TextEmbeddingNode, UmapNode
|
||||
from url_node import URLNode
|
||||
from to_d3_node import ToD3Node
|
||||
|
||||
pipeline = ParallelPipeline(max_workers=max_workers, use_processes=False)
|
||||
|
||||
|
|
@ -261,6 +262,21 @@ def create_default_pipeline(device: str = "cpu",
|
|||
name='UmapNode'
|
||||
))
|
||||
|
||||
pipeline.add_node(NodeConfig(
|
||||
node_class=ToD3Node,
|
||||
dependencies=[
|
||||
'UmapNode',
|
||||
'TextEmbeddingNode',
|
||||
'FuzzyAuthorNode',
|
||||
'AuthorNode',
|
||||
'URLNode'
|
||||
],
|
||||
node_kwargs={
|
||||
'output_path': './data/json/'
|
||||
},
|
||||
name='ToD3Node'
|
||||
))
|
||||
|
||||
# TODO: Create Node to compute Text Embeddings and UMAP.
|
||||
|
||||
# pipeline.add_node(NodeConfig(
|
||||
|
|
|
|||
102
transform/to_d3_node.py
Normal file
102
transform/to_d3_node.py
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
"""Node to query data from the database and generate individual json file
|
||||
for visualisations in the d3.js framework"""
|
||||
import sqlite3
|
||||
import logging
|
||||
import json
|
||||
import os
|
||||
|
||||
from pipeline import TransformContext
|
||||
from transform_node import TransformNode
|
||||
|
||||
logger = logging.getLogger("knack-transform")
|
||||
|
||||
class ToD3Node(TransformNode):
    """Node that reads from a sqlite3 database and writes visualisation data
    as JSON files into a target folder.

    Each key in ``self.queries`` maps an output file stem to a read-only SQL
    query; ``run`` executes every query and dumps the result rows as a JSON
    list of objects to ``<output_path>/<key>.json``.
    """

    def __init__(self, output_path: str):
        """Initialise the node.

        Args:
            output_path (str): Directory the JSON files are written to.
                Created on demand by :meth:`run` if it does not exist.
        """
        self.output_path = output_path
        # One output file per key. The queries are read-only aggregations
        # sized for the d3.js visualisations (top-N limits, pre-grouped).
        self.queries = {
            'authors': 'select name, min(type) as type, count(posts.id) as count from authors inner join post_authors on authors.id = author_id inner join posts on posts.id = post_id group by name order by count desc limit 25;',
            'categories': "select category, count(id) as count from categories inner join postcategories on id = category_id group by category order by count desc limit 35;",
            'posts_per_month': "SELECT strftime('%Y-%m', date) AS month, category, COUNT(*) AS count FROM posts WHERE date > '2020-01-01' AND category NOT NULL GROUP BY strftime('%Y-%m', date), category ORDER BY month;",
            'tag_chords': "SELECT t1.tag AS source, t2.tag AS target, COUNT(*) AS weight FROM posttags pt1 JOIN posttags pt2 ON pt1.post_id = pt2.post_id AND pt1.tag_id < pt2.tag_id JOIN tags t1 ON t1.id = pt1.tag_id JOIN tags t2 ON t2.id = pt2.tag_id GROUP BY t1.tag, t2.tag HAVING weight > 1 ORDER BY weight DESC;",
            'tags': 'select tag, count(id) as count from tags inner join posttags on id = tag_id group by tag order by count desc limit 35;',
            'urls_l1': "SELECT 'knack[punkt]news' AS source, CASE WHEN tld_count < 10 THEN 'other' ELSE tld END AS target, SUM(tld_count) AS value FROM (SELECT tld, COUNT(*) as tld_count FROM urls WHERE tld IS NOT NULL GROUP BY tld ) GROUP BY target;",
            'urls_l2': "SELECT tld AS source, CASE WHEN host_count < 10 THEN 'other' ELSE host END AS target, SUM(host_count) AS value FROM (SELECT tld, host, COUNT(*) as host_count FROM urls WHERE tld IS NOT NULL AND host IS NOT NULL GROUP BY tld, host) WHERE source != '' AND target != 'other' GROUP BY tld, target"
        }
        super().__init__()
        logger.info(f"Init ToD3Node, Storing files to {self.output_path}")

    def _query_db(self, con: sqlite3.Connection, query: str) -> list[dict]:
        """Execute *query* and return all rows as dicts keyed by column name.

        Args:
            con (sqlite3.Connection): Open database connection.
            query (str): SQL statement to execute.

        Returns:
            list[dict]: One dict per result row, column name -> value.
        """
        cursor = con.cursor()
        try:
            cursor.execute(query)
            # cursor.description carries the column names of the result set.
            columns = [desc[0] for desc in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            # Always release the cursor, even if the query raises.
            cursor.close()

    def _calculate_files(self, con: sqlite3.Connection) -> int:
        """Run every configured query and write its result to a JSON file.

        Args:
            con (sqlite3.Connection): Open database connection.

        Returns:
            int: Number of JSON files written.
        """
        for key, query in self.queries.items():
            rows = self._query_db(con, query)
            # os.path.join works whether or not output_path has a trailing
            # separator (raw concatenation silently produced wrong paths).
            target = os.path.join(self.output_path, f'{key}.json')
            with open(target, 'w', encoding='utf-8') as f:
                json.dump(rows, f)

        return len(self.queries)

    def run(self, con: sqlite3.Connection, context: TransformContext) -> TransformContext:
        """Executes the toD3 Node.

        Writes one JSON file per configured query into ``self.output_path``.

        Args:
            con (sqlite3.Connection): SQLite database connection
            context (TransformContext): TransformContext, containing the input
                dataframe of all posts. Passed through unchanged.

        Returns:
            TransformContext with processed dataframe.
        """
        logger.info("Starting ToD3Node transformation")

        if not os.path.isdir(self.output_path):
            logger.warning("output_dir does not exist, creating dir...")
            # makedirs also creates missing parent directories; os.mkdir
            # would raise FileNotFoundError if any parent is absent.
            os.makedirs(self.output_path, exist_ok=True)

        count = self._calculate_files(con)

        logger.info(f"Successfully generated {count} json files.")

        return context
|
||||
|
||||
def main():
    """Standalone entry point: run ToD3Node against a local SQLite database.

    Optional command-line arguments (defaults kept for backward compatibility):
        argv[1]: path to the SQLite database file
        argv[2]: output directory for the generated JSON files
    """
    import sys

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stdout)
        ]
    )
    # NOTE: the module-level `logger` is already "knack-transform";
    # no local rebind is needed.

    # Allow both paths to be overridden on the command line instead of
    # relying solely on a developer-specific absolute location.
    db_path = sys.argv[1] if len(sys.argv) > 1 else \
        "/Users/linussilberstein/Documents/Knack-Scraper/data/knack.sqlite"
    output_path = sys.argv[2] if len(sys.argv) > 2 else \
        "/Users/linussilberstein/Documents/Knack-Scraper/data/json/"

    # Connect to database
    con = sqlite3.connect(db_path)

    try:
        context = TransformContext(None)

        node = ToD3Node(output_path)

        context = node.run(con, context)

    except Exception as e:
        logger.error(f"Error during transformation: {e}", exc_info=True)
        raise
    finally:
        # Close the connection whether the run succeeded or failed.
        con.close()


if __name__ == '__main__':
    main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue