Source code for layered.plot

# pylint: disable=wrong-import-position
import collections
import time
import warnings
import inspect
import threading
import matplotlib

# Don't call the code if Sphinx inspects the file mocking external imports.
if inspect.ismodule(matplotlib):  # noqa
    # On Mac force backend that works with threading.
    if matplotlib.get_backend() == 'MacOSX':
        matplotlib.use('TkAgg')
    # Hide matplotlib deprecation message. Look the warning class up
    # defensively: the `cbook.mplDeprecation` alias is not available in
    # every Matplotlib release, and a missing attribute must not break the
    # import of this module. The top-level name is tried first and the old
    # alias is kept as a fallback for installations that only provide it.
    _deprecation = getattr(matplotlib, 'MatplotlibDeprecationWarning', None)
    if _deprecation is None:
        _deprecation = getattr(
            getattr(matplotlib, 'cbook', None), 'mplDeprecation', None)
    if _deprecation is not None:
        warnings.filterwarnings('ignore', category=_deprecation)
    # Ensure available interactive backend.
    if matplotlib.get_backend() not in matplotlib.rcsetup.interactive_bk:
        print('No visual backend available. Maybe you are inside a virtualenv '
              'that was created without --system-site-packages.')

import matplotlib.pyplot as plt


class Interface:
    """
    Description of one line plot: its title, axis labels, line style and the
    data to draw.

    Title, labels and style are set once in the constructor and exposed as
    read-only properties. `xdata`, `ydata`, `width` and `height` are plain
    attributes that start out empty or zero.
    """

    def __init__(self, title='', xlabel='', ylabel='', style=None):
        # Static description of the plot.
        self._title, self._xlabel, self._ylabel = title, xlabel, ylabel
        # A falsy style (None or an empty mapping) is replaced by a new dict.
        self._style = style if style else {}
        # Data points and the extent of both axes.
        self.xdata, self.ydata = [], []
        self.width = self.height = 0

    @property
    def title(self):
        """Title passed to the constructor."""
        return self._title

    @property
    def xlabel(self):
        """Label of the x axis passed to the constructor."""
        return self._xlabel

    @property
    def ylabel(self):
        """Label of the y axis passed to the constructor."""
        return self._ylabel

    @property
    def style(self):
        """Style mapping passed to the constructor, or an empty dict."""
        return self._style
class State:
    """
    Holder of a single mutable `running` flag.

    A fresh state is not running.
    """

    def __init__(self):
        # Starts out cleared; whoever owns the state flips it later.
        self.running = False
class Window:
    """
    Figure that shows registered interfaces as subplots and redraws them
    periodically from the main thread while a worker thread runs.
    """

    def __init__(self, refresh=0.5):
        """
        Create the figure and show it in interactive mode. `refresh` is the
        targeted number of seconds between two redraws.
        """
        self.refresh = refresh
        self.thread = None
        self.state = State()
        self.figure = plt.figure()
        # Tuples of (axis, line, interface), one per registered interface.
        self.interfaces = []
        plt.ion()
        plt.show()

    def register(self, position, interface):
        """
        Add a subplot for the interface at the given subplot position. The
        title, axis labels and line style are taken from the interface.
        """
        axis = self.figure.add_subplot(
            position, title=interface.title,
            xlabel=interface.xlabel, ylabel=interface.ylabel)
        axis.get_xaxis().set_ticks([])
        line, = axis.plot(interface.xdata, interface.ydata, **interface.style)
        self.interfaces.append((axis, line, interface))

    def start(self, work):
        """
        Hand the main thread to the window and continue work in the provided
        function. A state is passed as the first argument that contains a
        `running` flag. The function is expected to exit if the flag becomes
        false. The flag can also be set to false to stop the window event
        loop and continue in the main thread after the `start()` call.

        On a keyboard interrupt the flag is cleared and the worker thread is
        joined before returning. If redrawing fails with any other error, the
        flag is cleared as well so that the worker can stop, and the error is
        passed on to the caller.
        """
        assert threading.current_thread() == threading.main_thread(), (
            'start() must be called from the main thread')
        assert not self.state.running, 'the window is already running'
        self.state.running = True
        self.thread = threading.Thread(target=work, args=(self.state,))
        self.thread.start()
        try:
            while self.state.running:
                before = time.time()
                self.update()
                duration = time.time() - before
                # Sleep for the rest of the refresh interval while letting
                # the GUI process its events.
                plt.pause(max(0.001, self.refresh - duration))
        except KeyboardInterrupt:
            # The worker only stops once it sees the cleared flag, so clear
            # it before waiting for the thread.
            self.state.running = False
            self.thread.join()
        finally:
            # Never leave the flag set when the event loop is gone; otherwise
            # the worker would keep running forever after a failed redraw.
            self.state.running = False

    def stop(self):
        """
        Stop the window event loop by clearing the `running` flag. This must
        be called from the worker thread. The main thread will resume with
        the next command after the `start()` call.
        """
        assert threading.current_thread() == self.thread, (
            'stop() must be called from the worker thread')
        assert self.state.running, 'the window is not running'
        self.state.running = False

    def update(self):
        """
        Redraw the figure to show changed data. This is automatically called
        after `start()` was run.
        """
        assert threading.current_thread() == threading.main_thread(), (
            'update() must be called from the main thread')
        for axis, line, interface in self.interfaces:
            line.set_xdata(interface.xdata)
            line.set_ydata(interface.ydata)
            # Fall back to an extent of one while width or height are zero.
            axis.set_xlim(0, interface.width or 1, emit=False)
            axis.set_ylim(0, interface.height or 1, emit=False)
        self.figure.canvas.draw()
class Plot(Interface):
    """
    Line plot interface that is fed by calling it with new values.

    Without `fixed`, the plot grows with every value added. With `fixed` set
    to a number, only that many of the latest values are kept.
    """

    def __init__(self, title, xlabel, ylabel, style=None, fixed=None):
        # pylint: disable=too-many-arguments, redefined-variable-type
        super().__init__(title, xlabel, ylabel, style or {})
        # Largest value seen so far; never drops below zero.
        self.max_ = 0
        if not fixed:
            self.xdata = []
            self.ydata = []
        else:
            self.xdata = list(range(fixed))
            # The bounded deque discards the oldest value for each new one,
            # so it always stays aligned with the fixed x positions.
            self.ydata = collections.deque([None] * fixed, maxlen=fixed)
            self.width = fixed

    def __call__(self, values):
        """
        Append the values to the plot and update its width and height.

        `values` may be any iterable of numbers, including an empty one, in
        which case the plot is left unchanged.
        """
        # Materialize once: the values are needed both for extending the
        # data and for the maximum, and an iterator could only be read once.
        values = list(values)
        self.ydata += values
        # Taking the maximum of a list also works when no values were given.
        self.max_ = max([self.max_, *values])
        # Leave a little headroom above the highest point.
        self.height = 1.05 * self.max_
        # Add the x positions that are missing for the new values.
        self.xdata.extend(range(len(self.xdata), len(self.ydata)))
        # An empty plot gets a width of zero rather than a negative one.
        self.width = max(0, len(self.xdata) - 1)
        assert len(self.xdata) == len(self.ydata)