""" Module containing the environment to run experiments.
An :class:`~pypet.environment.Environment` provides an interface to run experiments based on
parameter exploration.
The environment contains and might even create a :class:`~pypet.trajectory.Trajectory`
container which can be filled with parameters and results (see :mod:`pypet.parameter`).
Instance of this trajectory are distributed to the user's job function to perform a single run
of an experiment.
An `Environment` is the handyman for scheduling, it can be used for multiprocessing and takes
care of organizational issues like logging.
"""
__author__ = 'Robert Meyer'
try:
import __main__ as main
except ImportError as exc:
main = None # We can end up here in an interactive IPython console
import os
import sys
import logging
import shutil
import multiprocessing as multip
import traceback
import hashlib
import time
import datetime
import inspect
try:
from sumatra.projects import load_project
from sumatra.programs import PythonExecutable
except ImportError:
load_project = None
PythonExecutable = None
try:
import dill
# If you do not set this log-level dill will flood any log file :-(
logging.getLogger(dill.__name__).setLevel(logging.WARNING)
except ImportError:
dill = None
try:
import psutil
except ImportError:
psutil = None
try:
import scoop
from scoop import futures, shared
except ImportError:
scoop = None
try:
import git
except ImportError:
git = None
try:
import zmq
except ImportError:
zmq = None
from pypet.pypetlogging import LoggingManager, HasLogger, simple_logging_config
from pypet.trajectory import Trajectory
from pypet.storageservice import HDF5StorageService, LazyStorageService
from pypet.utils.mpwrappers import QueueStorageServiceWriter, LockWrapper, \
PipeStorageServiceSender, PipeStorageServiceWriter, ReferenceWrapper, \
ReferenceStore, QueueStorageServiceSender, LockerServer, LockerClient, \
ForkAwareLockerClient, TimeOutLockerServer, QueuingClient, QueuingServer, \
ForkAwareQueuingClient
from pypet.utils.siginthandling import sigint_handling
from pypet.utils.gitintegration import make_git_commit
from pypet._version import __version__ as VERSION
from pypet.utils.decorators import deprecated, kwargs_api_change, prefix_naming
from pypet.utils.helpful_functions import is_debug, result_sort, format_time, port_to_tcp, \
racedirs
from pypet.utils.storagefactory import storage_factory
from pypet.utils.configparsing import parse_config
from pypet.parameter import Parameter
import pypet.pypetconstants as pypetconstants
def _pool_single_run(kwargs):
"""Starts a pool single run and passes the storage service"""
wrap_mode = kwargs['wrap_mode']
traj = kwargs['traj']
traj.v_storage_service = _pool_single_run.storage_service
if wrap_mode == pypetconstants.WRAP_MODE_LOCAL:
# Free references from previous runs
traj.v_storage_service.free_references()
return _sigint_handling_single_run(kwargs)
def _frozen_pool_single_run(kwargs):
"""Single run wrapper for the frozen pool, makes a single run and passes kwargs"""
idx = kwargs.pop('idx')
frozen_kwargs = _frozen_pool_single_run.kwargs
frozen_kwargs.update(kwargs) # in case of `run_map`
# we need to update job's args and kwargs
traj = frozen_kwargs['traj']
traj.f_set_crun(idx)
return _sigint_handling_single_run(frozen_kwargs)
def _configure_pool(kwargs):
"""Configures the pool and keeps the storage service"""
_pool_single_run.storage_service = kwargs['storage_service']
_configure_niceness(kwargs)
_configure_logging(kwargs, extract=False)
def _configure_frozen_pool(kwargs):
"""Configures the frozen pool and keeps all kwargs"""
_frozen_pool_single_run.kwargs = kwargs
_configure_niceness(kwargs)
_configure_logging(kwargs, extract=False)
# Reset full copy to it's old value
traj = kwargs['traj']
traj.v_full_copy = kwargs['full_copy']
def _process_single_run(kwargs):
"""Wrapper function that first configures logging and starts a single run afterwards."""
_configure_niceness(kwargs)
_configure_logging(kwargs)
result_queue = kwargs['result_queue']
result = _sigint_handling_single_run(kwargs)
result_queue.put(result)
result_queue.close()
def _configure_frozen_scoop(kwargs):
"""Wrapper function that configures a frozen SCOOP set up.
Deletes of data if necessary.
"""
def _delete_old_scoop_rev_data(old_scoop_rev):
if old_scoop_rev is not None:
try:
elements = shared.elements
for key in elements:
var_dict = elements[key]
if old_scoop_rev in var_dict:
del var_dict[old_scoop_rev]
logging.getLogger('pypet.scoop').debug('Deleted old SCOOP data from '
'revolution `%s`.' % old_scoop_rev)
except AttributeError:
logging.getLogger('pypet.scoop').error('Could not delete old SCOOP data from '
'revolution `%s`.' % old_scoop_rev)
scoop_rev = kwargs.pop('scoop_rev')
# Check if we need to reconfigure SCOOP
try:
old_scoop_rev = _frozen_scoop_single_run.kwargs['scoop_rev']
configured = old_scoop_rev == scoop_rev
except (AttributeError, KeyError):
old_scoop_rev = None
configured = False
if not configured:
_frozen_scoop_single_run.kwargs = shared.getConst(scoop_rev, timeout=424.2)
frozen_kwargs = _frozen_scoop_single_run.kwargs
frozen_kwargs['scoop_rev'] = scoop_rev
frozen_kwargs['traj'].v_full_copy = frozen_kwargs['full_copy']
if not scoop.IS_ORIGIN:
_configure_niceness(frozen_kwargs)
_configure_logging(frozen_kwargs, extract=False)
_delete_old_scoop_rev_data(old_scoop_rev)
logging.getLogger('pypet.scoop').info('Configured Worker %s' % str(scoop.worker))
def _frozen_scoop_single_run(kwargs):
try:
_configure_frozen_scoop(kwargs)
idx = kwargs.pop('idx')
frozen_kwargs = _frozen_scoop_single_run.kwargs
frozen_kwargs.update(kwargs)
traj = frozen_kwargs['traj']
traj.f_set_crun(idx)
return _single_run(frozen_kwargs)
except Exception:
scoop.logger.exception('ERROR occurred during a single run!')
raise
def _scoop_single_run(kwargs):
"""Wrapper function for scoop, that does not configure logging"""
try:
try:
is_origin = scoop.IS_ORIGIN
except AttributeError:
# scoop is not properly started, i.e. with `python -m scoop...`
# in this case scoop uses default `map` function, i.e.
# the main process
is_origin = True
if not is_origin:
# configure logging and niceness if not the main process:
_configure_niceness(kwargs)
_configure_logging(kwargs)
return _single_run(kwargs)
except Exception:
scoop.logger.exception('ERROR occurred during a single run!')
raise
def _configure_logging(kwargs, extract=True):
"""Requests the logging manager to configure logging.
:param extract:
If naming data should be extracted from the trajectory
"""
try:
logging_manager = kwargs['logging_manager']
if extract:
logging_manager.extract_replacements(kwargs['traj'])
logging_manager.make_logging_handlers_and_tools(multiproc=True)
except Exception as exc:
sys.stderr.write('Could not configure logging system because of: %s' % repr(exc))
traceback.print_exc()
def _configure_niceness(kwargs):
"""Sets niceness of a process"""
niceness = kwargs['niceness']
if niceness is not None:
try:
try:
current = os.nice(0)
if niceness - current > 0:
# Under Linux you cannot decrement niceness if set elsewhere
os.nice(niceness-current)
except AttributeError:
# Fall back on psutil under Windows
psutil.Process().nice(niceness)
except Exception as exc:
sys.stderr.write('Could not configure niceness because of: %s' % repr(exc))
traceback.print_exc()
def _sigint_handling_single_run(kwargs):
"""Wrapper that allow graceful exits of single runs"""
try:
graceful_exit = kwargs['graceful_exit']
if graceful_exit:
sigint_handling.start()
if sigint_handling.hit:
result = (sigint_handling.SIGINT, None)
else:
result = _single_run(kwargs)
if sigint_handling.hit:
result = (sigint_handling.SIGINT, result)
return result
return _single_run(kwargs)
except:
# Log traceback of exception
pypet_root_logger = logging.getLogger('pypet')
pypet_root_logger.exception('ERROR occurred during a single run! ')
raise
def _single_run(kwargs):
""" Performs a single run of the experiment.
:param kwargs: Dict of arguments
traj: The trajectory containing all parameters set to the corresponding run index.
runfunc: The user's job function
runargs: The arguments handed to the user's job function (as *args)
runkwargs: The keyword arguments handed to the user's job function (as **kwargs)
clean_up_after_run: Whether to clean up after the run
automatic_storing: Whether or not the data should be automatically stored
result_queue: A queue object to store results into in case a pool is used, otherwise None
:return:
Results computed by the user's job function which are not stored into the trajectory.
Returns a nested tuple of run index and result and run information:
``((traj.v_idx, result), run_information_dict)``
"""
pypet_root_logger = logging.getLogger('pypet')
traj = kwargs['traj']
runfunc = kwargs['runfunc']
runargs = kwargs['runargs']
kwrunparams = kwargs['runkwargs']
clean_up_after_run = kwargs['clean_up_runs']
automatic_storing = kwargs['automatic_storing']
wrap_mode = kwargs['wrap_mode']
idx = traj.v_idx
total_runs = len(traj)
pypet_root_logger.info('\n=========================================\n '
'Starting single run #%d of %d '
'\n=========================================\n' % (idx, total_runs))
# Measure start time
traj.f_start_run(turn_into_run=True)
# Run the job function of the user
result = runfunc(traj, *runargs, **kwrunparams)
# Store data if desired
if automatic_storing:
traj.f_store()
# Add the index to the result and the run information
if wrap_mode == pypetconstants.WRAP_MODE_LOCAL:
result = ((traj.v_idx, result),
traj.f_get_run_information(traj.v_idx, copy=False),
traj.v_storage_service.references)
traj.v_storage_service.free_references()
else:
result = ((traj.v_idx, result),
traj.f_get_run_information(traj.v_idx, copy=False))
# Measure time of finishing
traj.f_finalize_run(store_meta_data=False,
clean_up=clean_up_after_run)
pypet_root_logger.info('\n=========================================\n '
'Finished single run #%d of %d '
'\n=========================================\n' % (idx, total_runs))
return result
def _wrap_handling(kwargs):
""" Starts running a queue handler and creates a log file for the queue."""
_configure_logging(kwargs, extract=False)
# Main job, make the listener to the queue start receiving message for writing to disk.
handler=kwargs['handler']
graceful_exit = kwargs['graceful_exit']
# import cProfile as profile
# profiler = profile.Profile()
# profiler.enable()
if graceful_exit:
sigint_handling.start()
handler.run()
# profiler.disable()
# profiler.dump_stats('./queue.profile2')
[docs]@prefix_naming
class Environment(HasLogger):
""" The environment to run a parameter exploration.
The first thing you usually do is to create and environment object that takes care about
the running of the experiment. You can provide the following arguments:
:param trajectory:
String or trajectory instance. If a string is supplied, a novel
trajectory is created with that name.
Note that the comment and the dynamically imported classes (see below) are only considered
if a novel trajectory is created. If you supply a trajectory instance, these fields
can be ignored.
:param add_time: If True the current time is added to the trajectory name if created new.
:param comment: Comment added to the trajectory if a novel trajectory is created.
:param dynamic_imports:
Only considered if a new trajectory is created.
If you've written custom parameters or results that need to be loaded
dynamically during runtime, the module containing the class
needs to be specified here as a list of classes or strings
naming classes and there module paths.
For example:
`dynamic_imports =
['pypet.parameter.PickleParameter', MyCustomParameter]`
If you only have a single class to import, you do not need
the list brackets:
`dynamic_imports = 'pypet.parameter.PickleParameter'`
:param wildcard_functions:
Dictionary of wildcards like `$` and corresponding functions that are called upon
finding such a wildcard. For example, to replace the `$` aka `crun` wildcard,
you can pass the following: ``wildcard_functions = {('$', 'crun'): myfunc}``.
Your wildcard function `myfunc` must return a unique run name as a function of
a given integer run index. Moreover, your function must also return a unique
*dummy* name for the run index being `-1`.
Of course, you can define your
own wildcards like `wildcard_functions = {('$mycard', 'mycard'): myfunc)}.
These are not required to return a unique name for each run index, but can be used
to group runs into buckets by returning the same name for several run indices.
Yet, all wildcard functions need to return a dummy name for the index `-1`.
:param automatic_storing:
If `True` the trajectory will be stored at the end of the simulation and
single runs will be stored after their completion.
Be aware of data loss if you set this to `False` and not
manually store everything.
:param log_config:
Can be path to a logging `.ini` file specifying the logging configuration.
For an example of such a file see :ref:`more-on-logging`.
Can also be a dictionary that is accepted by the built-in logging module.
Set to `None` if you don't want *pypet* to configure logging.
If not specified, the default settings are used. Moreover, you can manually tweak the
default settings without creating a new `ini` file.
Instead of the `log_config` parameter, pass a ``log_folder``,
a list of `logger_names` and corresponding `log_levels` to fine grain
the loggers to which the default settings apply.
For example:
``log_folder='logs', logger_names='('pypet', 'MyCustomLogger'), log_levels=(logging.ERROR, logging.INFO)``
You can further disable multiprocess logging via setting ``log_multiproc=False``.
:param log_stdout:
Whether the output of ``stdout`` should be recorded into the log files.
Disable if only logging statement should be recorded. Note if you work with an
interactive console like *IPython*, it is a good idea to set ``log_stdout=False``
to avoid messing up the console output.
Can also be a tuple: ('mylogger', 10), specifying a logger name as well as a log-level.
The log-level defines with what level `stdout` is logged, it is *not* a filter.
:param report_progress:
If progress of runs and an estimate of the remaining time should be shown.
Can be `True` or `False` or a triple ``(10, 'pypet', logging.Info)`` where the first number
is the percentage and update step of the resulting progressbar and
the second one is a corresponding logger name with which the progress should be logged.
If you use `'print'`, the `print` statement is used instead. The third value
specifies the logging level (level of logging statement *not* a filter)
with which the progress should be logged.
Note that the progress is based on finished runs. If you use the `QUEUE` wrapping
in case of multiprocessing and if storing takes long, the estimate of the remaining
time might not be very accurate.
:param multiproc:
Whether or not to use multiprocessing. Default is ``False``.
Besides the wrap_mode (see below) that deals with how
storage to disk is carried out in case of multiprocessing, there
are two ways to do multiprocessing. By using a fixed pool of
processes (choose `use_pool=True`, default option) or by spawning an
individual process for every run and parameter combination (`use_pool=False`).
The former will only spawn not more than *ncores* processes and all simulation runs are
sent over to to the pool one after the other.
This requires all your data to be pickled.
If your data cannot be pickled (which could be the case for some
BRIAN networks, for instance) choose `use_pool=False` (also make sure to set
`continuable=False`). This will also spawn
at most *ncores* processes at a time, but as soon as a process terminates
a new one is spawned with the next parameter combination. Be aware that you will
have as many logfiles in your logfolder as processes were spawned.
If your simulation returns results besides storing results directly into the trajectory,
these returned results still need to be pickled.
:param ncores:
If multiproc is ``True``, this specifies the number of processes that will be spawned
to run your experiment. Note if you use QUEUE mode (see below) the queue process
is not included in this number and will add another extra process for storing.
If you have *psutil* installed, you can set `ncores=0` to let *psutil* determine
the number of CPUs available.
:param use_scoop:
If python should be used in a SCOOP_ framework to distribute runs amond a cluster
or multiple servers. If so you need to start your script via
``python -m scoop my_script.py``. Currently, SCOOP_ only works with
``'LOCAL'`` ``wrap_mode`` (see below).
.. _SCOOP: http://scoop.readthedocs.org/
:param use_pool:
Whether to use a fixed pool of processes or whether to spawn a new process
for every run. Use the former if you perform many runs (50k and more)
which are in terms of memory and runtime inexpensive.
Be aware that everything you use must be picklable.
Use the latter for fewer runs (50k and less) and which are longer lasting
and more expensive runs (in terms of memory consumption).
In case your operating system allows forking, your data does not need to be
picklable.
If you choose ``use_pool=False`` you can also make use of the `cap` values,
see below.
:param freeze_input:
Can be set to ``True`` if the run function as well as all additional arguments
are immutable. This will prevent the trajectory from getting pickled again and again.
Thus, the run function, the trajectory, as well as all arguments are passed to the pool
or SCOOP workers at initialisation. Works also under `run_map`.
In this case the iterable arguments are, of course, not frozen but passed for every run.
:param timeout:
Timeout parameter in seconds passed on to SCOOP_ and ``'NETLOCK'`` wrapping.
Leave `None` for no timeout. After `timeout` seconds SCOOP_ will assume
that a single run failed and skip waiting for it.
Moreover, if using ``'NETLOCK'`` wrapping, after `timeout` seconds
a lock is automatically released and again
available for other waiting processes.
:param cpu_cap:
If `multiproc=True` and `use_pool=False` you can specify a maximum cpu utilization between
0.0 (excluded) and 100.0 (included) as fraction of maximum capacity. If the current cpu
usage is above the specified level (averaged across all cores),
*pypet* will not spawn a new process and wait until
activity falls below the threshold again. Note that in order to avoid dead-lock at least
one process will always be running regardless of the current utilization.
If the threshold is crossed a warning will be issued. The warning won't be repeated as
long as the threshold remains crossed.
For example `cpu_cap=70.0`, `ncores=3`, and currently on average 80 percent of your cpu are
used. Moreover, let's assume that at the moment only 2 processes are
computing single runs simultaneously. Due to the usage of 80 percent of your cpu,
*pypet* will wait until cpu usage drops below (or equal to) 70 percent again
until it starts a third process to carry out another single run.
The parameters `memory_cap` and `swap_cap` are analogous. These three thresholds are
combined to determine whether a new process can be spawned. Accordingly, if only one
of these thresholds is crossed, no new processes will be spawned.
To disable the cap limits simply set all three values to 100.0.
You need the psutil_ package to use this cap feature. If not installed and you
choose cap values different from 100.0 a ValueError is thrown.
:param memory_cap:
Cap value of RAM usage. If more RAM than the threshold is currently in use, no new
processes are spawned. Can also be a tuple ``(limit, memory_per_process)``,
first value is the cap value (between 0.0 and 100.0),
second one is the estimated memory per process in mega bytes (MB).
If an estimate is given a new process is not started if
the threshold would be crossed including the estimate.
:param swap_cap:
Analogous to `cpu_cap` but the swap memory is considered.
:param niceness:
If you are running on a UNIX based system or you have psutil_ (under Windows) installed,
you can choose a niceness value to prioritize the child processes executing the
single runs in case you use multiprocessing.
Under Linux these usually range from 0 (highest priority)
to 19 (lowest priority). For Windows values check the psutil_ homepage.
Leave ``None`` if you don't care about niceness.
Under Linux the `niceness`` value is a minimum value, if the OS decides to
nice your program (maybe you are running on a server) *pypet* does not try to
decrease the `niceness` again.
:param wrap_mode:
If multiproc is ``True``, specifies how storage to disk is handled via
the storage service.
There are a few options:
:const:`~pypet.pypetconstants.WRAP_MODE_QUEUE`: ('QUEUE')
Another process for storing the trajectory is spawned. The sub processes
running the individual single runs will add their results to a
multiprocessing queue that is handled by an additional process.
Note that this requires additional memory since the trajectory
will be pickled and send over the queue for storage!
:const:`~pypet.pypetconstants.WRAP_MODE_LOCK`: ('LOCK')
Each individual process takes care about storage by itself. Before
carrying out the storage, a lock is placed to prevent the other processes
to store data. Accordingly, sometimes this leads to a lot of processes
waiting until the lock is released.
Allows loading of data during runs.
:const:`~pypet.pypetconstants.WRAP_MODE_PIPE`: ('PIPE)
Experimental mode based on a single pipe. Is faster than ``'QUEUE'`` wrapping
but data corruption may occur, does not work under Windows
(since it relies on forking).
:const:`~pypet.pypetconstant.WRAP_MODE_LOCAL` ('LOCAL')
Data is not stored during the single runs but after they completed.
Storing is only performed in the main process.
Note that removing data during a single run has no longer an effect on memory
whatsoever, because there are references kept for all data
that is supposed to be stored.
:const:`~pypet.pypetconstant.WRAP_MODE_NETLOCK` ('NETLOCK')
Similar to 'LOCK' but locks can be shared across a network.
Sharing is established by running a lock server that
distributes locks to the individual processes.
Can be used with SCOOP_ if all hosts have access to
a shared home directory.
Allows loading of data during runs.
:const:`~pypet.pypetconstant.WRAP_MODE_NETQUEUE` ('NETQUEUE')
Similar to 'QUEUE' but data can be shared across a network.
Sharing is established by running a queue server that
distributes locks to the individual processes.
If you don't want wrapping at all use
:const:`~pypet.pypetconstants.WRAP_MODE_NONE` ('NONE')
:param queue_maxsize:
Maximum size of the Storage Queue, in case of ``'QUEUE'`` wrapping.
``0`` means infinite, ``-1`` (default) means the educated guess of ``2 * ncores``.
:param port:
Port to be used by lock server in case of ``'NETLOCK'`` wrapping.
Can be a single integer as well as a tuple ``(7777, 9999)`` to specify
a range of ports from which to pick a random one.
Leave `None` for using pyzmq's default range.
In case automatic determining of the host's IP address fails,
you can also pass the full address (including the protocol and
the port) of the host in the network like ``'tcp://127.0.0.1:7777'``.
:param gc_interval:
Interval (in runs or storage operations) with which ``gc.collect()``
should be called in case of the ``'LOCAL'``, ``'QUEUE'``, or ``'PIPE'`` wrapping.
Leave ``None`` for never.
In case of ``'LOCAL'`` wrapping ``1`` means after every run ``2``
after every second run, and so on. In case of ``'QUEUE'`` or ``'PIPE''`` wrapping
``1`` means after every store operation,
``2`` after every second store operation, and so on.
Only calls ``gc.collect()`` in the main (if ``'LOCAL'`` wrapping)
or the queue/pipe process. If you need to garbage collect data within your single runs,
you need to manually call ``gc.collect()``.
Usually, there is no need to set this parameter since the Python garbage collection
works quite nicely and schedules collection automatically.
:param clean_up_runs:
In case of single core processing, whether all results under groups named `run_XXXXXXXX`
should be removed after the completion of
the run. Note in case of multiprocessing this happens anyway since the single run
container will be destroyed after finishing of the process.
Moreover, if set to ``True`` after post-processing it is checked if there is still data
under `run_XXXXXXXX` and this data is removed if the trajectory is expanded.
:param immediate_postproc:
If you use post- and multiprocessing, you can immediately start analysing the data
as soon as the trajectory runs out of tasks, i.e. is fully explored but the final runs
are not completed. Thus, while executing the last batch of parameter space points,
you can already analyse the finished runs. This is especially helpful if you perform some
sort of adaptive search within the parameter space.
The difference to normal post-processing is that you do not have to wait until all
single runs are finished, but your analysis already starts while there are still
runs being executed. This can be a huge time saver especially if your simulation time
differs a lot between individual runs. Accordingly, you don't have to wait for a very
long run to finish to start post-processing.
In case you use immediate postprocessing, the storage service of your trajectory is still
multiprocessing safe (except when using the wrap_mode ``'LOCAL'``).
Accordingly, you could even use multiprocessing in your immediate post-processing phase
if you dare, like use a multiprocessing pool_, for instance.
Note that after the execution of the final run, your post-processing routine will
be called again as usual.
**IMPORTANT**: If you use immediate post-processing, the results that are passed to
your post-processing function are not sorted by their run indices but by finishing time!
.. _pool: https://docs.python.org/2/library/multiprocessing.html
:param resumable:
Whether the environment should take special care to allow to resume or continue
crashed trajectories. Default is ``False``.
You need to install dill_ to use this feature. *dill* will make snapshots
of your simulation function as well as the passed arguments.
BE AWARE that dill is still rather experimental!
Assume you run experiments that take a lot of time.
If during your experiments there is a power failure,
you can resume your trajectory after the last single run that was still
successfully stored via your storage service.
The environment will create several `.ecnt` and `.rcnt` files in a folder that you specify
(see below).
Using this data you can resume crashed trajectories.
In order to resume trajectories use :func:`~pypet.environment.Environment.resume`.
Be aware that your individual single runs must be completely independent of one
another to allow continuing to work. Thus, they should **NOT** be based on shared data
that is manipulated during runtime (like a multiprocessing manager list)
in the positional and keyword arguments passed to the run function.
If you use post-processing, the expansion of trajectories and continuing of trajectories
is NOT supported properly. There is no guarantee that both work together.
.. _dill: https://pypi.python.org/pypi/dill
:param resume_folder:
The folder where the resume files will be placed. Note that *pypet* will create
a sub-folder with the name of the environment.
:param delete_resume:
If true, *pypet* will delete the resume files after a successful simulation.
:param storage_service:
Pass a given storage service or a class constructor (default ``HDF5StorageService``)
if you want the environment to create
the service for you. The environment will pass the
additional keyword arguments you pass directly to the constructor.
If the trajectory already has a service attached,
the one from the trajectory will be used.
:param git_repository:
If your code base is under git version control you can specify here the path
(relative or absolute) to the folder containing the `.git` directory as a string.
Note in order to use this tool you need GitPython_.
If you set this path the environment will trigger a commit of your code base
adding all files that are currently under version control.
Similar to calling `git add -u` and `git commit -m 'My Message'` on the command line.
The user can specify the commit message, see below. Note that the message
will be augmented by the name and the comment of the trajectory.
A commit will only be triggered if there are changes detected within your
working copy.
This will also add information about the revision to the trajectory, see below.
.. _GitPython: http://pythonhosted.org/GitPython/0.3.1/index.html
:param git_message:
Message passed onto git command. Only relevant if a new commit is triggered.
If no changes are detected, the information about the previous commit and the previous
commit message are added to the trajectory and this user passed message is discarded.
:param git_fail:
If `True` the program fails instead of triggering a commit if there are not committed
changes found in the code base. In such a case a `GitDiffError` is raised.
:param sumatra_project:
If your simulation is managed by sumatra_, you can specify here the path to the
*sumatra* root folder. Note that you have to initialise the *sumatra* project at least
once before via ``smt init MyFancyProjectName``.
*pypet* will automatically ad ALL parameters to the *sumatra* record.
If a parameter is explored, the WHOLE range is added instead of the default value.
*pypet* will add the label and reason (only if provided, see below)
to your trajectory as config parameters.
.. _sumatra : http://neuralensemble.org/sumatra/
:param sumatra_reason:
You can add an additional reason string that is added to the *sumatra* record.
Regardless if `sumatra_reason` is empty, the name of the trajectory, the comment
as well as a list of all explored parameters is added to the *sumatra* record.
Note that the augmented label is not stored into the trajectory as config
parameter, but the original one (without the name of the trajectory, the comment,
and the list of explored parameters) in case it is not the empty string.
:param sumatra_label:
The label or name of your sumatra record. Set to `None` if you want sumatra
to choose a label in form of a timestamp for you.
:param do_single_runs:
Whether you intend to actually to compute single runs with the trajectory.
If you do not intend to do single runs, than set to ``False`` and the
environment won't add config information like number of processors to the
trajectory.
:param graceful_exit:
If ``True`` hitting CTRL+C (i.e.sending SIGINT) will not terminate the program
immediately. Instead, active single runs will be finished and stored before
shutdown. Hitting CTRL+C twice will raise a KeyboardInterrupt as usual.
:param lazy_debug:
If ``lazy_debug=True`` and in case you debug your code (aka you use pydevd and
the expression ``'pydevd' in sys.modules`` is ``True``), the environment will use the
:class:`~pypet.storageservice.LazyStorageService` instead of the HDF5 one.
Accordingly, no files are created and your trajectory and results are not saved.
This allows faster debugging and prevents *pypet* from blowing up your hard drive with
trajectories that you probably not want to use anyway since you just debug your code.
The Environment will automatically add some config settings to your trajectory.
Thus, you can always look up how your trajectory was run. This encompasses most of the above
named parameters as well as some information about the environment.
This additional information includes
a timestamp as well as a SHA-1 hash code that uniquely identifies your environment.
If you use git integration, the SHA-1 hash code will be the one from your git commit.
Otherwise the code will be calculated from the trajectory name, the current time, and your
current *pypet* version.
The environment will be named `environment_XXXXXXX_XXXX_XX_XX_XXhXXmXXs`. The first seven
`X` are the first seven characters of the SHA-1 hash code followed by a human readable
timestamp.
All information about the environment can be found in your trajectory under
`config.environment.environment_XXXXXXX_XXXX_XX_XX_XXhXXmXXs`. Your trajectory could
potentially be run by several environments due to merging or extending an existing trajectory.
Thus, you will be able to track how your trajectory was built over time.
Git information is added to your trajectory as follows:
* git.commit_XXXXXXX_XXXX_XX_XX_XXh_XXm_XXs.hexsha
The SHA-1 hash of the commit.
`commit_XXXXXXX_XXXX_XX_XX_XXhXXmXXs` is mapped to the first seven items of the SHA-1 hash
and the formatted data of the commit, e.g. `commit_7ef7hd4_2015_10_21_16h29m00s`.
* git.commit_XXXXXXX_XXXX_XX_XX_XXh_XXm_XXs.name_rev
String describing the commits hexsha based on the closest reference
* git.commit_XXXXXXX_XXXX_XX_XX_XXh_XXm_XXs.committed_date
Commit date as Unix Epoch data
* git.commit_XXXXXXX_XXXX_XX_XX_XXh_XXm_XXs.message
The commit message
Moreover, if you use the standard ``HDF5StorageService`` you can pass the following keyword
arguments in ``**kwargs``:
:param filename:
The name of the hdf5 file. If none is specified the default
`./hdf5/the_name_of_your_trajectory.hdf5` is chosen. If `filename` contains only a path
like `filename='./myfolder/', it is changed to
`filename='./myfolder/the_name_of_your_trajectory.hdf5'`.
:param file_title: Title of the hdf5 file (only important if file is created new)
:param overwrite_file:
If the file already exists it will be overwritten. Otherwise,
the trajectory will simply be added to the file and already
existing trajectories are **not** deleted.
:param encoding:
Format to encode and decode unicode strings stored to disk.
The default ``'utf8'`` is highly recommended.
:param complevel:
You can specify your compression level. 0 means no compression
and 9 is the highest compression level. See `PyTables Compression`_ for a detailed
description.
.. _`PyTables Compression`: http://pytables.github.io/usersguide/optimization.html#compression-issues
:param complib:
The library used for compression. Choose between *zlib*, *blosc*, and *lzo*.
Note that 'blosc' and 'lzo' are usually faster than 'zlib' but it may be the case that
you can no longer open your hdf5 files with third-party applications that do not rely
on PyTables.
:param shuffle:
Whether or not to use the shuffle filters in the HDF5 library.
This normally improves the compression ratio.
:param fletcher32:
Whether or not to use the *Fletcher32* filter in the HDF5 library.
This is used to add a checksum on hdf5 data.
:param pandas_format:
How to store pandas data frames. Either in 'fixed' ('f') or 'table' ('t') format.
Fixed format allows fast reading and writing but disables querying the hdf5 data and
appending to the store (with other 3rd party software other than *pypet*).
:param purge_duplicate_comments:
If you add a result via :func:`~pypet.naturalnaming.ResultGroup.f_add_result` or a derived
parameter :func:`~pypet.naturalnaming.DerivedParameterGroup.f_add_derived_parameter` and
you set a comment, normally that comment would be attached to each and every instance.
This can produce a lot of unnecessary overhead if the comment is the same for every
instance over all runs. If `purge_duplicate_comments=1` than only the comment of the
first result or derived parameter instance created in a run is stored or comments
that differ from this first comment.
For instance, during a single run you call
`traj.f_add_result('my_result`,42, comment='Mostly harmless!')`
and the result will be renamed to `results.run_00000000.my_result`. After storage
in the node associated with this result in your hdf5 file, you will find the comment
`'Mostly harmless!'` there. If you call
`traj.f_add_result('my_result',-43, comment='Mostly harmless!')`
in another run again, let's say run 00000001, the name will be mapped to
`results.run_00000001.my_result`. But this time the comment will not be saved to disk
since `'Mostly harmless!'` is already part of the very first result with the name
'results.run_00000000.my_result'.
Note that the comments will be compared and storage will only be discarded if the strings
are exactly the same.
If you use multiprocessing, the storage service will take care that the comment for
the result or derived parameter with the lowest run index will be considered regardless
of the order of the finishing of your runs. Note that this only works properly if all
comments are the same. Otherwise the comment in the overview table might not be the one
with the lowest run index.
You need summary tables (see below) to be able to purge duplicate comments.
This feature only works for comments in *leaf* nodes (aka Results and Parameters).
So try to avoid to add comments in *group* nodes within single runs.
:param summary_tables:
Whether the summary tables should be created, i.e. the 'derived_parameters_runs_summary',
and the `results_runs_summary`.
The 'XXXXXX_summary' tables give a summary about all results or derived parameters.
It is assumed that results and derived parameters with equal names in individual runs
are similar and only the first result or derived parameter that was created
is shown as an example.
The summary table can be used in combination with `purge_duplicate_comments` to only store
a single comment for every result with the same name in each run, see above.
:param small_overview_tables:
Whether the small overview tables should be created.
Small tables are giving overview about 'config','parameters',
'derived_parameters_trajectory', ,
'results_trajectory', 'results_runs_summary'.
Note that these tables create some overhead. If you want very small hdf5 files set
`small_overview_tables` to False.
:param large_overview_tables:
Whether to add large overview tables. This encompasses information about every derived
parameter, result, and the explored parameter in every single run.
If you want small hdf5 files set to ``False`` (default).
:param results_per_run:
Expected results you store per run. If you give a good/correct estimate
storage to hdf5 file is much faster in case you store LARGE overview tables.
Default is 0, i.e. the number of results is not estimated!
:param derived_parameters_per_run:
Analogous to the above.
Finally, you can also pass properties of the trajectory, like ``v_with_links=True``
(you can leave the prefix ``v_``, i.e. ``with_links`` works, too).
Thus, you can change the settings of the trajectory immediately.
.. _psutil: http://psutil.readthedocs.org/
"""
@parse_config
@kwargs_api_change('delete_continue', 'delete_resume')
@kwargs_api_change('continue_folder', 'resume_folder')
@kwargs_api_change('continuable', 'resumable')
@kwargs_api_change('freeze_pool_input', 'freeze_input')
@kwargs_api_change('use_hdf5', 'storage_service')
@kwargs_api_change('dynamically_imported_classes', 'dynamic_imports')
@kwargs_api_change('pandas_append')
@simple_logging_config
def __init__(self, trajectory='trajectory',
add_time=False,
comment='',
dynamic_imports=None,
wildcard_functions=None,
automatic_storing=True,
log_config=pypetconstants.DEFAULT_LOGGING,
log_stdout=False,
report_progress = (5, 'pypet', logging.INFO),
multiproc=False,
ncores=1,
use_scoop=False,
use_pool=False,
freeze_input=False,
timeout=None,
cpu_cap=100.0,
memory_cap=100.0,
swap_cap=100.0,
niceness=None,
wrap_mode=pypetconstants.WRAP_MODE_LOCK,
queue_maxsize=-1,
port=None,
gc_interval=None,
clean_up_runs=True,
immediate_postproc=False,
resumable=False,
resume_folder=None,
delete_resume=True,
storage_service=HDF5StorageService,
git_repository=None,
git_message='',
git_fail=False,
sumatra_project=None,
sumatra_reason='',
sumatra_label=None,
do_single_runs=True,
graceful_exit=False,
lazy_debug=False,
**kwargs):
if git_repository is not None and git is None:
raise ValueError('You cannot specify a git repository without having '
'GitPython. Please install the GitPython package to use '
'pypet`s git integration.')
if resumable and dill is None:
raise ValueError('Please install `dill` if you want to use the feature to '
'resume halted trajectories')
if load_project is None and sumatra_project is not None:
raise ValueError('`sumatra` package has not been found, either install '
'`sumatra` or set `sumatra_project=None`.')
if sumatra_label is not None and '.' in sumatra_label:
raise ValueError('Your sumatra label is not allowed to contain dots.')
if wrap_mode == pypetconstants.WRAP_MODE_NETLOCK and zmq is None:
raise ValueError('You need to install `zmq` for `NETLOCK` wrapping.')
if (use_pool or use_scoop) and immediate_postproc:
raise ValueError('You CANNOT perform immediate post-processing if you DO '
'use a pool or scoop.')
if use_pool and use_scoop:
raise ValueError('You can either `use_pool` or `use_scoop` or none of both, '
'but not both together')
if use_scoop and scoop is None:
raise ValueError('Cannot use `scoop` because it is not installed.')
if (wrap_mode not in (pypetconstants.WRAP_MODE_NONE,
pypetconstants.WRAP_MODE_LOCAL,
pypetconstants.WRAP_MODE_LOCK,
pypetconstants.WRAP_MODE_NETLOCK) and
resumable):
raise ValueError('Continuing trajectories does only work with '
'`LOCK`, `NETLOCK` or `LOCAL`wrap mode.')
if resumable and not automatic_storing:
raise ValueError('Continuing only works with `automatic_storing=True`')
if use_scoop and wrap_mode not in (pypetconstants.WRAP_MODE_LOCAL,
pypetconstants.WRAP_MODE_NONE,
pypetconstants.WRAP_MODE_NETLOCK,
pypetconstants.WRAP_MODE_NETQUEUE):
raise ValueError('SCOOP mode only works with `LOCAL`, `NETLOCK` or '
'`NETQUEUE` wrap mode!')
if niceness is not None and not hasattr(os, 'nice') and psutil is None:
raise ValueError('You cannot set `niceness` if your operating system does not '
'support the `nice` operation. Alternatively you can install '
'`psutil`.')
if freeze_input and not use_pool and not use_scoop:
raise ValueError('You can only use `freeze_input=True` if you either use '
'a pool or SCOOP.')
if not isinstance(memory_cap, tuple):
memory_cap = (memory_cap, 0.0)
if (cpu_cap <= 0.0 or cpu_cap > 100.0 or
memory_cap[0] <= 0.0 or memory_cap[0] > 100.0 or
swap_cap <= 0.0 or swap_cap > 100.0):
raise ValueError('Please choose cap values larger than 0.0 '
'and smaller or equal to 100.0.')
check_usage = cpu_cap < 100.0 or memory_cap[0] < 100.0 or swap_cap < 100.0
if check_usage and psutil is None:
raise ValueError('You cannot enable monitoring without having '
'installed psutil. Please install psutil or set '
'cpu_cap, memory_cap, and swap_cap to 100.0')
if ncores == 0 and psutil is None:
raise ValueError('You cannot set `ncores=0` for auto detection of CPUs if you did not '
'installed psutil. Please install psutil or '
'set `ncores` manually.')
if port is not None and wrap_mode not in (pypetconstants.WRAP_MODE_NETLOCK,
pypetconstants.WRAP_MODE_NETQUEUE):
raise ValueError('You can only specify a port for the `NETLOCK` wrapping.')
if use_scoop and graceful_exit:
raise ValueError('You cannot exit gracefully using SCOOP.')
unused_kwargs = set(kwargs.keys())
self._logging_manager = LoggingManager(log_config=log_config,
log_stdout=log_stdout,
report_progress=report_progress)
self._logging_manager.check_log_config()
self._logging_manager.add_null_handler()
self._set_logger()
self._map_arguments = False
self._stop_iteration = False # Marker to cancel
# iteration in case of Keyboard interrupt
self._graceful_exit = graceful_exit
# Helper attributes defined later on
self._start_timestamp = None
self._finish_timestamp = None
self._runtime = None
self._cpu_cap = cpu_cap
self._memory_cap = memory_cap
if psutil is not None:
# Total memory in MB
self._total_memory = psutil.virtual_memory().total / 1024.0 / 1024.0
# Estimated memory needed by each process as ratio
self._est_per_process = self._memory_cap[1] / self._total_memory * 100.0
self._swap_cap = swap_cap
self._check_usage = check_usage
self._last_cpu_check = 0.0
self._last_cpu_usage = 0.0
if self._check_usage:
# For initialisation
self._estimate_cpu_utilization()
self._niceness = niceness
self._sumatra_project = sumatra_project
self._sumatra_reason = sumatra_reason
self._sumatra_label = sumatra_label
self._loaded_sumatatra_project = None
self._sumatra_record = None
self._runfunc = None
self._args = ()
self._kwargs = {}
self._postproc = None
self._postproc_args = ()
self._postproc_kwargs = {}
self._immediate_postproc = immediate_postproc
self._user_pipeline = False
self._git_repository = git_repository
self._git_message = git_message
self._git_fail = git_fail
# Check if a novel trajectory needs to be created.
if isinstance(trajectory, str):
# Create a new trajectory
self._traj = Trajectory(trajectory,
add_time=add_time,
dynamic_imports=dynamic_imports,
wildcard_functions=wildcard_functions,
comment=comment)
self._timestamp = self.trajectory.v_timestamp # Timestamp of creation
self._time = self.trajectory.v_time # Formatted timestamp
else:
self._traj = trajectory
# If no new trajectory is created the time of the environment differs
# from the trajectory and must be computed from the current time.
init_time = time.time()
formatted_time = format_time(init_time)
self._timestamp = init_time
self._time = formatted_time
# In case the user provided a git repository path, a git commit is performed
# and the environment's hexsha is taken from the commit if the commit was triggered by
# this particular environment, otherwise a new one is generated
if self._git_repository is not None:
new_commit, self._hexsha = make_git_commit(self, self._git_repository,
self._git_message, self._git_fail)
# Identifier hexsha
else:
new_commit = False
if not new_commit:
# Otherwise we need to create a novel hexsha
self._hexsha = hashlib.sha1((self.trajectory.v_name +
str(self.trajectory.v_timestamp) +
str(self.timestamp) +
VERSION).encode('utf-8')).hexdigest()
# Create the name of the environment
short_hexsha = self._hexsha[0:7]
name = 'environment'
self._name = name + '_' + str(short_hexsha) + '_' + self._time # Name of environment
# The trajectory should know the hexsha of the current environment.
# Thus, for all runs, one can identify by which environment they were run.
self._traj._environment_hexsha = self._hexsha
self._traj._environment_name = self._name
self._logging_manager.extract_replacements(self._traj)
self._logging_manager.remove_null_handler()
self._logging_manager.make_logging_handlers_and_tools()
# Drop a message if we made a commit. We cannot drop the message directly after the
# commit, because the logging files do not exist yet,
# and we want this commit to be tracked
if self._git_repository is not None:
if new_commit:
self._logger.info('Triggered NEW GIT commit `%s`.' % str(self._hexsha))
else:
self._logger.info('No changes detected, added PREVIOUS GIT commit `%s`.' %
str(self._hexsha))
# Create the storage service
if storage_service is True: # to allow compatibility with older python versions, i.e. old
# keyword use_hdf5
storage_service = HDF5StorageService
if self._traj.v_storage_service is not None:
# Use the service of the trajectory
self._logger.info('Found storage service attached to Trajectory. Will use '
'this storage service.')
self._storage_service = self.trajectory.v_storage_service
else:
# Create a new service
self._storage_service, unused_factory_kwargs = storage_factory(storage_service,
self._traj, **kwargs)
unused_kwargs = unused_kwargs - (set(kwargs.keys()) - unused_factory_kwargs)
if lazy_debug and is_debug():
self._storage_service = LazyStorageService()
self._traj.v_storage_service = self._storage_service
# Create resume path if desired
self._resumable = resumable
if self._resumable:
if resume_folder is None:
resume_folder = os.path.join(os.getcwd(), 'resume')
resume_path = os.path.join(resume_folder, self._traj.v_name)
else:
resume_path = None
self._resume_folder = resume_folder
self._resume_path = resume_path
self._delete_resume = delete_resume
# Check multiproc
self._multiproc = multiproc
if ncores == 0:
# Let *pypet* detect CPU count via psutil
ncores = psutil.cpu_count()
self._logger.info('Determined CPUs automatically, found `%d` cores.' % ncores)
self._ncores = ncores
if queue_maxsize == -1:
# Educated guess of queue size
queue_maxsize = 2 * ncores
self._queue_maxsize = queue_maxsize
if wrap_mode is None:
# None cannot be used in HDF5 files, accordingly we need a string representation
wrap_mode = pypetconstants.WRAP_MODE_NONE
self._wrap_mode = wrap_mode
# Whether to use a pool of processes
self._use_pool = use_pool
self._use_scoop = use_scoop
self._freeze_input = freeze_input
self._gc_interval = gc_interval
self._multiproc_wrapper = None # The wrapper Service
self._do_single_runs = do_single_runs
self._automatic_storing = automatic_storing
self._clean_up_runs = clean_up_runs
if (wrap_mode == pypetconstants.WRAP_MODE_NETLOCK and
not isinstance(port, str)):
url = port_to_tcp(port)
self._logger.info('Determined lock-server URL automatically, it is `%s`.' % url)
else:
url = port
self._url = url
self._timeout = timeout
# self._deep_copy_data = False # deep_copy_data # For future reference deep_copy_arguments
# Notify that in case of lazy debuggin we won't record anythin
if lazy_debug and is_debug():
self._logger.warning('Using the LazyStorageService, nothing will be saved to disk.')
# Current run index to avoid quadratic runtime complexity in case of re-running
self._current_idx = 0
self._trajectory_name = self._traj.v_name
for kwarg in list(unused_kwargs):
try:
val = kwargs[kwarg]
self._traj.f_set_properties(**{kwarg: val})
self._logger.info('Set trajectory property `%s` to `%s`.' % (kwarg, str(val)))
unused_kwargs.remove(kwarg)
except AttributeError:
pass
if len(unused_kwargs) > 0:
raise ValueError('You passed keyword arguments to the environment that you '
'did not use. The following keyword arguments were ignored: '
'`%s`' % str(unused_kwargs))
# Add all config data to the environment
self._add_config()
self._logger.info('Environment initialized.')
def _add_config(self):
# Add config data to the trajectory
if self._do_single_runs:
# Only add parameters if we actually want single runs to be performed
config_name = 'environment.%s.multiproc' % self.name
self._traj.f_add_config(Parameter, config_name, self._multiproc,
comment='Whether or not to use multiprocessing.').f_lock()
if self._multiproc:
config_name = 'environment.%s.use_pool' % self.name
self._traj.f_add_config(Parameter, config_name, self._use_pool,
comment='Whether to use a pool of processes or '
'spawning individual processes for '
'each run.').f_lock()
config_name = 'environment.%s.use_scoop' % self.name
self._traj.f_add_config(Parameter, config_name, self._use_scoop,
comment='Whether to use scoop to launch single '
'runs').f_lock()
if self._niceness is not None:
config_name = 'environment.%s.niceness' % self.name
self._traj.f_add_config(Parameter, config_name, self._niceness,
comment='Niceness value of child processes.').f_lock()
if self._use_pool:
config_name = 'environment.%s.freeze_input' % self.name
self._traj.f_add_config(Parameter, config_name, self._freeze_input,
comment='If inputs to each run are static and '
'are not mutated during each run, '
'can speed up pool running.').f_lock()
elif self._use_scoop:
pass
else:
config_name = 'environment.%s.cpu_cap' % self.name
self._traj.f_add_config(Parameter, config_name, self._cpu_cap,
comment='Maximum cpu usage beyond '
'which no new processes '
'are spawned.').f_lock()
config_name = 'environment.%s.memory_cap' % self.name
self._traj.f_add_config(Parameter, config_name, self._memory_cap,
comment='Tuple, first entry: Maximum RAM usage beyond '
'which no new processes are spawned; '
'second entry: Estimated usage per '
'process in MB. 0 if not estimated.').f_lock()
config_name = 'environment.%s.swap_cap' % self.name
self._traj.f_add_config(Parameter, config_name, self._swap_cap,
comment='Maximum Swap memory usage beyond '
'which no new '
'processes are spawned').f_lock()
config_name = 'environment.%s.immediate_postprocessing' % self.name
self._traj.f_add_config(Parameter, config_name, self._immediate_postproc,
comment='Whether to use immediate '
'postprocessing.').f_lock()
config_name = 'environment.%s.ncores' % self.name
self._traj.f_add_config(Parameter, config_name, self._ncores,
comment='Number of processors in case of '
'multiprocessing').f_lock()
config_name = 'environment.%s.wrap_mode' % self.name
self._traj.f_add_config(Parameter, config_name, self._wrap_mode,
comment='Multiprocessing mode (if multiproc),'
' i.e. whether to use QUEUE'
' or LOCK or NONE'
' for thread/process safe storing').f_lock()
if (self._wrap_mode == pypetconstants.WRAP_MODE_QUEUE or
self._wrap_mode == pypetconstants.WRAP_MODE_PIPE):
config_name = 'environment.%s.queue_maxsize' % self.name
self._traj.f_add_config(Parameter, config_name, self._queue_maxsize,
comment='Maximum size of Storage Queue/Pipe in case of '
'multiprocessing and QUEUE/PIPE wrapping').f_lock()
if self._wrap_mode == pypetconstants.WRAP_MODE_NETLOCK:
config_name = 'environment.%s.url' % self.name
self._traj.f_add_config(Parameter, config_name, self._url,
comment='URL of lock distribution server, including '
'protocol and port.').f_lock()
if self._wrap_mode == pypetconstants.WRAP_MODE_NETLOCK or self._use_scoop:
config_name = 'environment.%s.timeout' % self.name
timeout = self._timeout
if timeout is None:
timeout = -1.0
self._traj.f_add_config(Parameter, config_name, timeout,
comment='Timout for scoop and NETLOCK, '
'-1.0 means no timeout.').f_lock()
if (self._gc_interval and
(self._wrap_mode == pypetconstants.WRAP_MODE_LOCAL or
self._wrap_mode == pypetconstants.WRAP_MODE_QUEUE or
self._wrap_mode == pypetconstants.WRAP_MODE_PIPE)):
config_name = 'environment.%s.gc_interval' % self.name
self._traj.f_add_config(Parameter, config_name, self._gc_interval,
comment='Intervals with which ``gc.collect()`` '
'is called.').f_lock()
config_name = 'environment.%s.clean_up_runs' % self._name
self._traj.f_add_config(Parameter, config_name, self._clean_up_runs,
comment='Whether or not results should be removed after the '
'completion of a single run. '
'You are not advised to set this '
'to `False`. Only do it if you know what you are '
'doing.').f_lock()
config_name = 'environment.%s.resumable' % self._name
self._traj.f_add_config(Parameter, config_name, self._resumable,
comment='Whether or not resume files should '
'be created. If yes, everything is '
'handled by `dill`.').f_lock()
config_name = 'environment.%s.graceful_exit' % self._name
self._traj.f_add_config(Parameter, config_name, self._graceful_exit,
comment='Whether or not to allow graceful handling '
'of `SIGINT` (`CTRL+C`).').f_lock()
config_name = 'environment.%s.trajectory.name' % self.name
self._traj.f_add_config(Parameter, config_name, self.trajectory.v_name,
comment='Name of trajectory').f_lock()
config_name = 'environment.%s.trajectory.timestamp' % self.name
self._traj.f_add_config(Parameter, config_name, self.trajectory.v_timestamp,
comment='Timestamp of trajectory').f_lock()
config_name = 'environment.%s.timestamp' % self.name
self._traj.f_add_config(Parameter, config_name, self.timestamp,
comment='Timestamp of environment creation').f_lock()
config_name = 'environment.%s.hexsha' % self.name
self._traj.f_add_config(Parameter, config_name, self.hexsha,
comment='SHA-1 identifier of the environment').f_lock()
config_name = 'environment.%s.automatic_storing' % self.name
if not self._traj.f_contains('config.' + config_name):
self._traj.f_add_config(Parameter, config_name, self._automatic_storing,
comment='If trajectory should be stored automatically in the '
'end.').f_lock()
try:
config_name = 'environment.%s.script' % self.name
self._traj.f_add_config(Parameter, config_name, main.__file__,
comment='Name of the executed main script').f_lock()
except AttributeError:
pass # We end up here if we use pypet within an ipython console
for package_name, version in pypetconstants.VERSIONS_TO_STORE.items():
config_name = 'environment.%s.versions.%s' % (self.name, package_name)
self._traj.f_add_config(Parameter, config_name, version,
comment='Particular version of a package or distribution '
'used during experiment. N/A if package could not '
'be imported.').f_lock()
self._traj.config.environment.v_comment = 'Settings for the different environments ' \
'used to run the experiments'
def __repr__(self):
"""String representation of environment"""
repr_string = '<%s %s for Trajectory %s>' % (self.__class__.__name__, self.name,
self.trajectory.v_name)
return repr_string
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disable_logging()
[docs] def disable_logging(self, remove_all_handlers=True):
"""Removes all logging handlers and stops logging to files and logging stdout.
:param remove_all_handlers:
If `True` all logging handlers are removed.
If you want to keep the handlers set to `False`.
"""
self._logging_manager.finalize(remove_all_handlers)
[docs] @kwargs_api_change('continue_folder', 'resume_folder')
def resume(self, trajectory_name=None, resume_folder=None):
"""Resumes crashed trajectories.
:param trajectory_name:
Name of trajectory to resume, if not specified the name passed to the environment
is used. Be aware that if `add_time=True` the name you passed to the environment is
altered and the current date is added.
:param resume_folder:
The folder where resume files can be found. Do not pass the name of the sub-folder
with the trajectory name, but to the name of the parental folder.
If not specified the resume folder passed to the environment is used.
:return:
List of the individual results returned by your run function.
Returns a LIST OF TUPLES, where first entry is the run idx and second entry
is the actual result. In case of multiprocessing these are not necessarily
ordered according to their run index, but ordered according to their finishing time.
Does not contain results stored in the trajectory!
In order to access these simply interact with the trajectory object,
potentially after calling`~pypet.trajectory.Trajectory.f_update_skeleton`
and loading all results at once with :func:`~pypet.trajectory.f_load`
or loading manually with :func:`~pypet.trajectory.f_load_items`.
Even if you use multiprocessing without a pool the results returned by
`runfunc` still need to be pickled.
"""
if trajectory_name is None:
self._trajectory_name = self.trajectory.v_name
else:
self._trajectory_name = trajectory_name
if resume_folder is not None:
self._resume_folder = resume_folder
return self._execute_runs(None)
@property
def trajectory(self):
""" The trajectory of the Environment"""
return self._traj
@property
def traj(self):
""" Equivalent to env.trajectory"""
return self.trajectory
@property
def current_idx(self):
"""The current run index that is the next one to be executed.
Can be set manually to make the environment consider old non-completed ones.
"""
return self._current_idx
@current_idx.setter
def current_idx(self, idx):
self._current_idx = idx
@property
def hexsha(self):
"""The SHA1 identifier of the environment.
It is identical to the SHA1 of the git commit.
If version control is not used, the environment hash is computed from the
trajectory name, the current timestamp and your current *pypet* version."""
return self._hexsha
@property
def time(self):
""" Time of the creation of the environment, human readable."""
return self._time
@property
def timestamp(self):
"""Time of creation as python datetime float"""
return self._timestamp
@property
def name(self):
""" Name of the Environment"""
return self._name
[docs] def add_postprocessing(self, postproc, *args, **kwargs):
""" Adds a post processing function.
The environment will call this function via
``postproc(traj, result_list, *args, **kwargs)`` after the completion of the
single runs.
This function can load parts of the trajectory id needed and add additional results.
Moreover, the function can be used to trigger an expansion of the trajectory.
This can be useful if the user has an `optimization` task.
Either the function calls `f_expand` directly on the trajectory or returns
an dictionary. If latter `f_expand` is called by the environment.
Note that after expansion of the trajectory, the post-processing function is called
again (and again for further expansions). Thus, this allows an iterative approach
to parameter exploration.
Note that in case post-processing is called after all runs have been executed,
the storage service of the trajectory is no longer multiprocessing safe.
If you want to use multiprocessing in your post-processing you can still
manually wrap the storage service with the :class:`~pypet.environment.MultiprocessWrapper`.
In case you use **immediate** postprocessing, the storage service of your trajectory
is still multiprocessing safe (except when using the wrap_mode ``'LOCAL'``).
Accordingly, you could even use multiprocessing in your immediate post-processing phase
if you dare, like use a multiprocessing pool_, for instance.
You can easily check in your post-processing function if the storage service is
multiprocessing safe via the ``multiproc_safe`` attribute, i.e.
``traj.v_storage_service.multiproc_safe``.
:param postproc:
The post processing function
:param args:
Additional arguments passed to the post-processing function
:param kwargs:
Additional keyword arguments passed to the postprocessing function
:return:
"""
self._postproc = postproc
self._postproc_args = args
self._postproc_kwargs = kwargs
[docs] def pipeline(self, pipeline):
""" You can make *pypet* supervise your whole experiment by defining a pipeline.
`pipeline` is a function that defines the entire experiment. From pre-processing
including setting up the trajectory over defining the actual simulation runs to
post processing.
The `pipeline` function needs to return TWO tuples with a maximum of three entries each.
For example:
::
return (runfunc, args, kwargs), (postproc, postproc_args, postproc_kwargs)
Where `runfunc` is the actual simulation function thet gets passed the trajectory
container and potentially additional arguments `args` and keyword arguments `kwargs`.
This will be run by your environment with all parameter combinations.
`postproc` is a post processing function that handles your computed results.
The function must accept as arguments the trajectory container, a list of
results (list of tuples (run idx, result) ) and potentially
additional arguments `postproc_args` and keyword arguments `postproc_kwargs`.
As for :func:`~pypet.environment.Environment.f_add_postproc`, this function can
potentially extend the trajectory.
If you don't want to apply post-processing, your pipeline function can also simply
return the run function and the arguments:
::
return runfunc, args, kwargs
Or
::
return runfunc, args
Or
::
return runfunc
``return runfunc, kwargs`` does NOT work, if you don't want to pass `args` do
``return runfunc, (), kwargs``.
Analogously combinations like
::
return (runfunc, args), (postproc,)
work as well.
:param pipeline:
The pipleine function, taking only a single argument `traj`.
And returning all functions necessary for your experiment.
:return:
List of the individual results returned by `runfunc`.
Returns a LIST OF TUPLES, where first entry is the run idx and second entry
is the actual result. In case of multiprocessing these are not necessarily
ordered according to their run index, but ordered according to their finishing time.
Does not contain results stored in the trajectory!
In order to access these simply interact with the trajectory object,
potentially after calling :func:`~pypet.trajectory.Trajectory.f_update_skeleton`
and loading all results at once with :func:`~pypet.trajectory.f_load`
or loading manually with :func:`~pypet.trajectory.f_load_items`.
Even if you use multiprocessing without a pool the results returned by
`runfunc` still need to be pickled.
Results computed from `postproc` are not returned. `postproc` should not
return any results except dictionaries if the trajectory should be expanded.
"""
self._user_pipeline = True
self._map_arguments = False
return self._execute_runs(pipeline)
[docs] def pipeline_map(self, pipeline):
"""Creates a pipeline with iterable arguments"""
self._user_pipeline = True
self._map_arguments = True
return self._execute_runs(pipeline)
[docs] def run(self, runfunc, *args, **kwargs):
""" Runs the experiments and explores the parameter space.
:param runfunc: The task or job to do
:param args: Additional arguments (not the ones in the trajectory) passed to `runfunc`
:param kwargs:
Additional keyword arguments (not the ones in the trajectory) passed to `runfunc`
:return:
List of the individual results returned by `runfunc`.
Returns a LIST OF TUPLES, where first entry is the run idx and second entry
is the actual result. They are always ordered according to the run index.
Does not contain results stored in the trajectory!
In order to access these simply interact with the trajectory object,
potentially after calling`~pypet.trajectory.Trajectory.f_update_skeleton`
and loading all results at once with :func:`~pypet.trajectory.f_load`
or loading manually with :func:`~pypet.trajectory.f_load_items`.
If you use multiprocessing without a pool the results returned by
`runfunc` still need to be pickled.
"""
pipeline = lambda traj: ((runfunc, args, kwargs),
(self._postproc, self._postproc_args, self._postproc_kwargs))
self._user_pipeline = False
self._map_arguments = False
return self._execute_runs(pipeline)
[docs] def run_map(self, runfunc, *iter_args, **iter_kwargs):
"""Calls runfunc with different args and kwargs each time.
Similar to `:func:`~pypet.environment.Environment.run`
but all ``iter_args`` and ``iter_kwargs`` need to be iterables,
iterators, or generators that return new arguments for each run.
"""
if len(iter_args) == 0 and len(iter_kwargs) == 0:
raise ValueError('Use `run` if you don`t have any other arguments.')
pipeline = lambda traj: ((runfunc, iter_args, iter_kwargs),
(self._postproc, self._postproc_args, self._postproc_kwargs))
self._user_pipeline = False
self._map_arguments = True
return self._execute_runs(pipeline)
def _trigger_resume_snapshot(self):
""" Makes the trajectory continuable in case the user wants that"""
dump_dict = {}
dump_filename = os.path.join(self._resume_path, 'environment.ecnt')
# Store the trajectory before the first runs
prev_full_copy = self._traj.v_full_copy
dump_dict['full_copy'] = prev_full_copy
self._traj.v_full_copy = True
prev_storage_service = self._traj.v_storage_service
self._traj.v_storage_service = self._storage_service
dump_dict['trajectory'] = self._traj
dump_dict['args'] = self._args
dump_dict['kwargs'] = self._kwargs
dump_dict['runfunc'] = self._runfunc
dump_dict['postproc'] = self._postproc
dump_dict['postproc_args'] = self._postproc_args
dump_dict['postproc_kwargs'] = self._postproc_kwargs
dump_dict['start_timestamp'] = self._start_timestamp
dump_file = open(dump_filename, 'wb')
dill.dump(dump_dict, dump_file, protocol=2)
dump_file.flush()
dump_file.close()
self._traj.v_full_copy = prev_full_copy
self._traj.v_storage_service = prev_storage_service
def _prepare_sumatra(self):
""" Prepares a sumatra record """
reason = self._sumatra_reason
if reason:
reason += ' -- '
if self._traj.v_comment:
commentstr = ' (`%s`)' % self._traj.v_comment
else:
commentstr = ''
reason += 'Trajectory %s%s -- Explored Parameters: %s' % \
(self._traj.v_name,
commentstr,
str(list(self._traj._explored_parameters.keys())))
self._logger.info('Preparing sumatra record with reason: %s' % reason)
self._sumatra_reason = reason
self._loaded_sumatatra_project = load_project(self._sumatra_project)
if self._traj.f_contains('parameters', shortcuts=False):
param_dict = self._traj.parameters.f_to_dict(fast_access=False)
for param_name in list(param_dict.keys()):
param = param_dict[param_name]
if param.f_has_range():
param_dict[param_name] = param.f_get_range()
else:
param_dict[param_name] = param.f_get()
else:
param_dict = {}
relpath = os.path.relpath(sys.modules['__main__'].__file__, self._sumatra_project)
executable = PythonExecutable(path=sys.executable)
self._sumatra_record = self._loaded_sumatatra_project.new_record(
parameters=param_dict,
main_file=relpath,
executable=executable,
label=self._sumatra_label,
reason=reason)
def _finish_sumatra(self):
""" Saves a sumatra record """
finish_time = self._start_timestamp - self._finish_timestamp
self._sumatra_record.duration = finish_time
self._sumatra_record.output_data = self._sumatra_record.datastore.find_new_data(
self._sumatra_record.timestamp)
self._loaded_sumatatra_project.add_record(self._sumatra_record)
self._loaded_sumatatra_project.save()
sumatra_label = self._sumatra_record.label
config_name = 'sumatra.record_%s.label' % str(sumatra_label)
conf_list = []
if not self._traj.f_contains('config.' + config_name):
conf1 = self._traj.f_add_config(Parameter, config_name, str(sumatra_label),
comment='The label of the sumatra record')
conf_list.append(conf1)
if self._sumatra_reason:
config_name = 'sumatra.record_%s.reason' % str(sumatra_label)
if not self._traj.f_contains('config.' + config_name):
conf2 = self._traj.f_add_config(Parameter, config_name,
str(self._sumatra_reason),
comment='Reason of sumatra run.')
conf_list.append(conf2)
if self._automatic_storing and conf_list:
self._traj.f_store_items(conf_list)
self._logger.info('Saved sumatra project record with reason: '
'%s' % str(self._sumatra_reason))
def _prepare_resume(self):
""" Prepares the continuation of a crashed trajectory """
if not self._resumable:
raise RuntimeError('If you create an environment to resume a run, you need to '
'set `continuable=True`.')
if not self._do_single_runs:
raise RuntimeError('You cannot resume a run if you did create an environment '
'with `do_single_runs=False`.')
self._resume_path = os.path.join(self._resume_folder, self._trajectory_name)
cnt_filename = os.path.join(self._resume_path, 'environment.ecnt')
cnt_file = open(cnt_filename, 'rb')
resume_dict = dill.load(cnt_file)
cnt_file.close()
traj = resume_dict['trajectory']
# We need to update the information about the trajectory name
config_name = 'config.environment.%s.trajectory.name' % self.name
if self._traj.f_contains(config_name, shortcuts=False):
param = self._traj.f_get(config_name, shortcuts=False)
param.f_unlock()
param.f_set(traj.v_name)
param.f_lock()
config_name = 'config.environment.%s.trajectory.timestamp' % self.name
if self._traj.f_contains(config_name, shortcuts=False):
param = self._traj.f_get(config_name, shortcuts=False)
param.f_unlock()
param.f_set(traj.v_timestamp)
param.f_lock()
# Merge the information so that we keep a record about the current environment
if not traj.config.environment.f_contains(self.name, shortcuts=False):
traj._merge_config(self._traj)
self._traj = traj
# User's job function
self._runfunc = resume_dict['runfunc']
# Arguments to the user's job function
self._args = resume_dict['args']
# Keyword arguments to the user's job function
self._kwargs = resume_dict['kwargs']
# Postproc Function
self._postproc = resume_dict['postproc']
# Postprog args
self._postproc_args = resume_dict['postproc_args']
# Postproc Kwargs
self._postproc_kwargs = resume_dict['postproc_kwargs']
# Unpack the trajectory
self._traj.v_full_copy = resume_dict['full_copy']
# Load meta data
self._traj.f_load(load_parameters=pypetconstants.LOAD_NOTHING,
load_derived_parameters=pypetconstants.LOAD_NOTHING,
load_results=pypetconstants.LOAD_NOTHING,
load_other_data=pypetconstants.LOAD_NOTHING)
# Now we have to reconstruct previous results
result_list = []
full_filename_list = []
for filename in os.listdir(self._resume_path):
_, ext = os.path.splitext(filename)
if ext != '.rcnt':
continue
full_filename = os.path.join(self._resume_path, filename)
cnt_file = open(full_filename, 'rb')
result_list.append(dill.load(cnt_file))
cnt_file.close()
full_filename_list.append(full_filename)
new_result_list = []
for result_tuple in result_list:
run_information = result_tuple[1]
self._traj._update_run_information(run_information)
new_result_list.append(result_tuple[0])
result_sort(new_result_list)
# Add a config parameter signalling that an experiment was resumed, and how many of them
config_name = 'environment.%s.resumed' % self.name
if not config_name in self._traj:
self._traj.f_add_config(Parameter, config_name, True,
comment='Added if a crashed trajectory was continued.')
self._logger.info('I will resume trajectory `%s`.' % self._traj.v_name)
return new_result_list
def _prepare_runs(self, pipeline):
"""Prepares the running of an experiment
:param pipeline:
A pipeline function that defines the task
"""
pip_result = pipeline(self._traj) # Call the pipeline function
# Extract the task to do from the pipeline result
raise_error = False
if pip_result is None:
if self._do_single_runs:
raise RuntimeError('Your pipeline function did return `None`.'
'Accordingly, I assume you just do data analysis. '
'Please create and environment with `do_single_runs=False`.')
self._logger.info('Your pipeline returned no runfunction, I assume you do some '
'sort of data analysis and will skip any single run execution.')
self._runfunc = None
return
elif (len(pip_result) == 2 and
isinstance(pip_result[0], tuple) and
isinstance(pip_result[1], tuple)):
# Extract the run and post-processing functions and arguments
run_tuple = pip_result[0]
self._runfunc = run_tuple[0]
if len(run_tuple) > 1:
self._args = run_tuple[1]
if len(run_tuple) > 2:
self._kwargs = run_tuple[2]
if len(run_tuple) > 3:
raise_error = True
postproc_tuple = pip_result[1]
if len(postproc_tuple) > 0:
self._postproc = postproc_tuple[0]
if len(postproc_tuple) > 1:
self._postproc_args = postproc_tuple[1]
if len(postproc_tuple) > 2:
self._postproc_kwargs = postproc_tuple[2]
if len(run_tuple) > 3:
raise_error = True
elif len(pip_result) <= 3:
self._runfunc = pip_result[0]
if len(pip_result) > 1:
self._args = pip_result[1]
if len(pip_result) > 2:
self._kwargs = pip_result[2]
else:
raise_error = True
if raise_error:
raise RuntimeError('Your pipeline result is not understood please return'
'a tuple of maximum length 3: ``(runfunc, args, kwargs)`` '
'Or return two tuple of maximum length 3: '
'``(runfunc, args, kwargs), '
'(postproc, postproc_args, postproc_kwargs)')
if self._runfunc is not None and not self._do_single_runs:
raise RuntimeError('You cannot make a run if you did create an environment '
'with `do_single_runs=False`.')
if self._resumable:
racedirs(self._resume_path)
if os.listdir(self._resume_path):
raise RuntimeError('Your resume folder `%s` needs '
'to be empty to allow continuing!' % self._resume_path)
if self._user_pipeline:
self._logger.info('\n************************************************************\n'
'STARTING PPREPROCESSING for trajectory\n`%s`'
'\n************************************************************\n' %
self._traj.v_name)
# Make some preparations (locking of parameters etc) and store the trajectory
self._logger.info('I am preparing the Trajectory for the experiment and '
'initialise the store.')
self._traj._prepare_experiment()
self._logger.info('Initialising the storage for the trajectory.')
self._traj.f_store(only_init=True)
def _show_progress(self, n, total_runs):
"""Displays a progressbar"""
self._logging_manager.show_progress(n, total_runs)
def _make_kwargs(self, **kwargs):
"""Creates the keyword arguments for the single run handling"""
result_dict = {'traj': self._traj,
'logging_manager': self._logging_manager,
'runfunc': self._runfunc,
'runargs': self._args,
'runkwargs': self._kwargs,
'clean_up_runs': self._clean_up_runs,
'automatic_storing': self._automatic_storing,
'wrap_mode': self._wrap_mode,
'niceness': self._niceness,
'graceful_exit': self._graceful_exit}
result_dict.update(kwargs)
if self._multiproc:
if self._use_pool or self._use_scoop:
if self._use_scoop:
del result_dict['graceful_exit']
if self._freeze_input:
# Remember the full copy setting for the frozen input to
# change this back once the trajectory is received by
# each process
result_dict['full_copy'] = self.traj.v_full_copy
if self._map_arguments:
del result_dict['runargs']
del result_dict['runkwargs']
else:
result_dict['clean_up_runs'] = False
if self._use_pool:
# Needs only be deleted in case of using a pool but necessary for scoop
del result_dict['logging_manager']
del result_dict['niceness']
else:
result_dict['clean_up_runs'] = False
return result_dict
def _make_index_iterator(self, start_run_idx):
"""Returns an iterator over the run indices that are not completed"""
total_runs = len(self._traj)
for n in range(start_run_idx, total_runs):
self._current_idx = n + 1
if self._stop_iteration:
self._logger.debug('I am stopping new run iterations now!')
break
if not self._traj._is_completed(n):
self._traj.f_set_crun(n)
yield n
else:
self._logger.debug('Run `%d` has already been completed, I am skipping it.' % n)
def _make_iterator(self, start_run_idx, copy_data=False, **kwargs):
""" Returns an iterator over all runs and yields the keyword arguments """
if (not self._freeze_input) or (not self._multiproc):
kwargs = self._make_kwargs(**kwargs)
def _do_iter():
if self._map_arguments:
self._args = tuple(iter(arg) for arg in self._args)
for key in list(self._kwargs.keys()):
self._kwargs[key] = iter(self._kwargs[key])
for idx in self._make_index_iterator(start_run_idx):
iter_args = tuple(next(x) for x in self._args)
iter_kwargs = {}
for key in self._kwargs:
iter_kwargs[key] = next(self._kwargs[key])
kwargs['runargs'] = iter_args
kwargs['runkwargs'] = iter_kwargs
if self._freeze_input:
# Frozen pool needs current run index
kwargs['idx'] = idx
if copy_data:
copied_kwargs = kwargs.copy()
if not self._freeze_input:
copied_kwargs['traj'] = self._traj.f_copy(copy_leaves='explored',
with_links=True)
yield copied_kwargs
else:
yield kwargs
else:
for idx in self._make_index_iterator(start_run_idx):
if self._freeze_input:
# Frozen pool needs current run index
kwargs['idx'] = idx
if copy_data:
copied_kwargs = kwargs.copy()
if not self._freeze_input:
copied_kwargs['traj'] = self._traj.f_copy(copy_leaves='explored',
with_links=True)
yield copied_kwargs
else:
yield kwargs
return _do_iter()
def _execute_postproc(self, results):
""" Executes a postprocessing function
:param results:
List of tuples containing the run indices and the results
:return:
1. Whether to new single runs, since the trajectory was enlarged
2. Index of next new run
3. Number of new runs
"""
repeat = False
start_run_idx = 0
new_runs = 0
# Do some finalization
self._traj._finalize(store_meta_data=True)
old_traj_length = len(self._traj)
postproc_res = self._postproc(self._traj, results,
*self._postproc_args, **self._postproc_kwargs)
if postproc_res is None:
pass
elif isinstance(postproc_res, dict):
if postproc_res:
self._traj.f_expand(postproc_res)
elif isinstance(postproc_res, tuple):
expand_dict = postproc_res[0]
if len(postproc_res) > 1:
self._args = postproc_res[1]
if len(postproc_res) > 2:
self._kwargs = postproc_res[2]
if len(postproc_res) > 3:
self._postproc_args = postproc_res[3]
if len(postproc_res) > 4:
self._postproc_kwargs = postproc_res[4]
if expand_dict:
self._traj.f_expand(expand_dict)
else:
self._logger.error('Your postproc result `%s` was not understood.' % str(postproc_res))
new_traj_length = len(self._traj)
if new_traj_length != old_traj_length:
start_run_idx = old_traj_length
repeat = True
if self._resumable:
self._logger.warning('Continuing a trajectory AND expanding it during runtime is '
'NOT supported properly, there is no guarantee that this '
'works!')
self._traj.f_store(only_init=True)
new_traj_length = len(self._traj)
new_runs = new_traj_length - old_traj_length
return repeat, start_run_idx, new_runs
def _estimate_cpu_utilization(self):
"""Estimates the cpu utilization within the last 500ms"""
now = time.time()
if now - self._last_cpu_check >= 0.5:
try:
self._last_cpu_usage = psutil.cpu_percent()
self._last_cpu_check = now
except (psutil.NoSuchProcess, ZeroDivisionError):
pass # psutil sometimes produces ZeroDivisionErrors, has been fixed in newer
# Versions but we want to support older as well
return self._last_cpu_usage
def _estimate_memory_utilization(self, process_dict):
"""Estimates memory utilization to come if process was started"""
n_processes = len(process_dict)
total_utilization = psutil.virtual_memory().percent
sum = 0.0
for proc in process_dict.values():
try:
sum += psutil.Process(proc.pid).memory_percent()
except (psutil.NoSuchProcess, ZeroDivisionError):
pass
curr_all_processes = sum
missing_utilization = max(0.0, n_processes * self._est_per_process - curr_all_processes)
estimated_utilization = total_utilization
estimated_utilization += missing_utilization
estimated_utilization += self._est_per_process
return estimated_utilization
def _execute_runs(self, pipeline):
""" Starts the individual single runs.
Starts runs sequentially or initiates multiprocessing.
:param pipeline:
A pipeline function producing the run function the corresponding arguments
and postprocessing function and arguments
:return:
List of tuples, where each tuple contains the run idx and the result.
"""
if self._start_timestamp is None:
self._start_timestamp = time.time()
if self._map_arguments and self._resumable:
raise ValueError('You cannot use `run_map` or `pipeline_map` in combination '
'with continuing option.')
if self._sumatra_project is not None:
self._prepare_sumatra()
if pipeline is not None:
results = []
self._prepare_runs(pipeline)
else:
results = self._prepare_resume()
if self._runfunc is not None:
self._traj._run_by_environment = True
if self._graceful_exit:
sigint_handling.start()
try:
self._inner_run_loop(results)
finally:
self._traj._run_by_environment = False
self._stop_iteration = False
if self._graceful_exit:
sigint_handling.finalize()
self._add_wildcard_config()
if self._automatic_storing:
self._logger.info('\n************************************************************\n'
'STARTING FINAL STORING of trajectory\n`%s`'
'\n************************************************************\n' %
self._traj.v_name)
self._traj.f_store()
self._logger.info('\n************************************************************\n'
'FINISHED FINAL STORING of trajectory\n`%s`.'
'\n************************************************************\n' %
self._traj.v_name)
self._finish_timestamp = time.time()
findatetime = datetime.datetime.fromtimestamp(self._finish_timestamp)
startdatetime = datetime.datetime.fromtimestamp(self._start_timestamp)
self._runtime = str(findatetime - startdatetime)
conf_list = []
config_name = 'environment.%s.start_timestamp' % self.name
if not self._traj.f_contains('config.' + config_name):
conf1 = self._traj.f_add_config(Parameter, config_name, self._start_timestamp,
comment='Timestamp of starting of experiment '
'(when the actual simulation was '
'started (either by calling `run`, '
'`resume`, or `pipeline`).')
conf_list.append(conf1)
config_name = 'environment.%s.finish_timestamp' % self.name
if not self._traj.f_contains('config.' + config_name):
conf2 = self._traj.f_add_config(Parameter, config_name, self._finish_timestamp,
comment='Timestamp of finishing of an experiment.')
else:
conf2 = self._traj.f_get('config.' + config_name)
conf2.f_unlock()
conf2.f_set(self._finish_timestamp)
conf_list.append(conf2)
config_name = 'environment.%s.runtime' % self.name
if not self._traj.f_contains('config.' + config_name):
conf3 = self._traj.f_add_config(Parameter, config_name, self._runtime,
comment='Runtime of whole experiment.')
else:
conf3 = self._traj.f_get('config.' + config_name)
conf3.f_unlock()
conf3.f_set(self._runtime)
conf_list.append(conf3)
if self._automatic_storing:
self._traj.f_store_items(conf_list, store_data=pypetconstants.OVERWRITE_DATA)
if hasattr(self._traj.v_storage_service, 'finalize'):
# Finalize the storage service if this is supported
self._traj.v_storage_service.finalize()
incomplete = []
for run_name in self._traj.f_get_run_names():
if not self._traj._is_completed(run_name):
incomplete.append(run_name)
if len(incomplete) > 0:
self._logger.error('Following runs of trajectory `%s` '
'did NOT complete: `%s`' % (self._traj.v_name,
', '.join(incomplete)))
else:
self._logger.info('All runs of trajectory `%s` were completed successfully.' %
self._traj.v_name)
if self._sumatra_project is not None:
self._finish_sumatra()
return results
def _add_wildcard_config(self):
"""Adds config data about the wildcard functions"""
for idx, pair in enumerate(self._traj._wildcard_functions.items()):
wildcards, wc_function = pair
for jdx, wildcard in enumerate(wildcards):
config_name = ('environment.%s.wildcards.function_%d.wildcard_%d' %
(self.name, idx, jdx))
if not self._traj.f_contains('config.' + config_name):
self._traj.f_add_config(Parameter, config_name, wildcard,
comment='Wildcard symbol for the wildcard function').f_lock()
if hasattr(wc_function, '__name__'):
config_name = ('environment.%s.wildcards.function_%d.name' %
(self.name, idx))
if not self._traj.f_contains('config.' + config_name):
self._traj.f_add_config(Parameter, config_name, wc_function.__name__,
comment='Nme of wildcard function').f_lock()
if wc_function.__doc__:
config_name = ('environment.%s.wildcards.function_%d.doc' %
(self.name, idx))
if not self._traj.f_contains('config.' + config_name):
self._traj.f_add_config(Parameter, config_name, wc_function.__doc__,
comment='Docstring of wildcard function').f_lock()
try:
source = inspect.getsource(wc_function)
config_name = ('environment.%s.wildcards.function_%d.source' %
(self.name, idx))
if not self._traj.f_contains('config.' + config_name):
self._traj.f_add_config(Parameter, config_name, source,
comment='Source code of wildcard function').f_lock()
except Exception:
pass # We cannot find the source, just leave it
def _inner_run_loop(self, results):
"""Performs the inner loop of the run execution"""
start_run_idx = self._current_idx
expanded_by_postproc = False
self._storage_service = self._traj.v_storage_service
self._multiproc_wrapper = None
if self._resumable:
self._trigger_resume_snapshot()
self._logger.info(
'\n************************************************************\n'
'STARTING runs of trajectory\n`%s`.'
'\n************************************************************\n' %
self._traj.v_name)
while True:
if self._multiproc:
expanded_by_postproc = self._execute_multiprocessing(start_run_idx, results)
else:
# Create a generator to generate the tasks
iterator = self._make_iterator(start_run_idx)
n = start_run_idx
total_runs = len(self._traj)
# Signal start of progress calculation
self._show_progress(n - 1, total_runs)
for task in iterator:
result = _sigint_handling_single_run(task)
n = self._check_result_and_store_references(result, results,
n, total_runs)
repeat = False
if self._postproc is not None:
self._logger.info('Performing POSTPROCESSING')
repeat, start_run_idx, new_runs = self._execute_postproc(results)
if not repeat:
break
else:
expanded_by_postproc = True
self._logger.info('POSTPROCESSING expanded the trajectory and added %d new runs' %
new_runs)
# Do some finalization
self._traj._finalize(store_meta_data=True)
self._logger.info(
'\n************************************************************\n'
'FINISHED all runs of trajectory\n`%s`.'
'\n************************************************************\n' %
self._traj.v_name)
if self._resumable and self._delete_resume:
# We remove all resume files if the simulation was successfully completed
shutil.rmtree(self._resume_path)
if expanded_by_postproc:
config_name = 'environment.%s.postproc_expand' % self.name
if not self._traj.f_contains('config.' + config_name):
self._traj.f_add_config(Parameter, config_name, True,
comment='Added if trajectory was expanded '
'by postprocessing.')
def _get_results_from_queue(self, result_queue, results, n, total_runs):
"""Extract all available results from the queue and returns the increased n"""
# Get all results from the result queue
while not result_queue.empty():
result = result_queue.get()
n = self._check_result_and_store_references(result, results, n, total_runs)
return n
def _check_result_and_store_references(self, result, results, n, total_runs):
"""Checks for SIGINT and if reference wrapping and stores references."""
if result[0] == sigint_handling.SIGINT:
self._stop_iteration = True
result = result[1] # If SIGINT result is a nested tuple
if result is not None:
if self._wrap_mode == pypetconstants.WRAP_MODE_LOCAL:
self._multiproc_wrapper.store_references(result[2])
self._traj._update_run_information(result[1])
results.append(result[0])
if self._resumable:
# [0:2] to not store references
self._trigger_result_snapshot(result[0:2])
self._show_progress(n, total_runs)
n += 1
return n
def _trigger_result_snapshot(self, result):
""" Triggers a snapshot of the results for continuing
:param result: Currently computed result
"""
timestamp = result[1]['finish_timestamp']
timestamp_str = repr(timestamp).replace('.', '_')
filename = 'result_%s' % timestamp_str
extension = '.ncnt'
dump_filename = os.path.join(self._resume_path, filename + extension)
dump_file = open(dump_filename, 'wb')
dill.dump(result, dump_file, protocol=2)
dump_file.flush()
dump_file.close()
# We rename the file to be certain that the trajectory did not crash during taking
# the snapshot!
extension = '.rcnt'
rename_filename = os.path.join(self._resume_path, filename + extension)
shutil.move(dump_filename, rename_filename)
def _execute_multiprocessing(self, start_run_idx, results):
"""Performs multiprocessing and signals expansion by postproc"""
n = start_run_idx
total_runs = len(self._traj)
expanded_by_postproc = False
if (self._wrap_mode == pypetconstants.WRAP_MODE_NONE or
self._storage_service.multiproc_safe):
self._logger.info('I assume that your storage service is multiprocessing safe.')
else:
use_manager = (self._wrap_mode == pypetconstants.WRAP_MODE_QUEUE or
self._immediate_postproc)
self._multiproc_wrapper = MultiprocContext(self._traj,
self._wrap_mode,
full_copy=None,
manager=None,
use_manager=use_manager,
lock=None,
queue=None,
queue_maxsize=self._queue_maxsize,
port=self._url,
timeout=self._timeout,
gc_interval=self._gc_interval,
log_config=self._logging_manager.log_config,
log_stdout=self._logging_manager.log_stdout,
graceful_exit=self._graceful_exit)
self._multiproc_wrapper.start()
try:
if self._use_pool:
self._logger.info('Starting Pool with %d processes' % self._ncores)
if self._freeze_input:
self._logger.info('Freezing pool input')
init_kwargs = self._make_kwargs()
# To work under windows we must allow the full-copy now!
# Because windows does not support forking!
pool_full_copy = self._traj.v_full_copy
self._traj.v_full_copy = True
initializer = _configure_frozen_pool
target = _frozen_pool_single_run
else:
# We don't want to pickle the storage service
pool_service = self._traj.v_storage_service
self._traj.v_storage_service = None
init_kwargs = dict(logging_manager=self._logging_manager,
storage_service=pool_service,
niceness=self._niceness)
initializer = _configure_pool
target = _pool_single_run
try:
iterator = self._make_iterator(start_run_idx)
mpool = multip.Pool(self._ncores, initializer=initializer,
initargs=(init_kwargs,))
pool_results = mpool.imap(target, iterator)
# Signal start of progress calculation
self._show_progress(n - 1, total_runs)
for result in pool_results:
n = self._check_result_and_store_references(result, results,
n, total_runs)
# Everything is done
mpool.close()
mpool.join()
finally:
if self._freeze_input:
self._traj.v_full_copy = pool_full_copy
else:
self._traj.v_storage_service = pool_service
self._logger.info('Pool has joined, will delete it.')
del mpool
elif self._use_scoop:
self._logger.info('Starting SCOOP jobs')
if self._freeze_input:
self._logger.info('Freezing SCOOP input')
if not hasattr(_frozen_scoop_single_run, 'kwargs'):
_frozen_scoop_single_run.kwargs = {}
scoop_full_copy = self._traj.v_full_copy
self._traj.v_full_copy = True
init_kwargs = self._make_kwargs()
scoop_rev = self.name + '_' + str(time.time()).replace('.','_')
shared.setConst(**{scoop_rev: init_kwargs})
iterator = self._make_iterator(start_run_idx,
copy_data=True,
scoop_rev=scoop_rev)
target = _frozen_scoop_single_run
else:
iterator = self._make_iterator(start_run_idx,
copy_data=True)
target = _scoop_single_run
try:
if scoop.IS_RUNNING:
scoop_results = futures.map(target, iterator, timeout=self._timeout)
else:
self._logger.error('SCOOP is NOT running, I will use Python`s map '
'function. To activate scoop, start your script via '
'`python -m scoop your_script.py`.')
scoop_results = map(target, iterator)
# Signal start of progress calculation
self._show_progress(n - 1, total_runs)
for result in scoop_results:
n = self._check_result_and_store_references(result, results,
n, total_runs)
finally:
if self._freeze_input:
self._traj.v_full_copy = scoop_full_copy
else:
# If we spawn a single process for each run, we need an additional queue
# for the results of `runfunc`
if self._immediate_postproc:
maxsize = 0
else:
maxsize = total_runs
start_result_length = len(results)
result_queue = multip.Queue(maxsize=maxsize)
# Create a generator to generate the tasks for multiprocessing
iterator = self._make_iterator(start_run_idx, result_queue=result_queue)
self._logger.info('Starting multiprocessing with at most '
'%d processes running at the same time.' % self._ncores)
if self._check_usage:
self._logger.info(
'Monitoring usage statistics. I will not spawn new processes '
'if one of the following cap thresholds is crossed, '
'CPU: %.1f %%, RAM: %.1f %%, Swap: %.1f %%.' %
(self._cpu_cap, self._memory_cap[0], self._swap_cap))
keep_running = True # Evaluates to false if trajectory produces
# no more single runs
process_dict = {} # Dict containing all subprocees
# For the cap values, we lazily evaluate them
cpu_usage_func = lambda: self._estimate_cpu_utilization()
memory_usage_func = lambda: self._estimate_memory_utilization(process_dict)
swap_usage_func = lambda: psutil.swap_memory().percent
signal_cap = True # If True cap warning is emitted
max_signals = 10 # Maximum number of warnings, after that warnings are
# no longer signaled
# Signal start of progress calculation
self._show_progress(n - 1, total_runs)
while len(process_dict) > 0 or keep_running:
# First check if some processes did finish their job
for pid in list(process_dict.keys()):
proc = process_dict[pid]
# Delete the terminated processes
if not proc.is_alive():
proc.join()
del process_dict[pid]
del proc
# Check if caps are reached.
# Cap is only checked if there is at least one
# process working to prevent deadlock.
no_cap = True
if self._check_usage and self._ncores > len(process_dict) > 0:
for cap_name, cap_function, threshold in (
('CPU Cap', cpu_usage_func, self._cpu_cap),
('Memory Cap', memory_usage_func, self._memory_cap[0]),
('Swap Cap', swap_usage_func, self._swap_cap)):
cap_value = cap_function()
if cap_value > threshold:
no_cap = False
if signal_cap:
if cap_name == 'Memory Cap':
add_on_str = ' [including estimate]'
else:
add_on_str = ''
self._logger.warning('Could not start next process '
'immediately [currently running '
'%d process(es)]. '
'%s reached, '
'%.1f%% >= %.1f%%%s.' %
(len(process_dict), cap_name,
cap_value, threshold,
add_on_str))
signal_cap = False
max_signals -= 1
if max_signals == 0:
self._logger.warning('Maximum number of cap warnings '
'reached. I will no longer '
'notify about cap violations, '
'but cap values are still applied '
'silently in background.')
break # If one cap value is reached we can skip the rest
# If we have less active processes than
# self._ncores and there is still
# a job to do, add another process
if len(process_dict) < self._ncores and keep_running and no_cap:
try:
task = next(iterator)
proc = multip.Process(target=_process_single_run,
args=(task,))
proc.start()
process_dict[proc.pid] = proc
signal_cap = max_signals > 0 # Only signal max_signals times
except StopIteration:
# All simulation runs have been started
keep_running = False
if self._postproc is not None and self._immediate_postproc:
if self._wrap_mode == pypetconstants.WRAP_MODE_LOCAL:
reference_service = self._traj._storage_service
self._traj.v_storage_service = self._storage_service
try:
self._logger.info('Performing IMMEDIATE POSTPROCESSING.')
keep_running, start_run_idx, new_runs = \
self._execute_postproc(results)
finally:
if self._wrap_mode == pypetconstants.WRAP_MODE_LOCAL:
self._traj._storage_service = reference_service
if keep_running:
expanded_by_postproc = True
self._logger.info('IMMEDIATE POSTPROCESSING expanded '
'the trajectory and added %d '
'new runs' % new_runs)
n = start_run_idx
total_runs = len(self._traj)
iterator = self._make_iterator(start_run_idx,
result_queue=result_queue)
if not keep_running:
self._logger.debug('All simulation runs have been started. '
'No new runs will be started. '
'The simulation will finish after the still '
'active runs completed.')
else:
time.sleep(0.001)
# Get all results from the result queue
n = self._get_results_from_queue(result_queue, results, n, total_runs)
# Finally get all results from the result queue once more and finalize the queue
self._get_results_from_queue(result_queue, results, n, total_runs)
result_queue.close()
result_queue.join_thread()
del result_queue
result_sort(results, start_result_length)
finally:
# Finalize the wrapper
if self._multiproc_wrapper is not None:
self._multiproc_wrapper.finalize()
self._multiproc_wrapper = None
return expanded_by_postproc
[docs]@prefix_naming
class MultiprocContext(HasLogger):
""" A lightweight environment that allows the usage of multiprocessing.
Can be used if you don't want a full-blown :class:`~pypet.environment.Environment` to
enable multiprocessing or if you want to implement your own custom multiprocessing.
This Wrapper tool will take a trajectory container and take care that the storage
service is multiprocessing safe. Supports the ``'LOCK'`` as well as the ``'QUEUE'`` mode.
In case of the latter an extra queue process is created if desired.
This process will handle all storage requests and write data to the hdf5 file.
Not that in case of ``'QUEUE'`` wrapping data can only be stored not loaded, because
the queue will only be read in one direction.
:param trajectory:
The trajectory which storage service should be wrapped
:param wrap_mode:
There are four options:
:const:`~pypet.pypetconstants.WRAP_MODE_QUEUE`: ('QUEUE')
If desired another process for storing the trajectory is spawned.
The sub processes running the individual trajectories will add their results to a
multiprocessing queue that is handled by an additional process.
Note that this requires additional memory since data
will be pickled and send over the queue for storage!
:const:`~pypet.pypetconstants.WRAP_MODE_LOCK`: ('LOCK')
Each individual process takes care about storage by itself. Before
carrying out the storage, a lock is placed to prevent the other processes
to store data. Accordingly, sometimes this leads to a lot of processes
waiting until the lock is released.
Yet, data does not need to be pickled before storage!
:const:`~pypet.pypetconstants.WRAP_MODE_PIPE`: ('PIPE)
Experimental mode based on a single pipe. Is faster than ``'QUEUE'`` wrapping
but data corruption may occur, does not work under Windows
(since it relies on forking).
:const:`~pypet.pypetconstant.WRAP_MODE_LOCAL` ('LOCAL')
Data is not stored in spawned child processes, but data needs to be
retunred manually in terms of *references* dictionaries (the ``reference`` property
of the ``ReferenceWrapper`` class)..
Storing is only performed in the main process.
Note that removing data during a single run has no longer an effect on memory
whatsoever, because there are references kept for all data
that is supposed to be stored.
:param full_copy:
In case the trajectory gets pickled (sending over a queue or a pool of processors)
if the full trajectory should be copied each time (i.e. all parameter points) or
only a particular point. A particular point can be chosen beforehand with
:func:`~pypet.trajectory.Trajectory.f_set_crun`.
Leave ``full_copy=None`` if the setting from the passed trajectory should be used.
Otherwise ``v_full_copy`` of the trajectory is changed to your chosen value.
:param manager:
You can pass an optional multiprocessing manager here,
if you already have instantiated one.
Leave ``None`` if you want the wrapper to create one.
:param use_manager:
If your lock and queue should be created with a manager or if wrapping should be
created from the multiprocessing module directly.
For example: ``multiprocessing.Lock()`` or via a manager
``multiprocessing.Manager().Lock()``
(if you specified a manager, this manager will be used).
The former is usually faster whereas the latter is more flexible and can
be used in an environment where fork is not available, for instance.
:param lock:
You can pass a multiprocessing lock here, if you already have instantiated one.
Leave ``None`` if you want the wrapper to create one in case of ``'LOCK'`` wrapping.
:param queue:
You can pass a multiprocessing queue here, if you already instantiated one.
Leave ``None`` if you want the wrapper to create one in case of ''`QUEUE'`` wrapping.
:param queue_maxsize:
Maximum size of queue if created new. 0 means infinite.
:param port:
Port to be used by lock server in case of ``'NETLOCK'`` wrapping.
Can be a single integer as well as a tuple ``(7777, 9999)`` to specify
a range of ports from which to pick a random one.
Leave `None` for using pyzmq's default range.
In case automatic determining of the host's ip address fails,
you can also pass the full address (including the protocol and
the port) of the host in the network like ``'tcp://127.0.0.1:7777'``.
:param timeout:
Timeout for a NETLOCK wrapping in seconds. After ``timeout``
seconds a lock is automatically released and free for other
processes.
:param gc_interval:
Interval (in runs or storage operations) with which ``gc.collect()``
should be called in case of the ``'LOCAL'``, ``'QUEUE'``, or ``'PIPE'`` wrapping.
Leave ``None`` for never.
``1`` means after every storing, ``2`` after every second storing, and so on.
Only calls ``gc.collect()`` in the main (if ``'LOCAL'`` wrapping)
or the queue/pipe process. If you need to garbage collect data within your single runs,
you need to manually call ``gc.collect()``.
Usually, there is no need to set this parameter since the Python garbage collection
works quite nicely and schedules collection automatically.
:param log_config:
Path to logging config file or dictionary to configure logging for the
spawned queue process. Thus, only considered if the queue wrap mode is chosen.
:param log_stdout:
If stdout of the queue process should also be logged.
:param graceful_exit:
Hitting Ctrl+C won't kill a server process unless hit twice.
For an usage example see :ref:`example-16`.
"""
def __init__(self, trajectory,
wrap_mode=pypetconstants.WRAP_MODE_LOCK,
full_copy=None,
manager=None,
use_manager=True,
lock=None,
queue=None,
queue_maxsize=0,
port=None,
timeout=None,
gc_interval=None,
log_config=None,
log_stdout=False,
graceful_exit=False):
self._set_logger()
self._manager = manager
self._traj = trajectory
self._storage_service = self._traj.v_storage_service
self._queue_process = None
self._pipe_process = None
self._lock_wrapper = None
self._queue_wrapper = None
self._reference_wrapper = None
self._wrap_mode = wrap_mode
self._queue = queue
self._queue_maxsize = queue_maxsize
self._pipe = queue
self._max_buffer_size = queue_maxsize
self._lock = lock
self._lock_process = None
self._port = port
self._timeout = timeout
self._use_manager = use_manager
self._logging_manager = None
self._gc_interval = gc_interval
self._graceful_exit = graceful_exit
if (self._wrap_mode == pypetconstants.WRAP_MODE_QUEUE or
self._wrap_mode == pypetconstants.WRAP_MODE_PIPE or
self._wrap_mode == pypetconstants.WRAP_MODE_NETLOCK or
self._wrap_mode == pypetconstants.WRAP_MODE_NETQUEUE):
self._logging_manager = LoggingManager(log_config=log_config,
log_stdout=log_stdout)
self._logging_manager.extract_replacements(self._traj)
self._logging_manager.check_log_config()
if full_copy is not None:
self._traj.v_full_copy=full_copy
@property
def lock(self):
return self._lock
@property
def queue(self):
return self._queue
@property
def pipe(self):
return self._pipe
@property
def queue_wrapper(self):
return self._queue_wrapper
@property
def reference_wrapper(self):
return self._reference_wrapper
@property
def lock_wrapper(self):
return self._lock_wrapper
@property
def pipe_wrapper(self):
return self._pipe_wrapper
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.finalize()
[docs] def store_references(self, references):
"""In case of reference wrapping, stores data.
:param references: References dictionary from a ReferenceWrapper.
:param gc_collect: If ``gc.collect`` should be called.
:param n:
Alternatively if ``gc_interval`` is set, a current index can be passed.
Data is stored in case ``n % gc_interval == 0``.
"""
self._reference_store.store_references(references)
[docs] def start(self):
"""Starts the multiprocess wrapping.
Automatically called when used as context manager.
"""
self._do_wrap()
def _do_wrap(self):
""" Wraps a Storage Service """
# First take care that the storage is initialised
self._traj.f_store(only_init=True)
if self._wrap_mode == pypetconstants.WRAP_MODE_QUEUE:
self._prepare_queue()
elif self._wrap_mode == pypetconstants.WRAP_MODE_LOCK:
self._prepare_lock()
elif self._wrap_mode == pypetconstants.WRAP_MODE_PIPE:
self._prepare_pipe()
elif self._wrap_mode == pypetconstants.WRAP_MODE_LOCAL:
self._prepare_local()
elif self._wrap_mode == pypetconstants.WRAP_MODE_NETLOCK:
self._prepare_netlock()
elif self._wrap_mode == pypetconstants.WRAP_MODE_NETQUEUE:
self._prepare_netqueue()
else:
raise RuntimeError('The mutliprocessing mode %s, your choice is '
'not supported, use %s`, `%s`, %s, `%s`, or `%s`.'
% (self._wrap_mode, pypetconstants.WRAP_MODE_QUEUE,
pypetconstants.WRAP_MODE_LOCK,
pypetconstants.WRAP_MODE_PIPE,
pypetconstants.WRAP_MODE_LOCAL,
pypetconstants.WRAP_MODE_NETLOCK))
def _prepare_local(self):
reference_wrapper = ReferenceWrapper()
self._traj.v_storage_service = reference_wrapper
self._reference_wrapper = reference_wrapper
self._reference_store = ReferenceStore(self._storage_service, self._gc_interval)
def _prepare_netlock(self):
""" Replaces the trajectory's service with a LockWrapper """
if not isinstance(self._port, str):
url = port_to_tcp(self._port)
self._logger.info('Determined Server URL: `%s`' % url)
else:
url = self._port
if self._lock is None:
if hasattr(os, 'fork'):
self._lock = ForkAwareLockerClient(url)
else:
self._lock = LockerClient(url)
if self._timeout is None:
lock_server = LockerServer(url)
else:
lock_server = TimeOutLockerServer(url, self._timeout)
self._logger.info('Using timeout aware lock server.')
self._lock_process = multip.Process(name='LockServer', target=_wrap_handling,
args=(dict(handler=lock_server,
logging_manager=self._logging_manager,
graceful_exit=self._graceful_exit),))
# self._lock_process = threading.Thread(name='LockServer', target=_wrap_handling,
# args=(dict(handler=lock_server,
# logging_manager=self._logging_manager),))
self._lock_process.start()
self._lock.start()
# Wrap around the storage service to allow the placement of locks around
# the storage procedure.
lock_wrapper = LockWrapper(self._storage_service, self._lock)
self._traj.v_storage_service = lock_wrapper
self._lock_wrapper = lock_wrapper
def _prepare_lock(self):
""" Replaces the trajectory's service with a LockWrapper """
if self._lock is None:
if self._use_manager:
if self._manager is None:
self._manager = multip.Manager()
# We need a lock that is shared by all processes.
self._lock = self._manager.Lock()
else:
self._lock = multip.Lock()
# Wrap around the storage service to allow the placement of locks around
# the storage procedure.
lock_wrapper = LockWrapper(self._storage_service, self._lock)
self._traj.v_storage_service = lock_wrapper
self._lock_wrapper = lock_wrapper
def _prepare_pipe(self):
""" Replaces the trajectory's service with a queue sender and starts the queue process.
"""
if self._pipe is None:
self._pipe = multip.Pipe(True)
if self._lock is None:
self._lock = multip.Lock()
self._logger.info('Starting the Storage Pipe!')
# Wrap a queue writer around the storage service
pipe_handler = PipeStorageServiceWriter(self._storage_service, self._pipe[0],
max_buffer_size=self._max_buffer_size)
# Start the queue process
self._pipe_process = multip.Process(name='PipeProcess', target=_wrap_handling,
args=(dict(handler=pipe_handler,
logging_manager=self._logging_manager,
graceful_exit=self._graceful_exit),))
self._pipe_process.start()
# Replace the storage service of the trajectory by a sender.
# The sender will put all data onto the pipe.
# The writer from above will receive the data from
# the pipe and hand it over to
# the storage service
self._pipe_wrapper = PipeStorageServiceSender(self._pipe[1], self._lock)
self._traj.v_storage_service = self._pipe_wrapper
def _prepare_queue(self):
""" Replaces the trajectory's service with a queue sender and starts the queue process.
"""
if self._queue is None:
if self._use_manager:
if self._manager is None:
self._manager = multip.Manager()
self._queue = self._manager.Queue(maxsize=self._queue_maxsize)
else:
self._queue = multip.Queue(maxsize=self._queue_maxsize)
self._logger.info('Starting the Storage Queue!')
# Wrap a queue writer around the storage service
queue_handler = QueueStorageServiceWriter(self._storage_service, self._queue,
self._gc_interval)
# Start the queue process
self._queue_process = multip.Process(name='QueueProcess', target=_wrap_handling,
args=(dict(handler=queue_handler,
logging_manager=self._logging_manager,
graceful_exit=self._graceful_exit),))
self._queue_process.start()
# Replace the storage service of the trajectory by a sender.
# The sender will put all data onto the queue.
# The writer from above will receive the data from
# the queue and hand it over to
# the storage service
self._queue_wrapper = QueueStorageServiceSender(self._queue)
self._traj.v_storage_service = self._queue_wrapper
def _prepare_netqueue(self):
""" Replaces the trajectory's service with a queue sender and starts the queue process.
"""
self._logger.info('Starting Network Queue!')
if not isinstance(self._port, str):
url = port_to_tcp(self._port)
self._logger.info('Determined Server URL: `%s`' % url)
else:
url = self._port
if self._queue is None:
if hasattr(os, 'fork'):
self._queue = ForkAwareQueuingClient(url)
else:
self._queue = QueuingClient(url)
# Wrap a queue writer around the storage service
queuing_server_handler = QueuingServer(url,
self._storage_service,
self._queue_maxsize,
self._gc_interval)
# Start the queue process
self._queue_process = multip.Process(name='QueuingServerProcess', target=_wrap_handling,
args=(dict(handler=queuing_server_handler,
logging_manager=self._logging_manager,
graceful_exit=self._graceful_exit),))
self._queue_process.start()
self._queue.start()
# Replace the storage service of the trajectory by a sender.
# The sender will put all data onto the queue.
# The writer from above will receive the data from
# the queue and hand it over to
# the storage service
self._queue_wrapper = QueueStorageServiceSender(self._queue)
self._traj.v_storage_service = self._queue_wrapper
[docs] def finalize(self):
""" Restores the original storage service.
If a queue process and a manager were used both are shut down.
Automatically called when used as context manager.
"""
if (self._wrap_mode == pypetconstants.WRAP_MODE_QUEUE and
self._queue_process is not None):
self._logger.info('The Storage Queue will no longer accept new data. '
'Hang in there for a little while. '
'There still might be some data in the queue that '
'needs to be stored.')
# We might have passed the queue implicitly,
# to be sure we add the queue here again
self._traj.v_storage_service.queue = self._queue
self._traj.v_storage_service.send_done()
self._queue_process.join()
if hasattr(self._queue, 'join'):
self._queue.join()
if hasattr(self._queue, 'close'):
self._queue.close()
if hasattr(self._queue, 'join_thread'):
self._queue.join_thread()
self._logger.info('The Storage Queue has joined.')
elif (self._wrap_mode == pypetconstants.WRAP_MODE_PIPE and
self._pipe_process is not None):
self._logger.info('The Storage Pipe will no longer accept new data. '
'Hang in there for a little while. '
'There still might be some data in the pipe that '
'needs to be stored.')
self._traj.v_storage_service.conn = self._pipe[1]
self._traj.v_storage_service.send_done()
self._pipe_process.join()
self._pipe[1].close()
self._pipe[0].close()
elif (self._wrap_mode == pypetconstants.WRAP_MODE_NETLOCK and
self._lock_process is not None):
self._lock.send_done()
self._lock.finalize()
self._lock_process.join()
elif (self._wrap_mode == pypetconstants.WRAP_MODE_NETQUEUE and
self._queue_process is not None):
self._queue.send_done()
self._queue.finalize()
self._queue_process.join()
if self._manager is not None:
self._manager.shutdown()
self._manager = None
self._queue_process = None
self._queue = None
self._queue_wrapper = None
self._lock = None
self._lock_wrapper = None
self._lock_process = None
self._reference_wrapper = None
self._pipe = None
self._pipe_process = None
self._pipe_wrapper = None
self._logging_manager = None
self._traj._storage_service = self._storage_service
def __del__(self):
self.finalize()