anki_vector.connection

Management of the connection to and from Vector.

Functions

on_connection_thread([log_messaging, …]) A decorator generator used internally to denote which functions will run on the connection thread.

Classes

CONTROL_PRIORITY_LEVEL Enum used to specify the priority level for the program.
Connection(name, host, cert_file, guid[, …]) Creates and maintains an aiogrpc connection, including managing the connection thread.
class anki_vector.connection.CONTROL_PRIORITY_LEVEL

Enum used to specify the priority level for the program.

TOP_PRIORITY_AI = 20

Runs below Mandatory Physical Reactions such as tucking Vector’s head and arms during a fall, yet above Trigger-Word Detection.

class anki_vector.connection.Connection(name, host, cert_file, guid, requires_behavior_control=True)

Creates and maintains an aiogrpc connection, including managing the connection thread. The connection thread decouples the actual messaging layer from the user’s main thread, and requires any network requests to be run using asyncio.run_coroutine_threadsafe() so that they execute on the connection thread. Connection provides two helper functions for running a function on the connection thread: run_coroutine() and run_soon().

This class may be used to bypass the structures of the Python SDK handled by Robot, and instead talk to aiogrpc more directly.

The values for the cert_file location and the guid can be found in your home directory in the sdk_config.ini file.

import anki_vector

# Connect to your Vector
conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>")
conn.connect()
# Run your commands
async def play_animation():
    # Run your commands
    anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02")
    anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim)
    return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop
conn.run_coroutine(play_animation()).result()
# Close the connection
conn.close()
Parameters:
  • name (str) – Vector’s name in the format of “Vector-XXXX”.
  • host (str) – The IP address and port of Vector in the format “XX.XX.XX.XX:443”.
  • cert_file (str) – The location of the certificate file on disk.
  • guid (str) – Your robot’s unique secret key.
  • requires_behavior_control (bool) – True if the connection requires behavior control.
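The cert_file and guid values mentioned above can be read programmatically. The following is a minimal sketch, not part of the SDK: load_robot_config is a hypothetical helper, and both the ~/.anki_vector/sdk_config.ini location and the layout (one INI section per robot serial number, with keys such as cert, guid, name and ip) are assumptions that should be checked against your own file.

```python
import configparser
from pathlib import Path

# Assumed default location; adjust if your SDK setup stored the file elsewhere.
DEFAULT_CONFIG = Path.home() / ".anki_vector" / "sdk_config.ini"


def load_robot_config(path=DEFAULT_CONFIG, serial=None):
    """Return the settings of one robot section as a plain dict.

    Hypothetical helper: assumes one INI section per robot serial number.
    If serial is None, the first section in the file is used.
    """
    parser = configparser.ConfigParser()
    if not parser.read(path):  # read() returns the list of files it parsed
        raise FileNotFoundError(f"No readable config at {path}")
    sections = parser.sections()
    if not sections:
        raise ValueError(f"{path} contains no robot sections")
    section = serial if serial is not None else sections[0]
    if section not in sections:
        raise KeyError(f"No section for serial {section!r}")
    return dict(parser[section])
```

Under those assumptions, the "cert" and "guid" entries of the returned dict would supply the cert_file and guid arguments of Connection.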
close()

Clean up the connection and shut down all the event handlers.

Usually this should be invoked by the Robot class when it closes.

import anki_vector

# Connect to your Vector
conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>")
conn.connect()
# Run your commands
async def play_animation():
    # Run your commands
    anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02")
    anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim)
    return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop
conn.run_coroutine(play_animation()).result()
# Close the connection
conn.close()
connect(timeout=10.0)

Connect to Vector. This will start the connection thread which handles all messages between Vector and Python.

import anki_vector

# Connect to your Vector
conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>")
conn.connect()
# Run your commands
async def play_animation():
    # Run your commands
    anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02")
    anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim)
    return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop
conn.run_coroutine(play_animation()).result()
# Close the connection
conn.close()
Parameters:timeout (float) – The time allotted to attempt a connection, in seconds.
Return type:None
control_granted_event

This provides an asyncio.Event that a user may wait() upon to detect when Vector has given control of the behavior system to the SDK program.

import anki_vector

async def wait_for_control(conn: anki_vector.connection.Connection):
    await conn.control_granted_event.wait()
    # Run commands that require behavior control
Return type:Event
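Because the property is a plain asyncio.Event, wait() on it has no timeout of its own. The sketch below uses only the standard library and a stand-in event rather than a real Connection; wait_for_event is a hypothetical helper that bounds the wait with asyncio.wait_for. Note that asyncio.Event is not thread-safe, so the wait should happen in a coroutine running on the loop that owns the event.

```python
import asyncio


async def wait_for_event(event: asyncio.Event, timeout: float) -> bool:
    """Return True if the event is set within timeout seconds, else False."""
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    # Stand-in for conn.control_granted_event (no robot involved here).
    granted = asyncio.Event()
    # Simulate control being granted shortly after we start waiting.
    asyncio.get_running_loop().call_later(0.05, granted.set)
    got_it = await wait_for_event(granted, timeout=1.0)
    # An event that is never set makes the helper time out.
    never = await wait_for_event(asyncio.Event(), timeout=0.05)
    return got_it, never
```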
control_lost_event

This provides an asyncio.Event that a user may wait() upon to detect when Vector has taken control of the behavior system at a higher priority.

import anki_vector

async def auto_reconnect(conn: anki_vector.connection.Connection):
    await conn.control_lost_event.wait()
    conn.request_control()
Return type:Event
grpc_interface

A direct reference to the connected aiogrpc interface.

This may be used to directly call grpc messages, bypassing anki_vector.Robot.

import anki_vector

# Connect to your Vector
conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>")
conn.connect()
# Run your commands
async def play_animation():
    # Run your commands
    anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02")
    anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim)
    return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop
conn.run_coroutine(play_animation()).result()
# Close the connection
conn.close()
Return type:ExternalInterfaceStub
loop

A direct reference to the loop on the connection thread. Can be used to run functions on that thread.

import anki_vector
import asyncio

async def connection_function():
    print("I'm running in the connection thread event loop.")

with anki_vector.Robot() as robot:
    asyncio.run_coroutine_threadsafe(connection_function(), robot.conn.loop)
Return type:BaseEventLoop
Returns:The loop running inside the connection thread
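To illustrate what "a loop on another thread" means, here is a standard-library sketch that does not involve the SDK. The helpers start_loop_thread and stop_loop_thread and the thread name "sketch-connection" are hypothetical; how Connection sets up its own thread internally is not shown in this reference.

```python
import asyncio
import threading


def start_loop_thread():
    """Start an event loop on a daemon thread and return (loop, thread)."""
    loop = asyncio.new_event_loop()
    ready = threading.Event()

    def _run():
        asyncio.set_event_loop(loop)
        loop.call_soon(ready.set)  # fires once the loop is actually running
        loop.run_forever()

    thread = threading.Thread(target=_run, daemon=True, name="sketch-connection")
    thread.start()
    ready.wait()
    return loop, thread


def stop_loop_thread(loop, thread):
    """Ask the loop to stop from outside its thread, then join and close."""
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


async def which_thread():
    """Report the name of the thread this coroutine runs on."""
    return threading.current_thread().name
```

Submitting which_thread() with asyncio.run_coroutine_threadsafe(which_thread(), loop) returns a concurrent.futures.Future; calling result() on it from the main thread yields the name of the background thread, not the caller's.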
release_control(timeout=10.0)

Explicitly release control. Typically used after detecting control_granted_event.

To be able to directly control Vector’s motors, override his screen, play an animation, etc., the Connection will need behavior control. This function will release control of Vector’s behavior system. This will raise a VectorControlTimeoutException if it fails to receive a control_lost event before the timeout.

import anki_vector

async def wait_for_control(conn: anki_vector.connection.Connection):
    await conn.control_granted_event.wait()
    # Run commands that require behavior control
    conn.release_control()
Parameters:timeout (float) – The time allotted to attempt to release control, in seconds.
request_control(timeout=10.0)

Explicitly request behavior control. Typically used after detecting control_lost_event().

To be able to directly control Vector’s motors, override his screen, play an animation, etc., the Connection will need behavior control. This function will acquire control of Vector’s behavior system. This will raise a VectorControlTimeoutException if it fails to gain control before the timeout.

For more information about behavior control, see behavior.

import anki_vector

async def auto_reconnect(conn: anki_vector.connection.Connection):
    await conn.control_lost_event.wait()
    conn.request_control(timeout=5.0)
Parameters:timeout (float) – The time allotted to attempt to gain control, in seconds.
requires_behavior_control

True if the Connection requires behavior control.

To be able to directly control Vector’s motors, override his screen, play an animation, etc., the Connection will need behavior control. This boolean signifies that the Connection will try to maintain control of Vector’s behavior system even after losing control to higher priority robot behaviors such as returning home to charge a low battery.

For more information about behavior control, see behavior.

import time

import anki_vector

with anki_vector.Robot(requires_behavior_control=False) as robot:
    async def callback(event_type, event):
        await robot.conn.request_control()
        print(robot.conn.requires_behavior_control) # Will print True
        await robot.anim.play_animation('anim_pounce_success_02')
        await robot.conn.release_control()

    print(robot.conn.requires_behavior_control) # Will print False
    robot.events.subscribe(callback, anki_vector.events.Events.robot_observed_face)

    # Waits 10 seconds. Show Vector your face.
    time.sleep(10)
Return type:bool
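The idea of trying to maintain control after losing it can be pictured with two events and a watcher coroutine. This is only a toy model under stated assumptions: ToyControl and keep_control are hypothetical names, no robot or SDK call is involved, and the real Connection may implement this differently.

```python
import asyncio


class ToyControl:
    """Toy model of behavior control: two events and a request counter."""

    def __init__(self):
        self.granted = asyncio.Event()
        self.lost = asyncio.Event()
        self.requests = 0

    def request(self):
        # Pretend the robot grants control immediately.
        self.requests += 1
        self.lost.clear()
        self.granted.set()

    def revoke(self):
        # Pretend a higher-priority behavior took control away.
        self.granted.clear()
        self.lost.set()


async def keep_control(ctrl: ToyControl, rounds: int):
    """Re-request control each time it is lost, for a fixed number of rounds."""
    for _ in range(rounds):
        await ctrl.lost.wait()
        ctrl.request()


async def demo():
    ctrl = ToyControl()
    ctrl.request()                       # initial acquisition
    watcher = asyncio.create_task(keep_control(ctrl, rounds=1))
    ctrl.revoke()                        # control is lost once
    await watcher                        # watcher re-requests it
    return ctrl.requests, ctrl.granted.is_set()
```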
run_coroutine(coro)

Runs a given awaitable on the connection thread’s event loop. Cannot be called from within the connection thread.

import anki_vector

async def my_coroutine():
    print("Running on the connection thread")
    return "Finished"

with anki_vector.Robot() as robot:
    result = robot.conn.run_coroutine(my_coroutine())
Parameters:coro (Awaitable[+T_co]) – The coroutine, task or any other awaitable which should be executed.
Return type:Any
Returns:The result of the awaitable’s execution.
run_soon(coro)

Schedules the given awaitable to run on the event loop for the connection thread.

import anki_vector
import time

async def my_coroutine():
    print("Running on the connection thread")

with anki_vector.Robot() as robot:
    robot.conn.run_soon(my_coroutine())
    time.sleep(1)
Parameters:coro (Awaitable[+T_co]) – The coroutine, task or any awaitable to schedule for execution on the connection thread.
Return type:None
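The difference between the two helpers (submit and obtain a result versus schedule and move on) can be sketched with the standard library alone. LoopThread below is a hypothetical stand-in, not the SDK's Connection; it handles coroutines only, and its run_coroutine returns a concurrent.futures.Future whose result() the caller may block on. The guard shows why such a call cannot be made from the loop's own thread: blocking there would stall the loop that has to run the coroutine.

```python
import asyncio
import threading


class LoopThread:
    """Toy stand-in for a connection thread: one event loop on one thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def run_coroutine(self, coro):
        """Submit coro from another thread; returns a concurrent.futures.Future."""
        if threading.current_thread() is self.thread:
            coro.close()  # avoid a "never awaited" warning for the rejected coroutine
            raise RuntimeError("already on the loop thread; use run_soon instead")
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def run_soon(self, coro):
        """Schedule coro without waiting for it; nothing is returned."""
        self.loop.call_soon_threadsafe(self.loop.create_task, coro)

    def close(self):
        """Stop the loop from outside its thread, then join and close."""
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()
```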
thread

A direct reference to the connection thread. Available to callers to determine if the current thread is the connection thread.

import anki_vector
import threading

with anki_vector.Robot() as robot:
    if threading.current_thread() is robot.conn.thread:
        print("This code is running on the connection thread")
    else:
        print("This code is not running on the connection thread")
Return type:Thread
Returns:The connection thread where all of the grpc messages are being processed.
anki_vector.connection.on_connection_thread(log_messaging=True, requires_control=True)

A decorator generator used internally to denote which functions will run on the connection thread. This unblocks the caller of the wrapped function and allows them to continue running while the messages are being processed.

import anki_vector

class MyComponent(anki_vector.util.Component):
    @anki_vector.connection.on_connection_thread()
    async def on_connection_thread(self):
        # Do work on the connection thread
        pass
Parameters:
  • log_messaging (bool) – True if the log output should include the entire message, False if it should include just the size. False is recommended for large binary return values.
  • requires_control (bool) – True if the function should wait until behavior control is granted before executing.
Return type:

Callable[[Coroutine[Component, Any, None]], Any]

Returns:

A decorator whose wrapped function returns one of three things depending on context: the result of the decorated function when the robot is a Robot, the concurrent.futures.Future which points to the decorated function when the robot is an AsyncRobot, or the asyncio.Future which points to the decorated function when called from the connection thread.
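A context-dependent return of this kind can be sketched with a small decorator generator built on the standard library. Everything here is hypothetical and simplified: Worker, on_worker_thread and the blocking flag are illustration names (the flag stands in for the Robot versus AsyncRobot distinction), and the SDK's actual decorator also handles logging and behavior control, which this sketch omits.

```python
import asyncio
import functools
import threading


class Worker:
    """Owns an event loop that runs on a background thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def close(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


def on_worker_thread(blocking=True):
    """Decorator generator: returns a decorator configured by `blocking`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            coro = func(self, *args, **kwargs)
            if threading.current_thread() is self.worker.thread:
                # Already on the loop thread: hand back an asyncio future to await.
                return asyncio.ensure_future(coro)
            future = asyncio.run_coroutine_threadsafe(coro, self.worker.loop)
            # Blocking callers get the result; others get the concurrent future.
            return future.result() if blocking else future
        return wrapper
    return decorator


class Component:
    def __init__(self, worker):
        self.worker = worker

    @on_worker_thread()
    async def add(self, a, b):
        return a + b

    @on_worker_thread(blocking=False)
    async def add_later(self, a, b):
        return a + b

    @on_worker_thread()
    async def add_nested(self, a, b):
        # Runs on the loop thread, so self.add(...) returns an awaitable task.
        return await self.add(a, b)
```

Called from the main thread, add returns the sum directly, add_later returns a concurrent.futures.Future, and the inner self.add call inside add_nested returns an asyncio task because it is made from the loop thread.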