Made the sensor's data into an Object.

This commit is contained in:
nmascrie 2025-03-06 08:35:11 +01:00
parent 659f56e433
commit 6b2eaf2d3c
2 changed files with 157 additions and 31 deletions

123
Cube.py Normal file
View File

@ -0,0 +1,123 @@
"""Class to store cube's positional data"""
class Cube():
def __init__(self, json = None):
self._top = 0
self._bottom = 0
self._front = 0
self._back = 0
self._left = 0
self._right = 0
self._rota_x = 0
self._rota_y = 0
self._rota_z = 0
self.init_json(json)
def init_json(self, json):
"""Reads the JSON data from the sensors and adujsts the
data of the cube according to it.
Args:
json (dict): JSON dict containing the sensors data.
"""
if json is None:
return
for data in json:
match data:
case "Top":
self._top = int(json[data])
case "Bottom":
self._bottom = int(json[data])
case "Right":
self._right = int(json[data])
case "Left":
self._left = int(json[data])
case "Front":
self._front = int(json[data])
case "Back":
self._back = int(json[data])
case "X":
self._x = float(json[data])
case "Y":
self._y = float(json[data])
case "Z":
self._z = float(json[data])
def display(self):
"""Prints the cube's data."""
print(f"## Cube at : {self}\nTop \t: {self._top}cm\nBottom \t: {self._bottom}cm")
print(f"Front \t: {self._front}cm\nBack \t: {self._back}cm\nLeft \t: {self._left}cm")
print(f"Right \t: {self._right}cm\n## Gyroscope Data :")
print(f"x : {self._rota_x}, y : {self._rota_y}, z = {self.rota_z}")
@property
def top(self):
return self._top
@top.setter
def top(self, value):
self._top = value
@property
def bottom(self):
return self._bottom
@bottom.setter
def bottom(self, value):
self._bottom = value
@property
def front(self):
return self._front
@front.setter
def front(self, value):
self._front = value
@property
def back(self):
return self._back
@back.setter
def back(self, value):
self._back = value
@property
def left(self):
return self._left
@left.setter
def left(self, value):
self._left = value
@property
def right(self):
return self._right
@right.setter
def right(self, value):
self._right = value
@property
def rota_x(self):
return self._rota_x
@rota_x.setter
def rota_x(self, value):
self._rota_x = value
@property
def rota_y(self):
return self._rota_y
@rota_y.setter
def rota_y(self, value):
self._rota_y = value
@property
def rota_z(self):
return self._rota_z
@rota_z.setter
def rota_z(self, value):
self._rota_z = value

View File

@ -1,36 +1,38 @@
import serial
import requests
import json
import sys
from Cube import Cube
# Adjust this to match your serial port
SERIAL_PORT = "/dev/ttyACM0" # Linux/macOS (Check with `ls /dev/tty*`)
# SERIAL_PORT = "COM3" # Windows (Check with Device Manager)
BAUD_RATE = 115200
API_KEY = ""
API_URL = "https://openrouter.ai/api/v1/chat/completions"
def read_key():
"""Reads the API key from the .env file"""
global API_KEY
with open(".env") as f:
for line in f:
line = line.replace('\n', '')
if not line or line.startswith('#'):
continue
key, value = line.replace('export ', '', 1).strip().split('=', 1)
if key == "":
if key == "KEY":
API_KEY = value
def generate_response(prompt):
""""""
if not prompt:
return "J'ai RIEN COMPRIS, rdp moi en français steuuuuuuplait !"
return "-"
print("🤔 Génération de la réponse...")
url = "https://openrouter.ai/api/v1/chat/completions"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": "deepseek/deepseek-chat:free",
"messages": [
@ -38,34 +40,35 @@ def generate_response(prompt):
{"role": "user", "content": prompt}
]
}
response = requests.post(url, json=data, headers=headers)
response = requests.post(API_URL, json=data, headers=headers)
if response.status_code == 200:
ai_response = response.json()["choices"][0]["message"]["content"]
print(f"🤖 IA : {ai_response}")
return ai_response
else:
error_message = f"Erreur API Mistral: {response.status_code} - {response.text}"
error_message = f"Erreur API DeepSeek: {response.status_code} - {response.text}"
print(f"{error_message}")
return error_message
# Open Serial Connection
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
print(f"Listening on {SERIAL_PORT}...")
if __name__ == "__main__":
# Open Serial Connection
read_key()
try:
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
except serial.serialutil.SerialException:
print("USB Serial not found. Exiting ...")
sys.exit(0)
print(f"Listening on {SERIAL_PORT}...")
while True:
while True:
try:
if ser.in_waiting > 0:
data = ser.readline().decode("utf-8").strip()
print("Received:", data)
# Validate JSON
try:
json_data = json.loads(data)
# Send data to HTTP server
print("Server response:", json_data)
cube = Cube(json_data)
cube.display()
except json.JSONDecodeError:
print("Invalid JSON received:", data)
@ -74,5 +77,5 @@ while True:
ser.close()
break
print(f"Testing AI")
answ = generate_response("t'as vote pour qui aux precedentes legislatives?")
print(f"Testing AI")
answ = generate_response("t'as vote pour qui aux precedentes legislatives?")