Cozmo AI Robot SDK Usage Notes (8): The apps Section

The apps section of the Cozmo SDK contains seven example projects.


1. 3d viewer

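A 3D viewer example that also supports remote control.

This example shows how to use the 3D viewer together with a program; the 3D viewer and the controls run automatically.

Note the use of asyncio!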

import asyncio

import cozmo


async def cozmo_program(robot: cozmo.robot.Robot):
    while True:
        await asyncio.sleep(1)


cozmo.robot.Robot.drive_off_charger_on_connect = False
cozmo.run_program(cozmo_program, use_3d_viewer=True, use_viewer=True)
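
The `while True: await asyncio.sleep(1)` loop in the listing above keeps the program alive without blocking the event loop it runs on. The following is a minimal sketch of that pattern using only the standard library, with no robot required. The names `keep_alive`, `background_task`, and `run_demo` are hypothetical and are not part of the Cozmo SDK; how the SDK schedules its own work is not shown here.

```python
import asyncio


async def keep_alive(ticks, period_s=0.01):
    """Stand-in for cozmo_program: sleep in a loop so other tasks can run."""
    for _ in range(ticks):
        # Awaiting sleep hands control back to the event loop.
        await asyncio.sleep(period_s)
    return ticks


async def background_task(log):
    """Hypothetical stand-in for work that runs while the program idles."""
    for step in range(3):
        log.append(step)
        await asyncio.sleep(0)


async def main():
    log = []
    # Schedule the background work, then idle in the keep-alive loop.
    worker = asyncio.ensure_future(background_task(log))
    ticks = await keep_alive(3)
    await worker
    return ticks, log


def run_demo():
    return asyncio.run(main())
```

A blocking call such as `time.sleep(1)` in place of `await asyncio.sleep(1)` would stall every other coroutine on the same loop, which is why the example awaits the asyncio version.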

2. cli

A command-line interface for Cozmo.

This example integrates Cozmo with an IPython-based command-line interface.

Note the use of sys!

import sys

try:
    from IPython.terminal.embed import InteractiveShellEmbed
    from IPython.terminal.prompts import Prompts, Token
except ImportError:
    sys.exit('Cannot import from ipython: Do `pip3 install ipython` to install')

import cozmo

usage = ('This is an IPython interactive shell for Cozmo.\n'
         'All commands are executed within cozmo\'s running program loop.\n'
         'Use the [tab] key to auto-complete commands, and see all available methods.\n'
         'All IPython commands work as usual. See below for some useful syntax:\n'
         '  ?         -> Introduction and overview of IPython\'s features.\n'
         '  object?   -> Details about \'object\'.\n'
         '  object??  -> More detailed, verbose information about \'object\'.')

# Creating IPython's history database on the main thread
ipyshell = InteractiveShellEmbed(banner1='\nWelcome to the Cozmo Shell',
                                 exit_msg='Goodbye\n')

def cozmo_program(robot: cozmo.robot.Robot):
    '''Invoke the ipython shell while connected to cozmo'''
    default_log_level = cozmo.logger.level
    cozmo.logger.setLevel('WARN')
    ipyshell(usage)
    cozmo.logger.setLevel(default_log_level)

cozmo.run_program(cozmo_program, use_3d_viewer=True, use_viewer=True)
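
In the listing above, `cozmo_program` saves the logger level, raises it to `WARN` while the shell is open, and restores it afterwards. If the shell call raised an exception, the restore line would be skipped. One possible way to make the restore unconditional is a small context manager, sketched here with the standard `logging` module. The helper `temporary_log_level` and the logger name `demo` are hypothetical and are not part of the Cozmo SDK.

```python
import contextlib
import logging


@contextlib.contextmanager
def temporary_log_level(logger, level):
    """Set logger to level inside the with-block, then restore the old level."""
    previous_level = logger.level
    logger.setLevel(level)
    try:
        yield logger
    finally:
        # Runs even if the body raises, so the level is always restored.
        logger.setLevel(previous_level)


demo_logger = logging.getLogger('demo')  # hypothetical logger name
demo_logger.setLevel(logging.INFO)
with temporary_log_level(demo_logger, logging.WARNING):
    level_inside = demo_logger.level
level_after = demo_logger.level
```

Assuming `cozmo.logger` behaves like a standard `logging.Logger`, as its `level` and `setLevel` usage above suggests, the body of `cozmo_program` could wrap `ipyshell(usage)` in such a block.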

3. color finder

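Cozmo looks around and tracks a specified color.

Place a tennis ball near Cozmo and see whether he can track it!

When the program starts, Cozmo looks around for the color yellow. Tap the cube that is lit yellow to cycle Cozmo's target color through yellow, blue, red, and green. Tap the cube that blinks white to make the viewer display Cozmo's pixelated camera view.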
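
To decide whether a pixel counts as yellow, blue, red, or green, the listing below converts it to HSV and picks the named range it is closest to. The following is a minimal standalone sketch of that idea. It uses the standard `colorsys` module instead of the listing's own `rgb_to_hsv`, and it divides hue distances by 360 so that hue shares the 0 to 1 scale of saturation and value. The names `COLOR_RANGES`, `distance_to_interval`, and `classify_rgb` are hypothetical; the range values are copied from `hsv_color_ranges` in the listing.

```python
import colorsys

# (min_h, max_h, min_s, max_s, min_v, max_v); hue in degrees, S and V in 0..1.
COLOR_RANGES = {
    'red': (-20.0, 20.0, 0.5, 1.0, 0.5, 1.0),
    'green': (90.0, 155.0, 0.5, 1.0, 0.5, 1.0),
    'blue': (180.0, 245.0, 0.5, 1.0, 0.5, 1.0),
    'yellow': (40.0, 80.0, 0.5, 1.0, 0.5, 1.0),
    'white': (0.0, 360.0, 0.0, 0.2, 0.9, 1.0),
    'black': (0.0, 360.0, 0.0, 0.1, 0.0, 0.2),
}


def distance_to_interval(value, low, high):
    """Distance from value to the closed interval [low, high]; 0 if inside."""
    if value < low:
        return low - value
    if value > high:
        return value - high
    return 0.0


def classify_rgb(r, g, b):
    """Return the name of the color range closest to an 8-bit RGB pixel."""
    h, s, v = colorsys.rgb_to_hsv(r / 255.0, g / 255.0, b / 255.0)
    hue_deg = h * 360.0
    if hue_deg > 340.0:
        hue_deg -= 360.0  # wrap so reds just below 360 land in -20..20
    best_name, best_dist = None, float('inf')
    for name, (min_h, max_h, min_s, max_s, min_v, max_v) in COLOR_RANGES.items():
        dist = ((distance_to_interval(hue_deg, min_h, max_h) / 360.0) ** 2
                + distance_to_interval(s, min_s, max_s) ** 2
                + distance_to_interval(v, min_v, max_v) ** 2)
        if dist < best_dist:
            best_name, best_dist = name, dist
    return best_name
```

For example, `classify_rgb(255, 255, 0)` lands inside the yellow range, while a pure white pixel is matched to `'white'` because its saturation is zero.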

import asyncio
import functools
import math
import numpy
import sys

import cozmo

from cozmo.util import degrees, distance_mm, radians, speed_mmps, Vector2
from cozmo.lights import Color, Light
try:
    from PIL import Image, ImageColor, ImageDraw, ImageStat
except ImportError:
    sys.exit('Cannot import from PIL: Do `pip3 install --user Pillow` to install')


# Set ENABLE_COLOR_BALANCING to False to skip the color_balance step
ENABLE_COLOR_BALANCING = True


# map_color_to_light (dict): maps each color name with its cozmo.lights.Light value.
# Red, green, and blue lights are already defined as constants in lights.py, 
# but we need to define our own custom Light for yellow.
map_color_to_light = {
    'green': cozmo.lights.green_light,
    'yellow': Light(Color(name='yellow', rgb=(255, 255, 0))),
    'blue': cozmo.lights.blue_light,
    'red': cozmo.lights.red_light,
}

# hsv_color_ranges (dict): map of color names to regions in HSV space.
# Instead of defining a color as a single (H, S, V) point, 
# colors are defined with a minimum and maximum value for H, S, and V.
# For example, a point with (H, S, V) = (200.0, 0.8, 0.95) falls entirely in the 'blue' region,
# because 180.0 < 200.0 < 245.0, 0.5 < 0.8 < 1.0, and 0.5 < 0.95 < 1.0.
# A point with (H, S, V) = (88.0, 0.4, 0.9) does not fall exactly in one color region.
# But applying hsv_color_distance_sqr between this color and all the colors in hsv_color_ranges
# will show that (88.0, 0.4, 0.9) is closest to the 'green' region.

# Each tuple is (minH, maxH, minS, maxS, minV, maxV).
hsv_color_ranges = {
    'red': (-20.0, 20.0, 0.5, 1.0, 0.5, 1.0),
    'green': (90.0, 155.0, 0.5, 1.0, 0.5, 1.0),
    'blue': (180.0, 245.0, 0.5, 1.0, 0.5, 1.0),
    'yellow': (40.0, 80.0, 0.5, 1.0, 0.5, 1.0),
    'white': (0.0, 360.0, 0.0, 0.2, 0.9, 1.0),
    'black': (0.0, 360.0, 0.0, 0.1, 0.0, 0.2),
}

def hsv_color_distance_sqr(color, color_range):
    '''Determines the squared euclidean distance between color and color_range.

    Note that we normalize h, minH, and maxH so that they also fall between 0 and 1, instead of 0 and 360.

    Args:
        color (float, float, float): the H, S, V values of the color
        color_range(float, float, float, float, float, float): the minimum and maximum for H, S, and V for the color range

    Returns:
        squared distance between color and color_range, 
        which is the sum of the squared distances from 
        the H, S, V values to their respective ranges
    '''
    h, s, v = color
    minH, maxH, minS, maxS, minV, maxV = color_range
    h_dist_sqr = 0
    s_dist_sqr = 0
    v_dist_sqr = 0
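    # Divide the hue difference by 360 so that it shares the 0-1 scale of S and V,
    # as the docstring describes.
    if h < minH:
        h_dist_sqr = ((minH - h) / 360.0) ** 2
    elif h > maxH:
        h_dist_sqr = ((maxH - h) / 360.0) ** 2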
    if s < minS:
        s_dist_sqr = (minS - s) ** 2
    elif s > maxS:
        s_dist_sqr = (maxS - s) ** 2
    if v < minV:
        v_dist_sqr = (minV - v) ** 2
    elif v > maxV:
        v_dist_sqr = (maxV - v) ** 2
    sum_dist_sqr = h_dist_sqr + s_dist_sqr + v_dist_sqr
    return sum_dist_sqr

def color_balance(image):
    '''Adjusts the color data of an image so that the average R, G, B values across the entire image end up equal.

    This is called a 'gray-world' algorithm, because the colors
    with equal R, G, B values fall along the grayscale.
    https://web.stanford.edu/~sujason/ColorBalancing/grayworld.html

    Args:
        image (PIL image): the image being color-balanced

    Returns:
        the PIL image with balanced color distribution
    '''
    image_array = image_to_array(image)
    image_array = image_array.transpose(2, 0, 1).astype(numpy.uint32)
    average_g = numpy.average(image_array[1])
    image_array[0] = numpy.minimum(image_array[0] * (average_g / numpy.average(image_array[0])), 255)
    image_array[2] = numpy.minimum(image_array[2] * (average_g / numpy.average(image_array[2])), 255)
    return array_to_image(image_array.transpose(1, 2, 0).astype(numpy.uint8))

def image_to_array(image):
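    '''Converts PIL image to a writable image array.'''
    # numpy.array copies the pixel data, so the result is always writable.
    # Setting flags.writeable on a numpy.asarray view of a PIL image
    # can raise ValueError on recent NumPy versions.
    return numpy.array(image)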

def array_to_image(image_array):
    '''Converts image array to PIL image.'''
    return Image.fromarray(numpy.uint8(image_array))

def rgb_to_hsv(r, g, b):
    '''Converts an RGB value to its corresponding HSV value.

    Args:
        r (int): the amount of red in the color, between 0 and 255
        g (int): the amount of green in the color, between 0 and 255
        b (int): the amount of blue in the color, between 0 and 255

    Returns:
        tuple of floats (h, s, v) representing the HSV value of the color
        h represents an angle, between 0 and 360 degrees
        s represents the saturation, between 0 and 1
        v represents the brightness, between 0 and 1
    '''
    r_normalized = r / 255.0
    g_normalized = g / 255.0
    b_normalized = b / 255.0
    max_normalized_val = max(r_normalized, g_normalized, b_normalized)
    min_normalized_val = min(r_normalized, g_normalized, b_normalized)
    delta = max_normalized_val - min_normalized_val

    h = 0
    s = 0
    v = max_normalized_val

    if delta != 0:
        if max_normalized_val == r_normalized:
            h = 60.0 * ((g_normalized - b_normalized) / delta)
        elif max_normalized_val == g_normalized:
            h = 60.0 * (((b_normalized - r_normalized) / delta) + 2)
        else:
            h = 60.0 * (((r_normalized - g_normalized) / delta) + 4)
        if h < 0:
            h += 360

        if max_normalized_val == 0:
            s = 0
        else:
            s = delta / max_normalized_val
    return (h, s, v)

POSSIBLE_COLORS_TO_FIND = ['green', 'yellow', 'blue', 'red']

LOOK_AROUND_STATE = 'look_around'
FOUND_COLOR_STATE = 'found_color'
DRIVING_STATE = 'driving'

ANNOTATOR_WIDTH = 640.0
ANNOTATOR_HEIGHT = 480.0

DOWNSIZE_WIDTH = 32
DOWNSIZE_HEIGHT = 24


class ColorFinder(cozmo.annotate.Annotator):
    '''Cozmo looks around and drives after colors.

    Cozmo's camera view is approximated into a matrix of colors.
    Cozmo will look around for self.color_to_find, and once it is spotted, 
    he will drive in the direction of that color.

    Args:
        robot (cozmo.robot.Robot): instance of the robot connected from run_program.
    '''
    def __init__(self, robot: cozmo.robot.Robot):
        self.robot = robot
        self.robot.camera.image_stream_enabled = True
        self.robot.camera.color_image_enabled = True
        self.fov_x  = self.robot.camera.config.fov_x
        self.fov_y = self.robot.camera.config.fov_y
        self.robot.add_event_handler(cozmo.objects.EvtObjectTapped, self.on_cube_tap)
        self.robot.add_event_handler(cozmo.world.EvtNewCameraImage, self.on_new_camera_image)

        self.color_selector_cube = None # type: LightCube
        self.color_to_find = 'yellow'
        self.color_to_find_index = POSSIBLE_COLORS_TO_FIND.index(self.color_to_find)

        self.grid_cube = None # type: LightCube
        self.robot.world.image_annotator.add_annotator('color_finder', self)
        self.robot.world.image_annotator.annotation_enabled = False
        self.enabled = True
        self.pixel_matrix = MyMatrix(DOWNSIZE_WIDTH, DOWNSIZE_HEIGHT)

        self.amount_turned_recently = radians(0)
        self.moving_threshold = radians(12)

        self.state = LOOK_AROUND_STATE

        self.look_around_behavior = None # type: LookAroundInPlace behavior
        self.drive_action = None # type: DriveStraight action
        self.tilt_head_action = None # type: SetHeadAngle action
        self.rotate_action = None # type: TurnInPlace action
        self.lift_action = None # type: SetLiftHeight action

        self.last_known_blob_center = (DOWNSIZE_WIDTH/2, DOWNSIZE_HEIGHT/2) # initially set to center screen

        self.white_balance_cube = None # type: LightCube
        self.adjustment = None

    def apply(self, image, scale):
        '''Draws a pixelated grid of Cozmo's approximate camera view onto the viewer window.
        Also draws a marker showing the center of the largest blob that matches the color_to_find
            
        WM and HM are multipliers that scale the dimensions of the annotated squares 
        based on DOWNSIZE_WIDTH and DOWNSIZE_HEIGHT
        '''
        d = ImageDraw.Draw(image)
        WM = ANNOTATOR_WIDTH/DOWNSIZE_WIDTH
        HM = ANNOTATOR_HEIGHT/DOWNSIZE_HEIGHT

        for i in range(DOWNSIZE_WIDTH):
            for j in range(DOWNSIZE_HEIGHT):
                pt1 = Vector2(i * WM, j * HM)
                pt2 = Vector2(i * WM, (j + 1) * HM)
                pt3 = Vector2((i + 1) * WM, (j + 1) * HM)
                pt4 = Vector2((i + 1) * WM, j * HM)
                points_seq = (pt1, pt2, pt3, pt4)
                cozmo.annotate.add_polygon_to_image(image, points_seq, 1.0, 'green', self.pixel_matrix.at(i, j).value)

        text = cozmo.annotate.ImageText('Looking for {}'.format(self.color_to_find), color = 'white')
        text.render(d, (0, 0, image.width, image.height))

        if self.state != LOOK_AROUND_STATE:
            x, y = self.last_known_blob_center
            pt1 = Vector2((x + 0.5) * WM, (y + 0.5) * HM)
            pt2 = Vector2((x + 1.5) * WM, (y + 0.5) * HM)
            pt3 = Vector2((x + 1.5) * WM, (y + 1.5) * HM)
            pt4 = Vector2((x + 0.5) * WM, (y + 1.5) * HM)
            points_seq = (pt1, pt2, pt3, pt4)
            cozmo.annotate.add_polygon_to_image(image, points_seq, 1.0, 'black', 'gold')

    def on_cube_tap(self, evt, obj, **kwargs):
        '''The blinking white cube switches the viewer between normal mode and pixel mode.
        The other illuminated cube toggles self.color_to_find.       
        '''    
        if obj.object_id == self.color_selector_cube.object_id:
            self.toggle_color_to_find()
        elif obj.object_id == self.grid_cube.object_id:
            self.robot.world.image_annotator.annotation_enabled = not self.robot.world.image_annotator.annotation_enabled
        elif obj.object_id == self.white_balance_cube.object_id:
            self.white_balance()

    def toggle_color_to_find(self):
        '''Sets self.color_to_find to the next color in POSSIBLE_COLORS_TO_FIND.'''    
        self.color_to_find_index += 1
        if self.color_to_find_index == len(POSSIBLE_COLORS_TO_FIND):
            self.color_to_find_index = 0
        self.color_to_find = POSSIBLE_COLORS_TO_FIND[self.color_to_find_index]
        self.color_selector_cube.set_lights(map_color_to_light[self.color_to_find])

    def on_new_camera_image(self, evt, **kwargs):
        '''Processes the blobs in Cozmo's view, and determines the correct reaction.'''
        downsized_image = self.get_low_res_view()
        if ENABLE_COLOR_BALANCING:
            downsized_image = color_balance(downsized_image)
        self.update_pixel_matrix(downsized_image)
        blob_detector = BlobDetector(self.pixel_matrix, self.color_to_find)
        blob_center = blob_detector.get_blob_center()
        if blob_center:
            self.last_known_blob_center = blob_center
            blob_size = blob_detector.get_blob_size()
            if self.state == LOOK_AROUND_STATE:
                self.state = FOUND_COLOR_STATE
                if self.look_around_behavior:
                    self.look_around_behavior.stop()
                    self.look_around_behavior = None
            self.on_finding_a_blob(blob_center, blob_size)
        else:
            self.robot.set_backpack_lights_off()
            self.abort_actions(self.drive_action)
            self.state = LOOK_AROUND_STATE

    def white_balance(self):
        image = self.robot.world.latest_image.raw_image
        self.adjustment = ImageStat.Stat(image).mean

    def update_pixel_matrix(self, downsized_image):
        '''Updates self.pixel_matrix with the colors from the current camera view.

        Args:
            downsized_image (PIL image): the low-resolution version of self.robot.world.latest_image
        '''
        for i in range(self.pixel_matrix.num_cols):
            for j in range(self.pixel_matrix.num_rows):
                r, g, b = downsized_image.getpixel((i, j))
                self.pixel_matrix.at(i, j).set(self.approximate_color_of_pixel(r, g, b))
        self.pixel_matrix.fill_gaps()

    def approximate_color_of_pixel(self, r, g, b):
        '''Returns the approximated color of the RGB value of a pixel.

        Args:
            r (int): the amount of red in the pixel
            g (int): the amount of green in the pixel
            b (int): the amount of blue in the pixel

        Returns:
            string specifying the name of the color range closest to the input color
        '''
        min_distance = sys.maxsize
        closest_color = ''
        h, s, v = rgb_to_hsv(r, g, b)
        if h > 340.0:
            h -= 360.0
        for color_name, color_range in hsv_color_ranges.items():
            d = hsv_color_distance_sqr((h, s, v), color_range)
            if d < min_distance:
                min_distance = d
                closest_color = color_name
        return closest_color

    def get_low_res_view(self):
        '''Downsizes Cozmo's camera view to the specified dimensions.

        Returns:
            PIL image downsized to low-resolution version of Cozmo's camera view.
        '''
        image = self.robot.world.latest_image.raw_image
        downsized_image = image.resize((DOWNSIZE_WIDTH, DOWNSIZE_HEIGHT), resample = Image.LANCZOS)
        return downsized_image

    def on_finding_a_blob(self, blob_center, blob_size):
        '''Determines whether Cozmo should continue to look at the blob, or drive towards it.
            
        Args:
            blob_center (int, int): coordinates of the blob's center in self.pixel_matrix
            blob_size (int): number of pixels in the blob
        '''
        self.robot.set_center_backpack_lights(map_color_to_light[self.color_to_find])
        if blob_size > (self.pixel_matrix.size/4):
            self.lift_action = self.robot.set_lift_height(0.0, in_parallel=True)
        x, y = blob_center
        # 'fov' stands for 'field of view'. This is the angle amount
        # that Cozmo can see to the edges of his camera view.
        amount_to_move_head = radians(self.fov_y.radians*(.5-float(y)/DOWNSIZE_HEIGHT))
        amount_to_rotate = radians(self.fov_x.radians*(.5-float(x)/DOWNSIZE_WIDTH))
        if self.moved_too_far_from_center(amount_to_move_head, amount_to_rotate):
            self.state = FOUND_COLOR_STATE
        if self.state != DRIVING_STATE:
            self.turn_toward_blob(amount_to_move_head, amount_to_rotate)
        else:
            self.drive_toward_color_blob()

    def moved_too_far_from_center(self, amount_to_move_head, amount_to_rotate):
        '''Decides whether the center of the blob is too far from the center of Cozmo's view.

        Args:
            amount_to_move_head (cozmo.util.Angle): 
                the perceived vertical distance of the blob from center-screen
            amount_to_rotate (cozmo.util.Angle): 
                the perceived horizontal distance of the blob from center-screen

        Returns:
            bool specifying whether the object is too far from center-screen
        '''
        too_far_vertical = (amount_to_move_head.abs_value > self.fov_y/4)
        too_far_horizontal = (amount_to_rotate.abs_value > self.fov_x/4)
        too_far = too_far_vertical or too_far_horizontal
        return too_far

    def turn_toward_blob(self, amount_to_move_head, amount_to_rotate):
        '''Calls actions that tilt Cozmo's head and rotate his body toward the color.

        Args:
           amount_to_move_head (cozmo.util.Angle): 
               the perceived vertical distance of the blob from center-screen
           amount_to_rotate (cozmo.util.Angle): 
               the perceived horizontal distance of the blob from center-screen
        '''
        self.abort_actions(self.tilt_head_action, self.rotate_action, self.drive_action)
        new_head_angle = self.robot.head_angle + amount_to_move_head
        self.tilt_head_action = self.robot.set_head_angle(new_head_angle, warn_on_clamp=False, in_parallel=True)
        self.rotate_action = self.robot.turn_in_place(amount_to_rotate, in_parallel=True)
        if self.state == FOUND_COLOR_STATE:
            self.amount_turned_recently += amount_to_move_head.abs_value + amount_to_rotate.abs_value

    def drive_toward_color_blob(self):
        '''Drives straight once prior actions have been cancelled.'''
        self.abort_actions(self.tilt_head_action, self.rotate_action)
        if self.should_start_new_action(self.drive_action):
            self.drive_action = self.robot.drive_straight(distance_mm(500), speed_mmps(300), should_play_anim=False, in_parallel=True)
        if self.should_start_new_action(self.lift_action):
            self.lift_action = self.robot.set_lift_height(1.0, in_parallel=True)

    def turn_toward_last_known_blob(self):
        '''Turns toward the coordinates of the last recorded blob in memory.

        amount_to_rotate is multiplied to overshoot the object rather than undershoot it.
        '''
        x, y = self.last_known_blob_center
        amount_to_move_head = radians(self.fov_y.radians*(.5-y/DOWNSIZE_HEIGHT))
        amount_to_rotate = radians(self.fov_x.radians*(.5-x/DOWNSIZE_WIDTH)) * 4
        self.turn_toward_blob(amount_to_move_head, amount_to_rotate)

    def abort_actions(self, *actions):
        '''Aborts the input actions if they are currently running.

        Args:
            *actions (list): the list of actions
        '''
        for action in actions:
            if action is not None and action.is_running:
                action.abort()

    def should_start_new_action(self, action):
        ''' Whether the action should be started.

        Args:
            action (action): the action that should or should not be started

        Returns:
            bool specifying whether the action is not running or is currently None
        '''
        should_start = (action is None) or (not action.is_running)
        return should_start

    async def start_lookaround(self):
        '''Turns to a likely spot for a blob to be, then starts self.look_around_behavior.'''
        if self.look_around_behavior == None or not self.look_around_behavior.is_active:
            self.turn_toward_last_known_blob()
            await asyncio.sleep(.5)
            if self.state == LOOK_AROUND_STATE: # state may have changed due to turn_toward_last_known_blob
                self.abort_actions(self.tilt_head_action, self.rotate_action, self.drive_action)
                self.look_around_behavior = self.robot.start_behavior(cozmo.behavior.BehaviorTypes.LookAroundInPlace)

    def turn_on_cubes(self):
        '''Illuminates the two cubes that control self.color_to_find and set the viewer display.'''
        self.color_selector_cube.set_lights(map_color_to_light[self.color_to_find])
        self.grid_cube.set_lights(cozmo.lights.white_light.flash())

    def cubes_connected(self):
        '''Returns true if Cozmo connects to both cubes successfully.'''   
        self.color_selector_cube = self.robot.world.get_light_cube(cozmo.objects.LightCube1Id)
        self.grid_cube = self.robot.world.get_light_cube(cozmo.objects.LightCube2Id)
        self.white_balance_cube = self.robot.world.get_light_cube(cozmo.objects.LightCube3Id)
        cubes = (self.color_selector_cube, self.grid_cube, self.white_balance_cube)
        return all(cube is not None for cube in cubes)

    async def run(self):
        '''Runs until CTRL+C is typed into Terminal/Command Prompt,
        or until the viewer window is closed.
        '''    
        if not self.cubes_connected():
            print('Cubes did not connect successfully - check that they are nearby. You may need to replace the batteries.')
            return
        self.turn_on_cubes()
        await self.robot.drive_straight(distance_mm(100), speed_mmps(50), should_play_anim = False).wait_for_completed()

        # Updates self.state and resets self.amount_turned_recently every 1 second.
        while True:
            await asyncio.sleep(1)
            if self.state == LOOK_AROUND_STATE:
                await self.start_lookaround()
            if self.state == FOUND_COLOR_STATE and self.amount_turned_recently < self.moving_threshold:
                self.state = DRIVING_STATE
            self.amount_turned_recently = radians(0)


class BlobDetector():
    '''Determine where the regions of the specified color reside in a matrix.

    We use this class to find the areas of color_to_find in the pixel_matrix of the ColorFinder class.
    
    Args:
        matrix (MyMatrix): the pixel_matrix from ColorFinder
        color_to_find (string): the color of the blobs Cozmo is looking for
    '''
    def __init__(self, matrix, color_to_find):
        self.matrix = matrix
        self.color_to_find = color_to_find

        self.num_blobs = 1
        self.blobs_dict = {}
        self.keys = MyMatrix(self.matrix.num_cols, self.matrix.num_rows)
        self.make_blobs_dict()
        self.filter_blobs_dict_by_size(5) # prevents a lot of irrelevant blobs from being processed
        self.largest_blob_size = 0

    def make_blobs_dict(self):
        '''Using a connected components algorithm, constructs a dictionary 
        that maps a blob to the points of the matrix that make up that blob.

        Only creates a blob if the point's color matches self.color_to_find.

        Key and Value types of the dictionary:
            Key : int specifying self.num_blobs at the time the blob was first created.
            Value : the list of points in the blob.
        '''
        for i in range(self.matrix.num_cols):
            for j in range(self.matrix.num_rows):
                if self.matrix.at(i, j).value == self.color_to_find:
                    matches_left = self.matches_blob_left(i, j)
                    matches_above = self.matches_blob_above(i, j)
                    should_merge = matches_left and matches_above and self.above_and_left_blobs_are_different(i, j)
                    if should_merge:
                        self.merge_up_and_left_blobs(i, j)
                    elif matches_left:
                        self.join_blob_left(i, j)
                    elif matches_above:
                        self.join_blob_above(i, j)
                    else:
                        self.make_new_blob_at(i, j)

    def matches_blob_above(self, i, j):
        '''Returns true if the current point matches the point above.

        Args:
            i (int): the x-coordinate in self.matrix
            j (int): the y-coordinate in self.matrix

        Returns:
            bool specifying whether the current point matches the point above.
        '''
        if j == 0:
            return False
        matches_above = (self.matrix.at(i, j-1).value == self.color_to_find)
        return matches_above

    def matches_blob_left(self, i, j):
        '''Returns true if the current point matches the point to the left.

        Args:
            i (int): the x-coordinate in self.matrix
            j (int): the y-coordinate in self.matrix

        Returns:
            bool specifying whether the current point matches the point to the left.
        '''
        if i == 0:
            return False
        matches_left  = (self.matrix.at(i-1, j).value == self.color_to_find)
        return matches_left

    def above_and_left_blobs_are_different(self, i, j):
        '''Returns true if the point above and the point to the left belong to different blobs.

        Args:
            i (int): the x-coordinate in self.matrix
            j (int): the y-coordinate in self.matrix

        Returns:
            bool specifying whether the above blob and the left blob have different keys in self.keys
        '''
        if i == 0 or j == 0:
            return False
        above_and_left_different = (self.keys.at(i-1, j).value != self.keys.at(i, j-1).value)
        return above_and_left_different

    def make_new_blob_at(self, i, j):
        '''Adds a new blob to self.blobs_dict
        whose list of points initially contains only the current point.

        Args:
            i (int): the x-coordinate in self.matrix
            j (int): the y-coordinate in self.matrix
        '''
        self.blobs_dict[self.num_blobs] = [(i, j)]
        self.keys.at(i, j).set(self.num_blobs)
        self.num_blobs += 1

    def join_blob_above(self, i, j):
        '''Adds current point to the blob above.

        Args:
            i (int): the x-coordinate in self.matrix
            j (int): the y-coordinate in self.matrix
        '''
        above_blob_key = self.keys.at(i, j-1).value
        self.blobs_dict[above_blob_key].append((i, j))
        self.keys.at(i, j).set(above_blob_key)

    def join_blob_left(self, i, j):
        '''Adds current point to the blob to the left.

        Args:
            i (int): the x-coordinate in self.matrix
            j (int): the y-coordinate in self.matrix
        '''
        left_blob_key = self.keys.at(i-1, j).value
        self.blobs_dict[left_blob_key].append((i, j))
        self.keys.at(i, j).set(left_blob_key)

    def merge_up_and_left_blobs(self, i, j):
        '''Adds current point and points from the above blob into left blob,
        then removes the above blob from self.blobs_dict

        Args:
            i (int): the x-coordinate in self.matrix
            j (int): the y-coordinate in self.matrix
        '''
        above_blob_key = self.keys.at(i, j-1).value
        left_blob_key = self.keys.at(i-1, j).value
        above_blob_points = self.blobs_dict[above_blob_key]
        left_blob_points = self.blobs_dict[left_blob_key]
        for point in above_blob_points:
            self.blobs_dict[left_blob_key].append(point)
        self.blobs_dict[left_blob_key].append((i, j))
        self.keys.at(i, j).set(left_blob_key)
        for (x, y) in above_blob_points:
            self.keys.at(x, y).set(left_blob_key)
        self.blobs_dict.pop(above_blob_key)

    def filter_blobs_dict_by_size(self, n):
        '''Filters out small blobs from self.blobs_dict.

        Args:
            n (int): the number of points required of a blob to stay in self.blobs_dict
        '''
        self.blobs_dict = dict((blob, list_of_points) for blob, list_of_points in self.blobs_dict.items() if len(list_of_points) >= n)

    def get_largest_blob_key(self):
        '''Finds the key of the largest blob.

        Returns:
            int specifying the key of the largest blob with that color, or None if no such blob exists
        '''
        largest_blob_key = None
        values = self.blobs_dict.values()
        if len(values) > 0:
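            # Compare by number of points; comparing the lists directly would order them lexicographically.
            longest_points_list = functools.reduce(lambda largest, current: largest if (len(largest) > len(current)) else current, values)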
            sample_x, sample_y = longest_points_list[0]
            largest_blob_key = self.keys.at(sample_x, sample_y).value
            self.largest_blob_size = len(self.blobs_dict[largest_blob_key])
        return largest_blob_key

    def get_blob_center(self):
        '''Approximates the coordinates of the center of the largest blob.

        Returns:
            (int, int) specifying the center of the largest blob, 
            or None if self.get_largest_blob_key() returns None
        '''
        blob_center = None
        largest_blob_key = self.get_largest_blob_key()
        if largest_blob_key:
            xs = []
            ys = []
            for (x, y) in self.blobs_dict[largest_blob_key]:
                xs.append(x)
                ys.append(y)
            average_x = float(sum(xs))/len(xs)
            average_y = float(sum(ys))/len(ys)
            blob_center = (int(average_x), int(average_y))
        return blob_center

    def get_blob_size(self):
        '''Gets the number of pixels in the largest blob.

        Returns:
            int: The size, in pixels, of the largest blob
        '''
        return self.largest_blob_size


class MyMatrix():
    '''A custom class to get dimensions, values, and neighboring values of the pixel_matrix.

    Args:
        num_cols (int): the number of columns in the matrix, specified in ColorFinder as downsize_width
        num_rows (int): the number of rows in the matrix, specified in ColorFinder as downsize_height
    '''
    def __init__(self, num_cols, num_rows):
        self.num_cols = num_cols
        self.num_rows = num_rows
        self._matrix = [[MatrixValueContainer() for _ in range(self.num_rows)] for _ in range(self.num_cols)]
        self.size = self.num_cols * self.num_rows

    def at(self, i, j):
        '''Gets the desired MatrixValueContainer object.

        Args:
            i (int): the x-coordinate in self
            j (int): the y-coordinate in self

        Returns:
            the MatrixValueContainer at the specified coordinates
        '''
        return self._matrix[i][j]

    def fill_gaps(self):
        '''Fills in squares in self._matrix that meet the condition in the surrounded method.

        Ignores the surrounding value if it is 'white' or 'black' to give preference to red, blue, green, and yellow.
        '''
        for i in range(self.num_cols):
            for j in range(self.num_rows):
                val = self.surrounded(i, j)
                if val != None and val != 'white' and val != 'black':
                    self.at(i, j).set(val)

    def surrounded(self, i, j):
        '''Checks if a point is surrounded by at least 3 points of the same value.

        Args:
            i (int): the x-coordinate in self._matrix
            j (int): the y-coordinate in self._matrix

        Returns:
            the surrounding value if the condition is True, otherwise returns None
            When used in the context of ColorFinder, the surrounding value would be the string
            specifying the name of the color surrounding this square.
        '''
        if i != 0 and i != self.num_cols-1 and j != 0 and j != self.num_rows-1:
            left_value, up_value, right_value, down_value = self.get_neighboring_values(i, j)
            if left_value == up_value and left_value == right_value:
                return left_value
            if left_value == up_value and left_value == down_value:
                return left_value
            if left_value == right_value and left_value == down_value:
                return left_value
            if right_value == up_value and right_value == down_value:
                return right_value
        return None

    def get_neighboring_values(self, i, j):
        '''Returns the values in the four surrounding MatrixValueContainers.
        
        Args:
            i (int): the x-coordinate in self._matrix
            j (int): the y-coordinate in self._matrix

        Returns:
            A four-tuple containing (left_value, up_value, right_value, and down_value)
        '''
        return (self.at(i - 1, j).value, self.at(i, j - 1).value, self.at(i + 1, j).value, self.at(i, j + 1).value)


class MatrixValueContainer():
    '''Simple container for values in a MyMatrix object.

    This class is intended to clean the syntax of setting
    a new value in the MyMatrix object.

    So we replace this:
        matrix.get_value(i, j)
        matrix.set_value(i, j, new_value)
    with this:
        matrix.at(i, j).value
        matrix.at(i, j).set(new_value)
    '''
    def __init__(self):
        self.value = None

    def set(self, new_value):
        self.value = new_value


async def cozmo_program(robot: cozmo.robot.Robot):
    color_finder = ColorFinder(robot)
    await color_finder.run()

cozmo.robot.Robot.drive_off_charger_on_connect = True
cozmo.run_program(cozmo_program, use_viewer = True, force_viewer_on_top = True)
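
The gap-filling rule used by fill_gaps and surrounded can be tried without a robot. The following is a minimal standalone sketch, not the SDK code: it assumes a plain list-of-lists grid of color names indexed as grid[i][j], and the helper names surrounding_color and fill_gaps are hypothetical. A cell takes a neighbor color when at least three of its four neighbors agree, and white and black are ignored.

```python
from collections import Counter


def surrounding_color(grid, i, j):
    """Return the color shared by at least 3 of the 4 neighbors of grid[i][j], else None."""
    cols, rows = len(grid), len(grid[0])
    # Border cells do not have four neighbors, so they are skipped.
    if i in (0, cols - 1) or j in (0, rows - 1):
        return None
    neighbors = [grid[i - 1][j], grid[i][j - 1], grid[i + 1][j], grid[i][j + 1]]
    color, count = Counter(neighbors).most_common(1)[0]
    return color if count >= 3 else None


def fill_gaps(grid):
    """Overwrite, in place, cells whose neighbors agree on a color other than white or black."""
    for i in range(len(grid)):
        for j in range(len(grid[0])):
            color = surrounding_color(grid, i, j)
            if color is not None and color not in ('white', 'black'):
                grid[i][j] = color


# Hypothetical 3x3 grid: the center cell has three yellow neighbors.
grid = [
    ['white', 'yellow', 'white'],
    ['yellow', 'black', 'yellow'],
    ['white', 'white', 'white'],
]
fill_gaps(grid)
print(grid[1][1])  # yellow
```

Checking "at least three of four neighbors agree" with a counter is equivalent to testing the four possible triples one by one, as the listing does.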

4. desk security

Cozmo desk patrol

Cozmo patrols your desk, looks for unfamiliar faces, and reports them to you.

import asyncio
from random import randint
import sys
import time

import cozmo
from cozmo.util import degrees, distance_mm, speed_mmps

#: The name that the owner's face is enrolled as (i.e. your username in the app)
#: When that face is seen, Cozmo will assume no other faces currently seen are intruders
OWNER_FACE_ENROLL_NAME = ""


if OWNER_FACE_ENROLL_NAME == "":
    sys.exit("You must fill in OWNER_FACE_ENROLL_NAME")


class DeskSecurityGuard:
    '''Container for Security Guard status'''

    def __init__(self):
        self.owner_name = OWNER_FACE_ENROLL_NAME

        self.is_armed = True

        self.time_first_observed_intruder = None
        self.time_last_observed_intruder = None

        self.time_first_observed_owner = None
        self.time_last_observed_owner = None

        self.time_last_suspicious = None
        self.time_last_uploaded_photo = None
        self.time_last_announced_intruder = None
        self.time_last_pounced_at_intruder = None
        self.time_last_announced_owner = None

    def is_investigating_intruder(self):
        '''Has an unknown face recently been seen?'''
        return self.time_first_observed_intruder is not None

    def has_confirmed_intruder(self):
        '''The robot has seen an intruder for long enough that it's pretty sure it's not the owner.'''
        if self.time_first_observed_intruder:
            elapsed_time = time.time() - self.time_first_observed_intruder
            return elapsed_time > 2.0
        return False


def did_occur_recently(event_time, max_elapsed_time):
    '''Did event_time occur and was it within the last max_elapsed_time seconds?'''
    if event_time is None:
        return False
    elapsed_time = time.time() - event_time
    return elapsed_time < max_elapsed_time


async def check_for_intruder(robot, dsg:DeskSecurityGuard):
    '''Checks which faces are visible, then reacts to the owner or to a possible intruder.'''

    # Check which faces can be seen, and if any are the owner or an intruder

    owner_face = None
    intruder_face = None
    for visible_face in robot.world.visible_faces:
        if visible_face.name.lower() == dsg.owner_name.lower():
            if owner_face:
                print("Multiple faces with name %s seen - %s and %s!" %
                      (dsg.owner_name, owner_face, visible_face))
            owner_face = visible_face
        else:
            # just use the first intruder seen
            if not intruder_face:
                intruder_face = visible_face

    # Update times first/last seen owner or an intruder

    if owner_face:
        dsg.time_last_observed_owner = owner_face.last_observed_time
        if dsg.time_first_observed_owner is None:
            dsg.time_first_observed_owner = dsg.time_last_observed_owner

    if intruder_face:
        if dsg.time_last_observed_intruder is None or \
                        intruder_face.last_observed_time > dsg.time_last_observed_intruder:
            dsg.time_last_observed_intruder = intruder_face.last_observed_time

        if dsg.time_first_observed_intruder is None:
            dsg.time_first_observed_intruder = dsg.time_last_observed_intruder

    # Check if there's anything to investigate

    can_see_owner = did_occur_recently(dsg.time_last_observed_owner, 1.0)
    can_see_intruders = did_occur_recently(dsg.time_last_observed_intruder, 1.0)
    if not dsg.is_armed:
        can_see_intruders = False
    if not can_see_intruders:
        dsg.time_first_observed_intruder = None

    if can_see_owner:

        # If robot can see the owner then look at and greet them occasionally

        robot.set_all_backpack_lights(cozmo.lights.green_light)
        if not did_occur_recently(dsg.time_last_announced_owner, 60.0):
            await robot.play_anim_trigger(cozmo.anim.Triggers.NamedFaceInitialGreeting).wait_for_completed()
            dsg.time_last_announced_owner = time.time()
        elif owner_face:
            await robot.turn_towards_face(owner_face).wait_for_completed()
    elif can_see_intruders:

        # Don't react unless this is a confirmed intruder

        is_confirmed_intruder = dsg.has_confirmed_intruder()
        if is_confirmed_intruder:
            # Definitely an intruder - turn backpack red to indicate
            robot.set_all_backpack_lights(cozmo.lights.red_light)

            # Sound an alarm (every X seconds)
            if not did_occur_recently(dsg.time_last_announced_intruder, 10):
                await robot.say_text("Intruder Alert").wait_for_completed()
                dsg.time_last_announced_intruder = time.time()

            # Pounce at intruder (every X seconds)
            if not did_occur_recently(dsg.time_last_pounced_at_intruder, 10.0):
                await robot.play_anim_trigger(cozmo.anim.Triggers.CubePouncePounceNormal).wait_for_completed()
                dsg.time_last_pounced_at_intruder = time.time()

            # Turn towards the intruder to keep them in view
            if intruder_face:
                await robot.turn_towards_face(intruder_face).wait_for_completed()
        else:
            # Possibly an intruder - turn backpack blue to indicate, and play
            # suspicious animation (if not played recently)

            robot.set_all_backpack_lights(cozmo.lights.blue_light)
            if not did_occur_recently(dsg.time_last_suspicious, 10.0):
                await robot.play_anim_trigger(cozmo.anim.Triggers.HikingInterestingEdgeThought).wait_for_completed()
                dsg.time_last_suspicious = time.time()
            elif intruder_face:
                # turn robot towards intruder face slightly to get a better look at them
                await robot.turn_towards_face(intruder_face).wait_for_completed()
    else:
        robot.set_backpack_lights_off()


async def desk_security_guard(robot):
    '''The core of the desk_security_guard program'''

    # Turn on image receiving by the camera
    robot.camera.image_stream_enabled = True

    # Create our security guard
    dsg = DeskSecurityGuard()

    # Make sure Cozmo is clear of the charger
    if robot.is_on_charger:
        # Drive fully clear of charger (not just off the contacts)
        await robot.drive_off_charger_contacts().wait_for_completed()
        await robot.drive_straight(distance_mm(150), speed_mmps(50)).wait_for_completed()

    # Tilt head up to look for people
    await robot.set_head_angle(cozmo.robot.MAX_HEAD_ANGLE).wait_for_completed()

    initial_pose_angle = robot.pose_angle

    patrol_offset = 0  # middle
    max_pose_angle = 45  # offset from initial pose_angle (up to +45 or -45 from this)

    # Time to wait between each turn and patrol, in seconds
    time_between_turns = 2.5
    time_between_patrols = 20

    time_for_next_turn = time.time() + time_between_turns
    time_for_next_patrol = time.time() + time_between_patrols

    while True:

        # Turn head every few seconds to cover a wider field of view
        # Only do this if not currently investigating an intruder

        if (time.time() > time_for_next_turn) and not dsg.is_investigating_intruder():
            # pick a random amount to turn
            angle_to_turn = randint(10,40)

            # 50% chance of turning in either direction
            if randint(0,1) > 0:
                angle_to_turn = -angle_to_turn

            # Clamp the amount to turn

            face_angle = (robot.pose_angle - initial_pose_angle).degrees

            face_angle += angle_to_turn
            if face_angle > max_pose_angle:
                angle_to_turn -= (face_angle - max_pose_angle)
            elif face_angle < -max_pose_angle:
                angle_to_turn -= (face_angle + max_pose_angle)

            # Turn left/right
            await robot.turn_in_place(degrees(angle_to_turn)).wait_for_completed()

            # Tilt head up/down slightly
            await robot.set_head_angle(degrees(randint(30,44))).wait_for_completed()

            # Queue up the next time to look around
            time_for_next_turn = time.time() + time_between_turns

        # Every now and again patrol left and right between 3 patrol points

        if (time.time() > time_for_next_patrol) and not dsg.is_investigating_intruder():

            # Check which way robot is facing vs initial pose, pick a new patrol point

            face_angle = (robot.pose_angle - initial_pose_angle).degrees
            drive_right = (patrol_offset < 0) or ((patrol_offset == 0) and (face_angle > 0))

            # Turn to face the new patrol point

            if drive_right:
                await robot.turn_in_place(degrees(90 - face_angle)).wait_for_completed()
                patrol_offset += 1
            else:
                await robot.turn_in_place(degrees(-90 - face_angle)).wait_for_completed()
                patrol_offset -= 1

            # Drive to the patrol point, playing animations along the way

            await robot.drive_wheels(20, 20)
            for i in range(1,4):
                await robot.play_anim("anim_hiking_driving_loop_0" + str(i)).wait_for_completed()

            # Stop driving

            robot.stop_all_motors()

            # Turn to face forwards again

            face_angle = (robot.pose_angle - initial_pose_angle).degrees
            if face_angle > 0:
                await robot.turn_in_place(degrees(-90)).wait_for_completed()
            else:
                await robot.turn_in_place(degrees(90)).wait_for_completed()

            # Queue up the next time to patrol
            time_for_next_patrol = time.time() + time_between_patrols

        # look for intruders

        await check_for_intruder(robot, dsg)

        # Sleep to allow other things to run

        await asyncio.sleep(0.05)


async def run(sdk_conn):
    '''The run method runs once the Cozmo SDK is connected.'''
    robot = await sdk_conn.wait_for_robot()

    try:
        await desk_security_guard(robot)

    except KeyboardInterrupt:
        print("")
        print("Exit requested by user")


if __name__ == '__main__':
    cozmo.setup_basic_logging()
    cozmo.robot.Robot.drive_off_charger_on_connect = False  # Stay on charger until init
    try:
        cozmo.connect_with_tkviewer(run, force_on_top=True)
    except cozmo.ConnectionError as e:
        sys.exit("A connection error occurred: %s" % e)
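
The clamping step in the look-around loop is easy to misread, so here is a standalone sketch of the same arithmetic. The function name clamp_turn is hypothetical, and angles are plain numbers in degrees rather than cozmo.util.Angle objects.

```python
def clamp_turn(current_offset, angle_to_turn, max_offset=45):
    """Shrink angle_to_turn so current_offset + angle_to_turn stays within +/- max_offset.

    All values are in degrees, measured relative to the initial heading.
    """
    target = current_offset + angle_to_turn
    if target > max_offset:
        # Remove the part of the turn that overshoots the positive limit.
        angle_to_turn -= target - max_offset
    elif target < -max_offset:
        # Remove the part of the turn that overshoots the negative limit.
        angle_to_turn -= target + max_offset
    return angle_to_turn


print(clamp_turn(30, 40))    # 15: turning 40 would reach 70, so only 15 is allowed
print(clamp_turn(-30, -40))  # -15
print(clamp_turn(0, 20))     # 20: already within range, unchanged
```

Keeping the heading within 45 degrees of the starting pose means Cozmo keeps watching roughly the same part of the desk while still varying his view.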

5. quick tap

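Cozmo Quick Tap: tap your cube as fast as you can when the colors match (unless they are red), and never tap when both cubes are red!

The game ends when a player reaches 5 points.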
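
The scoring rule can be sketched without a robot. This is a simplified illustration under stated assumptions, not the SDK code: the function round_winner and the string constants are hypothetical, and a tap time of sys.maxsize stands for "never tapped", mirroring how the listing resets elapsed_tap_time. Whoever taps first wins the round only if the colors match; otherwise the point goes to the other side.

```python
import sys

SAME, DIFFERENT, RED = 'same', 'different', 'red'


def round_winner(display, player_time=sys.maxsize, cozmo_time=sys.maxsize):
    """Return 'player', 'cozmo', or None for one round.

    A time of sys.maxsize means that side never tapped.
    """
    if player_time == sys.maxsize and cozmo_time == sys.maxsize:
        return None  # nobody tapped, nobody scores
    # Ties go to the player, as in determine_first_tapper.
    first, other = ('cozmo', 'player') if cozmo_time < player_time else ('player', 'cozmo')
    # Tapping first is only correct when the colors match.
    return first if display == SAME else other


print(round_winner(SAME, player_time=0.4, cozmo_time=0.6))  # player
print(round_winner(RED, player_time=0.4))                   # cozmo
print(round_winner(DIFFERENT))                              # None
```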

import asyncio, random, sys, time

import cozmo

from cozmo.lights import blue_light, Color, green_light, Light, red_light, white_light, off_light
from cozmo.util import degrees, distance_mm, radians, speed_mmps

purple_light = Light(Color(name = 'purple', rgb = (255, 0, 255)))
yellow_light = Light(Color(name = 'yellow', rgb = (255, 255, 0)))

LIGHT_COLORS_LIST = [blue_light, green_light, purple_light, red_light, white_light, yellow_light]

CHOOSE_CUBES_STATE = 'choose_cubes' # If the game is in CHOOSE_CUBES_STATE, on_cube_tap assigns the player's cube.
GAME_STATE = 'game' # If the game is in GAME_STATE, on_cube_tap registers the tap time of the players.

MAKE_BUZZERS_DIFFERENT_COLORS = 'MAKE_BUZZERS_DIFFERENT_COLORS'
MAKE_BUZZERS_RED = 'MAKE_BUZZERS_RED'
MAKE_BUZZERS_SAME_COLORS = 'MAKE_BUZZERS_SAME_COLORS'

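# determine_buzzer_display first draws for red, then (only if not red) for different vs. same colors,
# so RATE_MAKE_BUZZERS_DIFFERENT_COLORS is conditional on the buzzers not being red.
RATE_MAKE_BUZZERS_DIFFERENT_COLORS = 0.17 # When not red, the buzzers have a 17% chance of displaying different colors (about 11% overall).
RATE_MAKE_BUZZERS_RED = 0.33 # The buzzers have a 33% chance of displaying red.
# That leaves roughly a 56% overall chance of displaying the same colors.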

RATE_COZMO_ACCURACY = 0.9 # Cozmo has a 90% chance of reacting correctly to the buzzers.
# This number can therefore be lowered to have Cozmo more frequently make the wrong move.

SCORE_TO_WIN = 5 # the game ends once either player's score has reached SCORE_TO_WIN

class QuickTapGame:
    '''The game logic of Quick Tap.'''
    def __init__(self, robot: cozmo.robot.Robot):
        self.robot = robot
        self.player = QuickTapPlayer()
        self.cozmo_player = CozmoQuickTapPlayer(robot)
        robot.add_event_handler(cozmo.anim.EvtAnimationCompleted, self.on_anim_completed)
        robot.add_event_handler(cozmo.objects.EvtObjectTapped, self.on_cube_tap)

        self.cubes = None
        self.countdown_cube = None

        self.buzzer_display_type = None

        self.round_start_time = time.time()
        self.quick_tap_player_1 = None
        self.quick_tap_player_2 = None
        self.round_over = False

        self.quick_tap_state = CHOOSE_CUBES_STATE

    async def move_cozmo_to_ready_pose(self):
        self.robot.set_lift_height(0, in_parallel = True)
        self.robot.set_head_angle(degrees(0), in_parallel = True)
        await self.robot.wait_for_all_actions_completed()

    async def run(self):
        '''Assigns the cubes, then starts a new round until a player has won.'''
        await self.move_cozmo_to_ready_pose()
        self.print_starting_instructions()
        if not self.cubes_connected():
            print('Cubes did not connect successfully - check that they are nearby. You may need to replace the batteries.')
            return
        await self.assign_cubes()
        self.quick_tap_state = GAME_STATE
        while max(self.player.score, self.cozmo_player.score) < SCORE_TO_WIN:
            await self.game_round()
        await self.report_winner()

    async def game_round(self):
        '''Sets up and runs a round of the game. 

        In run(), a new round starts unless a player's score reaches SCORE_TO_WIN.

        First we ready the players and cubes, and then start the countdown.
        After the countdown, the cubes light up.  Then Cozmo makes his move.
        Once Cozmo's move is over, we determine the winner of the round,
        and Cozmo reacts accordingly.
        '''
        self.round_over = False
        await self.reset_players()
        await self.countdown_cube.countdown()
        await self.set_round_lights()
        self.round_start_time = time.time()
        await self.cozmo_player.determine_move(self.buzzer_display_type)
        while not self.round_over: # self.round_over is True when Cozmo's tap animation is completed
            await asyncio.sleep(0)
        await self.cozmo_anim_reaction()

    async def set_round_lights(self):
        '''Waits a random delay, then sets a display on the buzzer cubes.'''
        await self.cube_light_delay()
        self.determine_buzzer_display()
        self.set_buzzer_lights()

    async def reset_players(self):
        '''Gets the players and cubes ready for a new round.'''
        self.player.reset()
        self.cozmo_player.reset()
        await self.robot.set_lift_height(1.0).wait_for_completed()
        self.turn_off_buzzer_cubes()
        
    async def cube_light_delay(self):
        '''Waits between 0 and 2 seconds.'''
        delay = random.random() * 2
        await asyncio.sleep(delay)

    def determine_buzzer_display(self):
        '''Chooses a buzzer display type based on the probabilities defined above.'''
        probability_red = random.random()
        if probability_red < RATE_MAKE_BUZZERS_RED:
            self.buzzer_display_type = MAKE_BUZZERS_RED
        else:
            probability_different_colors = random.random()
            if probability_different_colors < RATE_MAKE_BUZZERS_DIFFERENT_COLORS:
                self.buzzer_display_type = MAKE_BUZZERS_DIFFERENT_COLORS
            else:
                self.buzzer_display_type = MAKE_BUZZERS_SAME_COLORS

    def on_cube_tap(self, evt, obj, **kwargs):
        '''Responds to cube taps depending on quick_tap_state.

        If in CHOOSE_CUBES_STATE, on_cube_tap assigns the player's cube.
        If in GAME_STATE, on_cube_tap registers the tap time of the players.
        '''
        if obj.object_id is not None:
            if self.quick_tap_state == CHOOSE_CUBES_STATE:
                if self.cozmo_player.cube is None:
                    # Cozmo hasn't picked a cube yet - ignore
                    pass
                elif obj.object_id != self.cozmo_player.cube.object_id:
                    self.player.cube = obj
                    self.player.cube.set_lights_off()
            elif self.quick_tap_state == GAME_STATE:
                self.turn_off_buzzer_cubes()
                if obj.object_id == self.player.cube.object_id:
                    self.player.register_tap(self.round_start_time)
                elif obj.object_id == self.cozmo_player.cube.object_id:
                    self.cozmo_player.register_tap(self.round_start_time)

    async def on_anim_completed(self, evt, animation_name, **kwargs):
        '''Signals the end of the round if the animation completed was Cozmo's tap animation.'''
        if self.quick_tap_state == GAME_STATE and animation_name in ['OnSpeedtapTap', 'OnSpeedtapFakeout', 'OnSpeedtapIdle']:
            await self.determine_result_of_round()
            self.round_over = True

    async def determine_result_of_round(self):
        '''Determines the first tapper, then whether that tapper wins or loses based on the buzzer display.'''
        self.determine_first_tapper()
        if self.quick_tap_player_1:
            if self.buzzer_display_type == MAKE_BUZZERS_SAME_COLORS:
                self.quick_tap_player_1.wins_round()
                await self.quick_tap_player_1.cube.flair_correct_tap()
            elif self.buzzer_display_type == MAKE_BUZZERS_DIFFERENT_COLORS or self.buzzer_display_type == MAKE_BUZZERS_RED:
                self.quick_tap_player_2.wins_round()
                await self.quick_tap_player_1.cube.flair_incorrect_tap()
            self.report_scores()

    def determine_first_tapper(self):
        '''Finds the first tapper from the players' registered tap times.'''
        if self.player.has_tapped or self.cozmo_player.has_tapped:
            if self.cozmo_player.elapsed_tap_time < self.player.elapsed_tap_time:
                self.quick_tap_player_1 = self.cozmo_player
                self.quick_tap_player_2 = self.player
            else:
                self.quick_tap_player_1 = self.player
                self.quick_tap_player_2 = self.cozmo_player
        else:
            self.quick_tap_player_1 = None

    async def cozmo_anim_reaction(self):
        '''Cozmo plays an animation based on whether he won or lost the round.'''        
        if self.cozmo_player.won_round:
            await self.robot.play_anim_trigger(cozmo.anim.Triggers.OnSpeedtapHandCozmoWin).wait_for_completed()
        else:
            await self.robot.play_anim_trigger(cozmo.anim.Triggers.OnSpeedtapHandPlayerWin).wait_for_completed()

    async def assign_cubes(self):
        '''Cozmo chooses his cube, then the player chooses, 
        and the remaining cube becomes the countdown cube.
        '''
        await self.cozmo_player.select_cube()
        self.blink_available_cubes()
        await self.robot.world.wait_for(cozmo.objects.EvtObjectTapped)
        self.player.cube.stop_light_chaser()
        self.assign_countdown_cube()

    def blink_available_cubes(self):
        '''Blinks the cubes which Cozmo did not select for himself.'''
        for cube in self.cubes:
            if cube.object_id != self.cozmo_player.cube.object_id:
                cube.start_light_chaser(0.5)

    def assign_countdown_cube(self):
        '''Assigns the countdown cube to be whichever cube has not been selected by the player or Cozmo.'''
        for cube in self.cubes:
            if cube.object_id != self.cozmo_player.cube.object_id and cube.object_id != self.player.cube.object_id:
                self.countdown_cube = cube
                self.countdown_cube.stop_light_chaser()

    def set_buzzer_lights(self):
        '''Sets the buzzer cube lights based on the buzzer display type.'''
        if self.buzzer_display_type == MAKE_BUZZERS_RED:
            self.turn_on_buzzer_cubes_red()
        elif self.buzzer_display_type == MAKE_BUZZERS_DIFFERENT_COLORS:
            self.turn_on_buzzer_cubes_different()
        elif self.buzzer_display_type == MAKE_BUZZERS_SAME_COLORS:
            self.turn_on_buzzer_cubes_same()

    def turn_on_buzzer_cubes_same(self):
        '''Sets the buzzer cubes to the same randomly generated color pair.'''
        same_colors = self.generate_random_buzzer_colors()
        self.player.cube.set_light_corners(*same_colors)
        self.cozmo_player.cube.set_light_corners(*same_colors)

    def turn_on_buzzer_cubes_different(self):
        '''Sets the buzzer cubes to different randomly generated color pairs.'''
        player_cube_colors = self.generate_random_buzzer_colors()
        cozmo_cube_colors = self.generate_random_buzzer_colors()
        while player_cube_colors == cozmo_cube_colors:
            cozmo_cube_colors = self.generate_random_buzzer_colors()
        self.player.cube.set_light_corners(*player_cube_colors)
        self.cozmo_player.cube.set_light_corners(*cozmo_cube_colors)

    def turn_on_buzzer_cubes_red(self):
        '''Sets the buzzer cubes to red.'''
        self.player.cube.set_lights(cozmo.lights.red_light)
        self.cozmo_player.cube.set_lights(cozmo.lights.red_light)

    def generate_random_buzzer_colors(self):
        '''Creates a list of different alternating colors, chosen randomly from LIGHT_COLORS_LIST.

        Returns:
            a list of Lights from LIGHT_COLORS_LIST
        '''
        num_colors = len(LIGHT_COLORS_LIST)
        x = random.randrange(num_colors)
        y = random.randrange(num_colors)
        while y == x:
            y = random.randrange(num_colors)
        return [LIGHT_COLORS_LIST[x], LIGHT_COLORS_LIST[y], LIGHT_COLORS_LIST[x], LIGHT_COLORS_LIST[y]]

    def turn_off_buzzer_cubes(self):
        '''Turns off both buzzer cubes' lights.'''
        self.player.cube.set_lights_off()
        self.cozmo_player.cube.set_lights_off()

    def cubes_connected(self):
        '''Checks if Cozmo connects to all three cubes successfully.

        Returns:
            bool specifying if all three cubes have been successfully connected'''
        cube1 = self.robot.world.get_light_cube(cozmo.objects.LightCube1Id)
        cube2 = self.robot.world.get_light_cube(cozmo.objects.LightCube2Id)
        cube3 = self.robot.world.get_light_cube(cozmo.objects.LightCube3Id)
        self.cubes = [cube1, cube2, cube3]
        return not (cube1 is None or cube2 is None or cube3 is None)

    def print_starting_instructions(self):
        print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')
        print('Welcome to Quick Tap!')
        print('Put 1 cube in front of Cozmo. It will turn white when he can see it.')
        print('Cozmo will tap the cube to select it as his buzzer.')
        print('After Cozmo, tap a cube to select your buzzer.')
        print('The last cube will display a countdown with its lights to start each round.')
        print('When the buzzers light up, tap if the colors match, but never tap on red!')
        print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')

    def report_scores(self):
        '''Prints the current scores of the game.'''
        print('---------------------------------------------------')
        print('Player score: {}'.format(self.player.score))
        print('Cozmo score: {}'.format(self.cozmo_player.score))
        print('---------------------------------------------------')

    async def report_winner(self):
        '''Prints the final scores of the game, and the winner.'''
        print('You won {} round{}'.format(self.player.score, 's' if self.player.score != 1 else ''))
        print('Cozmo won {} round{}'.format(self.cozmo_player.score, 's' if self.cozmo_player.score != 1 else ''))
        if self.cozmo_player.score > self.player.score:
            print('~COZMO WINS QUICK TAP~')
            await self.robot.play_anim_trigger(cozmo.anim.Triggers.OnSpeedtapGameCozmoWinHighIntensity).wait_for_completed()
        else:
            print('~PLAYER WINS QUICK TAP~')
            await self.robot.play_anim_trigger(cozmo.anim.Triggers.OnSpeedtapGamePlayerWinHighIntensity).wait_for_completed()


class QuickTapPlayer():
    '''Player-specific Quick Tap logic.'''
    def __init__(self):
        self.cube = None
        self.score = 0
        self.has_tapped = False
        self.name = 'Player'
        self.elapsed_tap_time = None
        self.won_round = False

    def wins_round(self):
        '''Prints winning message, updates score, and sets won_round flag to True.'''
        print('****{} wins the round****'.format(self.name))
        self.score += 1
        self.won_round = True

    def reset(self):
        '''Resets elapsed_tap_time, and sets has_tapped and won_round flags to False.'''
        self.elapsed_tap_time = sys.maxsize
        self.has_tapped = False
        self.won_round = False

    def register_tap(self, round_start_time):
        '''Calculates elapsed time of tap, and sets has_tapped flag to True.

        Args:
            round_start_time (float): time stamp set in QuickTapGame to calculate players' elapsed_tap_time
        '''
        self.elapsed_tap_time = time.time() - round_start_time
        self.has_tapped = True


class CozmoQuickTapPlayer(QuickTapPlayer):
    '''Cozmo-specific Quick Tap player logic, with a reference to the actual Cozmo robot.
        
    Args:
        robot (cozmo.robot.Robot): passed in from the QuickTapGame class
    '''
    def __init__(self, robot: cozmo.robot.Robot):
        super().__init__()
        self.robot = robot
        self.name = 'Cozmo'

    async def select_cube(self):
        '''Cozmo looks for a cube, drives to it, and taps it.'''
        self.cube = await self.robot.world.wait_for_observed_light_cube()
        self.cube.set_lights(cozmo.lights.white_light)
        await asyncio.sleep(2)
        self.cube.start_light_chaser(0.5)
        await self.robot.set_lift_height(1.0).wait_for_completed()
        await self.robot.go_to_object(self.cube, distance_mm(40)).wait_for_completed()
        await self.robot.play_anim_trigger(cozmo.anim.Triggers.OnSpeedtapTap).wait_for_completed()
        self.cube.stop_light_chaser()
        self.cube.set_lights(green_light)

    async def determine_move(self, buzzer_display_type):
        '''Cozmo chooses a move based on the probabilities above.

        Args:
            buzzer_display_type (string): the display of the buzzers
            Either MAKE_BUZZERS_DIFFERENT_COLORS, MAKE_BUZZERS_RED, or MAKE_BUZZERS_SAME_COLORS
        '''
        await self.hesitate()
        probability_correct = random.random()
        if probability_correct < RATE_COZMO_ACCURACY:
            if buzzer_display_type == MAKE_BUZZERS_SAME_COLORS:
                await self.tap()
            else:
                await self.fail_to_tap()
        else:
            if buzzer_display_type == MAKE_BUZZERS_RED or buzzer_display_type == MAKE_BUZZERS_DIFFERENT_COLORS:
                await self.tap()
            else:
                await self.fail_to_tap()

    async def hesitate(self):
        '''Cozmo waits between 0 and 0.5 seconds'''
        delay = random.random() * .5
        await asyncio.sleep(delay)

    async def tap(self):
        '''Calls Cozmo's tap animation.'''
        await self.robot.play_anim_trigger(cozmo.anim.Triggers.OnSpeedtapTap).wait_for_completed()

    async def fail_to_tap(self):
        '''Randomly calls either Cozmo's fakeout tap animation or his idle animation.'''
        probability_fakeout = random.random()
        if probability_fakeout < 0.5:
            await self.robot.play_anim_trigger(cozmo.anim.Triggers.OnSpeedtapFakeout).wait_for_completed()
        else:
            await self.robot.play_anim_trigger(cozmo.anim.Triggers.OnSpeedtapIdle).wait_for_completed()


rainbow_colors = [blue_light, red_light, green_light, yellow_light]

class BlinkyCube(cozmo.objects.LightCube):
    '''Same as a normal cube, plus extra methods specific to Quick Tap.'''
    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        self._chaser = None

    def start_light_chaser(self, pause_time):
        '''Rotates four colors around the cube light corners in a continuous loop.

        Args:
            pause_time (float): the time awaited before moving the rotating lights
        '''
        if self._chaser:
            raise ValueError('Light chaser already running')
        async def _chaser():
            while True:
                for i in range(4):
                    self.set_light_corners(*rainbow_colors)
                    await asyncio.sleep(pause_time)
                    light = rainbow_colors.pop(0)
                    rainbow_colors.append(light)
        self._chaser = asyncio.ensure_future(_chaser(), loop = self._loop)

    def stop_light_chaser(self):
        '''Ends the _chaser loop.'''
        if self._chaser:
            self._chaser.cancel()
            self._chaser = None
        self.set_lights_off()

    async def countdown(self):
        '''Sets all lights to white, then 3 lights, then 2 lights, then 1 light, then none.'''
        for i in range(5):
            cols = [white_light] * (4 - i) + [off_light] * i
            self.set_light_corners(*cols)
            await asyncio.sleep(.5)

    async def flair_correct_tap(self):
        '''Runs a fast _chaser when the player taps correctly.'''
        self.start_light_chaser(0.1)
        await asyncio.sleep(2)
        self.stop_light_chaser()

    async def flair_incorrect_tap(self):
        '''Blinks red when the player taps incorrectly.'''
        for _ in range(4):
            self.set_lights(red_light)
            await asyncio.sleep(.2)
            self.set_lights(off_light)
            await asyncio.sleep(.2)


# Make sure World knows how to instantiate the BlinkyCube subclass
cozmo.world.World.light_cube_factory = BlinkyCube

async def cozmo_program(robot: cozmo.robot.Robot):
    game = QuickTapGame(robot)
    await game.run()

cozmo.run_program(cozmo_program)
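
Because determine_buzzer_display draws for red first and only then for different versus same colors, the second rate applies only to non-red rounds. The sketch below works out the resulting overall probabilities and checks them with a seeded simulation. The names effective_rates and draw_display, the short constant names, and the sample size are hypothetical; the two rates are copied from the listing.

```python
import random

RATE_RED = 0.33        # mirrors RATE_MAKE_BUZZERS_RED
RATE_DIFFERENT = 0.17  # mirrors RATE_MAKE_BUZZERS_DIFFERENT_COLORS


def effective_rates(rate_red=RATE_RED, rate_different=RATE_DIFFERENT):
    """Overall probability of each display type for the two-stage draw."""
    not_red = 1.0 - rate_red
    return {
        'red': rate_red,
        'different': not_red * rate_different,
        'same': not_red * (1.0 - rate_different),
    }


def draw_display(rng):
    """Two-stage draw with the same structure as determine_buzzer_display."""
    if rng.random() < RATE_RED:
        return 'red'
    if rng.random() < RATE_DIFFERENT:
        return 'different'
    return 'same'


rates = effective_rates()
print({name: round(p, 3) for name, p in rates.items()})

# Empirical check with a seeded generator.
rng = random.Random(0)
samples = [draw_display(rng) for _ in range(100000)]
print({name: round(samples.count(name) / len(samples), 3) for name in rates})
```

With the listed rates, matching colors come up in a little over half of all rounds and mismatched colors in roughly one round in nine.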

6. quizmaster cozmo

Quizmaster Cozmo: a simple quiz game hosted by Cozmo.

It uses the 3 cubes as buzzers for a quiz with up to 3 players.

import asyncio
import json
from random import randrange, shuffle

import cozmo


class QuizQuestion:
    """A single multiple choice question with 4 choices, one correct.
    
    Args:
        question (str): The question.
        answer_options (list of str): 4 multiple choice answers where the
            1st element is the correct answer. (Choices will be shuffled each time.)
    Raises:
        :class:`ValueError` if not supplied exactly 4 answer_options.
    """
    def __init__(self, question, answer_options):
        if len(answer_options) != 4:
            raise ValueError("Expected 4 answer_options, got %s" % len(answer_options))
        self.question = question
        self._answer_index = 0
        self.answer_options = list(answer_options)  # copy the answer_options, so we can shuffle them
        self.shuffle_answer_options()

    @property
    def answer_number(self):
        """int: The number (i.e. 1, 2, 3 or 4) representing the correct answer."""
        return self._answer_index + 1

    @property
    def answer_str(self):
        """str: The string representing the correct answer."""
        return self.answer_options[self._answer_index]

    def shuffle_answer_options(self):
        """Shuffle the answer_options so that they're not always read in the same order."""

        # to shuffle whilst keeping track of the answer, we first pop the
        # answer out, shuffle the rest, and then insert the answer at a random
        # known point.
        answer = self.answer_options.pop(self._answer_index)
        shuffle(self.answer_options)
        self._answer_index = randrange(len(self.answer_options)+1)
        self.answer_options.insert(self._answer_index, answer)


class CozmoQuizPlayer:
    """A player in the quiz.

    Args:
        robot (:class:`cozmo.robot.Robot`): The cozmo robot.
        cube (:class:`cozmo.objects.LightCube`): This player's cube.
        index(int): The number (i.e. 0, 1 or 2) specifying the index of this player and cube.
        color(:class:`cozmo.lights.Light`): The light color for this player.
        name(str): The name of this player.
    """
    def __init__(self,
                 robot: cozmo.robot.Robot,
                 cube: cozmo.objects.LightCube, index, color, name):
        self._robot = robot
        self._cube = cube
        self._index = index
        self._color = color
        self.name = name
        self.score = 0
        self._has_buzzed_in = False
        self._answer_index = 0

    def verify_setup(self):
        # Return True if and only if the player was setup correctly and has a connected cube.
        success = True
        if self._cube is None:
            cozmo.logger.warning("Cozmo is not connected to a cube %s - check the battery.", (self._index+1))
            success = False
        return success

    def reset_for_question(self):
        self.turn_light_on()
        self._has_buzzed_in = False

    def turn_light_off(self):
        if self._cube is not None:
            self._cube.set_lights_off()

    def turn_light_on(self):
        if self._cube is not None:
            self._cube.set_lights(self._color)

    def set_answer_light(self):
        if self._cube is not None:
            # lights up from 1 to 4 lights in a clockwise order to indicate the
            # current selected answer.
            cols = [cozmo.lights.off_light] * 4
            for i in range(self.answer_number):
                # We index cols in reverse order so they light up in a clockwise order.
                cols[3-i] = self._color
            self._cube.set_light_corners(*cols)

    def on_buzzed_in(self):
        # Called when the player buzzes in for a question.
        self.turn_light_on()
        self._has_buzzed_in = True

    def start_answering(self):
        # Called when the player starts answering a question.
        self._answer_index = 0
        self.set_answer_light()

    def cycle_answer(self):
        # Called every time a player taps the cube to cycle through the 4 answer_options.
        self._answer_index += 1
        if self._answer_index > 3:
            self._answer_index = 0
        self.set_answer_light()

    @property
    def object_id(self):
        if self._cube is None:
            return None
        else:
            return self._cube.object_id

    @property
    def has_buzzed_in(self):
        """bool: True if this player has buzzed in for this question already."""
        return self._has_buzzed_in

    @property
    def answer_number(self):
        """int: The number (1..4) representing this player's answer."""
        return self._answer_index + 1


class CozmoQuizMaster:
    """Cozmo the robot quiz master.
    
    Maintains the list of questions and the players, and runs the quiz.

    Args:
        robot (:class:`cozmo.robot.Robot`): The cozmo robot.
    """
    def __init__(self, robot: cozmo.robot.Robot):
        self._robot = robot

        # initialize the list of players
        cube_ids = cozmo.objects.LightCubeIDs
        cube_colors = [cozmo.lights.red_light, cozmo.lights.green_light, cozmo.lights.blue_light]
        player_names = ["Red", "Green", "Blue"]
        self._players = []  # type: list of CozmoQuizPlayer
        for i in range(len(cube_ids)):
            cube = robot.world.get_light_cube(cube_ids[i])
            player = CozmoQuizPlayer(robot, cube, i, cube_colors[i], player_names[i])
            self._players.append(player)

        self._answering_player = None  # type: CozmoQuizPlayer
        self._buzzing_in_accepted = False
        self._answers_accepted = False
        self._questions = []

        with open("quiz_questions.json") as data_file:
            data = json.load(data_file)

        for quiz_question_json in data:
            question = quiz_question_json["question"]
            answer_options = quiz_question_json["answer_options"]
            self._questions.append(QuizQuestion(question, answer_options))

    def verify_setup(self):
        # return True if and only if everything is setup correctly
        num_valid_players = 0
        for player in self._players:
            if player.verify_setup():
                num_valid_players += 1
        return (num_valid_players > 0)

    def get_player_for_object_id(self, object_id):
        for player in self._players:
            if player.object_id == object_id:
                return player
        cozmo.logger.warning("No player for object_id %s", object_id)
        return None

    def on_cube_tapped(self, evt, **kw):
        # find the player for that cube, and handle the tap as appropriate
        player = self.get_player_for_object_id(evt.obj.object_id)
        if self._buzzing_in_accepted and self._answering_player is None:
            if player and not player.has_buzzed_in:
                self._answering_player = player
                player.on_buzzed_in()
        elif self._answers_accepted and player == self._answering_player:
            if player:
                player.cycle_answer()

    def turn_player_lights_on(self):
        for player in self._players:
            player.turn_light_on()

    def turn_player_lights_off(self):
        for player in self._players:
            player.turn_light_off()

    def get_next_question(self) -> QuizQuestion:
        if len(self._questions) > 0:
            i = randrange(len(self._questions))
            question = self._questions.pop(i)
            return question
        else:
            print("Out of questions!")
            return None

    def create_answer_options_string(self, list_of_answer_options) -> str:
        # Build a string that lists all of the answer_options in order.
        text = "Is it "
        for i in range(len(list_of_answer_options)):
            conjunction = ""
            if i > 0:
                is_last_option = (i == (len(list_of_answer_options) - 1))
                conjunction = " or " if is_last_option else ", "
            text += conjunction + str(i+1) + ": " + list_of_answer_options[i]
        return text

    def say_text(self, text, in_parallel=False):
        print("%s" % text)
        return self._robot.say_text(text, in_parallel=in_parallel)

    async def wait_for_answer(self, player):
        # Wait for player's answer (whatever the player leaves selected after x seconds)
        # This is after Cozmo has finished speaking, so we've already given the
        # player a few seconds.
        await asyncio.sleep(2.0)
        return player.answer_number

    def get_winning_players(self):
        # get a list of all the players with the top score
        winning_players = []
        for player in self._players:
            if len(winning_players) == 0 or player.score > winning_players[0].score:
                winning_players = [player]
            elif player.score == winning_players[0].score:
                winning_players.append(player)
        return winning_players

    async def report_leader(self, is_final_score):
        # Report the leading / winning player(s)
        winning_players = self.get_winning_players()
        winning_score = winning_players[0].score
        points_string = "points" if (winning_score != 1) else "point"
        winning_score_str = "%s %s" % (winning_score, points_string)

        if len(winning_players) == len(self._players):
            if is_final_score:
                action = self.say_text("It ends as a draw with everyone at %s" % winning_score_str)
            else:
                action = self.say_text("It's all tied at %s" % winning_score_str)
        else:
            winner_names = winning_players[0].name
            for i in range(1, len(winning_players)):
                # separate winner names with commas, but use 'and' for the last one
                is_last_player = (i == (len(winning_players)-1))
                conjunction = " and " if is_last_player else ", "
                winner_names = winner_names + conjunction + winning_players[i].name

            if is_final_score:
                action = self.say_text("%s won with %s" % (winner_names, winning_score_str))
            else:
                is_or_are = "is" if len(winning_players) == 1 else "are"
                action = self.say_text("%s %s in the lead with %s" %
                                       (winner_names, is_or_are, winning_score_str))

        await action.wait_for_completed()

    async def get_correct_player(self, question: QuizQuestion):
        # Read the answer_options
        read_options_action = self.say_text(self.create_answer_options_string(question.answer_options))
        num_answers = 0

        # Let the player(s) buzz in and answer
        for _ in range(len(self._players)):
            if num_answers > 0:
                read_options_action = self.say_text("Anyone else?")

            # wait for a player to buzz in before the answer finishes
            while not read_options_action.is_completed and self._answering_player is None:
                await asyncio.sleep(0.1)

            if self._answering_player is None:
                # question reading finished, give them 1 more second to buzz in
                await asyncio.sleep(1.0)

            player = self._answering_player
            if player is None:
                # Nobody answered in time
                await read_options_action.wait_for_completed()
                return None

            # short wait before accepting player answer, so we don't incorrectly
            # identify late buzzes as cycling the answer
            await asyncio.sleep(0.5)

            # Start accepting taps from the answering player
            player.start_answering()
            self._answers_accepted = True

            await read_options_action.wait_for_completed()
            action = self.say_text(player.name + "?")
            await action.wait_for_completed()

            player_answer = await self.wait_for_answer(player)
            self._answers_accepted = False
            num_answers += 1

            if player_answer == question.answer_number:
                # Correct
                player.score += 1
                return player
            else:
                # Incorrect
                player.score -= 1
                action = self._robot.play_anim_trigger(cozmo.anim.Triggers.KnockOverFailure)
                self._answering_player = None
                player.turn_light_off()
                await action.wait_for_completed()

    async def ask_question(self, question: QuizQuestion):
        # Reset for a new question
        for player in self._players:
            player.reset_for_question()
        self._answering_player = None
        self._buzzing_in_accepted = False
        self._answers_accepted = False

        # Read the question
        action = self.say_text(question.question)
        await action.wait_for_completed()

        # Allow buzzing in
        self.turn_player_lights_off()
        self._buzzing_in_accepted = True

        correct_player = await self.get_correct_player(question)
        if correct_player is None:
            # Nobody answered correctly
            action = self._robot.play_anim_trigger(cozmo.anim.Triggers.FailedToRightFromFace)
            await action.wait_for_completed()
            action = self.say_text("The answer was %s: %s" % (question.answer_number, question.answer_str))
            await action.wait_for_completed()
        else:
            # Correct
            action = self._robot.play_anim_trigger(cozmo.anim.Triggers.ReactToBlockPickupSuccess)
            await action.wait_for_completed()
            action = self.say_text("Correct it was %s: %s" % (question.answer_number, question.answer_str))
            await action.wait_for_completed()

    async def run(self):
        # Exit immediately if setup failed
        if not self.verify_setup():
            return

        # Add a handler so that we can track whenever a cube is tapped
        self._robot.add_event_handler(cozmo.objects.EvtObjectTapped, self.on_cube_tapped)

        # Keep asking questions until there are none left
        while True:
            question = self.get_next_question()
            if question:
                await self.ask_question(question)
            else:
                print("Quiz is complete!")
                await self.report_leader(True)
                action = self._robot.play_anim_trigger(cozmo.anim.Triggers.BuildPyramidSuccess)
                await action.wait_for_completed()
                action = self.say_text("Game Over - Bye!")
                await action.wait_for_completed()
                return


async def cozmo_program(robot: cozmo.robot.Robot):
    quiz_master = CozmoQuizMaster(robot)
    await quiz_master.run()


cozmo.run_program(cozmo_program)
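
The `shuffle_answer_options` method in the listing above shuffles the choices while still knowing where the correct answer ends up. The same idea can be tried without a robot; the sketch below is a standalone rewrite under assumptions, where the function name `shuffle_keeping_answer` and the explicit `rng` parameter are hypothetical and not part of the sample.

```python
from random import Random


def shuffle_keeping_answer(options, answer_index, rng):
    """Return (shuffled_options, new_answer_index) without mutating the input."""
    remaining = list(options)
    answer = remaining.pop(answer_index)            # take the correct answer out
    rng.shuffle(remaining)                          # shuffle the wrong answers
    new_index = rng.randrange(len(remaining) + 1)   # any slot, including the end
    remaining.insert(new_index, answer)             # put the answer back at a known slot
    return remaining, new_index


if __name__ == "__main__":
    rng = Random(0)  # seeded only so repeated runs behave the same
    shuffled, idx = shuffle_keeping_answer(["12", "9", "15", "14"], 0, rng)
    print(shuffled, "correct answer is option", idx + 1)
```

Popping the answer first means the shuffle never has to be searched afterwards, which also keeps things correct if two options happened to have the same text.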

quiz_questions.json

[
    {
      "question": "What is 3 times 4?",
      "answer_options": ["12", "9", "15", "14"]
    },
    {
      "question": "Which of these is not a type of penguin?",
      "answer_options": ["Rigatoni", "Fairy", "Macaroni", "Adelie"]
    },
    {
      "question": "Which of these is not a species of rodent?",
      "answer_options": ["Rabbit", "Beaver", "Lemming", "Capybara"]
    },
    {
      "question": "In physics, which of these forces is real?",
      "answer_options": ["Centripetal", "Coriolis", "Centrifugal", "Euler"]
    },
    {
      "question": "What is the 8th digit of Pi?",
      "answer_options": ["6", "5", "2", "3"]
    }
]
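
The question file above follows a simple convention: each entry has a `question` string and exactly four `answer_options`, with the correct answer listed first. Before running the quiz on the robot, a custom question file can be checked against that convention. The sketch below is one possible check, assuming the same layout; the names `load_questions` and `SAMPLE` are hypothetical.

```python
import json


def load_questions(json_text):
    """Parse quiz data and check the layout the quiz program relies on."""
    data = json.loads(json_text)
    questions = []
    for entry in data:
        options = entry["answer_options"]
        if len(options) != 4:
            raise ValueError("Expected 4 answer_options, got %s" % len(options))
        # By convention the first option is the correct answer.
        questions.append((entry["question"], options[0], options))
    return questions


SAMPLE = '''
[
    {"question": "What is 3 times 4?",
     "answer_options": ["12", "9", "15", "14"]}
]
'''

if __name__ == "__main__":
    for question, answer, _options in load_questions(SAMPLE):
        print(question, "->", answer)
```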

7. remote control cozmo

Control Cozmo from a web page on your computer.

This example lets you control Cozmo with remote commands through a web page served by Flask.
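
The keyboard driving in the listing below comes down to mixing a forward/back intent and a turn intent into left and right wheel speeds. A minimal robot-free sketch of that mixing follows; the function name `mix_wheel_speeds` is hypothetical, and the default speeds of 75 and 50 are taken from the mid-speed values passed to `pick_speed` in the sample.

```python
def mix_wheel_speeds(drive_dir, turn_dir, forward_speed=75, turn_speed=50):
    """Combine drive and turn intents into (left, right) wheel speeds."""
    if drive_dir < 0:
        # Flip the turn when reversing, as the sample does.
        turn_dir = -turn_dir
    left = drive_dir * forward_speed + turn_speed * turn_dir
    right = drive_dir * forward_speed - turn_speed * turn_dir
    return left, right


if __name__ == "__main__":
    print(mix_wheel_speeds(1, 0))    # straight ahead
    print(mix_wheel_speeds(0, 1))    # turn on the spot to the right
    print(mix_wheel_speeds(-1, 1))   # reversing while holding "right"
```

Driving the two wheels at opposite speeds spins the robot in place, and equal speeds drive it straight; everything in between is a curve.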

import asyncio
import io
import json
import math
import sys

sys.path.append('../lib/')
import flask_helpers
import cozmo


try:
    from flask import Flask, request
except ImportError:
    sys.exit("Cannot import from flask: Do `pip3 install --user flask` to install")

try:
    from PIL import Image, ImageDraw
except ImportError:
    sys.exit("Cannot import from PIL: Do `pip3 install --user Pillow` to install")

try:
    import requests
except ImportError:
    sys.exit("Cannot import from requests: Do `pip3 install --user requests` to install")


DEBUG_ANNOTATIONS_DISABLED = 0
DEBUG_ANNOTATIONS_ENABLED_VISION = 1
DEBUG_ANNOTATIONS_ENABLED_ALL = 2


# Annotator for displaying RobotState (position, etc.) on top of the camera feed
class RobotStateDisplay(cozmo.annotate.Annotator):
    def apply(self, image, scale):
        d = ImageDraw.Draw(image)

        bounds = [3, 0, image.width, image.height]

        def print_line(text_line):
            text = cozmo.annotate.ImageText(text_line, position=cozmo.annotate.TOP_LEFT, outline_color='black', color='lightblue')
            text.render(d, bounds)
            TEXT_HEIGHT = 11
            bounds[1] += TEXT_HEIGHT

        robot = self.world.robot  # type: cozmo.robot.Robot

        # Display the Pose info for the robot

        pose = robot.pose
        print_line('Pose: Pos = <%.1f, %.1f, %.1f>' % pose.position.x_y_z)
        print_line('Pose: Rot quat = <%.1f, %.1f, %.1f, %.1f>' % pose.rotation.q0_q1_q2_q3)
        print_line('Pose: angle_z = %.1f' % pose.rotation.angle_z.degrees)
        print_line('Pose: origin_id: %s' % pose.origin_id)

        # Display the Accelerometer and Gyro data for the robot

        print_line('Accelmtr: <%.1f, %.1f, %.1f>' % robot.accelerometer.x_y_z)
        print_line('Gyro: <%.1f, %.1f, %.1f>' % robot.gyro.x_y_z)

        # Display the Accelerometer and Gyro data for the mobile device

        if robot.device_accel_raw is not None:
            print_line('Device Acc Raw: <%.2f, %.2f, %.2f>' % robot.device_accel_raw.x_y_z)
        if robot.device_accel_user is not None:
            print_line('Device Acc User: <%.2f, %.2f, %.2f>' % robot.device_accel_user.x_y_z)
        if robot.device_gyro is not None:
            mat = robot.device_gyro.to_matrix()
            print_line('Device Gyro Up: <%.2f, %.2f, %.2f>' % mat.up_xyz)
            print_line('Device Gyro Fwd: <%.2f, %.2f, %.2f>' % mat.forward_xyz)
            print_line('Device Gyro Left: <%.2f, %.2f, %.2f>' % mat.left_xyz)


def create_default_image(image_width, image_height, do_gradient=False):
    '''Create a place-holder PIL image to use until we have a live feed from Cozmo'''
    image_bytes = bytearray([0x70, 0x70, 0x70]) * image_width * image_height

    if do_gradient:
        i = 0
        for y in range(image_height):
            for x in range(image_width):
                image_bytes[i] = int(255.0 * (x / image_width))   # R
                image_bytes[i+1] = int(255.0 * (y / image_height))  # G
                image_bytes[i+2] = 0                                # B
                i += 3

    image = Image.frombytes('RGB', (image_width, image_height), bytes(image_bytes))
    return image


flask_app = Flask(__name__)
remote_control_cozmo = None
_default_camera_image = create_default_image(320, 240)
_is_mouse_look_enabled_by_default = False
_is_device_gyro_mode_enabled_by_default = False
_gyro_driving_deadzone_ratio = 0.025

_display_debug_annotations = DEBUG_ANNOTATIONS_ENABLED_ALL


def remap_to_range(x, x_min, x_max, out_min, out_max):
    '''convert x (in x_min..x_max range) to out_min..out_max range'''
    if x < x_min:
        return out_min
    elif x > x_max:
        return out_max
    else:
        ratio = (x - x_min) / (x_max - x_min)
        return out_min + ratio * (out_max - out_min)


class RemoteControlCozmo:

    def __init__(self, coz):
        self.cozmo = coz

        self.drive_forwards = 0
        self.drive_back = 0
        self.turn_left = 0
        self.turn_right = 0
        self.lift_up = 0
        self.lift_down = 0
        self.head_up = 0
        self.head_down = 0

        self.go_fast = 0
        self.go_slow = 0

        self.is_mouse_look_enabled = _is_mouse_look_enabled_by_default
        self.is_device_gyro_mode_enabled = _is_device_gyro_mode_enabled_by_default
        self.mouse_dir = 0

        all_anim_names = list(self.cozmo.anim_names)
        all_anim_names.sort()
        self.anim_names = []

        # Hide a few specific test animations that don't behave well
        bad_anim_names = [
            "ANIMATION_TEST",
            "soundTestAnim"]

        for anim_name in all_anim_names:
            if anim_name not in bad_anim_names:
                self.anim_names.append(anim_name)

        default_anims_for_keys = ["anim_bored_01",  # 0
                                  "anim_poked_giggle",  # 1
                                  "anim_pounce_success_02",  # 2
                                  "anim_bored_event_02",  # 3
                                  "anim_bored_event_03",  # 4
                                  "anim_petdetection_cat_01",  # 5
                                  "anim_petdetection_dog_03",  # 6
                                  "anim_reacttoface_unidentified_02",  # 7
                                  "anim_upgrade_reaction_lift_01",  # 8
                                  "anim_speedtap_wingame_intensity02_01"  # 9
                                 ]

        self.anim_index_for_key = [0] * 10
        kI = 0
        for default_key in default_anims_for_keys:
            try:
                anim_idx = self.anim_names.index(default_key)
            except ValueError:
                print("Error: default_anim %s is not in the list of animations" % default_key)
                anim_idx = kI
            self.anim_index_for_key[kI] = anim_idx
            kI += 1


        self.action_queue = []
        self.text_to_say = "Hi I'm Cozmo"


    def set_anim(self, key_index, anim_index):
        self.anim_index_for_key[key_index] = anim_index


    def handle_mouse(self, mouse_x, mouse_y, delta_x, delta_y, is_button_down):
        '''Called whenever mouse moves
            mouse_x, mouse_y are in 0..1 range (0,0 = top left, 1,1 = bottom right of window)
            delta_x, delta_y are the change in mouse_x/y since the last update
        '''
        if self.is_mouse_look_enabled:
            mouse_sensitivity = 1.5 # higher = more twitchy
            self.mouse_dir = remap_to_range(mouse_x, 0.0, 1.0, -mouse_sensitivity, mouse_sensitivity)
            self.update_mouse_driving()

            desired_head_angle = remap_to_range(mouse_y, 0.0, 1.0, 45, -25)
            head_angle_delta = desired_head_angle - self.cozmo.head_angle.degrees
            head_vel = head_angle_delta * 0.03
            self.cozmo.move_head(head_vel)


    def set_mouse_look_enabled(self, is_mouse_look_enabled):
        was_mouse_look_enabled = self.is_mouse_look_enabled
        self.is_mouse_look_enabled = is_mouse_look_enabled
        if not is_mouse_look_enabled:
            # cancel any current mouse-look turning
            self.mouse_dir = 0
            if was_mouse_look_enabled:
                self.update_mouse_driving()
                self.update_head()


    def handle_key(self, key_code, is_shift_down, is_ctrl_down, is_alt_down, is_key_down):
        '''Called on any key press or release
           Holding a key down may result in repeated handle_key calls with is_key_down==True
        '''

        # Update desired speed / fidelity of actions based on shift/alt being held
        was_go_fast = self.go_fast
        was_go_slow = self.go_slow

        self.go_fast = is_shift_down
        self.go_slow = is_alt_down

        speed_changed = (was_go_fast != self.go_fast) or (was_go_slow != self.go_slow)

        # Update state of driving intent from keyboard, and if anything changed then call update_driving
        update_driving = True
        if key_code == ord('W'):
            self.drive_forwards = is_key_down
        elif key_code == ord('S'):
            self.drive_back = is_key_down
        elif key_code == ord('A'):
            self.turn_left = is_key_down
        elif key_code == ord('D'):
            self.turn_right = is_key_down
        else:
            if not speed_changed:
                update_driving = False

        # Update state of lift move intent from keyboard, and if anything changed then call update_lift
        update_lift = True
        if key_code == ord('R'):
            self.lift_up = is_key_down
        elif key_code == ord('F'):
            self.lift_down = is_key_down
        else:
            if not speed_changed:
                update_lift = False

        # Update state of head move intent from keyboard, and if anything changed then call update_head
        update_head = True
        if key_code == ord('T'):
            self.head_up = is_key_down
        elif key_code == ord('G'):
            self.head_down = is_key_down
        else:
            if not speed_changed:
                update_head = False

        # Update driving, head and lift as appropriate
        if update_driving:
            self.update_mouse_driving()
        if update_head:
            self.update_head()
        if update_lift:
            self.update_lift()

        # Handle any keys being released (e.g. the end of a key-click)
        if not is_key_down:
            if (key_code >= ord('0')) and (key_code <= ord('9')):
                anim_name = self.key_code_to_anim_name(key_code)
                self.play_animation(anim_name)
            elif key_code == ord(' '):
                self.say_text(self.text_to_say)


    def key_code_to_anim_name(self, key_code):
        key_num = key_code - ord('0')
        anim_num = self.anim_index_for_key[key_num]
        anim_name = self.anim_names[anim_num]
        return anim_name


    def func_to_name(self, func):
        if func == self.try_say_text:
            return "say_text"
        elif func == self.try_play_anim:
            return "play_anim"
        else:
            return "UNKNOWN"


    def action_to_text(self, action):
        func, args = action
        return self.func_to_name(func) + "( " + str(args) + " )"


    def action_queue_to_text(self, action_queue):
        out_text = ""
        i = 0
        for action in action_queue:
            out_text += "[" + str(i) + "] " + self.action_to_text(action)
            i += 1
        return out_text


    def queue_action(self, new_action):
        if len(self.action_queue) > 10:
            self.action_queue.pop(0)
        self.action_queue.append(new_action)


    def try_say_text(self, text_to_say):
        try:
            self.cozmo.say_text(text_to_say)
            return True
        except cozmo.exceptions.RobotBusy:
            return False


    def try_play_anim(self, anim_name):
        try:
            self.cozmo.play_anim(name=anim_name)
            return True
        except cozmo.exceptions.RobotBusy:
            return False


    def say_text(self, text_to_say):
        self.queue_action((self.try_say_text, text_to_say))
        self.update()


    def play_animation(self, anim_name):
        self.queue_action((self.try_play_anim, anim_name))
        self.update()


    def update(self):
        '''Try and execute the next queued action'''
        if len(self.action_queue) > 0:
            queued_action, action_args = self.action_queue[0]
            if queued_action(action_args):
                self.action_queue.pop(0)
        # Update gyro
        if self.is_device_gyro_mode_enabled and self.cozmo.device_gyro:
            self.update_gyro_driving()


    def pick_speed(self, fast_speed, mid_speed, slow_speed):
        if self.go_fast:
            if not self.go_slow:
                return fast_speed
        elif self.go_slow:
            return slow_speed
        return mid_speed


    def update_lift(self):
        lift_speed = self.pick_speed(8, 4, 2)
        lift_vel = (self.lift_up - self.lift_down) * lift_speed
        self.cozmo.move_lift(lift_vel)


    def update_head(self):
        if not self.is_mouse_look_enabled:
            head_speed = self.pick_speed(2, 1, 0.5)
            head_vel = (self.head_up - self.head_down) * head_speed
            self.cozmo.move_head(head_vel)

    def scale_deadzone(self, value, deadzone, maximum):
        if math.fabs(value) > deadzone:
            adjustment = math.copysign(deadzone, value)
            scaleFactor = maximum / (maximum - deadzone)
            return (value - adjustment) * scaleFactor
        else:
            return 0

    def update_gyro_driving(self):
        pitch, yaw, roll = self.cozmo.device_gyro.euler_angles
        # these are multiplied by 2 because 90 degrees feels better for full velocity than 180 degrees
        drive_dir = self.scale_deadzone(pitch/math.pi, _gyro_driving_deadzone_ratio, 1) * 2
        turn_dir = self.scale_deadzone(roll/math.pi, _gyro_driving_deadzone_ratio, 1) * 2

        forward_speed = 250
        turn_speed = 250
        wheel_acceleration = 250

        l_wheel_speed = (drive_dir * forward_speed) + (turn_speed * turn_dir)
        r_wheel_speed = (drive_dir * forward_speed) - (turn_speed * turn_dir)

        self.cozmo.drive_wheels(l_wheel_speed, r_wheel_speed, wheel_acceleration, wheel_acceleration)

    def update_mouse_driving(self):
        drive_dir = (self.drive_forwards - self.drive_back)

        if (drive_dir > 0.1) and self.cozmo.is_on_charger:
            # cozmo is stuck on the charger, and user is trying to drive off - issue an explicit drive off action
            try:
                # don't wait for action to complete - we don't want to block the other updates (camera etc.)
                self.cozmo.drive_off_charger_contacts()
            except cozmo.exceptions.RobotBusy:
                # Robot is busy doing another action - try again next time we get a drive impulse
                pass

        turn_dir = (self.turn_right - self.turn_left) + self.mouse_dir
        if drive_dir < 0:
            # It feels more natural to turn the opposite way when reversing
            turn_dir = -turn_dir

        forward_speed = self.pick_speed(150, 75, 50)
        turn_speed = self.pick_speed(100, 50, 30)

        l_wheel_speed = (drive_dir * forward_speed) + (turn_speed * turn_dir)
        r_wheel_speed = (drive_dir * forward_speed) - (turn_speed * turn_dir)

        self.cozmo.drive_wheels(l_wheel_speed, r_wheel_speed, l_wheel_speed*4, r_wheel_speed*4 )

def get_anim_sel_drop_down(selectorIndex):
    html_text = '''<select onchange="handleDropDownSelect(this)" name="animSelector''' + str(selectorIndex) + '''">'''
    i = 0
    for anim_name in remote_control_cozmo.anim_names:
        is_selected_item = (i == remote_control_cozmo.anim_index_for_key[selectorIndex])
        selected_text = ''' selected="selected"''' if is_selected_item else ""
        html_text += '''<option value=''' + str(i) + selected_text + '''>''' + anim_name + '''</option>'''
        i += 1
    html_text += '''</select>'''
    return html_text


def get_anim_sel_drop_downs():
    html_text = ""
    for i in range(10):
        # list keys 1..9,0 as that's the layout on the keyboard
        key = i+1 if (i<9) else 0
        html_text += str(key) + ''': ''' + get_anim_sel_drop_down(key) + '''<br>'''
    return html_text


def to_js_bool_string(bool_value):
    return "true" if bool_value else "false"


@flask_app.route("/")
def handle_index_page():
    return '''
    <html>
        <head>
            <title>remote_control_cozmo.py display</title>
        </head>
        <body>
            <h1>Remote Control Cozmo</h1>
            <table>
                <tr>
                    <td valign = top>
                        <div id="cozmoImageMicrosoftWarning" style="display: none;color: #ff9900; text-align: center;">Video feed performance is better in Chrome or Firefox due to mjpeg limitations in this browser</div>
                        <img src="cozmoImage" id="cozmoImageId" width=640 height=480>
                        <div id="DebugInfoId"></div>
                    </td>
                    <td width=30></td>
                    <td valign=top>
                        <h2>Controls:</h2>

                        <h3>Driving:</h3>

                        <b>W A S D</b> : Drive Forwards / Left / Back / Right<br><br>
                        <b>Q</b> : Toggle Mouse Look: <button id="mouseLookId" onClick=onMouseLookButtonClicked(this) style="font-size: 14px">Default</button><br>
                        <b>Mouse</b> : Move in browser window to aim<br>
                        (steer and head angle)<br>
                        (similar to an FPS game)<br>
                        <br>
                        <b>T</b> : Move Head Up<br>
                        <b>G</b> : Move Head Down<br>

                        <h3>Lift:</h3>
                        <b>R</b> : Move Lift Up<br>
                        <b>F</b>: Move Lift Down<br>
                        <h3>General:</h3>
                        <b>Shift</b> : Hold to Move Faster (Driving, Head and Lift)<br>
                        <b>Alt</b> : Hold to Move Slower (Driving, Head and Lift)<br>
                        <b>L</b> : Toggle IR Headlight: <button id="headlightId" onClick=onHeadlightButtonClicked(this) style="font-size: 14px">Default</button><br>
                        <b>O</b> : Toggle Debug Annotations: <button id="debugAnnotationsId" onClick=onDebugAnnotationsButtonClicked(this) style="font-size: 14px">Default</button><br>
                        <b>P</b> : Toggle Free Play mode: <button id="freeplayId" onClick=onFreeplayButtonClicked(this) style="font-size: 14px">Default</button><br>
                        <b>Y</b> : Toggle Device Gyro mode: <button id="deviceGyroId" onClick=onDeviceGyroButtonClicked(this) style="font-size: 14px">Default</button><br>
                        <h3>Play Animations</h3>
                        <b>0 .. 9</b> : Play Animation mapped to that key<br>
                        <h3>Talk</h3>
                        <b>Space</b> : Say <input type="text" name="sayText" id="sayTextId" value="''' + remote_control_cozmo.text_to_say + '''" onchange=handleTextInput(this)>
                    </td>
                    <td width=30></td>
                    <td valign=top>
                    <h2>Animation key mappings:</h2>
                    ''' + get_anim_sel_drop_downs() + '''<br>
                    </td>
                </tr>
            </table>

            <script type="text/javascript">
                var gLastClientX = -1
                var gLastClientY = -1
                var gIsMouseLookEnabled = '''+ to_js_bool_string(_is_mouse_look_enabled_by_default) + '''
                var gAreDebugAnnotationsEnabled = '''+ str(_display_debug_annotations) + '''
                var gIsHeadlightEnabled = false
                var gIsFreeplayEnabled = false
                var gIsDeviceGyroEnabled = false
                var gUserAgent = window.navigator.userAgent;
                var gIsMicrosoftBrowser = gUserAgent.indexOf('MSIE ') > 0 || gUserAgent.indexOf('Trident/') > 0 || gUserAgent.indexOf('Edge/') > 0;
                var gSkipFrame = false;

                if (gIsMicrosoftBrowser) {
                    document.getElementById("cozmoImageMicrosoftWarning").style.display = "block";
                }

                function postHttpRequest(url, dataSet)
                {
                    var xhr = new XMLHttpRequest();
                    xhr.open("POST", url, true);
                    xhr.send( JSON.stringify( dataSet ) );
                }

                function updateCozmo()
                {
                    if (gIsMicrosoftBrowser && !gSkipFrame) {
                        // IE doesn't support MJPEG, so we need to ping the server for more images.
                        // Though, if this happens too frequently, the controls will be unresponsive.
                        gSkipFrame = true;
                        document.getElementById("cozmoImageId").src="cozmoImage?" + (new Date()).getTime();
                    } else if (gSkipFrame) {
                        gSkipFrame = false;
                    }
                    var xhr = new XMLHttpRequest();
                    xhr.onreadystatechange = function() {
                        if (xhr.readyState == XMLHttpRequest.DONE) {
                            document.getElementById("DebugInfoId").innerHTML = xhr.responseText
                        }
                    }

                    xhr.open("POST", "updateCozmo", true);
                    xhr.send( null );
                    setTimeout(updateCozmo , 60);
                }
                setTimeout(updateCozmo , 60);

                function updateButtonEnabledText(button, isEnabled)
                {
                    button.firstChild.data = isEnabled ? "Enabled" : "Disabled";
                }

                function onMouseLookButtonClicked(button)
                {
                    gIsMouseLookEnabled = !gIsMouseLookEnabled;
                    updateButtonEnabledText(button, gIsMouseLookEnabled);
                    isMouseLookEnabled = gIsMouseLookEnabled
                    postHttpRequest("setMouseLookEnabled", {isMouseLookEnabled})
                }

                function updateDebugAnnotationButtonEnabledText(button, isEnabled)
                {
                    switch(isEnabled)
                    {
                    case 0:
                        button.firstChild.data = "Disabled";
                        break;
                    case 1:
                        button.firstChild.data = "Enabled (vision)";
                        break;
                    case 2:
                        button.firstChild.data = "Enabled (all)";
                        break;
                    default:
                        button.firstChild.data = "ERROR";
                        break;
                    }
                }

                function onDebugAnnotationsButtonClicked(button)
                {
                    gAreDebugAnnotationsEnabled += 1;
                    if (gAreDebugAnnotationsEnabled > 2)
                    {
                        gAreDebugAnnotationsEnabled = 0
                    }

                    updateDebugAnnotationButtonEnabledText(button, gAreDebugAnnotationsEnabled)

                    areDebugAnnotationsEnabled = gAreDebugAnnotationsEnabled
                    postHttpRequest("setAreDebugAnnotationsEnabled", {areDebugAnnotationsEnabled})
                }

                function onHeadlightButtonClicked(button)
                {
                    gIsHeadlightEnabled = !gIsHeadlightEnabled;
                    updateButtonEnabledText(button, gIsHeadlightEnabled);
                    isHeadlightEnabled = gIsHeadlightEnabled
                    postHttpRequest("setHeadlightEnabled", {isHeadlightEnabled})
                }

                function onFreeplayButtonClicked(button)
                {
                    gIsFreeplayEnabled = !gIsFreeplayEnabled;
                    updateButtonEnabledText(button, gIsFreeplayEnabled);
                    isFreeplayEnabled = gIsFreeplayEnabled
                    postHttpRequest("setFreeplayEnabled", {isFreeplayEnabled})
                }

                function onDeviceGyroButtonClicked(button)
                {
                    gIsDeviceGyroEnabled = !gIsDeviceGyroEnabled;
                    updateButtonEnabledText(button, gIsDeviceGyroEnabled);
                    isDeviceGyroEnabled = gIsDeviceGyroEnabled
                    postHttpRequest("setDeviceGyroEnabled", {isDeviceGyroEnabled})
                }

                updateButtonEnabledText(document.getElementById("mouseLookId"), gIsMouseLookEnabled);
                updateButtonEnabledText(document.getElementById("headlightId"), gIsHeadlightEnabled);
                updateDebugAnnotationButtonEnabledText(document.getElementById("debugAnnotationsId"), gAreDebugAnnotationsEnabled);
                updateButtonEnabledText(document.getElementById("freeplayId"), gIsFreeplayEnabled);
                updateButtonEnabledText(document.getElementById("deviceGyroId"), gIsDeviceGyroEnabled);

                function handleDropDownSelect(selectObject)
                {
                    selectedIndex = selectObject.selectedIndex
                    itemName = selectObject.name
                    postHttpRequest("dropDownSelect", {selectedIndex, itemName});
                }

                function handleKeyActivity (e, actionType)
                {
                    var keyCode  = (e.keyCode ? e.keyCode : e.which);
                    var hasShift = (e.shiftKey ? 1 : 0)
                    var hasCtrl  = (e.ctrlKey  ? 1 : 0)
                    var hasAlt   = (e.altKey   ? 1 : 0)

                    if (actionType=="keyup")
                    {
                        if (keyCode == 76) // 'L'
                        {
                            // Simulate a click of the headlight button
                            onHeadlightButtonClicked(document.getElementById("headlightId"))
                        }
                        else if (keyCode == 79) // 'O'
                        {
                            // Simulate a click of the debug annotations button
                            onDebugAnnotationsButtonClicked(document.getElementById("debugAnnotationsId"))
                        }
                        else if (keyCode == 80) // 'P'
                        {
                            // Simulate a click of the freeplay button
                            onFreeplayButtonClicked(document.getElementById("freeplayId"))
                        }
                        else if (keyCode == 81) // 'Q'
                        {
                            // Simulate a click of the mouse look button
                            onMouseLookButtonClicked(document.getElementById("mouseLookId"))
                        }
                        else if (keyCode == 89) // 'Y'
                        {
                            // Simulate a click of the device gyro button
                            onDeviceGyroButtonClicked(document.getElementById("deviceGyroId"))
                        }
                    }

                    postHttpRequest(actionType, {keyCode, hasShift, hasCtrl, hasAlt})
                }

                function handleMouseActivity (e, actionType)
                {
                    var clientX = e.clientX / document.body.clientWidth  // 0..1 (left..right)
                    var clientY = e.clientY / document.body.clientHeight // 0..1 (top..bottom)
                    var isButtonDown = e.which && (e.which != 0) ? 1 : 0
                    var deltaX = (gLastClientX >= 0) ? (clientX - gLastClientX) : 0.0
                    var deltaY = (gLastClientY >= 0) ? (clientY - gLastClientY) : 0.0
                    gLastClientX = clientX
                    gLastClientY = clientY

                    postHttpRequest(actionType, {clientX, clientY, isButtonDown, deltaX, deltaY})
                }

                function handleTextInput(textField)
                {
                    textEntered = textField.value
                    postHttpRequest("sayText", {textEntered})
                }

                document.addEventListener("keydown", function(e) { handleKeyActivity(e, "keydown") } );
                document.addEventListener("keyup",   function(e) { handleKeyActivity(e, "keyup") } );

                document.addEventListener("mousemove",   function(e) { handleMouseActivity(e, "mousemove") } );

                function stopEventPropagation(event)
                {
                    if (event.stopPropagation)
                    {
                        event.stopPropagation();
                    }
                    else
                    {
                        event.cancelBubble = true
                    }
                }

                document.getElementById("sayTextId").addEventListener("keydown", function(event) {
                    stopEventPropagation(event);
                } );
                document.getElementById("sayTextId").addEventListener("keyup", function(event) {
                    stopEventPropagation(event);
                } );
            </script>

        </body>
    </html>
    '''

def get_annotated_image():
    image = remote_control_cozmo.cozmo.world.latest_image
    if _display_debug_annotations != DEBUG_ANNOTATIONS_DISABLED:
        image = image.annotate_image(scale=2)
    else:
        image = image.raw_image
    return image

def streaming_video(url_root):
    '''Video streaming generator function'''
    try:
        while True:
            if remote_control_cozmo:
                image = get_annotated_image()

                img_io = io.BytesIO()
                image.save(img_io, 'PNG')
                img_io.seek(0)
                yield (b'--frame\r\n'
                    b'Content-Type: image/png\r\n\r\n' + img_io.getvalue() + b'\r\n')
            else:
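                # asyncio.sleep() would only create an un-awaited coroutine in this
                # plain generator, so block the streaming thread briefly instead
                import time
                time.sleep(.1)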
    except cozmo.exceptions.SDKShutdown:
        # Tell the main flask thread to shutdown
        requests.post(url_root + 'shutdown')

def serve_single_image():
    if remote_control_cozmo:
        try:
            image = get_annotated_image()
            if image:
                return flask_helpers.serve_pil_image(image)
        except cozmo.exceptions.SDKShutdown:
            # requests needs an absolute URL; build it from the current request
            requests.post(request.url_root + 'shutdown')
    return flask_helpers.serve_pil_image(_default_camera_image)

def is_microsoft_browser(request):
    agent = request.user_agent.string
    return 'Edge/' in agent or 'MSIE ' in agent or 'Trident/' in agent

@flask_app.route("/cozmoImage")
def handle_cozmoImage():
    if is_microsoft_browser(request):
        return serve_single_image()
    return flask_helpers.stream_video(streaming_video, request.url_root)

def handle_key_event(key_request, is_key_down):
    message = json.loads(key_request.data.decode("utf-8"))
    if remote_control_cozmo:
        remote_control_cozmo.handle_key(key_code=(message['keyCode']), is_shift_down=message['hasShift'],
                                        is_ctrl_down=message['hasCtrl'], is_alt_down=message['hasAlt'],
                                        is_key_down=is_key_down)
    return ""

@flask_app.route('/shutdown', methods=['POST'])
def shutdown():
    flask_helpers.shutdown_flask(request)
    return ""

@flask_app.route('/mousemove', methods=['POST'])
def handle_mousemove():
    '''Called from Javascript whenever mouse moves'''
    message = json.loads(request.data.decode("utf-8"))
    if remote_control_cozmo:
        remote_control_cozmo.handle_mouse(mouse_x=(message['clientX']), mouse_y=message['clientY'],
                                          delta_x=message['deltaX'], delta_y=message['deltaY'],
                                          is_button_down=message['isButtonDown'])
    return ""


@flask_app.route('/setMouseLookEnabled', methods=['POST'])
def handle_setMouseLookEnabled():
    '''Called from Javascript whenever mouse-look mode is toggled'''
    message = json.loads(request.data.decode("utf-8"))
    if remote_control_cozmo:
        remote_control_cozmo.set_mouse_look_enabled(is_mouse_look_enabled=message['isMouseLookEnabled'])
    return ""


@flask_app.route('/setHeadlightEnabled', methods=['POST'])
def handle_setHeadlightEnabled():
    '''Called from Javascript whenever headlight is toggled on/off'''
    message = json.loads(request.data.decode("utf-8"))
    if remote_control_cozmo:
        remote_control_cozmo.cozmo.set_head_light(enable=message['isHeadlightEnabled'])
    return ""


@flask_app.route('/setAreDebugAnnotationsEnabled', methods=['POST'])
def handle_setAreDebugAnnotationsEnabled():
    '''Called from Javascript whenever debug-annotations mode is toggled'''
    message = json.loads(request.data.decode("utf-8"))
    global _display_debug_annotations
    _display_debug_annotations = message['areDebugAnnotationsEnabled']
    if remote_control_cozmo:
        if _display_debug_annotations == DEBUG_ANNOTATIONS_ENABLED_ALL:
            remote_control_cozmo.cozmo.world.image_annotator.enable_annotator('robotState')
        else:
            remote_control_cozmo.cozmo.world.image_annotator.disable_annotator('robotState')
    return ""


@flask_app.route('/setFreeplayEnabled', methods=['POST'])
def handle_setFreeplayEnabled():
    '''Called from Javascript whenever freeplay mode is toggled on/off'''
    message = json.loads(request.data.decode("utf-8"))
    if remote_control_cozmo:
        isFreeplayEnabled = message['isFreeplayEnabled']
        if isFreeplayEnabled:
            remote_control_cozmo.cozmo.start_freeplay_behaviors()
        else:
            remote_control_cozmo.cozmo.stop_freeplay_behaviors()
    return ""


@flask_app.route('/setDeviceGyroEnabled', methods=['POST'])
def handle_setDeviceGyroEnabled():
    '''Called from Javascript whenever device gyro mode is toggled on/off'''
    message = json.loads(request.data.decode("utf-8"))
    if remote_control_cozmo:
        is_device_gyro_enabled = message['isDeviceGyroEnabled']
        if is_device_gyro_enabled:
            remote_control_cozmo.is_device_gyro_mode_enabled = True
        else:
            remote_control_cozmo.is_device_gyro_mode_enabled = False
            # stop movement when turning off gyro mode
            remote_control_cozmo.cozmo.drive_wheels(0, 0, 0, 0)
    return ""


@flask_app.route('/keydown', methods=['POST'])
def handle_keydown():
    '''Called from Javascript whenever a key is down (note: can generate repeat calls if held down)'''
    return handle_key_event(request, is_key_down=True)


@flask_app.route('/keyup', methods=['POST'])
def handle_keyup():
    '''Called from Javascript whenever a key is released'''
    return handle_key_event(request, is_key_down=False)


@flask_app.route('/dropDownSelect', methods=['POST'])
def handle_dropDownSelect():
    '''Called from Javascript whenever an animSelector dropdown menu is selected (i.e. modified)'''
    message = json.loads(request.data.decode("utf-8"))

    item_name_prefix = "animSelector"
    item_name = message['itemName']

    if remote_control_cozmo and item_name.startswith(item_name_prefix):
        item_name_index = int(item_name[len(item_name_prefix):])
        remote_control_cozmo.set_anim(item_name_index, message['selectedIndex'])

    return ""


@flask_app.route('/sayText', methods=['POST'])
def handle_sayText():
    '''Called from Javascript whenever the saytext text field is modified'''
    message = json.loads(request.data.decode("utf-8"))
    if remote_control_cozmo:
        remote_control_cozmo.text_to_say = message['textEntered']
    return ""


@flask_app.route('/updateCozmo', methods=['POST'])
def handle_updateCozmo():
    if remote_control_cozmo:
        remote_control_cozmo.update()
        action_queue_text = ""
        i = 1
        for action in remote_control_cozmo.action_queue:
            action_queue_text += str(i) + ": " + remote_control_cozmo.action_to_text(action) + "<br>"
            i += 1

        return '''Action Queue:<br>''' + action_queue_text + '''
        '''
    return ""


def run(sdk_conn):
    robot = sdk_conn.wait_for_robot()
    robot.world.image_annotator.add_annotator('robotState', RobotStateDisplay)
    robot.enable_device_imu(True, True, True)

    global remote_control_cozmo
    remote_control_cozmo = RemoteControlCozmo(robot)

    # Turn on image receiving by the camera
    robot.camera.image_stream_enabled = True

    flask_helpers.run_flask(flask_app)

if __name__ == '__main__':
    cozmo.setup_basic_logging()
    cozmo.robot.Robot.drive_off_charger_on_connect = False  # RC can drive off charger if required
    try:
        cozmo.connect(run)
    except KeyboardInterrupt as e:
        pass
    except cozmo.ConnectionError as e:
        sys.exit("A connection error occurred: %s" % e)
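
The streaming_video generator above yields one chunk per camera frame, each starting with a `--frame` boundary line and a `Content-Type` header. The sketch below isolates just that framing step so it can be run without a robot. It is a minimal illustration, not the SDK's implementation: the helper name `multipart_frames` and the fake image payloads are hypothetical, and it assumes (flask_helpers.stream_video is not shown in this post) that the generator's output is served with a `multipart/x-mixed-replace; boundary=frame` mimetype.

```python
from typing import Iterable, Iterator

# Assumed boundary name, chosen to match the b'--frame' marker used above.
BOUNDARY = b"frame"


def multipart_frames(images: Iterable[bytes],
                     content_type: bytes = b"image/png") -> Iterator[bytes]:
    """Wrap each already-encoded image in one multipart part.

    Hypothetical helper for illustration; it mirrors the byte layout
    that streaming_video() yields for every camera frame.
    """
    for payload in images:
        yield (b"--" + BOUNDARY + b"\r\n"
               b"Content-Type: " + content_type + b"\r\n\r\n"
               + payload + b"\r\n")


if __name__ == "__main__":
    # Stand-in payloads; a real program would pass PNG-encoded bytes.
    fake_images = [b"first-image-bytes", b"second-image-bytes"]
    for part in multipart_frames(fake_images):
        print(part)
```

A browser that supports this kind of stream replaces the displayed image each time a new part arrives, which is why the page above only falls back to polling `cozmoImage?<timestamp>` for Microsoft browsers.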

Fin

