"""GitHub release notifier.

Run as a script, it checks the latest GitHub release of every configured
project and sends an HTML mail when a new release or a newly added project is
detected. Imported as a module, it exposes a small FastAPI application used to
list and add the watched projects.
"""
import configparser
import datetime
import html
import json
import os
import smtplib
import sys
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import pyotp
import requests
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse

#
# CONF PART
#
# Resolve to an absolute path so the files are found even when ``__file__`` is
# relative (e.g. ``python script.py`` from another working directory).
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONF_FILE = os.path.join(SCRIPT_DIR, "conf.ini")
TEMPLATE_FILE = os.path.join(SCRIPT_DIR, "template.html")


class EnvInjection(configparser.Interpolation):
    """
    Derived interpolation that takes an env variable before the file variable.
    Allows keeping the ini file for local / traditional use
    and using env variables to override the configuration in a Docker usage.
    """

    def before_get(self, parser, section, option, value, defaults):
        """Return the env value named ``OPTION`` (upper-cased) if set and non-empty.

        Only lookups made on the parser's default section are overridden; every
        other section returns the value read from the file.
        """
        file_value = super().before_get(parser, section, option, value, defaults)
        if section != parser.default_section:
            return file_value
        # An unset or empty environment variable falls back to the file value.
        return os.getenv(option.upper()) or file_value


def load_conf(conf_file=CONF_FILE) -> configparser.ConfigParser:
    """Build a parser (default section ``config``) and read ``conf_file`` into it."""
    parser = configparser.ConfigParser(
        default_section="config", interpolation=EnvInjection()
    )
    parser.read(conf_file)
    return parser


def _save_conf(parser, conf_file=CONF_FILE):
    """Write ``parser`` back to the same file ``load_conf`` reads from."""
    with open(conf_file, "w", encoding="utf-8") as configfile:
        parser.write(configfile)


class _OpenAccess:
    """Stand-in for ``pyotp.TOTP`` used when TOTP protection is disabled."""

    @staticmethod
    def verify(_credentials):
        """Accept any credentials, including ``None``."""
        return True


def load_totp():
    """Return the object used to check API credentials.

    When ``[api.totp] enabled`` is true (the default), a ``pyotp.TOTP`` built
    from ``[api.totp] key`` is returned; a random key is generated when none is
    configured. Otherwise an object whose ``verify`` always returns ``True`` is
    returned and a warning is printed.
    """
    parser = load_conf()
    if not parser.getboolean("api.totp", "enabled", fallback=True):
        print("WARNING: Api is open without security")
        return _OpenAccess()

    totp = pyotp.TOTP(parser.get("api.totp", "key", fallback=pyotp.random_base32()))
    # TODO Catch totp.now() error
    print(
        f"TOTP enabled. \nInformation: {totp.provisioning_uri()} "
        f"TOTP check: {totp.now()}"
    )
    return totp


#
# API PARTS
#
app = FastAPI()
TOTP = load_totp()


@app.get("/subscriptions")
def get_projects():
    """Return the list of watched projects stored in the configuration."""
    parser = load_conf()
    return json.loads(parser["projects"].get("projects"))


@app.put("/subscriptions")
def put_projects(
    project: str, author: str | None = None, credentials: str | None = None
):
    """Add ``project`` (optionally prefixed by ``author/``) to the watched list.

    Returns 401 when ``credentials`` is rejected by ``TOTP``, the project name
    when it is already watched, and 201 with the project name once it has been
    added and the configuration saved.
    """
    if not TOTP.verify(credentials):
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED, content="Unauthorized"
        )
    # TODO Check if project really exist?
    parser = load_conf()
    projects = json.loads(parser.get("projects", "projects"))
    if author:
        project = f"{author}/{project}"
    if project in projects:
        return project
    projects.append(project)
    # TODO Watch a converter for list: https://stackoverflow.com/a/53274707/1346391
    parser.set("projects", "projects", json.dumps(projects, indent=0))
    # Save next to the script (where the configuration was read from), not in
    # whatever the current working directory happens to be.
    _save_conf(parser)
    return JSONResponse(status_code=status.HTTP_201_CREATED, content=project)


#
# SCRIPT PART
#
def main():
    """Check every watched project and mail a summary of what changed.

    The tag of each successfully checked project is recorded in the
    ``release`` section; the configuration is saved only after the mail has
    been sent. Exits with status 1 when the project list is not valid JSON.
    """
    parser = load_conf()
    default_config = parser["config"]
    try:
        projects = json.loads(parser.get("projects", "projects"))
    except json.decoder.JSONDecodeError as jse:
        print("ERROR: config file is not correctly JSON formatted!", end="\n\t")
        print(jse)
        sys.exit(1)

    new_releases = []
    new_projects = []
    if not parser.has_section("release"):
        parser.add_section("release")
    for project in projects:
        try:
            last_release = get_last_release(project)
        except (requests.RequestException, KeyError, ValueError) as err:
            # A single unreachable repository, or one without any release,
            # must not prevent the other projects from being reported.
            print(f"ERROR: unable to get the last release of {project}: {err}")
            continue
        if not parser.has_option("release", project):
            new_projects.append(last_release)
        else:
            last_config_tag = parser.get("release", project)
            if last_config_tag != last_release["release_tag"]:
                last_release["previous_tag"] = last_config_tag
                new_releases.append(last_release)
        parser.set("release", project, last_release["release_tag"])

    if not new_releases and not new_projects:
        print("No new projets or new release detected. Bye!")
        return

    news = []
    items = []
    for new_r in new_releases:
        news.append(new_r["project_name"])
        project_link = get_html_project_link(new_r)
        release_link = get_html_release_link(new_r)
        previous_tag = html.escape(new_r["previous_tag"])
        published = convert_date(new_r["published_date"])
        items.append(
            f"\n  • {project_link} : new release {release_link} "
            f"available (old: {previous_tag}). (published {published})\n  • "
        )
    for new_p in new_projects:
        news.append(new_p["project_name"])
        project_link = get_html_project_link(new_p)
        release_link = get_html_release_link(new_p)
        published = convert_date(new_p["published_date"])
        items.append(
            f"\n  • {project_link} was added to your configuration. "
            f"Last release: {release_link} (published {published})\n  • "
        )
    content = "".join(items)

    with open(TEMPLATE_FILE, "r", encoding="utf-8") as f_template:
        template = f_template.read()
    send_mail(template.replace("{{content}}", content), default_config)
    print(f"Send a mail for new releases and projects on: {', '.join(news)}")
    # Persist the new tags in the file the configuration was loaded from.
    _save_conf(parser)


def get_last_release(project):
    """Fetch the latest release of ``project`` (``owner/name``) from GitHub.

    Returns a dict with ``release_tag``, ``published_date``, ``project_name``
    and ``release_url``.

    Raises:
        requests.RequestException: on network failure or HTTP error status.
        KeyError: when the response lacks one of the expected fields.
    """
    url = f"https://api.github.com/repos/{project}/releases/latest"
    # Printed before the request so a failing project is identifiable.
    print(f"Check {project} - {url}")
    result = requests.get(url, timeout=10)
    # Fail explicitly on an HTTP error instead of on a missing "tag_name" key.
    result.raise_for_status()
    release = result.json()
    return {
        "release_tag": release["tag_name"],
        "published_date": release["published_at"],
        "project_name": project,
        "release_url": release["html_url"],
    }


def get_html_project_link(el):
    """Return an HTML anchor to the GitHub page of ``el["project_name"]``."""
    project_url = f'https://github.com/{el["project_name"]}'
    label = html.escape(el["project_name"])
    return f'<a href="{html.escape(project_url)}">{label}</a>'


def get_html_release_link(el):
    """Return an HTML anchor to the release page, labelled with the tag.

    Falls back to the escaped tag alone when ``el`` has no ``release_url``.
    """
    label = html.escape(el["release_tag"])
    release_url = el.get("release_url")
    if not release_url:
        return label
    return f'<a href="{html.escape(release_url)}">{label}</a>'


def send_mail(content, config):
    """Send ``content`` as an HTML mail using the SMTP settings in ``config``.

    ``config`` must provide ``smtp_port``, ``smtp_server``, ``sender_email``
    and ``receiver_email`` through ``get``.
    """
    raw_port = config.get("smtp_port")
    # smtplib expects an integer port; 0 lets it pick its default when unset.
    smtp_port = int(raw_port) if raw_port else 0
    smtp_server = config.get("smtp_server")
    sender_email = config.get("sender_email")
    receiver_email = config.get("receiver_email")

    message = MIMEMultipart("alternative")
    message["Subject"] = "New Github releases"
    message["From"] = sender_email
    message["To"] = receiver_email
    # Only an HTML part is attached; there is no plain-text alternative.
    message.attach(MIMEText(content, "html"))

    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.sendmail(sender_email, receiver_email, message.as_string())


def convert_date(date: str, dest_format="%d %b %Y at %H:%M") -> str:
    """Reformat a ``YYYY-MM-DDTHH:MM:SSZ`` timestamp using ``dest_format``."""
    return datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ").strftime(dest_format)


if __name__ == "__main__":
    main()