InfluxDB Publisher Agent

The InfluxDB Publisher Agent acts like the OCS Aggregator, but instead of writing to file, it publishes all recorded OCS data feeds to an InfluxDB instance running elsewhere on the network.

usage: agent.py [-h] [--initial-state {idle,record}] [--host HOST]
                [--port PORT] [--database DATABASE] [--protocol {json,line}]
                [--gzip GZIP]

Agent Options

--initial-state

Possible choices: idle, record

Initial state of the Agent at startup. Can be either idle or record.

Default: “record”

--host

InfluxDB host address.

Default: “influxdb”

--port

InfluxDB port.

Default: 8086

--database

Database within InfluxDB to publish data to.

Default: “ocs_feeds”

--protocol

Possible choices: json, line

Protocol for writing data. Either ‘line’ or ‘json’.

Default: “line”

--gzip

Use gzip content encoding to compress requests.

Default: False
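The options above can be mirrored in a small argparse sketch, which may help when checking defaults or experimenting with values. This is not the Agent's actual parser: make_parser and str_to_bool are hypothetical names, and the assumption that --gzip accepts spellings such as "true" or "false" is ours, since the usage line only shows that it takes a value.

```python
import argparse


def str_to_bool(value):
    """Interpret common true/false spellings (hypothetical helper, not from OCS)."""
    if value.lower() in ("true", "1", "yes"):
        return True
    if value.lower() in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean, got {value!r}")


def make_parser():
    """Build a parser with the documented options and defaults."""
    parser = argparse.ArgumentParser(prog="agent.py")
    group = parser.add_argument_group("Agent Options")
    group.add_argument("--initial-state", choices=["idle", "record"], default="record")
    group.add_argument("--host", default="influxdb")
    group.add_argument("--port", type=int, default=8086)
    group.add_argument("--database", default="ocs_feeds")
    group.add_argument("--protocol", choices=["json", "line"], default="line")
    group.add_argument("--gzip", type=str_to_bool, default=False)
    return parser


# Override two options and leave the rest at their documented defaults.
args = make_parser().parse_args(["--host", "localhost", "--gzip", "true"])
print(args)
```

An explicit converter is used for --gzip because argparse's type=bool treats any non-empty string, including "False", as True.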

Configuration File Examples

Below are configuration examples for the OCS config file and for running the Agent in a Docker container. Also included is an example for setting up Grafana to display data from InfluxDB.

OCS Site Config

Add an InfluxDBAgent to your OCS configuration file:

{'agent-class': 'InfluxDBAgent',
 'instance-id': 'influxagent',
 'arguments': [['--initial-state', 'record'],
               ['--host', 'influxdb'],
               ['--port', 8086],
               ['--protocol', 'line'],
               ['--gzip', True],
               ['--database', 'ocs_feeds']]},
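To see how the 'arguments' entries relate to the command-line options listed earlier, the nested pairs can be flattened into an argument list. This is only an illustrative sketch: flatten_arguments is a hypothetical helper, and it does not claim to reproduce how OCS itself processes the config block.

```python
def flatten_arguments(arguments):
    """Flatten [[flag, value], ...] pairs into a flat list of strings."""
    argv = []
    for flag, value in arguments:
        argv.extend([flag, str(value)])
    return argv


# The 'arguments' list from the site config example above.
site_config_arguments = [
    ["--initial-state", "record"],
    ["--host", "influxdb"],
    ["--port", 8086],
    ["--protocol", "line"],
    ["--gzip", True],
    ["--database", "ocs_feeds"],
]

print(" ".join(flatten_arguments(site_config_arguments)))
```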

Docker Compose

Add the InfluxDB Publisher Agent container to your docker compose file:

ocs-influxdb-publisher:
  image: simonsobs/ocs:latest
  hostname: ocs-docker
  environment:
    - INSTANCE_ID=influxagent
  volumes:
    - ${OCS_CONFIG_DIR}:/config:ro

You will also need an instance of InfluxDB running somewhere on your network. This likely should go in a separate docker compose file so that it remains online at all times. An example compose file would look like:

version: '3.7'
services:
  influxdb:
    image: "influxdb:1.7"
    container_name: "influxdb"
    restart: always
    ports:
      - "8086:8086"
    environment:
      - INFLUXDB_HTTP_LOG_ENABLED=false
    volumes:
      - /srv/influxdb:/var/lib/influxdb

networks:
  default:
    external:
      name: ocs-net

Note

This separate docker compose file setup depends on having a docker network that connects your various docker compose files. On a single-node setup this can be accomplished with the network settings above in each docker compose file.

You then need to create the docker network with:

$ docker network create --driver bridge ocs-net

Containers on the network should then be able to communicate.

For more information about configuring Docker Compose files, see the Compose file reference.

Grafana

Once your InfluxDB container and publisher are configured and running, you will need to create an InfluxDB data source in Grafana. To do so, add an InfluxDB data source with the URL http://influxdb:8086 and set the Database field (default “ocs_feeds”, but this can be customized in your OCS config file). The Name of the Data Source is up to you; in this example we set it to “OCS Feeds”.

Note

The “ocs_feeds” database (or whatever you choose to name the database) will not exist until the first time the InfluxDB Publisher Agent has successfully connected to InfluxDB.

../_images/grafana_influxdb_data_source.jpg
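Because the database only appears after the Agent's first successful connection, it can be useful to confirm that it exists before configuring Grafana. The sketch below assumes the InfluxDB 1.x HTTP API, where a SHOW DATABASES query sent to the /query endpoint returns JSON. The function names are hypothetical, and the sample payload is hand-written to show the expected shape rather than captured from a real server.

```python
import json
import urllib.parse
import urllib.request


def database_names(payload):
    """Extract database names from an InfluxDB 1.x SHOW DATABASES response."""
    names = []
    for result in payload.get("results", []):
        for series in result.get("series", []):
            names.extend(row[0] for row in series.get("values", []))
    return names


def fetch_database_names(base_url="http://influxdb:8086"):
    """Query a running InfluxDB 1.x instance (requires network access)."""
    query = urllib.parse.urlencode({"q": "SHOW DATABASES"})
    with urllib.request.urlopen(f"{base_url}/query?{query}", timeout=5) as response:
        return database_names(json.load(response))


# Hand-written sample in the shape InfluxDB 1.x uses for this query.
sample = {
    "results": [
        {
            "statement_id": 0,
            "series": [
                {
                    "name": "databases",
                    "columns": ["name"],
                    "values": [["_internal"], ["ocs_feeds"]],
                }
            ],
        }
    ]
}
print("ocs_feeds" in database_names(sample))
```

Against a live instance, calling fetch_database_names() from a container on the same Docker network should list “ocs_feeds” once the Agent has connected.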

In a dashboard, create a new panel. Each panel can have a different Data Source, which is selected at the top of the Metrics tab. Select our “OCS Feeds” data source. You’ll then see the rich query editor for your InfluxDB data source. Each OCS Agent shows up as a “measurement” (here “observatory.LSSIM” and “observatory.LSSIM2”). Each feed published by an agent is an InfluxDB tag (here “temperatures” is our only feed). Finally, each field is available within the SELECT query.

../_images/grafana_influxdb_panel_example.jpg
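The query that the editor builds can also be written by hand in Grafana's raw query mode. The sketch below assembles such an InfluxQL string under a few assumptions: the tag key is taken to be “feed”, the field name “channel_00” is a placeholder for whatever your Agent publishes, and build_query is a hypothetical helper. $timeFilter and $__interval are Grafana macros that are substituted when the panel runs.

```python
def build_query(measurement, feed, field, tag_key="feed"):
    """Assemble an InfluxQL query in the style of Grafana's query editor."""
    select = f'SELECT mean("{field}") FROM "{measurement}"'
    where = f'WHERE ("{tag_key}" = \'{feed}\') AND $timeFilter'
    group = "GROUP BY time($__interval) fill(null)"
    return " ".join([select, where, group])


# Measurement and feed names from the example above; the field is a placeholder.
print(build_query("observatory.LSSIM", "temperatures", "channel_00"))
```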

For more information about using InfluxDB in Grafana, see the Grafana Documentation.

Agent API

class ocs.agents.influxdb_publisher.agent.InfluxDBAgent(agent, args)

This class provides a WAMP wrapper for the data publisher. The run function and the data handler are thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent.

Parameters:
  • agent (OCSAgent) – OCS Agent object

  • args (namespace) – args from the function’s argparser.

data_dir

Path to the base directory where data should be written.

Type:

path

aggregate

Specifies if the agent is currently aggregating data.

Type:

bool

incoming_data

Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Publisher.

Type:

queue.Queue

loop_time

Time between iterations of the run loop.

Type:

float

record()

Process - This process will create a Publisher instance, which will collect provider data and publish it to InfluxDB as long as this process is running.

Parameters:

test_mode (bool, optional) – Run the record Process loop only once. This is meant only for testing. Default is False.
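The interaction between loop_time and test_mode can be pictured with a minimal loop. This is a sketch of the general pattern, not the Agent's implementation: record_loop, run_once, and should_continue are hypothetical names introduced for illustration.

```python
import time


def record_loop(run_once, loop_time=1.0, test_mode=False, should_continue=lambda: True):
    """Call run_once repeatedly, pausing loop_time seconds between iterations."""
    iterations = 0
    while should_continue():
        run_once()
        iterations += 1
        if test_mode:
            break  # a single pass is enough when testing
        time.sleep(loop_time)
    return iterations


published = []
count = record_loop(lambda: published.append("batch"), loop_time=0.0, test_mode=True)
print(count, published)
```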

Supporting APIs

class ocs.agents.influxdb_publisher.agent.Publisher(host, database, incoming_data, port=8086, protocol='line', gzip=False, operate_callback=None)

Data publisher. This manages data to be published to the InfluxDB.

This class should only be accessed by a single thread. Data can be passed to it by appending it to the referenced incoming_data queue.

Parameters:
  • incoming_data (queue.Queue) – A thread-safe queue of (data, feed) pairs.

  • host (str) – host for InfluxDB instance.

  • database (str) – database name within InfluxDB to publish to

  • port (int, optional) – port for InfluxDB instance, defaults to 8086.

  • protocol (str, optional) – Protocol for writing data. Either ‘line’ or ‘json’.

  • gzip (bool, optional) – compress InfluxDB requests with gzip

  • operate_callback (callable, optional) – Function to call to see if failed connections should be retried (to prevent a thread from locking).
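The role of operate_callback can be illustrated with a small retry loop: after each failed connection attempt, the callback is consulted so that the thread can give up instead of retrying forever. This is a sketch under assumptions, not the Publisher's code. connect_with_retry and flaky_connect are hypothetical, and ConnectionError stands in for whatever exception the real client raises.

```python
import time


def connect_with_retry(connect, operate_callback=None, retry_delay=1.0):
    """Retry connect() until it succeeds or operate_callback() returns False."""
    while True:
        try:
            return connect()
        except ConnectionError:
            # Give up if the caller says the process is no longer running.
            if operate_callback is not None and not operate_callback():
                return None
            time.sleep(retry_delay)


attempts = []


def flaky_connect():
    """Stand-in for a client constructor that fails twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("InfluxDB not reachable yet")
    return "client"


client = connect_with_retry(flaky_connect, operate_callback=lambda: True, retry_delay=0.0)
print(client, len(attempts))
```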

host

host for InfluxDB instance.

Type:

str

port

port for InfluxDB instance, defaults to 8086.

Type:

int, optional

db

database name within InfluxDB to publish to (from database arg)

Type:

str

incoming_data

data to be published

client

InfluxDB client connection

process_incoming_data()

Takes all data from the incoming_data queue and writes it to InfluxDB.
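Emptying a thread-safe queue without blocking is a common pattern, sketched below with the standard library. The drain function is a hypothetical illustration of the idea and collects items into a list, whereas the real method writes them to InfluxDB.

```python
import queue


def drain(incoming_data):
    """Remove and return every item currently waiting in the queue."""
    items = []
    while True:
        try:
            items.append(incoming_data.get_nowait())
        except queue.Empty:
            return items


incoming = queue.Queue()
incoming.put(({"temps": {}}, {"feed_name": "temperatures"}))  # a (data, feed) pair
incoming.put(({"temps": {}}, {"feed_name": "temperatures"}))
pairs = drain(incoming)
print(len(pairs), incoming.empty())
```

Using get_nowait() with queue.Empty avoids the race that checking empty() before get() would introduce if another consumer existed.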

static format_data(data, feed, protocol)

Format the data from an OCS feed into a dict for pushing to InfluxDB.

The scheme here is as follows:
  • agent_address is the “measurement” (conceptually like an SQL table)

  • feed names are an indexed “tag” on the data structure (effectively a table column)

  • keys within an OCS block’s ‘data’ dictionary are the field names (effectively a table column)

Parameters:
  • data (dict) – data from the OCS Feed subscription

  • feed (dict) – feed from the OCS Feed subscription, contains feed information used to structure our influxdb query

  • protocol (str) – Protocol for writing data. Either ‘line’ or ‘json’.

Returns:

Data ready to publish to influxdb, in the specified protocol.

Return type:

list
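The scheme described above can be sketched for a single block of data. Several details here are assumptions rather than facts about format_data: the block layout (a 'timestamps' list alongside a 'data' dict of equal-length lists), the tag key “feed”, float-only field values, and the function name format_block. Real line protocol also needs escaping for spaces and commas and an “i” suffix for integers, which this sketch omits.

```python
def format_block(block, agent_address, feed_name, protocol="line"):
    """Turn one OCS-style data block into a list of InfluxDB points."""
    points = []
    for i, timestamp in enumerate(block["timestamps"]):
        # One point per timestamp, with one field per key in the block's data.
        fields = {key: values[i] for key, values in block["data"].items()}
        time_ns = round(timestamp * 1e9)  # InfluxDB defaults to nanoseconds
        if protocol == "json":
            points.append(
                {
                    "measurement": agent_address,
                    "time": time_ns,
                    "fields": fields,
                    "tags": {"feed": feed_name},
                }
            )
        elif protocol == "line":
            field_str = ",".join(f"{key}={value}" for key, value in fields.items())
            points.append(f"{agent_address},feed={feed_name} {field_str} {time_ns}")
        else:
            raise ValueError(f"unknown protocol: {protocol!r}")
    return points


block = {"timestamps": [1600000000.0], "data": {"channel_00": [1.5]}}
print(format_block(block, "observatory.LSSIM", "temperatures", protocol="line"))
print(format_block(block, "observatory.LSSIM", "temperatures", protocol="json"))
```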

run()

Main run iterator for the publisher. This processes all incoming data, removes stale providers, and publishes data from active providers to InfluxDB.

close()

Flushes all remaining data and closes the InfluxDB connection.