Merge pull request #1 from Death916/reflex

merging reflex
This commit is contained in:
Death916 2026-01-21 03:17:50 -08:00 committed by GitHub
commit 150e64e92f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
34 changed files with 3078 additions and 7430 deletions

10
.gitignore vendored Normal file → Executable file
View file

@ -1,5 +1,15 @@
.web
.states
assets/external/
*.db
__pycache__/
*.py[cod]
*.png
*.txt
chromedriver
*.pyc
.pyc
*.jpg
*.flox
*.direnv
.envrc

9
README.MD Executable file
View file

@ -0,0 +1,9 @@
# In progress
<img width="1163" height="828" alt="1762349062" src="https://github.com/user-attachments/assets/04888f36-096b-45a6-917f-9d9a9a7b0115" />
Alarm clock dashboard with Raspberry Pi 4
Shows NFL MLB and NBA scores and Current Weather
Has news scroll based off RSS feeds.
rda5807m library from https://github.com/franckinux/python-rd5807m

BIN
assets/favicon.ico Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.2 KiB

View file

@ -1,4 +0,0 @@
*.png
*.txt
chromedriver
*.pyc

1
deathclock/__init__.py Normal file → Executable file
View file

@ -1 +0,0 @@
__version__ = '0.1.0'

View file

@ -1,35 +0,0 @@
import datetime
from dash import html, dcc, Input, Output, State
import alarm
class AlarmModule:
def __init__(self, app):
self.app = app
self.alarm_obj = self.get_alarm_object()
self._alarm_time = None
self.setup_callbacks()
def get_alarm_object(self):
return alarm.Alarm()
def setup_callbacks(self):
@self.app.callback(
Output('time-input', 'style'),
Input('clock-display', 'n_clicks'),
State('time-input', 'style')
)
def toggle_time_input(n_clicks, current_style):
if n_clicks and n_clicks % 2 == 1:
return {'display': 'block', 'margin': '0 auto'}
return {'display': 'none', 'margin': '0 auto'}
@self.app.callback(
Output('output-selected-time', 'children'),
Input('time-input', 'value')
)
def process_selected_time(time_value):
if time_value:
self._alarm_time = time_value
self.alarm_obj.add_alarm(time_value, datetime.datetime.now())
return f'Alarm time: {time_value}'
return 'No time selected yet.'

View file

@ -1,57 +0,0 @@
from dash import Dash, html, dcc
from weather_module import WeatherModule
from news_module import NewsModule
from scores_module import ScoresModule
from alarm_module import AlarmModule
from clock_module import ClockModule
def create_app():
app = Dash(__name__)
app.layout = html.Div([
html.H1(id='clock-display', style={'textAlign': 'center', 'cursor': 'pointer'}),
dcc.Input(id='time-input', type='time', style={'display': 'none', 'margin': '0 auto'}),
html.Div(id='output-selected-time', style={'textAlign': 'center', 'marginBottom': '20px'}),
html.Div([
html.Div([
html.H2("NBA Scores"),
html.Div(id='nba-scores-display', className='score-container')
], id='nba-container', style={'flex': '1'}),
html.Div(id='weather-display', style={"display": "flex", "justify-content": "center", "margin-bottom":"20px", 'flex':'1'}),
html.Div([
html.H2("MLB Scores"),
html.Div(id='mlb-scores-display', className='score-container',style={'column-count': '2', 'column-gap': '10px'})
], id='mlb-container', style={'flex': '1'}),
], id='main-content-container', style={"display": "flex", "gap": "20px", 'flex-wrap': 'wrap'}),
html.Div(id='news-ticker'),
dcc.Interval(id='clock-interval', interval=60000, n_intervals=0),
dcc.Interval(id='weather-interval', interval=550000, n_intervals=0),
dcc.Interval(id='news-interval', interval=300000, n_intervals=0),
dcc.Interval(id='nba-interval', interval=300000, n_intervals=0),
dcc.Interval(id='mlb-interval', interval=300000, n_intervals=0)
])
ClockModule(app)
WeatherModule(app)
NewsModule(app)
scores_module = ScoresModule(app)
scores_module.setup_mlb_callbacks()
alarm_module = AlarmModule(app)
def check_alarms():
trigg = alarm_module.alarm_obj.check_alarm()
if trigg:
print("ALARM TRIGGERED!")
check_alarms()
return app
if __name__ == '__main__':
app = create_app()
app.run_server(debug=False, host='0.0.0.0', port=8050)

View file

@ -1,224 +0,0 @@
body {
background-color: #1a1a2e;
color: #e6e6fa;
font-family: 'Arial', sans-serif;
margin: 0;
padding: 10px;
font-size: 16px;
overflow-x: hidden;
min-width: 360px;
}
h1 {
font-size: 2rem;
margin-bottom: 10px;
}
h2 {
font-size: 1.5rem;
margin-bottom: 10px;
}
/* Main Content Container (Scores and Weather) */
#main-content-container {
display: flex;
flex-wrap: wrap;
gap: 20px;
width: 100%;
justify-content: center;
}
#nba-container,
#mlb-container {
flex: 1;
min-width: 300px;
}
/* Individual Score Containers */
.score-container {
width: 100%;
padding: 0; /* Remove padding from score container */
margin: 0; /* remove margin from score container */
}
#mlb-scores-display {
column-count: 1;
}
.score-box {
background-color: #232338;
padding: 2px 5px; /* Reduced padding on score boxes */
margin-bottom: 2px; /* Reduced margin on score boxes */
border-radius: 6px;
border: 1px solid #4a4a82;
display: flex; /* Use flexbox for inner alignment */
flex-direction: column; /* Stack the game-score and game-period vertically */
justify-content: center; /* Center vertically */
}
.game-score {
font-size: 0.9em;
color: #e6e6fa;
text-align: center;
margin: 2px 0; /* Add a small top/bottom margin */
}
.game-period {
font-size: 0.8em;
color: #b39ddb;
text-align: center;
margin: 2px 0; /* Add a small top/bottom margin */
}
/* Weather Styles */
#weather-display {
background-color: #232338;
border-radius: 10px;
padding: 10px;
margin: 10px auto;
max-width: 100%;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.2);
border: 1px solid #4a4a82;
transition: all 0.3s ease;
flex: 1;
}
#weather-display:hover {
box-shadow: 0 6px 8px rgba(179, 157, 219, 0.2);
transform: translateY(-2px);
}
#weather-display img {
width: 100%;
border-radius: 8px;
margin-top: 5px;
}
/* News Ticker Styles */
.ticker {
position: fixed;
bottom: 0;
width: 100%;
overflow: hidden;
white-space: nowrap;
background-color: #232338;
padding: 5px;
border-radius: 8px;
border: 1px solid #4a4a82;
}
.news-item {
display: inline-block;
padding-right: 30px;
color: #d1c4e9;
animation: ticker linear infinite;
}
@keyframes ticker {
0% {
transform: translateX(100%);
}
100% {
transform: translateX(-100%);
}
}
/* Media Queries */
@media (min-width: 750px) {
/* Landscape layout */
body {
padding: 20px;
font-size: 18px;
max-width: 1500px;
margin: 0 auto;
}
#main-content-container {
flex-wrap: nowrap;
justify-content: space-around;
}
#mlb-scores-display {
column-count: 2;
}
.score-container {
padding: 0; /* Remove padding from score container */
margin: 0; /* remove margin from score container */
}
.score-box {
padding: 2px 5px; /* Reduced padding on score boxes */
margin-bottom: 2px; /* Reduced margin on score boxes */
}
#weather-display {
padding: 20px;
margin: 0;
max-width: 400px;
}
.ticker {
padding: 15px;
}
.news-item {
padding-right: 50px;
}
}
@media (max-width: 480px) {
/* Phone screens */
body {
font-size: 14px;
padding: 0;
transform-origin: top left;
transform: scale(0.6);
width: 166.66%;
margin-left: -33.33%;
min-width: 0;
}
h1 {
font-size: 1.5rem;
}
h2 {
font-size: 1.2rem;
}
#main-content-container {
gap: 1px;
}
#nba-container,
#mlb-container {
min-width: 0;
}
.score-container {
padding: 0; /* Remove padding from score container */
margin: 0; /* remove margin from score container */
}
.score-box {
padding: 2px 5px; /* Reduced padding on score boxes */
margin-bottom: 2px; /* Reduced margin on score boxes */
}
.game-score {
font-size: 0.8em;
margin: 2px 0; /* Add a small top/bottom margin */
}
.game-period {
font-size: 0.7em;
margin: 2px 0; /* Add a small top/bottom margin */
}
.ticker {
padding: 5px;
}
.news-item {
padding-right: 15px;
}
}

Binary file not shown.

File diff suppressed because it is too large Load diff

View file

@ -1,15 +0,0 @@
from time import strftime, localtime
from dash import Input, Output
class ClockModule:
def __init__(self, app):
self.app = app
self.setup_callbacks()
def setup_callbacks(self):
@self.app.callback(
Output('clock-display', 'children'),
Input('clock-interval', 'n_intervals')
)
def update_time(n):
return strftime("%B %d, %I:%M %p", localtime())

573
deathclock/deathclock.py Executable file
View file

@ -0,0 +1,573 @@
# deathclock.py
import asyncio
# from utils.alarm import Alarm
import logging
import time
from datetime import datetime, timezone
from typing import Any, Dict, List
import reflex as rx
from utils.news import News
from utils.radio import Radio
from utils.scores import NBAScores, mlbScores, nflScores
from utils.weather import Weather
WEATHER_IMAGE_PATH = "/weather.jpg" # Web path in assets folder
WEATHER_FETCH_INTERVAL = 360
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
)
class State(rx.State):
# --- State Variables ---
current_time: str = ""
alarm_time: str = ""
alarms: List = []
news: List[Dict[str, Any]] = []
current_news_index: int = 0
news_rotate_interval: int = 10
news_text: str = ""
nba_scores: List[Dict[str, Any]] = []
mlb_scores: List[Dict[str, Any]] = []
nfl_scores: List[Dict[str, Any]] = []
# --- Radio State ---
radio_stations: List[str] = []
radio_current_station: float = 90.9
radio_station_input: str = "90.9"
radio_is_playing: bool = False
radio_volume: int = 5
_news_client: News | None = None
last_weather_update: str = "Never"
weather_img: str = WEATHER_IMAGE_PATH
_weather_client: Weather | None = None
_mlb_client: mlbScores | None = None
_nba_client: NBAScores | None = None
_nfl_client: nflScores | None = None
_radio_client: Radio | None = None
last_sports_update: float = 0.0
last_weather_fetch_time: float = 0.0
last_weather_image_path: str = ""
# --- Initialize Utility Client ---
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialize background clients
try:
self._weather_client = Weather()
self._news_client = News()
self._mlb_client = mlbScores()
self._nba_client = NBAScores()
self._nfl_client = nflScores()
# Initialize Radio
self._radio_client = Radio()
logging.info("Clients initialized successfully.")
except Exception as e:
logging.error(f"Failed to initialize clients: {e}", exc_info=True)
self._weather_client = None
self.weather_img = "/error_placeholder.png"
self.last_weather_update = "Client Init Error"
self.mlb_scores = []
self.nba_scores = []
self.last_sports_update = 0.0
# --- Radio Event Handlers ---
def toggle_radio(self):
if self._radio_client:
if self.radio_is_playing:
self._radio_client.off()
self.radio_is_playing = False
else:
self._radio_client.on()
self._radio_client.set_volume(self.radio_volume)
self._radio_client.set_station(self.radio_current_station)
self.radio_is_playing = True
def set_radio_volume(self, val: List[int]):
self.radio_volume = val[0]
if self._radio_client and self.radio_is_playing:
self._radio_client.set_volume(self.radio_volume)
def set_radio_station_input(self, val: str):
self.radio_station_input = val
def commit_radio_station(self):
try:
freq = float(self.radio_station_input)
self.radio_current_station = freq
if self._radio_client and self.radio_is_playing:
self._radio_client.set_station(freq)
logging.info(f"Station set to {freq}")
except ValueError:
logging.warning(f"Invalid station input: {self.radio_station_input}")
# Revert input to current valid station
self.radio_station_input = str(self.radio_current_station)
# --- on_load Handler ---
async def start_background_tasks(self):
"""Starts background tasks when the page loads."""
rx.remove_local_storage("chakra-ui-color-mode")
logging.info("Triggering background tasks for this session")
# Always return the tasks so they start for this specific user/tab
return [
State.fetch_weather,
State.fetch_sports,
State.fetch_news,
State.cycle_news,
]
@rx.event(background=True)
async def fetch_sports(self):
# Fetches sports scores periodically
while True:
try:
logging.info("Fetching sports scores...")
# Fetch MLB and NBA scores
# check if sports has updated in last 5 minutes if so skip
if (
self.last_sports_update
and (time.time() - self.last_sports_update) < 300
):
logging.info(
"Sports scores already updated within the last 5 minutes. Skipping fetch."
)
await asyncio.sleep(300)
continue
mlb_scores = await self._mlb_client.get_scores()
logging.info(f"MLB Scores: {mlb_scores}")
# Check if MLB scores are empty
if not mlb_scores:
logging.warning("No MLB scores fetched.")
async with self:
self.mlb_scores = []
yield
nba_scores = await self._nba_client.get_scores()
logging.info(f"NBA Scores: {nba_scores}")
# Check if NBA scores are empty
if not nba_scores:
logging.warning("No NBA scores fetched.")
async with self:
self.nba_scores = []
yield
nfl_scores = await self._nfl_client.get_scores()
logging.info(f"NFL Scores: {nfl_scores}")
# Check if NFL scores are empty
if not nfl_scores:
logging.warning("No NFL scores fetched.")
async with self:
self.nfl_scores = []
yield
# Update state with fetched scores
async with self:
self.mlb_scores = mlb_scores
self.nba_scores = nba_scores
self.nfl_scores = nfl_scores
self.last_sports_update = (
time.time()
) # Update last sports update time
logging.info(
f"Fetched {len(mlb_scores)} MLB scores and {len(nba_scores)} NBA scores and {len(nfl_scores)} NFL scores."
)
yield # Update frontend
except Exception as e:
logging.error(
f"Error in fetch_sports background task: {e}", exc_info=True
)
await asyncio.sleep(500)
@rx.event(background=True)
async def fetch_news(self):
while True:
try:
logging.info("Fetching news...")
news_items = await self._news_client.get_news()
if not news_items:
logging.warning("No news items fetched.")
async with self:
self.news = []
self.current_news_index = 0
yield
else:
logging.info(f"Fetched {len(news_items)} news items.")
async with self:
self.news = news_items
self.current_news_index = 0
yield
except Exception as e:
logging.error(
f"Error in fetch_news background task: {e}", exc_info=True
)
await asyncio.sleep(500)
@rx.event(background=True)
async def cycle_news(self):
while True:
try:
if self.news:
async with self:
self.current_news_index = self.current_news_index + 1
if self.current_news_index >= len(self.news):
self.current_news_index = 0
yield
except Exception as e:
logging.error(
f"Error in cycle_news background task: {e}", exc_info=True
)
await asyncio.sleep(self.news_rotate_interval)
# --- Weather Background Task ---
@rx.event(background=True)
async def fetch_weather(self):
"""Fetches the weather screenshot periodically."""
if not hasattr(self, "_weather_client") or self._weather_client is None:
logging.warning(
"Weather client not initialized. Stopping fetch_weather task."
)
async with self:
self.last_weather_update = "Error: Weather client unavailable"
yield
return
while True:
try:
if (
self.last_weather_fetch_time
and (time.time() - self.last_weather_fetch_time)
< WEATHER_FETCH_INTERVAL
):
logging.info(
"Recent weather fetch detected; skipping immediate fetch to avoid thrashing."
)
await asyncio.sleep(WEATHER_FETCH_INTERVAL)
continue
logging.info("Attempting to fetch weather screenshot...")
img_web_path = self._weather_client.get_weather_screenshot()
if img_web_path:
async with self:
self.weather_img = f"{img_web_path}"
self.last_weather_update = datetime.now(timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
self.last_weather_fetch_time = time.time()
logging.info(
f"State.weather_img updated to: {self.weather_img}"
)
yield
else:
logging.warning(
"get_weather_screenshot returned None. State not updated."
)
async with self:
self.last_weather_update = f"Failed fetch @ {datetime.now(timezone.utc).strftime('%H:%M:%S UTC')}"
yield
except Exception as e:
logging.error(
f"Error in fetch_weather background task: {e}", exc_info=True
)
async with self:
self.last_weather_update = (
f"Error @ {datetime.now(timezone.utc).strftime('%H:%M:%S UTC')}"
)
yield
logging.info("Weather fetch completed")
logging.info("Sleeping for next fetch")
await asyncio.sleep(WEATHER_FETCH_INTERVAL)
def index() -> rx.Component:
# Build NBA scores list (safe access with .get where appropriate)
nba_scores_list = rx.box(
rx.vstack(
rx.foreach(
State.nba_scores,
lambda score: rx.card(
rx.text(
f"{score.get('away_team', '?')} {score.get('away_score', '-')} @ "
f"{score.get('home_team', '?')} {score.get('home_score', '-')} "
f"(Status: {score.get('status', '?')})",
size="1",
),
width="100%",
padding="0.5",
variant="surface",
),
),
spacing="1",
align_items="stretch",
width="auto",
),
max_height="60vh",
overflow_y="auto",
padding="0.25rem",
)
nba_card = rx.card(
rx.box(
rx.text("NBA Scores"),
nba_scores_list,
),
width="auto",
padding="1",
)
# Weather card
weather_card = rx.card(
rx.vstack(
rx.heading("Weather", size="4"),
rx.image(
src=State.weather_img,
alt="Current weather conditions for Sacramento",
width="100%",
height="60vh",
object_fit="contain",
border_radius="var(--radius-3)",
padding_top="0",
padding_bottom="0",
),
rx.text(
f"Last Update: {State.last_weather_update}",
size="1",
color_scheme="gray",
padding_top="0",
),
align="center",
spacing="2",
)
)
# MLB scores list
mlb_scores_list = rx.vstack(
rx.foreach(
State.mlb_scores,
lambda score: rx.card(
rx.text(
f"{score.get('away_team', '?')} {score.get('away_score', '-')} @ "
f"{score.get('home_team', '?')} {score.get('home_score', '-')} "
f"({score.get('status', '?')})",
size="1",
),
size="1",
padding="1",
variant="surface",
width="100%",
),
),
spacing="1",
align_items="stretch",
width="100%",
)
mlb_card = rx.card(
rx.box(
rx.text("MLB Scores"),
mlb_scores_list,
)
)
nfl_scores_list = rx.vstack(
rx.foreach(
State.nfl_scores,
lambda score: rx.card(
rx.text(
f"{score.get('away_team', '?')} {score.get('away_score', '-')} @ "
f"{score.get('home_team', '?')} {score.get('home_score', '-')} "
f"({score.get('status', '?')})",
size="1",
),
size="1",
padding="1",
variant="surface",
width="100%",
),
),
spacing="1",
align_items="stretch",
width="100%",
)
nfl_card = rx.card(
rx.box(
rx.text("NFL Scores"),
nfl_scores_list,
)
)
# Main flexible content area
main_flex = rx.flex(
nba_card,
weather_card,
rx.vstack(
mlb_card,
nfl_card,
),
spacing="2",
width="100%",
justify="center",
align="baseline",
)
# Top clock button
clock_button = rx.button(
rx.moment(interval=1000, format="HH:mm:ss"),
font_size="4xl",
font_weight="bold",
color="white",
background_color="#6f42c1",
)
news_card = rx.card(
rx.box(
rx.center(
rx.text("News Headlines"),
),
rx.center(
rx.cond(
State.news,
rx.text(
rx.heading(State.news[State.current_news_index]["source"]),
rx.spacer(),
State.news[State.current_news_index]["title"],
rx.spacer(),
State.news[State.current_news_index]["publish_date"],
font_size="lg",
color="gray.500",
no_of_lines=1,
white_space="nowrap",
overflow="hidden",
text_overflow="ellipsis",
),
rx.text(
"No news available",
font_size="lg",
color="gray.500",
),
),
width="100%",
min_height="3.25rem",
),
padding="2",
),
variant="surface",
width="100%",
)
# Radio Card Component
radio_card = rx.popover.root(
rx.popover.trigger(rx.button("Radio")),
rx.popover.content(
rx.vstack(
rx.heading("Radio Control", size="3"),
rx.hstack(
rx.text("Status: "),
rx.cond(
State.radio_is_playing,
rx.text("Playing", color="green", weight="bold"),
rx.text("Stopped", color="red", weight="bold")
),
),
rx.button(
rx.cond(State.radio_is_playing, "Stop", "Play"),
on_click=State.toggle_radio,
width="100%"
),
rx.text(f"Volume: {State.radio_volume}"),
rx.slider(
min_=0,
max_=15,
value=[State.radio_volume],
on_change=State.set_radio_volume,
),
rx.text("Station (Freq)"),
rx.hstack(
rx.input(
value=State.radio_station_input,
on_change=State.set_radio_station_input,
placeholder="90.9",
width="100px",
),
rx.button("Set", on_click=State.commit_radio_station),
),
spacing="3",
),
),
)
# Compose the page
page = rx.container( # pyright: ignore[reportReturnType]
rx.theme_panel(default_open=False),
rx.flex(
rx.vstack(
rx.hstack(
radio_card,
clock_button,
),
main_flex,
news_card,
align="center",
spacing="2",
)
),
padding="2rem",
max_width="800px",
max_height="300px",
margin="0 auto",
flex_direction="column",
)
return page
""" # Commented out style block
style = {
"background_color": "black",
"color": "#ffffff",
"box_shadow": "0 4px 8px rgba(0, 0, 0, 0.2)",
"transition": "background-color 0.3s ease, color 0.3s ease",
"hover": {
"background_color": "#3a2b4d",
"color": "#ffffff",
},
}
"""
app = rx.App(
theme=rx.theme(
appearance="dark",
color_scheme="purple",
accent_color="purple",
radius="medium",
gray_color="mauve",
has_background=True,
),
# style=style # using theme instead
)
app.add_page(
index,
title="DeathClock",
on_load=State.start_background_tasks, # Trigger tasks when this page loads
)

View file

@ -1,107 +0,0 @@
#deathclock
import datetime
import time
import sys
from PySide6.QtGui import QGuiApplication
from PySide6.QtQml import QQmlApplicationEngine
from PySide6.QtCore import QTimer, QObject, Signal, Slot, Property
from time import strftime, localtime
import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
import os
import news
import weather
class clock():
def update_time(self):
curr_time = strftime("%B %d, %I:%M %p", localtime())
engine.rootObjects()[0].setProperty('currTime', curr_time)
return curr_time
def time_and_date():
print(time.asctime())
def alarm():
alarm_time = input("what time should the alarm be set?")
def ring():
pass
"""
class gui():
def handleTouchAreaPressed(self, signal):
# Implement your desired behavior when the left area is pressed
print("here touch area")
leftTouchAreaMouse = engine.rootObjects()[0].findChild("leftTouchAreaMouse")
leftTouchAreaMouse.connect(b"touchAreaPressed", self.handleTouchAreaPressed)
print("Left area pressed!")
"""
def main():
#gui_obj = gui()
app = QGuiApplication(sys.argv)
global engine
engine = QQmlApplicationEngine()
engine.quit.connect(app.quit)
engine.load( 'main.qml')
# create instance of weather class
weather_obj = weather.Weather()
weather_obj.weatherUpdated.connect(lambda weather_map_path: engine.rootContext().setContextProperty("weatherMapPath", weather_map_path))
# set timer for weather map
weatherTimer = QTimer()
weatherTimer.setInterval(300000) # 10 minutes
weatherTimer.timeout.connect(weather_obj.download_sacramento_weather_map(engine))
weather_obj.download_sacramento_weather_map(engine)
weatherTimer.start()
# create instance of clock class
timeupdate = clock()
# start timer for clock
timer = QTimer()
timer.setInterval(100) # msecs 100 = 1/10th sec
timer.timeout.connect(timeupdate.update_time)
timer.start()
# create instance of news class
news_obj = news.news()
news_ticker = news_obj.get_news()
#print(news_obj._news_dict)
#print(news_ticker)
news_context = engine.rootContext()
news_context.setContextProperty("news", str(news_ticker))
#start timer for news
news_timer = QTimer()
news_timer.timeout.connect(news_obj.get_news)
news_timer.setInterval(300000) # 10 minutes #changed to 5 mins
news_timer.start()
sys.exit(app.exec_())
if __name__ == "__main__":
main()

View file

@ -1,66 +0,0 @@
import datetime
import asyncio
from dash import html, Input, Output, callback, no_update
from dash.exceptions import PreventUpdate
from news import News
class NewsModule:
def __init__(self, app):
self.app = app
self.news_obj = self.get_news_object()
self._last_news_update = datetime.datetime(2000, 1, 1)
self._cached_news = self.create_loading_message() # Initial loading message
self._initial_run = True
self.setup_callbacks()
def get_news_object(self):
return News()
def create_loading_message(self):
return html.Div("Loading...")
def setup_callbacks(self):
@self.app.callback(
Output('news-ticker', 'children'),
Input('news-interval', 'n_intervals')
)
def update_news(n):
if n is None:
return self._cached_news
current_time = datetime.datetime.now()
time_since_update = (current_time - self._last_news_update).total_seconds()
# Only update if it's been more than 5 minutes or it's the initial run
if time_since_update < 300 and not self._initial_run:
return self._cached_news
try:
print("UPDATING NEWS...")
# Create a new event loop for this request
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
headlines_dict = loop.run_until_complete(self.news_obj.get_news())
loop.close()
if not headlines_dict:
return html.Div("No news available at this time.", className="ticker")
combined_items = " | ".join([f"{data['source']}: {headline}"
for headline, data in headlines_dict.items()])
text_px = len(combined_items) * 8
scroll_speed = 75
duration = max(text_px / scroll_speed, 20)
ticker_style = {"animationDuration": f"{duration}s"}
self._cached_news = html.Div(
html.Span(combined_items, className="news-item", style=ticker_style),
className='ticker'
)
self._last_news_update = current_time
self._initial_run = False
return self._cached_news
except Exception as e:
print(f"Error updating news: {e}")
return html.Div("No news available.")

View file

@ -1,96 +0,0 @@
from nba_api.live.nba.endpoints import scoreboard
from datetime import datetime, timedelta
import statsapi
class NBAScores:
def __init__(self):
self._scores = []
def get_scores(self):
try:
# Get scoreboard data
board = scoreboard.ScoreBoard()
data = board.get_dict()
# Check if we have a valid scoreboard response
if 'scoreboard' not in data:
print("No scoreboard data found")
return []
games = data['scoreboard'].get('games', [])
if not games:
print("No games found in scoreboard")
return []
scores_list = []
for game in games:
try:
game_data = {
'home_team': game['homeTeam']['teamTricode'],
'home_score': game['homeTeam']['score'],
'away_team': game['awayTeam']['teamTricode'],
'away_score': game['awayTeam']['score'],
'period': game['period'],
'status': game['gameStatusText']
}
scores_list.append(game_data)
except KeyError as e:
print(f"Error processing game data: {e}")
continue
self._scores = scores_list
return self._scores
except Exception as e:
print(f"Error fetching scores: {e}")
return []
class mlbScores:
def __init__(self):
self._scores = []
self._games = []
def get_games(self):
try:
# Get MLB games data
games = statsapi.schedule()
self._games = games
return self._games
except Exception as e:
print(f"Error fetching MLB games: {e}")
return []
def get_scores(self):
games = self.get_games()
scores_list = []
if not games:
print("No mlb games found")
return []
for game in games:
try:
game_data = {
'home_team': game['home_name'],
'home_score': game['home_score'],
'away_team': game['away_name'],
'away_score': game['away_score'],
'status': game['status']
}
scores_list.append(game_data)
except KeyError as e:
print(f"Error processing game data: {e}")
continue
return scores_list # RETURN THE LIST
if __name__ == "__main__":
scores = NBAScores()
results = scores.get_scores()
print("\nNBA Scores:")
if results:
for game in results:
print(f"{game['away_team']} {game['away_score']} @ "
f"{game['home_team']} {game['home_score']} "
f"(Status: {game['status']})")
else:
print("No games available")

View file

@ -1,69 +0,0 @@
from dash import html, Input, Output
from scores import NBAScores # Import NBAScores class
from scores import mlbScores # Import mlbScores class
class ScoresModule:
def __init__(self, app):
self.app = app
self.scores_obj = self.get_scores_object()
self.mlb_scores_obj = self.get_mlb_scores_object()
self.setup_callbacks()
def get_scores_object(self):
return NBAScores()
def setup_callbacks(self):
@self.app.callback(
Output('nba-scores-display', 'children'),
Input('nba-interval', 'n_intervals')
)
def update_scores(n):
try:
games = self.scores_obj.get_scores()
if not games:
print("no nba games found")
return html.Div("No games available", className='text-center')
return html.Div([
html.Div([
html.Div([
html.Span(f"{game['away_team']} {game['away_score']}"),
html.Span(" @ "),
html.Span(f"{game['home_team']} {game['home_score']}")
], className='game-score'),
html.Div(f"Period: {game['period']}", className='game-period')
], className='score-box')
for game in games
], className='score-container')
except Exception as e:
return html.Div("Scores unavailable")
def get_mlb_scores_object(self):
return mlbScores()
def setup_mlb_callbacks(self):
@self.app.callback(
Output('mlb-scores-display', 'children'),
Input('mlb-interval', 'n_intervals')
)
def update_mlb_scores(n):
try:
games = self.mlb_scores_obj.get_scores()
print("Updating MLB Scores")
if not games:
print("no mlb games found")
return html.Div("No games available", className='text-center')
return html.Div([
html.Div([
html.Div([
html.Span(f"{game['away_team']} {game['away_score']}"),
html.Span(" @ "),
html.Span(f"{game['home_team']} {game['home_score']}")
], className='game-score'),
html.Div(f"Status: {game['status']}", className='game-period')
], className='score-box')
for game in games
], className='score-container')
except Exception as e:
print(f"Error updating MLB Scores: {e}")
return html.Div("Scores unavailable")

View file

@ -1,55 +0,0 @@
import os
import subprocess
import datetime
class Weather:
def __init__(self):
# Get the directory where this script (weather.py) is located
self.script_dir = os.path.dirname(os.path.abspath(__file__))
# Construct the absolute path to the 'assets' directory in the same directory as the script
self.assets_dir = os.path.join(self.script_dir, 'assets')
# Ensure the assets directory exists
if not os.path.exists(self.assets_dir):
os.makedirs(self.assets_dir)
def delete_old_screenshots(self):
"""
Deletes all PNG files in the 'assets' directory that start with 'sacramento_weather_'.
"""
for filename in os.listdir(self.assets_dir):
if filename.startswith("sacramento_weather_"):
os.remove(os.path.join(self.assets_dir, filename))
def get_weather_screenshot(self):
"""
Fetches weather information for Sacramento from wttr.in using curl and saves it as a PNG.
Returns the path to the saved image.
"""
try:
# Create a timestamp for the filename
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
screenshot_filename = f"sacramento_weather_{timestamp}.png"
screenshot_path = os.path.join(self.assets_dir, screenshot_filename) # save to the proper location
# Use curl to get the weather data from wttr.in and save it as a PNG.
# add the scale #2 to make the png larger
curl_command = [
"curl",
"-s", # Silent mode
"v2.wttr.in/Sacramento.png?0pq&scale=.5", # Fetch weather data for Sacramento
"-o",
screenshot_path,
]
self.delete_old_screenshots()
subprocess.run(curl_command, check=True)
return screenshot_path
except subprocess.CalledProcessError as e:
print(f"Error fetching weather data: {e}")
return None
except Exception as e:
print(f"An unexpected error occurred: {e}")
return None

View file

@ -1,36 +0,0 @@
import datetime
import os
from dash import html, Input, Output
from weather import Weather # Import Weather class
class WeatherModule:
def __init__(self, app):
self.app = app
self.weather_obj = self.get_weather_object()
self.setup_callbacks()
def get_weather_object(self):
return Weather()
def setup_callbacks(self):
@self.app.callback(
Output('weather-display', 'children'),
Input('weather-interval', 'n_intervals')
)
def update_weather(n):
try:
print("UPDATING WEATHER...")
screenshot_path = self.weather_obj.get_weather_screenshot()
image_name = os.path.basename(screenshot_path)
return html.Div(
[
html.H2("Sacramento Weather"),
html.Img(
src=self.app.get_asset_url(image_name + f"?v={datetime.datetime.now().timestamp()}"),
style={"width": "100%", "display": "block", "image-rendering": "crisp-edges"}
),
],
style={"width": "600px", "margin": "0 auto", "border": "1px solid black"}
)
except Exception as e:
return html.Div(f"Weather update error: {str(e)}")

2
feeds.txt Normal file → Executable file
View file

@ -1,4 +1,4 @@
http://feeds.bbci.co.uk/news/rss.xml
http://feeds.feedburner.com/hiphopdx/news
https://news.google.com/news?hl=en&amp;gl=us&amp;q=sacramento+bee&amp;um=1&amp;ie=UTF-8&amp;output=rss
https://www.kcra.com/topstories-rss
https://morss.it/https://feeds.npr.org/1001/rss.xml

170
main.qml
View file

@ -1,170 +0,0 @@
// main.qml
import QtQuick
import QtQuick.Controls
ApplicationWindow {
visible: true
width: 800
height: 480
title: "Clockz"
property string currTime: "00:00:00"
property string onTouchPressed: "Left area not pressed"
Rectangle {
anchors.fill: parent
Image {
anchors.fill: parent
source: "/home/death916/code/python/pyside/images/background.png"
fillMode: Image.PreserveAspectCrop
}
// Left touch area
MouseArea {
id : scoreArea
anchors.left: parent.left
width: 70
height: parent.height // Full height
// Header text inside the MouseArea
Text {
anchors.top: parent.top
width: parent.width
height: 30
text: "Scores"
font.pixelSize: 20
color: "white"
horizontalAlignment: Text.AlignHCenter
verticalAlignment: Text.AlignVCenter
}
onClicked: {
console.log("Left area pressed!")
onTouchPressed = "Left area pressed"
}
}
// Colored rectangle to indicate the active area
Rectangle {
anchors.left: parent.left
width: 70
height: parent.height - newsArea.height // Full height
color: Qt.rgba(0, 0, 1, 0.3) // Slightly opaque blue
}
// Display the message on the screen
Text {
anchors.centerIn: parent
text: onTouchPressed
font.pixelSize: 20
color: "white"
}
// Weather box
Rectangle {
width: parent.width * 1 / 3 // 1/3 of the parent width
height: parent.height * 1 / 3 // 1/3 of the parent height
color: Qt.rgba(0, 0, 1, 0.5) // Semi-transparent blue
anchors.centerIn: parent
// Text "Weather" at the top
Text {
anchors.top: parent.top
width: parent.width * 1 / 3
height: 30
text: "Weather"
font.pixelSize: 20
color: "white"
horizontalAlignment: Text.AlignHCenter
verticalAlignment: Text.AlignVCenter
}
Image {
anchors.fill: parent
source: weatherMapPath
//fillMode: Image.fill // Fill the entire area // Uncomment this line to fill the entire area
}
// Additional weather content can be added here
}
// Clock container
Rectangle {
x: 300
y: 1
width: time.implicitWidth + 20 // Adjusted width based on the text size
height: time.implicitHeight + 20 // Adjusted height based on the text size
border.color: "gray"
color: "transparent"
Text {
id: time
text: currTime
font.pixelSize: 20
color: "black"
horizontalAlignment: Text.AlignHCenter
}
}
//bottom news scroll area
Rectangle {
id: newsArea
width: parent.width
height: parent.height * 1 / 6
color: Qt.rgba(0, 0, 1, 0.5) // Semi-transparent blue
anchors.bottom: parent.bottom
anchors.left: parent.left
// Text "News" at the top
Text {
id: newsHeader
anchors.top: parent.top
width: parent.width
height: 30
text: "News"
font.pixelSize: 20
color: "white"
horizontalAlignment: Text.AlignHCenter
verticalAlignment: Text.AlignVCenter
}
Text {
id: newsText
text: news
font.pixelSize: 15
color: "white"
horizontalAlignment: Text.AlignHCenter
verticalAlignment: Text.AlignVCenter
x: parent.width
anchors.top: newsHeader.bottom
SequentialAnimation on x {
loops: Animation.Infinite
running: true
PropertyAnimation {
from: parent.width
to: -newsText.width
duration: 2000000 // 10 seconds (in milliseconds)
}
}
}
Timer {
id: newsTimer
interval: 300005// 10 minutes
running: true
repeat: true
onTriggered: {
newsText.text = news // Update the news text
}
}
}
// ... Additional UI elements as needed ...
}
}

9
pyproject.toml Normal file → Executable file
View file

@ -3,19 +3,20 @@ name = "deathclock"
version = "0.1.0"
description = ""
authors = [{ name = "Death916", email = "tavn1992@gmail.com" }]
requires-python = ">=3.9,<3.11.4"
requires-python = ">=3.12"
dependencies = [
"pyside6>=6.6.1,<7",
"requests>=2.31.0,<3",
"selenium>=4.16.0,<5",
"feedparser>=6.0.11,<7",
"rich>=13.7.1,<14",
"dash>=2.18.2,<3",
"nba-api>=1.6.1,<2",
"playwright>=1.49.1,<2",
"aiofiles>=24.1.0",
"aiohttp>=3.11.13",
"mlb-statsapi>=1.8.1",
"reflex>=0.6.8",
"aiohttp>=3.11.18",
"reflex-text-loop>=0.0.1",
"pigpio>=1.78",
]
[dependency-groups]

5
rxconfig.py Executable file
View file

@ -0,0 +1,5 @@
import reflex as rx
config = rx.Config(
app_name="deathclock",
)

21
shell.nix Normal file
View file

@ -0,0 +1,21 @@
let
nixconfig = builtins.getFlake "github:death916/nixconfig";
unstable = nixconfig.inputs.nixpkgs-unstable.legacyPackages.x86_64-linux;
pkgs = nixconfig.inputs.nixpkgs.legacyPackages.x86_64-linux;
in
pkgs.mkShell {
packages = with pkgs; [
python313Packages.uv
python313Packages.ninja
python313Packages.numpy
bun
];
shellHook = ''
source .venv/bin/activate
# export PATH="${pkgs.bun}/bin:$PATH"
# export BUN_INSTALL="${pkgs.bun}/bin/bun"
export REFLEX_USE_SYSTEM_BUN=True
echo venv activated and bun versions set
'';
}

View file

View file

@ -1,5 +0,0 @@
from deathclock import __version__
def test_version():
assert __version__ == '0.1.0'

1
deathclock/alarm.py → utils/alarm.py Normal file → Executable file
View file

@ -25,4 +25,3 @@ class Alarm:
print("Alarm triggered!")
return True
return False

82
deathclock/news.py → utils/news.py Normal file → Executable file
View file

@ -1,24 +1,25 @@
import feedparser
import asyncio
import aiofiles
import random
from time import localtime, strftime
import socket
from time import localtime, strftime
import aiofiles
import aiohttp
import feedparser
def print_time():
print(strftime("%B %d, %I:%M %p", localtime()))
class News:
def __init__(self):
self._news_dict = {}
self._news_dict_length = 0
socket.setdefaulttimeout(10) # Set default timeout for socket operations
socket.setdefaulttimeout(10)
async def _fetch_feed(self, session, feed):
"""Fetches and parses a single feed asynchronously."""
max_entries = 10 # Maximum number of entries to fetch from each feed
max_entries = 10
try:
# Add timeout to the request
timeout = aiohttp.ClientTimeout(total=5)
@ -26,30 +27,36 @@ class News:
if response.status != 200:
print(f"Skip feed {feed}: status {response.status}")
return []
text = await response.text()
d = feedparser.parse(text)
if hasattr(d, 'status') and d.status != 200:
if hasattr(d, "status") and d.status != 200:
print(f"Skip feed {feed}: status {d.status}")
return []
feed_entries = []
# Limit the number of entries parsed
for i, post in enumerate(d.entries):
if i >= max_entries:
break # Stop parsing if we've reached the limit
feed_entries.append({
'title': post.title,
'source': d.feed.title if hasattr(d.feed, 'title') else 'Unknown',
'publish_date': post.published if hasattr(post, 'published') else '',
'summary': post.summary if hasattr(post, 'summary') else ''
})
break
feed_entries.append(
{
"title": post.title,
"source": d.feed.title
if hasattr(d.feed, "title")
else "Unknown",
"publish_date": post.published
if hasattr(post, "published")
else "",
"summary": post.summary if hasattr(post, "summary") else "",
}
)
print(f"Added {len(feed_entries)} entries from {feed}")
return feed_entries
except aiohttp.ClientError as e:
print(f"Error processing feed {feed}: {e}")
return []
@ -62,7 +69,7 @@ class News:
feeds = []
self._news_dict = {}
self._news_dict_length = 0
try:
async with aiofiles.open("feeds.txt", "r") as f:
async for line in f:
@ -70,38 +77,37 @@ class News:
except Exception as e:
print(f"Error reading feeds.txt: {e}")
return {}
# Limit the number of feeds to process at once
if len(feeds) > 10:
feeds = random.sample(feeds, 10)
print("Getting news entries...")
timeout = aiohttp.ClientTimeout(total=15)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = [self._fetch_feed(session, feed) for feed in feeds]
all_feed_entries_list = await asyncio.gather(*tasks, return_exceptions=True)
all_entries = []
for result in all_feed_entries_list:
if isinstance(result, list) and result:
all_entries.extend(result)
if not all_entries:
print("No entries collected")
return {}
return []
if len(all_entries) > 30:
all_entries = random.sample(all_entries, 30)
for entry in all_entries:
self._news_dict[entry['title']] = entry
try:
async with aiofiles.open("news.txt", "w") as f:
print("Writing news to file...")
for entry in self._news_dict.values():
await f.write(f"[{entry['publish_date']}] {entry['source']}: {entry['title']}\n")
for entry in all_entries:
await f.write(
f"[{entry['publish_date']}] {entry['source']}: {entry['title']}\n"
)
except Exception as e:
print(f"Error writing to news.txt: {e}")
return self._news_dict
return all_entries

View file

@ -0,0 +1,144 @@
# -*- coding: utf8 -*-
import ast
import asyncio
import sys
import time
from utils.python_rd5807m.rda5807m import Rda5807m
class Radio:
def __init__(self):
self.device = Rda5807m(1)
self.commands = {
"on": {"call": self.device.on, "help": "power on device"},
"off": {"call": self.device.off, "help": "power off device"},
"^": {"call": self.device.set_bass, "value": True, "help": "bass boost"},
"v": {"call": self.device.set_bass, "value": False, "help": "normal bass"},
"1": {"call": self.device.set_mute, "value": True, "help": "mute"},
"0": {"call": self.device.set_mute, "value": False, "help": "unmute"},
"s": {"call": self.device.set_stereo, "value": True, "help": "stereo"},
"m": {"call": self.device.set_stereo, "value": False, "help": "mono"},
"v": {"call": self.set_volume, "type": int, "help": "set volume"},
"+": {"call": self.set_volume_plus, "help": "increase volume"},
"-": {"call": self.set_volume_moins, "help": "decrease volume"},
"f": {"call": self.set_frequency, "type": float, "help": "set frequency"},
">": {"call": self.device.set_seek, "value": True, "help": "seek up"},
"<": {"call": self.device.set_seek, "value": False, "help": "seek down"},
"d": {"call": self.set_deemphasis, "type": int, "help": "de-emphasize"},
"i": {"call": self.get_infos, "help": "get infos"},
"h": {"call": self.help, "help": "help"},
"q": {"call": self.quit, "help": "quit"},
}
self.volume = 7 # default volume set in rda5807m.py
def initialize(self):
try:
self.device.init_chip()
except Exception as e:
print(f"problem while initializing: {e}")
raise e
time.sleep(0.2)
def print_prompt(self):
print(">> ", end="", flush=True)
def got_stdin_data(self):
input_string = sys.stdin.readline().strip("\n")
if input_string != "":
self.parse_command(input_string)
self.print_prompt()
def quit(self):
loop.stop()
def help(self):
for k, v in self.commands.items():
print("%s : %s" % (k, v["help"]))
def parse_command(self, entry):
parts = entry.split("=")
command = parts[0]
if command in self.commands:
params = self.commands[command]
call = params["call"]
if len(parts) == 1:
if "value" in params:
value = params["value"]
call(value)
else:
call()
elif len(parts) == 2:
value = ast.literal_eval(parts[1])
if "type" in params:
type_ = params["type"]
if type(value) == type_:
call(value)
else:
print("bad value type")
else:
print("invalid syntax")
else:
print("invalid syntax")
else:
print("command not found")
def set_volume(self, volume):
if not 0 <= volume <= 15:
print("bad volume value (0-15)")
return
self.volume = volume
self.device.set_volume(volume)
def set_volume_moins(self):
if self.volume == 0:
return
self.volume -= 1
print("volume: %d" % (self.volume,))
self.device.set_volume(self.volume)
def set_volume_plus(self):
if self.volume == 15:
return
self.volume += 1
print("volume: %d" % (self.volume,))
self.device.set_volume(self.volume)
def set_frequency(self, frequency):
if not 76 <= frequency <= 107.5:
print("bad frequency value (76-107.5)")
frequency = int(frequency * 10)
self.device.set_frequency(frequency)
def set_deemphasis(self, deemphasis):
if deemphasis not in [50, 75]:
print("bad de-emphasis value (50, 75)")
self.device.set_deemphasis(deemphasis)
def get_infos(self):
infos = self.device.get_infos()
print(infos)
def poll_rds(self):
self.device.process_rds()
loop.call_later(0.1, self.poll_rds)
def close(self):
self.device.close()
if __name__ == "__main__":
radio = Radio()
radio.initialize()
radio.help()
radio.print_prompt()
loop = asyncio.get_event_loop()
loop.add_reader(sys.stdin, radio.got_stdin_data)
loop.call_later(1, radio.poll_rds)
loop.run_forever()
radio.close()

View file

@ -0,0 +1,443 @@
# -*- coding: utf8 -*-
# RDA5807M Radio App for Raspberry Pi
# C version - redhawk 04/04/2014
# Python version version - Franck Barbenoire 17/03/2016
#
# This code is provided to help with programming the RDA chip.
from functools import partial
import pigpio
from string import printable
RDA_I2C_WRITE_ADDRESS = 0x10
RDA_I2C_READ_ADDRESS = 0x11
# CHIP ID
RDA_CHIP_ID = 0x58
# Timing XTAL
RDA_32_768KHZ = 0b0000000000000000
RDA_12MHZ = 0b0000000000010000
RDA_24MHZ = 0b0000000001010000
RDA_13MHZ = 0b0000000000100000
RDA_26MHZ = 0b0000000001100000
RDA_19_2MHZ = 0b0000000000110000
RDA_38_4MHZ = 0b0000000001110000
# Tuning Band
RDA_87_108MHZ = 0b0000000000000000
RDA_76_91MHZ = 0b0000000000000100
RDA_76_108MHZ = 0b0000000000001000
RDA_65_76MHZ = 0b0000000000001100
# Tuning Steps
RDA_100KHZ = 0b0000000000000000
RDA_200KHZ = 0b0000000000000001 # not US band compatible
RDA_50KHZ = 0b0000000000000010
RDA_25KHZ = 0b0000000000000011
# De-emphasis
RDA_50US = 0b0000100000000000
RDA_75US = 0b0000000000000000
# REG 0x02
RDA_DHIZ = 0b1000000000000000
RDA_DMUTE = 0b0100000000000000
RDA_MONO = 0b0010000000000000
RDA_BASS = 0b0001000000000000
RDA_RCLK = 0b0000100000000000
RDA_RCKL_DIM = 0b0000010000000000
RDA_SEEKUP = 0b0000001000000000
RDA_SEEK = 0b0000000100000000
RDA_SKMODE = 0b0000000010000000
RDA_CLK_MODE = 0b0000000001110000
RDA_RDS_EN = 0b0000000000001000
RDA_NEW_METHOD = 0b0000000000000100
RDA_SOFT_RESET = 0b0000000000000010
RDA_ENABLE = 0b0000000000000001
# REG 0x03
RDA_CHAN = 0b1111111111000000
RDA_DIRECT_MODE = 0b0000000000100000
RDA_TUNE = 0b0000000000010000
RDA_BAND = 0b0000000000001100
RDA_SPACE = 0b0000000000000011
# REG 0x04
RDA_DE = 0b0000100000000000
RDA_SOFTMUTE_EN = 0b0000001000000000
RDA_AFCD = 0b0000000100000000
# REG 0x05
RDA_INT_MODE = 0b1000000000000000
RDA_SEEKTH = 0b0000111100000000
RDA_VOLUME = 0b0000000000001111
# REG 0x06
RDA_OPEN_MODE = 0b0110000000000000
# REG 0x07
RDA_BLEND_TH = 0b0111110000000000
RDA_65_50M_MODE = 0b0000001000000000
RDA_SEEK_TH_OLD = 0b0000000011111100
RDA_BLEND_EN = 0b0000000000000010
RDA_FREQ_MODE = 0b0000000000000001
# REG 0x0A
RDA_RDSR = 0b1000000000000000
RDA_STC = 0b0100000000000000
RDA_SF = 0b0010000000000000
RDA_RDSS = 0b0001000000000000
RDA_BLK_E = 0b0000100000000000
RDA_ST = 0b0000010000000000
RDA_READCHAN = 0b0000001111111111
# REG 0x0B
RDA_RSSI = 0b1111110000000000
RDA_FM_TRUE = 0b0000001000000000
RDA_FM_READY = 0b0000000100000000
RDA_ABCD_E = 0b0000000000010000
RDA_BLERA = 0b0000000000001100
RDA_BLERB = 0b0000000000000011
# ========
RDS_GROUP_TYPE_CODE = 0xf000
RDS_PTY = 0x03e0
RDS_B0 = 0x0800
class Rda5807m:
def __init__(self, bus):
# Create I2C device.
self.pi = pigpio.pi()
self.read_handle = self.pi.i2c_open(bus, RDA_I2C_READ_ADDRESS)
self.write_handle = self.pi.i2c_open(bus, RDA_I2C_WRITE_ADDRESS)
self.out_buffer = [0] * 12
self.read_bug = False
self.rds_init()
def read_chip(self, reg):
data = self.pi.i2c_read_word_data(self.read_handle, reg)
if self.read_bug:
return (data >> 8) + ((data & 0xff) << 8)
else:
return data
def write_chip(self, count):
self.pi.i2c_write_device(self.write_handle, bytes(self.out_buffer[:count]))
def init_chip(self):
data = self.read_chip(0)
found = False
if data >> 8 == RDA_CHIP_ID:
found = True
self.read_bug = False
elif data & 0xff == RDA_CHIP_ID:
found = True
self.read_bug = True
if not found:
raise Exception("i2c device not found")
if self.read_chip(13) == 0x5804 and self.read_chip(15) == 0x5804:
# device not already used, initialize it
self.on()
def write_setting(self):
# REG 02 - normal output, enable mute, stereo, no bass boost
# clock = 32.768KHZ, RDS enabled, new demod method, power on
data = RDA_DHIZ | RDA_32_768KHZ | RDA_RDS_EN | RDA_NEW_METHOD | RDA_ENABLE
self.out_buffer[0] = data >> 8
self.out_buffer[1] = data & 0xff
# REG 03 - no auto tune, 87-108 band, 0.1 spacing
data = (RDA_TUNE & 0) | RDA_87_108MHZ | RDA_100KHZ
self.out_buffer[2] = data >> 8
self.out_buffer[3] = data & 0xff
# REG 04 - audio 50US, no soft mute, disable AFC
data = RDA_50US | RDA_AFCD
self.out_buffer[4] = data >> 8
self.out_buffer[5] = data & 0xff
# REG 05 - mid volume
data = RDA_INT_MODE | 0x0880 | (RDA_VOLUME >> 1)
self.out_buffer[6] = data >> 8
self.out_buffer[7] = data & 0xff
# REG 06 - reserved
self.out_buffer[8] = 0
self.out_buffer[9] = 0
# REG 07
blend_threshold = 0b0011110000000000 # mix L+R with falling signal strength
data = blend_threshold | RDA_65_50M_MODE | 0x80 | 0x40 | RDA_BLEND_EN
self.out_buffer[10] = data >> 8
self.out_buffer[11] = data & 0xff
def write_tune(self, value):
data = ((self.out_buffer[2] << 8) | self.out_buffer[3]) | RDA_TUNE
if not value:
data = data ^ RDA_TUNE
self.out_buffer[2] = data >> 8
self.out_buffer[3] = data & 0xff
def write_from_chip(self):
for loop in range(2, 8):
data = self.read_chip(loop)
self.out_buffer[(loop * 2) - 4] = data >> 8
self.out_buffer[(loop * 2) - 3] = data & 0xff
self.write_tune(False) # disable tuning
WRITE_METHODS = {
"off": {"reg": 2, "mask": RDA_ENABLE, "how": "flag", "left-shift": 0},
"dmute": {"reg": 2, "mask": RDA_DMUTE, "how": "flag", "left-shift": 0},
"mono": {"reg": 2, "mask": RDA_MONO, "how": "flag", "left-shift": 0},
"bass": {"reg": 2, "mask": RDA_BASS, "how": "flag", "left-shift": 0},
"seekup": {"reg": 2, "mask": RDA_SEEKUP, "how": "flag", "left-shift": 0},
"seek": {"reg": 2, "mask": RDA_SEEK, "how": "flag", "left-shift": 0},
"skmode": {"reg": 2, "mask": RDA_SKMODE, "how": "flag", "left-shift": 0},
"de": {"reg": 4, "mask": RDA_DE, "how": "value", "left-shift": 0},
"volume": {"reg": 5, "mask": RDA_VOLUME, "how": "value", "left-shift": 0},
"chan": {"reg": 3, "mask": RDA_CHAN, "how": "value", "left-shift": 6},
}
READ_METHODS = {
"dmute": {"reg": 2, "mask": RDA_DMUTE, "right-shift": 0},
"bass": {"reg": 2, "mask": RDA_BASS, "right-shift": 0},
"mono": {"reg": 2, "mask": RDA_MONO, "right-shift": 0},
"band": {"reg": 3, "mask": RDA_BAND, "right-shift": 0},
"space": {"reg": 3, "mask": RDA_SPACE, "right-shift": 0},
"de": {"reg": 4, "mask": RDA_DE, "right-shift": 0},
"volume": {"reg": 5, "mask": RDA_VOLUME, "right-shift": 0},
"st": {"reg": 10, "mask": RDA_ST, "right-shift": 0},
"rssi": {"reg": 11, "mask": RDA_RSSI, "right-shift": 10},
}
def __getattr__(self, name):
parts = name.split('_')
if len(parts) != 2 or parts[0] not in ["read", "write"]:
raise AttributeError("attribute '%s' not found" % (name,))
name = parts[1]
if parts[0] == "read":
if name in self.READ_METHODS:
params = self.READ_METHODS[name]
return partial(
self.read_param,
params["reg"],
params["mask"],
params["right-shift"]
)
elif parts[0] == "write":
if name in self.WRITE_METHODS:
params = self.WRITE_METHODS[name]
return partial(
self.write_param,
params["reg"],
params["mask"],
params["how"],
params["left-shift"]
)
raise AttributeError("attribute '%s' not found" % (name,))
def read_param(self, reg, mask, right_shift):
return (self.read_chip(reg) & mask) >> right_shift
def write_param(self, reg, mask, how, left_shift, value):
data = (self.read_chip(reg) | mask) ^ mask
if how == "flag":
if value:
data = data | mask
elif how == "value":
value <<= left_shift
data = data | value
out_reg = (reg - 2) * 2
self.out_buffer[out_reg] = data >> 8
self.out_buffer[out_reg + 1] = data & 0xff
def set_frequency(self, freq_request):
data = self.read_band()
if data == RDA_87_108MHZ:
start_freq = 870
elif data == RDA_76_108MHZ:
start_freq = 760
elif data == RDA_76_91MHZ:
start_freq = 760
elif data == RDA_65_76MHZ:
start_freq = 650
data = self.read_space()
if data == RDA_200KHZ:
spacing = 0
elif data == RDA_100KHZ:
spacing = 1
elif data == RDA_50KHZ:
spacing = 2
elif data == RDA_25KHZ:
spacing = 4
if spacing > 0:
new_freq = (freq_request - start_freq) * spacing
else:
new_freq = int((freq_request - start_freq) / 2)
self.write_dmute(True)
self.write_chan(new_freq)
self.write_tune(True)
self.write_chip(4)
def on(self):
self.write_setting()
self.write_chip(12)
def off(self):
self.write_off(False)
self.write_chip(2)
def set_mute(self, mute):
self.write_dmute(not mute)
self.write_chip(2)
def set_volume(self, volume):
self.write_from_chip()
self.write_volume(volume)
self.write_chip(8)
def set_bass(self, bass):
self.write_bass(bass)
self.write_chip(2)
def set_stereo(self, stereo):
self.write_mono(not stereo)
self.write_chip(2)
def set_deemphasis(self, deemphasis):
self.write_from_chip()
if deemphasis == 50:
self.write_de(RDA_50US)
else:
self.write_de(RDA_75US)
self.write_chip(6)
def set_seek(self, seek_up):
self.write_seekup(seek_up)
self.write_chip(2)
self.write_seek(True)
self.write_chip(2)
def get_infos(self):
data3 = self.read_chip(3)
data10 = self.read_chip(10)
data11 = self.read_chip(11)
infos = {}
infos["tune-ok"] = (data10 & RDA_STC) != 0
infos["seek-fail"] = (data10 & RDA_SF) != 0
infos["stereo"] = (data10 & RDA_ST) != 0
chan = data10 & RDA_READCHAN
space = data3 & RDA_SPACE
if space == RDA_200KHZ:
space0 = 0.2
elif space == RDA_100KHZ:
space0 = 0.1
elif space == RDA_50KHZ:
space0 = 0.05
elif space == RDA_25KHZ:
space0 = 0.025
band = data3 & RDA_BAND
if band == RDA_87_108MHZ:
band0 = 87.0
elif RDA_76_91MHZ:
band0 = 76.0
elif RDA_76_108MHZ:
band0 = 76.0
elif RDA_65_76MHZ:
band0 = 65.0
infos["freq"] = band0 + chan * space0
signal = (data11 & RDA_RSSI) >> 10
infos["signal"] = "%.1f" % ((signal * 100) / 64,)
infos["fm-station"] = (data11 & RDA_FM_READY) != 0
infos["fm-true"] = (data11 & RDA_FM_TRUE) != 0
infos["PS"] = self.station_name
infos["Text"] = self.text
infos["CTime"] = self.ctime
return infos
def rds_init(self):
self.station_name = "--------"
self.station_name_tmp_1 = ['-'] * 8
self.station_name_tmp_2 = ['-'] * 8
self.text = '-' * 64
self.text_tmp = ['-'] * 64
self.ab = False
self.idx = 0
self.ctime = ""
def process_rds(self):
reg_a_f = self.pi.i2c_read_i2c_block_data(self.read_handle, 10, 12)[1]
reg_a = (reg_a_f[0] << 8) | reg_a_f[1]
reg_b = (reg_a_f[2] << 8) | reg_a_f[3]
# block_a = (reg_a_f[4] << 8) | reg_a_f[5]
block_b = (reg_a_f[6] << 8) | reg_a_f[7]
block_c = (reg_a_f[8] << 8) | reg_a_f[9]
block_d = (reg_a_f[10] << 8) | reg_a_f[11]
if reg_a & RDA_RDSS == 0:
self.rds_init()
if reg_a & RDA_RDSR == 0 or reg_b & RDA_BLERB != 0:
# no new rds group ready
return
group_type = 0x0a + ((block_b & RDS_GROUP_TYPE_CODE) >> 8) | ((block_b & RDS_B0) >> 11)
if group_type in [0x0a, 0x0b]:
# PS name
idx = (block_b & 3) * 2
c1 = chr(block_d >> 8)
c2 = chr(block_d & 0xff)
if c1 in printable and c2 in printable:
if self.station_name_tmp_1[idx:idx + 2] == [c1, c2]:
self.station_name_tmp_2[idx:idx + 2] = [c1, c2]
if self.station_name_tmp_1 == self.station_name_tmp_2:
self.station_name = ''.join(self.station_name_tmp_1)
if self.station_name_tmp_1[idx:idx + 2] != [c1, c2]:
self.station_name_tmp_1[idx:idx + 2] = [c1, c2]
elif group_type == 0x2a:
# Text
idx = (block_b & 0x0f) * 4
if idx < self.idx:
self.text = ''.join(self.text_tmp)
self.idx = idx
ab = (block_b & 0x10) != 0
if ab != self.ab:
self.text = '-' * 64
self.text_tmp = ['-'] * 64
self.ab = ab
c1 = chr(block_c >> 8)
c2 = chr(block_c & 0xff)
c3 = chr(block_d >> 8)
c4 = chr(block_d & 0xff)
if c1 in printable and c2 in printable and c3 in printable and c4 in printable:
self.text_tmp[idx:idx + 4] = [c1, c2, c3, c4]
elif group_type == 0x4a:
offset = block_d & 0x1f
mins = (block_d & 0x0fc0) >> 6
hour = ((block_c & 1) << 4) | (block_d >> 12)
mins += 60 * hour
if block_d & 0x20:
mins -= 30 * offset
else:
mins += 30 * offset
if 0 < mins < 1500:
self.ctime = "CT %02d:%02d" % (int(mins / 60), mins % 60)
def close(self):
self.pi.i2c_close(self.read_handle)
self.pi.i2c_close(self.write_handle)
self.pi.stop()

101
utils/radio.py Normal file
View file

@ -0,0 +1,101 @@
import logging
import time
# Try to import the hardware library, fall back to dummy if fails
try:
from utils.python_rd5807m.radio import Radio as RadioLib
HARDWARE_AVAILABLE = True
except ImportError:
HARDWARE_AVAILABLE = False
class DummyDevice:
def __init__(self):
self.volume = 5
self.freq = 90.9
self.muted = False
self.stereo = True
self.bass = False
def on(self):
logging.info("DummyRadio: Powered ON")
def off(self):
logging.info("DummyRadio: Powered OFF")
def set_volume(self, value):
self.volume = value
logging.info(f"DummyRadio: Set volume to {value}")
def set_frequency(self, frequency):
self.freq = frequency
logging.info(f"DummyRadio: Set frequency to {frequency}")
def set_mute(self, value):
self.muted = value
logging.info(f"DummyRadio: Mute {value}")
def set_stereo(self, value):
self.stereo = value
logging.info(f"DummyRadio: Stereo {value}")
def set_bass(self, value):
self.bass = value
logging.info(f"DummyRadio: Bass {value}")
def close(self):
logging.info("DummyRadio: Closed")
class Radio:
def __init__(self):
self.device = None
self.is_on = False
self.volume = 5
self.station = 90.9
self._initialize_device()
def _initialize_device(self):
if HARDWARE_AVAILABLE:
try:
self.radio_lib = RadioLib()
self.radio_lib.initialize()
self.device = self.radio_lib.device
logging.info("Radio: Hardware device initialized successfully.")
except Exception as e:
logging.error(f"Radio: Hardware init failed ({e}). Using Dummy.")
self.device = DummyDevice()
else:
logging.info("Radio: Hardware lib not found. Using Dummy.")
self.device = DummyDevice()
def on(self):
if self.device:
self.device.on()
self.is_on = True
def off(self):
if self.device:
self.device.off()
self.is_on = False
def set_volume(self, volume: int):
"""Set volume 0-15"""
if self.device:
# Ensure volume is within bounds if needed, though UI likely handles it
vol = max(0, min(15, int(volume)))
self.device.set_volume(vol)
self.volume = vol
def set_station(self, station: float):
if self.device:
self.device.set_frequency(float(station))
self.station = station
def get_info(self):
"""Return current state as dict"""
return {
"is_on": self.is_on,
"volume": self.volume,
"station": self.station,
}

229
utils/scores.py Executable file
View file

@ -0,0 +1,229 @@
# /home/death916/code/python/deathclock/utils/scores.py
import logging
from datetime import datetime
import reflex as rx
import statsapi
from nba_api.live.nba.endpoints import scoreboard
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class NBAScores(rx.Base):
async def get_scores(self): # Make async to match usage in State
"""Fetches NBA scores and returns them as a list of dicts."""
try:
# Get scoreboard data
board = scoreboard.ScoreBoard()
data = board.get_dict()
if "scoreboard" not in data:
logging.warning("No NBA scoreboard data found in response")
return []
games = data["scoreboard"].get("games", [])
if not games:
logging.info("No active NBA games found in scoreboard")
return []
scores_list = []
for game in games:
try:
game_data = {
"home_team": game["homeTeam"]["teamTricode"],
"home_score": game["homeTeam"]["score"],
"away_team": game["awayTeam"]["teamTricode"],
"away_score": game["awayTeam"]["score"],
"period": game["period"],
"status": game["gameStatusText"],
}
scores_list.append(game_data)
except KeyError as e:
logging.error(
f"Error processing NBA game data: {e} for game: {game.get('gameId', 'N/A')}"
)
continue # Skip this game
return scores_list
except Exception as e:
logging.error(f"Error fetching NBA scores: {e}", exc_info=True)
return [] # Return empty list on error
class mlbScores(rx.Base):
async def get_games(self): # Make async
"""Fetches MLB games data."""
try:
games = statsapi.schedule() # Assuming sync for now
return games
except Exception as e:
logging.error(f"Error fetching MLB games: {e}", exc_info=True)
return []
async def get_scores(self): # Make async to match usage in State
"""Fetches and formats MLB scores."""
games = await self.get_games() # Await the async get_games
scores_list = []
if not games:
logging.info("No MLB games found today.")
return []
for game in games:
try:
game_data = {
"home_team": game.get("home_name", "N/A"),
"home_score": game.get("home_score", "-"),
"away_team": game.get("away_name", "N/A"),
"away_score": game.get("away_score", "-"),
"status": game.get("status", "Scheduled"), # Provide default status
}
scores_list.append(game_data)
except KeyError as e:
logging.error(
f"Error processing MLB game data: {e} for game: {game.get('game_id', 'N/A')}"
)
continue
return scores_list
class nflScores:
async def get_scores(self):
# get nfl scores from espn
nfl_scores = []
try:
import aiohttp
# use current local date in YYYYMMDD format
date_str = datetime.now().strftime("%Y%m%d")
url = f"https://site.api.espn.com/apis/site/v2/sports/football/nfl/scoreboard?dates={date_str}"
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=15) as resp:
if resp.status != 200:
logging.error(
f"ESPN NFL scoreboard returned status {resp.status}"
)
return []
data = await resp.json()
events = data.get("events", [])
if not events:
logging.info("No NFL games found for date %s", date_str)
return []
nfl_scores = []
for ev in events:
try:
competitions = ev.get("competitions", [])
if not competitions:
logging.warning(
"No competitions found for event: %s", ev.get("id")
)
continue
comp = competitions[0]
competitors = comp.get("competitors", [])
home_team = away_team = "N/A"
home_score = away_score = "-"
for c in competitors:
team = c.get("team", {}) or {}
abbr = (
team.get("abbreviation")
or team.get("displayName")
or team.get("shortDisplayName")
or "N/A"
)
score = (
c.get("score", "-") if c.get("score") is not None else "-"
)
homeAway = c.get("homeAway", "").lower()
if homeAway == "home":
home_team = abbr
home_score = score
else:
away_team = abbr
away_score = score
status = comp.get("status", {}) or {}
status_type = status.get("type", {}) or {}
status_text = (
status_type.get("shortDetail")
or status_type.get("description")
or status_type.get("state")
or status.get("type")
or status.get("detail")
or "Unknown"
)
game_data = {
"home_team": home_team,
"home_score": home_score,
"away_team": away_team,
"away_score": away_score,
"status": status_text,
}
nfl_scores.append(game_data)
except Exception as e:
logging.error(
f"Error processing NFL event: {e} for event {ev.get('id')}",
exc_info=True,
)
continue
print(nfl_scores)
return nfl_scores
except Exception as e:
logging.error(f"Error fetching NFL scores: {e}", exc_info=True)
return []
if __name__ == "__main__":
import asyncio
async def test_scores():
nba_client = NBAScores()
nba_results = await nba_client.get_scores() # await async method
print("\nNBA Scores:")
if nba_results:
for game in nba_results:
print(
f"{game['away_team']} {game['away_score']} @ "
f"{game['home_team']} {game['home_score']} "
f"(Status: {game['status']})"
)
else:
print("No NBA games/scores available")
mlb_client = mlbScores()
mlb_results = await mlb_client.get_scores() # await async method
print("\nMLB Scores:")
if mlb_results:
for game in mlb_results:
print(
f"{game['away_team']} {game['away_score']} @ "
f"{game['home_team']} {game['home_score']} "
f"(Status: {game['status']})"
)
else:
print("No MLB games/scores available")
nfl_client = nflScores()
nfl_results = await nfl_client.get_scores() # await async method
print("\nNFL Scores:")
if nfl_results:
for game in nfl_results:
print(
f"{game['away_team']} {game['away_score']} @ "
f"{game['home_team']} {game['home_score']} "
f"(Status: {game['status']})"
)
else:
print("No NFL games/scores available")
asyncio.run(test_scores())

98
utils/weather.py Executable file
View file

@ -0,0 +1,98 @@
# /home/death916/code/python/deathclock/utils/weather.py
import logging # Optional: Use logging for better error messages
import os
import subprocess
import time
import reflex as rx
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# Define the target filename consistently
WEATHER_FILENAME = ""
# Define the web path expected by the frontend
WEATHER_WEB_PATH = f"/{WEATHER_FILENAME}" # This should be relative to the assets dir
LAST_FILENAME = WEATHER_FILENAME
class Weather(rx.Base):
def _get_assets_dir(self) -> str:
"""Calculates and ensures the assets directory exists within the project."""
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
assets_dir = os.path.join(project_root, "assets")
# Ensure the assets directory exists
if not os.path.exists(assets_dir):
try:
os.makedirs(assets_dir)
logging.info(f"Created assets directory: {assets_dir}")
except OSError as e:
logging.error(f"Failed to create assets directory {assets_dir}: {e}")
return assets_dir
def delete_old_screenshots(self):
"""Deletes the specific weather file in the given 'assets' directory."""
assets_dir = self._get_assets_dir()
global LAST_FILENAME
try:
for fn in os.listdir(assets_dir):
if not fn.endswith("weather.png"):
continue
if LAST_FILENAME and fn == LAST_FILENAME:
logging.info(f"Skipping deletion of current weather file: {fn}")
continue
path = os.path.join(assets_dir, fn)
if os.path.isfile(path):
try:
os.remove(path)
logging.info(f"Deleted old weather file: {path}")
except OSError as e:
logging.error(f"Failed to delete old weather file {path}: {e}")
except Exception as e:
logging.error(f"Error cleaning old weather files in {assets_dir}: {e}")
def get_weather_screenshot(self) -> str | None:
"""
Fetches weather info using curl, saves it to the project's assets dir.
Returns the web path (e.g., '/weather.jpg') or None on failure.
"""
assets_dir = self._get_assets_dir()
global WEATHER_FILENAME
global LAST_FILENAME
WEATHER_FILENAME = str(time.time()) + "weather.png"
screenshot_path = os.path.join(assets_dir, WEATHER_FILENAME)
LAST_FILENAME = screenshot_path
curl_command = [
"curl",
"-s", # Silent mode
"v2.wttr.in/Sacramento.png?u0", # Fetch PNG, no border, no terminal escapes
"-o",
screenshot_path, # Save to the correct assets path
]
logging.info(f"Running curl command to fetch weather: {' '.join(curl_command)}")
try:
subprocess.run(curl_command)
logging.info(
f"Curl command successful. Weather image saved to: {screenshot_path}"
)
global WEATHER_WEB_PATH
WEATHER_WEB_PATH = f"/{WEATHER_FILENAME}"
return WEATHER_WEB_PATH
except subprocess.CalledProcessError as e:
logging.error(f"Curl command failed for path {screenshot_path}: {e}")
logging.error(f"Curl stderr: {e.stderr}")
return None
except Exception as e:
logging.error(
f"An unexpected error occurred saving to {screenshot_path}: {e}"
)
return None

2093
uv.lock generated

File diff suppressed because it is too large Load diff