pivotalcli/commands/stories.py

385 lines
11 KiB
Python

import math
import inquirer
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, DefaultDict, Dict, List, Sequence, Tuple, Optional
from tqdm import tqdm
import base32_crockford as base32
import click
import tabulate
import api.projects
import api.stories
from config import Config
from util import require_login
from color import Color
from . import (
COLOR_HEADER,
COLOR_PLANNED,
COLOR_STARTED,
COLOR_FINISHED,
COLOR_DELIVERED,
COLOR_ACCEPTED,
COLOR_REJECTED,
_format_state,
_get_persons,
)
from ._stories_info import stories_info
from .cli import cli
# Story states handled by this module. The order matters: it is the column
# order of the point-totals table, because both the headers and the per-owner
# estimates are produced by iterating over STATES.
STATES: Tuple[str, ...] = (
    "unstarted",
    "planned",
    "started",
    "finished",
    "delivered",
    "accepted",
    "rejected",
)

# Person records keyed by person id; this module reads the "name", "email"
# and "initials" entries of each record.
Persons = Dict[int, Dict[str, Any]]

# Per-owner point totals: owner id -> {state name -> accumulated points}.
Totals = DefaultDict[int, Dict[str, int]]
def __burndown(color: Color, letter: str, points: float) -> str:
    """Render ``points`` as a colored bar of repeated ``letter`` characters.

    The point value is rounded up to the next whole number before repeating
    the letter, so any positive fraction still yields one character. The bar
    is then passed through ``color.format``.
    """
    bar_length = int(math.ceil(points))
    bar = letter * bar_length
    return color.format(bar)
def __get_row(
    item: Tuple[int, Dict[str, int]], persons: Persons, show_accepted: bool
) -> Sequence:
    """Build one row of the point-totals table for a single owner.

    Args:
        item: ``(owner_id, points)`` pair, where ``points`` maps every state
            in ``STATES`` to the points this owner has in that state.
        persons: Person records keyed by id; only ``"name"`` is read here.
        show_accepted: Whether accepted points are drawn in the progress bar.
            The numeric columns and the total always include them.

    Returns:
        A tuple ``(name, <one estimate per state in STATES>, total, progress)``
        where each estimate is rounded to one decimal, ``total`` is the sum
        of those rounded estimates and ``progress`` is the bracketed bar.
    """
    owner_id, points = item
    name = persons[owner_id]["name"]
    estimates = [round(points[state], 1) for state in STATES]

    # Bar segments in drawing order; "unstarted" shares the planned color.
    segments = [
        (COLOR_DELIVERED, "D", "delivered"),
        (COLOR_FINISHED, "F", "finished"),
        (COLOR_STARTED, "S", "started"),
        (COLOR_REJECTED, "R", "rejected"),
        (COLOR_PLANNED, "P", "planned"),
        (COLOR_PLANNED, "U", "unstarted"),
    ]
    if show_accepted:
        segments.insert(0, (COLOR_ACCEPTED, "A", "accepted"))

    bar = "".join(
        __burndown(color, letter, points[state])
        for color, letter, state in segments
    )
    progress = f"[{bar}]"

    # A parenthesised starred expression such as ``(*estimates)`` is a
    # SyntaxError on Python 3.9+; unpack directly inside an explicit tuple.
    return (name, *estimates, sum(estimates), progress)
def __format_story(
    story: Dict[str, Any], persons: Persons, totals: Totals
) -> Tuple[str, str, str, str, str, int]:
    """Turn a story into a table row and add its estimate to ``totals``.

    The row is ``(code, type, name, owners, state, estimate)``. ``code`` is
    the base32-encoded story id, prefixed with ANSI escapes for release
    stories and for stories owned by the configured user. When the story has
    a truthy estimate, it is divided evenly over the owners and added to
    ``totals`` under the story's current state.

    NOTE(review): despite the ``int`` in the return annotation, the last
    element is an ANSI-decorated string when the estimate is truthy, ``None``
    when the user owns a story without a truthy estimate, and otherwise
    whatever ``story.get("estimate")`` returned.
    """
    code = base32.encode(story["id"])
    owner_ids = story.get("owner_ids", [])

    owned_by_user = False
    owner_labels = []
    for person_id in owner_ids:
        person = persons[person_id]
        email = person["email"]
        label = person["initials"]
        if email == Config["user"]["email"]:
            # Highlight the configured user's own initials.
            owned_by_user = True
            label = Color(Color.CYAN).format(label)
        owner_labels.append(label)

    story_type = story["story_type"]
    if story_type == "release":
        code = f"\033[44m{code}"

    estimate = story.get("estimate")
    if estimate:
        for person_id in owner_ids:
            current = story["current_state"]
            totals[person_id][current] += estimate / len(owner_ids)
        # The reset sequence ends any styling opened at the start of the row.
        estimate = f"{estimate}\033[0m"

    if owned_by_user:
        code = f"\033[1;97m{code}"
        if estimate:
            estimate = f"\033[97m{estimate}\033[0m"
        else:
            estimate = None

    owners = ", ".join(owner_labels)
    state = _format_state(story["current_state"])
    return code, story_type, story["name"], owners, state, estimate
def __print_stories(
    stories_: List[Dict[str, Any]],
    persons: Persons,
    totals: Totals,
    show_accepted: bool,
) -> None:
    """Print the stories as a table, followed by a blank line.

    Accepted stories are left out when ``show_accepted`` is false. Only the
    stories that are printed are passed to ``__format_story``, so only those
    contribute to ``totals``.
    """
    rows = []
    for story in stories_:
        if not show_accepted and story["current_state"] == "accepted":
            continue
        rows.append(__format_story(story, persons, totals))

    column_names = ("Code", "Type", "Story name", "Owners", "State", "Pts")
    rendered = tabulate.tabulate(rows, headers=column_names)
    print(rendered, end="\n\n")
def __print_totals(totals: Totals, persons: Persons, show_accepted: bool) -> None:
    """Print the per-owner point totals table, highest total first.

    One row is built per entry in ``totals``. Rows are ordered by their
    second-to-last column, which is the "Total" column, in descending order.
    """
    COLOR_HEADER.print("Point totals:", end="\n\n")

    column_names = (
        "Owner",
        *(_format_state(state) for state in STATES),
        "Total",
        "Progress",
    )

    rows = [__get_row(entry, persons, show_accepted) for entry in totals.items()]
    rows.sort(key=lambda row: row[-2], reverse=True)

    print(tabulate.tabulate(rows, column_names), end="\n\n")
def __print_burndown(iteration: Dict[str, Any], show_accepted: bool) -> None:
    """Print one burndown bar per day of the iteration's history.

    The history is requested from one day before the iteration start. The
    first history row supplies the baseline of accepted points, which is
    subtracted from each later day's accepted count. Days without counts
    reuse the most recent counts seen; leading days without any counts are
    skipped entirely.
    """
    COLOR_HEADER.print("Burndown:", end="\n\n")

    iteration_start = datetime.strptime(iteration["start"], "%Y-%m-%dT%H:%M:%SZ")
    day_before = iteration_start - timedelta(days=1)
    history = api.projects.get_history_days(iteration["project_id"], day_before)

    rows = history["data"]
    accepted_baseline = rows[0][1]

    # (color, letter, index into the day's counts). Index 4 is never drawn;
    # presumably that is the rejected count -- confirm against the API.
    segments = (
        (COLOR_DELIVERED, "D", 1),
        (COLOR_FINISHED, "F", 2),
        (COLOR_STARTED, "S", 3),
        (COLOR_PLANNED, "P", 5),
        (COLOR_PLANNED, "U", 6),
    )

    previous: List[int] = []
    for date, *counts in rows[1:]:
        if counts:
            # Remember the latest available counts for days that have none.
            previous = counts
        elif previous:
            counts = previous
        else:
            # Nothing to show yet for this day.
            continue

        parts = []
        if show_accepted:
            parts.append(
                __burndown(COLOR_ACCEPTED, "A", counts[0] - accepted_baseline)
            )
        for color, letter, index in segments:
            parts.append(__burndown(color, letter, counts[index]))

        progress = "".join(parts)
        print(f"{date}: {progress}")

    print()
def _stories_current(project: str, scope: str, show_accepted: bool) -> None:
    """Print stories, point totals and (for the current scope) the burndown.

    ``project`` is looked up in the configured project aliases first; when
    that lookup raises ``KeyError`` it is decoded as a base32 project id
    instead. Only the first iteration returned for ``scope`` is shown.
    """
    try:
        project_id = int(Config["project_aliases"][project])
    except KeyError:
        project_id = base32.decode(project)

    iterations = api.projects.get_iterations(project_id, scope=scope)
    if not iterations:
        print(f"No stories in {scope}")
        return

    first_iteration = iterations[0]
    persons = _get_persons(project_id=project_id)

    # Every owner starts with zero points in each known state.
    totals: DefaultDict[int, Dict[str, int]] = defaultdict(
        lambda: {state: 0 for state in STATES}
    )

    __print_stories(first_iteration["stories"], persons, totals, show_accepted)
    __print_totals(totals, persons, show_accepted)

    if scope == "current":
        __print_burndown(first_iteration, show_accepted)
def _complete_projects(
    ctx: click.Context, args: List[str], incomplete: str
) -> List[str]:
    """Return the configured project aliases that start with ``incomplete``.

    Passed as the ``autocompletion`` callback of the ``project`` argument;
    ``ctx`` and ``args`` are accepted but not used.
    """
    matches = []
    for alias in Config["project_aliases"]:
        if alias.startswith(incomplete):
            matches.append(alias)
    return matches
@cli.command("stories")
@click.argument("project", type=click.STRING, autocompletion=_complete_projects)
@click.argument("story", required=False)
@click.argument("action", required=False)
@click.option("--scope", default="current")
@click.option("--show-accepted/--hide-accepted", default=True)
@require_login
def stories(
    project: str,
    story: Optional[str],
    action: Optional[str],
    scope: str,
    show_accepted: bool,
) -> None:
    """List a project's stories, or show and update a single story.

    Without STORY the stories of the selected scope are listed. With STORY
    (a base32 story code) an optional ACTION is applied first: start, finish,
    deliver, accept, reject or comment. The story details are then shown.
    """
    if story is None:
        _stories_current(project, scope, show_accepted)
        return

    # Resolve the project the same way _stories_current does: a configured
    # alias first, otherwise a base32-encoded project id. Looking up only the
    # alias would raise KeyError for project codes.
    try:
        project_id = int(Config["project_aliases"][project])
    except KeyError:
        project_id = base32.decode(project)

    story_id = base32.decode(story)
    state_actions = ("start", "finish", "deliver", "accept", "reject")

    if action is not None:
        if action in state_actions:
            # Each state name is its action with an "ed" suffix.
            api.stories.put_story(story_id, current_state=action + "ed")
        elif action == "comment":
            print("Enter the comment, and press Ctrl+D to confirm.")
            comment = sys.stdin.read()
            print()
            api.stories.post_comment(project_id, story_id, text=comment)
        else:
            # Report a mistyped action instead of silently ignoring it.
            valid_actions = ", ".join((*state_actions, "comment"))
            raise click.UsageError(
                f"Unknown action {action!r}; expected one of: {valid_actions}"
            )

    stories_info(story)
def __calculate_stories(all_owners, hours, point_scale, allow_split=False):
    """Split the owners into groups and pick story sizes from ``point_scale``.

    Owners are consumed from the front of the list in groups of at most
    three, trying the largest group size first. For a group of ``n`` owners
    the target is ``hours * n``; the first entry of ``point_scale`` (in the
    given order) whose remainder ``hours * n - points`` is acceptable is
    used. Without splitting only a remainder of 0 is acceptable. With
    splitting, any remainder that is itself in ``point_scale`` is accepted
    and emitted as a second story for the same group.

    Args:
        all_owners: List of owners; it is copied, not modified.
        hours: Amount each owner should receive, in ``point_scale`` units.
        point_scale: Story sizes to try, in order of preference.
        allow_split: Whether a group may receive two stories. When an exact
            fit fails, the whole calculation restarts with splitting on.

    Returns:
        A list of ``(story_owners, points)`` tuples, or ``None`` when no
        combination fits even with splitting.
    """
    allowed_remainders = point_scale if allow_split else (0,)
    owners = all_owners.copy()
    stories = []

    while owners:
        match = None
        for n in range(min(len(owners), 3), 0, -1):
            for points in point_scale:
                remainder = hours * n - points
                if remainder in allowed_remainders:
                    # Keep the remainder with the match rather than relying
                    # on a loop variable that outlives its loop.
                    match = points, n, remainder
                    break
            if match is not None:
                break

        if match is None:
            if allow_split:
                return None
            # No exact fit: start over, allowing two stories per group.
            return __calculate_stories(all_owners, hours, point_scale, True)

        points, n, remainder = match
        story_owners = owners[:n]
        del owners[:n]
        stories.append((story_owners, points))
        if remainder != 0:
            stories.append((story_owners, remainder))

    return stories
@cli.command("meeting")
@click.argument("project", type=click.STRING)
@require_login
def meeting(project: str):
project_id = int(Config["project_aliases"][project])
members = api.projects.get_memberships(project_id)
members = {m["person"]["name"]: m["person"]["id"] for m in members}
labels = api.projects.get_labels(project_id)
labels = {l["name"]: l["id"] for l in labels}
project_info = api.projects.get_project(project_id)
point_scale = [
1 if p == "0" else int(float(p) * 2)
for p in project_info["point_scale"].split(",")
][::-1] + [0]
answers = inquirer.prompt(
[
inquirer.Text("name", message="Meeting name"),
inquirer.Text(
"hours",
message="Meeting length (hours, multiple of 0.5)",
validate=lambda _, x: re.match(r"^[0-9]+(\.[05])?$", x),
),
inquirer.Checkbox(
"owners",
message="Attendees (select with space, confirm with enter)",
choices=list(members.keys()),
),
]
)
if not answers:
return
name = answers["name"]
hours = round(float(answers["hours"]) * 2)
owners = answers["owners"]
total_hours = hours * len(owners)
print(
f"{len(owners)} attendees on a {hours / 2} hour meeting, for a total "
f"of {total_hours / 2} points."
)
stories = __calculate_stories(owners, hours, point_scale)
if stories is None:
print("Could not find a solution")
return
print("\nGoing to create:")
for story_owners, points in stories:
print(
f" - A {points / 2} point story for {', '.join(story_owners)} "
f"({points / 2 / len(story_owners)} each)"
)
print()
answers = inquirer.prompt(
[
inquirer.List("state", message="State of the story", choices=STATES),
inquirer.Checkbox(
"labels",
message="Labels to add (select with space, confirm with enter)",
choices=list(labels.keys()),
),
inquirer.List(
"requester", message="Story requester", choices=list(members.keys())
),
]
)
if not answers:
return
print("Enter the story description, and press Ctrl+D to confirm.")
description = sys.stdin.read()
print()
primary_story = None
for i, (story_owners, points) in enumerate(tqdm(stories, "Creating stories...")):
s_name = f"{name} ({i + 1}/{len(stories)})"
s_requester = members[answers["requester"]]
s_label_ids = [labels[name] for name in answers["labels"]]
s_owner_ids = [members[name] for name in story_owners]
s_description = description.strip()
story = api.stories.post(
token,
project_id,
name=s_name,
requested_by_id=s_requester,
label_ids=s_label_ids,
owner_ids=s_owner_ids,
description=s_description,
estimate=points // 2,
)
s_id = int(story["id"])
if primary_story is None:
primary_story = s_id