Программный интерфейс (API) для управления роботом

Создание новых модулей для управления роботом даёт возможность адаптировать робота под свои проекты и задачи. В этой статье описаны функции программного интерфейса (API), упрощающие разработку модулей для робота.

Для взаимодействия с роботом используется библиотека SignalR

Управление роботом из C#

using Microsoft.AspNetCore.SignalR.Client;

var scenario = @"<scenario id=""test"">
<bml>
    <face lexeme=""happy2""/>
</bml>
</scenario>";

var connectionBuilder = new HubConnectionBuilder().WithUrl("ws://localhost:5000/scenario");
await using var connection = connectionBuilder.Build();

await connection.StartAsync();

await connection.InvokeAsync("Add", scenario);

Console.WriteLine("Scenario has been sent to robot");
Console.ReadLine();

Управление роботом из Python

Требуется версия Python 3.8

import asyncio
from signalr_async.netcore import Hub, Client
from signalr_async.netcore.protocols import MessagePackProtocol
import sys

class MyHub(Hub):
    async def on_connect(self, connection_id: str) -> None:
        """Will be awaited after connection established"""

    async def on_disconnect(self) -> None:
        """Will be awaited after client disconnection"""

    async def add_scenario(self, scenario) -> None:
        """Invoke add method on server"""
        return await self.invoke("Add", scenario)

hub = MyHub("scenario")

@hub.on("ReceiveScenarioStatus")
async def receive_scenario_status(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Scenario ({logMessage['ScenarioId']}) - {logMessage['Status']}")

@hub.on("ReceiveErrorScenario")
async def receive_error_scenario(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Scenario ({logMessage['ScenarioId']}) occurs error - {logMessage['Exception']}")

@hub.on("ReceiveBmlStatus")
async def receive_bml_status(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Bml ({logMessage['ScenarioId']};{logMessage['BmlId']}) - {logMessage['Status']}")

@hub.on("ReceiveErrorBml")
async def receive_error_bml(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Bml ({logMessage['ScenarioId']};{logMessage['BmlId']}) occurs error - {logMessage['Exception']}")

@hub.on("ReceiveTagStatus")
async def receive_tag_status(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Tag ({logMessage['ScenarioId']};{logMessage['BmlId']};{logMessage['TagId']}) - {logMessage['Status']}")

@hub.on("ReceiveErrorTag")
async def receive_error_tag(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Tag ({logMessage['ScenarioId']};{logMessage['BmlId']};{logMessage['TagId']}) occurs error - {logMessage['Exception']}")

async def ainput(string: str) -> str:
    await asyncio.get_event_loop().run_in_executor(
            None, lambda s=string: sys.stdout.write(s+' '))
    return await asyncio.get_event_loop().run_in_executor(
            None, sys.stdin.readline)

async def main():
    async with Client(
        "http://localhost:5000",
        hub,
        connection_options={
            "protocol": MessagePackProtocol(),
        },
    ) as client:
        await hub.add_scenario("""
        <scenario id="test">
            <bml>
                <figure lexeme="happy2"/>
                <speech text="Привет, я робот Ф2!"/>
            </bml>
        </scenario>""")
        await ainput("Press enter to exit...")


asyncio.run(main())