InfluxDB Publisher Agent

The InfluxDB Publisher Agent acts like the OCS Aggregator, but instead of writing to file it will publish all recorded OCS data feeds to an InfluxDB instance running somewhere on the network.

usage: agent.py [-h] [--initial-state {idle,record}] [--host HOST]
                [--port PORT] [--database DATABASE] [--protocol {json,line}]
                [--ssl SSL] [--verify-ssl VERIFY_SSL] [--gzip GZIP]

Agent Options

--initial-state

Possible choices: idle, record

Initial state of argument parser. Can be either idle or record

Default: 'record'

--host

InfluxDB host address.

Default: 'influxdb'

--port

InfluxDB port.

Default: 8086

--database

Database within InfluxDB to publish data to.

Default: 'ocs_feeds'

--protocol

Possible choices: json, line

Protocol for writing data. Either ‘line’ or ‘json’.

Default: 'line'

--ssl

Use https instead of http to connect to InfluxDB, defaults to False.

Default: False

--verify-ssl

Verify SSL certificates for HTTPS requests, defaults to False.

Default: False

--gzip

Use gzip content encoding to compress requests.

Default: False

Configuration File Examples

Below are configuration examples for the ocs config file and for running the Agent in a docker container. Also included is an example for setting up Grafana to display data from InfluxDB.

OCS Site Config

Add an InfluxDBAgent to your OCS configuration file:

{'agent-class': 'InfluxDBAgent',
 'instance-id': 'influxagent',
 'arguments': ['--initial-state', 'record',
               '--host', 'influxdb',
               '--port', 8086,
               '--protocol', 'line',
               '--gzip', True,
               '--ssl', False,
               '--verify-ssl', False,
               '--database', 'ocs_feeds']},

Docker Compose

Add the InfluxDB Publisher Agent container to your docker compose file:

ocs-influxdb-publisher:
  image: simonsobs/ocs:latest
  hostname: ocs-docker
  environment:
    - INSTANCE_ID=influxagent
    - INFLUXDB_USERNAME=admin       # optional
    - INFLUXDB_PASSWORD=password    # optional
  volumes:
    - ${OCS_CONFIG_DIR}:/config:ro

By default, InfluxDB HTTP authentication is not required. However, if needed, you can optionally pass credentials to use when connecting to your InfluxDB instance through the environment variables shown above or through a file, for instance if you would like to use Docker Secrets. A Docker Secrets example is shown below:

secrets:
  INFLUXDB_USERNAME:
    file: ${PWD}/secrets/INFLUXDB_USERNAME
  INFLUXDB_PASSWORD:
    file: ${PWD}/secrets/INFLUXDB_PASSWORD
services:
  ocs-influxdb-publisher:
    image: simonsobs/ocs:latest
    hostname: ocs-docker
    secrets:
      - INFLUXDB_USERNAME
      - INFLUXDB_PASSWORD
    environment:
      - INSTANCE_ID=influxagent
      - INFLUXDB_USERNAME_FILE=/run/secrets/INFLUXDB_USERNAME
      - INFLUXDB_PASSWORD_FILE=/run/secrets/INFLUXDB_PASSWORD
    volumes:
      - ${OCS_CONFIG_DIR}:/config:ro

Note

The INFLUXDB_USERNAME and INFLUXDB_PASSWORD variables will take precedence if they are also present when using the _FILE variables.

You will also need an instance of InfluxDB running somewhere on your network. This likely should go in a separate docker compose file so that it remains online at all times. An example compose file would look like:

services:
  influxdb:
    image: "influxdb:1.7"
    container_name: "influxdb"
    restart: always
    ports:
      - "8086:8086"
    environment:
      - INFLUXDB_HTTP_LOG_ENABLED=false
    volumes:
      - /srv/influxdb:/var/lib/influxdb

networks:
  default:
    external:
      name: ocs-net

Note

This separate docker compose file setup depends on having a docker network that connects your various docker compose files. On a single-node setup this can be accomplished with the network settings above in each docker compose file.

You then need to create the docker network with:

$ docker network create --driver bridge ocs-net

Containers on the network should then be able to communicate.

For more information about configuring Docker Compose files, see the Compose file reference.

Grafana

Once your InfluxDB container and publisher are configured and running you will need to create an InfluxDB data source in Grafana. To do so, we add an InfluxDB data source with the URL http://influxdb:8086, and the Database (default “ocs_feeds”, but this can be customized in your OCS config file.) The Name of the Data Source is up to you, in this example we set it to “OCS Feeds”.

Note

The “ocs_feeds” database (or whatever you choose to name the database) will not exist until the first time the InfluxDB Publisher Agent has successfully connected to the InfluxDB.

../_images/grafana_influxdb_data_source.jpg

In a dashboard, create a new panel. Each panel can have a different Data Source, which is selected at the top of the Metrics tab. Select our “OCS Feeds” data source. You’ll then see the rich query editor for your InfluxDB data source. Each OCS Agent shows up as a “measurement” (here “observatory.LSSIM” and “observatory.LSSIM2”). Each feed published by an agent is an InfluxDB tag (here “temperatures” is our only feed.) Finally, each field is available within the SELECT query.

../_images/grafana_influxdb_panel_example.jpg

For more information about using InfluxDB in Grafana, see the Grafana Documentation.

Agent API

class ocs.agents.influxdb_publisher.agent.InfluxDBAgent(agent, args)[source]

This class provide a WAMP wrapper for the data publisher. The run function and the data handler are thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent.

Parameters:
  • agent (OCSAgent) – OCS Agent object

  • args (namespace) – args from the function’s argparser.

data_dir

Path to the base directory where data should be written.

Type:

path

aggregate

Specifies if the agent is currently aggregating data.

Type:

bool

incoming_data

Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Publisher.

Type:

queue.Queue

loop_time

Time between iterations of the run loop.

Type:

float

record()[source]

Process - This process will create an Publisher instance, which will collect and write provider data to disk as long as this process is running.

Parameters:

test_mode (bool, optional) – Run the record Process loop only once. This is meant only for testing. Default is False.

Supporting APIs

class ocs.agents.influxdb_publisher.agent.Publisher(host, database, incoming_data, port=8086, protocol='line', ssl=False, verify_ssl=False, gzip=False, operate_callback=None)[source]

Data publisher. This manages data to be published to the InfluxDB.

This class should only be accessed by a single thread. Data can be passed to it by appending it to the referenced incoming_data queue.

Parameters:
  • incoming_data (queue.Queue) – A thread-safe queue of (data, feed) pairs.

  • host (str) – host for InfluxDB instance.

  • database (str) – database name within InfluxDB to publish to

  • port (int, optional) – port for InfluxDB instance, defaults to 8086.

  • protocol (str, optional) – Protocol for writing data. Either ‘line’ or ‘json’.

  • ssl (bool, optional) – Use https instead of http to connect to InfluxDB, defaults to False.

  • verify_ssl (bool, optional) – Verify SSL certificates for HTTPS requests, defaults to False.

  • gzip (bool, optional) – compress influxdb requsts with gzip

  • operate_callback (callable, optional) – Function to call to see if failed connections should be retried (to prevent a thread from locking).

db

database name within InfluxDB to publish to (from database arg)

Type:

str

protocol

Protocol for writing data. Either ‘line’ or ‘json’.

Type:

str, optional

incoming_data

data to be published

client_args

arguments passed to InfluxDB client

client

InfluxDB client connection

process_incoming_data()[source]

Takes all data from the incoming_data queue, and writes them to the InfluxDB.

run()[source]

Main run iterator for the publisher. This processes all incoming data, removes stale providers, and writes active providers to disk.

close()[source]

Flushes all remaining data and closes InfluxDB connection.