"""Project sub-command: create, edit, delete, list and view projects."""

import datetime
import getpass
import logging
import sqlite3
import sys

import task
from action import record_action
from action import set_active
from action import TypeAction
from args import get_string_arg
from args import arg_number
from utils import get_pits_path


class Project:
    """Plain holder for the editable fields of a project.

    The attribute names double as column names of the ``project`` table:
    ``edit_project`` iterates over ``__dict__`` to build its UPDATE statement,
    so only add attributes here that are real columns.
    """

    def __init__(self, status=None):
        self.name = None
        self.status = status

    def __str__(self):
        return str(self.__dict__)


def handle_project(args, last_action, conn):
    """Dispatch the ``project`` sub-command.

    Args:
        args: Command-line arguments following the ``project`` keyword.
        last_action: Object exposing ``project_id`` and ``task_id`` of the
            currently active project/task.
        conn: Open database connection.

    Behaviour by first argument:
        (none)   list every project, then exit with status 0
        <digits> show that project with its tasks and mark it active
        -c NAME  create a project (further options parsed from args[2:])
        -e [ID]  edit project ID, or the active project when ID is omitted
        -d [ID]  delete project ID, or the active project when ID is omitted

    Any other first argument prints an error and exits with status 1.
    """
    logging.info("> handle project")
    logging.debug("args: %s", args)

    if not args:
        list_project(last_action.project_id, conn)
        sys.exit(0)

    option = args[0]
    if option.isdigit():
        view_project_set_active(int(option), last_action, conn)
    elif option == "-c":
        if len(args) == 1:
            print("missing project name")
            sys.exit(1)

        project = Project("active")
        project.name = get_string_arg(args[1:], "project name")
        parse_project_args(project, args[2:])
        create_project(project, conn)
    elif option == "-e":
        if len(args) == 1:
            print("nothing to update")
            sys.exit(1)

        project = Project()
        if args[1].isdigit():
            project_id = int(args[1])
            parse_project_args(project, args[2:])
        else:
            # No explicit id: edit the currently active project.
            project_id = last_action.project_id
            parse_project_args(project, args[1:])

        logging.debug("Project: (%s) %s", project_id, project)
        edit_project(project, project_id, conn)
    elif option == "-d":
        if len(args) > 1:
            delete_project(arg_number(args[1]), conn)
        else:
            delete_project(last_action.project_id, conn)
    else:
        print(f"Invalid project option: {option}")
        # Exit non-zero, as parse_project_args does for the same message.
        sys.exit(1)


def parse_project_args(project, args):
    """Fill *project* in place from ``-s STATUS`` / ``-n NAME`` options.

    Prints an error and exits with status 1 on an unknown option. A missing
    option value is passed to ``get_string_arg`` as an empty slice.
    """
    if not args:
        return

    i = 0
    while i < len(args):
        logging.debug(args[i])

        if args[i] == "-s":
            i += 1
            project.status = get_string_arg(args[i : i + 1], "project status")
        elif args[i] == "-n":
            i += 1
            project.name = get_string_arg(args[i : i + 1], "project name")
        else:
            print(f"Invalid project option: {args[i]}")
            sys.exit(1)
        i += 1


def create_project(project, conn):
    """Insert *project* for the current OS user and record a CREATE action.

    An ``sqlite3.IntegrityError`` is reported to the user as a duplicate
    project name and logged; it is not re-raised.
    """
    logging.info(">> Create project")

    query = """
        INSERT INTO project (username, name, status, created_at)
        VALUES (?, ?, ?, ?);
        """

    cursor = conn.cursor()
    try:
        cursor.execute(
            query,
            (getpass.getuser(), project.name, project.status, datetime.datetime.now()),
        )

        project_id = cursor.lastrowid

        action_message = "{} (status: {})".format(project.name, project.status)
        record_action(cursor, TypeAction.CREATE, action_message, project_id=project_id)

        logging.debug(action_message)
        print(
            "created project {}: {} (status: {})".format(
                project_id, project.name, project.status
            )
        )
    except sqlite3.IntegrityError as error:
        print("project with the same name already exists")
        logging.info(error)


def edit_project(project, project_id, conn):
    """Update the non-``None`` fields of *project* on row *project_id*.

    Exits with status 1 when there is nothing to update or when no project
    has that id. A rename that hits an ``sqlite3.IntegrityError`` is reported
    the same way ``create_project`` reports it. Records an UPDATE action on
    success.
    """
    # FIXME Duplicate code with edit_task
    logging.info(">> Edit project")

    update_args = [
        (key, value) for key, value in project.__dict__.items() if value is not None
    ]
    if not update_args:
        print("nothing to update")
        print(
            "Tips: if you want to active a project, just do '{} project '".format(
                get_pits_path()
            )
        )
        sys.exit(1)

    logging.debug("Project update args: %s", update_args)

    # Retrieve the current name for the log message when it is not changing.
    # Compare keys only, so a *value* equal to "name" is not mistaken for it.
    project_name = None
    if all(key != "name" for key, _ in update_args):
        project_name = get_project_name(project_id, conn)

    # Column names come from Project's attribute names, never from user input;
    # the values are bound as parameters so quotes cannot break or inject SQL.
    assignments = ", ".join("{} = ?".format(key) for key, _ in update_args)
    query = "UPDATE project SET {} WHERE id = ?".format(assignments)
    params = [value for _, value in update_args] + [project_id]
    logging.debug("update project query: %s %s", query, params)

    cursor = conn.cursor()
    logging.debug("Do a project update")
    try:
        cursor.execute(query, params)
    except sqlite3.IntegrityError as error:
        print("project with the same name already exists")
        logging.info(error)
        return

    if cursor.rowcount != 1:
        logging.error("UPDATE FAILED")
        print("could not find project {}".format(project_id))
        sys.exit(1)

    log_args = ", ".join("%s: %s" % (key, value) for key, value in update_args)
    if project_name:
        log_message = "{} ({})".format(project_name, log_args)
    else:
        log_message = "({})".format(log_args)

    print("updated project {}: {}".format(project_id, log_message))
    record_action(cursor, TypeAction.UPDATE, log_message, project_id)


def delete_project(project_id, conn):
    """Delete project *project_id* together with all of its tasks.

    Exits with status 1, before touching anything, when the project does not
    exist. Records a DELETE action on success.
    """
    logging.info(">> Remove project")

    project_name = get_project_name(project_id, conn)
    if project_name is None:
        print("could not find project {}".format(project_id))
        sys.exit(1)

    cursor = conn.cursor()

    # Cascade deleting: task
    cursor.execute("SELECT id FROM task WHERE project_id = ?;", (project_id,))
    deleted_task = 0
    for row in cursor.fetchall():
        task.delete_task(row[0], conn)
        deleted_task += 1

    cursor.execute("DELETE FROM project WHERE id = ?;", (project_id,))
    if cursor.rowcount != 1:
        # Do not go on to announce and record a deletion that did not happen.
        logging.error("DELETE FAILED")
        print("could not find project {}".format(project_id))
        sys.exit(1)

    if deleted_task:
        log_message = "{} with {} task{}".format(
            project_name, deleted_task, "s" if deleted_task > 1 else ""
        )
    else:
        log_message = project_name
    print("deleted project {}: {}".format(project_id, log_message))
    record_action(cursor, TypeAction.DELETE, log_message, project_id)


def list_project(active_project_id, conn):
    """Print one line per project; the active one is prefixed with ``*``."""
    query = """
        SELECT id, username, name, status,
               (SELECT count(*) FROM task WHERE task.project_id = project.id)
        FROM project;
        """
    cursor = conn.cursor()
    cursor.execute(query)
    for row in cursor.fetchall():
        logging.debug("Project row: %s", row)
        project_id, username, name, status, nb_task = row
        # TODO Formatting track: https://stackoverflow.com/questions/9989334/create-nice-column-output-in-python
        print(
            "{:1} {:2d}: ({:8}) | {} | {} ({} tasks)".format(
                "*" if active_project_id == project_id else "",
                project_id,
                username,
                status,
                name,
                nb_task,
            )
        )


def view_project_set_active(project_id, last_action, conn):
    """Print project *project_id* with its tasks, then mark it as active.

    Exits with status 1 when the project does not exist. The task matching
    ``last_action.task_id`` is prefixed with ``*``.
    """
    # FIXME duplicate with list_project
    query = """
        SELECT id, username, name, status,
               (SELECT count(*) FROM task WHERE task.project_id = project.id)
        FROM project
        WHERE id = ?;
        """
    cursor = conn.cursor()
    cursor.execute(query, (project_id,))
    row = cursor.fetchone()
    if not row:
        print("Could not find project {}".format(project_id))
        sys.exit(1)

    print("* {:d}: ({}) {} (status: {}, {} tasks)".format(*row))

    # FIXME duplicate with list_task
    query = """
        SELECT id, username, name, status, priority, date, time,
               (SELECT count(*) FROM note WHERE note.task_id = task.id)
        FROM task
        WHERE project_id = ?
        """
    cursor.execute(query, (project_id,))
    for row in cursor.fetchall():
        logging.debug("Task row: %s", row)
        task_id, username, name, status, priority, date, time, nb_note = row
        # NOTE(review): the task schema is not visible here; date/time are
        # treated as possibly NULL so a missing value does not raise TypeError.
        message = "".join(part or "" for part in (date, time, name))
        print(
            "  {} {:d}: ({}) [{}] [{}] {} ({} notes)".format(
                "*" if last_action.task_id == task_id else " ",
                task_id,
                username,
                status,
                priority,
                message,
                nb_note,
            )
        )

    set_active(cursor, project_id=project_id)
    # TODO Add a -v option to see notes?


def get_project_name(project_id, conn):
    """Return the name of project *project_id*, or ``None`` if it is unknown."""
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM project WHERE id = ?", (project_id,))
    row = cursor.fetchone()
    return row[0] if row else None