91 lines
2.9 KiB
Python
Executable File
91 lines
2.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import datetime
|
|
import os
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from dataclasses import dataclass, field, fields, asdict
|
|
|
|
from pprint import pprint
|
|
|
|
from psycopg.sql import SQL, Identifier, Placeholder
|
|
|
|
import PyNUT
|
|
import psycopg
|
|
|
|
def utcnow():
|
|
return datetime.datetime.now(tz=datetime.timezone.utc)
|
|
|
|
@dataclass
|
|
class UPS():
|
|
ups_id: int
|
|
timestamp: datetime.datetime = field(default_factory=utcnow)
|
|
battery_charge: float | None = None
|
|
battery_runtime: float | None = None
|
|
battery_temperature: float | None = None
|
|
battery_voltage: float | None = None
|
|
input_voltage: float | None = None
|
|
output_current: float | None = None
|
|
output_voltage: float | None = None
|
|
output_frequency: float | None = None
|
|
ups_load: float | None = None
|
|
ups_status: str | None = None
|
|
|
|
|
|
class LogNutUps:
|
|
def __init__(self):
|
|
self.conninfo = os.environ['DB_CONNINFO']
|
|
|
|
def connect(self) -> psycopg.Connection:
|
|
return psycopg.connect(conninfo=self.conninfo)
|
|
|
|
def get_ups(self, ups="ups") -> UPS:
|
|
nut_client = PyNUT.PyNUTClient(host='192.168.122.10')
|
|
result = nut_client.GetUPSVars(ups=ups)
|
|
return UPS(
|
|
ups_id=1,
|
|
battery_charge=float(result.get(b'battery.charge', 0.0)),
|
|
battery_runtime=float(result.get(b'battery.runtime', 0.0)),
|
|
battery_temperature=float(result.get(b'battery.temperature', 0.0)),
|
|
battery_voltage=float(result.get(b'battery.voltage', 0.0)),
|
|
input_voltage=float(result.get(b'input.voltage', 0.0)),
|
|
output_current=float(result.get(b'output.current', 0.0)),
|
|
output_frequency=float(result.get(b'output.frequency', 0.0)),
|
|
output_voltage=float(result.get(b'output.voltage', 0.0)),
|
|
ups_load=float(result.get(b'ups.load', 0.0)),
|
|
ups_status=result.get(b'ups.status', b'').decode('utf-8', errors='ignore')
|
|
)
|
|
def insert_measurement(self, ups: UPS):
|
|
with self.connect() as conn, conn.cursor() as cursor:
|
|
columns = []
|
|
placeholders = []
|
|
data = asdict(ups)
|
|
for key in data:
|
|
if key in {}:
|
|
continue
|
|
columns.append(Identifier(key))
|
|
placeholders.append(Placeholder(key))
|
|
columns = SQL(", ").join(columns)
|
|
placeholders = SQL(", ").join(placeholders)
|
|
query = SQL(
|
|
"insert into public.nut ({columns}) values ({placeholders})"
|
|
).format(columns=columns, placeholders=placeholders)
|
|
cursor.execute(query, data)
|
|
|
|
def main():
|
|
print("Running Log NUT UPS")
|
|
ups = LogNutUps()
|
|
while True:
|
|
data = ups.get_ups()
|
|
ups.insert_measurement(data)
|
|
time.sleep(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
while True:
|
|
try:
|
|
main()
|
|
except Exception as e:
|
|
traceback.print_exc(file=sys.stdout)
|
|
time.sleep(1)
|