Component
Component class for creating Motion components. Here are some examples:
from motion import Component

AdderComponent = Component("MyAdder")

@AdderComponent.init_state
def setUp():
    return {"value": 0}

@AdderComponent.serve("add")
def plus(state, props):
    return state["value"] + props["value"]

@AdderComponent.update("add")
def add(state, props):
    return {"value": props.serve_result}

if __name__ == "__main__":
    c = AdderComponent()  # Create instance of AdderComponent
    c.run("add", props={"value": 1}, flush_update=True)  # Blocks
    # until update is done. Resulting state is {"value": 1}
    c.run("add", props={"value": 2})  # Will return 3, not waiting
    # for update operation.
    # Resulting state will eventually be {"value": 3}
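The difference between a blocking run (flush_update=True) and a non-blocking run can be pictured with a toy model. The sketch below does not use Motion at all: it assumes a single background worker thread that applies queued updates in order, and every name in it (state, pending_updates, run_add, update_worker) is hypothetical.

```python
import queue
import threading

# Toy stand-ins for illustration only; none of these names come from Motion.
state = {"value": 0}
pending_updates = queue.Queue()


def update_worker():
    """Apply queued serve results to the state, one at a time, in FIFO order."""
    while True:
        serve_result = pending_updates.get()
        try:
            if serve_result is None:  # sentinel: stop the worker
                return
            state["value"] = serve_result
        finally:
            pending_updates.task_done()


def run_add(value, flush_update=False):
    """Serve immediately; optionally block until queued updates are applied."""
    result = state["value"] + value
    pending_updates.put(result)
    if flush_update:
        pending_updates.join()
    return result


worker = threading.Thread(target=update_worker, daemon=True)
worker.start()

first = run_add(1, flush_update=True)  # blocks until the state is {"value": 1}
second = run_add(2)                    # returns without waiting for the update
pending_updates.join()                 # wait for the second update explicitly
final_value = state["value"]

pending_updates.put(None)              # stop the worker
worker.join()
```

In this toy model the second call returns its result right away, and the state only catches up once the queue has drained.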
from motion import Component

Calculator = Component("MyCalculator")

@Calculator.init_state
def setUp():
    return {"value": 0}

@Calculator.serve("add")
def plus(state, props):
    return state["value"] + props["value"]

@Calculator.serve("subtract")
def minus(state, props):
    return state["value"] - props["value"]

@Calculator.update(["add", "subtract"])
def decrement(state, props):
    return {"value": props.serve_result}

if __name__ == "__main__":
    c = Calculator()
    c.run("add", props={"value": 1}, flush_update=True)  # Will return 1,
    # blocking until update is done. Resulting state is {"value": 1}
    c.run("subtract", props={"value": 1}, flush_update=True)
    # Will return 0, blocking until update is done. Resulting state is
    # {"value": 0}
from motion import Component
import numpy as np

MLMonitor = Component("Monitoring_ML_Component")

@MLMonitor.init_state
def setUp():
    return {
        "model": YOUR_MODEL_HERE,
        "historical_values": [],
        "historical_serve_res": []
    }

@MLMonitor.serve("predict")
def predict(state, props):
    return state["model"].predict(props["features"])

# The update op is registered on the same flow key as the serve op
# ("predict"), so it runs after each prediction.
@MLMonitor.update("predict")
def monitor(state, props):
    values = state["historical_values"] + [props["features"]]
    serve_results = state["historical_serve_res"] + [props.serve_result]
    # Check drift every 10 values
    if len(values) == 10:
        if YOUR_ANOMALY_ALGORITHM(values, serve_results):
            # Fire an alert
            YOUR_ALERT_FUNCTION()
        values = []
        serve_results = []
    return {
        "historical_values": values,
        "historical_serve_res": serve_results
    }

if __name__ == "__main__":
    c = MLMonitor()  # Create instance
    c.run("predict", props={"features": YOUR_FEATURES_HERE})
    # Some alert may be fired in the background!
name: str (property)
Name of the component.
Example Usage:
from motion import Component
MyComponent = Component("MyComponent")
print(MyComponent.name) # Prints "MyComponent"
Returns:
Name | Type | Description |
---|---|---|
str | str | Component name. |
params: Dict[str, Any] (property)
Parameters to use in component functions.
Example Usage:
from motion import Component

MyComponent = Component(
    "MyComponent",
    params={"param1": 1, "param2": 2}
)

@MyComponent.init_state
def setUp():
    return {"value": 0}

@MyComponent.serve("add")
def plus(state, props):
    # Access params with MyComponent.params["param_name"]
    return (
        state["value"]
        + props["value"]
        + MyComponent.params["param1"]
        + MyComponent.params["param2"]
    )
Returns:
Type | Description |
---|---|
Dict[str, Any] | Parameters dictionary. |
__init__(name: str, params: Dict[str, Any] = {}, cache_ttl: int = DEFAULT_KEY_TTL)
Creates a new Motion component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the component. | required |
params | Dict[str, Any] | Parameters to be accessed by the component. Defaults to {}. | {} |
cache_ttl | int | Time to live for cached serve results (seconds). Defaults to 1 day. Set to 0 to disable caching. | DEFAULT_KEY_TTL |
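To make the cache_ttl semantics concrete, here is a minimal sketch of a time-to-live cache. It is not Motion's caching code: the TTLCache class, its get_or_compute method, and the choice of cache key are all assumptions made for illustration. Only the two behaviours described above are modelled: entries expire after ttl seconds, and a TTL of 0 disables caching.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Hypothetical TTL cache for illustration; not part of Motion."""

    def __init__(self, ttl_seconds: int, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so the demo can control time
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        if self.ttl <= 0:
            return compute()  # a TTL of 0 disables caching entirely
        now = self.clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]  # entry is still fresh
        value = compute()
        self._entries[key] = (now, value)
        return value


fake_now = [0.0]  # mutable cell standing in for the wall clock
calls = []


def expensive():
    calls.append(1)
    return len(calls)


cache = TTLCache(ttl_seconds=10, clock=lambda: fake_now[0])
first = cache.get_or_compute("k", expensive)   # computed
second = cache.get_or_compute("k", expensive)  # served from the cache
fake_now[0] = 11.0                             # move past the TTL
third = cache.get_or_compute("k", expensive)   # expired, so recomputed

uncached = TTLCache(ttl_seconds=0)
fourth = uncached.get_or_compute("k", expensive)
fifth = uncached.get_or_compute("k", expensive)
```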
serve(keys: Union[str, List[str]]) -> Callable
Decorator for any serve operation for a flow through the component. Takes in a string or list of strings that represents the flow key. If the decorator is called with a list of strings, each flow key will be mapped to the same serve function.
A serve operation requires two arguments:
state: The current state of the component instance, which is a dictionary with string keys and values of any type.
props: The properties of the current flow, which are passed via the run method of the component instance. You can add to the props dictionary in the serve op, and the modified props will be passed to the subsequent update ops in the flow. Props are short-lived and die after the flow's update op finishes.
Components can have multiple serve ops, but no flow key within the component can have more than one serve op. Serve ops should not modify the state object. If you want to modify the state object, write an update op for your flow.
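The two registration rules above (a list of flow keys maps to one function, and a flow key can have at most one serve op) can be sketched with a toy decorator registry. This is an illustration under assumptions, not Motion's implementation: ToyComponent and its _serve_ops dictionary are hypothetical names, and Motion may report a duplicate key differently.

```python
from typing import Any, Callable, Dict, List, Union


class ToyComponent:
    """Hypothetical stand-in for illustration; not Motion's Component class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._serve_ops: Dict[str, Callable[..., Any]] = {}

    def serve(self, keys: Union[str, List[str]]) -> Callable:
        # Accept a single flow key or a list of flow keys.
        key_list = [keys] if isinstance(keys, str) else list(keys)

        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            for key in key_list:
                # A flow key may have at most one serve op.
                if key in self._serve_ops:
                    raise ValueError(f"Flow key {key!r} already has a serve op")
                self._serve_ops[key] = func
            return func

        return decorator


Toy = ToyComponent("Toy")


@Toy.serve(["add", "plus"])
def add(state, props):
    return state["value"] + props["value"]


# Both flow keys resolve to the same serve function.
result = Toy._serve_ops["plus"]({"value": 1}, {"value": 2})
```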
Example Usage:
from motion import Component

MyComponent = Component("MyComponent")

@MyComponent.init_state
def setUp():
    return {"value": 0}

@MyComponent.serve("add")
def add(state, props):
    return state["value"] + props["value"]

@MyComponent.serve("multiply")
def multiply(state, props):
    return state["value"] * props["value"]

c = MyComponent()
c.run("add", props={"value": 1}, flush_update=True)  # Returns 1
c.run("multiply", props={"value": 2})  # Returns 0: with no update op,
# state["value"] is still 0
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys | Union[str, List[str]] | String or list of strings that represent the input keyword(s) for the serve flow. | required |
Returns:
Name | Type | Description |
---|---|---|
Callable | Callable | Decorated serve function. |
update(keys: Union[str, List[str]], discard_policy: DiscardPolicy = DiscardPolicy.NONE, discard_after: Optional[int] = None) -> Any
Decorator for any update operations for flows through the component. Takes in a string or list of strings that represents the flow key. If the decorator is called with a list of strings, each flow key will be mapped to the same update operation.
An update operation requires two arguments:
state: The current state of the component, represented as a dictionary.
props: The properties of the current flow, which may contain properties that were added to the props dictionary in the serve op before this update op. Props are short-lived and die after the flow's update op finishes.
Components can have multiple update ops, and the same key can also have multiple update ops. Update functions should return a dictionary of state updates to be merged with the current state.
See DiscardPolicy for more info on how to expire update operations if you expect there to be backpressure for an update operation.
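As a rough mental model of one flow, the sketch below runs a serve op, hands the same props (including anything the serve op added) to the update ops, and merges each returned dictionary into the state. It is a simplification under assumptions: it is synchronous, as if flush_update=True were always set, it assumes a shallow merge, and ToyProps and run_flow are hypothetical names rather than Motion APIs.

```python
from typing import Any, Callable, Dict, List, Tuple


class ToyProps(dict):
    """Hypothetical props container: a dict that also carries the serve result."""

    serve_result: Any = None


def run_flow(
    state: Dict[str, Any],
    serve_op: Callable[[Dict[str, Any], ToyProps], Any],
    update_ops: List[Callable[[Dict[str, Any], ToyProps], Dict[str, Any]]],
    props_dict: Dict[str, Any],
) -> Tuple[Any, Dict[str, Any]]:
    props = ToyProps(props_dict)
    props.serve_result = serve_op(state, props)  # serve op reads the state
    new_state = dict(state)                      # leave the input untouched
    for op in update_ops:
        # Assumed shallow merge: returned keys overwrite, other keys survive.
        new_state.update(op(new_state, props))
    return props.serve_result, new_state


def multiply(state, props):
    props["something"] = props["value"] + 1  # visible to the update op
    return state["value"] * props["value"]


def store_something(state, props):
    return {"value": props["something"]}


initial_state = {"value": 0, "label": "demo"}
result, next_state = run_flow(
    initial_state, multiply, [store_something], {"value": 2}
)
```

Here the untouched "label" key survives the merge, which is the point of returning only the keys that changed.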
Example Usage:
from motion import Component

MyComponent = Component("MyComponent")

@MyComponent.init_state
def setUp():
    return {"value": 0}

@MyComponent.serve("multiply")
def multiply(state, props):
    props["something"] = props["value"] + 1
    return state["value"] * props["value"]

@MyComponent.update("multiply")
def multiply(state, props):
    return {"value": props["something"]}

if __name__ == "__main__":
    c = MyComponent()
    print(
        c.run("multiply", props={"value": 2}, flush_update=True)
    )  # Returns 0 and state updates to {"value": 3}
    print(
        c.run("multiply", props={"value": 3}, flush_update=True)
    )  # Returns 9, update will execute
    # to get state["value"] = 4
    print(
        c.run("multiply", props={"value": 4}, flush_update=True)
    )  # Returns 4 * 4 = 16
Parameters:
Name | Type | Description | Default |
---|---|---|---|
keys | Union[str, List[str]] | String or list of strings that represent the input keyword(s) for the update flow. | required |
discard_policy | DiscardPolicy | Policy for expiring update operations. Defaults to DiscardPolicy.NONE. | NONE |
discard_after | Optional[int] | Number of updates or seconds after which to expire the update operation. Defaults to None. | None |
Returns:
Name | Type | Description |
---|---|---|
Callable | Any | Decorated update function. |
init_state(func: Callable) -> Callable
Decorator for the init_state function. This function is called once at the beginning of the component's lifecycle. The decorated function should return a dictionary that represents the initial state of the component.
Usage:
from motion import Component

MyComponent = Component("MyComponent")

@MyComponent.init_state
def setUp():
    return {"value": 0}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func | Callable | Function that initializes a state. Must return a dictionary. | required |
Returns:
Name | Type | Description |
---|---|---|
Callable | Callable | Decorated init_state function. |
__call__(instance_id: str = '', init_state_params: Dict[str, Any] = {}, logging_level: str = 'WARNING', update_task_type: Literal['thread', 'process'] = 'thread', disable_update_task: bool = False, redis_socket_timeout: int = 60, config_path: str = '.motionrc.yml', flush_on_exit: bool = False) -> ComponentInstance
Creates and returns a new instance of a Motion component.
See ComponentInstance
docs for more info.
Usage:
from motion import Component

MyComponent = Component("MyComponent")

@MyComponent.init_state
def setUp(starting_val):
    return {"value": starting_val}

# Define serve and update operations
@MyComponent.serve("key1")
def ...

@MyComponent.update("key1")
def ...

# Creates instance of MyComponent
if __name__ == "__main__":
    c_instance = MyComponent(init_state_params={"starting_val": 3})
    c_instance.run(..)
    c_instance.shutdown()
You can also use component instances as context managers, which is the recommended way to use them (because it will automatically gracefully shut down the component instance when the context manager exits):
...
if __name__ == "__main__":
    with MyComponent(init_state_params={"starting_val": 3}) as c_instance:
        c_instance.run(..)
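The reason the context-manager form is safer can be shown with a toy class that follows Python's context-manager protocol. ToyInstance is a hypothetical stand-in and says nothing about how ComponentInstance is implemented. It only demonstrates that __exit__ runs on normal exit and when the body raises, so shutdown is never skipped.

```python
class ToyInstance:
    """Hypothetical stand-in for a component instance; not Motion's class."""

    def __init__(self) -> None:
        self.is_shut_down = False

    def run(self, value: int) -> int:
        if self.is_shut_down:
            raise RuntimeError("instance has been shut down")
        return value + 1

    def shutdown(self) -> None:
        self.is_shut_down = True

    def __enter__(self) -> "ToyInstance":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.shutdown()  # runs on normal exit and when the body raises
        return False     # do not swallow exceptions


with ToyInstance() as ok_instance:
    result = ok_instance.run(1)

failed_instance = ToyInstance()
try:
    with failed_instance:
        raise ValueError("flow failed")
except ValueError:
    pass  # the instance was still shut down before the error propagated
```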
Parameters:
Name | Type | Description | Default |
---|---|---|---|
instance_id | str | ID of the component instance. Defaults to "", which will generate a random ID. | '' |
init_state_params | Dict[str, Any] | Parameters to pass into the init_state function. Defaults to {}. | {} |
logging_level | str | Logging level for the Motion logger. Uses the logging library. Defaults to "WARNING". | 'WARNING' |
update_task_type | str | Type of update task to use. Can be "thread" or "process". "thread" has lower overhead but is not recommended for CPU-intensive update operations. "process" is recommended for CPU-intensive operations (e.g., fine-tuning a model) but has higher startup overhead. Defaults to "thread". | 'thread' |
disable_update_task | bool | Whether or not to disable the component instance update ops. Useful for printing out state values without running flows. Defaults to False. | False |
redis_socket_timeout | int | Timeout for Redis socket connections (seconds). Defaults to 60. This means the Redis connection will close if idle for 60 seconds. | 60 |
config_path | str | Path to config file of env vars. Defaults to ".motionrc.yml". | '.motionrc.yml' |
flush_on_exit | bool | Whether or not to flush the update queue on exit/shutdown. This is useful when running flows in serverless environments, where you want to make sure all updates are finished after the result is returned, before the function exits. Defaults to False. | False |
Returns:
Type | Description |
---|---|
ComponentInstance | Component instance to run flows with. |
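The usage example above suggests that init_state_params is matched to the init_state function's arguments by name. A minimal sketch of that idea follows, assuming keyword-argument unpacking. The helper build_initial_state is hypothetical, and Motion's actual call path and error handling may differ.

```python
from typing import Any, Callable, Dict


def build_initial_state(
    init_fn: Callable[..., Dict[str, Any]],
    init_state_params: Dict[str, Any],
) -> Dict[str, Any]:
    """Hypothetical helper: call the init function with params as kwargs."""
    state = init_fn(**init_state_params)
    if not isinstance(state, dict):
        # The init_state function is documented as returning a dictionary.
        raise TypeError("init_state function must return a dictionary")
    return state


def set_up(starting_val):
    return {"value": starting_val}


initial = build_initial_state(set_up, {"starting_val": 3})
```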
save_state(func: Callable) -> Callable
Decorator for the save_state function. This function saves the state of the component to be accessible in future component instances of the same name.
Usage:
from motion import Component

MyComponent = Component("MyComponent")

@MyComponent.save_state
def save(state):
    # state might have other unpicklable keys, like a DB connection
    return {"fit_count": state["fit_count"]}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func | Callable | Function that returns a cloudpickleable object. | required |
Returns:
Name | Type | Description |
---|---|---|
Callable | Callable | Decorated save_state function. |
load_state(func: Callable) -> Callable
Decorator for the load_state function. This function loads the state of the component from the unpickled state.
Usage:
import sqlite3

from motion import Component

MyComponent = Component("MyComponent")

@MyComponent.load_state
def load(state):
    # Recreate the unpicklable parts of the state from scratch
    conn = sqlite3.connect(":memory:")
    cursor = conn.cursor()
    return {"cursor": cursor, "fit_count": state["fit_count"]}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func | Callable | Function that consumes a cloudpickleable object. Should return a dictionary representing the state of the component instance. | required |
Returns:
Name | Type | Description |
---|---|---|
Callable | Callable | Decorated load_state function. |
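The save_state and load_state pair exists because some state values cannot be serialized. The round trip can be sketched without Motion, using the standard-library pickle module as a stand-in for cloudpickle (an assumption made so the example has no third-party dependency). The save and load functions below mirror the usage examples above, and live_state is a hypothetical state dictionary.

```python
import pickle
import sqlite3


def save(state):
    # Keep only the picklable part of the state.
    return {"fit_count": state["fit_count"]}


def load(saved):
    # Recreate the unpicklable connection, then restore the saved values.
    conn = sqlite3.connect(":memory:")
    return {"conn": conn, "fit_count": saved["fit_count"]}


live_state = {"conn": sqlite3.connect(":memory:"), "fit_count": 5}

# Serializing the whole state fails because of the database connection.
try:
    pickle.dumps(live_state)
    whole_state_picklable = True
except (TypeError, pickle.PicklingError):
    whole_state_picklable = False

blob = pickle.dumps(save(live_state))  # what would be persisted
restored = load(pickle.loads(blob))    # what a later instance would see
```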
DiscardPolicy
Bases: Enum
Defines the policy for discarding items in an update operation's queue. Each component instance has a queue for each update operation. Items in the queue are processed in first-in-first-out (FIFO) order, and items can be discarded according to the discard policy set by the developer.
Attributes:
Name | Type | Description |
---|---|---|
NONE | | Indicates no discard policy. Items in the queue are never discarded. |
NUM_NEW_UPDATES | | Items are discarded based on the number of new updates. Once the number of new updates exceeds a certain threshold, the oldest items are removed. |
SECONDS | | Items are discarded based on time. Items older than a specified number of seconds at the time of processing are removed. |
Use the discard_after and discard_policy arguments in the Component.update decorator to set the discard policy for an update operation.
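One way to read the three policies is as a predicate evaluated when an item reaches the front of the queue. The sketch below is an interpretation of the descriptions above, not Motion's code: ToyDiscardPolicy, QueuedUpdate, and should_discard are hypothetical names, and the strict greater-than comparison at the threshold is an assumption.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ToyDiscardPolicy(Enum):
    """Hypothetical mirror of the three policies, for illustration only."""

    NONE = 0
    NUM_NEW_UPDATES = 1
    SECONDS = 2


@dataclass
class QueuedUpdate:
    seq: int            # position in enqueue order
    enqueued_at: float  # enqueue time, in seconds


def should_discard(
    item: QueuedUpdate,
    policy: ToyDiscardPolicy,
    discard_after: Optional[int],
    newest_seq: int,
    now: float,
) -> bool:
    if policy is ToyDiscardPolicy.NONE or discard_after is None:
        return False
    if policy is ToyDiscardPolicy.NUM_NEW_UPDATES:
        # Too many newer updates have been enqueued since this one.
        return newest_seq - item.seq > discard_after
    # SECONDS: the item has waited in the queue for too long.
    return now - item.enqueued_at > discard_after


item = QueuedUpdate(seq=0, enqueued_at=0.0)
kept_by_none = should_discard(item, ToyDiscardPolicy.NONE, None, 500, 99.0)
stale_by_count = should_discard(item, ToyDiscardPolicy.NUM_NEW_UPDATES, 10, 11, 0.0)
fresh_by_count = should_discard(item, ToyDiscardPolicy.NUM_NEW_UPDATES, 10, 10, 0.0)
stale_by_time = should_discard(item, ToyDiscardPolicy.SECONDS, 1, 0, 1.5)
fresh_by_time = should_discard(item, ToyDiscardPolicy.SECONDS, 1, 0, 0.5)
```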
Example Usage:
from motion import Component, DiscardPolicy

C = Component("C")

@C.init_state
def setup():
    return {"default_value": 0, "some_value": 0, "another_value": 0}

@C.update(
    "something",
    discard_after=10,
    discard_policy=DiscardPolicy.NUM_NEW_UPDATES
)
def update_num_new(state, props):
    # Do an expensive operation that could take a while
    ...
    return {"some_value": state["some_value"] + props["value"]}

@C.update("something", discard_after=1, discard_policy=DiscardPolicy.SECONDS)
def update_seconds(state, props):
    # Do an expensive operation that could take a while
    ...
    return {"another_value": state["another_value"] + props["value"]}

@C.update("something")
def update_default(state, props):
    # Do an expensive operation that could take a while
    ...
    return {"default_value": state["default_value"] + props["value"]}

if __name__ == "__main__":
    c = C()
    # If we do many runs of "something", the update queue will grow
    # and the policy will be automatically enforced by Motion.
    for i in range(100):
        # Pass an integer so it can be added to the integer state values
        c.run("something", props={"value": i})
    # Flush the update queue (i.e., wait for all updates to finish)
    c.flush_update("something")
    print(c.read_state("default_value"))  # (1)
    print(c.read_state("some_value"))  # (2)
    print(c.read_state("another_value"))  # (3)
    c.shutdown()

1. The default policy is to not delete any items (DiscardPolicy.NONE), so the value of default_value will be the sum of all the values passed to run (i.e., sum(range(100)), which is 4950).
2. The NUM_NEW_UPDATES policy will delete items in the queue once the number of new updates exceeds a certain threshold. The threshold is set by the discard_after argument in the update decorator. So the result will be < 4950 because the NUM_NEW_UPDATES policy will have deleted some items.
3. This will be < 4950 because the SECONDS policy will have deleted some items (only whatever updates could have been processed in the second after they were added to the queue).
NONE = 0 (class-attribute, instance-attribute)
No discard policy. Does not discard items in the queue.
NUM_NEW_UPDATES = 1 (class-attribute, instance-attribute)
Delete items based on the number of new updates enqueued.
SECONDS = 2 (class-attribute, instance-attribute)
Delete items based on time (in seconds).