Source code for fantasyfootball.data

from __future__ import annotations  # noqa: F404

import logging
from pathlib import PosixPath
from typing import List
from urllib.error import HTTPError

import numpy as np
import pandas as pd
from janitor import clean_names

from fantasyfootball.config import data_sources, root_dir, scoring

logger = logging.getLogger("fantasydata")
logger.setLevel(logging.INFO)


[docs]class FantasyData: """Loads historical fantasy football data. Args: season_year_start (int): The first year of the season. season_year_end (int): The last year of the season. """ def __init__(self, season_year_start: int, season_year_end: int): self.season_year_start = season_year_start self.season_year_end = season_year_end self._validate_season_year_range() # Set when FantasyData object is created. self.ff_data = None self.load_data() self.scoring = scoring
[docs] def _validate_season_year_range(self) -> bool: """Ensures that the season year range is valid. Raises: ValueError: If the season year is less than the minimum year ValueError: If the season year is greater than the maximum year Returns: bool: True if the season year range is valid. """ season_years = [ int(str(x).split("/")[-1]) for x in (root_dir / "datasets" / "season").glob("*") if str(x).split("/")[-1].isdigit() ] min_year = min(season_years) max_year = max(season_years) if self.season_year_start < min_year: raise ValueError( f"Season year start {self.season_year_start}\n" f"is less than minimum year {min_year}" ) if self.season_year_end > max_year: raise ValueError( f"Season year end {self.season_year_end}\n" f" is greater than maximum year {max_year}" ) return True
[docs] @staticmethod def _refresh_data(ff_data_dir: PosixPath, data_sources: dict) -> bool: """Use the datasets specified in the `config.py` to identify if a dataset is missing from the installed version of the package. When a missing dataset is identified, the most recent version is downloaded from Git. Args: ff_data_dir (PosixPath): The directory containing the seasonal data. data_sources (dict): A dictionary indicating the names of the data sources used in the fantasyfootball package. Returns: bool: True if data in current package is up to date or data was succesfully downloaded from remote repo. """ season_year = ff_data_dir.name base_url = f"https://github.com/thecodeforest/fantasyfootball/blob/main/datasets/season/{season_year}" # noqa E501 expected_data_sources = data_sources.keys() local_data_sources = [ x.name.replace(".gz", "") for x in ff_data_dir.glob("*.gz") ] missing_local_data_sources = set(expected_data_sources) - set( local_data_sources ) if missing_local_data_sources: for missing_data in missing_local_data_sources: missing_data_url = f"{base_url}/{missing_data}.gz?raw=true" try: logger.info(f"Fetching most recent {missing_data} data from remote") missing_data_df = pd.read_csv(missing_data_url, compression="gzip") missing_data_df.to_csv( ff_data_dir / f"{missing_data}.gz", index=False, compression="gzip", ) logger.info( f"Succussfully downloaded {missing_data} data from remote" ) except HTTPError as error: logger.error("Error:", error) logger.error(f"{missing_data} data not available on remote") raise return True
[docs] @staticmethod def _load_data( ff_data_dir: PosixPath, data_sources: dict, *exclude: str ) -> pd.DataFrame: """Helper method to load all other data, excluding the season calendar and roster of active players for a season. Args: ff_data_dir (PosixPath): The directory containing the season data. data_sources (dict): A dictionary indicating the names of the data sources used in the fantasyfootball package. exclude (str): The names of the files to exclude from the data load. Raises: ValueError: If the exclude file name is a required file. ValueError: If the columns used to join the data are not found. Returns: pd.DataFrame: The dataframe containing all historical fantasy football data for a single season. """ required_data = [ k for k in data_sources.keys() if data_sources[k]["is_required"] ] if set(exclude).intersection(required_data): raise ValueError( f"Cannot exclude required data: {required_data}. " f"Please do not exclude required data and try again." ) calendar_df = pd.read_csv(ff_data_dir / "calendar.gz", compression="gzip") players_df = pd.read_csv(ff_data_dir / "players.gz", compression="gzip") season_ff_df = pd.merge( calendar_df, players_df, how="inner", on=["team", "season_year"] ) supplementary_data = set(data_sources.keys()) - set(required_data) for data in data_sources: if data in exclude: continue if data in supplementary_data: dataset_df = pd.read_csv(ff_data_dir / f"{data}.gz", compression="gzip") keys = data_sources[data]["keys"] if not set(keys).issubset(set(dataset_df.columns)): raise ValueError( f"{data} does not contain all the required keys: {keys}" ) season_ff_df = pd.merge(season_ff_df, dataset_df, how="left") return season_ff_df
[docs] def _filter_to_most_recent_complete_week(self, df: pd.DataFrame) -> pd.DataFrame: """Filters the dataframe to the most recent week that has complete data in-season. Args: df (pd.DataFrame): The dataframe containing all historical fantasy football data Raises: ValueError: If the most recent week in a season has less than 100 observations.Typically, values should be around 400. Less than 100 indicates an incomplete week. Returns: pd.DataFrame: If the most recent week is in season, the dataframe is filtered to the most recent week. """ data_path = root_dir / "datasets" / "season" / str(self.season_year_end) calendar_df = pd.read_csv(data_path / "calendar.gz", compression="gzip") stats_df = pd.read_csv(data_path / "stats.gz", compression="gzip") # find most recent season in calendar max_season_year_calendar = calendar_df["season_year"].max() if self.season_year_end == max_season_year_calendar: calendar_df = calendar_df[ calendar_df["season_year"] == self.season_year_end ] # filter only to complete games this season stats_df = stats_df.merge( calendar_df, on=["date", "team", "opp", "is_away"] ) # take a count of number of observations for the most recent week obs_most_recent_wk = ( stats_df["week"] .value_counts() .reset_index() .sort_values("index") .rename(columns={"index": "week", "week": "count"}) .tail(1) ) if obs_most_recent_wk["count"].values[0] > 100: most_recent_wk = obs_most_recent_wk["week"].values[0] most_recent_wk_date = calendar_df[ calendar_df["week"] == most_recent_wk ]["date"].max() # filter ff_df to be less than or equal to the most recent week date df = df[df["date"] <= most_recent_wk_date] return df else: raise ValueError( "Most recent week < 100 observations. Check data is complete." ) else: return df
[docs] def load_data( self, data_sources: dict = data_sources, filter_final_season_week: bool = True ) -> FantasyData: """Loads all historical fantasy football data from the season year range provided. Each season year is loaded separately and then concatenated together. Args: data_sources (dict): A dictionary indicating the names of the data sources used in the fantasyfootball package. filter_final_season_week (bool): If True, the final week of each season is filtered out. These weeks are filtered because many players are 'active' but have minimal participation in the game to avoid injury when their team has secured a playoff spot. Excluding these weeks allows for more accurate predictions. Default is True. Returns: FantasyData: The dataframe containing all historical fantasy football data for the specified season year range. """ logger.info( f"Loading data from {self.season_year_start} to {self.season_year_end}" ) ff_data_dir = root_dir / "datasets" / "season" ff_df = pd.DataFrame() for season_year in range(self.season_year_start, self.season_year_end + 1): if season_year < 2016: logger.warning("Player injury data not available prior to 2016 season") # TO DO: Do not throw expection if offline; raise warning instead self._refresh_data(ff_data_dir / str(season_year), data_sources) season_ff_df = self._load_data(ff_data_dir / str(season_year), data_sources) if filter_final_season_week: max_week = max(season_ff_df["week"]) logger.info( f"Dropping final week (week {max_week}) of season {season_year}" ) season_ff_df = season_ff_df[season_ff_df["week"] != max_week] ff_df = pd.concat([ff_df, season_ff_df]) # if most recent season is incomplete, filter to the most recent complete week ff_df = self._filter_to_most_recent_complete_week(ff_df) self.ff_data = ff_df
[docs] @staticmethod def _validate_scoring_source_rules(source_rules: dict, ff_df_columns: list) -> None: """Validates the scoring source rules provided. Args: source_rules (dict): The scoring source rules to validate. ff_df_columns (list): The list of columns in the dataframe. Raises: ValueError: If the scoring source rules are not valid. TypeError: If the scoring source rules are not a dictionary. KeyError: If the scoring source keys do not include 'scoring_columns' or 'multiplier'. KeyError: If scoring columns are not a subset of the dataframe columns. TypeError: If the scoring multiplier is not a dictionary. KeyError: If the scoring multiplier keys do not include 'threshold' and 'points'. KeyError: If the multiplier values are not a subset of the dataframe columns. """ # validate that source name is present source_name = list(source_rules.keys())[0] if not source_name: raise ValueError("Source name is required") scoring_rules = source_rules[source_name] if not isinstance(source_rules, dict): raise TypeError("scoring_source_rules must be a dictionary") # validate required keys for scoring required_keys = {"scoring_columns", "multiplier"} if not set(scoring_rules.keys()) == required_keys: raise KeyError(f"Scoring rules must contain keys {required_keys}") # validate scoring columns are subset of columns present in data columns if not set(scoring_rules["scoring_columns"].keys()) <= set(ff_df_columns): raise KeyError( "Scoring columns must be a subset of columns present in dataframe" ) # validate multiplier is a dictionary if not isinstance(scoring_rules["multiplier"], dict): raise TypeError("scoring_rules['multiplier'] must be a dictionary") # validate if multiplier is present, it has to have threhold and points keys if scoring_rules["multiplier"] is not None: for column_multipler in scoring_rules["multiplier"].keys(): if not set(scoring_rules["multiplier"][column_multipler].keys()) == { "threshold", "points", }: raise KeyError( f"scoring_rules['multiplier'][{column_multipler}] must\n" f"have 'threshold' and 'points' keys" ) # validate multiplier scoring column is subset of scoring columns if scoring_rules["multiplier"] is not None: if not set(scoring_rules["multiplier"].keys()) <= set( scoring_rules["scoring_columns"].keys() ): raise KeyError( "Multiplier scoring column must be a subset of scoring columns" )
[docs] def add_scoring_source(self, scoring_source_rules: dict) -> FantasyData: """Updates the scoring source rules. Args: scoring_source_rules (dict): Scoring source rules. Required keys are the source name (e.g., 'custom'), 'scoring_columns', and 'multiplier'. Returns: FantasyData: An updated FantasyData object with the new scoring source rules. Example: >>> from fantasyfootball.data import FantasyData >>> fantasy_data = FantasyData(season_year_start=2019, season_year_end=2021 ) >>> new_scoring_source = {"my league": {"scoring_columns": { "passing_td": 4, "passing_yds": 0.04, "passing_int": -3, "rushing_td": 6, "rushing_yds": 0.1, "receiving_rec": 0.5, "receiving_td": 4, "receiving_yds": 0.1, "fumbles_fmb": -3, "scoring_2pm": 4, "punt_returns_td": 6, }, "multiplier": {"rushing_yds" : {"threshold": 100,"points": 5}, "passing_yds": {"threshold": 300, "points": 3}, "receiving_yds": {"threshold": 100, "points": 3}, } }} >>> fantasy_data.add_scoring_source(new_scoring_source) """ self._validate_scoring_source_rules( scoring_source_rules, self.ff_data.columns.tolist() ) new_scoring_source_name = [key for key in scoring_source_rules.keys()][0] self.scoring = {**self.scoring, **scoring_source_rules} print(f"Added scoring source: {new_scoring_source_name}") logger.info(f"Scoring source '{new_scoring_source_name}' Added")
[docs] @staticmethod def score_player( player_df: pd.DataFrame, scoring_columns: set, scoring_source_rules: dict ) -> np.array: """Calculates the total number of points scored for a single week Args: player_df (pd.DataFrame): Weekly stats for a single player for the season. scoring_columns (set): Columns to use for scoring scoring_source_rules (dict): Rules for scoring Returns: np.array: The total number of points scored for a single week. """ player_weekly_points = [0] * player_df.shape[0] for column in scoring_columns: point_amount = scoring_source_rules["scoring_columns"][column] scoring_amount = player_df[column] weekly_points_scored = [x * point_amount for x in scoring_amount] player_weekly_points = np.add(player_weekly_points, weekly_points_scored) if scoring_source_rules.get("multiplier"): column_multiplier = scoring_source_rules["multiplier"].get(column) if column_multiplier: weekly_mult_points_scored = [ column_multiplier["points"] if x > column_multiplier["threshold"] else 0 for x in scoring_amount ] player_weekly_points = np.add( player_weekly_points, weekly_mult_points_scored ) return player_weekly_points
[docs] def create_fantasy_points_column(self, scoring_source: str) -> FantasyData: """Creates a fantasy points column for the scoring source provided. Args: scoring_source (str): Name of the scoring source to use (e.g., 'draft kings', 'yahoo', 'custom'). Returns: FantasyData: An updated FantasyData object with the new fantasy points column. """ # ensure scoring source is valid if scoring_source not in self.scoring.keys(): raise KeyError(f"Scoring source '{scoring_source}' not found") scoring_source_rules = self.scoring[scoring_source] scoring_columns = set(scoring_source_rules["scoring_columns"].keys()) & set( self.ff_data.columns ) all_player_pts = list() for row in self.ff_data[["name", "pid"]].drop_duplicates().itertuples(): player_df = self.ff_data[ (self.ff_data["name"] == row.name) & (self.ff_data["pid"] == row.pid) ] player_weekly_points = self.score_player( player_df, scoring_columns, scoring_source_rules ) all_player_pts.append( [row.name, row.pid, player_df["date"], player_weekly_points] ) all_pts_df = ( pd.DataFrame( all_player_pts, columns=["name", "pid", "date", f"ff_pts_{scoring_source}"], ) .set_index(["name", "pid"]) .apply(pd.Series.explode) .reset_index() .clean_names() ) self.ff_data = pd.merge( self.ff_data, all_pts_df, on=["name", "pid", "date"], how="inner" ) logger.info(f"Fantasy points column '{self.ff_data.columns[-1]}' added")
# return FantasyData
[docs] def show_scoring_sources(self) -> List[str]: return list(self.scoring.keys())
@property def data(self) -> pd.DataFrame: """Returns the dataframe of the historical NFL Fantasy data. Returns: pd.DataFrame: Historical NFL Fantasy data. """ return self.ff_data
[docs] def __str__(self) -> str: """Returns a string representation of the FantasyData object. Returns: str: Top 5 rows of the FantasyData object. """ return str(self.ff_data.head())