|
"""Displays Agg images in the browser, with interactivity.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from contextlib import contextmanager |
|
import errno |
|
from io import BytesIO |
|
import json |
|
import mimetypes |
|
from pathlib import Path |
|
import random |
|
import sys |
|
import signal |
|
import threading |
|
|
|
try: |
|
import tornado |
|
except ImportError as err: |
|
raise RuntimeError("The WebAgg backend requires Tornado.") from err |
|
|
|
import tornado.web |
|
import tornado.ioloop |
|
import tornado.websocket |
|
|
|
import matplotlib as mpl |
|
from matplotlib.backend_bases import _Backend |
|
from matplotlib._pylab_helpers import Gcf |
|
from . import backend_webagg_core as core |
|
from .backend_webagg_core import ( |
|
TimerAsyncio, TimerTornado) |
|
|
|
|
|
@mpl._api.deprecated("3.7") |
|
class ServerThread(threading.Thread): |
|
def run(self): |
|
tornado.ioloop.IOLoop.instance().start() |
|
|
|
|
|
webagg_server_thread = threading.Thread( |
|
target=lambda: tornado.ioloop.IOLoop.instance().start()) |
|
|
|
|
|
class FigureManagerWebAgg(core.FigureManagerWebAgg): |
|
_toolbar2_class = core.NavigationToolbar2WebAgg |
|
|
|
@classmethod |
|
def pyplot_show(cls, *, block=None): |
|
WebAggApplication.initialize() |
|
|
|
url = "http://{address}:{port}{prefix}".format( |
|
address=WebAggApplication.address, |
|
port=WebAggApplication.port, |
|
prefix=WebAggApplication.url_prefix) |
|
|
|
if mpl.rcParams['webagg.open_in_browser']: |
|
import webbrowser |
|
if not webbrowser.open(url): |
|
print(f"To view figure, visit {url}") |
|
else: |
|
print(f"To view figure, visit {url}") |
|
|
|
WebAggApplication.start() |
|
|
|
|
|
class FigureCanvasWebAgg(core.FigureCanvasWebAggCore): |
|
manager_class = FigureManagerWebAgg |
|
|
|
|
|
class WebAggApplication(tornado.web.Application): |
|
initialized = False |
|
started = False |
|
|
|
class FavIcon(tornado.web.RequestHandler): |
|
def get(self): |
|
self.set_header('Content-Type', 'image/png') |
|
self.write(Path(mpl.get_data_path(), |
|
'images/matplotlib.png').read_bytes()) |
|
|
|
class SingleFigurePage(tornado.web.RequestHandler): |
|
def __init__(self, application, request, *, url_prefix='', **kwargs): |
|
self.url_prefix = url_prefix |
|
super().__init__(application, request, **kwargs) |
|
|
|
def get(self, fignum): |
|
fignum = int(fignum) |
|
manager = Gcf.get_fig_manager(fignum) |
|
|
|
ws_uri = f'ws://{self.request.host}{self.url_prefix}/' |
|
self.render( |
|
"single_figure.html", |
|
prefix=self.url_prefix, |
|
ws_uri=ws_uri, |
|
fig_id=fignum, |
|
toolitems=core.NavigationToolbar2WebAgg.toolitems, |
|
canvas=manager.canvas) |
|
|
|
class AllFiguresPage(tornado.web.RequestHandler): |
|
def __init__(self, application, request, *, url_prefix='', **kwargs): |
|
self.url_prefix = url_prefix |
|
super().__init__(application, request, **kwargs) |
|
|
|
def get(self): |
|
ws_uri = f'ws://{self.request.host}{self.url_prefix}/' |
|
self.render( |
|
"all_figures.html", |
|
prefix=self.url_prefix, |
|
ws_uri=ws_uri, |
|
figures=sorted(Gcf.figs.items()), |
|
toolitems=core.NavigationToolbar2WebAgg.toolitems) |
|
|
|
class MplJs(tornado.web.RequestHandler): |
|
def get(self): |
|
self.set_header('Content-Type', 'application/javascript') |
|
|
|
js_content = core.FigureManagerWebAgg.get_javascript() |
|
|
|
self.write(js_content) |
|
|
|
class Download(tornado.web.RequestHandler): |
|
def get(self, fignum, fmt): |
|
fignum = int(fignum) |
|
manager = Gcf.get_fig_manager(fignum) |
|
self.set_header( |
|
'Content-Type', mimetypes.types_map.get(fmt, 'binary')) |
|
buff = BytesIO() |
|
manager.canvas.figure.savefig(buff, format=fmt) |
|
self.write(buff.getvalue()) |
|
|
|
class WebSocket(tornado.websocket.WebSocketHandler): |
|
supports_binary = True |
|
|
|
def open(self, fignum): |
|
self.fignum = int(fignum) |
|
self.manager = Gcf.get_fig_manager(self.fignum) |
|
self.manager.add_web_socket(self) |
|
if hasattr(self, 'set_nodelay'): |
|
self.set_nodelay(True) |
|
|
|
def on_close(self): |
|
self.manager.remove_web_socket(self) |
|
|
|
def on_message(self, message): |
|
message = json.loads(message) |
|
|
|
|
|
|
|
if message['type'] == 'supports_binary': |
|
self.supports_binary = message['value'] |
|
else: |
|
manager = Gcf.get_fig_manager(self.fignum) |
|
|
|
|
|
|
|
if manager is not None: |
|
manager.handle_json(message) |
|
|
|
def send_json(self, content): |
|
self.write_message(json.dumps(content)) |
|
|
|
def send_binary(self, blob): |
|
if self.supports_binary: |
|
self.write_message(blob, binary=True) |
|
else: |
|
data_uri = "data:image/png;base64,{}".format( |
|
blob.encode('base64').replace('\n', '')) |
|
self.write_message(data_uri) |
|
|
|
def __init__(self, url_prefix=''): |
|
if url_prefix: |
|
assert url_prefix[0] == '/' and url_prefix[-1] != '/', \ |
|
'url_prefix must start with a "/" and not end with one.' |
|
|
|
super().__init__( |
|
[ |
|
|
|
(url_prefix + r'/_static/(.*)', |
|
tornado.web.StaticFileHandler, |
|
{'path': core.FigureManagerWebAgg.get_static_file_path()}), |
|
|
|
|
|
(url_prefix + r'/_images/(.*)', |
|
tornado.web.StaticFileHandler, |
|
{'path': Path(mpl.get_data_path(), 'images')}), |
|
|
|
|
|
(url_prefix + r'/favicon.ico', self.FavIcon), |
|
|
|
|
|
(url_prefix + r'/([0-9]+)', self.SingleFigurePage, |
|
{'url_prefix': url_prefix}), |
|
|
|
|
|
(url_prefix + r'/?', self.AllFiguresPage, |
|
{'url_prefix': url_prefix}), |
|
|
|
(url_prefix + r'/js/mpl.js', self.MplJs), |
|
|
|
|
|
|
|
(url_prefix + r'/([0-9]+)/ws', self.WebSocket), |
|
|
|
|
|
(url_prefix + r'/([0-9]+)/download.([a-z0-9.]+)', |
|
self.Download), |
|
], |
|
template_path=core.FigureManagerWebAgg.get_static_file_path()) |
|
|
|
@classmethod |
|
def initialize(cls, url_prefix='', port=None, address=None): |
|
if cls.initialized: |
|
return |
|
|
|
|
|
app = cls(url_prefix=url_prefix) |
|
|
|
cls.url_prefix = url_prefix |
|
|
|
|
|
|
|
def random_ports(port, n): |
|
""" |
|
Generate a list of n random ports near the given port. |
|
|
|
The first 5 ports will be sequential, and the remaining n-5 will be |
|
randomly selected in the range [port-2*n, port+2*n]. |
|
""" |
|
for i in range(min(5, n)): |
|
yield port + i |
|
for i in range(n - 5): |
|
yield port + random.randint(-2 * n, 2 * n) |
|
|
|
if address is None: |
|
cls.address = mpl.rcParams['webagg.address'] |
|
else: |
|
cls.address = address |
|
cls.port = mpl.rcParams['webagg.port'] |
|
for port in random_ports(cls.port, |
|
mpl.rcParams['webagg.port_retries']): |
|
try: |
|
app.listen(port, cls.address) |
|
except OSError as e: |
|
if e.errno != errno.EADDRINUSE: |
|
raise |
|
else: |
|
cls.port = port |
|
break |
|
else: |
|
raise SystemExit( |
|
"The webagg server could not be started because an available " |
|
"port could not be found") |
|
|
|
cls.initialized = True |
|
|
|
@classmethod |
|
def start(cls): |
|
import asyncio |
|
try: |
|
asyncio.get_running_loop() |
|
except RuntimeError: |
|
pass |
|
else: |
|
cls.started = True |
|
|
|
if cls.started: |
|
return |
|
|
|
""" |
|
IOLoop.running() was removed as of Tornado 2.4; see for example |
|
https://groups.google.com/forum/#!topic/python-tornado/QLMzkpQBGOY |
|
Thus there is no correct way to check if the loop has already been |
|
launched. We may end up with two concurrently running loops in that |
|
unlucky case with all the expected consequences. |
|
""" |
|
ioloop = tornado.ioloop.IOLoop.instance() |
|
|
|
def shutdown(): |
|
ioloop.stop() |
|
print("Server is stopped") |
|
sys.stdout.flush() |
|
cls.started = False |
|
|
|
@contextmanager |
|
def catch_sigint(): |
|
old_handler = signal.signal( |
|
signal.SIGINT, |
|
lambda sig, frame: ioloop.add_callback_from_signal(shutdown)) |
|
try: |
|
yield |
|
finally: |
|
signal.signal(signal.SIGINT, old_handler) |
|
|
|
|
|
cls.started = True |
|
|
|
print("Press Ctrl+C to stop WebAgg server") |
|
sys.stdout.flush() |
|
with catch_sigint(): |
|
ioloop.start() |
|
|
|
|
|
def ipython_inline_display(figure): |
|
import tornado.template |
|
|
|
WebAggApplication.initialize() |
|
import asyncio |
|
try: |
|
asyncio.get_running_loop() |
|
except RuntimeError: |
|
if not webagg_server_thread.is_alive(): |
|
webagg_server_thread.start() |
|
|
|
fignum = figure.number |
|
tpl = Path(core.FigureManagerWebAgg.get_static_file_path(), |
|
"ipython_inline_figure.html").read_text() |
|
t = tornado.template.Template(tpl) |
|
return t.generate( |
|
prefix=WebAggApplication.url_prefix, |
|
fig_id=fignum, |
|
toolitems=core.NavigationToolbar2WebAgg.toolitems, |
|
canvas=figure.canvas, |
|
port=WebAggApplication.port).decode('utf-8') |
|
|
|
|
|
@_Backend.export |
|
class _BackendWebAgg(_Backend): |
|
FigureCanvas = FigureCanvasWebAgg |
|
FigureManager = FigureManagerWebAgg |
|
|