Робот
Устройство и подключение
Управляющее ПО
Язык BML
Словарь жестов
SSSender
API робота
Когнитивный компонент
Обработка текста
Парсер
Массивы текстов
База разборов

Программный интерфейс (API) для управления роботом

Создание новых модулей для управления роботом даёт возможность адаптировать робота под свои проекты и задачи. В этой статье описаны функции программного интерфейса (API), упрощающие разработку модулей для робота.

Для взаимодействия с роботом используется библиотека SignalR

Управление роботом из C#

using Microsoft.AspNetCore.SignalR.Client;

var scenario = @"<scenario id=""test"">
<bml>
    <face lexeme=""happy2""/>
</bml>
</scenario>";

var connectionBuilder = new HubConnectionBuilder().WithUrl("ws://localhost:5000/scenario");
await using var connection = connectionBuilder.Build();

await connection.StartAsync();

await connection.InvokeAsync("Add", scenario);

Console.WriteLine("Scenario has been sent to robot");
Console.ReadLine();

Управление роботом из Python

Требуется версия Python 3.8

import asyncio
from signalr_async.netcore import Hub, Client
from signalr_async.netcore.protocols import MessagePackProtocol
import sys

class MyHub(Hub):
    async def on_connect(self, connection_id: str) -> None:
        """Will be awaited after connection established"""

    async def on_disconnect(self) -> None:
        """Will be awaited after client disconnection"""

    async def add_scenario(self, scenario) -> None:
        """Invoke add method on server"""
        return await self.invoke("Add", scenario)

hub = MyHub("scenario")

@hub.on("ReceiveScenarioStatus")
async def receive_scenario_status(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Scenario ({logMessage['ScenarioId']}) - {logMessage['Status']}")

@hub.on("ReceiveErrorScenario")
async def receive_error_scenario(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Scenario ({logMessage['ScenarioId']}) occurs error - {logMessage['Exception']}")

@hub.on("ReceiveBmlStatus")
async def receive_bml_status(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Bml ({logMessage['ScenarioId']};{logMessage['BmlId']}) - {logMessage['Status']}")

@hub.on("ReceiveErrorBml")
async def receive_error_bml(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Bml ({logMessage['ScenarioId']};{logMessage['BmlId']}) occurs error - {logMessage['Exception']}")

@hub.on("ReceiveTagStatus")
async def receive_tag_status(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Tag ({logMessage['ScenarioId']};{logMessage['BmlId']};{logMessage['TagId']}) - {logMessage['Status']}")

@hub.on("ReceiveErrorTag")
async def receive_error_tag(logMessage) -> None:
    print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Tag ({logMessage['ScenarioId']};{logMessage['BmlId']};{logMessage['TagId']}) occurs error - {logMessage['Exception']}")

async def ainput(string: str) -> str:
    await asyncio.get_event_loop().run_in_executor(
            None, lambda s=string: sys.stdout.write(s+' '))
    return await asyncio.get_event_loop().run_in_executor(
            None, sys.stdin.readline)

async def main():
    async with Client(
        "http://localhost:5000",
        hub,
        connection_options={
            "protocol": MessagePackProtocol(),
        },
    ) as client:
        await hub.add_scenario("""
        <scenario id="test">
            <bml>
                <figure lexeme="happy2"/>
                <speech text="Привет, я робот Ф2!"/>
            </bml>
        </scenario>""")
        await ainput("Press enter to exit...")


asyncio.run(main())