// video_stream / index.js
// MarcSkovMadsen's picture
// Upload 4 files
// 0be8eeb
importScripts("https://cdn.jsdelivr.net/pyodide/v0.24.1/full/pyodide.js");
/**
 * Forwards a document patch from the worker to the page that owns it.
 *
 * The patch and its buffers are wrapped in a message of type 'patch' and
 * posted through `self.postMessage`.
 *
 * @param {*} patch - The patch payload to forward.
 * @param {*} buffers - Buffers that accompany the patch.
 * @param {*} msg_id - Accepted for call compatibility; not used here.
 */
function sendPatch(patch, buffers, msg_id) {
  const message = { type: 'patch', patch, buffers };
  self.postMessage(message);
}
/**
 * Boots the Panel application inside this web worker.
 *
 * Steps, in order:
 *   1. load Pyodide and expose `sendPatch` to the Python globals,
 *   2. install every entry of `env_spec`, one at a time, through micropip,
 *      posting an `Installing <name>` status message before each one,
 *   3. run the embedded Python application and post its
 *      `[docs_json, render_items, root_ids]` result as a `render` message.
 *
 * A failed package install is logged and reported as a status message, after
 * which the loop continues with the next package (best effort). A failure of
 * the application code is reported as a status message and then rethrown.
 *
 * @returns {Promise<void>} Resolves once the `render` message has been posted.
 * @throws Whatever `runPythonAsync` raises while running the application code.
 */
async function startApplication() {
  console.log("Loading pyodide!");
  self.postMessage({type: 'status', msg: 'Loading pyodide'});
  self.pyodide = await loadPyodide();
  self.pyodide.globals.set("sendPatch", sendPatch);
  console.log("Loaded!");
  await self.pyodide.loadPackage("micropip");

  const env_spec = ['https://cdn.holoviz.org/panel/wheels/bokeh-3.3.2-py3-none-any.whl', 'https://cdn.holoviz.org/panel/1.3.6/dist/wheels/panel-1.3.6-py3-none-any.whl', 'pyodide-http==0.2.1', 'numpy', 'param', 'scikit-image'];
  for (const pkg of env_spec) {
    // For wheel URLs show only the distribution name (the text before the
    // first '-' of the file name); other requirements are shown verbatim.
    let pkg_name;
    if (pkg.endsWith('.whl')) {
      pkg_name = pkg.split('/').slice(-1)[0].split('-')[0];
    } else {
      pkg_name = pkg;
    }
    self.postMessage({type: 'status', msg: `Installing ${pkg_name}`});
    const installSnippet = `\nimport micropip\nawait micropip.install('${pkg}');\n`;
    try {
      await self.pyodide.runPythonAsync(installSnippet);
    } catch(e) {
      // Best effort: report the failure and keep installing the rest.
      console.log(e);
      self.postMessage({
        type: 'status',
        msg: `Error while installing ${pkg_name}`
      });
    }
  }
  console.log("Packages loaded!");
  self.postMessage({type: 'status', msg: 'Executing code'});

  // The Python application. It is a runtime string, so its text starts at
  // column 0 and Python block structure is expressed with 4-space indents.
  const code = `
import asyncio

from panel.io.pyodide import init_doc, write_doc

init_doc()

import base64
import io
import time

import numpy as np
import param
import PIL
import skimage

from PIL import Image, ImageFilter
from skimage import data, filters
from skimage.color.adapt_rgb import adapt_rgb, each_channel
from skimage.draw import rectangle
from skimage.exposure import rescale_intensity
from skimage.feature import Cascade

import panel as pn
import sys

HEIGHT = 600 # pixels
WIDTH = 600 # pixels
TIMEOUT = 500 # milliseconds

if sys.platform == 'emscripten':
    HEIGHT = 800 # pixels
    WIDTH = 800 # pixels
    TIMEOUT=100

CSS="""
.mdc-drawer {background: var(--light-bg-color) !important;}"""

pn.extension(raw_css=[CSS], sizing_mode="stretch_width")


class ImageModel(pn.viewable.Viewer):
    """Base class for image models."""

    def __init__(self, **params):
        super().__init__(**params)

        with param.edit_constant(self):
            self.name = self.__class__.name.replace("Model", "")
        self.view = self.create_view()

    def __panel__(self):
        return self.view

    def apply(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> str:
        """Transforms a base64 encoded jpg image to a base64 encoded jpg BytesIO object"""
        raise NotImplementedError()

    def create_view(self):
        """Creates a view of the parameters of the transform to enable the user to configure them"""
        return pn.Param(self, name=self.name)

    def transform(self, image):
        """Transforms the image"""
        raise NotImplementedError()


class PILImageModel(ImageModel):
    """Base class for PIL image models"""

    @staticmethod
    def to_pil_img(value: str, height=HEIGHT, width=WIDTH):
        """Converts a base64 jpeg image string to a PIL.Image"""
        encoded_data = value.split(",")[1]
        base64_decoded = base64.b64decode(encoded_data)
        image = Image.open(io.BytesIO(base64_decoded))
        # PIL sizes are (width, height)
        image.draft("RGB", (width, height))
        return image

    @staticmethod
    def from_pil_img(image: Image):
        """Converts a PIL.Image to a base64 encoded JPG BytesIO object"""
        buff = io.BytesIO()
        image.save(buff, format="JPEG")
        return buff

    def apply(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        pil_img = self.to_pil_img(image, height=height, width=width)

        transformed_image = self.transform(pil_img)

        return self.from_pil_img(transformed_image)

    def transform(self, image: PIL.Image) -> PIL.Image:
        """Transforms the PIL.Image image"""
        raise NotImplementedError()


class NumpyImageModel(ImageModel):
    """Base class for np.ndarray image models"""

    @staticmethod
    def to_np_ndarray(image: str, height=HEIGHT, width=WIDTH) -> np.ndarray:
        """Converts a base64 encoded jpeg string to a np.ndarray"""
        pil_img = PILImageModel.to_pil_img(image, height=height, width=width)
        return np.array(pil_img)

    @staticmethod
    def from_np_ndarray(image: np.ndarray) -> io.BytesIO:
        """Converts np.ndarray jpeg image to a jpeg BytesIO instance"""
        if image.dtype == np.dtype("float64"):
            image = (image * 255).astype(np.uint8)
        pil_img = PIL.Image.fromarray(image)
        return PILImageModel.from_pil_img(pil_img)

    def apply(self, image: str, height: int = HEIGHT, width: int = WIDTH) -> io.BytesIO:
        np_array = self.to_np_ndarray(image, height=height, width=width)

        transformed_image = self.transform(np_array)

        return self.from_np_ndarray(transformed_image)

    def transform(self, image: np.ndarray) -> np.ndarray:
        """Transforms the np.array image"""
        raise NotImplementedError()


class Timer(pn.viewable.Viewer):
    """Helper Component used to show duration trends"""

    _trends = param.Dict()

    def __init__(self, **params):
        super().__init__()

        self.last_updates = {}
        self._trends = {}

        self._layout = pn.Row(**params)

    def time_it(self, name, func, *args, **kwargs):
        """Measures the duration of the execution of the func function and reports it under the
        name specified"""
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        duration = round(end - start, 2)
        self._report(name=name, duration=duration)
        return result

    def inc_it(self, name):
        """Measures the duration since the last time inc_it was called and reports it under the
        specified name"""
        start = self.last_updates.get(name, time.time())
        end = time.time()
        duration = round(end - start, 2)
        self._report(name=name, duration=duration)
        self.last_updates[name] = end

    def _report(self, name, duration):
        if not name in self._trends:
            self._trends[name] = pn.indicators.Trend(
                name=name,
                data={"x": [1], "y": [duration]},
                height=100,
                width=150,
                sizing_mode="fixed",
            )
            self.param.trigger("_trends")
        else:
            trend = self._trends[name]
            next_x = max(trend.data["x"]) + 1
            trend.stream({"x": [next_x], "y": [duration]}, rollover=10)

    @param.depends("_trends")
    def _panel(self):
        self._layout[:] = list(self._trends.values())
        return self._layout

    def __panel__(self):
        return pn.panel(self._panel)


def to_instance(value, **params):
    """Converts the value to an instance

    Args:
        value: A param.Parameterized class or instance

    Returns:
        An instance of the param.Parameterized class
    """
    if isinstance(value, param.Parameterized):
        value.param.update(**params)
        return value
    return value(**params)


class VideoStreamInterface(pn.viewable.Viewer):
    """An easy to use interface for a VideoStream and a set of transforms"""

    video_stream = param.ClassSelector(
        class_=pn.widgets.VideoStream, constant=True, doc="The source VideoStream", allow_refs=False,
    )

    height = param.Integer(
        default=HEIGHT,
        bounds=(10, 2000),
        step=10,
        doc="""The height of the image converted and shown""",
    )
    width = param.Integer(
        default=WIDTH,
        bounds=(10, 2000),
        step=10,
        doc="""The width of the image converted and shown""",
    )

    model = param.Selector(doc="The currently selected model")

    def __init__(
        self,
        models,
        timeout=TIMEOUT,
        paused=False,
        **params,
    ):
        super().__init__(
            video_stream=pn.widgets.VideoStream(
                name="Video Stream",
                timeout=timeout,
                paused=paused,
                height=0,
                width=0,
                visible=False,
                format="jpeg",
            ),
            **params,
        )
        self.image = pn.pane.JPG(
            height=self.height, width=self.width, sizing_mode="fixed"
        )
        self._updating = False
        models = [to_instance(model) for model in models]
        self.param.model.objects = models
        self.model = models[0]
        self.timer = Timer(sizing_mode="stretch_width")
        self.settings = self._create_settings()
        self._panel = self._create_panel()

    def _create_settings(self):
        return pn.Column(
            pn.Param(
                self.video_stream,
                parameters=["timeout", "paused"],
                widgets={
                    "timeout": {
                        "widget_type": pn.widgets.IntSlider,
                        "start": 10,
                        "end": 2000,
                        "step": 10,
                    }
                },
            ),
            self.timer,
            pn.Param(self, parameters=["height", "width"], name="Image"),
            pn.Param(
                self,
                parameters=["model"],
                expand_button=False,
                expand=False,
                widgets={
                    "model": {
                        "widget_type": pn.widgets.RadioButtonGroup,
                        "orientation": "vertical",
                        "button_type": "primary",
                        "button_style": "outline"
                    }
                },
                name="Model",
            ),
            self._get_transform,
        )

    def _create_panel(self):
        return pn.Row(
            self.video_stream,
            pn.layout.HSpacer(),
            self.image,
            pn.layout.HSpacer(),
            sizing_mode="stretch_width",
            align="center",
        )

    @param.depends("height", "width", watch=True)
    def _update_height_width(self):
        self.image.height = self.height
        self.image.width = self.width

    @param.depends("model")
    def _get_transform(self):
        # Hack: returning self.transform stops working after browsing the transforms for a while
        return self.model.view

    def __panel__(self):
        return self._panel

    @param.depends("video_stream.value", watch=True)
    def _handle_stream(self):
        if self._updating:
            return

        self._updating = True
        try:
            if self.model and self.video_stream.value:
                value = self.video_stream.value
                try:
                    image = self.timer.time_it(
                        name="Model",
                        func=self.model.apply,
                        image=value,
                        height=self.height,
                        width=self.width,
                    )
                    self.image.object = image
                except PIL.UnidentifiedImageError:
                    print("unidentified image")

                self.timer.inc_it("Last Update")
        finally:
            # Always release the guard; otherwise a single failing frame
            # would make every later frame return early.
            self._updating = False


class GaussianBlurModel(PILImageModel):
    """Gaussian Blur Model

    https://pillow.readthedocs.io/en/stable/reference/ImageFilter.html#PIL.ImageFilter.GaussianBlur
    """

    radius = param.Integer(default=0, bounds=(0, 10))

    def transform(self, image: Image):
        return image.filter(ImageFilter.GaussianBlur(radius=self.radius))


class GrayscaleModel(NumpyImageModel):
    """GrayScale Model

    https://scikit-image.org/docs/0.15.x/auto_examples/color_exposure/plot_rgb_to_gray.html
    """

    def transform(self, image: np.ndarray):
        grayscale = skimage.color.rgb2gray(image[:, :, :3])
        return skimage.color.gray2rgb(grayscale)


class SobelModel(NumpyImageModel):
    """Sobel Model

    https://scikit-image.org/docs/0.15.x/auto_examples/color_exposure/plot_adapt_rgb.html
    """

    def transform(self, image):
        @adapt_rgb(each_channel)
        def sobel_each(image):
            return filters.sobel(image)

        return rescale_intensity(1 - sobel_each(image))


@pn.cache()
def get_detector():
    """Returns the Cascade detector"""
    trained_file = data.lbp_frontal_face_cascade_filename()
    return Cascade(trained_file)


class FaceDetectionModel(NumpyImageModel):
    """Face detection using a cascade classifier.

    https://scikit-image.org/docs/0.15.x/auto_examples/applications/plot_face_detection.html
    """

    scale_factor = param.Number(default=1.4, bounds=(1.0, 2.0), step=0.1)
    step_ratio = param.Integer(default=1, bounds=(1, 10))
    size_x = param.Range(default=(60, 322), bounds=(10, 500))
    size_y = param.Range(default=(60, 322), bounds=(10, 500))

    def transform(self, image):
        detector = get_detector()
        detected = detector.detect_multi_scale(
            img=image,
            scale_factor=self.scale_factor,
            step_ratio=self.step_ratio,
            min_size=(self.size_x[0], self.size_y[0]),
            max_size=(self.size_x[1], self.size_y[1]),
        )

        for patch in detected:
            rrr, ccc = rectangle(
                start=(patch["r"], patch["c"]),
                extent=(patch["height"], patch["width"]),
                shape=image.shape[:2],
            )
            image[rrr, ccc, 0] = 200

        return image


component = VideoStreamInterface(
    models=[
        GaussianBlurModel,
        GrayscaleModel,
        SobelModel,
        FaceDetectionModel,
    ]
)
pn.Row(pn.Row(component.settings, max_width=400), component)

pn.template.MaterialTemplate(
    site="Awesome Panel",
    title="VideoStream with ScikitImage",
    sidebar=[component.settings],
    main=[component],
).servable(); # We add ; to not show the template in the notebook as it does not display well.

await write_doc()
`;

  try {
    const [docs_json, render_items, root_ids] = await self.pyodide.runPythonAsync(code);
    self.postMessage({
      type: 'render',
      docs_json: docs_json,
      render_items: render_items,
      root_ids: root_ids
    });
  } catch(e) {
    const traceback = `${e}`;
    const tblines = traceback.split('\n');
    // Report the last non-empty line of the error text. When the text ends
    // with a newline this is the second-to-last split element; a single-line
    // message is reported as-is instead of yielding `undefined`.
    const nonEmpty = tblines.filter((line) => line.trim() !== '');
    const summary = nonEmpty.length > 0 ? nonEmpty[nonEmpty.length - 1] : traceback;
    self.postMessage({
      type: 'status',
      msg: summary
    });
    throw e;
  }
}
/**
 * Handles messages sent to this worker and then starts the application.
 *
 * Message types handled:
 *   - 'rendered': runs `_link_docs_worker(state.curdoc, sendPatch, setter='js')`
 *     in Python.
 *   - 'patch': stores `msg.patch` as the Python global `patch`, applies it to
 *     `state.curdoc` with `apply_json_patch`, and posts an 'idle' message.
 *   - 'location': stores `msg.location` as the Python global `location`,
 *     JSON-decodes it and, if `state.location` is set, updates the keys that
 *     exist on `state.location.param`.
 *
 * Python snippets are started without being awaited, so message ordering is
 * unchanged; a rejected snippet is logged instead of being left unhandled.
 */
self.onmessage = async (event) => {
  const logPythonError = (err) => {
    console.error(err);
  };
  const msg = event.data;
  if (msg.type === 'rendered') {
    self.pyodide.runPythonAsync(`
from panel.io.state import state
from panel.io.pyodide import _link_docs_worker
_link_docs_worker(state.curdoc, sendPatch, setter='js')
`).catch(logPythonError);
  } else if (msg.type === 'patch') {
    self.pyodide.globals.set('patch', msg.patch);
    // Import `state` here too so this snippet does not depend on a
    // 'rendered' message having been processed first.
    self.pyodide.runPythonAsync(`
from panel.io.state import state
state.curdoc.apply_json_patch(patch.to_py(), setter='js')
`).catch(logPythonError);
    self.postMessage({type: 'idle'});
  } else if (msg.type === 'location') {
    self.pyodide.globals.set('location', msg.location);
    self.pyodide.runPythonAsync(`
import json
from panel.io.state import state
from panel.util import edit_readonly
if state.location:
    loc_data = json.loads(location)
    with edit_readonly(state.location):
        state.location.param.update({
            k: v for k, v in loc_data.items() if k in state.location.param
        })
`).catch(logPythonError);
  }
};

// Kick off loading as soon as the worker script has been evaluated.
startApplication()