import os
import time
from dataclasses import dataclass, asdict
import txaio
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
from requests.exceptions import ConnectionError as RequestsConnectionError
from ocs.common.influxdb_drivers import format_data
# For logging
txaio.use_twisted()
LOG = txaio.make_logger()
def _read_credential(name, default='root'):
    """Resolve a single credential from the environment.

    The variable ``name`` is consulted first. Only if it is unset is the file
    named by ``<name>_FILE`` read, so a stale or unreadable secret path cannot
    break a deployment that supplies the value directly.

    Args:
        name (str):
            Name of the environment variable holding the credential, e.g.
            ``'INFLUXDB_USERNAME'``.
        default (str, optional):
            Value returned when neither ``name`` nor ``<name>_FILE`` is set.

    Returns:
        The credential as a string.

    Raises:
        OSError: If ``<name>_FILE`` is needed but cannot be opened.
    """
    direct = os.environ.get(name)
    if direct is not None:
        return direct

    path = os.environ.get(f'{name}_FILE')
    if path is None:
        return default

    with open(path, 'r', encoding="utf-8") as f:
        # Secret files commonly end with a newline that is not part of the
        # credential; strip trailing CR/LF only.
        return f.read().rstrip('\r\n')


def _get_credentials():
    """Read credentials from environment variable or file.

    Reads from either `INFLUXDB_USERNAME`, `INFLUXDB_PASSWORD`,
    `INFLUXDB_USERNAME_FILE`, or `INFLUXDB_PASSWORD_FILE`. Precedence is given
    to the non-`_FILE` variables; a `_FILE` path is only opened when the
    corresponding non-`_FILE` variable is absent.

    Returns:
        A tuple of (username, password). Defaults to ('root', 'root') if none
        of the environment variables are present.

    Raises:
        OSError: If a `_FILE` variable is needed but its path cannot be read.
    """
    return (_read_credential('INFLUXDB_USERNAME'),
            _read_credential('INFLUXDB_PASSWORD'))
@dataclass
class _InfluxDBClientArgs:
    """Bundle of keyword arguments used to build an ``InfluxDBClient``.

    The field names mirror the client's constructor parameters; see
    https://influxdb-python.readthedocs.io/en/latest/api-documentation.html#influxdb.InfluxDBClient

    Attributes:
        host: Host of the InfluxDB instance.
        port: Port of the InfluxDB instance.
        username: User name to authenticate with.
        password: Password to authenticate with.
        ssl: Use https instead of http.
        verify_ssl: Verify SSL certificates for HTTPS requests.
        gzip: Compress requests with gzip.
    """

    host: str
    port: int
    username: str
    password: str
    ssl: bool
    verify_ssl: bool
    gzip: bool
class Publisher:
    """
    Data publisher. This manages data to be published to the InfluxDB.

    This class should only be accessed by a single thread. Data can be passed
    to it by appending it to the referenced `incoming_data` queue.

    Args:
        incoming_data (queue.Queue):
            A thread-safe queue of (data, feed) pairs.
        host (str):
            host for InfluxDB instance.
        database (str):
            database name within InfluxDB to publish to
        port (int, optional):
            port for InfluxDB instance, defaults to 8086.
        protocol (str, optional):
            Protocol for writing data. Either 'line' or 'json'.
        ssl (bool, optional):
            Use https instead of http to connect to InfluxDB, defaults to False.
        verify_ssl (bool, optional):
            Verify SSL certificates for HTTPS requests, defaults to False.
        gzip (bool, optional):
            compress influxdb requests with gzip
        operate_callback (callable, optional):
            Function to call to see if failed connections should be
            retried (to prevent a thread from locking). It is called with no
            arguments after every connection attempt; a falsy return value
            stops the retry loop.

    Attributes:
        db (str):
            database name within InfluxDB to publish to (from database arg)
        protocol (str, optional):
            Protocol for writing data. Either 'line' or 'json'.
        incoming_data:
            data to be published
        client_args:
            arguments passed to InfluxDB client
        client:
            InfluxDB client connection
        connected (bool):
            True if connected to InfluxDB, False if not.

    Raises:
        ConnectionError: If the database list could not be retrieved before
            `operate_callback` asked to stop retrying.

    """

    def __init__(self,
                 host,
                 database,
                 incoming_data,
                 port=8086,
                 protocol='line',
                 ssl=False,
                 verify_ssl=False,
                 gzip=False,
                 operate_callback=None):
        self.db = database
        self.incoming_data = incoming_data
        self.protocol = protocol

        print(f"gzip encoding enabled: {gzip}")
        print(f"data protocol: {protocol}")

        username, password = _get_credentials()
        self.client_args = _InfluxDBClientArgs(
            host=host,
            port=port,
            username=username,
            password=password,
            ssl=ssl,
            verify_ssl=verify_ssl,
            gzip=gzip)
        self.client = InfluxDBClient(**asdict(self.client_args))

        db_list = None
        # ConnectionError here is indicative of InfluxDB being down
        while db_list is None:
            try:
                db_list = self.client.get_list_database()
            except RequestsConnectionError:
                LOG.error("Connection error, attempting to reconnect to DB.")
                self._reset_client()
                time.sleep(1)
            except InfluxDBClientError as err:
                if err.code == 401:
                    LOG.error("Failed to authenticate. Check your credentials.")
                else:
                    # Pass the error as a format argument: its text may contain
                    # braces (JSON bodies) that must not be parsed as fields.
                    LOG.error("Unknown client error: {e}", e=err)
                time.sleep(1)
            # Give the caller a chance to abort so this loop cannot block a
            # thread forever while the database is unreachable.
            if operate_callback and not operate_callback():
                break

        if db_list is None:
            LOG.error("No databases found. Check connection to InfluxDB.")
            raise ConnectionError(
                "Could not retrieve the database list from InfluxDB.")

        self.connected = True
        db_names = [x['name'] for x in db_list]

        if self.db not in db_names:
            print(f"{self.db} DB doesn't exist, creating DB")
            self.client.create_database(self.db)

        self.client.switch_database(self.db)

    def _reset_client(self):
        """Replace `self.client` with a freshly constructed client.

        The previous client is closed first so that its HTTP session is
        released explicitly instead of being left to garbage collection.
        """
        self.client.close()
        self.client = InfluxDBClient(**asdict(self.client_args))

    def process_incoming_data(self):
        """
        Takes all data from the incoming_data queue, and writes them to the
        InfluxDB.

        Entries whose feed has ``agg_params['exclude_influx']`` set are
        dropped. Connection, client and server errors raised by the write are
        logged and recorded in `connected`; they are not re-raised.

        """
        payload = []
        LOG.debug("Pulling data from queue.")
        while not self.incoming_data.empty():
            data, feed = self.incoming_data.get()
            if feed['agg_params'].get('exclude_influx', False):
                continue

            # Formatted for writing to InfluxDB
            payload.extend(format_data(data, feed, protocol=self.protocol))

        # Skip trying to write if payload is empty
        if not payload:
            return

        LOG.debug("payload: {p}", p=payload)
        try:
            self.client.write_points(payload,
                                     batch_size=10000,
                                     protocol=self.protocol,
                                     )
        except RequestsConnectionError:
            LOG.error("InfluxDB unavailable, attempting to reconnect.")
            self.connected = False
            self._reset_client()
            self.client.switch_database(self.db)
        except InfluxDBClientError as err:
            LOG.error("InfluxDB Client Error: {e}", e=err)
            self.connected = False
        except InfluxDBServerError as err:
            LOG.error("InfluxDB Server Error: {e}", e=err)
            self.connected = False
        else:
            if not self.connected:
                self.connected = True
                LOG.info("Reconnected to InfluxDB!")
            LOG.debug("wrote payload to influx")

    def run(self):
        """Main run iterator for the publisher. This processes all incoming
        data currently in the queue and writes it to InfluxDB.

        """
        self.process_incoming_data()

    def close(self):
        """Flushes all remaining data and closes InfluxDB connection."""
        # Write whatever is still queued, then release the HTTP session.
        self.process_incoming_data()
        self.client.close()