Engine
Engine is the central component of the AIMM system. Its purpose is handling manageable calls to plugins, using the backend when persistence is required and serving its state to all controllers.
Engine’s part of the configuration needs to satisfy the following schema:
---
$schema: 'http://json-schema.org/schema#'
id: 'aimm://server/engine.yaml#'
type: object
required:
    - sigterm_timeout
    - max_children
    - check_children_period
properties:
    sigterm_timeout:
        type: number
    max_children:
        type: number
    check_children_period:
        type: number
...
When the engine is started, it queries its backend to access all existing model instances. The instances are then stored in the engine’s state. Any component holding a reference to the engine may use it to perform different actions, such as creating model instances, fitting them or using them for predictions. When a new model instance is created, or an old one is updated, any components that have subscribed to state changes are notified. Additionally, calls to the plugins themselves are manageable and the engine keeps their states in its state as well.
Workflow actions such as fitting or using models are run asynchronously in separate asyncio tasks. Additionally, any plugin they call is run in a separate process, wrapped in a handler that allows subscriptions to the call’s state (if the plugin call supports state notification). The calls can also be cancelled, which is done using signals: first SIGTERM and then SIGKILL if a configured timeout expires. Also, to avoid fork bombs, a separate pseudo-pool serves as an interface for process creation, disallowing creation of new processes if a certain number of subprocesses is already running.
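The cancellation sequence described above can be sketched with the standard library alone. This is a minimal illustration, not AIMM's implementation: the `cancel` helper, the `STUBBORN_CHILD` script and the 0.5 second timeout are assumptions made for the example, and it relies on POSIX signal semantics.

```python
import signal
import subprocess
import sys

# Hypothetical child script: it ignores SIGTERM, so escalation is required.
STUBBORN_CHILD = (
    "import signal, time\n"
    "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
    "print('ready', flush=True)\n"
    "time.sleep(60)\n"
)


def cancel(proc, sigterm_timeout):
    """Ask the process to stop, then force it after sigterm_timeout seconds."""
    proc.terminate()  # sends SIGTERM on POSIX
    try:
        return proc.wait(timeout=sigterm_timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # sends SIGKILL on POSIX
        return proc.wait()


proc = subprocess.Popen(
    [sys.executable, "-c", STUBBORN_CHILD],
    stdout=subprocess.PIPE,
    text=True,
)
proc.stdout.readline()  # wait until the child has installed its handler
returncode = cancel(proc, sigterm_timeout=0.5)
proc.stdout.close()
```

On POSIX systems a negative return code is the negated number of the signal that ended the process, so the caller can tell whether the polite request was enough.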
Engine configuration reflects the multiprocessing nature of the implementation, since all of the options refer to different timeouts and limitations:
sigterm_timeout is the number of seconds to wait after SIGTERM is sent to a process before SIGKILL is sent if it doesn’t terminate
max_children is the maximum number of children, i.e. concurrent subprocesses that can run at the same time
check_children_period is the period, in seconds, at which the number of children is checked
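As an illustration of these options, the following sketch shows an engine configuration as a Python dictionary, with a hand-written check that mirrors the schema. The values and the `validate_engine_conf` helper are assumptions made for this example; they are not part of AIMM.

```python
# Hypothetical values; only the three keys come from the schema.
engine_conf = {
    "sigterm_timeout": 5,          # seconds between SIGTERM and SIGKILL
    "max_children": 4,             # at most 4 concurrent subprocesses
    "check_children_period": 0.2,  # seconds between child-count checks
}

REQUIRED = ("sigterm_timeout", "max_children", "check_children_period")


def validate_engine_conf(conf):
    """Mirror the schema by hand: required keys, each a JSON number."""
    for key in REQUIRED:
        if key not in conf:
            raise ValueError(f"missing required option: {key}")
        value = conf[key]
        # bool is a subclass of int in Python but is not a JSON number
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError(f"{key} must be a number")
    return conf


validated = validate_engine_conf(engine_conf)
```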
Engine module provides the following interface:
Since there are no strict limitations on the arguments that may be passed to
plugins, i.e., positional and keyword arguments are mostly passed as-is,
callers of the actions have the option of passing special kinds of
objects as arguments. These objects are interpreted by the engine as subactions
that need to be executed before the main action. E.g., a fitting function may
expect a dataset as an input, and while it is possible to pass the dataset
directly to engine’s Engine.fit() call, the caller could create an
aimm.server.common.DataAccess object and pass it instead. This would
indicate to the engine that it needs to use the data access plugin to access
the required data before fitting. All subactions are also run in separate
subprocesses and notify their progress through state.
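The idea of resolving special arguments before the main action can be sketched as follows. `DataAccessStub` is a stand-in defined only for this example (it is not the real aimm.server.common.DataAccess class), and `resolve_arguments`, `fit` and the `range_data` plugin are likewise hypothetical. The sketch also resolves everything in-process, whereas the engine runs subactions in separate subprocesses.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple


@dataclass(frozen=True)
class DataAccessStub:
    """Stand-in marker: 'call this data access plugin to get the value'."""
    name: str
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)


def resolve_arguments(args, kwargs, data_access_plugins):
    """Replace every marker object with the data its plugin returns."""
    def resolve(value):
        if isinstance(value, DataAccessStub):
            loader = data_access_plugins[value.name]
            return loader(*value.args, **value.kwargs)
        return value  # ordinary arguments are passed as-is

    resolved_args = [resolve(a) for a in args]
    resolved_kwargs = {k: resolve(v) for k, v in kwargs.items()}
    return resolved_args, resolved_kwargs


def fit(dataset, epochs=1):
    """Hypothetical fitting function that expects a concrete dataset."""
    return {"samples": len(dataset), "epochs": epochs}


plugins = {"range_data": lambda n: list(range(n))}
args, kwargs = resolve_arguments(
    (DataAccessStub("range_data", args=(5,)),), {"epochs": 3}, plugins
)
result = fit(*args, **kwargs)
```

The caller never builds the dataset itself; it only describes how to obtain it, and the main action receives the resolved value.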
State
State is a dictionary consisting of two properties, models and actions.
Models are a dictionary with instance IDs as keys and
aimm.server.common.Model instances as values. Actions are also a
dictionary, with the following structure:
---
description: keys are action IDs
patternProperties:
    '(.)+':
        oneOf:
          - type: 'null'
            description: prior to start of the action call
          - type: object
            required:
                - meta
                - progress
            properties:
                meta:
                    type: object
                    required:
                        - call
                    properties:
                        call:
                            type: string
                            description: call that the action is making
                        model_type:
                            type: string
                        model:
                            type: integer
                        args:
                            type: array
                        kwargs:
                            type: object
                progress:
                    enum:
                        - accessing_data
                        - executing
                        - complete
                data_access:
                    type: object
                    description: |
                        keys represent argument IDs (numbers for
                        positional, strings for named), values are set by
                        plugin's state callbacks
                action:
                    description: set by plugin state callback
...
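For orientation, a state value shaped like the structure above might look as follows. All concrete values (action IDs, the call name, the model type, the callback payloads) are invented for this example, and `action_matches_schema` is a hypothetical helper that checks only the required parts of the schema.

```python
# Illustrative state; every concrete value here is an assumption.
state = {
    "models": {},  # instance ID -> aimm.server.common.Model (left empty here)
    "actions": {
        "1": None,  # action registered, call not started yet
        "2": {
            "meta": {
                "call": "fit",
                "model_type": "example.Model",
                "model": 7,
                "args": [],
                "kwargs": {},
            },
            "progress": "executing",
            # positional argument 0 was loaded by a data access plugin
            "data_access": {0: {"rows_read": 1200}},
            # whatever the plugin reported through its state callback
            "action": {"epoch": 3},
        },
    },
}

PROGRESS_VALUES = {"accessing_data", "executing", "complete"}


def action_matches_schema(action):
    """Check the required parts of one action entry."""
    if action is None:
        return True
    if not isinstance(action, dict):
        return False
    if "meta" not in action or "progress" not in action:
        return False
    meta = action["meta"]
    if not isinstance(meta, dict) or not isinstance(meta.get("call"), str):
        return False
    return action["progress"] in PROGRESS_VALUES


all_valid = all(action_matches_schema(a) for a in state["actions"].values())
```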
Multiprocessing
The details of the multiprocessing implementation are placed in a separate
module, aimm.server.mprocess. This module is in charge of providing an
interface for managed process calls. The central class of the module is the
aimm.server.mprocess.ProcessManager. Its purpose is similar to that of
a standard multiprocessing.Pool, the main difference being that it does
not keep a fixed number of worker processes alive at all times and instead
holds an asyncio.Condition that prevents creation of new processes
until the number of children is under the max_children configuration
parameter.
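The role of the condition can be sketched in isolation. The `ChildLimiter` class below is hypothetical and simplified: it counts active jobs itself and notifies waiters on release, whereas the manager described here checks the number of children periodically. A short sleep stands in for the child process.

```python
import asyncio


class ChildLimiter:
    """Blocks acquire() while max_children jobs are already active."""

    def __init__(self, max_children):
        self._max_children = max_children
        self._active = 0
        self._condition = asyncio.Condition()

    async def acquire(self):
        async with self._condition:
            await self._condition.wait_for(
                lambda: self._active < self._max_children
            )
            self._active += 1

    async def release(self):
        async with self._condition:
            self._active -= 1
            self._condition.notify_all()  # wake pending acquirers


async def main():
    limiter = ChildLimiter(max_children=2)
    running = 0
    peak = 0

    async def job():
        nonlocal running, peak
        await limiter.acquire()
        try:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stands in for the child process
        finally:
            running -= 1
            await limiter.release()

    await asyncio.gather(*(job() for _ in range(6)))
    return peak


peak = asyncio.run(main())
```

Unlike a pool, nothing is kept alive between jobs: the limiter only delays the start of new work until a slot is free.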
The manager is implemented in the following class:
- class aimm.server.mprocess.ProcessManager(max_children: int, async_group: Group, check_children_period: float, sigterm_timeout: float)
Class used to create ProcessHandler objects and limit the number of concurrently active child processes.
- Parameters:
max_children – maximum number of child processes that may be created
async_group – async group
check_children_period – number of seconds waited before checking if a child process may be created and notifying pending handlers
sigterm_timeout – number of seconds waited before sending SIGKILL if a child process does not terminate after SIGTERM
- property async_group: Group
Group controlling resource’s lifetime.
- create_handler(state_cb: Callable[[Any], None]) -> ProcessHandler
Creates a ProcessHandler
- Parameters:
state_cb (Callable[List[Any], None]) – state callback for changes in the process call
- Returns:
ProcessHandler
The process calls are wrapped in an aimm.server.mprocess.ProcessHandler
instance, whose interface allows callers to terminate the process call. It also
allows callers to pass their state change callback functions which are called
whenever the process’ state changes.
After calling aimm.server.mprocess.ProcessManager.create_handler() and
receiving a process handler, the call can be made using the
aimm.server.mprocess.ProcessHandler.run() function, which, in reality,
first spawns an asyncio task that blocks until the process manager allows
creation of a new process and only then actually creates a new process.
The state notification is done using callbacks and multiprocessing pipes.
The process handler receives a state_cb argument in its constructor and this is
the function used to notify states to the rest of the system. It also provides
a method proc_notify_state_change, which is a callback passed to the
function running in the separate process. This function uses a
multiprocessing.Pipe object to send the function’s state values (which need to
be pickleable). Handlers also have internal state listening loops, running in
the main asyncio event loop, that react to receiving these state changes and
notify the rest of the system using the state_cb passed in the constructor.
The result of the separate process call is also passed through a separate pipe and
exposed through the aimm.server.mprocess.ProcessHandler.result
property.
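The two-pipe arrangement described above can be sketched as follows. This is an illustration under stated assumptions, not the ProcessHandler code: `worker` and `run_with_state` are hypothetical names, the listening loop uses the default executor to wait on the pipe, and the "fork" start method is requested explicitly, which limits the sketch to POSIX systems.

```python
import asyncio
import multiprocessing


def worker(notify_conn, result_conn, n):
    """Runs in the child: reports progress, then sends the result."""
    total = 0
    for i in range(n):
        total += i
        notify_conn.send({"step": i + 1, "of": n})  # state must be pickleable
    notify_conn.close()
    result_conn.send(total)
    result_conn.close()


async def run_with_state(n, state_cb):
    """Start the child, forward its states to state_cb, return its result."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX only
    state_recv, state_send = ctx.Pipe(duplex=False)
    result_recv, result_send = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=worker, args=(state_send, result_send, n))
    proc.start()
    # Close the parent's copies of the write ends so recv() can see EOF.
    state_send.close()
    result_send.close()

    loop = asyncio.get_running_loop()
    while True:  # state listening loop
        try:
            state = await loop.run_in_executor(None, state_recv.recv)
        except EOFError:
            break  # the child closed its end: no more states
        state_cb(state)

    result = await loop.run_in_executor(None, result_recv.recv)
    await loop.run_in_executor(None, proc.join)
    return result


states = []
result = asyncio.run(run_with_state(3, states.append))
```

Keeping states and the result on separate pipes means the listener can treat end-of-file on the state pipe as "no more updates" without confusing an update with the final value.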
The complete class docstring:
- class aimm.server.mprocess.ProcessHandler(async_group: Group, sigterm_timeout: float, state_cb: Callable[[Any], None], condition: Condition)
Handler for calls in separate processes. Created through ProcessManager.create_handler().
- Parameters:
async_group (hat.aio.Group) – async group
sigterm_timeout (float) – time waited until process handles SIGTERM before sending SIGKILL during forced shutdown
state_cb (Optional[Callable[Any]]) – state change cb
condition (asyncio.Condition) – condition that notifies when a new process may be created
- property async_group: Group
Group controlling resource’s lifetime.
- proc_notify_state_change(state: Any)
To be passed to and run in the separate process call. Notifies the handler of a state change; the new state is passed to the state_cb received in the constructor.
- Parameters:
state – call state, needs to be pickleable
- async run(fn: Callable, *args: Any, **kwargs: Any)
Requests the start of function execution in the separate process.
- Parameters:
fn – function that will be called
*args – positional arguments, need to be pickleable
**kwargs – keyword arguments, need to be pickleable