Spaces:
Build error
Build error
import os | |
from typing import List, Optional, Tuple | |
import ipyleaflet as leaf | |
import ipywidgets | |
import matplotlib as mpl | |
import numpy as np | |
import pandas as pd | |
import plotly.figure_factory as ff | |
import plotly.graph_objs as go | |
from htmltools import head_content | |
from ipyleaflet import basemaps | |
from matplotlib import cm | |
from shiny import * | |
from shiny.types import SilentException | |
from shinywidgets import * | |
color_palette = cm.get_cmap("viridis", 10) | |
# TODO: how to handle nas (pd.isna)? | |
def col_numeric(domain: Tuple[float, float], na_color: str = "#808080"): | |
rescale = mpl.colors.Normalize(domain[0], domain[1]) | |
def _(vals: List[float]) -> List[str]: | |
cols = color_palette(rescale(vals)) | |
return [mpl.colors.to_hex(v) for v in cols] | |
return _ | |
# TODO: when this issue is fixed, we won't have to sample anymore | |
# https://github.com/rstudio/prism/issues/119 | |
app_dir = os.path.dirname(__file__) | |
allzips = pd.read_csv(os.path.join(app_dir, "superzip.csv")).sample( | |
n=10000, random_state=42 | |
) | |
# ------------------------------------------------------------------------ | |
# Define user interface | |
# ------------------------------------------------------------------------ | |
vars = { | |
"Score": "Overall score", | |
"College": "% college educated", | |
"Income": "Median income", | |
"Population": "Population", | |
} | |
css = open(os.path.join(app_dir, "styles.css"), "r").readlines() | |
ui_map = ui.TagList( | |
output_widget("map", width="100%", height="100%"), | |
ui.panel_fixed( | |
ui.h2("SuperZIP explorer"), | |
ui.input_select("variable", "Heatmap variable", vars), | |
output_widget("density_score", height="200px"), | |
output_widget("density_college", height="200px"), | |
output_widget("density_income", height="200px"), | |
output_widget("density_pop", height="200px"), | |
id="controls", | |
class_="panel panel-default", | |
width="330px", | |
height="auto", | |
draggable=True, | |
top="60px", | |
left="auto", | |
right="20px", | |
bottom="auto", | |
), | |
ui.div( | |
"Data compiled for ", | |
ui.tags.em("Coming Apart: The State of White America, 1960-2010"), | |
" by Charles Murray (Crown Forum, 2012).", | |
id="cite", | |
), | |
) | |
app_ui = ui.page_navbar( | |
ui.nav( | |
"Interactive map", | |
ui.div(head_content(ui.tags.style(css)), ui_map, class_="outer"), | |
), | |
ui.nav( | |
"Data explorer", | |
ui.row( | |
ui.column(3, ui.output_ui("data_intro")), | |
ui.column(9, output_widget("data", height="100%")), | |
), | |
ui.row( | |
ui.column(2), | |
ui.column(8, output_widget("table_map")), | |
ui.column(2), | |
), | |
), | |
title="Superzip", | |
) | |
# ------------------------------------------------------------------------ | |
# non-reactive helper functions | |
# ------------------------------------------------------------------------ | |
def density_plot( | |
overall: pd.DataFrame, | |
in_bounds: pd.DataFrame, | |
var: str, | |
selected: Optional[pd.DataFrame] = None, | |
title: Optional[str] = None, | |
showlegend: bool = False, | |
): | |
dat = [overall[var], in_bounds[var]] | |
if var == "Population": | |
dat = [np.log10(x) for x in dat] | |
# Create distplot with curve_type set to 'normal' | |
fig = ff.create_distplot( | |
dat, | |
["Overall", "In bounds"], | |
colors=["black", "#6DCD59"], | |
show_rug=False, | |
show_hist=False, | |
) | |
# Remove tick labels | |
fig.update_layout( | |
# hovermode="x", | |
height=200, | |
showlegend=showlegend, | |
margin=dict(l=0, r=0, t=0, b=0), | |
legend=dict(x=0.5, y=1, orientation="h", xanchor="center", yanchor="bottom"), | |
xaxis=dict( | |
title=title if title is not None else var, | |
showgrid=False, | |
showline=False, | |
zeroline=False, | |
), | |
yaxis=dict( | |
showgrid=False, | |
showline=False, | |
showticklabels=False, | |
zeroline=False, | |
), | |
) | |
# hovermode itsn't working properly when dynamically, absolutely positioned | |
for _, trace in enumerate(fig.data): | |
trace.update(hoverinfo="none") | |
if selected is not None: | |
x = selected[var].tolist()[0] | |
if var == "Population": | |
x = np.log10(x) | |
fig.add_shape( | |
type="line", | |
x0=x, | |
x1=x, | |
y0=0, | |
y1=1, | |
yref="paper", | |
line=dict(width=1, dash="dashdot", color="gray"), | |
) | |
return go.FigureWidget(data=fig.data, layout=fig.layout) | |
def create_map(**kwargs): | |
map = leaf.Map( | |
center=(37.45, -88.85), | |
zoom=4, | |
scroll_wheel_zoom=True, | |
attribution_control=False, | |
**kwargs, | |
) | |
map.add_layer(leaf.basemap_to_tiles(basemaps.CartoDB.DarkMatter)) | |
return map | |
# ------------------------------------------------------------------------ | |
# Server logic | |
# ------------------------------------------------------------------------ | |
def server(input: Inputs, output: Outputs, session: Session): | |
# ------------------------------------------------------------------------ | |
# Main map logic | |
# ------------------------------------------------------------------------ | |
map = create_map(layout=ipywidgets.Layout(width="100%", height="100%")) | |
register_widget("map", map) | |
# Keeps track of whether we're showing markers (zoomed in) or heatmap (zoomed out) | |
show_markers = reactive.Value(False) | |
def _(): | |
nzips = zips_in_bounds().shape[0] | |
show_markers.set(nzips < 200) | |
# When the variable changes, either update marker colors or redraw the heatmap | |
def _(): | |
zips = zips_in_bounds() | |
if not show_markers(): | |
remove_heatmap() | |
map.add_layer(layer_heatmap()) | |
else: | |
zip_colors = dict(zip(zips.Zipcode, zips_marker_color())) | |
for x in map.layers: | |
if x.name.startswith("marker-"): | |
zipcode = int(x.name.split("-")[1]) | |
if zipcode in zip_colors: | |
x.color = zip_colors[zipcode] | |
# When bounds change, maybe add new markers | |
def _(): | |
if not show_markers(): | |
return | |
zips = zips_in_bounds() | |
if zips.empty: | |
return | |
# Be careful not to create markers until we know we need to add it | |
current_markers = set( | |
[m.name for m in map.layers if m.name.startswith("marker-")] | |
) | |
zips["Color"] = zips_marker_color() | |
for _, row in zips.iterrows(): | |
if ("marker-" + str(row.Zipcode)) not in current_markers: | |
map.add_layer(create_marker(row, color=row.Color)) | |
# Change from heatmap to markers: remove the heatmap and show markers | |
# Change from markers to heatmap: hide the markers and add the heatmap | |
def _(): | |
if show_markers(): | |
map.remove_layer(layer_heatmap()) | |
else: | |
map.add_layer(layer_heatmap()) | |
opacity = 0.6 if show_markers() else 0.0 | |
for x in map.layers: | |
if x.name.startswith("marker-"): | |
x.fill_opacity = opacity | |
x.opacity = opacity | |
def zips_in_bounds(): | |
bb = reactive_read(map, "bounds") | |
if not bb: | |
# TODO: this should really be `raise SilentException`...why doesn't it work? | |
# return pd.DataFrame() | |
raise SilentException | |
lats = (bb[0][0], bb[1][0]) | |
lons = (bb[0][1], bb[1][1]) | |
return allzips[ | |
(allzips.Lat >= lats[0]) | |
& (allzips.Lat <= lats[1]) | |
& (allzips.Long >= lons[0]) | |
& (allzips.Long <= lons[1]) | |
] | |
def zips_marker_color(): | |
vals = allzips[input.variable()] | |
domain = (vals.min(), vals.max()) | |
vals_in_bb = zips_in_bounds()[input.variable()] | |
return col_numeric(domain)(vals_in_bb) | |
def layer_heatmap(): | |
locs = allzips[["Lat", "Long", input.variable()]].to_numpy() | |
return leaf.Heatmap( | |
locations=locs.tolist(), | |
name="heatmap", | |
# R> cat(paste0(round(scales::rescale(log10(1:10), to = c(0.05, 1)), 2), ": '", viridis::viridis(10), "'"), sep = "\n") | |
gradient={ | |
0.05: "#440154", | |
0.34: "#482878", | |
0.5: "#3E4A89", | |
0.62: "#31688E", | |
0.71: "#26828E", | |
0.79: "#1F9E89", | |
0.85: "#35B779", | |
0.91: "#6DCD59", | |
0.96: "#B4DE2C", | |
1: "#FDE725", | |
}, | |
) | |
def remove_heatmap(): | |
for x in map.layers: | |
if x.name == "heatmap": | |
map.remove_layer(x) | |
zip_selected = reactive.Value(None) | |
def _(): | |
return density_plot( | |
allzips, | |
zips_in_bounds(), | |
selected=zip_selected(), | |
var="Score", | |
title="Overall Score", | |
showlegend=True, | |
) | |
def _(): | |
return density_plot( | |
allzips, zips_in_bounds(), selected=zip_selected(), var="Income" | |
) | |
def _(): | |
return density_plot( | |
allzips, zips_in_bounds(), selected=zip_selected(), var="College" | |
) | |
def _(): | |
return density_plot( | |
allzips, | |
zips_in_bounds(), | |
selected=zip_selected(), | |
var="Population", | |
title="log10(Population)", | |
) | |
def create_marker(row, **kwargs): | |
m = leaf.CircleMarker( | |
location=(row.Lat, row.Long), | |
popup=ipywidgets.HTML( | |
f""" | |
{row.City}, {row.State} ({row.Zipcode})<br/> | |
{row.Score:.1f} overall score<br/> | |
{row.College:.1f}% college educated<br/> | |
${row.Income:.0f}k median income<br/> | |
{row.Population} people<br/> | |
""" | |
), | |
name=f"marker-{row.Zipcode}", | |
**kwargs, | |
) | |
def _on_click(**kwargs): | |
coords = kwargs["coordinates"] | |
idx = (allzips.Lat == coords[0]) & (allzips.Long == coords[1]) | |
zip_selected.set(allzips[idx]) | |
m.on_click(_on_click) | |
return m | |
def _(): | |
zips = zips_in_bounds() | |
md = ui.markdown( | |
f""" | |
{zips.shape[0]} zip codes are currently within the map's viewport, and amongst them: | |
* {100*zips.Superzip.mean():.1f}% are superzips | |
* Mean income is ${zips.Income.mean():.0f}k π° | |
* Mean population is {zips.Population.mean():.0f} π¨π½π©π½π¦π½ | |
* Mean college educated is {zips.College.mean():.1f}% π | |
Use the filter controls on the table's columns to drill down further or | |
click on a row to | |
""", | |
) | |
return ui.div(md, class_="my-3 lead") | |
selected_table_row = reactive.Value(pd.DataFrame()) | |
def _(): | |
import qgrid | |
dat = zips_in_bounds().drop(["Lat", "Long", "Color"], axis=1, errors="ignore") | |
w = qgrid.show_grid( | |
dat, | |
grid_options={"editable": False}, | |
column_definitions={"index": {"maxWidth": 0, "minWidth": 0, "width": 0}}, | |
) | |
def _on_change(event, widget): | |
idx = event["new"][0] | |
selected_table_row.set(zips_in_bounds().iloc[[idx]]) | |
w.on("selection_changed", _on_change) | |
return w | |
table_map = create_map() | |
def _(): | |
if selected_table_row().empty: | |
return None | |
else: | |
return table_map | |
# TODO: currently there is a bug where clicking the popup causes an error, | |
# but I _think_ this'll get fixed in the next release of ipywidgets/ipyleaflet | |
# https://github.com/jupyter-widgets/ipywidgets/issues/3384 | |
def _(): | |
for x in table_map.layers: | |
if x.name.startswith("marker"): | |
table_map.remove_layer(x) | |
for _, row in selected_table_row().iterrows(): | |
table_map.add_layer(create_marker(row)) | |
app = App(app_ui, server) | |