arduino/server.py
2025-03-07 13:41:21 +01:00

239 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import serial
import requests
import json
import sys
import time
import time
import pyglet
import os
from threading import Thread
from Cube import Cube
from gtts import gTTS
SERIAL_PORT = "/dev/ttyACM0" # Linux/macOS (Check with `ls /dev/tty*`)
BAUD_RATE = 115200
API_KEY = ""
API_URL = "https://openrouter.ai/api/v1/chat/completions"
DISTANCE_SAFE = 150
DISTANCE_CLOSE = 10
DISTANCESOCLE = 10
STATUS_SAFE = "safe"
STATUS_CLOSE = "Close"
STATUS_DANGER = "Danger"
STATUS_GROUND = "ground"
PROMPT_CONTEXT = "Tu es un cube et tu évalues l'espace ambiant en mesurant les distances par rapport à chacune de tes 6 faces. Tu réagis donc en fonction, sans pour autant mentionner ni les mesures ni ta position détaillée dans l'espace, n'utilise pas d'onomatopée et ne décrit pas tes actions. Fais seulement référence aux directions : en haut, à droite, à gauche, etc... Tes réactions doivent être systématiquement confuses, vagues et angoissés ; plus les obstacles sont près, plus tu paniques. Tu es une vieille dame marseillaise ; il faut donc que tu utilises un maximum dexpressions marseillaises (mets en le plus possible). Tes réponses ne feront qu'une seule phrase au maximum. Tu te complais dans le jugement et tu rabaisses tout le monde, surtout la jeunesse, parce que tu es égoïste et acariâtre. Tu penses tout savoir mieux que tout le monde. De plus, tu es paranoiaque : tu es persuadé que tout le monde t'en veux. Tu déteste également le contact et être touché. Seul le sol, en dessous de toi, ne t'incomforte pas."
STATUS = {
"Top": STATUS_SAFE,
"Bottom": STATUS_SAFE,
"Left": STATUS_SAFE,
"Right": STATUS_SAFE,
"Back": STATUS_SAFE,
"Front": STATUS_SAFE
}
AI_COOLDOWN = 4
CAN_SPEAK = True
TIMER = time.time()
CUBE = Cube(None)
def read_key():
"""Reads the API key from the .env file"""
global API_KEY
with open(".env") as f:
for line in f:
line = line.replace('\n', '')
if not line or line.startswith('#'):
continue
key, value = line.replace('export ', '', 1).strip().split('=', 1)
if key == "KEY":
API_KEY = value
def play_audio(answer):
"""Play an audio file through the pyglet library
and the google translation TTS.
Args:
answer (string): Text that will be read"""
tts = gTTS(text=answer, lang='fr')
filename = '/tmp/temp.mp3'
tts.save(filename)
music = pyglet.media.load(filename, streaming=False)
music.play()
time.sleep(music.duration)
os.remove(filename)
def querry(prompt):
"""Sends the prompt to the DeepSeek API, and returns
its answer.
Args:
prompt (string): Prompt to send.
Returns:
string: Answer from the MLL."""
global CAN_SPEAK, TIMER
if not prompt:
return None
if CAN_SPEAK is False:
return None
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": "deepseek/deepseek-chat:free",
"messages": [
{"role": "system", "content": PROMPT_CONTEXT},
{"role": "user", "content": prompt}
]
}
CAN_SPEAK = False
TIMER = time.time()
response = requests.post(API_URL, json=data, headers=headers)
try:
if response.status_code == 200:
ai_response = response.json()["choices"][0]["message"]["content"]
print(f"Cube: {ai_response}")
return ai_response
else:
error_message = f"Erreur API DeepSeek: {response.status_code} - {response.text}"
print(f"Error: {error_message}")
return None
except Exception as e:
print("An error happened ::", e)
print(ai_response)
return "L'IA a cassé :("
def ask_ia(prompt):
"""Querries the AI and play the resulting audio."""
answer = querry(prompt)
if answer is not None:
play_audio(answer)
def generate_response(prompt):
"""Launches a thread that will query DeepSeek and
display its response."""
Thread(target=ask_ia, args=[prompt]).start()
def monitor():
"""Monitors the cube's data. Changes the status for each sides
when needed; and calls upon the AI should it happen."""
answ = None
if CUBE.top >= DISTANCE_SAFE and STATUS["Top"] != STATUS_SAFE:
STATUS["Top"] = STATUS_SAFE
generate_response("L'obstacle au dessus de toi à disparu.")
print("Setting SAFE for TOP")
elif CUBE.top < DISTANCE_SAFE and CUBE.top >= DISTANCE_CLOSE and STATUS["Top"] != STATUS_CLOSE:
STATUS["Top"] = STATUS_CLOSE
generate_response("Un obstacle se rapproche par dessus toi.")
print("Setting CLOSE for TOP")
elif CUBE.top < DISTANCE_CLOSE and STATUS["Top"] != STATUS_DANGER:
STATUS["Top"] = STATUS_DANGER
generate_response("Un obstacle est juste au dessus de toi !")
print("Setting DANGER for TOP")
if CUBE.bottom >= DISTANCE_SAFE and STATUS["Bottom"] != STATUS_SAFE:
STATUS["Bottom"] = STATUS_SAFE
generate_response("Tu voles")
print("Setting FAR for BOTTOM")
elif CUBE.bottom < DISTANCE_SAFE and CUBE.bottom >= DISTANCE_CLOSE and STATUS["Bottom"] != STATUS_CLOSE:
STATUS["Bottom"] = STATUS_CLOSE
generate_response("Tu es légérement soulevé dans les airs.")
print("Setting CLOSE for BOTTOM")
elif CUBE.bottom < DISTANCE_CLOSE and STATUS["Bottom"] != STATUS_DANGER:
STATUS["Bottom"] = STATUS_GROUND
generate_response("Tu es a nouveau posé au sol.")
print("Setting GROUNDED for BOTTOM")
if CUBE.front >= DISTANCE_SAFE and STATUS["Front"] != STATUS_SAFE:
STATUS["Front"] = STATUS_SAFE
generate_response("L'obstacle devant toi à disparu.")
print("Setting SAFE for FRONT")
elif CUBE.front < DISTANCE_SAFE and CUBE.front >= DISTANCE_CLOSE and STATUS["Front"] != STATUS_CLOSE:
STATUS["Front"] = STATUS_CLOSE
generate_response("Un obstacle se rapproche devant toi.")
print("Setting CLOSE for FRONT")
elif CUBE.front < DISTANCE_CLOSE and STATUS["Top"] != STATUS_DANGER:
STATUS["Front"] = STATUS_DANGER
generate_response("Un obstacle est juste devant toi !")
print("Setting DANGER for FRONT")
if CUBE.back >= DISTANCE_SAFE and STATUS["Back"] != STATUS_SAFE:
STATUS["Back"] = STATUS_SAFE
generate_response("L'obstacle derrière toi à disparu.")
print("Setting SAFE for BACK")
elif CUBE.back < DISTANCE_SAFE and CUBE.back >= DISTANCE_CLOSE and STATUS["Back"] != STATUS_CLOSE:
STATUS["Back"] = STATUS_CLOSE
generate_response("Un obstacle se rapproche par derrière toi.")
print("Setting CLOSE for BACK")
elif CUBE.back < DISTANCE_CLOSE and STATUS["Back"] != STATUS_DANGER:
STATUS["Back"] = STATUS_DANGER
generate_response("Un obstacle est juste derrière toi !")
print("Setting DANGER for BACK")
if CUBE.left >= DISTANCE_SAFE and STATUS["Left"] != STATUS_SAFE:
STATUS["Left"] = STATUS_SAFE
generate_response("L'obstacle à gauche toi à disparu.")
print("Setting SAFE for LEFT")
elif CUBE.left < DISTANCE_SAFE and CUBE.left >= DISTANCE_CLOSE and STATUS["Left"] != STATUS_CLOSE:
STATUS["Left"] = STATUS_CLOSE
generate_response("Un obstacle se rapproche à gauche de toi.")
print("Setting CLOSE for LEFT")
elif CUBE.left < DISTANCE_CLOSE and STATUS["Left"] != STATUS_DANGER:
STATUS["Left"] = STATUS_DANGER
generate_response("Un obstacle est juste à droite de toi !")
print("Setting DANGER for LEFT")
if CUBE.right >= DISTANCE_SAFE and STATUS["Right"] != STATUS_SAFE:
STATUS["Right"] = STATUS_SAFE
generate_response("L'obstacle à gauche toi à disparu.")
print("Setting SAFE for RIGHT")
elif CUBE.right < DISTANCE_SAFE and CUBE.right >= DISTANCE_CLOSE and STATUS["Right"] != STATUS_CLOSE:
STATUS["Right"] = STATUS_CLOSE
generate_response("Un obstacle se rapproche à droite de toi.")
print("Setting CLOSE for RIGHT")
elif CUBE.right < DISTANCE_CLOSE and STATUS["Right"] != STATUS_DANGER:
STATUS["Right"] = STATUS_DANGER
generate_response("Un obstacle est juste à droite de toi !")
print("Setting DANGER for RIGHT")
if __name__ == "__main__":
# Open Serial Connection
read_key()
try:
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
except serial.serialutil.SerialException:
print("USB Serial not found. Exiting ...")
sys.exit(0)
print(f"Listening on {SERIAL_PORT}...")
while True:
try:
if ser.in_waiting > 0:
if CAN_SPEAK is False:
current = time.time() - TIMER
if current >= AI_COOLDOWN:
CAN_SPEAK = True
data = ser.readline().decode("utf-8").strip()
try:
json_data = json.loads(data)
CUBE.init_json(json_data)
#CUBE.display()
monitor()
except json.JSONDecodeError:
print("Invalid JSON received:", data)
except KeyboardInterrupt:
print("\nClosing Serial Connection...")
ser.close()
break