"""Collect a random sample of entries from the feeds listed in a text file."""

import asyncio
import random
import socket
from time import localtime, strftime

import aiofiles
import aiohttp
import feedparser


def print_time():
    """Print the current local time as ``<Month> <day>, <HH>:<MM> <AM/PM>``."""
    print(strftime("%B %d, %I:%M %p", localtime()))


class News:
    """Fetch feeds concurrently and return a bounded, random set of entries."""

    # Upper bounds applied while collecting entries.
    MAX_FEEDS = 10
    MAX_ENTRIES_PER_FEED = 10
    MAX_TOTAL_ENTRIES = 30

    # Timeouts, in seconds, for a single request and for the session default.
    REQUEST_TIMEOUT = 5
    SESSION_TIMEOUT = 15

    def __init__(self):
        # NOTE: this changes the default timeout for every socket created
        # afterwards in this process, not just sockets used by this class.
        # Presumably meant to bound blocking network calls made outside
        # aiohttp -- TODO confirm it is still needed.
        socket.setdefaulttimeout(10)

    async def _fetch_feed(self, session, feed):
        """Download and parse one feed.

        Args:
            session: An open ``aiohttp.ClientSession`` used for the request.
            feed: URL of the feed to download.

        Returns:
            A list of at most ``MAX_ENTRIES_PER_FEED`` dicts with the keys
            ``title``, ``source``, ``publish_date`` and ``summary``. The list
            is empty when the feed could not be fetched or parsed; failures
            are printed, never raised.
        """
        try:
            # The per-request timeout overrides the session-wide default.
            timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
            async with session.get(feed, timeout=timeout) as response:
                if response.status != 200:
                    print(f"Skip feed {feed}: status {response.status}")
                    return []
                text = await response.text()

            parsed = feedparser.parse(text)
            # A status is only present when feedparser performed a fetch
            # itself; treat a missing status as success.
            parsed_status = getattr(parsed, "status", 200)
            if parsed_status != 200:
                print(f"Skip feed {feed}: status {parsed_status}")
                return []

            source = getattr(parsed.feed, "title", "Unknown")
            feed_entries = [
                {
                    # Every field is optional so that one incomplete entry
                    # does not discard the whole feed.
                    "title": getattr(post, "title", ""),
                    "source": source,
                    "publish_date": getattr(post, "published", ""),
                    "summary": getattr(post, "summary", ""),
                }
                for post in parsed.entries[: self.MAX_ENTRIES_PER_FEED]
            ]
            print(f"Added {len(feed_entries)} entries from {feed}")
            return feed_entries
        except Exception as e:
            # Best effort: a single bad feed (network error, timeout,
            # malformed content) must not abort the whole collection run.
            print(f"Error processing feed {feed}: {e}")
            return []

    async def get_news(self, feeds_path="feeds.txt"):
        """Collect entries from the feeds listed in ``feeds_path``.

        The file is read one URL per line; blank lines and lines starting
        with ``#`` are ignored and duplicate URLs are fetched only once. At
        most ``MAX_FEEDS`` feeds are fetched (chosen at random when more are
        listed) and at most ``MAX_TOTAL_ENTRIES`` entries are returned (again
        chosen at random).

        Args:
            feeds_path: Path of the text file listing feed URLs.

        Returns:
            A list of entry dicts as produced by ``_fetch_feed``; an empty
            list when the file cannot be read or nothing was collected.
        """
        print_time()
        # Reset on every run; nothing in this class fills these in.
        self._news_dict = {}
        self._news_dict_length = 0

        feeds = []
        try:
            async with aiofiles.open(feeds_path, "r") as f:
                async for line in f:
                    url = line.strip()
                    if url and not url.startswith("#"):
                        feeds.append(url)
        except (OSError, UnicodeError) as e:
            print(f"Error reading {feeds_path}: {e}")
            return []

        # Drop duplicate URLs while keeping the order they were listed in.
        feeds = list(dict.fromkeys(feeds))
        if not feeds:
            print("No entries collected")
            return []

        # Limit the number of feeds to process at once.
        if len(feeds) > self.MAX_FEEDS:
            feeds = random.sample(feeds, self.MAX_FEEDS)

        print("Getting news entries...")
        timeout = aiohttp.ClientTimeout(total=self.SESSION_TIMEOUT)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            tasks = [self._fetch_feed(session, feed) for feed in feeds]
            results = await asyncio.gather(*tasks, return_exceptions=True)

        all_entries = []
        for result in results:
            # With return_exceptions=True a failed task yields its exception
            # object instead of a list; skip anything that is not a list.
            if isinstance(result, list):
                all_entries.extend(result)

        if not all_entries:
            print("No entries collected")
            return []

        if len(all_entries) > self.MAX_TOTAL_ENTRIES:
            all_entries = random.sample(all_entries, self.MAX_TOTAL_ENTRIES)
        return all_entries