# 以下代码可在树莓派上借助 Azure OpenAI 与 Azure 语音服务实现对话机器人😁
import os
import azure.cognitiveservices.speech as speechsdk
import openai
import RPi.GPIO as GPIO
import time
import threading
import textwrap
import wave
import pygame
import pyttsx3
import Adafruit_GPIO.SPI as SPI
import Adafruit_SSD1306
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
# --- Azure OpenAI credentials -------------------------------------------
# Every quoted value below is a placeholder; replace it with the value for
# your own Azure resource before running.

# Name passed as `engine` to the chat completion call in generate_text().
engine_name = "engine_name"
openai.api_key = "api_key"
# Your endpoint should look like: https://<your-resource-name>.openai.azure.com/
openai.api_base = "openai.api_base"
openai.api_type = 'azure'
# API version string; this may change in the future.
openai.api_version = '2023-03-15-preview'
# Custom name you chose for your deployment when you deployed a model.
# (The original placeholder contained a stray double quote inside the literal.)
deployment_name = 'deployment_name'

# --- Azure Speech-to-Text / Text-to-Speech credentials -------------------
speech_key = "speech_key"
service_region = "service_region"
speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=service_region)
# Language used for speech recognition.
speech_config.speech_recognition_language = "zh-CN"
# Voice used for speech synthesis.
speech_config.speech_synthesis_voice_name = "zh-CN-YunyangNeural"
speech_synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config)

# --- GPIO ------------------------------------------------------------------
# Use BCM pin numbering and configure GPIO14 as an output (it drives the
# PWM "breathing" LED in play_wav()).
GPIO.setmode(GPIO.BCM)
GPIO.setup(14, GPIO.OUT)
def play_wav(file_path, volume=1.0):
    """Play a WAV file while showing the MOSS splash screen and pulsing the LED.

    The sound is started first, then the splash text is drawn on the SSD1306
    display, and finally GPIO14 is driven with PWM to produce a breathing-light
    effect for as long as the mixer reports that audio is still playing.

    Args:
        file_path: Path of the WAV file to play.
        volume: Playback volume passed to ``Sound.set_volume``.
    """
    # Initialise the mixer, load the file and start playback (non-blocking).
    pygame.mixer.init()
    sound = pygame.mixer.Sound(file_path)
    sound.set_volume(volume)
    sound.play()

    # Initialise and clear the display.
    disp = Adafruit_SSD1306.SSD1306_128_64(rst=None)
    disp.begin()
    disp.clear()
    disp.display()

    # Blank 1-bit image the size of the display, plus a drawing handle for it.
    width = disp.width
    height = disp.height
    image = Image.new('1', (width, height))
    draw = ImageDraw.Draw(image)

    # Large font for the title, small font for the subtitle.
    font1 = ImageFont.truetype('chinese.msyh.ttf', 42)
    font2 = ImageFont.truetype('chinese.msyh.ttf', 12)

    text1 = "MOSS"
    text2 = "一个尽职的人工智能"
    draw.text((0, 0), text1, font=font1, fill=255)
    draw.text((0, 50), text2, font=font2, fill=255)

    # Push the rendered image to the display.
    disp.image(image)
    disp.display()

    # Breathing light on GPIO14: ramp the duty cycle 0 -> 100 -> 0 in steps
    # of 5 while the mixer is still busy. A started ramp always runs to the
    # end, so the LED finishes at 0% duty.
    p = GPIO.PWM(14, 50)
    p.start(0)
    try:
        while pygame.mixer.get_busy():
            for dc in (*range(0, 101, 5), *range(100, -1, -5)):
                p.ChangeDutyCycle(dc)
                time.sleep(0.1)
    finally:
        # Always stop PWM output, even if the loop is interrupted (e.g. by
        # Ctrl+C) or raises, so the pin is not left driven.
        p.stop()
# Play the startup WAV file at 20% volume (volume=0.2).
play_wav("/home/haibin/MOSS1.wav", volume=0.2)
# Release the GPIO resources now that the startup animation is finished.
GPIO.cleanup()
def speak(text):
    """Speak ``text`` through a freshly initialised pyttsx3 engine.

    Blocks until the engine's queue has been processed.
    """
    tts = pyttsx3.init()
    tts.say(text)
    tts.runAndWait()
def speech_to_text():
    """Listen once on the default microphone and return the outcome as text.

    Returns:
        str: The recognized text on success; the fixed MOSS self-introduction
        string when nothing could be matched; ``"Recognition canceled."`` when
        recognition was canceled or ended with any other reason. The function
        always returns a string.
    """
    # Set up the audio configuration.
    audio_config = speechsdk.audio.AudioConfig(use_default_microphone=True)
    # Create a speech recognizer and run a single recognition.
    speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config)
    print("等待响应...")
    result = speech_recognizer.recognize_once_async().get()

    if result.reason == speechsdk.ResultReason.RecognizedSpeech:
        return result.text
    if result.reason == speechsdk.ResultReason.NoMatch:
        return "MOSS: 你好我是MOSS,还有一个更好听的名字小苔藓,我只是一个尽职的人工智能,为了心中人类文明延续的信念而努力"

    if result.reason == speechsdk.ResultReason.Canceled:
        # Surface why recognition was canceled (bad key/region, network, ...)
        # so that configuration problems are not silent.
        details = result.cancellation_details
        print(f"Speech recognition canceled: {details.reason}")
        if details.reason == speechsdk.CancellationReason.Error:
            print(f"Error details: {details.error_details}")
    else:
        print(f"Unexpected speech recognition result: {result.reason}")
    # Canceled keeps its original return value; any other reason used to fall
    # through and return None implicitly, which callers would format as "None".
    return "Recognition canceled."
# Text-to-speech helper
def text_to_speech(text):
    """Synthesize ``text`` with the module-level Azure speech synthesizer.

    Returns:
        bool: True when the result reason is SynthesizingAudioCompleted,
        False for any other reason or when an exception is raised.
    """
    try:
        outcome = speech_synthesizer.speak_text_async(text).get()
        if outcome.reason != speechsdk.ResultReason.SynthesizingAudioCompleted:
            print(f"Error synthesizing audio: {outcome}")
            return False
        print("Text-to-speech conversion successful.")
        return True
    except Exception as err:
        print(f"Error synthesizing audio: {err}")
        return False
# Azure OpenAI chat completion helper
def generate_text(prompt):
    """Send ``prompt`` as the user message and return the assistant's reply text.

    The request goes to the deployment named by the module-level
    ``engine_name`` with a fixed system message and fixed sampling settings.
    """
    conversation = [
        {"role": "system", "content": "You are an AI assistant that helps people find information."},
        {"role": "user", "content": prompt},
    ]
    sampling = {
        "temperature": 0.7,
        "max_tokens": 800,
        "top_p": 0.95,
        "frequency_penalty": 0,
        "presence_penalty": 0,
        "stop": None,
    }
    completion = openai.ChatCompletion.create(
        engine=engine_name,
        messages=conversation,
        **sampling,
    )
    # Only the first choice is used.
    first_choice = completion['choices'][0]
    return first_choice['message']['content']
# Module-level display state shared by display_text().
# Initialise the SSD1306 display and blank it.
disp = Adafruit_SSD1306.SSD1306_128_64(rst=None)
disp.begin()
disp.clear()
disp.display()
# Off-screen image matching the display size; text is drawn here first and
# then pushed to the display.
width, height = disp.width, disp.height
image = Image.new('1', (width, height))
# Drawing handle for the off-screen image.
draw = ImageDraw.Draw(image)
# Font and size used for the scrolling conversation text.
font = ImageFont.truetype('chinese.msyh.ttf', 12)
# Origin used when drawing text.
x = y = 0
def _line_height(line):
    """Return the pixel height needed to draw ``line`` with the module font."""
    # ImageFont.getsize() was removed in Pillow 10; prefer getbbox() and fall
    # back to getsize() only on older Pillow releases that lack getbbox().
    if hasattr(font, "getbbox"):
        return font.getbbox(line)[3]
    return font.getsize(line)[1]


def display_text(text):
    """Scroll ``text`` onto the display line by line and speak the MOSS reply.

    The text is split on newlines and wrapped to a fixed number of characters
    per line. Each line is scrolled in at the bottom of the display, one per
    second. When the first line starting with ``"MOSS:"`` is shown, the reply
    is handed to ``text_to_speech`` on a background thread; the function waits
    for that thread before returning.

    Args:
        text: Conversation text, e.g. ``"你: ...\\nMOSS: ..."``.
    """
    # Maximum number of characters per display line.
    max_chars_per_line = 10
    # Split into display lines. The "MOSS:" line is wrapped as well: leaving
    # it unwrapped lets a long reply run past the right edge of the display.
    lines = []
    for paragraph in text.split('\n'):
        lines.extend(textwrap.wrap(paragraph, width=max_chars_per_line))

    # Time each line stays on screen before the next one scrolls in (seconds).
    speed = 1
    tts_thread = None

    for line in lines:
        line_height = _line_height(line)
        # Shift the existing content up by one line. Paste a copy so the
        # source and destination buffers never alias each other.
        image.paste(image.copy(), (0, -line_height))
        # Clear the bottom strip and draw the new line there.
        draw.rectangle((0, height - line_height, width, height), outline=0, fill=0)
        draw.text((x, height - line_height), line, font=font, fill=255)
        # Push the updated image to the display.
        disp.image(image)
        disp.display()

        # Start speech once, when the first reply line appears.
        if tts_thread is None and line.startswith("MOSS:"):
            # NOTE(review): this keeps the original extraction (the text
            # between the first and second "MOSS: " markers). When the user
            # part itself contains "MOSS: " (speech_to_text's no-match string
            # does), that part is spoken instead of the reply — confirm this
            # is intended.
            pieces = text.split("MOSS: ")
            # Guard the lookup: a "MOSS:" line without the "MOSS: " marker
            # anywhere in the text used to raise IndexError.
            response = pieces[1] if len(pieces) > 1 else line[len("MOSS:"):].strip()
            tts_thread = threading.Thread(target=text_to_speech, args=(response,))
            tts_thread.start()

        time.sleep(speed)

    # Wait for speech to finish so the caller does not start listening on the
    # microphone while the reply is still being spoken.
    if tts_thread is not None:
        tts_thread.join()
def main():
    """Run the listen -> generate -> display/speak loop until interrupted.

    A failure inside a single turn (for example a network or API error) is
    reported and the loop continues; Ctrl+C ends the loop cleanly.
    """
    while True:
        try:
            # Get the user's input via speech recognition.
            user_input = speech_to_text()
            # Ask the model for a reply.
            prompt = f"Q: {user_input}\nA:"
            response = generate_text(prompt)
            # Show the exchange on the display (this also triggers speech).
            display_text(f"你: {user_input}\nMOSS: {response}")
        except KeyboardInterrupt:
            print("Interrupted, exiting.")
            break
        except Exception as ex:
            # Top-level boundary: report the error and keep the assistant
            # running instead of dying on a transient failure.
            print(f"Error during conversation turn: {ex}")
            # Short pause so a persistent failure does not spin the loop.
            time.sleep(1)


if __name__ == '__main__':
    main()
# 后续将继续完善。