import os from typing import List, Optional, Tuple import ipyleaflet as leaf import ipywidgets import matplotlib as mpl import numpy as np import pandas as pd import plotly.figure_factory as ff import plotly.graph_objs as go from htmltools import head_content from ipyleaflet import basemaps from matplotlib import cm from shiny import * from shiny.types import SilentException from shinywidgets import * color_palette = cm.get_cmap("viridis", 10) # TODO: how to handle nas (pd.isna)? def col_numeric(domain: Tuple[float, float], na_color: str = "#808080"): rescale = mpl.colors.Normalize(domain[0], domain[1]) def _(vals: List[float]) -> List[str]: cols = color_palette(rescale(vals)) return [mpl.colors.to_hex(v) for v in cols] return _ # TODO: when this issue is fixed, we won't have to sample anymore # https://github.com/rstudio/prism/issues/119 app_dir = os.path.dirname(__file__) allzips = pd.read_csv(os.path.join(app_dir, "superzip.csv")).sample( n=10000, random_state=42 ) # ------------------------------------------------------------------------ # Define user interface # ------------------------------------------------------------------------ vars = { "Score": "Overall score", "College": "% college educated", "Income": "Median income", "Population": "Population", } css = open(os.path.join(app_dir, "styles.css"), "r").readlines() ui_map = ui.TagList( output_widget("map", width="100%", height="100%"), ui.panel_fixed( ui.h2("SuperZIP explorer"), ui.input_select("variable", "Heatmap variable", vars), output_widget("density_score", height="200px"), output_widget("density_college", height="200px"), output_widget("density_income", height="200px"), output_widget("density_pop", height="200px"), id="controls", class_="panel panel-default", width="330px", height="auto", draggable=True, top="60px", left="auto", right="20px", bottom="auto", ), ui.div( "Data compiled for ", ui.tags.em("Coming Apart: The State of White America, 1960-2010"), " by Charles Murray (Crown Forum, 2012).", id="cite", ), ) app_ui = ui.page_navbar( ui.nav( "Interactive map", ui.div(head_content(ui.tags.style(css)), ui_map, class_="outer"), ), ui.nav( "Data explorer", ui.row( ui.column(3, ui.output_ui("data_intro")), ui.column(9, output_widget("data", height="100%")), ), ui.row( ui.column(2), ui.column(8, output_widget("table_map")), ui.column(2), ), ), title="Superzip", ) # ------------------------------------------------------------------------ # non-reactive helper functions # ------------------------------------------------------------------------ def density_plot( overall: pd.DataFrame, in_bounds: pd.DataFrame, var: str, selected: Optional[pd.DataFrame] = None, title: Optional[str] = None, showlegend: bool = False, ): dat = [overall[var], in_bounds[var]] if var == "Population": dat = [np.log10(x) for x in dat] # Create distplot with curve_type set to 'normal' fig = ff.create_distplot( dat, ["Overall", "In bounds"], colors=["black", "#6DCD59"], show_rug=False, show_hist=False, ) # Remove tick labels fig.update_layout( # hovermode="x", height=200, showlegend=showlegend, margin=dict(l=0, r=0, t=0, b=0), legend=dict(x=0.5, y=1, orientation="h", xanchor="center", yanchor="bottom"), xaxis=dict( title=title if title is not None else var, showgrid=False, showline=False, zeroline=False, ), yaxis=dict( showgrid=False, showline=False, showticklabels=False, zeroline=False, ), ) # hovermode itsn't working properly when dynamically, absolutely positioned for _, trace in enumerate(fig.data): trace.update(hoverinfo="none") if selected is not None: x = selected[var].tolist()[0] if var == "Population": x = np.log10(x) fig.add_shape( type="line", x0=x, x1=x, y0=0, y1=1, yref="paper", line=dict(width=1, dash="dashdot", color="gray"), ) return go.FigureWidget(data=fig.data, layout=fig.layout) def create_map(**kwargs): map = leaf.Map( center=(37.45, -88.85), zoom=4, scroll_wheel_zoom=True, attribution_control=False, **kwargs, ) map.add_layer(leaf.basemap_to_tiles(basemaps.CartoDB.DarkMatter)) return map # ------------------------------------------------------------------------ # Server logic # ------------------------------------------------------------------------ def server(input: Inputs, output: Outputs, session: Session): # ------------------------------------------------------------------------ # Main map logic # ------------------------------------------------------------------------ map = create_map(layout=ipywidgets.Layout(width="100%", height="100%")) register_widget("map", map) # Keeps track of whether we're showing markers (zoomed in) or heatmap (zoomed out) show_markers = reactive.Value(False) @reactive.Effect def _(): nzips = zips_in_bounds().shape[0] show_markers.set(nzips < 200) # When the variable changes, either update marker colors or redraw the heatmap @reactive.Effect @reactive.event(input.variable) def _(): zips = zips_in_bounds() if not show_markers(): remove_heatmap() map.add_layer(layer_heatmap()) else: zip_colors = dict(zip(zips.Zipcode, zips_marker_color())) for x in map.layers: if x.name.startswith("marker-"): zipcode = int(x.name.split("-")[1]) if zipcode in zip_colors: x.color = zip_colors[zipcode] # When bounds change, maybe add new markers @reactive.Effect @reactive.event(lambda: zips_in_bounds()) def _(): if not show_markers(): return zips = zips_in_bounds() if zips.empty: return # Be careful not to create markers until we know we need to add it current_markers = set( [m.name for m in map.layers if m.name.startswith("marker-")] ) zips["Color"] = zips_marker_color() for _, row in zips.iterrows(): if ("marker-" + str(row.Zipcode)) not in current_markers: map.add_layer(create_marker(row, color=row.Color)) # Change from heatmap to markers: remove the heatmap and show markers # Change from markers to heatmap: hide the markers and add the heatmap @reactive.Effect @reactive.event(show_markers) def _(): if show_markers(): map.remove_layer(layer_heatmap()) else: map.add_layer(layer_heatmap()) opacity = 0.6 if show_markers() else 0.0 for x in map.layers: if x.name.startswith("marker-"): x.fill_opacity = opacity x.opacity = opacity @reactive.Calc def zips_in_bounds(): bb = reactive_read(map, "bounds") if not bb: # TODO: this should really be `raise SilentException`...why doesn't it work? # return pd.DataFrame() raise SilentException lats = (bb[0][0], bb[1][0]) lons = (bb[0][1], bb[1][1]) return allzips[ (allzips.Lat >= lats[0]) & (allzips.Lat <= lats[1]) & (allzips.Long >= lons[0]) & (allzips.Long <= lons[1]) ] @reactive.Calc def zips_marker_color(): vals = allzips[input.variable()] domain = (vals.min(), vals.max()) vals_in_bb = zips_in_bounds()[input.variable()] return col_numeric(domain)(vals_in_bb) @reactive.Calc def layer_heatmap(): locs = allzips[["Lat", "Long", input.variable()]].to_numpy() return leaf.Heatmap( locations=locs.tolist(), name="heatmap", # R> cat(paste0(round(scales::rescale(log10(1:10), to = c(0.05, 1)), 2), ": '", viridis::viridis(10), "'"), sep = "\n") gradient={ 0.05: "#440154", 0.34: "#482878", 0.5: "#3E4A89", 0.62: "#31688E", 0.71: "#26828E", 0.79: "#1F9E89", 0.85: "#35B779", 0.91: "#6DCD59", 0.96: "#B4DE2C", 1: "#FDE725", }, ) def remove_heatmap(): for x in map.layers: if x.name == "heatmap": map.remove_layer(x) zip_selected = reactive.Value(None) @output(id="density_score") @render_widget def _(): return density_plot( allzips, zips_in_bounds(), selected=zip_selected(), var="Score", title="Overall Score", showlegend=True, ) @output(id="density_income") @render_widget def _(): return density_plot( allzips, zips_in_bounds(), selected=zip_selected(), var="Income" ) @output(id="density_college") @render_widget def _(): return density_plot( allzips, zips_in_bounds(), selected=zip_selected(), var="College" ) @output(id="density_pop") @render_widget def _(): return density_plot( allzips, zips_in_bounds(), selected=zip_selected(), var="Population", title="log10(Population)", ) def create_marker(row, **kwargs): m = leaf.CircleMarker( location=(row.Lat, row.Long), popup=ipywidgets.HTML( f""" {row.City}, {row.State} ({row.Zipcode})
{row.Score:.1f} overall score
{row.College:.1f}% college educated
${row.Income:.0f}k median income
{row.Population} people
""" ), name=f"marker-{row.Zipcode}", **kwargs, ) def _on_click(**kwargs): coords = kwargs["coordinates"] idx = (allzips.Lat == coords[0]) & (allzips.Long == coords[1]) zip_selected.set(allzips[idx]) m.on_click(_on_click) return m @output(id="data_intro") @render.ui def _(): zips = zips_in_bounds() md = ui.markdown( f""" {zips.shape[0]} zip codes are currently within the map's viewport, and amongst them: * {100*zips.Superzip.mean():.1f}% are superzips * Mean income is ${zips.Income.mean():.0f}k 💰 * Mean population is {zips.Population.mean():.0f} 👨🏽👩🏽👦🏽 * Mean college educated is {zips.College.mean():.1f}% 🎓 Use the filter controls on the table's columns to drill down further or click on a row to """, ) return ui.div(md, class_="my-3 lead") selected_table_row = reactive.Value(pd.DataFrame()) @output(id="data") @render_widget def _(): import qgrid dat = zips_in_bounds().drop(["Lat", "Long", "Color"], axis=1, errors="ignore") w = qgrid.show_grid( dat, grid_options={"editable": False}, column_definitions={"index": {"maxWidth": 0, "minWidth": 0, "width": 0}}, ) def _on_change(event, widget): idx = event["new"][0] selected_table_row.set(zips_in_bounds().iloc[[idx]]) w.on("selection_changed", _on_change) return w table_map = create_map() @output(id="table_map") @render_widget def _(): if selected_table_row().empty: return None else: return table_map # TODO: currently there is a bug where clicking the popup causes an error, # but I _think_ this'll get fixed in the next release of ipywidgets/ipyleaflet # https://github.com/jupyter-widgets/ipywidgets/issues/3384 @reactive.Effect @reactive.event(selected_table_row) def _(): for x in table_map.layers: if x.name.startswith("marker"): table_map.remove_layer(x) for _, row in selected_table_row().iterrows(): table_map.add_layer(create_marker(row)) app = App(app_ui, server)