基于flask和百度AI接口实现前后端的语音交互

作者:一抹浅笑
cnblogs.com/zhaopanpan/articles/9404211.html

在百度AI中,语音识别,语音合成,词法分析。

简单的实现,前后端的语音交互。

封装百度的AI接口

import os
from uuid import uuid4
from aip import AipSpeech
from aip import AipNlp

import settings

""" 你的 APPID AK SK """
APP_ID = '11617876'
API_KEY = 'KqqpO9GclBimrcSNrSANPhUQ'
SECRET_KEY = 'xc7IFW4w6DVtuNQlMkBX05Ulhx5Mm5zh'

client = AipSpeech(APP_ID, API_KEY, SECRET_KEY)
nlp_client = AipNlp(APP_ID, API_KEY, SECRET_KEY)

# 音频转为文本
def audio2text(file_name):
    file_path = os.path.join(settings.AUDIO_PCM_DIR, file_name)
    cmd_str = f'ffmpeg -y -i {file_path} -acodec pcm_s16le -f s16le -ac 1 -ar 16000 {file_path}.pcm'
    os.system(cmd_str)
    with open(f'{file_path}.pcm''rb'as f:
        audio_context = f.read()
    res = client.asr(audio_context, 'pcm'16000, {
        "dev_pid"1537
    })
    print("res",res)
    if res.get('err_no'):
        return res
    return res.get('result')[0]

# 文本转为音频
def text2audio(text):
    file_name = f"{uuid4()}.mp3"
    file_path = os.path.join(settings.AUDIO_DIR, file_name)
    res = client.synthesis(text, 'zh'1, {
        "vol"5,
        'pit'7,
        "spd"4,
        "per"4
    })
    if isinstance(res, dict):
        return res
    with open(file_path, 'wb'as f:
        f.write(res)
    return file_name

# 词法的匹配分析
def my_nlp(text):
    print("text", nlp_client.simnet('你今年几岁了', text))
    if nlp_client.simnet('你今年几岁了', text).get('score') >= 0.72:
        return '我今年73了,不然84也行'
    elif nlp_client.simnet('你叫什么名字', text).get('score') >= 0.72:
        return '我的名字叫傻逼'
    elif nlp_client.simnet('你在哪儿学习',text).get('score') >= 0.72:
        return '我在老男孩教育'
    else:
        return '不知道你在说什么'

html页面

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
    <audio autoplay="autoplay"
           controls id="play_mp3"
           src="http://127.0.0.1:5000/getfile/ad74226f-3646-40d5-9548-7e817f1cb405.mp3">

        浏览器不支持
    </audio>
    <button onclick="start_reco()">开始录音</button>
    <button onclick="stop_reco()">结束录音</button>
</body>
<script type="text/javascript" src="/static/Recorder.js"></script>
<script type="text/javascript">
    //建立 websocket 连接
    var ws = new WebSocket('ws://127.0.0.1:5000/upload');
    // ws.onopen = function () {
    //     ws.send("hello")
    // };

    // 如何录音
    var reco = null;
    // 创建一个录音audiocontext对象
    var audio_context = new AudioContext();
    // 解决浏览器的兼容问题
    navigator.getUserMedia = (navigator.getUserMedia ||
    navigator.webkitGetUserMedia ||
    navigator.mozGetUserMedia ||
    navigator.msGetUserMedia);

    navigator.getUserMedia({audio:true},create_stream,function (err{
        console.log(err)
    });

    function create_stream(stream{
        var stream_input = audio_context.createMediaStreamSource(stream);
        reco = new Recorder(stream_input)
    }
    // 开始录音
    function start_reco({
        reco.record();
    }
    // 结束录音
    function stop_reco({
        reco.stop();
        get_audio();
        reco.clear();
    }
    // 向后端发送录制的音频
    function get_audio({
        reco.exportWAV(function (wav_file{
            ws.send(wav_file)
        })
    }
    // 接收后端发送的信息,触发回调函数
    ws.onmessage = function (ev{
        // 从ev.data中获取参数
        var data = JSON.parse(ev.data);
        document.getElementById("play_mp3").src = "http://127.0.0.1:5000/getfile/"+data.filename;
    }

</script>
</html>

后端flask 实现逻辑

import os
import json
from uuid import uuid4

from geventwebsocket.handler import WebSocketHandler
from geventwebsocket.websocket import WebSocket
from gevent.pywsgi import WSGIServer
from flask import Flask,request,render_template,send_file
import baidu_aip
import settings

app = Flask(__name__)  # type:Flask

@app.route('/index')
def index():
    return render_template('index.html')

@app.route('/getfile/<filename>')
def getfile(filename):
    file_path = os.path.join(settings.AUDIO_DIR,filename)
    return send_file(file_path)

@app.route('/upload')
def upload():
    ws = request.environ.get('wsgi.websocket')
    if not ws:
        return '请使用websocket连接'

    while True:
        message = ws.receive()
        if isinstance(message,bytearray):
        # if message:
            file_name = f"{uuid4()}.wav"
            file_path = os.path.join(settings.AUDIO_PCM_DIR,file_name)
            with open(file_path,'wb'as f:
                f.write(message)
            asr_str = baidu_aip.audio2text(file_name)
            answer = baidu_aip.my_nlp(asr_str)
            file_mp3_name = baidu_aip.text2audio(answer)
            send_dic = json.dumps({
                "filename":file_mp3_name,
                "play_type":'audio',
                "sendtime":111
            })
            ws.send(send_dic)
        else:
            ws.close()

if __name__ == '__main__':
    http_server = WSGIServer(('127.0.0.1',5000),app,handler_class=WebSocketHandler)
    http_server.serve_forever()


关注后端技术精选,每天推送优质好文

基于flask和百度AI接口实现前后端的语音交互

发表评论