|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import abc |
|
import base64 |
|
import contextlib |
|
from io import BytesIO, TextIOWrapper |
|
import itertools |
|
import logging |
|
from pathlib import Path |
|
import shutil |
|
import subprocess |
|
import sys |
|
from tempfile import TemporaryDirectory |
|
import uuid |
|
import warnings |
|
|
|
import numpy as np |
|
from PIL import Image |
|
|
|
import matplotlib as mpl |
|
from matplotlib._animation_data import ( |
|
DISPLAY_TEMPLATE, INCLUDED_FRAMES, JS_INCLUDE, STYLE_INCLUDE) |
|
from matplotlib import _api, cbook |
|
import matplotlib.colors as mcolors |
|
|
|
_log = logging.getLogger(__name__) |
|
|
|
|
|
|
|
subprocess_creation_flags = ( |
|
subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def adjusted_figsize(w, h, dpi, n): |
|
""" |
|
Compute figure size so that pixels are a multiple of n. |
|
|
|
Parameters |
|
---------- |
|
w, h : float |
|
Size in inches. |
|
|
|
dpi : float |
|
The dpi. |
|
|
|
n : int |
|
The target multiple. |
|
|
|
Returns |
|
------- |
|
wnew, hnew : float |
|
The new figure size in inches. |
|
""" |
|
|
|
|
|
|
|
def correct_roundoff(x, dpi, n): |
|
if int(x*dpi) % n != 0: |
|
if int(np.nextafter(x, np.inf)*dpi) % n == 0: |
|
x = np.nextafter(x, np.inf) |
|
elif int(np.nextafter(x, -np.inf)*dpi) % n == 0: |
|
x = np.nextafter(x, -np.inf) |
|
return x |
|
|
|
wnew = int(w * dpi / n) * n / dpi |
|
hnew = int(h * dpi / n) * n / dpi |
|
return correct_roundoff(wnew, dpi, n), correct_roundoff(hnew, dpi, n) |
|
|
|
|
|
class MovieWriterRegistry: |
|
"""Registry of available writer classes by human readable name.""" |
|
|
|
def __init__(self): |
|
self._registered = dict() |
|
|
|
def register(self, name): |
|
""" |
|
Decorator for registering a class under a name. |
|
|
|
Example use:: |
|
|
|
@registry.register(name) |
|
class Foo: |
|
pass |
|
""" |
|
def wrapper(writer_cls): |
|
self._registered[name] = writer_cls |
|
return writer_cls |
|
return wrapper |
|
|
|
def is_available(self, name): |
|
""" |
|
Check if given writer is available by name. |
|
|
|
Parameters |
|
---------- |
|
name : str |
|
|
|
Returns |
|
------- |
|
bool |
|
""" |
|
try: |
|
cls = self._registered[name] |
|
except KeyError: |
|
return False |
|
return cls.isAvailable() |
|
|
|
def __iter__(self): |
|
"""Iterate over names of available writer class.""" |
|
for name in self._registered: |
|
if self.is_available(name): |
|
yield name |
|
|
|
def list(self): |
|
"""Get a list of available MovieWriters.""" |
|
return [*self] |
|
|
|
def __getitem__(self, name): |
|
"""Get an available writer class from its name.""" |
|
if self.is_available(name): |
|
return self._registered[name] |
|
raise RuntimeError(f"Requested MovieWriter ({name}) not available") |
|
|
|
|
|
writers = MovieWriterRegistry() |
|
|
|
|
|
class AbstractMovieWriter(abc.ABC): |
|
""" |
|
Abstract base class for writing movies, providing a way to grab frames by |
|
calling `~AbstractMovieWriter.grab_frame`. |
|
|
|
`setup` is called to start the process and `finish` is called afterwards. |
|
`saving` is provided as a context manager to facilitate this process as :: |
|
|
|
with moviewriter.saving(fig, outfile='myfile.mp4', dpi=100): |
|
# Iterate over frames |
|
moviewriter.grab_frame(**savefig_kwargs) |
|
|
|
The use of the context manager ensures that `setup` and `finish` are |
|
performed as necessary. |
|
|
|
An instance of a concrete subclass of this class can be given as the |
|
``writer`` argument of `Animation.save()`. |
|
""" |
|
|
|
def __init__(self, fps=5, metadata=None, codec=None, bitrate=None): |
|
self.fps = fps |
|
self.metadata = metadata if metadata is not None else {} |
|
self.codec = mpl._val_or_rc(codec, 'animation.codec') |
|
self.bitrate = mpl._val_or_rc(bitrate, 'animation.bitrate') |
|
|
|
@abc.abstractmethod |
|
def setup(self, fig, outfile, dpi=None): |
|
""" |
|
Setup for writing the movie file. |
|
|
|
Parameters |
|
---------- |
|
fig : `~matplotlib.figure.Figure` |
|
The figure object that contains the information for frames. |
|
outfile : str |
|
The filename of the resulting movie file. |
|
dpi : float, default: ``fig.dpi`` |
|
The DPI (or resolution) for the file. This controls the size |
|
in pixels of the resulting movie file. |
|
""" |
|
|
|
Path(outfile).parent.resolve(strict=True) |
|
self.outfile = outfile |
|
self.fig = fig |
|
if dpi is None: |
|
dpi = self.fig.dpi |
|
self.dpi = dpi |
|
|
|
@property |
|
def frame_size(self): |
|
"""A tuple ``(width, height)`` in pixels of a movie frame.""" |
|
w, h = self.fig.get_size_inches() |
|
return int(w * self.dpi), int(h * self.dpi) |
|
|
|
@abc.abstractmethod |
|
def grab_frame(self, **savefig_kwargs): |
|
""" |
|
Grab the image information from the figure and save as a movie frame. |
|
|
|
All keyword arguments in *savefig_kwargs* are passed on to the |
|
`~.Figure.savefig` call that saves the figure. However, several |
|
keyword arguments that are supported by `~.Figure.savefig` may not be |
|
passed as they are controlled by the MovieWriter: |
|
|
|
- *dpi*, *bbox_inches*: These may not be passed because each frame of the |
|
animation much be exactly the same size in pixels. |
|
- *format*: This is controlled by the MovieWriter. |
|
""" |
|
|
|
@abc.abstractmethod |
|
def finish(self): |
|
"""Finish any processing for writing the movie.""" |
|
|
|
@contextlib.contextmanager |
|
def saving(self, fig, outfile, dpi, *args, **kwargs): |
|
""" |
|
Context manager to facilitate writing the movie file. |
|
|
|
``*args, **kw`` are any parameters that should be passed to `setup`. |
|
""" |
|
if mpl.rcParams['savefig.bbox'] == 'tight': |
|
_log.info("Disabling savefig.bbox = 'tight', as it may cause " |
|
"frame size to vary, which is inappropriate for " |
|
"animation.") |
|
|
|
|
|
self.setup(fig, outfile, dpi, *args, **kwargs) |
|
with mpl.rc_context({'savefig.bbox': None}): |
|
try: |
|
yield self |
|
finally: |
|
self.finish() |
|
|
|
|
|
class MovieWriter(AbstractMovieWriter): |
|
""" |
|
Base class for writing movies. |
|
|
|
This is a base class for MovieWriter subclasses that write a movie frame |
|
data to a pipe. You cannot instantiate this class directly. |
|
See examples for how to use its subclasses. |
|
|
|
Attributes |
|
---------- |
|
frame_format : str |
|
The format used in writing frame data, defaults to 'rgba'. |
|
fig : `~matplotlib.figure.Figure` |
|
The figure to capture data from. |
|
This must be provided by the subclasses. |
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
supported_formats = ["rgba"] |
|
|
|
def __init__(self, fps=5, codec=None, bitrate=None, extra_args=None, |
|
metadata=None): |
|
""" |
|
Parameters |
|
---------- |
|
fps : int, default: 5 |
|
Movie frame rate (per second). |
|
codec : str or None, default: :rc:`animation.codec` |
|
The codec to use. |
|
bitrate : int, default: :rc:`animation.bitrate` |
|
The bitrate of the movie, in kilobits per second. Higher values |
|
means higher quality movies, but increase the file size. A value |
|
of -1 lets the underlying movie encoder select the bitrate. |
|
extra_args : list of str or None, optional |
|
Extra command-line arguments passed to the underlying movie encoder. These |
|
arguments are passed last to the encoder, just before the filename. The |
|
default, None, means to use :rc:`animation.[name-of-encoder]_args` for the |
|
builtin writers. |
|
metadata : dict[str, str], default: {} |
|
A dictionary of keys and values for metadata to include in the |
|
output file. Some keys that may be of use include: |
|
title, artist, genre, subject, copyright, srcform, comment. |
|
""" |
|
if type(self) is MovieWriter: |
|
|
|
|
|
|
|
|
|
raise TypeError( |
|
'MovieWriter cannot be instantiated directly. Please use one ' |
|
'of its subclasses.') |
|
|
|
super().__init__(fps=fps, metadata=metadata, codec=codec, |
|
bitrate=bitrate) |
|
self.frame_format = self.supported_formats[0] |
|
self.extra_args = extra_args |
|
|
|
def _adjust_frame_size(self): |
|
if self.codec == 'h264': |
|
wo, ho = self.fig.get_size_inches() |
|
w, h = adjusted_figsize(wo, ho, self.dpi, 2) |
|
if (wo, ho) != (w, h): |
|
self.fig.set_size_inches(w, h, forward=True) |
|
_log.info('figure size in inches has been adjusted ' |
|
'from %s x %s to %s x %s', wo, ho, w, h) |
|
else: |
|
w, h = self.fig.get_size_inches() |
|
_log.debug('frame size in pixels is %s x %s', *self.frame_size) |
|
return w, h |
|
|
|
def setup(self, fig, outfile, dpi=None): |
|
|
|
super().setup(fig, outfile, dpi=dpi) |
|
self._w, self._h = self._adjust_frame_size() |
|
|
|
|
|
self._run() |
|
|
|
def _run(self): |
|
|
|
|
|
|
|
command = self._args() |
|
_log.info('MovieWriter._run: running command: %s', |
|
cbook._pformat_subprocess(command)) |
|
PIPE = subprocess.PIPE |
|
self._proc = subprocess.Popen( |
|
command, stdin=PIPE, stdout=PIPE, stderr=PIPE, |
|
creationflags=subprocess_creation_flags) |
|
|
|
def finish(self): |
|
"""Finish any processing for writing the movie.""" |
|
out, err = self._proc.communicate() |
|
|
|
out = TextIOWrapper(BytesIO(out)).read() |
|
err = TextIOWrapper(BytesIO(err)).read() |
|
if out: |
|
_log.log( |
|
logging.WARNING if self._proc.returncode else logging.DEBUG, |
|
"MovieWriter stdout:\n%s", out) |
|
if err: |
|
_log.log( |
|
logging.WARNING if self._proc.returncode else logging.DEBUG, |
|
"MovieWriter stderr:\n%s", err) |
|
if self._proc.returncode: |
|
raise subprocess.CalledProcessError( |
|
self._proc.returncode, self._proc.args, out, err) |
|
|
|
def grab_frame(self, **savefig_kwargs): |
|
|
|
_validate_grabframe_kwargs(savefig_kwargs) |
|
_log.debug('MovieWriter.grab_frame: Grabbing frame.') |
|
|
|
|
|
self.fig.set_size_inches(self._w, self._h) |
|
|
|
self.fig.savefig(self._proc.stdin, format=self.frame_format, |
|
dpi=self.dpi, **savefig_kwargs) |
|
|
|
def _args(self): |
|
"""Assemble list of encoder-specific command-line arguments.""" |
|
return NotImplementedError("args needs to be implemented by subclass.") |
|
|
|
@classmethod |
|
def bin_path(cls): |
|
""" |
|
Return the binary path to the commandline tool used by a specific |
|
subclass. This is a class method so that the tool can be looked for |
|
before making a particular MovieWriter subclass available. |
|
""" |
|
return str(mpl.rcParams[cls._exec_key]) |
|
|
|
@classmethod |
|
def isAvailable(cls): |
|
"""Return whether a MovieWriter subclass is actually available.""" |
|
return shutil.which(cls.bin_path()) is not None |
|
|
|
|
|
class FileMovieWriter(MovieWriter): |
|
""" |
|
`MovieWriter` for writing to individual files and stitching at the end. |
|
|
|
This must be sub-classed to be useful. |
|
""" |
|
def __init__(self, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
self.frame_format = mpl.rcParams['animation.frame_format'] |
|
|
|
def setup(self, fig, outfile, dpi=None, frame_prefix=None): |
|
""" |
|
Setup for writing the movie file. |
|
|
|
Parameters |
|
---------- |
|
fig : `~matplotlib.figure.Figure` |
|
The figure to grab the rendered frames from. |
|
outfile : str |
|
The filename of the resulting movie file. |
|
dpi : float, default: ``fig.dpi`` |
|
The dpi of the output file. This, with the figure size, |
|
controls the size in pixels of the resulting movie file. |
|
frame_prefix : str, optional |
|
The filename prefix to use for temporary files. If *None* (the |
|
default), files are written to a temporary directory which is |
|
deleted by `finish`; if not *None*, no temporary files are |
|
deleted. |
|
""" |
|
|
|
Path(outfile).parent.resolve(strict=True) |
|
self.fig = fig |
|
self.outfile = outfile |
|
if dpi is None: |
|
dpi = self.fig.dpi |
|
self.dpi = dpi |
|
self._adjust_frame_size() |
|
|
|
if frame_prefix is None: |
|
self._tmpdir = TemporaryDirectory() |
|
self.temp_prefix = str(Path(self._tmpdir.name, 'tmp')) |
|
else: |
|
self._tmpdir = None |
|
self.temp_prefix = frame_prefix |
|
self._frame_counter = 0 |
|
self._temp_paths = list() |
|
self.fname_format_str = '%s%%07d.%s' |
|
|
|
def __del__(self): |
|
if hasattr(self, '_tmpdir') and self._tmpdir: |
|
self._tmpdir.cleanup() |
|
|
|
@property |
|
def frame_format(self): |
|
""" |
|
Format (png, jpeg, etc.) to use for saving the frames, which can be |
|
decided by the individual subclasses. |
|
""" |
|
return self._frame_format |
|
|
|
@frame_format.setter |
|
def frame_format(self, frame_format): |
|
if frame_format in self.supported_formats: |
|
self._frame_format = frame_format |
|
else: |
|
_api.warn_external( |
|
f"Ignoring file format {frame_format!r} which is not " |
|
f"supported by {type(self).__name__}; using " |
|
f"{self.supported_formats[0]} instead.") |
|
self._frame_format = self.supported_formats[0] |
|
|
|
def _base_temp_name(self): |
|
|
|
|
|
return self.fname_format_str % (self.temp_prefix, self.frame_format) |
|
|
|
def grab_frame(self, **savefig_kwargs): |
|
|
|
|
|
_validate_grabframe_kwargs(savefig_kwargs) |
|
path = Path(self._base_temp_name() % self._frame_counter) |
|
self._temp_paths.append(path) |
|
self._frame_counter += 1 |
|
_log.debug('FileMovieWriter.grab_frame: Grabbing frame %d to path=%s', |
|
self._frame_counter, path) |
|
with open(path, 'wb') as sink: |
|
self.fig.savefig(sink, format=self.frame_format, dpi=self.dpi, |
|
**savefig_kwargs) |
|
|
|
def finish(self): |
|
|
|
|
|
try: |
|
self._run() |
|
super().finish() |
|
finally: |
|
if self._tmpdir: |
|
_log.debug( |
|
'MovieWriter: clearing temporary path=%s', self._tmpdir |
|
) |
|
self._tmpdir.cleanup() |
|
|
|
|
|
@writers.register('pillow') |
|
class PillowWriter(AbstractMovieWriter): |
|
@classmethod |
|
def isAvailable(cls): |
|
return True |
|
|
|
def setup(self, fig, outfile, dpi=None): |
|
super().setup(fig, outfile, dpi=dpi) |
|
self._frames = [] |
|
|
|
def grab_frame(self, **savefig_kwargs): |
|
_validate_grabframe_kwargs(savefig_kwargs) |
|
buf = BytesIO() |
|
self.fig.savefig( |
|
buf, **{**savefig_kwargs, "format": "rgba", "dpi": self.dpi}) |
|
self._frames.append(Image.frombuffer( |
|
"RGBA", self.frame_size, buf.getbuffer(), "raw", "RGBA", 0, 1)) |
|
|
|
def finish(self): |
|
self._frames[0].save( |
|
self.outfile, save_all=True, append_images=self._frames[1:], |
|
duration=int(1000 / self.fps), loop=0) |
|
|
|
|
|
|
|
|
|
class FFMpegBase: |
|
""" |
|
Mixin class for FFMpeg output. |
|
|
|
This is a base class for the concrete `FFMpegWriter` and `FFMpegFileWriter` |
|
classes. |
|
""" |
|
|
|
_exec_key = 'animation.ffmpeg_path' |
|
_args_key = 'animation.ffmpeg_args' |
|
|
|
@property |
|
def output_args(self): |
|
args = [] |
|
if Path(self.outfile).suffix == '.gif': |
|
self.codec = 'gif' |
|
else: |
|
args.extend(['-vcodec', self.codec]) |
|
extra_args = (self.extra_args if self.extra_args is not None |
|
else mpl.rcParams[self._args_key]) |
|
|
|
|
|
|
|
|
|
if self.codec == 'h264' and '-pix_fmt' not in extra_args: |
|
args.extend(['-pix_fmt', 'yuv420p']) |
|
|
|
|
|
elif self.codec == 'gif' and '-filter_complex' not in extra_args: |
|
args.extend(['-filter_complex', |
|
'split [a][b];[a] palettegen [p];[b][p] paletteuse']) |
|
if self.bitrate > 0: |
|
args.extend(['-b', '%dk' % self.bitrate]) |
|
for k, v in self.metadata.items(): |
|
args.extend(['-metadata', f'{k}={v}']) |
|
args.extend(extra_args) |
|
|
|
return args + ['-y', self.outfile] |
|
|
|
|
|
|
|
@writers.register('ffmpeg') |
|
class FFMpegWriter(FFMpegBase, MovieWriter): |
|
""" |
|
Pipe-based ffmpeg writer. |
|
|
|
Frames are streamed directly to ffmpeg via a pipe and written in a single pass. |
|
|
|
This effectively works as a slideshow input to ffmpeg with the fps passed as |
|
``-framerate``, so see also `their notes on frame rates`_ for further details. |
|
|
|
.. _their notes on frame rates: https://trac.ffmpeg.org/wiki/Slideshow#Framerates |
|
""" |
|
def _args(self): |
|
|
|
|
|
args = [self.bin_path(), '-f', 'rawvideo', '-vcodec', 'rawvideo', |
|
'-s', '%dx%d' % self.frame_size, '-pix_fmt', self.frame_format, |
|
'-framerate', str(self.fps)] |
|
|
|
|
|
|
|
if _log.getEffectiveLevel() > logging.DEBUG: |
|
args += ['-loglevel', 'error'] |
|
args += ['-i', 'pipe:'] + self.output_args |
|
return args |
|
|
|
|
|
|
|
@writers.register('ffmpeg_file') |
|
class FFMpegFileWriter(FFMpegBase, FileMovieWriter): |
|
""" |
|
File-based ffmpeg writer. |
|
|
|
Frames are written to temporary files on disk and then stitched together at the end. |
|
|
|
This effectively works as a slideshow input to ffmpeg with the fps passed as |
|
``-framerate``, so see also `their notes on frame rates`_ for further details. |
|
|
|
.. _their notes on frame rates: https://trac.ffmpeg.org/wiki/Slideshow#Framerates |
|
""" |
|
supported_formats = ['png', 'jpeg', 'tiff', 'raw', 'rgba'] |
|
|
|
def _args(self): |
|
|
|
|
|
args = [] |
|
|
|
if self.frame_format in {'raw', 'rgba'}: |
|
args += [ |
|
'-f', 'image2', '-vcodec', 'rawvideo', |
|
'-video_size', '%dx%d' % self.frame_size, |
|
'-pixel_format', 'rgba', |
|
] |
|
args += ['-framerate', str(self.fps), '-i', self._base_temp_name()] |
|
if not self._tmpdir: |
|
args += ['-frames:v', str(self._frame_counter)] |
|
|
|
|
|
|
|
if _log.getEffectiveLevel() > logging.DEBUG: |
|
args += ['-loglevel', 'error'] |
|
return [self.bin_path(), *args, *self.output_args] |
|
|
|
|
|
|
|
class ImageMagickBase: |
|
""" |
|
Mixin class for ImageMagick output. |
|
|
|
This is a base class for the concrete `ImageMagickWriter` and |
|
`ImageMagickFileWriter` classes, which define an ``input_names`` attribute |
|
(or property) specifying the input names passed to ImageMagick. |
|
""" |
|
|
|
_exec_key = 'animation.convert_path' |
|
_args_key = 'animation.convert_args' |
|
|
|
def _args(self): |
|
|
|
fmt = "rgba" if self.frame_format == "raw" else self.frame_format |
|
extra_args = (self.extra_args if self.extra_args is not None |
|
else mpl.rcParams[self._args_key]) |
|
return [ |
|
self.bin_path(), |
|
"-size", "%ix%i" % self.frame_size, |
|
"-depth", "8", |
|
"-delay", str(100 / self.fps), |
|
"-loop", "0", |
|
f"{fmt}:{self.input_names}", |
|
*extra_args, |
|
self.outfile, |
|
] |
|
|
|
@classmethod |
|
def bin_path(cls): |
|
binpath = super().bin_path() |
|
if binpath == 'convert': |
|
binpath = mpl._get_executable_info('magick').executable |
|
return binpath |
|
|
|
@classmethod |
|
def isAvailable(cls): |
|
try: |
|
return super().isAvailable() |
|
except mpl.ExecutableNotFoundError as _enf: |
|
|
|
_log.debug('ImageMagick unavailable due to: %s', _enf) |
|
return False |
|
|
|
|
|
|
|
@writers.register('imagemagick') |
|
class ImageMagickWriter(ImageMagickBase, MovieWriter): |
|
""" |
|
Pipe-based animated gif writer. |
|
|
|
Frames are streamed directly to ImageMagick via a pipe and written |
|
in a single pass. |
|
""" |
|
|
|
input_names = "-" |
|
|
|
|
|
|
|
@writers.register('imagemagick_file') |
|
class ImageMagickFileWriter(ImageMagickBase, FileMovieWriter): |
|
""" |
|
File-based animated gif writer. |
|
|
|
Frames are written to temporary files on disk and then stitched |
|
together at the end. |
|
""" |
|
|
|
supported_formats = ['png', 'jpeg', 'tiff', 'raw', 'rgba'] |
|
input_names = property( |
|
lambda self: f'{self.temp_prefix}*.{self.frame_format}') |
|
|
|
|
|
|
|
|
|
def _included_frames(frame_count, frame_format, frame_dir): |
|
return INCLUDED_FRAMES.format(Nframes=frame_count, |
|
frame_dir=frame_dir, |
|
frame_format=frame_format) |
|
|
|
|
|
def _embedded_frames(frame_list, frame_format): |
|
"""frame_list should be a list of base64-encoded png files""" |
|
if frame_format == 'svg': |
|
|
|
frame_format = 'svg+xml' |
|
template = ' frames[{0}] = "data:image/{1};base64,{2}"\n' |
|
return "\n" + "".join( |
|
template.format(i, frame_format, frame_data.replace('\n', '\\\n')) |
|
for i, frame_data in enumerate(frame_list)) |
|
|
|
|
|
@writers.register('html') |
|
class HTMLWriter(FileMovieWriter): |
|
"""Writer for JavaScript-based HTML movies.""" |
|
|
|
supported_formats = ['png', 'jpeg', 'tiff', 'svg'] |
|
|
|
@classmethod |
|
def isAvailable(cls): |
|
return True |
|
|
|
def __init__(self, fps=30, codec=None, bitrate=None, extra_args=None, |
|
metadata=None, embed_frames=False, default_mode='loop', |
|
embed_limit=None): |
|
|
|
if extra_args: |
|
_log.warning("HTMLWriter ignores 'extra_args'") |
|
extra_args = () |
|
self.embed_frames = embed_frames |
|
self.default_mode = default_mode.lower() |
|
_api.check_in_list(['loop', 'once', 'reflect'], |
|
default_mode=self.default_mode) |
|
|
|
|
|
self._bytes_limit = mpl._val_or_rc(embed_limit, 'animation.embed_limit') |
|
|
|
self._bytes_limit *= 1024 * 1024 |
|
|
|
super().__init__(fps, codec, bitrate, extra_args, metadata) |
|
|
|
def setup(self, fig, outfile, dpi=None, frame_dir=None): |
|
outfile = Path(outfile) |
|
_api.check_in_list(['.html', '.htm'], outfile_extension=outfile.suffix) |
|
|
|
self._saved_frames = [] |
|
self._total_bytes = 0 |
|
self._hit_limit = False |
|
|
|
if not self.embed_frames: |
|
if frame_dir is None: |
|
frame_dir = outfile.with_name(outfile.stem + '_frames') |
|
frame_dir.mkdir(parents=True, exist_ok=True) |
|
frame_prefix = frame_dir / 'frame' |
|
else: |
|
frame_prefix = None |
|
|
|
super().setup(fig, outfile, dpi, frame_prefix) |
|
self._clear_temp = False |
|
|
|
def grab_frame(self, **savefig_kwargs): |
|
_validate_grabframe_kwargs(savefig_kwargs) |
|
if self.embed_frames: |
|
|
|
if self._hit_limit: |
|
return |
|
f = BytesIO() |
|
self.fig.savefig(f, format=self.frame_format, |
|
dpi=self.dpi, **savefig_kwargs) |
|
imgdata64 = base64.encodebytes(f.getvalue()).decode('ascii') |
|
self._total_bytes += len(imgdata64) |
|
if self._total_bytes >= self._bytes_limit: |
|
_log.warning( |
|
"Animation size has reached %s bytes, exceeding the limit " |
|
"of %s. If you're sure you want a larger animation " |
|
"embedded, set the animation.embed_limit rc parameter to " |
|
"a larger value (in MB). This and further frames will be " |
|
"dropped.", self._total_bytes, self._bytes_limit) |
|
self._hit_limit = True |
|
else: |
|
self._saved_frames.append(imgdata64) |
|
else: |
|
return super().grab_frame(**savefig_kwargs) |
|
|
|
def finish(self): |
|
|
|
if self.embed_frames: |
|
fill_frames = _embedded_frames(self._saved_frames, |
|
self.frame_format) |
|
frame_count = len(self._saved_frames) |
|
else: |
|
|
|
frame_count = len(self._temp_paths) |
|
fill_frames = _included_frames( |
|
frame_count, self.frame_format, |
|
self._temp_paths[0].parent.relative_to(self.outfile.parent)) |
|
mode_dict = dict(once_checked='', |
|
loop_checked='', |
|
reflect_checked='') |
|
mode_dict[self.default_mode + '_checked'] = 'checked' |
|
|
|
interval = 1000 // self.fps |
|
|
|
with open(self.outfile, 'w') as of: |
|
of.write(JS_INCLUDE + STYLE_INCLUDE) |
|
of.write(DISPLAY_TEMPLATE.format(id=uuid.uuid4().hex, |
|
Nframes=frame_count, |
|
fill_frames=fill_frames, |
|
interval=interval, |
|
**mode_dict)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
if self._tmpdir: |
|
_log.debug('MovieWriter: clearing temporary path=%s', self._tmpdir) |
|
self._tmpdir.cleanup() |
|
|
|
|
|
class Animation: |
|
""" |
|
A base class for Animations. |
|
|
|
This class is not usable as is, and should be subclassed to provide needed |
|
behavior. |
|
|
|
.. note:: |
|
|
|
You must store the created Animation in a variable that lives as long |
|
as the animation should run. Otherwise, the Animation object will be |
|
garbage-collected and the animation stops. |
|
|
|
Parameters |
|
---------- |
|
fig : `~matplotlib.figure.Figure` |
|
The figure object used to get needed events, such as draw or resize. |
|
|
|
event_source : object, optional |
|
A class that can run a callback when desired events |
|
are generated, as well as be stopped and started. |
|
|
|
Examples include timers (see `TimedAnimation`) and file |
|
system notifications. |
|
|
|
blit : bool, default: False |
|
Whether blitting is used to optimize drawing. If the backend does not |
|
support blitting, then this parameter has no effect. |
|
|
|
See Also |
|
-------- |
|
FuncAnimation, ArtistAnimation |
|
""" |
|
|
|
def __init__(self, fig, event_source=None, blit=False): |
|
self._draw_was_started = False |
|
|
|
self._fig = fig |
|
|
|
|
|
|
|
self._blit = blit and fig.canvas.supports_blit |
|
|
|
|
|
|
|
|
|
|
|
self.frame_seq = self.new_frame_seq() |
|
self.event_source = event_source |
|
|
|
|
|
|
|
self._first_draw_id = fig.canvas.mpl_connect('draw_event', self._start) |
|
|
|
|
|
|
|
self._close_id = self._fig.canvas.mpl_connect('close_event', |
|
self._stop) |
|
if self._blit: |
|
self._setup_blit() |
|
|
|
def __del__(self): |
|
if not getattr(self, '_draw_was_started', True): |
|
warnings.warn( |
|
'Animation was deleted without rendering anything. This is ' |
|
'most likely not intended. To prevent deletion, assign the ' |
|
'Animation to a variable, e.g. `anim`, that exists until you ' |
|
'output the Animation using `plt.show()` or ' |
|
'`anim.save()`.' |
|
) |
|
|
|
def _start(self, *args): |
|
""" |
|
Starts interactive animation. Adds the draw frame command to the GUI |
|
handler, calls show to start the event loop. |
|
""" |
|
|
|
if self._fig.canvas.is_saving(): |
|
return |
|
|
|
self._fig.canvas.mpl_disconnect(self._first_draw_id) |
|
|
|
|
|
self._init_draw() |
|
|
|
|
|
|
|
self.event_source.add_callback(self._step) |
|
self.event_source.start() |
|
|
|
def _stop(self, *args): |
|
|
|
if self._blit: |
|
self._fig.canvas.mpl_disconnect(self._resize_id) |
|
self._fig.canvas.mpl_disconnect(self._close_id) |
|
self.event_source.remove_callback(self._step) |
|
self.event_source = None |
|
|
|
def save(self, filename, writer=None, fps=None, dpi=None, codec=None, |
|
bitrate=None, extra_args=None, metadata=None, extra_anim=None, |
|
savefig_kwargs=None, *, progress_callback=None): |
|
""" |
|
Save the animation as a movie file by drawing every frame. |
|
|
|
Parameters |
|
---------- |
|
filename : str |
|
The output filename, e.g., :file:`mymovie.mp4`. |
|
|
|
writer : `MovieWriter` or str, default: :rc:`animation.writer` |
|
A `MovieWriter` instance to use or a key that identifies a |
|
class to use, such as 'ffmpeg'. |
|
|
|
fps : int, optional |
|
Movie frame rate (per second). If not set, the frame rate from the |
|
animation's frame interval. |
|
|
|
dpi : float, default: :rc:`savefig.dpi` |
|
Controls the dots per inch for the movie frames. Together with |
|
the figure's size in inches, this controls the size of the movie. |
|
|
|
codec : str, default: :rc:`animation.codec`. |
|
The video codec to use. Not all codecs are supported by a given |
|
`MovieWriter`. |
|
|
|
bitrate : int, default: :rc:`animation.bitrate` |
|
The bitrate of the movie, in kilobits per second. Higher values |
|
means higher quality movies, but increase the file size. A value |
|
of -1 lets the underlying movie encoder select the bitrate. |
|
|
|
extra_args : list of str or None, optional |
|
Extra command-line arguments passed to the underlying movie encoder. These |
|
arguments are passed last to the encoder, just before the output filename. |
|
The default, None, means to use :rc:`animation.[name-of-encoder]_args` for |
|
the builtin writers. |
|
|
|
metadata : dict[str, str], default: {} |
|
Dictionary of keys and values for metadata to include in |
|
the output file. Some keys that may be of use include: |
|
title, artist, genre, subject, copyright, srcform, comment. |
|
|
|
extra_anim : list, default: [] |
|
Additional `Animation` objects that should be included |
|
in the saved movie file. These need to be from the same |
|
`.Figure` instance. Also, animation frames will |
|
just be simply combined, so there should be a 1:1 correspondence |
|
between the frames from the different animations. |
|
|
|
savefig_kwargs : dict, default: {} |
|
Keyword arguments passed to each `~.Figure.savefig` call used to |
|
save the individual frames. |
|
|
|
progress_callback : function, optional |
|
A callback function that will be called for every frame to notify |
|
the saving progress. It must have the signature :: |
|
|
|
def func(current_frame: int, total_frames: int) -> Any |
|
|
|
where *current_frame* is the current frame number and *total_frames* is the |
|
total number of frames to be saved. *total_frames* is set to None, if the |
|
total number of frames cannot be determined. Return values may exist but are |
|
ignored. |
|
|
|
Example code to write the progress to stdout:: |
|
|
|
progress_callback = lambda i, n: print(f'Saving frame {i}/{n}') |
|
|
|
Notes |
|
----- |
|
*fps*, *codec*, *bitrate*, *extra_args* and *metadata* are used to |
|
construct a `.MovieWriter` instance and can only be passed if |
|
*writer* is a string. If they are passed as non-*None* and *writer* |
|
is a `.MovieWriter`, a `RuntimeError` will be raised. |
|
""" |
|
|
|
all_anim = [self] |
|
if extra_anim is not None: |
|
all_anim.extend(anim for anim in extra_anim |
|
if anim._fig is self._fig) |
|
|
|
|
|
for anim in all_anim: |
|
anim._draw_was_started = True |
|
|
|
if writer is None: |
|
writer = mpl.rcParams['animation.writer'] |
|
elif (not isinstance(writer, str) and |
|
any(arg is not None |
|
for arg in (fps, codec, bitrate, extra_args, metadata))): |
|
raise RuntimeError('Passing in values for arguments ' |
|
'fps, codec, bitrate, extra_args, or metadata ' |
|
'is not supported when writer is an existing ' |
|
'MovieWriter instance. These should instead be ' |
|
'passed as arguments when creating the ' |
|
'MovieWriter instance.') |
|
|
|
if savefig_kwargs is None: |
|
savefig_kwargs = {} |
|
else: |
|
|
|
savefig_kwargs = dict(savefig_kwargs) |
|
|
|
if fps is None and hasattr(self, '_interval'): |
|
|
|
fps = 1000. / self._interval |
|
|
|
|
|
dpi = mpl._val_or_rc(dpi, 'savefig.dpi') |
|
if dpi == 'figure': |
|
dpi = self._fig.dpi |
|
|
|
writer_kwargs = {} |
|
if codec is not None: |
|
writer_kwargs['codec'] = codec |
|
if bitrate is not None: |
|
writer_kwargs['bitrate'] = bitrate |
|
if extra_args is not None: |
|
writer_kwargs['extra_args'] = extra_args |
|
if metadata is not None: |
|
writer_kwargs['metadata'] = metadata |
|
|
|
|
|
|
|
if isinstance(writer, str): |
|
try: |
|
writer_cls = writers[writer] |
|
except RuntimeError: |
|
writer_cls = PillowWriter |
|
_log.warning("MovieWriter %s unavailable; using Pillow " |
|
"instead.", writer) |
|
writer = writer_cls(fps, **writer_kwargs) |
|
_log.info('Animation.save using %s', type(writer)) |
|
|
|
if 'bbox_inches' in savefig_kwargs: |
|
_log.warning("Warning: discarding the 'bbox_inches' argument in " |
|
"'savefig_kwargs' as it may cause frame size " |
|
"to vary, which is inappropriate for animation.") |
|
savefig_kwargs.pop('bbox_inches') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
facecolor = savefig_kwargs.get('facecolor', |
|
mpl.rcParams['savefig.facecolor']) |
|
if facecolor == 'auto': |
|
facecolor = self._fig.get_facecolor() |
|
|
|
def _pre_composite_to_white(color): |
|
r, g, b, a = mcolors.to_rgba(color) |
|
return a * np.array([r, g, b]) + 1 - a |
|
|
|
savefig_kwargs['facecolor'] = _pre_composite_to_white(facecolor) |
|
savefig_kwargs['transparent'] = False |
|
|
|
|
|
|
|
with writer.saving(self._fig, filename, dpi), \ |
|
cbook._setattr_cm(self._fig.canvas, _is_saving=True, manager=None): |
|
for anim in all_anim: |
|
anim._init_draw() |
|
frame_number = 0 |
|
|
|
|
|
save_count_list = [getattr(a, '_save_count', None) |
|
for a in all_anim] |
|
if None in save_count_list: |
|
total_frames = None |
|
else: |
|
total_frames = sum(save_count_list) |
|
for data in zip(*[a.new_saved_frame_seq() for a in all_anim]): |
|
for anim, d in zip(all_anim, data): |
|
|
|
anim._draw_next_frame(d, blit=False) |
|
if progress_callback is not None: |
|
progress_callback(frame_number, total_frames) |
|
frame_number += 1 |
|
writer.grab_frame(**savefig_kwargs) |
|
|
|
def _step(self, *args): |
|
""" |
|
Handler for getting events. By default, gets the next frame in the |
|
sequence and hands the data off to be drawn. |
|
""" |
|
|
|
|
|
|
|
try: |
|
framedata = next(self.frame_seq) |
|
self._draw_next_frame(framedata, self._blit) |
|
return True |
|
except StopIteration: |
|
return False |
|
|
|
def new_frame_seq(self): |
|
"""Return a new sequence of frame information.""" |
|
|
|
return iter(self._framedata) |
|
|
|
def new_saved_frame_seq(self): |
|
"""Return a new sequence of saved/cached frame information.""" |
|
|
|
return self.new_frame_seq() |
|
|
|
def _draw_next_frame(self, framedata, blit): |
|
|
|
|
|
self._pre_draw(framedata, blit) |
|
self._draw_frame(framedata) |
|
self._post_draw(framedata, blit) |
|
|
|
def _init_draw(self): |
|
|
|
|
|
self._draw_was_started = True |
|
|
|
def _pre_draw(self, framedata, blit): |
|
|
|
|
|
if blit: |
|
self._blit_clear(self._drawn_artists) |
|
|
|
def _draw_frame(self, framedata): |
|
|
|
raise NotImplementedError('Needs to be implemented by subclasses to' |
|
' actually make an animation.') |
|
|
|
def _post_draw(self, framedata, blit): |
|
|
|
|
|
|
|
if blit and self._drawn_artists: |
|
self._blit_draw(self._drawn_artists) |
|
else: |
|
self._fig.canvas.draw_idle() |
|
|
|
|
|
def _blit_draw(self, artists): |
|
|
|
|
|
updated_ax = {a.axes for a in artists} |
|
|
|
|
|
for ax in updated_ax: |
|
|
|
|
|
|
|
cur_view = ax._get_view() |
|
view, bg = self._blit_cache.get(ax, (object(), None)) |
|
if cur_view != view: |
|
self._blit_cache[ax] = ( |
|
cur_view, ax.figure.canvas.copy_from_bbox(ax.bbox)) |
|
|
|
for a in artists: |
|
a.axes.draw_artist(a) |
|
|
|
for ax in updated_ax: |
|
ax.figure.canvas.blit(ax.bbox) |
|
|
|
def _blit_clear(self, artists): |
|
|
|
|
|
|
|
axes = {a.axes for a in artists} |
|
for ax in axes: |
|
try: |
|
view, bg = self._blit_cache[ax] |
|
except KeyError: |
|
continue |
|
if ax._get_view() == view: |
|
ax.figure.canvas.restore_region(bg) |
|
else: |
|
self._blit_cache.pop(ax) |
|
|
|
def _setup_blit(self): |
|
|
|
self._blit_cache = dict() |
|
self._drawn_artists = [] |
|
|
|
self._post_draw(None, self._blit) |
|
|
|
|
|
|
|
|
|
self._init_draw() |
|
|
|
self._resize_id = self._fig.canvas.mpl_connect('resize_event', |
|
self._on_resize) |
|
|
|
def _on_resize(self, event): |
|
|
|
|
|
|
|
|
|
self._fig.canvas.mpl_disconnect(self._resize_id) |
|
self.event_source.stop() |
|
self._blit_cache.clear() |
|
self._init_draw() |
|
self._resize_id = self._fig.canvas.mpl_connect('draw_event', |
|
self._end_redraw) |
|
|
|
def _end_redraw(self, event): |
|
|
|
|
|
self._post_draw(None, False) |
|
self.event_source.start() |
|
self._fig.canvas.mpl_disconnect(self._resize_id) |
|
self._resize_id = self._fig.canvas.mpl_connect('resize_event', |
|
self._on_resize) |
|
|
|
def to_html5_video(self, embed_limit=None): |
|
""" |
|
Convert the animation to an HTML5 ``<video>`` tag. |
|
|
|
This saves the animation as an h264 video, encoded in base64 |
|
directly into the HTML5 video tag. This respects :rc:`animation.writer` |
|
and :rc:`animation.bitrate`. This also makes use of the |
|
*interval* to control the speed, and uses the *repeat* |
|
parameter to decide whether to loop. |
|
|
|
Parameters |
|
---------- |
|
embed_limit : float, optional |
|
Limit, in MB, of the returned animation. No animation is created |
|
if the limit is exceeded. |
|
Defaults to :rc:`animation.embed_limit` = 20.0. |
|
|
|
Returns |
|
------- |
|
str |
|
An HTML5 video tag with the animation embedded as base64 encoded |
|
h264 video. |
|
If the *embed_limit* is exceeded, this returns the string |
|
"Video too large to embed." |
|
""" |
|
VIDEO_TAG = r'''<video {size} {options}> |
|
<source type="video/mp4" src="data:video/mp4;base64,{video}"> |
|
Your browser does not support the video tag. |
|
</video>''' |
|
|
|
if not hasattr(self, '_base64_video'): |
|
|
|
embed_limit = mpl._val_or_rc(embed_limit, 'animation.embed_limit') |
|
|
|
|
|
embed_limit *= 1024 * 1024 |
|
|
|
|
|
|
|
with TemporaryDirectory() as tmpdir: |
|
path = Path(tmpdir, "temp.m4v") |
|
|
|
|
|
Writer = writers[mpl.rcParams['animation.writer']] |
|
writer = Writer(codec='h264', |
|
bitrate=mpl.rcParams['animation.bitrate'], |
|
fps=1000. / self._interval) |
|
self.save(str(path), writer=writer) |
|
|
|
vid64 = base64.encodebytes(path.read_bytes()) |
|
|
|
vid_len = len(vid64) |
|
if vid_len >= embed_limit: |
|
_log.warning( |
|
"Animation movie is %s bytes, exceeding the limit of %s. " |
|
"If you're sure you want a large animation embedded, set " |
|
"the animation.embed_limit rc parameter to a larger value " |
|
"(in MB).", vid_len, embed_limit) |
|
else: |
|
self._base64_video = vid64.decode('ascii') |
|
self._video_size = 'width="{}" height="{}"'.format( |
|
*writer.frame_size) |
|
|
|
|
|
if hasattr(self, '_base64_video'): |
|
|
|
options = ['controls', 'autoplay'] |
|
|
|
|
|
if getattr(self, '_repeat', False): |
|
options.append('loop') |
|
|
|
return VIDEO_TAG.format(video=self._base64_video, |
|
size=self._video_size, |
|
options=' '.join(options)) |
|
else: |
|
return 'Video too large to embed.' |
|
|
|
def to_jshtml(self, fps=None, embed_frames=True, default_mode=None): |
|
""" |
|
Generate HTML representation of the animation. |
|
|
|
Parameters |
|
---------- |
|
fps : int, optional |
|
Movie frame rate (per second). If not set, the frame rate from |
|
the animation's frame interval. |
|
embed_frames : bool, optional |
|
default_mode : str, optional |
|
What to do when the animation ends. Must be one of ``{'loop', |
|
'once', 'reflect'}``. Defaults to ``'loop'`` if the *repeat* |
|
parameter is True, otherwise ``'once'``. |
|
""" |
|
if fps is None and hasattr(self, '_interval'): |
|
|
|
fps = 1000 / self._interval |
|
|
|
|
|
|
|
if default_mode is None: |
|
default_mode = 'loop' if getattr(self, '_repeat', |
|
False) else 'once' |
|
|
|
if not hasattr(self, "_html_representation"): |
|
|
|
|
|
with TemporaryDirectory() as tmpdir: |
|
path = Path(tmpdir, "temp.html") |
|
writer = HTMLWriter(fps=fps, |
|
embed_frames=embed_frames, |
|
default_mode=default_mode) |
|
self.save(str(path), writer=writer) |
|
self._html_representation = path.read_text() |
|
|
|
return self._html_representation |
|
|
|
def _repr_html_(self): |
|
"""IPython display hook for rendering.""" |
|
fmt = mpl.rcParams['animation.html'] |
|
if fmt == 'html5': |
|
return self.to_html5_video() |
|
elif fmt == 'jshtml': |
|
return self.to_jshtml() |
|
|
|
def pause(self): |
|
"""Pause the animation.""" |
|
self.event_source.stop() |
|
if self._blit: |
|
for artist in self._drawn_artists: |
|
artist.set_animated(False) |
|
|
|
def resume(self): |
|
"""Resume the animation.""" |
|
self.event_source.start() |
|
if self._blit: |
|
for artist in self._drawn_artists: |
|
artist.set_animated(True) |
|
|
|
|
|
class TimedAnimation(Animation): |
|
""" |
|
`Animation` subclass for time-based animation. |
|
|
|
A new frame is drawn every *interval* milliseconds. |
|
|
|
.. note:: |
|
|
|
You must store the created Animation in a variable that lives as long |
|
as the animation should run. Otherwise, the Animation object will be |
|
garbage-collected and the animation stops. |
|
|
|
Parameters |
|
---------- |
|
fig : `~matplotlib.figure.Figure` |
|
The figure object used to get needed events, such as draw or resize. |
|
interval : int, default: 200 |
|
Delay between frames in milliseconds. |
|
repeat_delay : int, default: 0 |
|
The delay in milliseconds between consecutive animation runs, if |
|
*repeat* is True. |
|
repeat : bool, default: True |
|
Whether the animation repeats when the sequence of frames is completed. |
|
blit : bool, default: False |
|
Whether blitting is used to optimize drawing. |
|
""" |
|
def __init__(self, fig, interval=200, repeat_delay=0, repeat=True, |
|
event_source=None, *args, **kwargs): |
|
self._interval = interval |
|
|
|
self._repeat_delay = repeat_delay if repeat_delay is not None else 0 |
|
self._repeat = repeat |
|
|
|
|
|
if event_source is None: |
|
event_source = fig.canvas.new_timer(interval=self._interval) |
|
super().__init__(fig, event_source=event_source, *args, **kwargs) |
|
|
|
def _step(self, *args): |
|
"""Handler for getting events.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
still_going = super()._step(*args) |
|
if not still_going: |
|
if self._repeat: |
|
|
|
self._init_draw() |
|
self.frame_seq = self.new_frame_seq() |
|
self.event_source.interval = self._repeat_delay |
|
return True |
|
else: |
|
|
|
|
|
self.pause() |
|
if self._blit: |
|
|
|
self._fig.canvas.mpl_disconnect(self._resize_id) |
|
self._fig.canvas.mpl_disconnect(self._close_id) |
|
self.event_source = None |
|
return False |
|
|
|
self.event_source.interval = self._interval |
|
return True |
|
|
|
repeat = _api.deprecate_privatize_attribute("3.7") |
|
|
|
|
|
class ArtistAnimation(TimedAnimation): |
|
""" |
|
`TimedAnimation` subclass that creates an animation by using a fixed |
|
set of `.Artist` objects. |
|
|
|
Before creating an instance, all plotting should have taken place |
|
and the relevant artists saved. |
|
|
|
.. note:: |
|
|
|
You must store the created Animation in a variable that lives as long |
|
as the animation should run. Otherwise, the Animation object will be |
|
garbage-collected and the animation stops. |
|
|
|
Parameters |
|
---------- |
|
fig : `~matplotlib.figure.Figure` |
|
The figure object used to get needed events, such as draw or resize. |
|
artists : list |
|
Each list entry is a collection of `.Artist` objects that are made |
|
visible on the corresponding frame. Other artists are made invisible. |
|
interval : int, default: 200 |
|
Delay between frames in milliseconds. |
|
repeat_delay : int, default: 0 |
|
The delay in milliseconds between consecutive animation runs, if |
|
*repeat* is True. |
|
repeat : bool, default: True |
|
Whether the animation repeats when the sequence of frames is completed. |
|
blit : bool, default: False |
|
Whether blitting is used to optimize drawing. |
|
""" |
|
|
|
def __init__(self, fig, artists, *args, **kwargs): |
|
|
|
self._drawn_artists = [] |
|
|
|
|
|
|
|
self._framedata = artists |
|
super().__init__(fig, *args, **kwargs) |
|
|
|
def _init_draw(self): |
|
super()._init_draw() |
|
|
|
figs = set() |
|
for f in self.new_frame_seq(): |
|
for artist in f: |
|
artist.set_visible(False) |
|
artist.set_animated(self._blit) |
|
|
|
if artist.get_figure() not in figs: |
|
figs.add(artist.get_figure()) |
|
|
|
|
|
for fig in figs: |
|
fig.canvas.draw_idle() |
|
|
|
def _pre_draw(self, framedata, blit): |
|
"""Clears artists from the last frame.""" |
|
if blit: |
|
|
|
self._blit_clear(self._drawn_artists) |
|
else: |
|
|
|
for artist in self._drawn_artists: |
|
artist.set_visible(False) |
|
|
|
def _draw_frame(self, artists): |
|
|
|
|
|
self._drawn_artists = artists |
|
|
|
|
|
for artist in artists: |
|
artist.set_visible(True) |
|
|
|
|
|
class FuncAnimation(TimedAnimation): |
|
""" |
|
`TimedAnimation` subclass that makes an animation by repeatedly calling |
|
a function *func*. |
|
|
|
.. note:: |
|
|
|
You must store the created Animation in a variable that lives as long |
|
as the animation should run. Otherwise, the Animation object will be |
|
garbage-collected and the animation stops. |
|
|
|
Parameters |
|
---------- |
|
fig : `~matplotlib.figure.Figure` |
|
The figure object used to get needed events, such as draw or resize. |
|
|
|
func : callable |
|
The function to call at each frame. The first argument will |
|
be the next value in *frames*. Any additional positional |
|
arguments can be supplied using `functools.partial` or via the *fargs* |
|
parameter. |
|
|
|
The required signature is:: |
|
|
|
def func(frame, *fargs) -> iterable_of_artists |
|
|
|
It is often more convenient to provide the arguments using |
|
`functools.partial`. In this way it is also possible to pass keyword |
|
arguments. To pass a function with both positional and keyword |
|
arguments, set all arguments as keyword arguments, just leaving the |
|
*frame* argument unset:: |
|
|
|
def func(frame, art, *, y=None): |
|
... |
|
|
|
ani = FuncAnimation(fig, partial(func, art=ln, y='foo')) |
|
|
|
If ``blit == True``, *func* must return an iterable of all artists |
|
that were modified or created. This information is used by the blitting |
|
algorithm to determine which parts of the figure have to be updated. |
|
The return value is unused if ``blit == False`` and may be omitted in |
|
that case. |
|
|
|
frames : iterable, int, generator function, or None, optional |
|
Source of data to pass *func* and each frame of the animation |
|
|
|
- If an iterable, then simply use the values provided. If the |
|
iterable has a length, it will override the *save_count* kwarg. |
|
|
|
- If an integer, then equivalent to passing ``range(frames)`` |
|
|
|
- If a generator function, then must have the signature:: |
|
|
|
def gen_function() -> obj |
|
|
|
- If *None*, then equivalent to passing ``itertools.count``. |
|
|
|
In all of these cases, the values in *frames* is simply passed through |
|
to the user-supplied *func* and thus can be of any type. |
|
|
|
init_func : callable, optional |
|
A function used to draw a clear frame. If not given, the results of |
|
drawing from the first item in the frames sequence will be used. This |
|
function will be called once before the first frame. |
|
|
|
The required signature is:: |
|
|
|
def init_func() -> iterable_of_artists |
|
|
|
If ``blit == True``, *init_func* must return an iterable of artists |
|
to be re-drawn. This information is used by the blitting algorithm to |
|
determine which parts of the figure have to be updated. The return |
|
value is unused if ``blit == False`` and may be omitted in that case. |
|
|
|
fargs : tuple or None, optional |
|
Additional arguments to pass to each call to *func*. Note: the use of |
|
`functools.partial` is preferred over *fargs*. See *func* for details. |
|
|
|
save_count : int, optional |
|
Fallback for the number of values from *frames* to cache. This is |
|
only used if the number of frames cannot be inferred from *frames*, |
|
i.e. when it's an iterator without length or a generator. |
|
|
|
interval : int, default: 200 |
|
Delay between frames in milliseconds. |
|
|
|
repeat_delay : int, default: 0 |
|
The delay in milliseconds between consecutive animation runs, if |
|
*repeat* is True. |
|
|
|
repeat : bool, default: True |
|
Whether the animation repeats when the sequence of frames is completed. |
|
|
|
blit : bool, default: False |
|
Whether blitting is used to optimize drawing. Note: when using |
|
blitting, any animated artists will be drawn according to their zorder; |
|
however, they will be drawn on top of any previous artists, regardless |
|
of their zorder. |
|
|
|
cache_frame_data : bool, default: True |
|
Whether frame data is cached. Disabling cache might be helpful when |
|
frames contain large objects. |
|
""" |
|
def __init__(self, fig, func, frames=None, init_func=None, fargs=None, |
|
save_count=None, *, cache_frame_data=True, **kwargs): |
|
if fargs: |
|
self._args = fargs |
|
else: |
|
self._args = () |
|
self._func = func |
|
self._init_func = init_func |
|
|
|
|
|
|
|
|
|
self._save_count = save_count |
|
|
|
|
|
|
|
|
|
|
|
if frames is None: |
|
self._iter_gen = itertools.count |
|
elif callable(frames): |
|
self._iter_gen = frames |
|
elif np.iterable(frames): |
|
if kwargs.get('repeat', True): |
|
self._tee_from = frames |
|
def iter_frames(frames=frames): |
|
this, self._tee_from = itertools.tee(self._tee_from, 2) |
|
yield from this |
|
self._iter_gen = iter_frames |
|
else: |
|
self._iter_gen = lambda: iter(frames) |
|
if hasattr(frames, '__len__'): |
|
self._save_count = len(frames) |
|
if save_count is not None: |
|
_api.warn_external( |
|
f"You passed in an explicit {save_count=} " |
|
"which is being ignored in favor of " |
|
f"{len(frames)=}." |
|
) |
|
else: |
|
self._iter_gen = lambda: iter(range(frames)) |
|
self._save_count = frames |
|
if save_count is not None: |
|
_api.warn_external( |
|
f"You passed in an explicit {save_count=} which is being " |
|
f"ignored in favor of {frames=}." |
|
) |
|
if self._save_count is None and cache_frame_data: |
|
_api.warn_external( |
|
f"{frames=!r} which we can infer the length of, " |
|
"did not pass an explicit *save_count* " |
|
f"and passed {cache_frame_data=}. To avoid a possibly " |
|
"unbounded cache, frame data caching has been disabled. " |
|
"To suppress this warning either pass " |
|
"`cache_frame_data=False` or `save_count=MAX_FRAMES`." |
|
) |
|
cache_frame_data = False |
|
|
|
self._cache_frame_data = cache_frame_data |
|
|
|
|
|
self._save_seq = [] |
|
|
|
super().__init__(fig, **kwargs) |
|
|
|
|
|
|
|
self._save_seq = [] |
|
|
|
def new_frame_seq(self): |
|
|
|
return self._iter_gen() |
|
|
|
def new_saved_frame_seq(self): |
|
|
|
|
|
|
|
if self._save_seq: |
|
|
|
|
|
self._old_saved_seq = list(self._save_seq) |
|
return iter(self._old_saved_seq) |
|
else: |
|
if self._save_count is None: |
|
frame_seq = self.new_frame_seq() |
|
|
|
def gen(): |
|
try: |
|
while True: |
|
yield next(frame_seq) |
|
except StopIteration: |
|
pass |
|
return gen() |
|
else: |
|
return itertools.islice(self.new_frame_seq(), self._save_count) |
|
|
|
def _init_draw(self): |
|
super()._init_draw() |
|
|
|
|
|
|
|
|
|
if self._init_func is None: |
|
try: |
|
frame_data = next(self.new_frame_seq()) |
|
except StopIteration: |
|
|
|
|
|
|
|
warnings.warn( |
|
"Can not start iterating the frames for the initial draw. " |
|
"This can be caused by passing in a 0 length sequence " |
|
"for *frames*.\n\n" |
|
"If you passed *frames* as a generator " |
|
"it may be exhausted due to a previous display or save." |
|
) |
|
return |
|
self._draw_frame(frame_data) |
|
else: |
|
self._drawn_artists = self._init_func() |
|
if self._blit: |
|
if self._drawn_artists is None: |
|
raise RuntimeError('The init_func must return a ' |
|
'sequence of Artist objects.') |
|
for a in self._drawn_artists: |
|
a.set_animated(self._blit) |
|
self._save_seq = [] |
|
|
|
def _draw_frame(self, framedata): |
|
if self._cache_frame_data: |
|
|
|
self._save_seq.append(framedata) |
|
self._save_seq = self._save_seq[-self._save_count:] |
|
|
|
|
|
|
|
self._drawn_artists = self._func(framedata, *self._args) |
|
|
|
if self._blit: |
|
|
|
err = RuntimeError('The animation function must return a sequence ' |
|
'of Artist objects.') |
|
try: |
|
|
|
iter(self._drawn_artists) |
|
except TypeError: |
|
raise err from None |
|
|
|
|
|
for i in self._drawn_artists: |
|
if not isinstance(i, mpl.artist.Artist): |
|
raise err |
|
|
|
self._drawn_artists = sorted(self._drawn_artists, |
|
key=lambda x: x.get_zorder()) |
|
|
|
for a in self._drawn_artists: |
|
a.set_animated(self._blit) |
|
|
|
save_count = _api.deprecate_privatize_attribute("3.7") |
|
|
|
|
|
def _validate_grabframe_kwargs(savefig_kwargs): |
|
if mpl.rcParams['savefig.bbox'] == 'tight': |
|
raise ValueError( |
|
f"{mpl.rcParams['savefig.bbox']=} must not be 'tight' as it " |
|
"may cause frame size to vary, which is inappropriate for animation." |
|
) |
|
for k in ('dpi', 'bbox_inches', 'format'): |
|
if k in savefig_kwargs: |
|
raise TypeError( |
|
f"grab_frame got an unexpected keyword argument {k!r}" |
|
) |
|
|