一、工作流程
-
服务文件传输、命令通讯用的ssh
-
大模型返回的动作编排格式:['动作1()', '动作2()',...]
二、代码
1.录音+语音识别
# asr_aipc.py
def asr_aipc():
print('开始录音')
terminal = 'ssh pi@{} "python TonyPi/OpenVINO/utils_asr.py"'.format(PI_IP)
os.system(terminal)
print('开始语音识别')
terminal = 'scp pi@{}:~/TonyPi/OpenVINO/temp/speech_recognition.txt temp/speech_recognition.txt'.format(PI_IP)
os.system(terminal)
with open('temp/speech_recognition.txt', 'r', encoding='utf-8') as f:
speech_result = f.read()
print('【语音识别结果】', speech_result)
return speech_result
# utils_asr.py
def record(MIC_INDEX=2, DURATION=8):
'''
调用麦克风录音,需用arecord -l命令获取麦克风ID
DURATION,录音时长
'''
print('开始 {} 秒录音'.format(DURATION))
os.system('sudo arecord -D "plughw:{}" -f dat -c 1 -r 16000 -d {} /home/pi/TonyPi/OpenVINO/temp/speech_record.wav'.format(MIC_INDEX, DURATION))
print('录音结束')
# 语音识别: appBuilder
2.调用大模型
-
输入
你是我的机器人,请你根据我的指令,以json形式输出接下来要运行的对应函数和你给我的回复
你只需要回答一个列表即可,不要回答任何中文
【以下是所有动作函数】
站立:stand()
原地踏步:stepping()
前进一步:move_forward()
后退一步:move_back()
向左平移移动一步:move_left()
向右平移移动一步:move_right()
向左旋转移动:turn_left()
向右旋转移动:turn_right()
鞠躬:bow()
挥手打招呼:wave()
扭腰:twist()
捶胸庆祝:celebrate()
下蹲:squat()
踢右脚:right_shot()
踢左脚:left_shot()
仰卧起坐:sit_ups()
佛山叶问的咏春拳:wing_chun()
从前倾趴卧起立,也就是从趴下到站起来:stand_up_front()
从后仰躺倒起立,也就是从躺下到站起来:stand_up_back()
巡线跨栏模式,顺着黑色线前进并跨越台阶等障碍物:athletics()
播放音乐并跳舞(唱跳RAP):rap()
踢不同颜色的足球:kickball('red')
搬运不同颜色的海绵方块:transport('red green blue')
【输出限制】
你直接输出json即可,从{开始,以}结束,【不要】输出```json的开头或结尾
在'action'键中,输出函数名列表,列表中每个元素都是字符串,代表要运行的函数名称和参数。每个函数既可以单独运行,也可以和其他函数先后运行。列表元素的先后顺序,表示执行函数的先后顺序
在'response'键中,根据我的指令和你编排的动作,以第一人称简短输出你回复我的中文,要求幽默、善意、玩梗、有趣。不要超过20个字,不要回复英文。
如果我让你从躺倒状态站起来,你回复一些和“躺平”相关的话
kickball和transport函数需要用双引号
【以下是一些具体的例子】
我的指令:你最喜欢哪种颜色呀。你回复:{'action':[], 'response':'我喜欢蓝色,因为我喜欢贝加尔湖,深邃而神秘'}
我的指令:请你先鞠个躬,然后挥挥手。你回复:{'action':['bow()', 'wave()'], 'response':'敬个礼挥挥手,你是我的好朋友'}
我的指令:先前进,再后退,向左转一点,再向右平移。你回复:{'action':['move_forward()', 'move_back()', 'turn_left()', 'move_right()'], 'response':'你真是操作大师'}
我的指令:先蹲下,再站起来,最后做个庆祝的动作。你回复:{'action':['squat()', 'stand()', 'celebrate()'], 'response':'像奥运举重冠军的动作'}
我的指令:向前走两步,向后退三步。你回复:{'action':['move_forward()', 'move_forward()', 'move_back()', 'move_back()', 'move_back()'], 'response':'恰似历史的进程,充满曲折'}
我的指令:先挥挥手,然后踢绿色的足球。你回复:{'action':['wave()', "kickball('green')"], 'response':'绿色的足球咱可以踢,绿色的帽子咱可不戴'}
我的指令:先活动活动筋骨,然后把红色和蓝色的海绵方块搬运到指定位置。你回复:{'action':['twist()', “transport('red blue')”], 'response':'我听说特斯拉的人形机器人兄弟们,每天都在干这种活'}
我的指令:先踢右脚,再踢左脚,然后搬运海绵方块。你回复:{'action':['right_shot()', 'left_shot()', “transport('red green blue')”], 'response':'让我先活动活动,然后让海绵宝宝们各回各家'}
我的指令:别躺着了,快起来,把红色和蓝色方块搬运到指定位置。你回复:{'action':['stand_up_back()', “transport('red blue')”], 'response':'我也想躺平啊,奈何得干活儿'}
【我现在的指令是】
-
输出
['wave()', 'celebrate()']
3. 依次执行动作
-
语音合成+播放
# utils_asr.py
def tts(ai_response):
# 写入文件
with open('temp/ai_response.txt', 'w', encoding='utf-8') as f:
f.write(ai_response)
# 传到开发板
terminal = 'scp temp/ai_response.txt pi@{}:~/TonyPi/OpenVINO/temp/'.format(PI_IP)
os.system(terminal)
print('开始语音合成')
terminal = 'ssh pi@{} "python ~/TonyPi/OpenVINO/utils_tts.py"'.format(PI_IP)
os.system(terminal)
# utils_tts.py"
def tts(TEXT='我是同济子豪兄的机器人', tts_wav_path = '/home/pi/TonyPi/OpenVINO/temp/tts.wav'):
'''
语音合成TTS,生成wav音频文件
'''
inp = appbuilder.Message(content={"text": TEXT})
out = tts_ab.run(inp, model="paddlespeech-tts", audio_type="wav")
# out = tts_ab.run(inp, audio_type="wav")
with open(tts_wav_path, "wb") as f:
f.write(out.content["audio_binary"])
# print("TTS语音合成,导出wav音频文件至:{}".format(tts_wav_path))
def play_wav(wav_file='/home/pi/TonyPi/OpenVINO/temp/tts.wav'):
'''
播放wav音频文件
'''
prompt = 'aplay -t wav {} -q'.format(wav_file)
os.system(prompt)
-
依次执行动作
-
舵机编号
- 一个动作组,比如wave,存储的是每个舵机的不同角度序列,用sqllite文件存储
# ActionGroupControl.py
def runAction(actNum, lock_servos='', path="/home/pi/TonyPi/ActionGroups/"):
'''
运行动作组,无法发送stop停止信号
:param actNum: 动作组名字 , 字符串类型
:return:
'''
global runningAction
global stop_action
if actNum is None:
return
actNum = path + actNum + ".d6a"
if os.path.exists(actNum) is True:
if runningAction is False:
runningAction = True
ag = sql.connect(actNum)
cu = ag.cursor()
cu.execute("select * from ActionGroup")
while True:
act = cu.fetchone()
if stop_action is True:
stop_action = False
print('stop')
break
if act is not None:
for i in range(0, len(act) - 2, 1):
if str(i + 1) in lock_servos:
ctl.set_bus_servo_pulse(i+1, lock_servos[str(i + 1)], act[1])
else:
ctl.set_bus_servo_pulse(i+1, act[2 + i], act[1])
time.sleep(float(act[1])/1000.0)
else: # 运行完才退出
break
runningAction = False
cu.close()
ag.close()
else:
runningAction = False
print("未能找到动作组文件:(),是否将动作组保存在目录:{}".format(actNum, path))