ComponentInstance

instance_name: str property

The component name followed by a random phrase, which together identify this instance. It has the form componentname__randomphrase.
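Because the instance name joins the component name and the random phrase with a double underscore, the two parts can be recovered with plain string handling. The sketch below is only an illustration: split_instance_name is a hypothetical helper, not part of Motion, and the sample name is made up.

```python
from typing import Tuple


def split_instance_name(instance_name: str) -> Tuple[str, str]:
    """Split 'componentname__randomphrase' into its two parts.

    Hypothetical helper (not part of Motion). It splits on the first
    double underscore, so it assumes the component name itself
    contains no '__'.
    """
    component, sep, phrase = instance_name.partition("__")
    if not sep:
        raise ValueError(f"no '__' separator in {instance_name!r}")
    return component, phrase


# Made-up instance name in the documented format
component, phrase = split_instance_name("MyComponent__brave-otter")
print(component, phrase)
```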

run(flow_key: str, props: Dict[str, Any] = {}, ignore_cache: bool = False, force_refresh: bool = False, flush_update: bool = False) -> Any

Runs the flow (serve and update ops) for the given flow key. Raises an error if no ops are registered for that key. Only one flow key can be passed per call.

Example Usage:

from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

@C.serve("add")
def add(state, value):
    return state["value"] + value

@C.update("add")
def add(state, value):
    return {"value": state["value"] + value}

if __name__ == "__main__":
    with C() as c: # Create instance of C
        c.run("add", props={"value": 1}, flush_update=True) # (1)!
        c.run("add", props={"value": 1}) # Returns 1
        c.run("add", props={"value": 2}) # (2)!

        c.run("add", props={"value": 3})

        c.run("add", props={"value": 3}, force_refresh=True) # (3)!

# 1. Waits for the update op to finish, then updates the state
# 2. Returns 2, result state["value"] = 4
# 3. Force refreshes the state before running the flow by waiting
#    for all pending updates to finish processing. This
#    reruns the serve op even though the result might be cached.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| flow_key | str | Key of the flow to run. | required |
| props | Dict[str, Any] | Keyword arguments to pass into the flow ops, in addition to the state. | {} |
| ignore_cache | bool | If True, ignores the cache and runs the serve op. Does not force refresh the state. Defaults to False. | False |
| force_refresh | bool | Whether to wait for all the pending updates to finish processing, resulting in the most up-to-date state, before running the serve op. Defaults to False, where a stale version of the state or a cached result may be used. | False |
| flush_update | bool | If True, waits for the update op to finish executing before returning. If the update queue hasn't reached batch_size yet, the update op runs anyways. Force refreshes the state after the update op completes. Defaults to False. | False |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If more than one flow key-value pair is passed. |
| RuntimeError | If flush_update is called and the component instance update processes are disabled. |

Returns:

| Type | Description |
| --- | --- |
| Any | Result of the serve call. Might take a long time to run if flush_update = True and the update operation is computationally expensive. |
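To make the role of props concrete, the following sketch calls a plain function shaped like the "add" serve op from the example above. It assumes that each key in props is matched by name to a parameter of the op, which is what the example's signatures suggest. It runs without Motion and does not show how Motion dispatches internally.

```python
from typing import Any, Dict


def add(state: Dict[str, Any], value: int) -> int:
    # Same shape as the "add" serve op in the example above
    return state["value"] + value


state = {"value": 0}
props = {"value": 1}

# Assumption: props are unpacked as keyword arguments after the state
result = add(state, **props)
print(result)  # 1
```

A key in props that the op does not accept would raise a TypeError in this plain-Python sketch, so the keys should match the op's parameter names.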

arun(flow_key: str, props: Dict[str, Any] = {}, ignore_cache: bool = False, force_refresh: bool = False, flush_update: bool = False) -> Awaitable[Any] async

Async version of run. Runs the flow (serve and update ops) for the specified key. You should use arun if either the serve or update op is an async function.

Example Usage:

from motion import Component
import asyncio

C = Component("MyComponent")

@C.serve("sleep")
async def sleep(state, value):
    await asyncio.sleep(value)
    return "Slept!"

async def main():
    with C() as c:
        await c.arun("sleep", props={"value": 1})

if __name__ == "__main__":
    asyncio.run(main())

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| flow_key | str | Key of the flow to run. | required |
| props | Dict[str, Any] | Keyword arguments to pass into the flow ops, in addition to the state. | {} |
| ignore_cache | bool | If True, ignores the cache and runs the serve op. Does not force refresh the state. Defaults to False. | False |
| force_refresh | bool | Whether to wait for all the pending updates to finish processing, resulting in the most up-to-date state, before running the serve op. Defaults to False, where a stale version of the state or a cached result may be used. | False |
| flush_update | bool | If True, waits for the update op to finish executing before returning. If the update queue hasn't reached batch_size yet, the update op runs anyways. Force refreshes the state after the update op completes. Defaults to False. | False |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If more than one flow key-value pair is passed. |
| RuntimeError | If flush_update is called and the component instance update processes are disabled. |

Returns:

| Type | Description |
| --- | --- |
| Awaitable[Any] | Awaitable result of the serve call. |

gen(flow_key: str, props: Dict[str, Any] = {}, ignore_cache: bool = False, force_refresh: bool = False, flush_update: bool = False) -> Generator[Any, None, None]

Runs the flow (serve and update ops) for the specified key and yields the results as they come in, as a generator. Use this if your serve op is a generator function. If your serve op just returns a value, use run instead. You should use agen as opposed to gen if your serve op is an async generator function.

Example Usage:

from motion import Component

C = Component("MyComponent")

@C.serve("count")
def count(state, value):
    for i in range(value):
        yield i

def main():
    with C() as c:
        for elem in c.gen("count", props={"value": 3}): # (1)!
            print(elem) # Prints 0, 1, 2

if __name__ == "__main__":
    main() # main is a regular function, so asyncio.run is not needed

# 1. Iterates over the generator returned by the "count" serve op

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| flow_key | str | Key of the flow to run. | required |
| props | Dict[str, Any] | Keyword arguments to pass into the flow ops, in addition to the state. | {} |
| ignore_cache | bool | If True, ignores the cache and runs the serve op. Does not force refresh the state. Defaults to False. | False |
| force_refresh | bool | Whether to wait for all the pending updates to finish processing, resulting in the most up-to-date state, before running the serve op. Defaults to False, where a stale version of the state or a cached result may be used. | False |
| flush_update | bool | If True, waits for the update op to finish executing before returning. If the update queue hasn't reached batch_size yet, the update op runs anyways. Force refreshes the state after the update op completes. Defaults to False. | False |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If more than one flow key-value pair is passed. |
| RuntimeError | If flush_update is called and the component instance update processes are disabled. |

Returns:

| Type | Description |
| --- | --- |
| Generator[Any, None, None] | Generator that yields the results of the serve call as they come in. |

agen(flow_key: str, props: Dict[str, Any] = {}, ignore_cache: bool = False, force_refresh: bool = False, flush_update: bool = False) -> AsyncGenerator[Any, None] async

Async version of gen. Runs the flow (serve and update ops) for the specified key and yields the results as they come in, as an async generator. Use agen instead of gen if your serve op is an async generator function.

Example Usage:

from motion import Component
import asyncio

C = Component("MyComponent")

@C.serve("count")
async def count(state, value):
    for i in range(value):
        yield i
        await asyncio.sleep(i)

async def main():
    with C() as c:
        async for elem in c.agen("count", props={"value": 3}):
            print(elem) # Prints 0, 1, 2

if __name__ == "__main__":
    asyncio.run(main())

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| flow_key | str | Key of the flow to run. | required |
| props | Dict[str, Any] | Keyword arguments to pass into the flow ops, in addition to the state. | {} |
| ignore_cache | bool | If True, ignores the cache and runs the serve op. Does not force refresh the state. Defaults to False. | False |
| force_refresh | bool | Whether to wait for all the pending updates to finish processing, resulting in the most up-to-date state, before running the serve op. Defaults to False, where a stale version of the state or a cached result may be used. | False |
| flush_update | bool | If True, waits for the update op to finish executing before returning. If the update queue hasn't reached batch_size yet, the update op runs anyways. Force refreshes the state after the update op completes. Defaults to False. | False |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If more than one flow key-value pair is passed. |
| RuntimeError | If flush_update is called and the component instance update processes are disabled. |

Returns:

| Type | Description |
| --- | --- |
| AsyncGenerator[Any, None] | Async generator that yields the results of the serve call as they come in. |
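The choice between run, arun, gen, and agen depends on what kind of function the serve op is. As a rough sketch of that rule, the helper below classifies a plain, undecorated function using the standard inspect module. pick_method is a hypothetical name and is not part of Motion. It looks only at the serve op, so it does not cover the case where arun is needed because the update op is async.

```python
import inspect
from typing import Any, Callable


def pick_method(serve_fn: Callable[..., Any]) -> str:
    """Name the ComponentInstance method that matches serve_fn.

    Hypothetical helper, not part of Motion. It inspects a plain
    function and ignores any update op.
    """
    if inspect.isasyncgenfunction(serve_fn):
        return "agen"  # async def with yield
    if inspect.isgeneratorfunction(serve_fn):
        return "gen"   # def with yield
    if inspect.iscoroutinefunction(serve_fn):
        return "arun"  # async def without yield
    return "run"       # regular function


def add(state, value):
    return state["value"] + value


async def sleep(state, value):
    return "Slept!"


def count(state, value):
    yield from range(value)


async def acount(state, value):
    for i in range(value):
        yield i


for fn in (add, sleep, count, acount):
    print(fn.__name__, "->", pick_method(fn))
```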

read_state(key: str, default_value: Optional[Any] = None) -> Any

Gets the current value for the key in the component instance's state.

Usage:

from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

# Define serve and update operations
...

if __name__ == "__main__":
    with C() as c_instance:
        c_instance.read_state("value") # Returns 0
        c_instance.run(...)
        c_instance.read_state("value") # This will return the current
        # value of "value" in the state

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | Key in the state to get the value for. | required |
| default_value | Optional[Any] | Default value to return if the key is not found. Defaults to None. | None |

Returns:

| Type | Description |
| --- | --- |
| Any | Current value for the key, or default_value if the key is not found. |
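The lookup rule described here mirrors dict.get. The sketch below models only that rule with a plain dictionary. read_from is a hypothetical stand-in, not Motion's implementation, and it does not touch any real component state.

```python
from typing import Any, Dict, Optional


def read_from(state: Dict[str, Any], key: str,
              default_value: Optional[Any] = None) -> Any:
    # Models the documented rule only: return the stored value,
    # or default_value when the key is absent.
    return state.get(key, default_value)


state = {"value": 0}
print(read_from(state, "value"))        # 0
print(read_from(state, "missing"))      # None
print(read_from(state, "missing", -1))  # -1
```

Note that a stored value of 0 is returned as-is. The default is used only when the key is absent, not when the value is falsy.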

write_state(state_update: Dict[str, Any]) -> None

Writes the state update to the component instance's state. If an update op is currently running, the state update is applied after that update op finishes. Warning: this could take a while if your update ops take a long time!

Usage:

from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

# Define serve and update operations
...

if __name__ == "__main__":
    with C() as c_instance:
        c_instance.read_state("value") # Returns 0
        c_instance.write_state({"value": 1, "value2": 2})
        c_instance.read_state("value") # Returns 1
        c_instance.read_state("value2") # Returns 2

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state_update | Dict[str, Any] | Dictionary of key-value pairs to update the state with. | required |
| latest | bool | Whether or not to apply the update to the latest version of the state. If true, Motion will redownload the latest version of the state and apply the update to that version. You only need to set this to true if you are updating an instance you connected to a while ago and might be outdated. Defaults to False. | required |
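The usage example above suggests that a state update overwrites existing keys and adds new ones. The sketch below shows that merge with plain dictionaries. It assumes that keys absent from the update are left untouched, which the source does not state explicitly, and it is not Motion's implementation.

```python
state = {"value": 0, "other": "unchanged"}
state_update = {"value": 1, "value2": 2}

# Assumption: the update is merged key by key into the existing state
merged = {**state, **state_update}

print(merged["value"])   # 1 (overwritten)
print(merged["value2"])  # 2 (added)
print(merged["other"])   # unchanged (assumed to be left as-is)
```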

flush_update(flow_key: str) -> None

Flushes the update queue corresponding to the flow key, if it exists, and updates the instance state. Warning: this is a blocking operation and could take a while if your update op takes a long time!

Example Usage:

from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

@C.serve("add")
def add(state, value):
    return state["value"] + value

@C.update("add")
def add(state, value):
    return {"value": state["value"] + value}

@C.serve("multiply")
def multiply(state, value):
    return state["value"] * value

if __name__ == "__main__":
    with C() as c: # Create instance of C
        c.run("add", props={"value": 1})
        c.flush_update("add") # (1)!
        c.run("add", props={"value": 2}) # This will use the updated state

# 1. Waits for the update op to finish, then updates the state

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| flow_key | str | Key of the flow. | required |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If the component instance was initialized with disable_update_task enabled. |
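To picture what flushing means, here is a toy model of a batched update queue. ToyUpdateQueue and its methods are hypothetical and do not reflect Motion's internals. The sketch only shows the documented idea that a flush applies pending updates right away, even when fewer than batch_size updates are waiting.

```python
from typing import Any, Callable, Dict, List


class ToyUpdateQueue:
    """Toy model of a batched update queue; not Motion's internals."""

    def __init__(self, update_fn: Callable[..., Dict[str, Any]],
                 state: Dict[str, Any], batch_size: int) -> None:
        self.update_fn = update_fn
        self.state = state
        self.batch_size = batch_size
        self.pending: List[Dict[str, Any]] = []

    def enqueue(self, props: Dict[str, Any]) -> None:
        # Updates wait in the queue until a full batch has accumulated
        self.pending.append(props)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Apply every pending update now, even if the batch is not full
        for props in self.pending:
            self.state.update(self.update_fn(self.state, **props))
        self.pending.clear()


def add(state: Dict[str, Any], value: int) -> Dict[str, Any]:
    return {"value": state["value"] + value}


queue = ToyUpdateQueue(add, {"value": 0}, batch_size=3)
queue.enqueue({"value": 1})
before_flush = queue.state["value"]  # still 0: the update is pending
queue.flush()
after_flush = queue.state["value"]   # 1: the update has been applied
print(before_flush, after_flush)
```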

shutdown(wait_for_logging_threads: bool = False) -> None

Shuts down a Motion component instance, saving state. Automatically called when the instance is garbage collected.

Usage:

from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

# Define serve and update operations

if __name__ == "__main__":
    c_instance = C()
    c_instance.run(...)
    c_instance.run(...)
    c_instance.shutdown()

    # Or, use a context manager
    with C() as c_instance:
        c_instance.run(...)
        c_instance.run(...)

close(wait_for_logging_threads: bool = False) -> None

Alias for shutdown.

Usage:

from motion import Component

C = Component("MyComponent")

@C.init_state
def setUp():
    return {"value": 0}

# Define serve and update operations

if __name__ == "__main__":
    c_instance = C()
    c_instance.run(...)
    c_instance.run(...)
    c_instance.close()

    # Or, use a context manager
    with C() as c_instance:
        c_instance.run(...)
        c_instance.run(...)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| wait_for_logging_threads | bool | Defaults to False. | False |
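Because close is an alias for shutdown, any helper that expects an object with a close() method should work, including the standard library's contextlib.closing. The sketch below demonstrates the pattern with FakeInstance, a hypothetical stand-in that only records whether it was closed, so it runs without Motion installed.

```python
from contextlib import closing


class FakeInstance:
    """Stand-in for a component instance; only tracks closure."""

    def __init__(self) -> None:
        self.closed = False

    def shutdown(self, wait_for_logging_threads: bool = False) -> None:
        self.closed = True

    # close is an alias for shutdown, mirroring the documented API
    close = shutdown


with closing(FakeInstance()) as inst:
    open_inside = not inst.closed  # True while inside the block

print(open_inside, inst.closed)  # True True
```

With a real component instance, the plain with C() as c form shown in the usage above is simpler. closing is mainly useful when an instance is created elsewhere and handed to your code.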