redo parsing due to zfire being down. added initial knaben search parse

This commit is contained in:
death916 2025-12-17 03:55:38 -08:00
parent b61b888b22
commit d73b179181
6 changed files with 79 additions and 180 deletions

1
.gitignore vendored
View file

@ -89,3 +89,4 @@ uv.lock
old.pyproject.toml
.github
.flox

1
.python-version Normal file
View file

@ -0,0 +1 @@
3.13

0
README.md Normal file
View file

View file

@ -3,7 +3,7 @@ name = "c2cscrape"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
requires-python = ">=3.13"
dependencies = [
"bs4>=0.0.2",
"requests>=2.32.5",

21
shell.nix Normal file
View file

@ -0,0 +1,21 @@
# Dev shell pinned to the nixpkgs inputs of the author's flake-based
# NixOS config so the toolchain matches the system configuration.
let
  nixconfig = builtins.getFlake "github:death916/nixconfig";
  # NOTE(review): `unstable` is bound but never referenced below — confirm
  # whether it can be dropped or was meant to supply a package.
  unstable = nixconfig.inputs.nixpkgs-unstable.legacyPackages.x86_64-linux;
  pkgs = nixconfig.inputs.nixpkgs.legacyPackages.x86_64-linux;
in
pkgs.mkShell {
  packages = with pkgs; [
    python313Packages.uv
    python313Packages.ninja
    python313Packages.numpy
    bun
  ];
  # Activate the project venv and tell Reflex to use the system bun.
  shellHook = ''
    source .venv/bin/activate
    # export PATH="${pkgs.bun}/bin:$PATH"
    # export BUN_INSTALL="${pkgs.bun}/bin/bun"
    export REFLEX_USE_SYSTEM_BUN=True
    echo venv activated and bun version set
  '';
}

View file

@ -5,16 +5,16 @@ import os
import random
import re
import time
from calendar import c
import requests
from bs4 import BeautifulSoup
class C2CScrape:
class TorrentScrape:
def __init__(self):
self.url = "https://zfirelight.blogspot.com/"
self.url = "https://knaben.org/search/coast%20to%20coast%20am/0/1/date"
self.episodes = []
self.download_amount = 5
self.last_download = None
self.last_download_link = None
self.headers = {
@ -27,188 +27,64 @@ class C2CScrape:
# Remove or replace invalid filename characters
return re.sub(r'[<>:"/\\|?*]', "-", filename)
def get_drive_link(self, url):
    """Fetch *url* and return the first Google Drive iframe ``src`` found.

    Side effect: caches the parsed page on ``self.soup`` so later calls
    (e.g. ``get_episode_info`` / ``is_duplicate_file``) can reuse it
    without refetching.

    Returns the iframe ``src`` string, or None on a fetch error or when
    no Drive iframe is present.
    """
    try:
        response = requests.get(url, headers=self.headers)
        response.raise_for_status()
        self.soup = BeautifulSoup(response.text, "html.parser")
        iframes = self.soup.find_all("iframe")
        for iframe in iframes:
            src = iframe.get("src")
            # Only embedded Google Drive players are of interest.
            if src and "drive.google.com" in src:
                print("Found drive link:", src)
                return src
    except requests.RequestException as e:
        print(f"Error fetching page: {e}")
    # No Drive iframe found, or the request failed.
    return None
def get_episode_info(self, soup):
    """Extract title, leading date token, and post URL from a blog page.

    Returns a dict with keys ``title``, ``date`` and ``url``, or None
    when the expected post-title markup is missing.
    """
    header = soup.find("h3", class_="post-title entry-title")
    if not header:
        return None
    anchor = header.find("a")
    if not anchor:
        return None
    full_title = anchor.text
    # The post title is expected to begin with a date token.
    return {
        "title": full_title,
        "date": full_title.split(" ")[0],
        "url": anchor["href"],
    }
def create_download_link(self):
    """Turn the current page's Drive share link into a direct-download URL.

    Returns the ``uc?export=download`` URL, or None when no Drive link is
    found or the link is not in the expected ``/file/d/<id>/`` form.
    """
    share_url = self.get_drive_link(self.url)
    if not share_url:
        return None
    print("Creating download link for:", share_url)
    try:
        # The file id sits between "/file/d/" and the next "/".
        file_id = share_url.split("/file/d/")[1].split("/")[0]
        direct_link = (
            f"https://drive.google.com/uc?export=download&id={file_id}"
        )
        print("Download link:", direct_link)
        return direct_link
    except IndexError:
        print("Error: Invalid URL format")
        return None
def download_episode(self, url):
    """Download the episode at *url* into ./downloads, skipping existing files.

    Uses ``self.soup`` (set by a prior ``get_drive_link`` call) to derive
    the filename from the post title. On success sleeps 3-7 s and updates
    the download bookkeeping attributes. All errors are printed rather
    than raised.
    """
    try:
        episode_data = self.get_episode_info(self.soup)
        if not episode_data:
            print("Error: Could not get episode info")
            return
        # Create downloads directory if it doesn't exist
        download_dir = "./downloads"
        os.makedirs(download_dir, exist_ok=True)
        # Get current date
        date = datetime.datetime.now().strftime("%Y-%m-%d")
        # Create sanitized filename
        filename = f"{episode_data['title']}.mp4"
        safe_filename = self.sanitize_filename(filename)
        filepath = os.path.join(download_dir, safe_filename)
        # Check if file already exists
        if os.path.exists(filepath):
            print(f"File already exists: {safe_filename}")
            return
        # Download the file
        # NOTE(review): the whole response is buffered in memory; consider
        # stream=True with chunked writes for large media files.
        response = requests.get(url, headers=self.headers)
        response.raise_for_status()
        with open(filepath, "wb") as f:
            f.write(response.content)
        print(f"Downloaded: {safe_filename}")
        # Polite random delay between downloads.
        print("sleeping for 3-7 seconds")
        time.sleep(random.randint(3, 7))
        # Update last download info
        # NOTE(review): self.episodes_downloaded is not initialized in the
        # visible __init__; the first increment may raise AttributeError
        # (swallowed by the broad except below). Confirm initialization.
        self.episodes_downloaded += 1
        self.last_download = date
        self.last_download_link = url
    except requests.RequestException as e:
        print(f"Error downloading episode: {e}")
    except Exception as e:
        print(f"Error: {e}")
def is_duplicate_file(self, soup):
    """Return True when the episode parsed from *soup* already exists on disk.

    The filename must be constructed exactly as ``download_episode`` does
    ("<title>.mp4", sanitized). Previously this appended the current date
    ("<title> <date>.mp4"), so the check could never match the files that
    ``download_episode`` actually writes — every episode looked new.

    Returns False on any parsing/filesystem error (best-effort check).
    """
    try:
        episode_data = self.get_episode_info(soup)
        if not episode_data:
            return False
        # Mirror download_episode's naming scheme exactly.
        filename = f"{episode_data['title']}.mp4"
        safe_filename = self.sanitize_filename(filename)
        filepath = os.path.join("downloads", safe_filename)
        return os.path.exists(filepath)
    except Exception as e:
        print(f"Error checking duplicate: {e}")
        return False
def process_episode(self):
    """Fetch the current page, skip already-downloaded episodes, download new ones.

    Errors are printed rather than propagated so one bad page does not
    stop a caller that is iterating over several pages.
    """
    try:
        # get_drive_link also caches the parsed page on self.soup.
        drive_url = self.get_drive_link(self.url)
        if not drive_url:
            return
        if self.is_duplicate_file(self.soup):
            print("Episode already exists, skipping download")
            return
        target = self.create_download_link()
        if target:
            self.download_episode(target)
    except requests.RequestException as e:
        print(f"Error processing episode: {e}")
    except Exception as e:
        print(f"Error: {e}")
# timer to check for new episodes every 12 hours
def start(self):
    """Run one full scrape pass: the newest post plus older posts.

    Always restores the base URL and clears per-run bookkeeping in
    ``finally``, so a subsequent call starts from a clean state even if a
    step raises.
    """
    # reset base url
    self.url = "https://zfirelight.blogspot.com/"
    original_url = self.url
    try:
        # Run our core operations
        self.process_episode()
        self.get_older_posts()
        print(f"Episodes downloaded: {self.episodes_downloaded}")
    finally:
        # Restore URL and clear per-run state for the next invocation.
        self.url = original_url
        self.last_download = None
        self.last_download_link = None
        self.episodes_downloaded = 0
# navigate to older posts button 5 times and get last 5 episodes with no repeats/ span id is blog-pager-older-link
# NOTE(review): this span is a rendered diff hunk with the +/- markers
# stripped. Two methods are interleaved below and cannot be separated
# safely from this view: the OLD get_older_posts (zfirelight pagination,
# being removed) and the NEW get_torrent_page (single Knaben fetch, being
# added). Reconstruct each side from version control before editing; the
# lines are kept verbatim.
def get_older_posts(self, limit=5):
def get_torrent_page(self):
try:
response = requests.get(self.url, headers=self.headers)
# --- old get_older_posts body (presumably removed lines) ---
soup = BeautifulSoup(response.text, "html.parser")
older_posts = soup.find("span", id="blog-pager-older-link")
processed_urls = set()
posts_processed = 0
while older_posts and posts_processed < limit:
older_link = older_posts.find("a")["href"]
if older_link in processed_urls:
break
processed_urls.add(older_link)
# Get the older posts page
self.url = older_link  # Update URL to use existing functions
print(f"Processing page: {older_link}")
# Use existing process_episode method
self.process_episode()
posts_processed += 1
# Get next page of older posts
response = requests.get(older_link, headers=self.headers)
soup = BeautifulSoup(response.text, "html.parser")
older_posts = soup.find("span", id="blog-pager-older-link")
# --- new get_torrent_page body (presumably added lines) ---
response.raise_for_status()
print(response)
return response.text
# --- merged except-chains from both versions ---
except requests.RequestException as e:
print(f"Error fetching older posts: {e}")
except Exception as e:
print(f"Error: {e}")
print(f"Error fetching page: {e}")
return None
def get_episode_info(self):
    """Parse the Knaben search-results page and log matching episodes.

    Counts results whose title starts with "Coast" (capped at
    ``self.download_amount``) by incrementing ``self.episodes_downloaded``.
    Currently returns None in all cases; the result link is collected but
    not yet handed to a torrent client (see the return comment below).
    """
    page = self.get_torrent_page()
    if not page:
        print("No page found")
        return None
    soup = BeautifulSoup(page, "html.parser")
    # The results table sits inside a div with class "p-3" on Knaben's
    # search page — presumably fragile; verify if parsing breaks.
    main_class = soup.find("div", class_="p-3")
    if not main_class:
        print("No main class found")
        return None
    episode_elems = main_class.select(".text-wrap.w-100")
    if episode_elems:
        print(
            f"Found {len(episode_elems)} episode elements using selector .text-wrap.w-100"
        )
    if len(episode_elems) > self.download_amount:
        print(
            f"Too many episodes found, only downloading last {self.download_amount}"
        )
        episode_elems = episode_elems[: self.download_amount]
    for ep in episode_elems:
        a = ep.find("a")
        if not a:
            continue
        title = a.get("title") or a.get_text(strip=True)
        # skip episodes that don't start with "Coast" as sometimes they show up in search
        if not title.startswith("Coast"):
            print(f"Skipping episode {title}")
            continue
        link = a.get("href")
        # NOTE(review): `link` is collected but unused so far, and
        # self.episodes_downloaded counts *found* (not downloaded)
        # episodes and is not initialized in the visible __init__ —
        # confirm before relying on it.
        self.episodes_downloaded += 1
        print(f"found episode {title}")
        # print(f"link: {link}")
    print("done")
    return  # need to return link later to qbit but need to decide logic
class Qbittorrent:
    """Placeholder for the qBittorrent client integration (not yet implemented)."""
    pass
if __name__ == "__main__":
c2c = C2CScrape()
# Start initial timer immediately
c2c.start()
c2c = TorrentScrape()
c2c.get_episode_info()
# Keep main thread alive with minimal resource usage
"""
try: