Make a single live graph module

This commit is contained in:
Danny Staple 2022-03-21 18:56:54 +00:00
parent 929f36dc8c
commit 8700edb05c
5 changed files with 53 additions and 120 deletions

View File

@ -1,60 +0,0 @@
""" Turn JSON data stream into graphs"""
import requests
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
url = "http://192.168.1.128"
class SensorStream:
def __init__(self) -> None:
self.reset()
def reset(self):
self.error_values = []
self.pid_outputs = []
self.times = []
def sensor_stream(self, frame):
try:
response = requests.get(url, timeout=1)
except (
requests.exceptions.ConnectTimeout,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError,
):
print("Waiting...")
return
print(f"Content: {response.content}")
print(f"status: {response.status_code}")
item = response.json()
print(f"Received: {item}")
if self.times and item["time"] < self.times[-1]:
self.reset()
self.times.append(item["time"])
self.error_values.append(item["last_value"])
self.pid_outputs.append(item["pid_output"])
if len(self.times) > 100:
self.times = self.times[-100:]
self.error_values = self.error_values[-100:]
self.pid_outputs = self.pid_outputs[-100:]
plt.cla() # clear axes.
# plot the items
plt.plot(self.times, self.error_values, label="error")
plt.plot(self.times, self.pid_outputs, label="pid")
plt.legend(loc="upper right")
def start(self):
# Create the animation. GFC - get current figure. random_stream - callback func.
self.ani = FuncAnimation(plt.gcf(), self.sensor_stream, interval=200)
plt.tight_layout()
plt.show()
SensorStream().start()

View File

@ -0,0 +1,53 @@
""" Turn JSON data stream into graphs"""
import requests
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
url = "http://192.168.1.128"
class AnimatedGraph:
def __init__(self):
self.fields = {}
self.samples = 100
self.reset()
def reset(self):
for field in self.fields:
self.fields[field] = []
def make_frame(self, frame):
try:
response = requests.get(url, timeout=1)
except requests.exceptions.RequestException:
print("Waiting...")
return
print(f"Content: {response.content}")
print(f"status: {response.status_code}")
item = response.json()
if 'time' in self.fields and item["time"] < self.fields['time'][-1]:
self.reset()
for field in item:
if field not in self.fields:
self.fields[field] = []
self.fields[field].append(item[field])
if len(self.fields['time'] ) > self.samples:
for field in self.fields:
self.fields[field] = self.fields[field][-self.samples:]
plt.cla() # clear axes.
# plot the items
for field in self.fields:
if field != "time":
plt.plot("time", field, data=self.fields)
plt.legend(loc="upper right")
# Create the animation. gcf - get current figure. random_stream - callback func.
animation = FuncAnimation(plt.gcf(), AnimatedGraph().make_frame, interval=200)
plt.tight_layout()
plt.show()

View File

@ -1,60 +0,0 @@
""" Turn JSON data stream into graphs"""
import requests
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
url = "http://192.168.1.128"
class SensorStream:
def __init__(self) -> None:
self.reset()
def reset(self):
self.error_values = []
self.pid_outputs = []
self.times = []
def sensor_stream(self, frame):
try:
response = requests.get(url, timeout=1)
except (
requests.exceptions.ConnectTimeout,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError,
):
print("Waiting...")
return
print(f"Content: {response.content}")
print(f"status: {response.status_code}")
item = response.json()
print(f"Received: {item}")
if self.times and item["time"] < self.times[-1]:
self.reset()
self.times.append(item["time"])
self.error_values.append(item["last_value"])
self.pid_outputs.append(item["pid_output"])
if len(self.times) > 100:
self.times = self.times[-100:]
self.error_values = self.error_values[-100:]
self.pid_outputs = self.pid_outputs[-100:]
plt.cla() # clear axes.
# plot the items
plt.plot(self.times, self.error_values, label="error")
plt.plot(self.times, self.pid_outputs, label="pid")
plt.legend(loc="upper right")
def start(self):
# Create the animation. GFC - get current figure. random_stream - callback func.
self.ani = FuncAnimation(plt.gcf(), self.sensor_stream, interval=200)
plt.tight_layout()
plt.show()
SensorStream().start()