ComponentInstance
instance_name: str
property
The component name followed by a random phrase, which together identify this instance. It has the form componentname__randomphrase.
run(flow_key: str, props: Dict[str, Any] = {}, ignore_cache: bool = False, force_refresh: bool = False, flush_update: bool = False) -> Any
Runs the flow (serve and update ops) for the given flow key. If no ops are registered for the key, an error is raised. Only one flow key should be passed in.
Example Usage:
from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

@C.serve("add")
def add(state, value):
    return state["value"] + value

@C.update("add")
def add(state, value):
    return {"value": state["value"] + value}

if __name__ == "__main__":
    with C() as c:  # Create instance of C
        c.run("add", props={"value": 1}, flush_update=True)  # (1)!
        c.run("add", props={"value": 1})  # Returns 1
        c.run("add", props={"value": 2})  # (2)!
        c.run("add", props={"value": 3})
        c.run("add", props={"value": 3}, force_refresh=True)  # (3)!

# 1. Waits for the update op to finish, then updates the state
# 2. Returns 2, result state["value"] = 4
# 3. Force refreshes the state before running the flow by waiting
#    for all pending updates to finish processing. This
#    reruns the serve op even though the result might be cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flow_key | str | Key of the flow to run. | required |
props | Dict[str, Any] | Keyword arguments to pass into the flow ops, in addition to the state. | {} |
ignore_cache | bool | If True, ignores the cache and runs the serve op. Does not force refresh the state. Defaults to False. | False |
force_refresh | bool | Whether to wait for all the pending updates to finish processing, resulting in the most up-to-date state, before running the serve op. Defaults to False, where a stale version of the state or a cached result may be used. | False |
flush_update | bool | If True, waits for the update op to finish executing before returning. If the update queue hasn't reached batch_size yet, the update op runs anyways. Force refreshes the state after the update op completes. Defaults to False. | False |
Raises:
Type | Description |
---|---|
ValueError | If more than one flow key-value pair is passed. |
RuntimeError | If flush_update is called and the component instance update processes are disabled. |
Returns:
Type | Description |
---|---|
Any | Result of the serve call. Might take a long time to run if computationally expensive. |
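The difference between ignore_cache and force_refresh described in the parameter table can be pictured with a toy model. The sketch below is not Motion's implementation: ToyCachedInstance, its result cache, and its list of pending updates are hypothetical stand-ins, chosen only to show that ignoring the cache reruns the serve op on possibly stale state, while a forced refresh applies pending updates first.

```python
from typing import Any, Callable, Dict, List, Tuple


class ToyCachedInstance:
    """Hypothetical stand-in for a component instance (not Motion's code)."""

    def __init__(self, serve: Callable[[Dict[str, Any], int], int]) -> None:
        self.state: Dict[str, Any] = {"value": 0}
        self.serve = serve
        self.cache: Dict[Tuple[str, int], int] = {}
        self.pending: List[Dict[str, Any]] = []  # updates not yet applied

    def run(self, flow_key: str, value: int,
            ignore_cache: bool = False, force_refresh: bool = False) -> int:
        if force_refresh:
            # Apply every pending update before serving.
            for update in self.pending:
                self.state.update(update)
            self.pending.clear()
        cache_key = (flow_key, value)
        if not (ignore_cache or force_refresh) and cache_key in self.cache:
            return self.cache[cache_key]
        result = self.serve(self.state, value)
        self.cache[cache_key] = result
        return result


toy = ToyCachedInstance(lambda state, value: state["value"] + value)
first = toy.run("add", 1)                           # computed from state 0
toy.pending.append({"value": 10})                   # an update that has not landed yet
cached = toy.run("add", 1)                          # served from the cache
recomputed = toy.run("add", 1, ignore_cache=True)   # reruns serve on stale state
refreshed = toy.run("add", 1, force_refresh=True)   # applies updates, then reruns
print(first, cached, recomputed, refreshed)         # 1 1 1 11
```

In this model only force_refresh changes the answer, because it is the only flag that touches the state before the serve op runs.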
arun(flow_key: str, props: Dict[str, Any] = {}, ignore_cache: bool = False, force_refresh: bool = False, flush_update: bool = False) -> Awaitable[Any]
async
Async version of run. Runs the flow (serve and update ops) for the specified key. You should use arun if either the serve or update op is an async function.
Example Usage:
from motion import Component
import asyncio

C = Component("MyComponent")

@C.serve("sleep")
async def sleep(state, value):
    await asyncio.sleep(value)
    return "Slept!"

async def main():
    with C() as c:
        await c.arun("sleep", props={"value": 1})

if __name__ == "__main__":
    asyncio.run(main())
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flow_key | str | Key of the flow to run. | required |
props | Dict[str, Any] | Keyword arguments to pass into the flow ops, in addition to the state. | {} |
ignore_cache | bool | If True, ignores the cache and runs the serve op. Does not force refresh the state. Defaults to False. | False |
force_refresh | bool | Whether to wait for all the pending updates to finish processing, resulting in the most up-to-date state, before running the serve op. Defaults to False, where a stale version of the state or a cached result may be used. | False |
flush_update | bool | If True, waits for the update op to finish executing before returning. If the update queue hasn't reached batch_size yet, the update op runs anyways. Force refreshes the state after the update op completes. Defaults to False. | False |
Raises:
Type | Description |
---|---|
ValueError | If more than one flow key-value pair is passed. |
RuntimeError | If flush_update is called and the component instance update processes are disabled. |
Returns:
Type | Description |
---|---|
Awaitable[Any] | Awaitable result of the serve call. |
gen(flow_key: str, props: Dict[str, Any] = {}, ignore_cache: bool = False, force_refresh: bool = False, flush_update: bool = False) -> Generator[Any, None, None]
Runs the flow (serve and update ops) for the specified key and yields the results as they come in, as a generator. Use this if your serve op is a generator function. If your serve op just returns a value, use run instead. You should use agen as opposed to gen if your serve op is an async generator function.
Example Usage:
from motion import Component

C = Component("MyComponent")

@C.serve("count")
def count(state, value):
    for i in range(value):
        yield i

def main():
    with C() as c:
        for elem in c.gen("count", props={"value": 3}):  # (1)!
            print(elem)  # Prints 0, 1, 2

if __name__ == "__main__":
    main()  # main is a regular function, so it is called directly

# 1. Iterates over the generator returned by the "count" serve op
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flow_key | str | Key of the flow to run. | required |
props | Dict[str, Any] | Keyword arguments to pass into the flow ops, in addition to the state. | {} |
ignore_cache | bool | If True, ignores the cache and runs the serve op. Does not force refresh the state. Defaults to False. | False |
force_refresh | bool | Whether to wait for all the pending updates to finish processing, resulting in the most up-to-date state, before running the serve op. Defaults to False, where a stale version of the state or a cached result may be used. | False |
flush_update | bool | If True, waits for the update op to finish executing before returning. If the update queue hasn't reached batch_size yet, the update op runs anyways. Force refreshes the state after the update op completes. Defaults to False. | False |
Raises:
Type | Description |
---|---|
ValueError | If more than one flow key-value pair is passed. |
RuntimeError | If flush_update is called and the component instance update processes are disabled. |
Returns:
Type | Description |
---|---|
Generator[Any, None, None] | Generator that yields the results of the serve call as they come in. |
agen(flow_key: str, props: Dict[str, Any] = {}, ignore_cache: bool = False, force_refresh: bool = False, flush_update: bool = False) -> AsyncGenerator[Any, None]
async
Async version of gen. Runs the flow (serve and update ops) for the specified key and yields the results as they come in, as an async generator. Use agen rather than gen if your serve op is an async generator function.
Example Usage:
from motion import Component
import asyncio

C = Component("MyComponent")

@C.serve("count")
async def count(state, value):
    for i in range(value):
        yield i
        await asyncio.sleep(i)

async def main():
    with C() as c:
        async for elem in c.agen("count", props={"value": 3}):
            print(elem)  # Prints 0, 1, 2

if __name__ == "__main__":
    asyncio.run(main())
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flow_key | str | Key of the flow to run. | required |
props | Dict[str, Any] | Keyword arguments to pass into the flow ops, in addition to the state. | {} |
ignore_cache | bool | If True, ignores the cache and runs the serve op. Does not force refresh the state. Defaults to False. | False |
force_refresh | bool | Whether to wait for all the pending updates to finish processing, resulting in the most up-to-date state, before running the serve op. Defaults to False, where a stale version of the state or a cached result may be used. | False |
flush_update | bool | If True, waits for the update op to finish executing before returning. If the update queue hasn't reached batch_size yet, the update op runs anyways. Force refreshes the state after the update op completes. Defaults to False. | False |
Raises:
Type | Description |
---|---|
ValueError | If more than one flow key-value pair is passed. |
RuntimeError | If flush_update is called and the component instance update processes are disabled. |
Returns:
Type | Description |
---|---|
AsyncGenerator[Any, None] | Async generator that yields the results of the serve call as they come in. |
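The guidance on choosing between run, arun, gen, and agen depends on what kind of function the serve op is. As a rough illustration, the standard library's inspect module can tell the four kinds apart. The helper suggest_method below is hypothetical and not part of Motion, and it looks only at the serve op (arun is also recommended when the update op is async, which this sketch does not check).

```python
import inspect
from typing import Any, Callable


def suggest_method(serve_op: Callable[..., Any]) -> str:
    """Return the name of the instance method suited to this serve op."""
    if inspect.isasyncgenfunction(serve_op):
        return "agen"
    if inspect.isgeneratorfunction(serve_op):
        return "gen"
    if inspect.iscoroutinefunction(serve_op):
        return "arun"
    return "run"


def add(state, value):            # plain function
    return state["value"] + value


async def sleep(state, value):    # coroutine function
    return "Slept!"


def count(state, value):          # generator function
    for i in range(value):
        yield i


async def acount(state, value):   # async generator function
    for i in range(value):
        yield i


for op in (add, sleep, count, acount):
    print(op.__name__, "->", suggest_method(op))
```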
read_state(key: str, default_value: Optional[Any] = None) -> Any
Gets the current value for the key in the component instance's state.
Usage:
from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

# Define serve and update operations
...

if __name__ == "__main__":
    with C() as c_instance:
        c_instance.read_state("value")  # Returns 0
        c_instance.run(...)
        c_instance.read_state("value")  # This will return the current
                                        # value of "value" in the state
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | str | Key in the state to get the value for. | required |
default_value | Optional[Any] | Default value to return if the key is not found. Defaults to None. | None |
Returns:
Type | Description |
---|---|
Any | Current value for the key, or default_value if the key is not found. |
write_state(state_update: Dict[str, Any]) -> None
Writes the state update to the component instance's state. If an update op is currently running, the state update is applied after that update op finishes. Warning: this could take a while if your update ops take a long time!
Usage:
from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

# Define serve and update operations
...

if __name__ == "__main__":
    with C() as c_instance:
        c_instance.read_state("value")  # Returns 0
        c_instance.write_state({"value": 1, "value2": 2})
        c_instance.read_state("value")  # Returns 1
        c_instance.read_state("value2")  # Returns 2
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state_update | Dict[str, Any] | Dictionary of key-value pairs to update the state with. | required |
latest | bool | Whether or not to apply the update to the latest version of the state. If True, Motion will redownload the latest version of the state and apply the update to that version. You only need to set this to True if you are updating an instance that you connected to a while ago and that might be outdated. Defaults to False. | required |
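One way to picture how read_state and write_state relate is as a lookup with a fallback and a dictionary merge. The sketch below is a hypothetical in-memory model, not Motion's implementation: ToyState is an invented name, and the merge behavior (keys absent from the update are kept) is an assumption consistent with the usage example above, not something this page states.

```python
from typing import Any, Dict, Optional


class ToyState:
    """Hypothetical in-memory model of an instance's state (not Motion's code)."""

    def __init__(self, initial: Dict[str, Any]) -> None:
        self._state = dict(initial)

    def read_state(self, key: str, default_value: Optional[Any] = None) -> Any:
        # Missing keys fall back to default_value instead of raising.
        return self._state.get(key, default_value)

    def write_state(self, state_update: Dict[str, Any]) -> None:
        # Assumed merge: keys in the update are overwritten, others are kept.
        self._state.update(state_update)


s = ToyState({"value": 0, "label": "a"})
s.write_state({"value": 1, "value2": 2})
print(s.read_state("value"))        # 1
print(s.read_state("label"))        # a
print(s.read_state("missing", -1))  # -1
```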
flush_update(flow_key: str) -> None
Flushes the update queue corresponding to the flow key, if it exists, and updates the instance state. Warning: this is a blocking operation and could take a while if your update op takes a long time!
Example Usage:
from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

@C.serve("add")
def add(state, value):
    return state["value"] + value

@C.update("add")
def add(state, value):
    return {"value": state["value"] + value}

@C.serve("multiply")
def multiply(state, value):
    return state["value"] * value

if __name__ == "__main__":
    with C() as c:  # Create instance of C
        c.run("add", props={"value": 1})
        c.flush_update("add")  # (1)!
        c.run("add", props={"value": 2})  # This will use the updated state

# 1. Waits for the update op to finish, then updates the state
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flow_key | str | Key of the flow. | required |
Raises:
Type | Description |
---|---|
RuntimeError | If the component instance was initialized with disable_update_task enabled. |
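Flushing matters because, as noted for the flush_update flag of run, updates can wait in a queue until batch_size is reached. The sketch below models that idea with a hypothetical ToyUpdateQueue. It is not Motion's implementation, and the batched signature of the update op here (a list of queued values) is an assumption made for the illustration.

```python
from typing import Any, Callable, Dict, List


class ToyUpdateQueue:
    """Hypothetical model of one flow's update queue (not Motion's code)."""

    def __init__(
        self,
        update_op: Callable[[Dict[str, Any], List[int]], Dict[str, Any]],
        batch_size: int,
    ) -> None:
        self.state: Dict[str, Any] = {"value": 0}
        self.update_op = update_op
        self.batch_size = batch_size
        self.queue: List[int] = []

    def enqueue(self, value: int) -> None:
        self.queue.append(value)
        if len(self.queue) >= self.batch_size:
            self.flush_update()

    def flush_update(self) -> None:
        # Processes everything queued so far, even a partial batch.
        if not self.queue:
            return
        batch, self.queue = self.queue, []
        self.state.update(self.update_op(self.state, batch))


def add_all(state, values):
    return {"value": state["value"] + sum(values)}


q = ToyUpdateQueue(add_all, batch_size=3)
q.enqueue(1)
q.enqueue(2)
before = q.state["value"]   # batch not full yet, state unchanged
q.flush_update()
after = q.state["value"]    # flush ran the update on the partial batch
print(before, after)        # 0 3
```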
shutdown(wait_for_logging_threads: bool = False) -> None
Shuts down a Motion component instance, saving state. Automatically called when the instance is garbage collected.
Usage:
from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

# Define serve and update operations

if __name__ == "__main__":
    c_instance = C()
    c_instance.run(...)
    c_instance.run(...)
    c_instance.shutdown()

    # Or, use a context manager
    with C() as c_instance:
        c_instance.run(...)
        c_instance.run(...)
close(wait_for_logging_threads: bool = False) -> None
Alias for shutdown.
Usage:
from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

# Define serve and update operations

if __name__ == "__main__":
    c_instance = C()
    c_instance.run(...)
    c_instance.run(...)
    c_instance.close()

    # Or, use a context manager
    with C() as c_instance:
        c_instance.run(...)
        c_instance.run(...)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
wait_for_logging_threads | bool | Defaults to False. | False |
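The usage examples for shutdown and close show two styles: an explicit call, or a with block. The sketch below uses a hypothetical ToyClosable class (not Motion's code) to illustrate the general Python pattern that makes the with form convenient: cleanup runs on exit from the block, even when the body raises. How Motion implements its own context manager is not shown on this page.

```python
class ToyClosable:
    """Hypothetical stand-in for a component instance (not Motion's code)."""

    def __init__(self) -> None:
        self.is_shut_down = False

    def shutdown(self, wait_for_logging_threads: bool = False) -> None:
        # Idempotent, so calling it more than once is harmless.
        self.is_shut_down = True

    # close is an alias: the same function under a second name.
    close = shutdown

    def __enter__(self) -> "ToyClosable":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.shutdown()


with ToyClosable() as inst:
    inside = inst.is_shut_down
print(inside, inst.is_shut_down)  # False True

try:
    with ToyClosable() as failing:
        raise ValueError("boom")
except ValueError:
    pass
print(failing.is_shut_down)  # True
```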