Top

csiro_workspace.workspace module

This module contains definitions of the Workspace and WatchList classes; all the things necessary to create, execute and interact with Workspace workflows in Python.

Importantly, alongside this module is a file called workspace.cfg which contains the configuration information required to use this module. Among other things, this file contains configuration options to control which copy of Workspace to use when running workflows, the port on which TCP communication to running Workspace processes should occur, and the log-level used by each process.

Copyright 2015 by:

Commonwealth Scientific and Industrial Research Organisation (CSIRO)

This file is licensed by CSIRO under the copy of the CSIRO Open Source Software License Agreement included with the file when downloaded or obtained from CSIRO (including any Supplementary License). If no copy was included, you must obtain a new copy of the Software from CSIRO before any use is permitted.

For further information, contact: workspace@csiro.au

This copyright notice must be included with all copies of the source code.

"""
This module contains definitions of the `Workspace` and `WatchList` classes;
all the things necessary to create, execute and interact with Workspace
workflows in Python.

Importantly, alongside this module is a file called
_workspace.cfg_ which contains the configuration information required to use
this module. Among other things, this file contains configuration options to
control which copy of Workspace to use when running workflows, the port on
which TCP communication to running Workspace processes should occur, and the
log-level used by each process.

Copyright 2015 by:

Commonwealth Scientific and Industrial Research Organisation (CSIRO)

This file is licensed by CSIRO under the copy of the CSIRO Open Source Software
License Agreement included with the file when downloaded or obtained
from CSIRO (including any Supplementary License). If no copy was
included, you must obtain a new copy of the Software from CSIRO before
any use is permitted.

For further information, contact: workspace@csiro.au

This copyright notice must be included with all copies of the source code.
"""

from ctypes import cdll, byref, c_char, c_char_p, c_int, c_void_p, c_bool, Structure, pointer, POINTER, CFUNCTYPE
import copy
import subprocess
import atexit
import uuid
import json
import datetime
import os.path

class _WORKSPACE_ID(Structure):
    """
    ctypes mirror of the WorkspaceId struct used by the WorkspaceWeb shared
    library. Instances are handed to (and received from) the C interface to
    identify one particular Workspace process.
    """
    _fields_ = [
        ("key", c_int),
        ("port", c_int),
        ("host", c_char * 255),
    ]

    def getKey(self):
        """
        Returns the `key` field converted to a plain Python `int`, so callers
        never have to deal with a ctypes value.
        """
        rawKey = self.key
        return int(rawKey)


# Load our Workspace config file, which lives in the same directory as this
# module. The path is built with os.path.join so that an empty dirname (module
# imported from the current directory) resolves to 'workspace.cfg' rather than
# '/workspace.cfg', and the file handle is closed as soon as parsing finishes.
with open(os.path.join(os.path.dirname(__file__), 'workspace.cfg'), 'r') as _ws_config_file:
    _ws_config = json.load(_ws_config_file)

# Function types for our C++ code to call back into. For CFUNCTYPE the first
# argument is the callback's return type and the remaining arguments are its
# parameter types, so every callback below returns a C int. All but
# LOOPSTARTFUNC receive a pointer to the _WORKSPACE_ID of the originating
# Workspace; ERRORFUNC, WATCHFUNC and LISTFUNC additionally receive a C string.
LOOPSTARTFUNC = CFUNCTYPE(c_int)
CONNFUNC      = CFUNCTYPE(c_int, POINTER(_WORKSPACE_ID))
SUCCESSFUNC   = CFUNCTYPE(c_int, POINTER(_WORKSPACE_ID))
FAILFUNC      = CFUNCTYPE(c_int, POINTER(_WORKSPACE_ID))
ERRORFUNC     = CFUNCTYPE(c_int, POINTER(_WORKSPACE_ID), c_char_p)
WATCHFUNC     = CFUNCTYPE(c_int, POINTER(_WORKSPACE_ID), c_char_p)
LISTFUNC      = CFUNCTYPE(c_int, POINTER(_WORKSPACE_ID), c_char_p)

# Our C++ function references. The shared library is loaded at import time
# from the Workspace installation named in workspace.cfg, and each exported
# function is bound to a module-level name. Their argument and return types
# are assigned afterwards by _initCInterface().
# NOTE(review): the hard-coded '.dylib' suffix is the macOS shared-library
# extension; confirm whether other platforms need to be supported here.
LibWorkspaceWeb                       = cdll.LoadLibrary(_ws_config['workspace_install_dir'] + '/lib/libworkspaceweb.dylib')
server_init                           = LibWorkspaceWeb.server_init
server_listen_for_connection_and_wait = LibWorkspaceWeb.server_listen_for_connection_and_wait
server_start_event_loop               = LibWorkspaceWeb.server_start_event_loop
server_stop_event_loop                = LibWorkspaceWeb.server_stop_event_loop
workspace_register_func_success       = LibWorkspaceWeb.workspace_register_func_success
workspace_register_func_failed        = LibWorkspaceWeb.workspace_register_func_failed
workspace_register_func_error         = LibWorkspaceWeb.workspace_register_func_error
server_poll                           = LibWorkspaceWeb.server_poll
workspace_run_once                    = LibWorkspaceWeb.workspace_run_once
workspace_run_continuously            = LibWorkspaceWeb.workspace_run_continuously
workspace_terminate                   = LibWorkspaceWeb.workspace_terminate
workspace_set_input                   = LibWorkspaceWeb.workspace_set_input
workspace_set_global_name             = LibWorkspaceWeb.workspace_set_global_name
workspace_list_inputs                 = LibWorkspaceWeb.workspace_list_inputs
workspace_list_outputs                = LibWorkspaceWeb.workspace_list_outputs
workspace_list_global_names           = LibWorkspaceWeb.workspace_list_global_names
workspace_watch                       = LibWorkspaceWeb.workspace_watch
workspace_cancel_watch                = LibWorkspaceWeb.workspace_cancel_watch
workspace_stop                        = LibWorkspaceWeb.workspace_stop

def _initCInterface():
    """
    Declares the C signature of every function imported from the WorkspaceWeb
    library (each one returns a C int) and then initialises the server with
    the log level taken from the configuration file.
    """
    idPtr = POINTER(_WORKSPACE_ID)

    # One (function, argument types) entry per C function. An entry whose
    # argument types are None only has its return type declared.
    # NOTE(review): Workspace.watch() passes a fourth (autoDelete) argument to
    # workspace_watch, which is not declared here - confirm against the C API.
    signatures = (
        (server_init,                           [c_int]),
        (server_listen_for_connection_and_wait, [c_char_p, c_int, CONNFUNC]),
        (server_start_event_loop,               [LOOPSTARTFUNC]),
        (server_stop_event_loop,                None),
        (workspace_register_func_success,       [idPtr, SUCCESSFUNC]),
        (workspace_register_func_failed,        [idPtr, FAILFUNC]),
        (workspace_register_func_error,         [idPtr, ERRORFUNC]),
        (server_poll,                           [c_int]),
        (workspace_run_once,                    [idPtr]),
        (workspace_run_continuously,            [idPtr]),
        (workspace_terminate,                   [idPtr]),
        (workspace_set_input,                   [idPtr, c_char_p, c_char_p]),
        (workspace_set_global_name,             [idPtr, c_char_p, c_char_p]),
        (workspace_list_inputs,                 [idPtr, LISTFUNC]),
        (workspace_list_outputs,                [idPtr, LISTFUNC]),
        (workspace_list_global_names,           [idPtr, LISTFUNC]),
        (workspace_watch,                       [idPtr, c_char_p, WATCHFUNC]),
        (workspace_cancel_watch,                [idPtr, c_char_p]),
        (workspace_stop,                        [idPtr]),
    )
    for cFunction, argumentTypes in signatures:
        cFunction.restype = c_int
        if argumentTypes is not None:
            cFunction.argtypes = argumentTypes

    # With the signatures in place the server itself can be initialised.
    server_init(_ws_config['log_level'])


class IONotExistsError(Exception):
    """
    Exception for when a named input, output or global name doesn't exist.
    """
    def __init__(self, name):
        """
        Constructs the exception for the input, output or global name `name`
        that could not be found.
        """
        super(IONotExistsError, self).__init__(name)
        self._name = name

    def __str__(self):
        # Wrap the name in a tuple so that a tuple-valued name cannot be
        # mistaken for the format arguments.
        return 'ERROR: Input/Output/GlobalName "%s" does not exist.' % (self._name,)

    @property
    def name(self):
        """
        Returns the name of the input, output or global name that does not
        exist.
        """
        # Previously this returned the bare name `_name`, which raised a
        # NameError every time the property was read.
        return self._name


class WatchList(object):
    """
    Represents a list of inputs, outputs or globalnames to watch in the running
    Workspace. Once one of these has been created, it is passed to the method
    `Workspace.watch()` which will monitor the specific inputs/outputs for
    updates. Wraps the C-API's WatchList class to manage scoped deletion etc.
    """
    @classmethod
    def fromIONames(cls, inputs=None, outputs=None, globalNames=None):
        """
        Constructs a new WatchList object, where `inputs` is the list of input
        names, `outputs` is the list of output names and `globalNames` is the
        list of global names to watch. Any of them may be omitted (or `None`),
        which is treated as an empty list. The new WatchList is given a freshly
        generated uuid as its `id`, and every name maps to an empty dictionary.
        """
        watchId = str(uuid.uuid4())

        def emptyEntries(names):
            # `None` (the default) means "nothing to watch" for that category.
            return dict((name, {}) for name in (names or ()))

        return cls(watchId,
                   emptyEntries(inputs),
                   emptyEntries(outputs),
                   emptyEntries(globalNames))

    @classmethod
    def fromJson(cls, jsonStr):
        """
        Constructs a new WatchList object from the json contained in `jsonStr`.
        An example of a valid json object is:

            {
                "id": "1lFDS-12314-VBAVD-1241-ADFS",
                "inputs": {
                    "input1": {
                        "type": "double",
                        "value": 0.1
                    }
                },
                "outputs": {
                    "output1": {
                        "type": "double",
                        "value": 3.4
                    }
                },
                "globalNames": {
                    "global1": {
                        "type": "QString",
                        "value": "Hello Workspace!"
                    }
                }
            }

        Note that when creating a WatchList object for the purposes of creating
        a new watch (i.e. with the `Workspace.watch()` method), the `type` and
        `value` members of each input, output or globalName are not required.
        Also note that the `id` member is crucial, as this is used to globally
        identify the WatchList. If an `id` member is not present in the
        string (or the json is not an object at all), `None` will be returned.
        Missing `inputs`, `outputs` or `globalNames` members default to empty
        dictionaries. Malformed json raises whatever `json.loads` raises.
        """
        wl = json.loads(jsonStr)
        if not isinstance(wl, dict) or 'id' not in wl:
            return None

        return cls(wl['id'],
                   wl.get('inputs', {}),
                   wl.get('outputs', {}),
                   wl.get('globalNames', {}))

    def __init__(self, id, inputs, outputs, globalNames):
        """
        Constructs a watchlist using `inputs`, `outputs` and `globalNames`, all
        of which are of type `dict`, and the identifier `id`. The dictionaries
        are stored as given (they are not copied).
        """
        self._id = id
        self._inputs = inputs
        self._outputs = outputs
        self._globalNames = globalNames

    def __str__(self):
        """
        Return the WatchList in JSON format.
        """
        return json.dumps(self.asDict())

    def asDict(self):
        """
        Return the WatchList as a dictionary with the keys `id`, `inputs`,
        `outputs` and `globalNames`.
        """
        return {
            'id': self._id,
            'inputs': self._inputs,
            'outputs': self._outputs,
            'globalNames': self._globalNames,
        }

    @property
    def id(self):
        """
        Returns the unique identifier for this watch list.
        """
        return self._id

    @property
    def inputs(self):
        """
        Returns the inputs as a dictionary keyed by input name. Each entry is
        itself a dictionary which, once populated by a running Workspace, is
        expected to hold the members `type` and `value`.
        """
        return self._inputs

    @property
    def outputs(self):
        """
        Returns the outputs as a dictionary keyed by output name. Each entry is
        itself a dictionary which, once populated by a running Workspace, is
        expected to hold the members `type` and `value`.
        """
        return self._outputs

    @property
    def globalNames(self):
        """
        Returns the globalNames as a dictionary keyed by global name. Each
        entry is itself a dictionary which, once populated by a running
        Workspace, is expected to hold the members `type` and `value`.
        """
        return self._globalNames


class _WatchCallback(object):
    """
    Pairs a user-supplied watch callback with the Workspace and watch id it
    belongs to. The `autodelete` flag decides whether the wrapper asks the
    Workspace to drop the watch once the callback has been invoked.
    """
    def __init__(self, workspace, watchId, callback, autodelete=True):
        """
        Stores everything needed to service one watch.

        `workspace` is the Workspace the watch was registered on, `watchId`
        is the globally unique identifier (usually a uuid) of the watched set
        of inputs/outputs/globalNames, and `callback` is the function to call
        with the `workspace` and a `WatchList` when the watch fires.
        """
        self.workspace, self.watchId = workspace, watchId
        self.callback, self.autodelete = callback, autodelete

    def __call__(self, watchList):
        """
        Invoked when the watch identified by the stored watchId has been
        brought up-to-date. Forwards the stored `Workspace` and the given
        `WatchList` to the user callback and returns whatever it returns; in
        autodelete mode the watch is removed from the Workspace afterwards.
        """
        outcome = self.callback(self.workspace, watchList)
        if not self.autodelete:
            return outcome
        self.workspace._removeWatch(self.watchId)
        return outcome


class Workspace(object):
    """
    Represents an instance of a Workspace workflow. Users create an instance of
    this class for each instance of a Workspace workflow (.wsx) file that they
    wish to execute. Behind the scenes, Workspaces are executed in a separate
    process, and this object communicates with it via TCP/IP. For this reason,
    the user provides a callback function when creating a new Workspace
    instance, which is invoked only after the instance has been connected to
    successfully.

    Once the Workspace instance is connected, the user is able to set input
    and globalName values using the `setInput` and `setGlobalName` methods,
    execute it using the `runOnce` or `runContinuously` methods, and
    monitor specific inputs, outputs or globalNames by using the `watch`
    method. Similarly, lists of inputs, outputs or globalNames can be retrieved
    by using the `listInputs`, `listOutputs` or `listGlobalNames` methods.

    To take action when the Workspace instance successfully completes its
    execution, fails to execute, or aborts due to an error, provide callback
    functions to the `onSuccess`, `onFailed` or `onError` methods.

    It is important to note that the Workspace class does not allow any
    interaction in a synchronous manner. This ensures that all interactions
    with a running Workspace workflow are safe. Therefore, users always
    interact with Workspace instances via callback functions. Importantly,
    since each running Workspace instance runs in its own separate process, the
    application must periodically check each process to see whether it has
    posted any updates. Calling code can manage this using either of two
    different methods. Users can either:

    - Use the static `startEventLoop` and `stopEventLoop` functions to
      conveniently create an event loop which will monitor workflows and
      notify each Workspace object appropriately, or
    - Use the static `poll` function to check for updates to all running
      Workspace instances. This function invocation could (for example)
      be embedded in your own event loop code elsewhere, such as within
      a python-based web-server, invoked at a frequency of your choosing.

    This is important, as if neither of these methods is followed, no
    callback responses (e.g. from `watch` or `list` requests) will ever be
    received from running Workspace instances.
    """

    # Variables used for ensuring that processes are terminated, even if
    # something goes wrong during the terminate communication process (e.g.
    # child process is frozen)
    _SERVER_ADDRESS = '127.0.0.1'
    _terminating_processes = []
    _registered_workspaces = {}
    _event_loop_running    = False

    # Holds the ctypes wrapper around the event loop's start callback. ctypes
    # requires that callback objects stay referenced for as long as C code may
    # call them, so the wrapper must not be a temporary.
    _loop_start_callback   = None

    @staticmethod
    def _atexit():
        """
        This static method is always invoked when the Workspace module
        terminates. It stops the event loop if it is running and asks every
        registered Workspace instance to terminate.
        """
        if Workspace._event_loop_running:
            Workspace.stopEventLoop()

        # Don't forget we also need to make sure all our workspace processes
        # are shut down, since we started them! Iterate over a snapshot so
        # that the registry may change while we work through it.
        for ws in list(Workspace._registered_workspaces.values()):
            ws.terminate()

    @staticmethod
    def startEventLoop(onStartFunc):
        """
        Used to start the event loop, if one is needed. For web-based
        applications, it's recommended to instead use the server's event loop
        to repeatedly call `poll()` rather than starting this event loop.
        For a simple command line application, the event loop will be required in
        order to repeatedly poll for updates, but again, a python-based event
        loop that repeatedly invokes `poll()` could be used instead.

        The `onStartFunc` parameter is a callback that will be invoked as soon
        as the event loop has been successfully started.

        *Note:* failure to stop the event loop will cause the application to
        hang on exit.
        """
        # Keep the wrapped callback referenced at class level; a temporary
        # could be garbage collected before the C side invokes it.
        Workspace._loop_start_callback = LOOPSTARTFUNC(onStartFunc)
        server_start_event_loop(Workspace._loop_start_callback)
        Workspace._event_loop_running = True

    @staticmethod
    def stopEventLoop():
        """
        Stops the event loop if it is running.
        """
        server_stop_event_loop()
        # Record that the loop is stopped so `_atexit` does not stop it again.
        Workspace._event_loop_running = False

    @staticmethod
    def poll(timeoutMs=0):
        """
        Static method for polling the client applications to determine what's
        updated. If any watch events have occurred since the time this method
        was last invoked, all of these watch events will be triggered.

        The `timeoutMs` parameter should be a number representing how long the
        method should wait until returning in the case that there are no new
        updates available.
        """
        server_poll(timeoutMs)

        # Each time we poll, we iterate over the list of existing terminating
        # processes and kill them if they've been taking too long to shut down.
        # If a process has already been shutdown correctly, we just remove it
        # from the list. We walk a snapshot of the list because entries are
        # removed as we go; removing from the list being iterated would skip
        # the entry that follows each removal.
        timeoutSec = _ws_config['terminate_timeout_sec']
        for procRef in list(Workspace._terminating_processes):
            timeTerminated, ws = procRef
            process = ws._process

            if process is not None and process.poll() is None:
                # Still running: leave it queued until the grace period is up.
                elapsed = datetime.datetime.now() - timeTerminated
                if elapsed.total_seconds() <= timeoutSec:
                    continue
                process.kill()

            Workspace._terminating_processes.remove(procRef)
            # A missing process means the workspace was already cleaned up.
            if process is not None:
                ws._cleanup()

    def _createConnectedCallback(self, onConnected):
        """
        Factory method to create a connected callback function that can
        be invoked by ctypes that still has access to 'self' because it
        is a closure.
        """
        def callback(workspaceId):
            # Take our own copy of the id; the pointer we are handed belongs
            # to the caller.
            self._id = copy.deepcopy(workspaceId.contents)

            # Register the workspace itself so that it can be cleaned up later
            # if need be.
            Workspace._registered_workspaces[self.id] = self

            # Register success, failed and error callbacks. Make sure to store them
            # in the local workspace, otherwise they'll get garbage collected
            # before being invoked.
            workspace_register_func_success(workspaceId, self._successCallback)
            workspace_register_func_failed(workspaceId, self._failedCallback)
            workspace_register_func_error(workspaceId, self._errorCallback)

            # Invoke our callback for when a process has connected successfully
            return onConnected(self)
        return CONNFUNC(callback)

    def _createSuccessCallback(self):
        """
        Factory method to create a success callback function that can
        be invoked by ctypes that still has access to 'self' because it
        is a closure. Returns True when no `onSuccess` handler is registered
        or when the handler raises.
        """
        def callback(workspaceId):
            handler = self._onSuccessFunc
            if handler is None:
                return True
            try:
                return handler(self)
            except Exception:
                # Best effort: an exception cannot propagate through the C
                # caller, so report the default result instead.
                return True
        return SUCCESSFUNC(callback)

    def _createFailedCallback(self):
        """
        Factory method to create a failure callback function that can
        be invoked by ctypes that still has access to 'self' because it
        is a closure. Returns True when no `onFailed` handler is registered
        or when the handler raises.
        """
        def callback(workspaceId):
            handler = self._onFailedFunc
            if handler is None:
                return True
            try:
                return handler(self)
            except Exception:
                # Best effort: see _createSuccessCallback.
                return True
        return FAILFUNC(callback)

    def _createErrorCallback(self):
        """
        Factory method to create an error callback function that can
        be invoked by ctypes that still has access to 'self' because it
        is a closure. Returns True when no `onError` handler is registered
        or when the handler raises.
        """
        def callback(workspaceId, errorMessage):
            handler = self._onErrorFunc
            if handler is None:
                return True
            try:
                return handler(self, errorMessage)
            except Exception:
                # Best effort: see _createSuccessCallback.
                return True
        return ERRORFUNC(callback)

    def _create_WatchCallback(self):
        """
        Factory method to create a watch callback function that can
        be invoked by ctypes that still has access to 'self' because it
        is a closure. Watch lists without an id, or whose id has no registered
        watch, are ignored.
        """
        def callback(workspaceId, watchListStr):
            wl = WatchList.fromJson(watchListStr)
            if not wl:
                return None
            watch = self._watches.get(wl.id)
            if watch is None:
                return None
            return watch(wl)
        return WATCHFUNC(callback)

    def _createListCallback(self, ioType):
        """
        Factory method to create a callback function to invoke when a request
        for a list of inputs / outputs / globalNames is made. As with the other
        callback methods, 'self' is available because the callback is a closure.
        """
        def callback(workspaceId, ioListStr):
            # Take the handler out of the table before calling it, so that a
            # handler which issues the same list request again is not
            # immediately de-registered once it returns.
            handler = self._listRequests.pop(ioType, None)
            if handler is None:
                return True
            ioList = WatchList.fromJson(ioListStr)
            return handler(self, ioList)
        return LISTFUNC(callback)

    def _removeWatch(self, watchId):
        """
        Removes a specific watch callback. Generally this is only used by the
        _WatchCallback class to remove itself when it is in 'autodelete' mode.
        Removing a watch that is not (or no longer) registered is a no-op.
        """
        self._watches.pop(watchId, None)

    def _cleanup(self):
        """
        Clean up the workspace after it has terminated. We need to do this to
        make sure we safely delete all of our closures while they are not in
        call. If we don't delete them, the workspace object's reference count
        will never reach zero. Safe to call more than once.
        """
        self._process = None
        for attribute in ('_connectedCallback',
                          '_successCallback',
                          '_failedCallback',
                          '_errorCallback',
                          '_watchCallback',
                          '_listCallbackInputs',
                          '_listCallbackOutputs',
                          '_listCallbackGlobalNames'):
            vars(self).pop(attribute, None)

        # A workspace that never connected has no id and was never registered.
        if self._id is not None:
            Workspace._registered_workspaces.pop(self.id, None)

    def __init__(self, fileName, onConnected):
        """
        Constructs a new Workspace instance, creating a subprocess of the
        workspace-web (C++) application. All actions performed on a Workspace
        instance are communicated to this new workspace-web process using the
        underlying libworkspaceweb C interface (and ctypes).

        The `fileName` parameter should be a path (or URL) to a Workspace workflow
        (.wsx) file, and the `onConnected` parameter is a callback function
        that will be invoked once the connection to the newly created
        workspace-web process is successful. It is important to note that until
        the connection is successful, all attempts to communicate with the
        Workspace instance via this class' member functions will fail.

        Raises `RuntimeError` if the connection to the new process cannot be
        established; in that case the subprocess is killed rather than left
        running.
        """
        self._fileName = fileName
        self._watches = dict()
        self._listRequests = dict()

        # Not known until the subprocess has connected back to us.
        self._id = None

        # User handlers; these stay None until onSuccess/onFailed/onError are
        # used to register something.
        self._onSuccessFunc = None
        self._onFailedFunc = None
        self._onErrorFunc = None

        # Use our factory methods to create all our callback functions. These
        # callback functions are used to forward on to the functions that the
        # user registers.
        self._connectedCallback = self._createConnectedCallback(onConnected)
        self._successCallback = self._createSuccessCallback()
        self._failedCallback = self._createFailedCallback()
        self._errorCallback = self._createErrorCallback()
        self._watchCallback = self._create_WatchCallback()
        self._listCallbackInputs = self._createListCallback('inputs')
        self._listCallbackOutputs = self._createListCallback('outputs')
        self._listCallbackGlobalNames = self._createListCallback('globalNames')

        # Start our actual child process. We start it first since it's
        # asynchronous, whereas our server isn't (since we don't have an event loop)
        self._process = subprocess.Popen([
            _ws_config['workspace_install_dir'] + '/bin/workspace-web',
            fileName,
            '--port', '%d' % _ws_config['connection_port'],
            '--log-level', '%d' % _ws_config['log_level']
        ])

        # Listen to connections from our new process.
        success = server_listen_for_connection_and_wait(Workspace._SERVER_ADDRESS, _ws_config['connection_port'], self._connectedCallback)
        if not success:
            # Nobody will ever be handed this object, so make sure the child
            # we just started does not linger as an orphan.
            try:
                self._process.kill()
            except OSError:
                # The process has already exited; nothing left to kill.
                pass
            raise RuntimeError('Failed to connect to Workspace process running "%s"' % fileName)

    @property
    def id(self):
        """
        Returns the integer key that uniquely identifies this Workspace
        instance, or `None` if the instance has not connected yet.
        """
        if self._id is None:
            return None
        return self._id.getKey()

    @property
    def fileName(self):
        """
        Returns the file name of the Workspace being invoked. This may be a url.
        """
        return self._fileName

    @fileName.setter
    def fileName(self, fileName):
        """
        Assigns the `fileName` to this Workspace instance.
        """
        self._fileName = fileName

    def runOnce(self):
        """
        Requests that the Workspace instance be executed once only until it
        either completes or fails. It is the responsibility of the user to
        re-execute it as required.
        """
        workspace_run_once(byref(self._id))

    def runContinuously(self):
        """
        Requests that the Workspace instance be executed in continuous mode.
        This means that the underlying workflow will run until it completes, or
        fails, and then wait for user input. As soon as an input or globalName
        is updated via the `setInput` or `setGlobalName` methods, the workflow
        will re-execute the affected parts of the workflow.
        """
        workspace_run_continuously(byref(self._id))

    def stop(self):
        """
        Requests that the Workspace instance stop executing (though it does not
        guarantee that it will stop immediately). Once stopped, a Workspace
        instance can resume execution through use of the `runOnce` or
        `runContinuously` methods. This is different from the `terminate` method
        which will terminate the entire process.

        __*Note:* This method is asynchronous__
        """
        workspace_stop(byref(self._id))

    def terminate(self):
        """
        Requests that the workspace-web subprocess shut down immediately. The
        instance is queued so that a later `poll()` can kill the process if it
        has not exited within the configured timeout. Calling this again while
        a termination is already pending has no further effect.
        """
        if not self._process:
            return

        # Queueing the same workspace twice would make poll() clean it up
        # twice, so a pending termination is left alone.
        if any(ws is self for _, ws in Workspace._terminating_processes):
            return

        # Calling this will tell the process to terminate itself. Without an
        # id there is no connection to send the request over, so we rely on
        # the kill performed by poll() instead.
        if self._id is not None:
            workspace_terminate(byref(self._id))

        # Before we get rid of the process reference, store it in the queue of
        # terminating processes so that we can kill it if it doesn't
        # promptly terminate itself.
        Workspace._terminating_processes.append((datetime.datetime.now(), self))

    def setInput(self, inputName, content):
        """
        Assigns `content` to the top-level input named `inputName` on the Workspace.
        If the Workspace is currently executing, this will be applied as soon as
        it is safe to do so.

        The `content` parameter must contain the serialized data appropriate to the
        underlying data type of the input. For example, if the input is a double,
        a string representing a floating-point number is required. If the input
        is of a more complex type, such as a DataCollection, then `content`
        must contain the serialized XML that can be read into this datatype.

        Returns the integer result of the underlying C call.

        __*Note:* This method is asynchronous__
        """
        return workspace_set_input(byref(self._id), inputName, str(content))

    def setGlobalName(self, globalName, content):
        """
        Assigns `content` to the input with the attached global name
        `globalName`. If the Workspace is currently executing, this will
        be applied as soon as it is safe to do so.

        The `content` parameter must contain the serialized data appropriate to the
        underlying data type of the input. For example, if the input is a double,
        a string representing a floating-point number is required. If the input
        is of a more complex type, such as a DataCollection, then `content`
        must contain the serialized XML that can be read into this datatype.

        Returns the integer result of the underlying C call.

        __*Note:* This method is asynchronous__
        """
        return workspace_set_global_name(byref(self._id), globalName, str(content))

    def watch(self, callback, watchList, autoDelete=True):
        """
        Sets up a watch on the specified `watchList`, which must be an object
        of the `WatchList` type. When all of the inputs, outputs and
        globalNames in the watch list are brought up-to-date by the running
        workflow, the `callback` function will be triggered, and passed as
        arguments a reference to this `Workspace` object, as well as the `WatchList`
        containing the name, type and value of each watched item.

        Returns the id of the watch list if the request was accepted, or
        `None` otherwise (in which case nothing stays registered).

        __*Note:* This method is asynchronous__
        """
        self._watches[watchList.id] = _WatchCallback(self, watchList.id, callback, autoDelete)
        # NOTE(review): `autoDelete` is passed as a fourth argument although
        # only three argument types are declared for workspace_watch in
        # _initCInterface - confirm against the C API.
        if workspace_watch(byref(self._id), str(watchList), self._watchCallback, autoDelete):
            return watchList.id

        # The request was rejected, so the callback can never fire; don't
        # leave it registered.
        self._watches.pop(watchList.id, None)
        return None

    def cancelWatch(self, watchId):
        """
        Cancels a (non-single-shot) watch request, by de-registering the
        existing callback associated with the `watchId`.
        """
        self._removeWatch(watchId)
        workspace_cancel_watch(byref(self._id), watchId)

    def listInputs(self, callback):
        """
        Requests a list of inputs from the running Workspace, and invokes the
        `callback`, passing it this Workspace and a `WatchList` object
        containing the results.

        __*Note:* This method is asynchronous__
        """
        self._listRequests['inputs'] = callback
        return workspace_list_inputs(byref(self._id), self._listCallbackInputs)

    def listOutputs(self, callback):
        """
        Requests a list of outputs from the running Workspace, and invokes the
        `callback`, passing it this Workspace and a `WatchList` object
        containing the results.

        __*Note:* This method is asynchronous__
        """
        self._listRequests['outputs'] = callback
        return workspace_list_outputs(byref(self._id), self._listCallbackOutputs)

    def listGlobalNames(self, callback):
        """
        Requests a list of globalnames from the running Workspace and invokes
        the `callback`, passing it this Workspace and a `WatchList` object
        containing the results.

        __*Note:* This method is asynchronous__
        """
        self._listRequests['globalNames'] = callback
        return workspace_list_global_names(byref(self._id), self._listCallbackGlobalNames)

    def onSuccess(self, callback):
        """
        Assign a `callback` function to invoke when the workflow successfully
        completes execution. The callback function is passed a reference to this
        Workspace object as an argument.

        __*Note:* This method is asynchronous__
        """
        self._onSuccessFunc = callback

    def onFailed(self, callback):
        """
        Assign a `callback` function to invoke when the workflow fails to
        execute. The callback function is passed a reference to this
        Workspace object as an argument.

        __*Note:* This method is asynchronous__
        """
        self._onFailedFunc = callback

    def onError(self, callback):
        """
        Assign a `callback` function to invoke when an error occurs in the
        workflow. The callback function is provided a reference to this
        Workspace object, and a string containing a description of the specific
        error message, as arguments.

        __*Note:* This method is asynchronous__
        """
        self._onErrorFunc = callback

# Prior to using the module, all our c functions need to be initialised. This
# runs at import time: it declares the C signatures and initialises the server.
_initCInterface()

# Make sure that on quit() or exit() calls, all our subprocesses are shut down.
# Workspace._atexit stops the event loop if it is running and asks every
# registered Workspace to terminate.
atexit.register(Workspace._atexit)

Classes

class IONotExistsError

Exception for when a named input, output or global name doesn't exist.

class IONotExistsError(Exception):
    """
    Exception for when a named input, output or global name doesn't exist.
    """
    def __init__(self, name):
        """
        Create the error for the input, output or global name `name`.
        """
        # Initialise the base class so that `args` (and therefore repr and
        # pickling of the exception) carries the offending name.
        Exception.__init__(self, name)
        self._name = name

    def __str__(self):
        """
        Return the human readable error message.
        """
        return 'ERROR: Input/Output/GlobalName "%s" does not exist.' % self._name

    @property
    def name(self):
        """
        Returns the name of the input, output or global name that does not
        exist.
        """
        # The attribute lives on the instance; a bare `_name` is an undefined
        # global and raised NameError.
        return self._name

Ancestors (in MRO)

  • IONotExistsError
  • exceptions.Exception
  • exceptions.BaseException
  • __builtin__.object

Class variables

var args

var message

Instance variables

var name

Returns the name of the input, output or global name that does not exist.

Methods

def __init__(

self, name)

def __init__(self, name):
    """
    Store `name`, the input, output or global name that does not exist.
    """
    self._name = name

class WatchList

Represents a list of inputs, outputs or globalnames to watch in the running Workspace. Once one of these has been created, it is passed to the method Workspace.watch() which will monitor the specific inputs/outputs for updates. Wraps the C-API's WatchList class to manage scoped deletion etc.

class WatchList(object):
    """
    Represents a list of inputs, outputs or globalnames to watch in the running
    Workspace. Once one of these has been created, it is passed to the method
    `Workspace.watch()` which will monitor the specific inputs/outputs for
    updates.

    A WatchList holds a unique `id` plus three dictionaries (inputs, outputs
    and globalNames). Each dictionary maps an item name to a dictionary that
    may contain the item's `type` and `value`.
    """
    @classmethod
    def fromIONames(cls, inputs=None, outputs=None, globalNames=None):
        """
        Constructs a new WatchList object, where `inputs` is the list of input
        names, `outputs` is the list of output names and `globalNames` is the
        list of global names to watch. Any of them may be omitted (or `None`),
        which is treated as an empty list.

        The new WatchList is given a freshly generated UUID string as its id,
        and every name maps to its own empty dictionary.
        """
        # `None` defaults avoid sharing one mutable default list between calls.
        watchId = str(uuid.uuid4())
        inputsDict = {name: {} for name in (inputs or ())}
        outputsDict = {name: {} for name in (outputs or ())}
        globalNamesDict = {name: {} for name in (globalNames or ())}

        return cls(watchId, inputsDict, outputsDict, globalNamesDict)

    @classmethod
    def fromJson(cls, jsonStr):
        """
        Constructs a new WatchList object from the json contained in `jsonStr`.
        An example of a valid json object is:

            {
                "id": "1lFDS-12314-VBAVD-1241-ADFS",
                "inputs": {
                    "input1": {
                        "type": "double",
                        "value": 0.1
                    }
                },
                "outputs": {
                    "output1": {
                        "type": "double",
                        "value": 3.4
                    }
                },
                "globalNames": {
                    "global1": {
                        "type": "QString",
                        "value": "Hello Workspace!"
                    }
                }
            }

        Note that when creating a WatchList object for the purposes of creating
        a new watch (i.e. with the `Workspace.watch()` method), the `type` and
        `value` members of each input, output or globalName are not required.
        Missing `inputs`, `outputs` or `globalNames` members default to an
        empty dictionary.

        Also note that the `id` member is crucial, as this is used to globally
        identify the WatchList. If the json is not an object, or an `id`
        member is not present in it, `None` will be returned. Text that is not
        valid json makes `json.loads` raise.
        """
        wl = json.loads(jsonStr)
        # Valid json may also be a list, string or number; only an object can
        # describe a watch list.
        if not isinstance(wl, dict) or 'id' not in wl:
            return None

        return cls(wl['id'],
                   wl.get('inputs', {}),
                   wl.get('outputs', {}),
                   wl.get('globalNames', {}))

    def __init__(self, id, inputs, outputs, globalNames):
        """
        Constructs a watchlist with the identifier `id`, using `inputs`,
        `outputs` and `globalNames`, all of which are of type `dict`. The
        dictionaries are stored as given (not copied).
        """
        self._id = id
        self._inputs = inputs
        self._outputs = outputs
        self._globalNames = globalNames

    def __str__(self):
        """
        Return the WatchList in JSON format.
        """
        return json.dumps(self.asDict())

    def asDict(self):
        """
        Return the WatchList as a dictionary with the keys `id`, `inputs`,
        `outputs` and `globalNames`.
        """
        return {
            'id': self._id,
            'inputs': self._inputs,
            'outputs': self._outputs,
            'globalNames': self._globalNames,
        }

    @property
    def id(self):
        """
        Returns the unique identifier for this watch list.
        """
        return self._id

    @property
    def inputs(self):
        """
        Returns the inputs as a dictionary keyed by input name, where each
        entry is a dictionary that may hold the members `value` and `type`.
        """
        return self._inputs

    @property
    def outputs(self):
        """
        Returns the outputs as a dictionary keyed by output name, where each
        entry is a dictionary that may hold the members `value` and `type`.
        """
        return self._outputs

    @property
    def globalNames(self):
        """
        Returns the globalNames as a dictionary keyed by global name, where
        each entry is a dictionary that may hold the members `value` and
        `type`.
        """
        return self._globalNames

Ancestors (in MRO)

Instance variables

var globalNames

Returns a list of globalNames, where each globalName is a dictionary with the members name, value and type.

var id

Returns the unique identifier for this watch list.

var inputs

Returns a list of inputs, where each input is a dictionary with the members name, value and type.

var outputs

Returns a list of outputs, where each output is a dictionary with the members name, value and type.

Methods

def __init__(

self, id, inputs, outputs, globalNames)

Constructs a watchlist using inputs, outputs and globalNames, all of which are of type dict.

def __init__(self, id, inputs, outputs, globalNames):
    """
    Constructs a watchlist with the identifier `id`, using `inputs`,
    `outputs` and `globalNames`, all of which are of type `dict`. The
    arguments are stored as given (not copied).
    """
    self._id = id
    self._inputs = inputs
    self._outputs = outputs
    self._globalNames = globalNames

def asDict(

self)

Return the WatchList as a dictionary.

def asDict(self):
    """
    Return the WatchList as a dictionary with the keys `id`, `inputs`,
    `outputs` and `globalNames`.
    """
    return {
        'id': self._id,
        'inputs': self._inputs,
        'outputs': self._outputs,
        'globalNames': self._globalNames,
    }

def fromIONames(

cls, inputs=[], outputs=[], globalNames=[])

Constructs a new WatchList object, where inputs is the list of input names, outputs is the list of output names and globalNames is the list of global names to watch.

@classmethod
def fromIONames(cls, inputs=None, outputs=None, globalNames=None):
    """
    Constructs a new WatchList object, where `inputs` is the list of input
    names, `outputs` is the list of output names and `globalNames` is the
    list of global names to watch. Any of them may be omitted (or `None`),
    which is treated as an empty list.

    The new object is given a freshly generated UUID string as its id, and
    every name maps to its own empty dictionary.
    """
    # `None` defaults avoid sharing one mutable default list between calls.
    watchId = str(uuid.uuid4())
    inputsDict = {name: {} for name in (inputs or ())}
    outputsDict = {name: {} for name in (outputs or ())}
    globalNamesDict = {name: {} for name in (globalNames or ())}
    return cls(watchId, inputsDict, outputsDict, globalNamesDict)

def fromJson(

cls, jsonStr)

Constructs a new WatchList object from the json contained in jsonStr. An example of a valid json object is:

{
    "id": "1lFDS-12314-VBAVD-1241-ADFS",
    "inputs": {
        "input1": {
            "type": "double",
            "value": 0.1
        },
    },
    "outputs": {
        "output1": {
            "type": "double",
            "value": 3.4
        },
    },
    "globalNames": {
        "global1": {
            "type": "QString",
            "value": "Hello Workspace!"
        },
    }
}

Note that when creating a WatchList object for the purposes of creating a new watch (i.e. with the Workspace.watch() method), the type and value members of each input, output or globalName are not required. Also note that the id member is crucial, as this is used to globally identify the WatchList. If an id parameter is not present in the string, None will be returned.

@classmethod
def fromJson(cls, jsonStr):
    """
    Constructs a new WatchList object from the json contained in `jsonStr`.
    An example of a valid json object is:
        {
            "id": "1lFDS-12314-VBAVD-1241-ADFS",
            "inputs": {
                "input1": {
                    "type": "double",
                    "value": 0.1
                }
            },
            "outputs": {
                "output1": {
                    "type": "double",
                    "value": 3.4
                }
            },
            "globalNames": {
                "global1": {
                    "type": "QString",
                    "value": "Hello Workspace!"
                }
            }
        }
    Note that when creating a WatchList object for the purposes of creating
    a new watch (i.e. with the `Workspace.watch()` method), the `type` and
    `value` members of each input, output or globalName are not required.
    Missing `inputs`, `outputs` or `globalNames` members default to an empty
    dictionary.
    Also note that the `id` member is crucial, as this is used to globally
    identify the WatchList. If the json is not an object, or an `id` member
    is not present in it, `None` will be returned. Text that is not valid
    json makes `json.loads` raise.
    """
    wl = json.loads(jsonStr)
    # Valid json may also be a list, string or number; only an object can
    # describe a watch list.
    if not isinstance(wl, dict) or 'id' not in wl:
        return None
    return cls(wl['id'],
               wl.get('inputs', {}),
               wl.get('outputs', {}),
               wl.get('globalNames', {}))

class Workspace

Represents an instance of a Workspace workflow. Users create an instance of this class for each instance of a Workspace workflow (.wsx) file that they wish to execute. Behind the scenes, Workspaces are executed in a separate process, and this object communicates with it via TCP/IP. For this reason, the user provides a callback function when creating a new Workspace instance, which is invoked only after the instance has been connected to successfully.

Once the Workspace instance is connected, the user is able to set input and globalName values using the setInput and setGlobalName methods, execute it using the runOnce or runContinuously methods, and monitor specific inputs, outputs or globalNames by using the watch method. Similarly, lists of inputs, outputs or globalNames can be retrieved by using the listInputs, listOutputs or listGlobalNames methods.

To take action when the Workspace instance successfully completes its execution, fails to execute, or aborts due to an error, provide callback functions to the onSuccess, onFailed or onError methods.

It is important to note that the Workspace class does not allow any interaction in a synchronous manner. This ensures that all interactions with a running Workspace workflow are safe. Therefore, users always interact with Workspace instances via callback functions. Importantly, since each running Workspace instance runs in its own separate process, the application must periodically check each process to see whether it has posted any updates. Calling code can manage this using either of two different methods. Users can either:

  • Use the static startEventLoop and stopEventLoop functions to conveniently create an event loop which will monitor workflows and notify each Workspace object appropriately, or
  • Use the static poll function to check for updates to all running Workspace instances. This function invocation could (for example) be embedded in your own event loop code elsewhere, such as within a python-based web-server, invoked at a frequency of your choosing.

This is important, as if neither of these methods is followed, no callback responses (e.g. from watch or list requests) will ever be received from running Workspace instances.

class Workspace:
    """
    Represents an instance of a Workspace workflow. Users create an instance of
    this class for each instance of a Workspace workflow (.wsx) file that they
    wish to execute. Behind the scenes, Workspaces are executed in a separate
    process, and this object communicates with it via TCP/IP. For this reason,
    the user provides a callback function when creating a new Workspace
    instance, which is invoked only after the instance has been connected to
    successfully.

    Once the Workspace instance is connected, the user is able to set input
    and globalName values using the `setInput` and `setGlobalName` methods,
    execute it using the `runOnce` or `runContinuously` methods, and
    monitor specific inputs, outputs or globalNames by using the `watch`
    method. Similarly, lists of inputs, outputs or globalNames can be retrieved
    by using the `listInputs`, `listOutputs` or `listGlobalNames` methods.

    To take action when the Workspace instance successfully completes its
    execution, fails to execute, or aborts due to an error, provide callback
    functions to the `onSuccess`, `onFailed` or `onError` methods.

    It is important to note that the Workspace class does not allow any
    interaction in a synchronous manner. This ensures that all interactions
    with a running Workspace workflow are safe. Therefore, users always
    interact with Workspace instances via callback functions. Importantly,
    since each running Workspace instance runs in its own separate process, the
    application must periodically check each process to see whether it has
    posted any updates. Calling code can manage this using either of two
    different methods. Users can either:

    - Use the static `startEventLoop` and `stopEventLoop` functions to
      conveniently create an event loop which will monitor workflows and
      notify each Workspace object appropriately, or
    - Use the static `poll` function to check for updates to all running
      Workspace instances. This function invokation could (for example)
      be embedded in your own event loop code elsewhere, such as within
      a python-based web-server, invoked at a frequency of your choosing.

    This is important, as if neither of these methods is followed, no
    callback responses (e.g. from `watch` or `list` requests) will ever be
    received from running Workspace instances.
    """

    # Variables used for ensuring that processes are terminated, even if
    # something goes wrong during the terminate communication process (e.g.
    # child process is frozen)
    _SERVER_ADDRESS = '127.0.0.1'
    _terminating_processes = []
    _registered_workspaces = {}
    _event_loop_running    = False

    @staticmethod
    def _atexit():
        """
        This static method is always invoked when the Workspace module
        terminates. It ensures that the event loop has stopped, and is
        guaranteed to shut down any running Workspace instances.
        """
        if Workspace._event_loop_running:
            Workspace.stopEventLoop()

        # Don't forget we also need to make sure all our workspace processes
        # are shut down, since we started them!
        for key in Workspace._registered_workspaces.keys():
            Workspace._registered_workspaces[key].terminate()

    @staticmethod
    def startEventLoop(onStartFunc):
        """
        Used to start the event loop, if one is needed. For web-based
        applications, it's recommended to instead use the server's event loop
        to repeatedly call `poll()` rather than starting this event loop.
        For a simple command line application, the event loop will be required in
        order to repeatedly poll for updates, but again, a python-based event
        loop that repeatedly invokes `poll()` could be used instead.

        The `onStartFunc` parameter is a callback that will be invoked as soon
        as the event loop has been successfully started.

        *Note:* failure to stop the event loop will cause the application to
        hang on exit.
        """
        # Keep a reference to the ctypes callback object on the class. ctypes
        # does not keep callback objects alive itself, so an unreferenced one
        # may be garbage collected before the C side invokes it.
        Workspace._loopStartCallback = LOOPSTARTFUNC(onStartFunc)
        server_start_event_loop(Workspace._loopStartCallback)
        Workspace._event_loop_running = True

    @staticmethod
    def stopEventLoop():
        """
        Stops the event loop and records that it is no longer running.
        """
        server_stop_event_loop()
        # Clear the flag set by `startEventLoop` so that `_atexit` does not
        # try to stop a loop that has already been stopped.
        Workspace._event_loop_running = False

    @staticmethod
    def poll(timeoutMs=0):
        """
        Static method for polling the client applications to determine what's
        updated. If any watch events have occurred since the time this method
        was last invoked, all of these watch events will be triggered.

        The `timeoutMs` parameter should be a number representing how long the
        method should wait until returning in the case that there are no new
        updates available.

        After polling, the queue of terminating processes is serviced:
        processes that have exited are cleaned up, and processes that are
        still alive after `terminate_timeout_sec` seconds are killed.
        """
        server_poll(timeoutMs)

        # Walk a snapshot of the queue: entries are removed from the shared
        # list inside the loop, and removing from a list while iterating it
        # directly makes the iterator skip the entry that follows.
        for procRef in list(Workspace._terminating_processes):
            timeTerminated, ws = procRef

            if ws._process is None:
                # Already cleaned up (e.g. the workspace was queued more than
                # once); there is nothing left to poll or kill.
                Workspace._terminating_processes.remove(procRef)
                continue

            if ws._process.poll() is not None:
                # The process shut itself down correctly.
                Workspace._terminating_processes.remove(procRef)
                ws._cleanup()
                continue

            # Still running: kill it once it has exceeded the timeout.
            # total_seconds() is used because `.seconds` drops whole days.
            waited = datetime.datetime.now() - timeTerminated
            if waited.total_seconds() > _ws_config['terminate_timeout_sec']:
                ws._process.kill()
                ws._cleanup()
                Workspace._terminating_processes.remove(procRef)

    def _createConnectedCallback(self, onConnected):
        """
        Factory method that builds the 'connected' callback handed to ctypes.
        The callback is a closure, so it still has access to 'self' and to
        the user's `onConnected` function.
        """
        def handleConnected(workspaceId):
            # Keep our own copy of the identifier that was passed in.
            self._id = copy.deepcopy(workspaceId.contents)

            # Register the workspace itself so that it can be cleaned up
            # later if need be.
            Workspace._registered_workspaces[self.id] = self

            # Hook up the success, failed and error callbacks. They are the
            # ones stored on this object; unreferenced callbacks would get
            # garbage collected before being invoked.
            workspace_register_func_success(workspaceId, self._successCallback)
            workspace_register_func_failed(workspaceId, self._failedCallback)
            workspace_register_func_error(workspaceId, self._errorCallback)

            # Finally notify the user that the process has connected.
            return onConnected(self)

        return CONNFUNC(handleConnected)

    def _createSuccessCallback(self):
        """
        Factory method that builds the 'success' callback handed to ctypes.
        The callback is a closure, so it still has access to 'self'.
        """
        def handleSuccess(workspaceId):
            # Any exception (including no success function having been
            # assigned via `onSuccess`) results in True being returned.
            try:
                outcome = self._onSuccessFunc(self)
            except:
                outcome = True
            return outcome

        return SUCCESSFUNC(handleSuccess)

    def _createFailedCallback(self):
        """
        Factory method that builds the 'failed' callback handed to ctypes.
        The callback is a closure, so it still has access to 'self'.
        """
        def handleFailed(workspaceId):
            # Any exception (including no failure function having been
            # assigned via `onFailed`) results in True being returned.
            try:
                outcome = self._onFailedFunc(self)
            except:
                outcome = True
            return outcome

        return FAILFUNC(handleFailed)

    def _createErrorCallback(self):
        """
        Factory method that builds the 'error' callback handed to ctypes.
        The callback is a closure, so it still has access to 'self'.
        """
        def handleError(workspaceId, errorMessage):
            # Any exception (including no error function having been
            # assigned via `onError`) results in True being returned.
            try:
                outcome = self._onErrorFunc(self, errorMessage)
            except:
                outcome = True
            return outcome

        return ERRORFUNC(handleError)

    def _create_WatchCallback(self):
        """
        Factory method that builds the 'watch' callback handed to ctypes.
        The callback is a closure, so it still has access to 'self'. It
        parses the received watch list and forwards it to the callback
        registered under the watch list's id, if there is one.
        """
        def handleWatch(workspaceId, watchListStr):
            triggered = WatchList.fromJson(watchListStr)
            # Nothing to do without a parsed watch list...
            if not triggered:
                return None
            # ...or when no watch is registered under its id.
            if triggered.id not in self._watches:
                return None
            return self._watches[triggered.id](triggered)

        return WATCHFUNC(handleWatch)

    def _createListCallback(self, ioType):
        """
        Factory method that builds the callback to invoke when the response to
        a list request of kind `ioType` (the key used in the pending list
        requests) arrives. As with the other callback factories, 'self' is
        available because the callback is a closure.
        """
        def handleList(workspaceId, ioListStr):
            parsed = WatchList.fromJson(ioListStr)
            pendingRequest = self._listRequests[ioType]
            outcome = pendingRequest(self, parsed)
            # The request has been answered, so it is no longer pending.
            del self._listRequests[ioType]
            return outcome

        return LISTFUNC(handleList)

    def _removeWatch(self, watchId):
        """
        Removes a specific watch callback. Generally this is only used by the
        _WatchCallback class to remove itself when it is in 'autodelete' mode.
        """
        del self._watches[watchId]

    def _cleanup(self):
        """
        Clean up the workspace after it has terminated. We need to do this to
        make sure we safely delete all of our closures while they are not in
        call. If we don't delete them, the workspace object's reference count
        will never reach zero.
        """
        self._process = None
        del self._connectedCallback
        del self._successCallback
        del self._failedCallback
        del self._errorCallback
        del self._watchCallback
        del self._listCallbackInputs
        del self._listCallbackOutputs
        del self._listCallbackGlobalNames
        del Workspace._registered_workspaces[self.id]

    def __init__(self, fileName, onConnected):
        """
        Constructs a new Workspace instance, creating a subprocess of the
        workspace-web (C++) application. All actions performed on a Workspace
        instance are communicated to this new workspace-web process using the
        underlying libworkspaceweb C interface (and ctypes).

        The `fileName` parameter should be a path (or URL) to a Workspace workflow
        (.wsx) file, and the `onConnected` parameter is a callback function
        that will be invoked once the connection to the newly created
        workspace-web process is successful. It is important to note that until
        the connection is successful, all attempts to communicate with the
        Workspace instance via this class' member functions will fail.

        Raises `RuntimeError` when listening for the connection from the new
        process does not succeed.
        """
        self._fileName = fileName
        # Watch callbacks keyed by watch list id, and pending list-request
        # callbacks keyed by 'inputs' / 'outputs' / 'globalNames'.
        self._watches = dict()
        self._listRequests = dict()

        # Use our factory methods to create all our callback functions. These
        # callback functions are used to forward on to the functions that the
        # user registers. They are stored on the instance so that they stay
        # referenced for as long as the workspace is alive.
        self._connectedCallback = self._createConnectedCallback(onConnected)
        self._successCallback = self._createSuccessCallback()
        self._failedCallback = self._createFailedCallback()
        self._errorCallback = self._createErrorCallback()
        self._watchCallback = self._create_WatchCallback()
        self._listCallbackInputs = self._createListCallback('inputs')
        self._listCallbackOutputs = self._createListCallback('outputs')
        self._listCallbackGlobalNames = self._createListCallback('globalNames')

        # Start our actual child process. We start it first since it's
        # asynchronous, whereas our server isn't (since we don't have an event loop)
        # The executable location, port and log level all come from the
        # module configuration.
        self._process = subprocess.Popen([
            _ws_config['workspace_install_dir'] + '/bin/workspace-web',
            fileName,
            '--port', '%d' % _ws_config['connection_port'],
            '--log-level', '%d' % _ws_config['log_level']
        ])

        # Listen to connections from our new process.
        # NOTE(review): on failure the child process started above is left
        # running; confirm whether it should be terminated here.
        success = server_listen_for_connection_and_wait(Workspace._SERVER_ADDRESS, _ws_config['connection_port'], self._connectedCallback)
        if not success:
            raise RuntimeError('Failed to connect to Workspace process running "%s"' % fileName)

    @property
    def id(self):
        """
        Returns a string representing the unique identifier of this Workspace
        instance.
        """
        return self._id.getKey()

    @property
    def fileName(self):
        """
        Returns the file name of the Workspace being invoked. This may be a url.
        """
        return self._fileName

    @fileName.setter
    def fileName(self, fileName):
        """
        Assigns the `fileName` to this Workspace instance.
        """
        self._fileName = fileName

    def runOnce(self):
        """
        Ask the Workspace instance to execute a single time, until it either
        completes or fails. Re-executing it afterwards, when required, is the
        responsibility of the user.
        """
        workspaceRef = byref(self._id)
        workspace_run_once(workspaceRef)

    def runContinuously(self):
        """
        Ask the Workspace instance to execute in continuous mode. In this
        mode the underlying workflow runs until it completes or fails, and
        then waits for user input. As soon as an input or globalName is
        updated via the `setInput` or `setGlobalName` methods, the affected
        parts of the workflow are re-executed.
        """
        workspaceRef = byref(self._id)
        workspace_run_continuously(workspaceRef)

    def stop(self):
        """
        Ask the Workspace instance to stop executing (it is not guaranteed to
        stop immediately). A stopped Workspace instance can resume execution
        through the `runOnce` or `runContinuously` methods. This differs from
        the `terminate` method, which terminates the entire process.

        __*Note:* This method is asynchronous__
        """
        workspaceRef = byref(self._id)
        workspace_stop(workspaceRef)

    def terminate(self):
        """
        Requests that the workspace-web suprocess shut down immediately.
        """
        if not self._process:
            return

        # Calling this will tell the process to terminate itself.
        success = workspace_terminate(byref(self._id))

        # Before we get rid of the process reference, store it in the queue of
        # terminating processes so that we can get kill it if it doesn't
        # promptly terminate itself.
        Workspace._terminating_processes.append((datetime.datetime.now(), self))

    def setInput(self, inputName, content):
        """
        Assigns `content` to the top-level input named `inputName` on the
        Workspace. If the Workspace is currently executing, this will be
        applied as soon as it is safe to do so.

        `content` is converted with `str()` before being sent, and must
        contain the serialized data appropriate to the underlying data type of
        the input. For example, if the input is a double, a string
        representing a floating-point number is required. If the input is of
        a more complex type, such as a DataCollection, then `content` must
        contain the serialized XML that can be read into this datatype.

        Returns the result of the underlying `workspace_set_input` call.

        __*Note:* This method is asynchronous__
        """
        workspaceRef = byref(self._id)
        serialized = str(content)
        return workspace_set_input(workspaceRef, inputName, serialized)

    def setGlobalName(self, globalName, content):
        """
        Assigns `content` to the input with the attached global name
        `globalName`. If the Workspace is currently executing, this will be
        applied as soon as it is safe to do so.

        `content` is converted with `str()` before being sent, and must
        contain the serialized data appropriate to the underlying data type of
        the input. For example, if the input is a double, a string
        representing a floating-point number is required. If the input is of
        a more complex type, such as a DataCollection, then `content` must
        contain the serialized XML that can be read into this datatype.

        Returns the result of the underlying `workspace_set_global_name` call.

        __*Note:* This method is asynchronous__
        """
        workspaceRef = byref(self._id)
        serialized = str(content)
        return workspace_set_global_name(workspaceRef, globalName, serialized)

    def watch(self, callback, watchList, autoDelete=True):
        """
        Sets up a watch on the specified `watchList`, which must be an object
        of the `WatchList` type. When all of the inputs, outputs and
        globalNames in the watch list are brought up-to-date by the running
        workflow, the `callback` function will be triggered, and passed as
        arguments a reference to this `Workspace` object, as well as the `WatchList`
        containing the name, type and value of each watched item.

        The `autoDelete` flag is passed on to both the registered callback
        wrapper and the underlying watch request.

        Returns the id of the watch list when the watch request succeeds,
        otherwise `None`.

        __*Note:* This method is asynchronous__
        """
        watchId = watchList.id

        # Register the callback before issuing the request, so that it is
        # already in place when the first notification arrives.
        self._watches[watchId] = _WatchCallback(self, watchId, callback, autoDelete)
        if workspace_watch(byref(self._id), str(watchList), self._watchCallback, autoDelete):
            return watchId

        # The request failed, so presumably no notification will ever arrive
        # for it; drop the entry rather than leaving a stale callback behind.
        self._watches.pop(watchId, None)
        return None

    def cancelWatch(self, watchId):
        """
        Cancels a (non-single-shot) watch request, by de-registering the
        existing callback associated with the `watchId`.
        """
        self._removeWatch(watchId)
        workspace_cancel_watch(byref(self._id), watchId)

    def listInputs(self, callback):
        """
        Ask the running Workspace for its list of inputs.

        The `callback` is recorded as the pending 'inputs' list request; it is
        invoked with this object and a `WatchList` object containing the
        results. The return value is whatever the underlying
        `workspace_list_inputs` call returns.

        __*Note:* This method is asynchronous__
        """
        self._listRequests['inputs'] = callback
        workspaceRef = byref(self._id)
        return workspace_list_inputs(workspaceRef, self._listCallbackInputs)

    def listOutputs(self, callback):
        """
        Ask the running Workspace for its list of outputs.

        The `callback` is recorded as the pending 'outputs' list request; it
        is invoked with this object and a `WatchList` object containing the
        results. The return value is whatever the underlying
        `workspace_list_outputs` call returns.

        __*Note:* This method is asynchronous__
        """
        self._listRequests['outputs'] = callback
        workspaceRef = byref(self._id)
        return workspace_list_outputs(workspaceRef, self._listCallbackOutputs)

    def listGlobalNames(self, callback):
        """
        Ask the running Workspace for its list of globalnames.

        The `callback` is recorded as the pending 'globalNames' list request;
        it is invoked with this object and a `WatchList` object containing the
        results. The return value is whatever the underlying
        `workspace_list_global_names` call returns.

        __*Note:* This method is asynchronous__
        """
        self._listRequests['globalNames'] = callback
        workspaceRef = byref(self._id)
        return workspace_list_global_names(workspaceRef, self._listCallbackGlobalNames)

    def onSuccess(self, callback):
        """
        Assign a `callback` function to invoke when the workflow successfully
        completes execution. The callback function is passed a reference to this
        Workspace object as an argument.

        __*Note:* This method is asynchronous__
        """
        self._onSuccessFunc = callback

    def onFailed(self, callback):
        """
        Assign a `callback` function to invoke when the workflow fails to
        execute. The callback function is passed a reference to this
        Workspace object as an argument.

        __*Note:* This method is asynchronous__
        """
        self._onFailedFunc = callback

    def onError(self, callback):
        """
        Assign a `callback` function to invoke when an error occurs in the
        workflow. The callback function is provided a reference to this
        Workspace object, and a string containing a description of the specific
        error message, as arguments.

        __*Note:* This method is asynchronous__
        """
        self._onErrorFunc = callback

Ancestors (in MRO)

Static methods

def poll(

timeoutMs=0)

Static method for polling the client applications to determine what's updated. If any watch events have occurred since the time this method was last invoked, all of these watch events will be triggered.

The timeoutMs parameter should be a number representing how long the method should wait until returning in the case that there are no new updates available.

@staticmethod
def poll(timeoutMs=0):
    """
    Static method for polling the client applications to determine what's
    updated. If any watch events have occurred since the time this method
    was last invoked, all of these watch events will be triggered.

    The `timeoutMs` parameter should be a number representing how long the
    method should wait until returning in the case that there are no new
    updates available.

    As a side effect, every call also reaps the processes queued by
    `terminate`: processes that have exited are cleaned up, and processes
    that have been shutting down for longer than the configured
    `terminate_timeout_sec` are killed and then cleaned up.
    """
    server_poll(timeoutMs)
    now = datetime.datetime.now()
    timeoutSec = _ws_config['terminate_timeout_sec']
    # Each time we poll, we go over the terminating processes and kill them
    # if they've been taking too long to shut down. We iterate over a
    # snapshot of the list because entries are removed from the real list as
    # we go; removing from a list while iterating it skips elements.
    for procRef in list(Workspace._terminating_processes):
        timeTerminated, ws = procRef
        if ws._process.poll() is None:
            # Still running. total_seconds() is used rather than .seconds,
            # which discards whole days and wraps around for negative
            # deltas (e.g. after a system clock adjustment).
            if (now - timeTerminated).total_seconds() <= timeoutSec:
                continue
            ws._process.kill()
        # Either the process shut down by itself or we have just killed it.
        Workspace._terminating_processes.remove(procRef)
        ws._cleanup()

def startEventLoop(

onStartFunc)

Used to start the event loop, if one is needed. For web-based applications, it's recommended to instead use the server's event loop to repeatedly call poll() rather than starting this event loop. For a simple command line application, the event loop will be required in order to repeatedly poll for updates, but again, a Python-based event loop that repeatedly invokes poll() could be used instead.

The onStartFunc parameter is a callback that will be invoked as soon as the event loop has been successfully started.

Note: failure to stop the event loop will cause the application to hang on exit.

@staticmethod
def startEventLoop(onStartFunc):
    """
    Used to start the event loop, if one is needed. For web-based
    applications, it's recommended to instead use the server's event loop
    to repeatedly call `poll()` rather than starting this event loop.
    For a simple command line application, the event loop will be required in
    order to repeatedly poll for updates, but again, a Python-based event
    loop that repeatedly invokes `poll()` could be used instead.

    The `onStartFunc` parameter is a callback that will be invoked as soon
    as the event loop has been successfully started.

    *Note:* failure to stop the event loop will cause the application to
    hang on exit.
    """
    # ctypes does not keep callback objects alive on behalf of native code.
    # Store the wrapper on the class so it cannot be garbage-collected while
    # the native library may still call through it.
    Workspace._loop_start_callback = LOOPSTARTFUNC(onStartFunc)
    server_start_event_loop(Workspace._loop_start_callback)
    Workspace._event_loop_running = True

def stopEventLoop(

)

Stops the event loop if it is running.

@staticmethod
def stopEventLoop():
    """
    Asks the server to stop the event loop started with `startEventLoop`,
    if it is running.
    """
    server_stop_event_loop()

Instance variables

var fileName

Returns the file name of the Workspace being invoked. This may be a url.

var id

Returns a string representing the unique identifier of this Workspace instance.

Methods

def __init__(

self, fileName, onConnected)

Constructs a new Workspace instance, creating a subprocess of the workspace-web (C++) application. All actions performed on a Workspace instance are communicated to this new workspace-web process using the underlying libworkspaceweb C interface (and ctypes).

The fileName parameter should be a path (or URL) to a Workspace workflow (.wsx) file, and the onConnected parameter is a callback function that will be invoked once the connection to the newly created workspace-web process is successful. It is important to note that until the connection is successful, all attempts to communicate with the Workspace instance via this class' member functions will fail.

def __init__(self, fileName, onConnected):
    """
    Constructs a new Workspace instance, creating a subprocess of the
    workspace-web (C++) application. All actions performed on a Workspace
    instance are communicated to this new workspace-web process using the
    underlying libworkspaceweb C interface (and ctypes).

    The `fileName` parameter should be a path (or URL) to a Workspace workflow
    (.wsx) file, and the `onConnected` parameter is a callback function
    that will be invoked once the connection to the newly created
    workspace-web process is successful. It is important to note that until
    the connection is successful, all attempts to communicate with the
    Workspace instance via this class' member functions will fail.

    Raises `RuntimeError` if the connection to the new process could not be
    established. In that case the spawned subprocess is killed before the
    exception is raised, so that it is not left running unattended.
    """
    self._fileName = fileName
    self._watches = dict()
    self._listRequests = dict()
    # Use our factory methods to create all our callback functions. These
    # callback functions are used to forward on to the functions that the
    # user registers.
    self._connectedCallback = self._createConnectedCallback(onConnected)
    self._successCallback = self._createSuccessCallback()
    self._failedCallback = self._createFailedCallback()
    self._errorCallback = self._createErrorCallback()
    self._watchCallback = self._create_WatchCallback()
    self._listCallbackInputs = self._createListCallback('inputs')
    self._listCallbackOutputs = self._createListCallback('outputs')
    self._listCallbackGlobalNames = self._createListCallback('globalNames')
    # Start our actual child process. We start it first since it's
    # asynchronous, whereas our server isn't (since we don't have an event loop)
    self._process = subprocess.Popen([
        _ws_config['workspace_install_dir'] + '/bin/workspace-web',
        fileName,
        '--port', '%d' % _ws_config['connection_port'],
        '--log-level', '%d' % _ws_config['log_level']
    ])
    # Listen to connections from our new process.
    success = server_listen_for_connection_and_wait(Workspace._SERVER_ADDRESS, _ws_config['connection_port'], self._connectedCallback)
    if not success:
        # The caller never receives a usable object when we raise, so nobody
        # else can shut the child down: do it here.
        if self._process.poll() is None:
            try:
                self._process.kill()
            except OSError:
                # The process exited between poll() and kill(); nothing
                # is left to kill.
                pass
        # Reap the child so that it does not linger as a zombie.
        self._process.wait()
        raise RuntimeError('Failed to connect to Workspace process running "%s"' % fileName)

def cancelWatch(

self, watchId)

Cancels a (non-single-shot) watch request, by de-registering the existing callback associated with the watchId.

def cancelWatch(self, watchId):
    """
    Cancels a (non-single-shot) watch request identified by `watchId`.

    The callback registered locally for `watchId` is de-registered first,
    then the running Workspace is asked to cancel the watch.
    """
    self._removeWatch(watchId)
    workspaceId = byref(self._id)
    workspace_cancel_watch(workspaceId, watchId)

def listGlobalNames(

self, callback)

Requests a list of globalnames from the running Workspace and invokes the callback, passing itself and a WatchList object containing the results.

Note: This method is asynchronous

def listGlobalNames(self, callback):
    """
    Asks the running Workspace for its list of global names.

    The `callback` is stored as the pending 'globalNames' list request,
    replacing any callback previously stored for it. When the results
    arrive it is invoked with this `Workspace` object and a `WatchList`
    object containing the results.

    Returns whatever the underlying `workspace_list_global_names` call
    returns.

    __*Note:* This method is asynchronous__
    """
    self._listRequests['globalNames'] = callback
    workspaceId = byref(self._id)
    return workspace_list_global_names(workspaceId, self._listCallbackGlobalNames)

def listInputs(

self, callback)

Requests a list of inputs from the running Workspace, and invokes the callback, passing it itself and a WatchList object containing the results.

Note: This method is asynchronous

def listInputs(self, callback):
    """
    Asks the running Workspace for its list of inputs.

    The `callback` is stored as the pending 'inputs' list request,
    replacing any callback previously stored for it. When the results
    arrive it is invoked with this `Workspace` object and a `WatchList`
    object containing the results.

    Returns whatever the underlying `workspace_list_inputs` call returns.

    __*Note:* This method is asynchronous__
    """
    self._listRequests['inputs'] = callback
    workspaceId = byref(self._id)
    return workspace_list_inputs(workspaceId, self._listCallbackInputs)

def listOutputs(

self, callback)

Requests a list of outputs from the running Workspace, and invokes the callback, passing it itself and a WatchList object containing the results.

Note: This method is asynchronous

def listOutputs(self, callback):
    """
    Asks the running Workspace for its list of outputs.

    The `callback` is stored as the pending 'outputs' list request,
    replacing any callback previously stored for it. When the results
    arrive it is invoked with this `Workspace` object and a `WatchList`
    object containing the results.

    Returns whatever the underlying `workspace_list_outputs` call returns.

    __*Note:* This method is asynchronous__
    """
    self._listRequests['outputs'] = callback
    workspaceId = byref(self._id)
    return workspace_list_outputs(workspaceId, self._listCallbackOutputs)

def onError(

self, callback)

Assign a callback function to invoke when an error occurs in the workflow. The callback function is provided a reference to this Workspace object, and a string containing a description of the specific error message, as arguments.

Note: This method is asynchronous

def onError(self, callback):
    """
    Registers the `callback` function to invoke when an error occurs in
    the workflow, replacing any previously registered one. The callback
    function is provided two arguments: a reference to this Workspace
    object, and a string containing a description of the specific error.

    __*Note:* The callback is invoked asynchronously; this method only
    stores it and returns immediately.__
    """
    self._onErrorFunc = callback

def onFailed(

self, callback)

Assign a callback function to invoke when the workflow fails to execute. The callback function is passed a reference to this Workspace object as an argument.

Note: This method is asynchronous

def onFailed(self, callback):
    """
    Registers the `callback` function to invoke when the workflow fails
    to execute, replacing any previously registered one. The callback
    function is passed a reference to this Workspace object as an
    argument.

    __*Note:* The callback is invoked asynchronously; this method only
    stores it and returns immediately.__
    """
    self._onFailedFunc = callback

def onSuccess(

self, callback)

Assign a callback function to invoke when the workflow successfully completes execution. The callback function is passed a reference to this Workspace object as an argument.

Note: This method is asynchronous

def onSuccess(self, callback):
    """
    Registers the `callback` function to invoke when the workflow
    successfully completes execution, replacing any previously registered
    one. The callback function is passed a reference to this Workspace
    object as an argument.

    __*Note:* The callback is invoked asynchronously; this method only
    stores it and returns immediately.__
    """
    self._onSuccessFunc = callback

def runContinuously(

self)

Requests that the Workspace instance be executed in continuous mode. This means that the underlying workflow will run until it completes, or fails, and then wait for user input. As soon as an input or globalName is updated via the setInput or setGlobalName methods, the workflow will re-execute the affected parts of the workflow.

def runContinuously(self):
    """
    Requests that the Workspace instance be executed in continuous mode.

    In this mode the underlying workflow runs until it completes, or fails,
    and then waits for user input. As soon as an input or globalName is
    updated via the `setInput` or `setGlobalName` methods, the affected
    parts of the workflow are re-executed.
    """
    workspaceId = byref(self._id)
    workspace_run_continuously(workspaceId)

def runOnce(

self)

Requests that the Workspace instance be executed once only until it either completes or fails. It is the responsibility of the user to re-execute it as required.

def runOnce(self):
    """
    Requests a single execution of the Workspace instance, which runs
    until it either completes or fails. Re-executing it afterwards, as
    required, is the responsibility of the user.
    """
    workspaceId = byref(self._id)
    workspace_run_once(workspaceId)

def setGlobalName(

self, globalName, content)

Assigns content to the input with the attached global name globalName. If the Workspace is currently executing, this will be applied as soon as it is safe to do so.

The content parameter must contain the serialized data appropriate to the underlying data type of the input. For example, if the input is a double, a string representing a floating-point number is required. If the input is of a more complex type, such as a DataCollection, then content must contain the serialized XML that can be read into this datatype.

Note: This method is asynchronous

def setGlobalName(self, globalName, content):
    """
    Assigns `content` to the input with the attached global name
    `globalName`. If the Workspace is currently executing, this will
    be applied as soon as it is safe to do so.

    The `content` parameter must contain the serialized data appropriate to
    the underlying data type of the input; it is converted with `str()`
    before being sent. For example, if the input is a double, a string
    representing a floating-point number is required. If the input is of a
    more complex type, such as a DataCollection, then `content` must contain
    the serialized XML that can be read into this datatype.

    Returns whatever the underlying `workspace_set_global_name` call
    returns.

    __*Note:* This method is asynchronous__
    """
    workspaceId = byref(self._id)
    serialized = str(content)
    return workspace_set_global_name(workspaceId, globalName, serialized)

def setInput(

self, inputName, content)

Assigns content to the top-level input named inputName on the Workspace. If the Workspace is currently executing, this will be applied as soon as it is safe to do so.

The content parameter must contain the serialized data appropriate to the underlying data type of the input. For example, if the input is a double, a string representing a floating-point number is required. If the input is of a more complex type, such as a DataCollection, then content must contain the serialized XML that can be read into this datatype.

Note: This method is asynchronous

def setInput(self, inputName, content):
    """
    Assigns `content` to the top-level input named `inputName` on the
    Workspace. If the Workspace is currently executing, this will be applied
    as soon as it is safe to do so.

    The `content` parameter must contain the serialized data appropriate to
    the underlying data type of the input; it is converted with `str()`
    before being sent. For example, if the input is a double, a string
    representing a floating-point number is required. If the input is of a
    more complex type, such as a DataCollection, then `content` must contain
    the serialized XML that can be read into this datatype.

    Returns whatever the underlying `workspace_set_input` call returns.

    __*Note:* This method is asynchronous__
    """
    workspaceId = byref(self._id)
    serialized = str(content)
    return workspace_set_input(workspaceId, inputName, serialized)

def stop(

self)

Requests that the Workspace instance stop executing (though it does not guarantee that it will stop immediately). Once stopped, a Workspace instance can resume execution through use of the runOnce or runContinuously methods. This is different from the terminate method which will terminate the entire process.

Note: This method is asynchronous

def stop(self):
    """
    Requests that the Workspace instance stop executing; there is no
    guarantee that it stops immediately. A stopped Workspace instance can
    resume execution through use of the `runOnce` or `runContinuously`
    methods. This is different from the `terminate` method, which will
    terminate the entire process.

    __*Note:* This method is asynchronous__
    """
    workspaceId = byref(self._id)
    workspace_stop(workspaceId)

def terminate(

self)

Requests that the workspace-web subprocess shut down immediately.

def terminate(self):
    """
    Requests that the workspace-web subprocess shut down immediately.

    The instance is also added to the queue of terminating processes that
    `poll` inspects, so that the subprocess can be killed if it does not
    shut down promptly. Does nothing if there is no subprocess, or if this
    instance is already queued for termination.
    """
    if not self._process:
        return
    # A repeated call must not queue the same instance twice, otherwise
    # poll() would process (kill / clean up) it more than once.
    if any(ws is self for _, ws in Workspace._terminating_processes):
        return
    # Calling this will tell the process to terminate itself.
    workspace_terminate(byref(self._id))
    # Store the instance, together with the time of the request, in the
    # queue of terminating processes so that we can kill it if it doesn't
    # promptly terminate itself.
    Workspace._terminating_processes.append((datetime.datetime.now(), self))

def watch(

self, callback, watchList, autoDelete=True)

Sets up a watch on the specified watchList, which must be an object of the WatchList type. When all of the inputs, outputs and globalNames in the watch list are brought up-to-date by the running workflow, the callback function will be triggered, and passed as arguments a reference to this Workspace object, as well as the WatchList containing the name, type and value of each watched item.

Note: This method is asynchronous

def watch(self, callback, watchList, autoDelete=True):
    """
    Sets up a watch on the specified `watchList`, which must be an object
    of the `WatchList` type. When all of the inputs, outputs and
    globalNames in the watch list are brought up-to-date by the running
    workflow, the `callback` function will be triggered, and passed as
    arguments a reference to this `Workspace` object, as well as the `WatchList`
    containing the name, type and value of each watched item.

    The `autoDelete` flag is forwarded both to the local callback wrapper
    and to the running Workspace. (It presumably makes the watch
    single-shot; confirm against `_WatchCallback`.)

    Returns the id of the watch (`watchList.id`), which can be passed to
    `cancelWatch`, or `None` if the watch could not be set up.

    __*Note:* This method is asynchronous__
    """
    watchId = watchList.id
    self._watches[watchId] = _WatchCallback(self, watchId, callback, autoDelete)
    if workspace_watch(byref(self._id), str(watchList), self._watchCallback, autoDelete):
        return watchId
    # The watch was not set up, so the callback registered above would
    # otherwise stay in the table indefinitely.
    self._watches.pop(watchId, None)
    return None