File size: 4,662 Bytes
c3bc884 2b7b43f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import polars as pl
import numpy as np
def feature_engineering(df: pl.DataFrame) -> pl.DataFrame:
    """Add derived pitch features to a pitch-level DataFrame.

    Columns read from ``df``: ``game_date``, ``pitcher_id``, ``pitcher_hand``,
    ``pitch_type``, ``start_speed``, ``extension``, ``hb``, ``x0``, ``y0``,
    ``z0``, ``vx0``, ``vy0``, ``vz0``, ``ax``, ``ay`` and ``az``.

    Columns added (in order of creation): ``year``, ``vy_f``, ``t``,
    ``vz_f``, ``vx_f``, ``vaa``, ``haa``, the per-pitcher/year fastball
    reference columns brought in by the join (``avg_fastball_speed``,
    ``avg_fastball_az``, ``avg_fastball_ax``, ``count`` and the reference
    pitch type, which polars suffixes because ``pitch_type`` already exists),
    ``speed_diff``, ``az_diff``, ``ax_diff``, ``all``, ``release_pos_y``,
    ``release_pos_x`` and ``release_pos_z``.

    Columns overwritten in place: ``ax``, ``hb`` and ``x0`` (sign handling by
    pitcher hand) and ``year`` (cast to ``Int64`` at the end).

    Args:
        df: One row per pitch. ``game_date`` must be a string column whose
            first four characters are the year, since it is sliced with
            ``str.slice(0, 4)``.

    Returns:
        A new DataFrame containing every input row plus the columns above.
        Pitcher/years that never threw an SI, FF or FC are kept, and their
        fastball reference values fall back to the pitcher's maximum
        ``start_speed`` / ``az`` / ``ax``.
    """
    # y-coordinate at which the final velocities are evaluated.
    # NOTE(review): 17/12 is presumably the front edge of home plate
    # (17 inches expressed in feet) — confirm against the data dictionary.
    plate_y = 17 / 12
    rad_to_deg = 180 / np.pi
    is_lefty = pl.col('pitcher_hand') == 'L'

    # Extract the year from the game_date column (kept as a string until the
    # join below is done, then cast to Int64).
    df = df.with_columns(pl.col('game_date').str.slice(0, 4).alias('year'))

    # Constant-acceleration kinematics along y: v_f^2 = v_0^2 - 2*a*(y0 - y).
    # The negative root is taken for vy_f.
    df = df.with_columns(
        (
            -(pl.col('vy0') ** 2 - (2 * pl.col('ay') * (pl.col('y0') - plate_y))) ** 0.5
        ).alias('vy_f')
    )
    # Time taken to go from vy0 to vy_f under acceleration ay.
    df = df.with_columns(
        ((pl.col('vy_f') - pl.col('vy0')) / pl.col('ay')).alias('t')
    )
    # Velocities in z and x after the same elapsed time.
    df = df.with_columns([
        (pl.col('vz0') + pl.col('az') * pl.col('t')).alias('vz_f'),
        (pl.col('vx0') + pl.col('ax') * pl.col('t')).alias('vx_f'),
    ])
    # Angles (in degrees) of the final velocity relative to the y axis;
    # presumably vertical / horizontal approach angle.
    df = df.with_columns([
        (-(pl.col('vz_f') / pl.col('vy_f')).arctan() * rad_to_deg).alias('vaa'),
        (-(pl.col('vx_f') / pl.col('vy_f')).arctan() * rad_to_deg).alias('haa'),
    ])

    # Sign handling by pitcher hand. ax and hb are negated for left-handed
    # pitchers. These run AFTER vx_f/haa so those use the un-mirrored ax.
    # NOTE(review): x0 is negated for every pitcher who is NOT 'L' (the
    # opposite of ax/hb). The release_pos_x formula below is hand-dependent
    # in a way that is consistent with this, so the logic is kept unchanged —
    # confirm the intended sign convention.
    df = df.with_columns([
        pl.when(is_lefty).then(-pl.col('ax')).otherwise(pl.col('ax')).alias('ax'),
        pl.when(is_lefty).then(-pl.col('hb')).otherwise(pl.col('hb')).alias('hb'),
        pl.when(is_lefty).then(pl.col('x0')).otherwise(-pl.col('x0')).alias('x0'),
    ])

    # Per pitcher/year/pitch type averages over the fastball-family pitches.
    pitch_types = ['SI', 'FF', 'FC']
    fastballs = df.filter(pl.col('pitch_type').is_in(pitch_types))
    df_agg = fastballs.group_by(['pitcher_id', 'year', 'pitch_type']).agg([
        pl.col('start_speed').mean().alias('avg_fastball_speed'),
        pl.col('az').mean().alias('avg_fastball_az'),
        pl.col('ax').mean().alias('avg_fastball_ax'),
        pl.len().alias('count'),
    ])

    # Keep one reference pitch type per pitcher/year: the most-thrown one,
    # ties broken by higher average speed. maintain_order=True makes
    # keep='first' explicitly respect the sort order established here.
    df_agg = df_agg.sort(
        ['count', 'avg_fastball_speed'], descending=[True, True]
    ).unique(subset=['pitcher_id', 'year'], keep='first', maintain_order=True)

    # Left join so pitcher/years without any fastball keep their rows (with
    # null reference values) instead of being dropped; the fallbacks below
    # depend on those nulls being present.
    df = df.join(df_agg, on=['pitcher_id', 'year'], how='left')

    # If there is no fastball reference, fall back to the pitcher's maximum
    # of the corresponding raw column (windowed by pitcher_id for all three).
    fallback_sources = {
        'avg_fastball_speed': 'start_speed',
        'avg_fastball_az': 'az',
        'avg_fastball_ax': 'ax',
    }
    df = df.with_columns([
        pl.when(pl.col(avg_col).is_null())
        .then(pl.col(raw_col).max().over('pitcher_id'))
        .otherwise(pl.col(avg_col))
        .alias(avg_col)
        for avg_col, raw_col in fallback_sources.items()
    ])

    # Differentials of each pitch against the pitcher's fastball reference.
    # ax_diff is an absolute difference; the other two are signed.
    df = df.with_columns([
        (pl.col('start_speed') - pl.col('avg_fastball_speed')).alias('speed_diff'),
        (pl.col('az') - pl.col('avg_fastball_az')).alias('az_diff'),
        (pl.col('ax') - pl.col('avg_fastball_ax')).abs().alias('ax_diff'),
    ])

    # Cast the year column to integer type and add a constant 'all' column.
    df = df.with_columns([
        pl.col('year').cast(pl.Int64),
        pl.lit('All').alias('all'),
    ])

    # Release y-position: 60.5 minus the pitcher's extension.
    df = df.with_columns(
        (60.5 - pl.col('extension')).alias('release_pos_y')
    )

    # Delta time (Δt) between the tracked point y0 and the release point,
    # approximated with the initial y velocity.
    delta_t = (pl.col('release_pos_y') - pl.col('y0')) / pl.col('vy0')
    half_ax_dt2 = 0.5 * pl.col('ax') * delta_t ** 2
    vx_dt = pl.col('vx0') * delta_t

    # Back-calculate the release x and z positions. The sign of the vx0 term
    # in x depends on pitcher hand, matching the x0/ax sign handling above.
    df = df.with_columns([
        pl.when(pl.col('pitcher_hand') == 'R')
        .then(pl.col('x0') - vx_dt - half_ax_dt2)
        .otherwise(pl.col('x0') + vx_dt - half_ax_dt2)
        .alias('release_pos_x'),
        (
            pl.col('z0')
            + pl.col('vz0') * delta_t
            + 0.5 * pl.col('az') * delta_t ** 2
        ).alias('release_pos_z'),
    ])

    return df