Skip to content

Component

Component

Component class for creating Motion components. Here are some examples:

from motion import Component

AdderComponent = Component("MyAdder")

@AdderComponent.init_state
def setUp():
    return {"value": 0}

@AdderComponent.serve("add")
def plus(state, props):
    return state["value"] + props["value"]

@AdderComponent.update("add")
def add(state, props):
    return {"value": props.serve_result}

if __name__ == "__main__":
    c = AdderComponent() # Create instance of AdderComponent
    c.run("add", props={"value": 1}, flush_update=True) # Blocks
    # until update is done. Resulting state is {"value": 1}
    c.run("add", props={"value": 2}) # Will return 3, not waiting
    # for update operation.
    # Resulting state will eventually be {"value": 3}
from motion import Component

Calculator = Component("MyCalculator")

@Calculator.init_state
def setUp():
    return {"value": 0}

@Calculator.serve("add")
def plus(state, props):
    return state["value"] + props["value"]

@Calculator.serve("subtract")
def minus(state, props):
    return state["value"] - props["value"]

@Calculator.update(["add", "subtract"])
def decrement(state, props):
    return {"value": props.serve_result}

if __name__ == "__main__":
    c = Calculator()
    c.run("add", props={"value": 1}, flush_update=True) # Will return 1,
    # blocking until update is done. Resulting state is {"value": 1}
    c.run("subtract", props={"value": 1}, flush_update=True)
    # Will return 0, blocking until update is done. Resulting state is #
    # {"value": 0}
from motion import Component
import numpy as np

MLMonitor = Component("Monitoring_ML_Component")

@MLMonitor.init_state
def setUp():
    return {
        "model": YOUR_MODEL_HERE,
        "historical_values": [],
        "historical_serve_res": []
    }

@MLMonitor.serve("predict")
def predict(state, props):
    return state["model"].predict(props["features"])

@MLMonitor.update("features")
def monitor(state, props):

    values = state["historical_values"] + [props["features"]]
    serve_results = state["historical_serve_res"] + [props.serve_result]

    # Check drift every 10 values
    if len(values) == 10:
        if YOUR_ANOMALY_ALGORITHM(values, serve_results):
            # Fire an alert
            YOUR_ALERT_FUNCTION()
        values = []
        serve_results = []

    return {
        "historical_values": values,
        "historical_serve_res": serve_results
    }

if __name__ == "__main__":
    c = MLMonitor() # Create instance
    c.run("predict", props={"features": YOUR_FEATURES_HERE})
    # Some alert may be fired in the background!

name: str property

Name of the component.

Example Usage:

from motion import Component

MyComponent = Component("MyComponent")
print(MyComponent.name) # Prints "MyComponent"

Returns:

Name Type Description
str str

Component name.

params: Dict[str, Any] property

Parameters to use in component functions.

Example Usage:

from motion import Component

MyComponent = Component(
    "MyComponent",
    params={"param1": 1, "param2": 2}
)

@MyComponent.init_state
def setUp():
    return {"value": 0}

@MyComponent.serve("add")
def plus(state, value):
    # Access params with MyComponent.params["param_name"]
    return state["value"] + value + MyComponent.params["param1"] +
    MyComponent.params["param2"]

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Parameters dictionary.

__init__(name: str, params: Dict[str, Any] = {}, cache_ttl: int = DEFAULT_KEY_TTL)

Creates a new Motion component.

Parameters:

Name Type Description Default
name str

Name of the component.

required
params Dict[str, Any]

Parameters to be accessed by the component. Defaults to {}. Usage: C.params["param_name"] if C is the Component you have created.

{}
cache_ttl int

Time to live for cached serve results (seconds). Defaults to 1 day. Set to 0 to disable caching.

DEFAULT_KEY_TTL

serve(keys: Union[str, List[str]]) -> Callable

Decorator for any serve operation for a flow through the component. Takes in a string or list of strings that represents the flow key. If the decorator is called with a list of strings, each flow key will be mapped to the same serve function.

2 arguments required for an serve operation
  • state: The current state of the component instance, which is a dictionary with string keys and any-type values.
  • props: The properties of the current flow, which is passed via the run method of the component instance. You can add to the props dictionary in the serve op, and the modified props will be passed to the subsequent update ops in the flow. Props are short-lived and die after the flow's update op finishes.

Components can have multiple serve ops, but no flow key within the component can have more than one serve op. serve ops should not modify the state object. If you want to modify the state object, write an update op for your flow.

Example Usage:

from motion import Component

MyComponent = Component("MyComponent")

@MyComponent.init_state
def setUp():
    return {"value": 0}

@MyComponent.serve("add")
def add(state, props):
    return state["value"] + props["value"]

@MyComponent.serve("multiply")
def multiply(state, props):
    return state["value"] * props["value"]

c = MyComponent()
c.run("add", props={"value": 1}, flush_update=True) # Returns 1
c.run("multiply", props={"value": 2}) # Returns 2

Parameters:

Name Type Description Default
keys Union[str, List[str]]

String or list of strings that represent the input keyword(s) for the serve flow.

required

Returns:

Name Type Description
Callable Callable

Decorated serve function.

update(keys: Union[str, List[str]], discard_policy: DiscardPolicy = DiscardPolicy.NONE, discard_after: Optional[int] = None) -> Any

Decorator for any update operations for flows through the component. Takes in a string or list of strings that represents the flow key. If the decorator is called with a list of strings, each flow key will be mapped to the same update operation.

2 arguments required for a update operation
  • state: The current state of the component, represented as a dictionary.
  • props: The properties of the current flow, which could contain properties that were added to the props dictionary in the serve op before this update op. Props are short-lived and die after the flow's update op finishes.

Components can have multiple update ops, and the same key can also have multiple update ops. Update functions should return a dictionary of state updates to be merged with the current state.

See DiscardPolicy for more info on how to expire update operations if you expect there to be backpressure for an update operation.

Example Usage:

from motion import Component

MyComponent = Component("MyComponent")


@MyComponent.init_state
def setUp():
    return {"value": 0}


@MyComponent.serve("multiply")
def multiply(state, props):
    props["something"] = props["value"] + 1
    return state["value"] * props["value"]


@MyComponent.update("multiply")
def multiply(state, props):
    return {"value": props["something"]}


if __name__ == "__main__":
    c = MyComponent()
    print(
        c.run("multiply", props={"value": 2}, flush_update=True)
    )  # Returns 0 and state updates to {"value": 3}
    print(
        c.run("multiply", props={"value": 3}, flush_update=True)
    )  # Returns 9, update will execute
    # to get state["value"] = 4
    print(
        c.run("multiply", props={"value": 4}, flush_update=True)
    )  # Returns 4 * 4 = 16

Parameters:

Name Type Description Default
keys Union[str, List[str]]

String or list of strings that represent the input keyword(s) for the update flow.

required
discard_policy DiscardPolicy

Policy for expiring update operations. Defaults to DiscardPolicy.NONE.

NONE
discard_after Optional[int]

Number of updates or seconds after which to expire the update operation. Defaults to None.

None

Returns:

Name Type Description
Callable Any

Decorated update function.

init_state(func: Callable) -> Callable

Decorator for the init_state function. This function is called once at the beginning of the component's lifecycle. The decorated function should return a dictionary that represents the initial state of the component.

Usage:

from motion import Component

MyComponent = Component("MyComponent")

@MyComponent.init_state
def setUp():
    return {"value": 0}

Parameters:

Name Type Description Default
func Callable

Function that initializes a state. Must return a dictionary.

required

Returns:

Name Type Description
Callable Callable

Decorated init_state function.

__call__(instance_id: str = '', init_state_params: Dict[str, Any] = {}, logging_level: str = 'WARNING', update_task_type: Literal['thread', 'process'] = 'thread', disable_update_task: bool = False, redis_socket_timeout: int = 60, config_path: str = '.motionrc.yml', flush_on_exit: bool = False) -> ComponentInstance

Creates and returns a new instance of a Motion component. See ComponentInstance docs for more info.

Usage:

from motion import Component

MyComponent = Component("MyComponent")

@MyComponent.init_state
def setUp(starting_val):
    return {"value": starting_val}

# Define serve and update operations
@MyComponent.serve("key1")
def ...

@MyComponent.update("key1)
def ...

# Creates instance of MyComponent
if __name__ == "__main__":
    c_instance = MyComponent(init_state_params={"starting_val": 3})
    c_instance.run(..)
    c_instance.shutdown()

You can also use component instances as context managers, which is the recommended way to use them (because it will automatically gracefully shut down the component instance when the context manager exits):

...
if __name__ == "__main__":
    with MyComponent(init_state_params={"starting_val": 3}) as c_instance:
        c_instance.run(..)

Parameters:

Name Type Description Default
instance_id str

id of the component instance. Defaults to "" which will generate a random id.

''
init_state_params Dict[str, Any]

Parameters to pass into the init_state function. Defaults to {}.

{}
logging_level str

Logging level for the Motion logger. Uses the logging library. Defaults to "WARNING".

'WARNING'
update_task_type str

Type of update task to use. Can be "thread" or "process". "thread" has lower overhead but is not recommended for CPU-intensive update operations. "process" is recommended for CPU-intensive operations (e.g., fine-tuning a model) but has higher startup overhead. Defaults to "thread".

'thread'
disable_update_task bool

Whether or not to disable the component instance update ops. Useful for printing out state values without running flows. Defaults to False.

False
redis_socket_timeout int

Timeout for redis socket connections (seconds). Defaults to 60. This means the redis connection will close if idle for 60 seconds.

60
config_path str

Path to config file of env vars. Defaults to ".motionrc.yml".

'.motionrc.yml'
flush_on_exit bool

Whether or not to flush the update queue on exit/shutdown. This is useful when running flows in serverless environments, where you want to make sure all updates are finished after the result is returned, before the function exits. Defaults to False.

False

Returns: ComponentInstance: Component instance to run flows with.

save_state(func: Callable) -> Callable

Decorator for the save_state function. This function saves the state of the component to be accessible in future component instances of the same name.

Usage:

from motion import Component

MyComponent = Component("MyComponent")

@c.save_state
def save(state):
    # state might have other unpicklable keys, like a DB connection
    return {"fit_count": state["fit_count"]}

Parameters:

Name Type Description Default
func Callable

Function that returns a cloudpickleable object.

required

Returns:

Name Type Description
Callable Callable

Decorated save_state function.

load_state(func: Callable) -> Callable

Decorator for the load_state function. This function loads the state of the component from the unpickled state.

Usage:

from motion import Component

MyComponent = Component("MyComponent")

@c.load_state
def load(state):
    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()
    return {"cursor": cursor, "fit_count": state["fit_count"]}

Parameters:

Name Type Description Default
func Callable

Function that consumes a cloudpickleable object. Should return a dictionary representing the state of the component instance.

required

Returns:

Name Type Description
Callable Callable

Decorated load_state function.

DiscardPolicy

Bases: Enum

Defines the policy for discarding items in an update operation's queue. Each component instance has a queue for each update operation. Items in the queue are processed in first-in-first-out (FIFO) order, and items in the queue can delete based on the discard policy set by the developer.

Attributes:

Name Type Description
NONE

Indicates no discard policy. Items in the queue do not delete.

NUM_NEW_UPDATES

Items delete based on the number of new updates. Once the number of new updates exceeds a certain threshold, the oldest items are removed.

SECONDS

Items delete based on time. Items older than a specified number of seconds at the time of processing are removed.

Use the discard_after and discard_policy arguments in Component.update decorator to set the discard policy for an update operation.

Example Usage:

from motion import Component, DiscardPolicy

C = Component("C")

@C.init_state
def setup():
    return {"default_value": 0, "some_value": 0, "another_value": 0}

@C.update(
    "something",
    discard_after=10,
    discard_policy=DiscardPolicy.NUM_NEW_UPDATES
)
def update_num_new(state, props):
    # Do an expensive operation that could take a while
    ...
    return {"some_value": state["some_value"] + props["value"]}

@C.update("something", discard_after=1, discard_policy=DiscardPolicy.SECONDS)
def update_seconds(state, props):
    # Do an expensive operation that could take a while
    ...
    return {"another_value": state["another_value"] + props["value"]}

@C.update("something")
def update_default(state, props):
    # Do an expensive operation that could take a while
    ...
    return {"default_value": state["default_value"] + props["value"]}

if __name__ == "__main__":
    c = C()

    # If we do many runs of "something", the update queue will grow
    # and the policy will be automatically enforced by Motion.

    for i in range(100):
        c.run("something", props={"value": str(i)})

    # Flush the update queue (i.e., wait for all updates to finish)
    c.flush_update("something")

    print(c.read_state("default_value")) # (1)!

    print(c.read_state("some_value")) # (2)!

    print(c.read_state("another_value")) # (3)!

    c.shutdown()

  1. The default policy is to not delete any items (DiscardPolicy.NONE), so the value of default_value will be the sum of all the values passed to run (i.e., sum(range(100))).

  2. The NUM_NEW_UPDATES policy will delete items in the queue once the number of new updates exceeds a certain threshold. The threshold is set by the discard_after argument in the update decorator. So the result will be < 4950 because the NUM_NEW_UPDATES policy will have deleted some items.

  3. This will be < 4950 because the SECONDS policy will have deleted some items (only whatever updates could have been processed in the second after they were added to the queue).

NONE = 0 class-attribute instance-attribute

No discard policy. Does not discard items in the queue.

NUM_NEW_UPDATES = 1 class-attribute instance-attribute

Delete items based on the number of new updates enqueued.

SECONDS = 2 class-attribute instance-attribute

Delete items based on time (in seconds).