import os
import time
import pytest
import signal
import subprocess
import coverage.data
import urllib.request
from threading import Timer
from urllib.error import URLError
from ocs.ocs_client import OCSClient
SIGINT_TIMEOUT = 10
class _AgentRunner:
"""Class to manage running an agent as a subprocess during testing.
Parameters:
agent_path (str): Relative path to Agent,
i.e. '../agents/fake_data/fake_data_agent.py'
agent_name (str): Short, unique name for the agent
args (list): Additional CLI arguments to add when starting the Agent
kill_to_exit (bool): If True, will send a kill signal to exit instead
of interrupt.
"""
def __init__(self, agent_path, agent_name, args, kill_to_exit=False):
self.env = os.environ.copy()
self.env['COVERAGE_FILE'] = f'.coverage.agent.{agent_name}'
self.env['OCS_CONFIG_DIR'] = os.getcwd()
self.cmd = [
'python',
'-u',
'-m',
'coverage',
'run',
'--rcfile=./.coveragerc',
agent_path,
'--site-file',
'./default.yaml'
]
if args is not None:
self.cmd.extend(args)
self.agent_name = agent_name
self.proc = None
self._kill_to_exit = kill_to_exit
self._timer = None
self._timedout = False
def run(self, timeout):
"""Run the agent subprocess.
This runs the agent subprocess defined by ``self.cmd``. Output is
written to a ``PIPE``. If the agent does not exit within the given
timeout it will be interrupted with a ``SIGKILL``.
Parameters:
timeout (float): Timeout in seconds to wait for agent to exit.
"""
self.proc = subprocess.Popen(self.cmd,
env=self.env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
preexec_fn=os.setsid)
# start timer for if agent crashes and hangs
self._timer = Timer(timeout, self._interrupt)
self._timer.start()
# Wait briefly then make sure subprocess hasn't already exited.
time.sleep(1)
if self.proc.poll() is not None:
self._timer.cancel()
self._raise_subprocess(f"Agent failed to startup, cmd: {self.cmd}")
def _interrupt(self):
# not graceful, but handles really misbehaved agent subprocesses
self.proc.send_signal(signal.SIGKILL)
self._timedout = True
def _raise_subprocess(self, msg):
stdout, stderr = self.proc.stdout.read(), self.proc.stderr.read()
print(f'Here is stdout from {self.agent_name}:\n{stdout}')
print(f'Here is stderr from {self.agent_name}:\n{stderr}')
raise RuntimeError(msg)
def shutdown(self):
"""Shutdown the agent process.
If the agent does not respond to a ``SIGINT`` then output is printed,
and an exception raised.
"""
# don't send SIGINT if we've already sent SIGKILL
if not self._timedout:
if self._kill_to_exit:
sig = signal.SIGKILL
else:
sig = signal.SIGINT
self.proc.send_signal(sig)
self._timer.cancel()
try:
self.proc.communicate(timeout=SIGINT_TIMEOUT)
except subprocess.TimeoutExpired:
self._raise_subprocess('Agent did not terminate within '
f'{SIGINT_TIMEOUT} seconds on SIGINT.')
if self._timedout:
stdout, stderr = self.proc.communicate(timeout=SIGINT_TIMEOUT)
print(f'Here is stdout from {self.agent_name}:\n{stdout}')
print(f'Here is stderr from {self.agent_name}:\n{stderr}')
raise RuntimeError('Agent timed out.')
[docs]
def create_agent_runner_fixture(agent_path, agent_name, args=None, timeout=60, kill_to_exit=False):
"""Create a pytest fixture for running a given OCS Agent.
Parameters:
agent_path (str): Relative path to Agent,
i.e. '../agents/fake_data/fake_data_agent.py'
agent_name (str): Short, unique name for the agent
args (list): Additional CLI arguments to add when starting the Agent
timeout (float): Timeout in seconds, after which the agent process will
be interrupted. This typically indicates a crash within the agent.
This timeout should be longer than you expect the agent to run for
during a given test. Defaults to 60 seconds.
kill_to_exit (bool): If True, will send a kill signal to exit instead of
interrupt.
"""
@pytest.fixture()
def run_agent(cov):
runner = _AgentRunner(agent_path, agent_name, args, kill_to_exit=kill_to_exit)
runner.run(timeout=timeout)
yield
runner.shutdown()
# report coverage
agentcov = coverage.data.CoverageData(
basename=f'.coverage.agent.{agent_name}')
agentcov.read()
# protect against missing --cov flag
if cov is not None:
cov.get_data().update(agentcov)
return run_agent
[docs]
def create_client_fixture(instance_id, timeout=30, privs=None):
"""Create the fixture that provides tests a Client object.
Parameters:
instance_id (str): Agent instance-id to connect the Client to
timeout (int): Approximate timeout in seconds for the connection.
Connection attempts will be made X times, with a 1 second pause
between attempts. This is useful if it takes some time for the
Agent to start accepting connections, which varies depending on the
Agent.
privs (str or int): privs argument for OCSClient constructor.
"""
@pytest.fixture()
def client_fixture():
# Set the OCS_CONFIG_DIR so we read the local default.yaml file
os.environ['OCS_CONFIG_DIR'] = os.getcwd()
print(os.environ['OCS_CONFIG_DIR'])
attempts = 0
while attempts < timeout:
try:
client = OCSClient(instance_id, privs=privs)
return client
except RuntimeError as e:
print(f"Caught error: {e}")
print("Attempting to reconnect.")
time.sleep(1)
attempts += 1
raise RuntimeError(
f"Failed to connect to {instance_id} after {timeout} attempts.")
return client_fixture
[docs]
def check_crossbar_connection(port=18001, interval=5, max_attempts=6):
"""Check that the crossbar server is up and available for an Agent to
connect to.
Parameters:
port (int): Port the crossbar server is configured to run on for
testing.
interval (float): Amount of time in seconds to wait between checks.
max_attempts (int): Maximum number of attempts before giving up.
Notes:
For this check to work the crossbar server needs the `Node Info Service
<https://crossbar.io/docs/Node-Info-Service/>`_ running at the path
/info.
"""
attempts = 0
while attempts < max_attempts:
try:
url = f"http://localhost:{port}/info"
code = urllib.request.urlopen(url).getcode()
except (URLError, ConnectionResetError):
print("Crossbar server not online yet, waiting 5 seconds.")
time.sleep(interval)
attempts += 1
assert code == 200
print("Crossbar server online.")