InfluxDB Publisher v2 Agent
The InfluxDB Publisher v2 Agent is an upgraded InfluxDB Publisher Agent that uses the influxdb-client package instead of influxdb package to work with InfluxDB v2.
usage: agent.py [-h] [--initial-state {idle,record}] [--protocol {json,line}]
[--gzip GZIP]
Agent Options
- --initial-state
Possible choices: idle, record
Initial state of argument parser. Can be either idle or record
Default:
'record'- --protocol
Possible choices: json, line
Protocol for writing data. Either ‘line’ or ‘json’.
Default:
'line'- --gzip
Use gzip content encoding to compress requests.
Default:
False
Configuration File Examples
Below are configuration examples for the ocs config file and for running the Agent in a docker container. Also included is an example for setting up Grafana to display data from InfluxDB.
OCS Site Config
Add an InfluxDBAgent to your OCS configuration file:
{'agent-class': 'InfluxDBAgentv2',
'instance-id': 'influxagent',
'arguments': ['--initial-state', 'record']},
Docker Compose
Add the InfluxDB Publisher v2 Agent container to your docker-compose file:
ocs-influxdb-publisher-v2:
image: simonsobs/ocs:latest
hostname: ocs-docker
environment:
- INSTANCE_ID=influxagent
volumes:
- ${OCS_CONFIG_DIR}:/config:ro
env_file:
- .env
Your .env file should contain credentials for the InfluxDB v2 database:
INFLUXDB_V2_URL=http://influxdb:8086
INFLUXDB_V2_ORG=ocs
INFLUXDB_V2_BUCKET=ocs_feeds
INFLUXDB_V2_TOKEN=<your-token>
You will also need an instance of InfluxDB v2 running somewhere on your network. This likely should go in a separate docker-compose file so that it remains online at all times. An example compose file would look like:
services:
influxdb:
image: "influxdb:2.7"
container_name: "influxdb"
restart: always
ports:
- "8086:8086"
environment:
- INFLUXDB_HTTP_LOG_ENABLED=false
volumes:
- /srv/influxdb2:/var/lib/influxdb2
networks:
default:
external:
name: ocs-net
Note
This separate docker-compose file setup depends on having a docker network that connects your various docker-compose files. On a single-node setup this can be accomplished with the network settings above in each docker-compose file.
You then need to create the docker network with:
$ docker network create --driver bridge ocs-net
Containers on the network should then be able to communicate.
For more information about configuring Docker Compose files, see the Compose file reference.
Database Migration
Follow instructions to Upgrade from InfluxDB 1.x to 2.7 with Docker.
Grafana
Once your InfluxDB v2 container and publisher are configured and running you will
need to create an InfluxDB data source in Grafana. To do so, we add an InfluxDB
data source with the URL http://influxdb:8086, and the Database
(default “ocs_feeds”, but this can be customized in your OCS config file.) The
Name of the Data Source is up to you, in this example we set it to “OCS Feeds”.
Note that if you migrated from InfluxDB 1.x to 2.7, this process is slightly
different from adding an InfluxDB 1.x data source, as the auth token is required.
Follow instructions to Configure your InfluxDB connection for InfluxDB v2.
Note
The “ocs_feeds” database (or whatever you choose to name the database) will not exist until the first time the InfluxDB Publisher Agent has successfully connected to the InfluxDB.
For more information about using InfluxDB in Grafana, see the Grafana Documentation.
Agent API
- class ocs.agents.influxdb_publisher_v2.agent.InfluxDBAgentv2(agent, args)[source]
This class provide a WAMP wrapper for the data publisher. The run function and the data handler are thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent.
- Parameters:
agent (OCSAgent) – OCS Agent object
args (namespace) – args from the function’s argparser.
- data_dir
Path to the base directory where data should be written.
- Type:
path
- aggregate
Specifies if the agent is currently aggregating data.
- Type:
bool
- incoming_data
Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Publisher.
- Type:
queue.Queue
- loop_time
Time between iterations of the run loop.
- Type:
float
- record()[source]
Process - This process will create an Publisher instance, which will collect and write provider data to disk as long as this process is running.
- Parameters:
test_mode (bool, optional) – Run the record Process loop only once. This is meant only for testing. Default is False.
Notes
An example of the session data:
>>> response.session['data'] {'connected': True, 'last_updated': 1774389203.53926}
Supporting APIs
- class ocs.agents.influxdb_publisher_v2.agent.Publisher(incoming_data, protocol='line', gzip=False, operate_callback=None)[source]
Data publisher. This manages data to be published to the InfluxDB.
This class should only be accessed by a single thread. Data can be passed to it by appending it to the referenced incoming_data queue.
- Parameters:
incoming_data (queue.Queue) – A thread-safe queue of (data, feed) pairs.
database (str) – database name within InfluxDB to publish to
protocol (str, optional) – Protocol for writing data. Either ‘line’ or ‘json’.
gzip (bool, optional) – compress influxdb requsts with gzip
operate_callback (callable, optional) – Function to call to see if failed connections should be retried (to prevent a thread from locking).
- db
database name within InfluxDB to publish to (from database arg)
- Type:
str
- incoming_data
data to be published
- client
InfluxDB client connection
- connection
Connection state tracking object.
- Type:
Connection
- process_incoming_data()[source]
Takes all data from the incoming_data queue, and writes them to the InfluxDB.