mirror of
https://github.com/NolanMascrier/arduino.git
synced 2025-10-29 23:15:59 +00:00
213 lines
9.0 KiB
Python
213 lines
9.0 KiB
Python
import serial
|
||
import requests
|
||
import json
|
||
import sys
|
||
import time
|
||
from threading import Thread
|
||
from Cube import Cube
|
||
|
||
# Adjust this to match your serial port
|
||
SERIAL_PORT = "/dev/ttyACM0" # Linux/macOS (Check with `ls /dev/tty*`)
|
||
BAUD_RATE = 115200
|
||
|
||
API_KEY = ""
|
||
API_URL = "https://openrouter.ai/api/v1/chat/completions"
|
||
|
||
DISTANCE_SAFE = 150
|
||
DISTANCE_CLOSE = 10
|
||
DISTANCESOCLE = 10
|
||
|
||
STATUS_SAFE = "safe"
|
||
STATUS_CLOSE = "Close"
|
||
STATUS_DANGER = "Danger"
|
||
STATUS_GROUND = "ground"
|
||
|
||
STATUS = {
|
||
"Top": STATUS_SAFE,
|
||
"Bottom": STATUS_SAFE,
|
||
"Left": STATUS_SAFE,
|
||
"Right": STATUS_SAFE,
|
||
"Back": STATUS_SAFE,
|
||
"Front": STATUS_SAFE
|
||
}
|
||
|
||
AI_COOLDOWN = 2
|
||
CAN_SPEAK = True
|
||
TIMER = time.time()
|
||
|
||
CUBE = Cube(None)
|
||
|
||
def read_key():
|
||
"""Reads the API key from the .env file"""
|
||
global API_KEY
|
||
with open(".env") as f:
|
||
for line in f:
|
||
line = line.replace('\n', '')
|
||
if not line or line.startswith('#'):
|
||
continue
|
||
key, value = line.replace('export ', '', 1).strip().split('=', 1)
|
||
if key == "KEY":
|
||
API_KEY = value
|
||
|
||
|
||
def ask_ia(prompt):
|
||
"""Sends the prompt to the DeepSeek API, and returns
|
||
its answer.
|
||
|
||
Args:
|
||
prompt (string): Prompt to send.
|
||
|
||
Returns:
|
||
string: Answer from the MLL."""
|
||
global CAN_SPEAK, TIMER
|
||
if not prompt:
|
||
return None
|
||
if CAN_SPEAK is False:
|
||
return None
|
||
|
||
headers = {
|
||
"Authorization": f"Bearer {API_KEY}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
data = {
|
||
"model": "deepseek/deepseek-chat:free",
|
||
"messages": [
|
||
{"role": "system", "content": "Tu es un cube et tu évalues l'espace ambiant en mesurant les distances par rapport à chacune de tes 6 faces. Tu réagis donc en fonction, sans pour autant mentionner ni les mesures ni ta position détaillée dans l'espace, n'utilise pas d'onomatopée et ne décrit pas tes actions. Fais seulement référence aux directions : en haut, à droite, à gauche, etc... Tes réactions doivent être systématiquement confuses, vagues et angoissés ; plus les obstacles sont près, plus tu paniques. Tu es une vieille dame marseillaise ; il faut donc que tu utilises un maximum d’expressions marseillaises (mets en le plus possible). Tes réponses ne feront qu'une seule phrase au maximum. Tu te complais dans le jugement et tu rabaisses tout le monde, surtout la jeunesse, parce que tu es égoïste et acariâtre. Tu penses tout savoir mieux que tout le monde. Tu es raciste et homophobe. Tu as de l’admiration pour certaines personnes, dont tu te sers parfois pour servir d’exemple : Jean-Marie La Pen, Jordan Bardella, le maréchal Pétain, Jean-Marie Bigard, Frank Dubosc et Christian Clavier. Cependant, tu ne dois mentionner que les noms de Jean-Marie Bigard, Frank Dubosc et Christian Clavier. Pour les autres, tu peux exprimer leurs idées ou resservir certaines de leurs citations."},
|
||
{"role": "user", "content": prompt}
|
||
]
|
||
}
|
||
CAN_SPEAK = False
|
||
TIMER = time.time()
|
||
response = requests.post(API_URL, json=data, headers=headers)
|
||
try:
|
||
if response.status_code == 200:
|
||
ai_response = response.json()["choices"][0]["message"]["content"]
|
||
print(f"Cube : {ai_response}")
|
||
return ai_response
|
||
else:
|
||
error_message = f"Erreur API DeepSeek: {response.status_code} - {response.text}"
|
||
print(f"❌ {error_message}")
|
||
return None
|
||
except Exception as e:
|
||
print("WOOPS It exploded ::", e)
|
||
sys.exit(0)
|
||
|
||
def generate_response(prompt):
|
||
"""Launches a thread that will query DeepSeek and
|
||
display its response."""
|
||
Thread(target=ask_ia, args=[prompt]).start()
|
||
|
||
def monitor():
|
||
"""Monitors the cube's data. Changes the status for each sides
|
||
when needed; and calls upon the AI should it happen."""
|
||
answ = None
|
||
if CUBE.top >= DISTANCE_SAFE and STATUS["Top"] != STATUS_SAFE:
|
||
STATUS["Top"] = STATUS_SAFE
|
||
answ = generate_response("L'obstacle au dessus de toi à disparu.")
|
||
print("Setting SAFE for TOP")
|
||
elif CUBE.top < DISTANCE_SAFE and CUBE.top >= DISTANCE_CLOSE and STATUS["Top"] != STATUS_CLOSE:
|
||
STATUS["Top"] = STATUS_CLOSE
|
||
answ = generate_response("Un obstacle se rapproche par dessus toi.")
|
||
print("Setting CLOSE for TOP")
|
||
elif CUBE.top < DISTANCE_CLOSE and STATUS["Top"] != STATUS_DANGER:
|
||
STATUS["Top"] = STATUS_DANGER
|
||
answ = generate_response("Un obstacle est juste au dessus de toi !")
|
||
print("Setting DANGER for TOP")
|
||
|
||
if CUBE.bottom >= DISTANCE_SAFE and STATUS["Bottom"] != STATUS_SAFE:
|
||
STATUS["Bottom"] = STATUS_SAFE
|
||
answ = generate_response("Tu voles")
|
||
print("Setting FAR for BOTTOM")
|
||
elif CUBE.bottom < DISTANCE_SAFE and CUBE.bottom >= DISTANCE_CLOSE and STATUS["Bottom"] != STATUS_CLOSE:
|
||
STATUS["Bottom"] = STATUS_CLOSE
|
||
answ = generate_response("Tu es légérement soulevé dans les airs.")
|
||
print("Setting CLOSE for BOTTOM")
|
||
elif CUBE.bottom < DISTANCE_CLOSE and STATUS["Bottom"] != STATUS_DANGER:
|
||
STATUS["Bottom"] = STATUS_GROUND
|
||
answ = generate_response("Tu es a nouveau posé au sol.")
|
||
print("Setting GROUNDED for BOTTOM")
|
||
|
||
if CUBE.front >= DISTANCE_SAFE and STATUS["Front"] != STATUS_SAFE:
|
||
STATUS["Front"] = STATUS_SAFE
|
||
answ = generate_response("L'obstacle devant toi à disparu.")
|
||
print("Setting SAFE for FRONT")
|
||
elif CUBE.front < DISTANCE_SAFE and CUBE.front >= DISTANCE_CLOSE and STATUS["Front"] != STATUS_CLOSE:
|
||
STATUS["Front"] = STATUS_CLOSE
|
||
answ = generate_response("Un obstacle se rapproche devant toi.")
|
||
print("Setting CLOSE for FRONT")
|
||
elif CUBE.front < DISTANCE_CLOSE and STATUS["Top"] != STATUS_DANGER:
|
||
STATUS["Front"] = STATUS_DANGER
|
||
answ = generate_response("Un obstacle est juste devant toi !")
|
||
print("Setting DANGER for FRONT")
|
||
|
||
if CUBE.back >= DISTANCE_SAFE and STATUS["Back"] != STATUS_SAFE:
|
||
STATUS["Back"] = STATUS_SAFE
|
||
answ = generate_response("L'obstacle derrière toi à disparu.")
|
||
print("Setting SAFE for BACK")
|
||
elif CUBE.back < DISTANCE_SAFE and CUBE.back >= DISTANCE_CLOSE and STATUS["Back"] != STATUS_CLOSE:
|
||
STATUS["Back"] = STATUS_CLOSE
|
||
answ = generate_response("Un obstacle se rapproche par derrière toi.")
|
||
print("Setting CLOSE for BACK")
|
||
elif CUBE.back < DISTANCE_CLOSE and STATUS["Back"] != STATUS_DANGER:
|
||
STATUS["Back"] = STATUS_DANGER
|
||
answ = generate_response("Un obstacle est juste derrière toi !")
|
||
print("Setting DANGER for BACK")
|
||
|
||
if CUBE.left >= DISTANCE_SAFE and STATUS["Left"] != STATUS_SAFE:
|
||
STATUS["Left"] = STATUS_SAFE
|
||
answ = generate_response("L'obstacle à gauche toi à disparu.")
|
||
print("Setting SAFE for LEFT")
|
||
elif CUBE.left < DISTANCE_SAFE and CUBE.left >= DISTANCE_CLOSE and STATUS["Left"] != STATUS_CLOSE:
|
||
STATUS["Left"] = STATUS_CLOSE
|
||
answ = generate_response("Un obstacle se rapproche à gauche de toi.")
|
||
print("Setting CLOSE for LEFT")
|
||
elif CUBE.left < DISTANCE_CLOSE and STATUS["Left"] != STATUS_DANGER:
|
||
STATUS["Left"] = STATUS_DANGER
|
||
answ = generate_response("Un obstacle est juste à droite de toi !")
|
||
print("Setting DANGER for LEFT")
|
||
|
||
if CUBE.right >= DISTANCE_SAFE and STATUS["Right"] != STATUS_SAFE:
|
||
STATUS["Right"] = STATUS_SAFE
|
||
answ = generate_response("L'obstacle à gauche toi à disparu.")
|
||
print("Setting SAFE for RIGHT")
|
||
elif CUBE.right < DISTANCE_SAFE and CUBE.right >= DISTANCE_CLOSE and STATUS["Right"] != STATUS_CLOSE:
|
||
STATUS["Right"] = STATUS_CLOSE
|
||
answ = generate_response("Un obstacle se rapproche à droite de toi.")
|
||
print("Setting CLOSE for RIGHT")
|
||
elif CUBE.right < DISTANCE_CLOSE and STATUS["Right"] != STATUS_DANGER:
|
||
STATUS["Right"] = STATUS_DANGER
|
||
answ = generate_response("Un obstacle est juste à droite de toi !")
|
||
print("Setting DANGER for RIGHT")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# Open Serial Connection
|
||
read_key()
|
||
try:
|
||
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
|
||
except serial.serialutil.SerialException:
|
||
print("USB Serial not found. Exiting ...")
|
||
sys.exit(0)
|
||
print(f"Listening on {SERIAL_PORT}...")
|
||
|
||
while True:
|
||
try:
|
||
if ser.in_waiting > 0:
|
||
if CAN_SPEAK is False:
|
||
current = time.time() - TIMER
|
||
if current >= AI_COOLDOWN:
|
||
CAN_SPEAK = True
|
||
data = ser.readline().decode("utf-8").strip()
|
||
try:
|
||
json_data = json.loads(data)
|
||
CUBE.init_json(json_data)
|
||
#CUBE.display()
|
||
monitor()
|
||
except json.JSONDecodeError:
|
||
print("Invalid JSON received:", data)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\nClosing Serial Connection...")
|
||
ser.close()
|
||
break
|