Engine

Engine is the central component of the AIMM system. Its purpose is to handle managed calls to plugins, to use the backend when persistence is required, and to serve its state to all controllers.

The engine’s part of the configuration must satisfy the following schema:

---
$schema: 'http://json-schema.org/schema#'
id: 'aimm://server/engine.yaml#'
type: object
required:
    - sigterm_timeout
    - max_children
    - check_children_period
properties:
    sigterm_timeout:
        type: number
    max_children:
        type: number
    check_children_period:
        type: number
...

When the engine is started, it queries its backend for all existing model instances and stores them in its state. Any component holding a reference to the engine may use it to perform actions such as creating model instances, fitting them, or using them for predictions. When a model instance is created or an existing one is updated, the state change is notified to all components that have subscribed to state changes. Calls to the plugins themselves are also managed, and the engine keeps their states in its state as well.
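The subscription mechanism described above can be illustrated with a minimal Python sketch. The ObservableState class, its subscribe() and set_model() methods, and the model values are hypothetical stand-ins for the engine's real interface; the sketch assumes that subscribers receive the whole new state after each change.

```python
from typing import Any, Callable, Dict, List

StateCb = Callable[[Dict[str, Any]], None]


class ObservableState:
    """Hypothetical sketch: engine-like state with change subscriptions."""

    def __init__(self, models: Dict[int, Any]) -> None:
        # At engine start, the models would be loaded from the backend
        self._state: Dict[str, Any] = {'models': dict(models), 'actions': {}}
        self._subscribers: List[StateCb] = []

    @property
    def state(self) -> Dict[str, Any]:
        return self._state

    def subscribe(self, cb: StateCb) -> Callable[[], None]:
        """Registers cb and returns a function that unregisters it."""
        self._subscribers.append(cb)
        return lambda: self._subscribers.remove(cb)

    def set_model(self, model_id: int, model: Any) -> None:
        # Replace the state instead of mutating it, so snapshots already
        # handed to subscribers stay unchanged
        models = {**self._state['models'], model_id: model}
        self._state = {**self._state, 'models': models}
        for cb in list(self._subscribers):
            cb(self._state)


received: List[Dict[str, Any]] = []
engine_state = ObservableState({1: 'model-1'})
unsubscribe = engine_state.subscribe(received.append)
engine_state.set_model(2, 'model-2')  # notifies the subscriber
unsubscribe()
engine_state.set_model(3, 'model-3')  # no subscribers left
```

Replacing the state dictionary on every change, rather than mutating it, lets subscribers keep and compare older snapshots safely.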

Workflow actions such as fitting or using models are run asynchronously, each in a separate asyncio task. Any plugin they call is run in a separate process, wrapped in a handler that allows subscribing to the call’s state (if the plugin call supports state notification). Calls can also be cancelled, which is done using signals: SIGTERM is sent first, followed by SIGKILL if the process has not terminated before a configured timeout expires. Additionally, to avoid fork bombs, a separate pseudo-pool serves as the interface for process creation and prevents new processes from being created while a configured number of subprocesses is already running.
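The two-step cancellation can be sketched with the standard multiprocessing module, assuming a POSIX system where Process.terminate() sends SIGTERM and Process.kill() sends SIGKILL. The terminate_process() helper and the two child functions are hypothetical; this blocking version only shows the ordering of the signals and is not the engine's asyncio-based implementation.

```python
import multiprocessing
import signal
import time
from multiprocessing.process import BaseProcess
from typing import List, Optional, Tuple


def polite_child(ready) -> None:
    # Keeps the default SIGTERM disposition, so SIGTERM ends it
    ready.set()
    while True:
        time.sleep(0.1)


def stubborn_child(ready) -> None:
    # Ignores SIGTERM, so only SIGKILL can stop it
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    ready.set()
    while True:
        time.sleep(0.1)


def terminate_process(proc: BaseProcess, sigterm_timeout: float) -> str:
    """Sends SIGTERM, waits up to sigterm_timeout seconds, then SIGKILL."""
    proc.terminate()            # SIGTERM on POSIX
    proc.join(sigterm_timeout)  # returns early if the process exits
    if proc.is_alive():
        proc.kill()             # SIGKILL on POSIX (Python 3.7+)
        proc.join()
        return 'killed'
    return 'terminated'


# 'fork' is POSIX-only; it avoids re-importing this module in the children
ctx = multiprocessing.get_context('fork')
results: List[Tuple[str, str, Optional[int]]] = []
for target in (polite_child, stubborn_child):
    ready = ctx.Event()
    proc = ctx.Process(target=target, args=(ready,))
    proc.start()
    ready.wait()  # make sure the child has set up its signal handling
    outcome = terminate_process(proc, sigterm_timeout=1.0)
    results.append((target.__name__, outcome, proc.exitcode))
```

A process killed by a signal reports a negative exit code equal to the signal number, which makes it easy to tell whether SIGTERM was enough or SIGKILL was needed.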

Engine configuration reflects the multiprocessing nature of the implementation, since all of the options refer to different timeouts and limitations:

  • sigterm_timeout is the number of seconds to wait after sending SIGTERM to a process before sending SIGKILL, if the process has not terminated

  • max_children is the maximum number of child processes (concurrent subprocesses) that may run at the same time

  • check_children_period is the interval, in seconds, at which the number of child processes is checked to determine whether a new one may be created
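For illustration, an engine configuration satisfying the schema above might look as follows in Python. The numeric values are made up, and check_engine_conf() is a hypothetical, stdlib-only stand-in for full schema validation; a library such as jsonschema could validate against the complete schema instead.

```python
from typing import Any, Dict

REQUIRED_KEYS = ('sigterm_timeout', 'max_children', 'check_children_period')


def check_engine_conf(conf: Dict[str, Any]) -> None:
    """Hypothetical check mirroring the schema's required number properties."""
    for key in REQUIRED_KEYS:
        if key not in conf:
            raise ValueError(f'missing required property: {key}')
        value = conf[key]
        # JSON Schema 'number' accepts integers and floats, but not booleans
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError(f'{key} must be a number, got {value!r}')


# Illustrative values only, not recommended defaults
engine_conf = {
    'sigterm_timeout': 5,          # wait 5 s after SIGTERM before SIGKILL
    'max_children': 4,             # at most 4 concurrent child processes
    'check_children_period': 0.5,  # check the child count every 0.5 s
}
check_engine_conf(engine_conf)  # passes silently
```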

Engine module provides the following interface:

Since there are no strict limitations on the arguments that may be passed to plugins (positional and keyword arguments are mostly passed as-is), callers of the actions have the option of passing special kinds of objects as arguments. The engine interprets these objects as subactions that must be executed before the main action. For example, a fitting function may expect a dataset as input. While it is possible to pass the dataset directly to the engine’s Engine.fit() call, the caller could instead create an aimm.server.common.DataAccess object and pass it. This indicates to the engine that it needs to use the data access plugin to obtain the required data before fitting. All subactions are also run in separate subprocesses and notify their progress through the state.
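The subaction mechanism can be sketched with asyncio, assuming that subaction arguments are recognized by type and resolved concurrently before the main call. DataAccessArg, access_data(), resolve_subactions(), fit() and fake_fit() are hypothetical names; in particular, DataAccessArg is only a placeholder for aimm.server.common.DataAccess, whose actual fields are not described here.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple, Union


@dataclass
class DataAccessArg:
    """Hypothetical stand-in for aimm.server.common.DataAccess."""
    source: str
    kwargs: Dict[str, Any] = field(default_factory=dict)


async def access_data(arg: DataAccessArg) -> Any:
    # Placeholder for the data access plugin; the engine would run it
    # in a subprocess and report its progress through the state
    await asyncio.sleep(0)
    return f'data from {arg.source}'


async def resolve_subactions(args: Tuple[Any, ...], kwargs: Dict[str, Any]
                             ) -> Tuple[List[Any], Dict[str, Any]]:
    # Argument IDs follow the state schema: ints for positional, strs for named
    keys: List[Union[int, str]] = [
        i for i, a in enumerate(args) if isinstance(a, DataAccessArg)]
    keys += [k for k, v in kwargs.items() if isinstance(v, DataAccessArg)]
    subactions = [args[k] if isinstance(k, int) else kwargs[k] for k in keys]
    # Run all subactions concurrently, before the main action
    values = await asyncio.gather(*(access_data(s) for s in subactions))
    resolved = dict(zip(keys, values))
    new_args = [resolved[i] if i in resolved else a
                for i, a in enumerate(args)]
    new_kwargs = {k: resolved[k] if k in resolved else v
                  for k, v in kwargs.items()}
    return new_args, new_kwargs


async def fit(fit_fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    resolved_args, resolved_kwargs = await resolve_subactions(args, kwargs)
    return fit_fn(*resolved_args, **resolved_kwargs)


def fake_fit(x: Any, y: Any, *, weights: Any) -> Tuple[Any, Any, Any]:
    return x, y, weights


result = asyncio.run(fit(fake_fit, DataAccessArg('x_table'), [1, 2],
                         weights=DataAccessArg('weights_table')))
# result == ('data from x_table', [1, 2], 'data from weights_table')
```

Ordinary arguments such as [1, 2] pass through unchanged, while placeholder objects are replaced by the data their subaction produced.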

State

The state is a dictionary with two keys, models and actions. The models value is a dictionary with instance IDs as keys and aimm.server.common.Model instances as values. The actions value is also a dictionary, with the following structure:

---
description: keys are action IDs
patternProperties:
    '(.)+':
        oneOf:
          - type: 'null'
            description: prior to start of the action call
          - type: object
            required:
                - meta
                - progress
            properties:
                meta:
                    type: object
                    required:
                        - call
                    properties:
                        call:
                            type: string
                            description: call that the action is making
                        model_type:
                            type: string
                        model:
                            type: integer
                        args:
                            type: array
                        kwargs:
                            type: object
                progress:
                    enum:
                        - accessing_data
                        - executing
                        - complete
                data_access:
                    type: object
                    description: |
                        keys represent argument IDs (numbers for
                        positional, strings for named), values are set by
                        plugin's state callbacks
                action:
                    description: set by plugin state callback
...
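A concrete state value matching the schema above could look like the following. The action IDs, the call name fit, and the contents of data_access are invented for illustration, and is_valid_action() is a hypothetical partial check covering only the required meta.call and progress fields.

```python
from typing import Any, Dict

PROGRESS_VALUES = ('accessing_data', 'executing', 'complete')


def is_valid_action(action: Any) -> bool:
    """Hypothetical partial check of one 'actions' entry against the schema."""
    if action is None:  # the action call has not started yet
        return True
    if not isinstance(action, dict):
        return False
    meta = action.get('meta')
    return (isinstance(meta, dict)
            and isinstance(meta.get('call'), str)
            and action.get('progress') in PROGRESS_VALUES)


# Illustrative state; the 'call' value and data_access contents are made up
state: Dict[str, Dict[Any, Any]] = {
    'models': {1: '<Model instance>'},
    'actions': {
        'action-1': None,
        'action-2': {
            'meta': {'call': 'fit', 'model': 1, 'args': [], 'kwargs': {}},
            'progress': 'accessing_data',
            # key 0 refers to the first positional argument
            'data_access': {0: {'rows_read': 100}},
        },
    },
}
all_valid = all(is_valid_action(a) for a in state['actions'].values())
```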

Multiprocessing

The details of the multiprocessing implementation are placed in a separate module, aimm.server.mprocess, which provides an interface for managed process calls. The central class of the module is aimm.server.mprocess.ProcessManager. Its purpose is similar to that of a standard multiprocessing.Pool; the main difference is that it does not keep a fixed number of worker processes alive at all times. Instead, it holds an asyncio.Condition that prevents the creation of new processes until the number of children drops below the max_children configuration parameter.

The manager is implemented in the following class:

class aimm.server.mprocess.ProcessManager(max_children: int, async_group: Group, check_children_period: float, sigterm_timeout: float)

Class used to create ProcessHandler objects and limit the number of concurrently active child processes.

Parameters:
  • max_children – maximum number of child processes that may be created

  • async_group – async group

  • check_children_period – number of seconds waited before checking if a child process may be created and notifying pending handlers

  • sigterm_timeout – number of seconds waited before sending SIGKILL if a child process does not terminate after SIGTERM

property async_group: Group

Group controlling resource’s lifetime.

create_handler(state_cb: Callable[[Any], None]) -> ProcessHandler

Creates a ProcessHandler.

Parameters:

state_cb (Callable[[Any], None]) – state callback for changes in the process call

Returns:

ProcessHandler

The process calls are wrapped in an aimm.server.mprocess.ProcessHandler instance, whose interface allows callers to terminate the process call. It also allows callers to pass state change callback functions, which are called whenever the process’ state changes.

After calling aimm.server.mprocess.ProcessManager.create_handler() and receiving a process handler, the call can be made using the aimm.server.mprocess.ProcessHandler.run() method. This method first spawns an asyncio task that blocks until the process manager allows the creation of a new process, and only then creates the process.
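The interplay between max_children, check_children_period and the asyncio.Condition can be sketched as follows. MiniProcessManager is a hypothetical simplification: it runs coroutines instead of spawning processes, tracks children with a plain counter, and wakes waiters only from a periodic check loop, as the description of check_children_period suggests. Unlike the real run(), which spawns a separate task, its run() method awaits the job directly.

```python
import asyncio
import contextlib
from typing import Any, Awaitable, Callable, List, Tuple


class MiniProcessManager:
    """Hypothetical simplification of the ProcessManager limiting logic.

    Jobs are coroutines rather than real processes, and the child count is
    a plain counter instead of a count of live child processes.
    """

    def __init__(self, max_children: int, check_children_period: float) -> None:
        self._max_children = max_children
        self._period = check_children_period
        self._children = 0
        # Created while the event loop is running, so it binds to that loop
        self._condition = asyncio.Condition()

    async def check_loop(self) -> None:
        # Every period, wake pending callers so they re-check the count
        while True:
            await asyncio.sleep(self._period)
            async with self._condition:
                self._condition.notify_all()

    async def run(self, job: Callable[[], Awaitable[Any]]) -> Any:
        async with self._condition:
            # Block until a new "child" is allowed
            await self._condition.wait_for(
                lambda: self._children < self._max_children)
            self._children += 1
        try:
            return await job()
        finally:
            self._children -= 1


async def main() -> Tuple[List[str], int]:
    manager = MiniProcessManager(max_children=2, check_children_period=0.01)
    checker = asyncio.create_task(manager.check_loop())
    running = peak = 0

    async def job() -> str:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.05)
        running -= 1
        return 'done'

    results = await asyncio.gather(*(manager.run(job) for _ in range(5)))
    checker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await checker
    return results, peak


results, peak = asyncio.run(main())
```

With max_children=2, at most two of the five jobs run at the same time; the others wait on the condition until a periodic check finds a free slot.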

State notification is done using callbacks and multiprocessing pipes. The process handler receives a state_cb argument in its constructor, and this is the function used to notify the rest of the system of state changes. The handler also provides the proc_notify_state_change method, a callback passed to the function running in the separate process. This callback uses a multiprocessing.Pipe object to send the function’s state values, which must be pickleable. Handlers also run internal state listening loops in the main asyncio event loop; these loops react to received state changes and notify the rest of the system through the state_cb passed in the constructor. The result of the call made in the separate process is passed through another pipe and exposed through the aimm.server.mprocess.ProcessHandler.result property.
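The pipe-based notification scheme can be sketched with the standard library, assuming a POSIX system (the fork start method) and a plugin function that accepts a notification callback. All function names are hypothetical. States and the result travel through two separate one-way pipes, and the blocking recv() calls are offloaded to a thread with run_in_executor(); this is one possible way to listen without blocking the event loop, not necessarily how ProcessHandler does it.

```python
import asyncio
import multiprocessing
from multiprocessing.connection import Connection
from typing import Any, Callable, List


def plugin_fn(notify: Callable[[Any], None], n: int) -> int:
    # Hypothetical plugin call that reports progress through `notify`
    total = 0
    for i in range(n):
        total += i
        notify({'step': i + 1, 'of': n})  # states must be pickleable
    return total


def child_main(state_conn: Connection, result_conn: Connection, n: int) -> None:
    # Runs in the child; state_conn.send plays the role of
    # proc_notify_state_change, and the result uses a separate pipe
    result = plugin_fn(state_conn.send, n)
    state_conn.close()
    result_conn.send(result)
    result_conn.close()


async def run_in_process(n: int, state_cb: Callable[[Any], None]) -> int:
    ctx = multiprocessing.get_context('fork')  # POSIX-only start method
    state_recv, state_send = ctx.Pipe(duplex=False)
    result_recv, result_send = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=child_main, args=(state_send, result_send, n))
    proc.start()
    # Close the parent's copies of the write ends, so recv() raises
    # EOFError once the child closes its own copies
    state_send.close()
    result_send.close()
    loop = asyncio.get_running_loop()
    # State listening loop: blocking recv() calls run in a worker thread
    # so the event loop stays responsive
    while True:
        try:
            state = await loop.run_in_executor(None, state_recv.recv)
        except EOFError:
            break
        state_cb(state)
    result = await loop.run_in_executor(None, result_recv.recv)
    await loop.run_in_executor(None, proc.join)
    return result


states: List[Any] = []
result = asyncio.run(run_in_process(3, states.append))
```

Keeping states and the result on separate pipes means the listening loop can simply run until the state pipe reaches end-of-file, after which exactly one result is expected on the other pipe.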

The complete class docstring:

class aimm.server.mprocess.ProcessHandler(async_group: Group, sigterm_timeout: float, state_cb: Callable[[Any], None], condition: Condition)

Handler for calls in separate processes. Created through ProcessManager.create_handler().

Parameters:
  • async_group (hat.aio.Group) – async group

  • sigterm_timeout (float) – time to wait for the process to handle SIGTERM before sending SIGKILL during forced shutdown

  • state_cb (Optional[Callable[[Any], None]]) – state change callback

  • condition (asyncio.Condition) – condition that notifies when a new process may be created

property async_group: Group

Group controlling resource’s lifetime.

proc_notify_state_change(state: Any)

To be passed to and run in the separate process call. Notifies the handler of a state change; the new state is passed to the state_cb received in the constructor.

Parameters:

state – call state, needs to be pickleable

async run(fn: Callable, *args: Any, **kwargs: Any)

Requests the start of function execution in the separate process.

Parameters:
  • fn – function that will be called

  • *args – positional arguments, need to be pickleable

  • **kwargs – keyword arguments, need to be pickleable