import os
import time
import pytest
import signal
import subprocess
import coverage.data
import urllib.request
from urllib.error import URLError
from ocs.ocs_client import OCSClient
# Seconds to wait for an Agent subprocess to exit after it has been sent
# SIGINT during fixture teardown before the shutdown is treated as failed.
SIGINT_TIMEOUT = 10
[docs]
def create_agent_runner_fixture(agent_path, agent_name, args=None):
    """Create a pytest fixture for running a given OCS Agent.

    The returned fixture starts the Agent under ``coverage run`` in its own
    session, yields to the test, then interrupts the Agent with SIGINT and
    merges the Agent's coverage data into the test run's coverage.

    Parameters:
        agent_path (str): Relative path to Agent,
            i.e. '../agents/fake_data/fake_data_agent.py'
        agent_name (str): Short, unique name for the agent
        args (list): Additional CLI arguments to add when starting the Agent

    Returns:
        function: A pytest fixture named ``run_agent``. It requests a ``cov``
        fixture (presumably the one provided by pytest-cov -- confirm), which
        may be None when coverage collection is disabled.

    Raises:
        RuntimeError: Raised from within the fixture if the Agent exits
            during startup, or does not terminate within ``SIGINT_TIMEOUT``
            seconds of receiving SIGINT.

    """
    @pytest.fixture()
    def run_agent(cov):
        env = os.environ.copy()
        env['COVERAGE_FILE'] = f'.coverage.agent.{agent_name}'
        env['OCS_CONFIG_DIR'] = os.getcwd()
        cmd = ['coverage', 'run',
               '--rcfile=./.coveragerc',
               agent_path,
               '--site-file',
               './default.yaml']
        if args is not None:
            cmd.extend(args)
        # os.setsid puts the Agent in a new session/process group whose id
        # equals the child's pid, so the whole group can be signalled later.
        agentproc = subprocess.Popen(cmd,
                                     env=env,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE,
                                     preexec_fn=os.setsid)

        def raise_subprocess(msg):
            # Only call this once the process has exited or been killed.
            # communicate() drains both pipes to EOF, reaps the child, and
            # also returns any output buffered by an earlier timed-out
            # communicate() call.
            stdout, stderr = agentproc.communicate()
            print(f'Here is stdout from {agent_name}:\n{stdout}')
            print(f'Here is stderr from {agent_name}:\n{stderr}')
            raise RuntimeError(msg)

        # Wait briefly then make sure subprocess hasn't already exited.
        time.sleep(1)
        if agentproc.poll() is not None:
            raise_subprocess(f"Agent failed to startup, cmd: {cmd}")

        yield

        # shutdown Agent
        agentproc.send_signal(signal.SIGINT)
        try:
            agentproc.communicate(timeout=SIGINT_TIMEOUT)
        except subprocess.TimeoutExpired:
            # The Agent ignored SIGINT. It must be killed before its output
            # is collected: reading the pipes of a live process would block
            # until EOF and hang the test session instead of reporting.
            try:
                os.killpg(agentproc.pid, signal.SIGKILL)
            except ProcessLookupError:
                # The process group vanished between the timeout and the
                # kill; nothing is left to clean up.
                pass
            raise_subprocess('Agent did not terminate within '
                             f'{SIGINT_TIMEOUT} seconds on SIGINT.')

        # report coverage
        agentcov = coverage.data.CoverageData(
            basename=f'.coverage.agent.{agent_name}')
        agentcov.read()
        # protect against missing --cov flag
        if cov is not None:
            cov.get_data().update(agentcov)

    return run_agent
[docs]
def create_client_fixture(instance_id, timeout=30):
    """Build a pytest fixture that hands tests a connected Client object.

    Parameters:
        instance_id (str): Agent instance-id to connect the Client to
        timeout (int): Approximate timeout in seconds for the connection.
            Up to ``timeout`` connection attempts are made, pausing 1 second
            after each failed attempt. This helps when an Agent takes some
            time before it starts accepting connections, which varies from
            Agent to Agent.

    Returns:
        function: A pytest fixture named ``client_fixture`` that returns an
        ``OCSClient`` for ``instance_id``, or raises ``RuntimeError`` once
        every attempt has failed.

    """
    @pytest.fixture()
    def client_fixture():
        # Point OCS_CONFIG_DIR at the working directory so the local
        # default.yaml file is the one that gets read.
        config_dir = os.getcwd()
        os.environ['OCS_CONFIG_DIR'] = config_dir
        print(config_dir)

        # Count down the remaining tries, one per second of the timeout.
        remaining = timeout
        while remaining > 0:
            try:
                connected = OCSClient(instance_id)
            except RuntimeError as err:
                print(f"Caught error: {err}")
                print("Attempting to reconnect.")
            else:
                return connected
            time.sleep(1)
            remaining -= 1

        raise RuntimeError(
            f"Failed to connect to {instance_id} after {timeout} attempts.")

    return client_fixture
[docs]
def check_crossbar_connection(port=18001, interval=5, max_attempts=6):
    """Check that the crossbar server is up and available for an Agent to
    connect to.

    The check polls ``http://localhost:<port>/info`` and returns as soon as
    a request is answered with HTTP 200.

    Parameters:
        port (int): Port the crossbar server is configured to run on for
            testing.
        interval (float): Amount of time in seconds to wait between checks.
        max_attempts (int): Maximum number of attempts before giving up.

    Raises:
        RuntimeError: If no attempt received an HTTP 200 response.

    Notes:
        For this check to work the crossbar server needs the `Node Info Service
        <https://crossbar.io/docs/Node-Info-Service/>`_ running at the path
        /info.

    """
    url = f"http://localhost:{port}/info"
    attempts = 0
    while attempts < max_attempts:
        attempts += 1
        try:
            # The context manager closes the HTTP response on every path.
            with urllib.request.urlopen(url) as response:
                code = response.getcode()
        except (URLError, ConnectionResetError):
            # HTTPError is a URLError subclass, so error statuses land here.
            code = None

        if code == 200:
            print("Crossbar server online.")
            return

        # Only wait when another attempt will actually follow.
        if attempts < max_attempts:
            print("Crossbar server not online yet, "
                  f"waiting {interval} seconds.")
            time.sleep(interval)

    raise RuntimeError(
        f"Crossbar server at {url} not online after {max_attempts} attempts.")