Программный интерфейс (API) для управления роботом
Создание новых модулей для управления роботом даёт возможность адаптировать робота под свои проекты и задачи. В этой статье описаны функции программного интерфейса (API), упрощающие разработку модулей для робота.
Для взаимодействия с роботом используется библиотека SignalR
Управление роботом из C#
using Microsoft.AspNetCore.SignalR.Client;
var scenario = @"<scenario id=""test"">
<bml>
<face lexeme=""happy2""/>
</bml>
</scenario>";
var connectionBuilder = new HubConnectionBuilder().WithUrl("ws://localhost:5000/scenario");
await using var connection = connectionBuilder.Build();
await connection.StartAsync();
await connection.InvokeAsync("Add", scenario);
Console.WriteLine("Scenario has been sent to robot");
Console.ReadLine();
Управление роботом из Python
Требуется версия Python 3.8
import asyncio
from signalr_async.netcore import Hub, Client
from signalr_async.netcore.protocols import MessagePackProtocol
import sys
class MyHub(Hub):
async def on_connect(self, connection_id: str) -> None:
"""Will be awaited after connection established"""
async def on_disconnect(self) -> None:
"""Will be awaited after client disconnection"""
async def add_scenario(self, scenario) -> None:
"""Invoke add method on server"""
return await self.invoke("Add", scenario)
hub = MyHub("scenario")
@hub.on("ReceiveScenarioStatus")
async def receive_scenario_status(logMessage) -> None:
print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Scenario ({logMessage['ScenarioId']}) - {logMessage['Status']}")
@hub.on("ReceiveErrorScenario")
async def receive_error_scenario(logMessage) -> None:
print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Scenario ({logMessage['ScenarioId']}) occurs error - {logMessage['Exception']}")
@hub.on("ReceiveBmlStatus")
async def receive_bml_status(logMessage) -> None:
print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Bml ({logMessage['ScenarioId']};{logMessage['BmlId']}) - {logMessage['Status']}")
@hub.on("ReceiveErrorBml")
async def receive_error_bml(logMessage) -> None:
print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Bml ({logMessage['ScenarioId']};{logMessage['BmlId']}) occurs error - {logMessage['Exception']}")
@hub.on("ReceiveTagStatus")
async def receive_tag_status(logMessage) -> None:
print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Tag ({logMessage['ScenarioId']};{logMessage['BmlId']};{logMessage['TagId']}) - {logMessage['Status']}")
@hub.on("ReceiveErrorTag")
async def receive_error_tag(logMessage) -> None:
print(f"{logMessage['Time'].to_datetime().strftime('%d-%m-%Y %H:%M:%S.%f')[:-3]}: Tag ({logMessage['ScenarioId']};{logMessage['BmlId']};{logMessage['TagId']}) occurs error - {logMessage['Exception']}")
async def ainput(string: str) -> str:
await asyncio.get_event_loop().run_in_executor(
None, lambda s=string: sys.stdout.write(s+' '))
return await asyncio.get_event_loop().run_in_executor(
None, sys.stdin.readline)
async def main():
async with Client(
"http://localhost:5000",
hub,
connection_options={
"protocol": MessagePackProtocol(),
},
) as client:
await hub.add_scenario("""
<scenario id="test">
<bml>
<figure lexeme="happy2"/>
<speech text="Привет, я робот Ф2!"/>
</bml>
</scenario>""")
await ainput("Press enter to exit...")
asyncio.run(main())