deathclock/utils/news.py

103 lines
3.2 KiB
Python
Executable file

import asyncio
import random
import socket
from time import localtime, strftime
import aiofiles
import aiohttp
import feedparser
def print_time():
    """Print the current local time as ``<Month> <day>, <hour>:<minute> <AM/PM>``."""
    now = localtime()
    stamp = strftime("%B %d, %I:%M %p", now)
    print(stamp)
class News:
    """Collect a bounded, randomly sampled set of entries from RSS/Atom feeds.

    Feed URLs are read from ``feeds.txt`` (one URL per line, relative to the
    current working directory), fetched concurrently with aiohttp, and parsed
    with feedparser.
    """

    # File listing the feed URLs, one per line.
    FEEDS_FILE = "feeds.txt"
    # Upper bounds that keep a single refresh cheap.
    MAX_FEEDS = 10
    MAX_ENTRIES_PER_FEED = 10
    MAX_TOTAL_ENTRIES = 30
    # Timeouts in seconds: per request, and the session-wide default.
    REQUEST_TIMEOUT_SECONDS = 5
    SESSION_TIMEOUT_SECONDS = 15

    def __init__(self):
        # NOTE: this sets the process-wide default timeout for newly created
        # blocking sockets, not just for this instance.
        socket.setdefaulttimeout(10)

    @staticmethod
    def _build_entries(parsed, max_entries):
        """Convert a parsed feed into at most ``max_entries`` plain dicts.

        Args:
            parsed: Object exposing ``feed`` and ``entries`` attributes
                (the result of ``feedparser.parse``).
            max_entries: Maximum number of entries to take, in feed order.

        Returns:
            A list of dicts with the keys ``title``, ``source``,
            ``publish_date`` and ``summary``. A missing feed title becomes
            ``"Unknown"``; any other missing field becomes ``""``.
        """
        source = getattr(getattr(parsed, "feed", None), "title", "Unknown")
        return [
            {
                # A title-less entry must not abort the whole feed, so the
                # title gets the same default as the other optional fields.
                "title": getattr(post, "title", ""),
                "source": source,
                "publish_date": getattr(post, "published", ""),
                "summary": getattr(post, "summary", ""),
            }
            for post in parsed.entries[:max_entries]
        ]

    @staticmethod
    def _clean_feed_lines(lines):
        """Strip whitespace from each line and drop lines that end up empty."""
        stripped = (line.strip() for line in lines)
        return [url for url in stripped if url]

    @staticmethod
    def _cap(items, limit):
        """Return ``items`` unchanged if short enough, else a random sample of ``limit``."""
        if len(items) > limit:
            return random.sample(items, limit)
        return items

    async def _fetch_feed(self, session, feed):
        """Fetch and parse a single feed asynchronously.

        Args:
            session: An open ``aiohttp.ClientSession``.
            feed: URL of the feed to download.

        Returns:
            A list of entry dicts (see ``_build_entries``). The list is empty
            when the response status is not 200 or when any error occurs;
            errors are printed, never raised.
        """
        try:
            timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
            async with session.get(feed, timeout=timeout) as response:
                if response.status != 200:
                    print(f"Skip feed {feed}: status {response.status}")
                    return []
                text = await response.text()

            parsed = feedparser.parse(text)
            # Guard kept from the original code. Presumably feedparser sets
            # ``status`` only when it performs the HTTP fetch itself, so this
            # should not trigger for already-downloaded text - TODO confirm.
            parsed_status = getattr(parsed, "status", 200)
            if parsed_status != 200:
                print(f"Skip feed {feed}: status {parsed_status}")
                return []

            feed_entries = self._build_entries(parsed, self.MAX_ENTRIES_PER_FEED)
            print(f"Added {len(feed_entries)} entries from {feed}")
            return feed_entries
        except Exception as e:
            # Best effort: one broken feed (network error, timeout, malformed
            # data) must not prevent the remaining feeds from loading.
            print(f"Error processing feed {feed}: {e}")
            return []

    async def get_news(self):
        """Fetch entries from up to ``MAX_FEEDS`` randomly chosen feeds.

        Returns:
            A list of at most ``MAX_TOTAL_ENTRIES`` entry dicts. If more
            entries were collected, a random sample is returned. The list is
            empty when the feed file cannot be read or nothing was collected.
        """
        print_time()
        # These attributes are not read anywhere in this class; they are still
        # reset here in case other code depends on them - TODO confirm.
        self._news_dict = {}
        self._news_dict_length = 0

        try:
            async with aiofiles.open(self.FEEDS_FILE, "r") as f:
                raw_lines = [line async for line in f]
        except Exception as e:
            print(f"Error reading feeds.txt: {e}")
            # Always return a list so every code path has the same type.
            return []

        # Blank lines would otherwise be fetched as empty URLs and fail.
        feeds = self._cap(self._clean_feed_lines(raw_lines), self.MAX_FEEDS)

        print("Getting news entries...")
        timeout = aiohttp.ClientTimeout(total=self.SESSION_TIMEOUT_SECONDS)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            tasks = [self._fetch_feed(session, feed) for feed in feeds]
            results = await asyncio.gather(*tasks, return_exceptions=True)

        all_entries = []
        for result in results:
            if isinstance(result, list):
                all_entries.extend(result)
            elif isinstance(result, BaseException):
                # _fetch_feed handles ordinary errors itself; report anything
                # that still escaped instead of dropping it silently.
                print(f"Feed task failed: {result!r}")

        if not all_entries:
            print("No entries collected")
            return []

        return self._cap(all_entries, self.MAX_TOTAL_ENTRIES)