Practical Guide | Quickly Build an AI Front-End Demo App with Streamlit

夏天|여름이다 · 2024-07-21 13:01:04

----------------------- 🎈 Related API quick links 🎈 --------------------------

🚀 Gradio: Practical Guide | Everything you want to know about quickly building AI model interfaces with Gradio - CSDN Blog

🚀 Streamlit: Practical Guide | Quickly Build an AI Front-End Demo App with Streamlit - CSDN Blog

🚀 Flask: Practical Guide | Learn to Write Flask APIs for AI Applications (Templates Included) - CSDN Blog

Streamlit is a Python framework for machine learning and data visualization: a few lines of code are enough to build a polished online app. Compared with Gradio, it can present richer functionality.

Contents

1. Installing Streamlit
2. Streamlit Syntax
2.1. Basic Syntax
2.2. Intermediate Syntax
2.2.1. Images, Audio, and Video
2.2.2. Progress and Status Indicators
2.3. Advanced Syntax
2.3.1. @st.cache_data
2.3.2. st.cache_resource
3. Building a Simple App
Loading data and plotting it in real time
4. Streamlit Examples for Deep Learning Projects
4.1. Example 1: Text Generation
4.1.1. Chatting with ChatGLM
4.1.2. Chatting with OpenAI
4.2. Image Tasks
4.2.1. Image Classification
4.2.2. Image Generation
4.3. Speech Tasks
4.3.1. Speech Synthesis (TTS)
4.3.2. Speech-to-Text
References


Official documentation: Get started - Streamlit Docs

1. Installing Streamlit

# Install
pip install streamlit
pip install streamlit-chat

# Test the installation
streamlit hello

Running `streamlit hello` opens a page with several built-in example apps.

2. Streamlit Syntax

2.1. Basic Syntax

import streamlit as st

The most commonly used elements:

st.title(): page title, e.g. st.title("My title")
st.write(): write almost anything, e.g. st.write("Hello world")
st.text(): a single line of plain text
st.text_area(): multi-line text box, e.g. st.text_area("Text area", value='', key=None)
st.slider(): slider, e.g. st.slider("Slider label")
st.button(): button, e.g. st.button("Button label")
st.text_input(): single-line text input, e.g. st.text_input("Ask the user for input")
st.radio(): radio-button group
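To see how these fit together, here is a minimal sketch (the labels and option values are arbitrary placeholders):

import streamlit as st

st.title("Basic widgets demo")                        # page title
st.write("Hello world")                               # write text, data, or charts

name = st.text_input("What is your name?")            # single-line input
bio = st.text_area("About you", value="", key="bio")  # multi-line input
age = st.slider("Your age", 0, 100, 25)                # min, max, default
color = st.radio("Favorite color", ["red", "green", "blue"])

if st.button("Submit"):                                # True only on the run where it is clicked
    st.write(f"{name}, {age} years old, likes {color}")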

2.2. Intermediate Syntax

2.2.1. Images, Audio, and Video

All three accept NumPy arrays, raw bytes, open file objects, or file paths/URLs:

st.image()
st.audio()
st.video()
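For example, a minimal sketch (the file names below are placeholders for files you would provide):

import numpy as np
import streamlit as st

st.image("photo.png", caption="From a file path")                   # path or URL
st.image(np.random.randint(0, 255, (100, 100, 3), dtype=np.uint8))  # from a NumPy array
st.audio("speech.wav")                                               # audio file, bytes, or URL
st.video("clip.mp4")                                                 # video file, bytes, or URL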

2.2.2. Progress and Status Indicators

st.progress(): show a progress bar
st.spinner(): show a status message while a block is running
st.error(): show an error message
st.warning(): show a warning message
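A small sketch of how these are typically combined around a long-running step:

import time
import streamlit as st

progress = st.progress(0)                 # progress bar, 0-100
with st.spinner("Processing..."):          # spinner shown while this block runs
    for i in range(100):
        time.sleep(0.01)                   # stand-in for real work
        progress.progress(i + 1)

st.warning("Example warning message")
st.error("Example error message")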

2.3. Advanced Syntax

2.3.1. @st.cache_data

When you mark a function with Streamlit's caching decorator, you tell Streamlit that whenever the function is called it should check two things:

the input parameters of the call, and the code inside the function. If neither has changed since the last run, Streamlit skips executing the function and returns the cached result instead.
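For instance, a data-loading function can be cached like this (the CSV URL is a placeholder); on a rerun with the same arguments and unchanged function body, Streamlit returns the cached DataFrame instead of re-executing the function:

import pandas as pd
import streamlit as st

@st.cache_data
def load_data(url: str, nrows: int) -> pd.DataFrame:
    # Re-executed only when url/nrows or this function's code changes
    return pd.read_csv(url, nrows=nrows)

df = load_data("https://example.com/data.csv", 1000)   # placeholder URL
st.dataframe(df)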

2.3.2. st.cache_resource

A decorator for caching functions that return global resources (e.g., database connections, ML models).

Cached objects are shared across all users, sessions, and reruns. They must be thread-safe, because they may be accessed from multiple threads at the same time. If thread safety is a concern, consider using st.session_state to store per-session resources instead.

By default, all parameters of a cache_resource-decorated function must be hashable. Any parameter whose name starts with _ is not hashed.
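A sketch of the typical pattern, loading a model once and reusing it everywhere (the model name here is just the default sentiment model from transformers, used for illustration):

import streamlit as st
from transformers import pipeline

@st.cache_resource
def get_classifier(model_name: str, _device: int = -1):
    # Created once and shared across all users, sessions, and reruns.
    # `model_name` is part of the cache key; `_device` is not hashed
    # because its name starts with an underscore.
    return pipeline("sentiment-analysis", model=model_name, device=_device)

clf = get_classifier("distilbert-base-uncased-finetuned-sst-2-english")
text = st.text_input("Enter a sentence")
if text:
    st.write(clf(text))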

3. Building a Simple App

Loading data and plotting it in real time

import streamlit as st
import pandas as pd
import numpy as np

st.title('Uber pickups in NYC')

DATE_COLUMN = 'date/time'
DATA_URL = ('https://s3-us-west-2.amazonaws.com/'
            'streamlit-demo-data/uber-raw-data-sep14.csv.gz')

# Cache the download so reruns are fast
@st.cache_data
# Data-loading function
def load_data(nrows):
    # Read the CSV file
    data = pd.read_csv(DATA_URL, nrows=nrows)
    # Lowercase the column names
    lowercase = lambda x: str(x).lower()
    data.rename(lowercase, axis='columns', inplace=True)
    # Parse the date/time column as datetimes
    data[DATE_COLUMN] = pd.to_datetime(data[DATE_COLUMN])
    # Return the final data
    return data

# Show a text element while the data loads
data_load_state = st.text('Loading data...')
# Load 10,000 rows of data
data = load_data(10000)
# Notify the reader that loading finished
data_load_state.text("Done! (using st.cache_data)")

# Inspect the raw data
if st.checkbox('Show raw data'):
    st.subheader('Raw data')
    st.write(data)

# Draw a histogram
# Add a subheader
st.subheader('Number of pickups by hour')
# Bin the pickups by hour with NumPy
hist_values = np.histogram(data[DATE_COLUMN].dt.hour, bins=24, range=(0, 24))[0]
# Plot it with Streamlit's st.bar_chart() method
st.bar_chart(hist_values)

# Filter results with a slider
hour_to_filter = st.slider('hour', 0, 23, 17)
# Filter the data for the selected hour
filtered_data = data[data[DATE_COLUMN].dt.hour == hour_to_filter]

# Add a subheader for the map
st.subheader('Map of all pickups at %s:00' % hour_to_filter)
# Plot the filtered data with st.map()
st.map(filtered_data)

Run the app:

streamlit run demo.py

4. Streamlit Examples for Deep Learning Projects

4.1. Example 1: Text Generation

4.1.1. Chatting with ChatGLM

from transformers import AutoModel, AutoTokenizer
import streamlit as st
from streamlit_chat import message

st.set_page_config(
    page_title="ChatGLM-6b Demo",
    page_icon=":robot:"
)

# Load the tokenizer and model once and share them across sessions
@st.cache_resource
def get_model():
    tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm-6b", trust_remote_code=True)
    model = AutoModel.from_pretrained("THUDM/chatglm-6b", trust_remote_code=True).half().cuda()
    model = model.eval()
    return tokenizer, model

MAX_TURNS = 20
MAX_BOXES = MAX_TURNS * 2

def predict(input, max_length, top_p, temperature, history=None):
    tokenizer, model = get_model()
    if history is None:
        history = []

    with container:
        if len(history) > 0:
            if len(history) > MAX_BOXES:
                history = history[-MAX_TURNS:]
            for i, (query, response) in enumerate(history):
                message(query, avatar_style="big-smile", key=str(i) + "_user")
                message(response, avatar_style="bottts", key=str(i))

        message(input, avatar_style="big-smile", key=str(len(history)) + "_user")
        st.write("AI is replying:")
        with st.empty():
            for response, history in model.stream_chat(tokenizer, input, history, max_length=max_length,
                                                       top_p=top_p, temperature=temperature):
                query, response = history[-1]
                st.write(response)

    return history

container = st.container()

# Prompt text area for the user's command
prompt_text = st.text_area(label="User input",
                           height=100,
                           placeholder="Type your command here")

max_length = st.sidebar.slider(
    'max_length', 0, 4096, 2048, step=1
)
top_p = st.sidebar.slider(
    'top_p', 0.0, 1.0, 0.6, step=0.01
)
temperature = st.sidebar.slider(
    'temperature', 0.0, 1.0, 0.95, step=0.01
)

if 'state' not in st.session_state:
    st.session_state['state'] = []

if st.button("Send", key="predict"):
    with st.spinner("The AI is thinking, please wait..."):
        # Text generation
        st.session_state["state"] = predict(prompt_text, max_length, top_p, temperature, st.session_state["state"])

4.1.2. Chatting with OpenAI

from openai import OpenAI
import streamlit as st

with st.sidebar:
    openai_api_key = st.text_input("OpenAI API Key", key="chatbot_api_key", type="password")
    "[Get an OpenAI API key](https://platform.openai.com/account/api-keys)"
    "[View the source code](https://github.com/streamlit/llm-examples/blob/main/Chatbot.py)"
    "[![Open in GitHub Codespaces](https://github.com/codespaces/badge.svg)](https://codespaces.new/streamlit/llm-examples?quickstart=1)"

st.title("💬 Chatbot")
st.caption("🚀 A streamlit chatbot powered by OpenAI LLM")

if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "assistant", "content": "How can I help you?"}]

for msg in st.session_state.messages:
    st.chat_message(msg["role"]).write(msg["content"])

if prompt := st.chat_input():
    if not openai_api_key:
        st.info("Please add your OpenAI API key to continue.")
        st.stop()

    client = OpenAI(api_key=openai_api_key)
    st.session_state.messages.append({"role": "user", "content": prompt})
    st.chat_message("user").write(prompt)
    response = client.chat.completions.create(model="gpt-3.5-turbo", messages=st.session_state.messages)
    msg = response.choices[0].message.content
    st.session_state.messages.append({"role": "assistant", "content": msg})
    st.chat_message("assistant").write(msg)

4.2. Image Tasks

4.2.1. Image Classification

# Imports below are assumed; the original post omitted them
import base64
import numpy as np
import cv2
import streamlit as st
from PIL import Image
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications.vgg19 import preprocess_input

st.markdown('<h1 style="color:black;">Vgg 19 Image classification model</h1>', unsafe_allow_html=True)
st.markdown('<h2 style="color:gray;">The image classification model classifies image into following categories:</h2>', unsafe_allow_html=True)
st.markdown('<h3 style="color:gray;"> street, buildings, forest, sea, mountain, glacier</h3>', unsafe_allow_html=True)

# Background image for the Streamlit page
@st.cache(allow_output_mutation=True)
# Read the file and encode it as base64
def get_base64_of_bin_file(bin_file):
    with open(bin_file, 'rb') as f:
        data = f.read()
    return base64.b64encode(data).decode()

# Set the background image, colors, etc.
def set_png_as_page_bg(png_file):
    bin_str = get_base64_of_bin_file(png_file)
    page_bg_img = '''
    <style>
    .stApp {
        background-image: url("data:image/png;base64,%s");
        background-size: cover;
        background-repeat: no-repeat;
        background-attachment: scroll; /* doesn't work */
    }
    </style>
    ''' % bin_str
    st.markdown(page_bg_img, unsafe_allow_html=True)
    return

set_png_as_page_bg('/content/background.webp')

# Upload a png/jpg image
upload = st.file_uploader('Insert image for classification', type=['png', 'jpg'])
c1, c2 = st.columns(2)
if upload is not None:
    im = Image.open(upload)
    img = np.asarray(im)
    image = cv2.resize(img, (224, 224))
    img = preprocess_input(image)
    img = np.expand_dims(img, 0)
    c1.header('Input Image')
    c1.image(im)
    c1.write(img.shape)

    # Load the pretrained model
    # Input size
    input_shape = (224, 224, 3)
    # Optimizer
    optim_1 = Adam(learning_rate=0.0001)
    # Number of classes
    n_classes = 6
    # Build the model
    # NOTE: `model(...)` (the fine-tuned VGG19 builder) and `classes` (the label list)
    # are defined in the original training script and are not shown here.
    vgg_model = model(input_shape, n_classes, optim_1, fine_tune=2)
    # Load the weights
    vgg_model.load_weights('/content/drive/MyDrive/vgg/tune_model19.weights.best.hdf5')
    # Predict
    vgg_preds = vgg_model.predict(img)
    vgg_pred_classes = np.argmax(vgg_preds, axis=1)
    c2.header('Output')
    c2.subheader('Predicted class :')
    c2.write(classes[vgg_pred_classes[0]])

4.2.2. Image Generation

import streamlit as st
from dotenv import load_dotenv
import os
import openai
from diffusers import StableDiffusionPipeline
import torch

load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

# Function to generate images with OpenAI DALL-E
def generate_images_using_openai(text):
    response = openai.Image.create(prompt=text, n=1, size="512x512")
    image_url = response['data'][0]['url']
    return image_url

# Function to generate images with Huggingface Diffusers
def generate_images_using_huggingface_diffusers(text):
    pipe = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
    pipe = pipe.to("cuda")
    prompt = text
    image = pipe(prompt).images[0]
    return image

# Streamlit code
choice = st.sidebar.selectbox("Select your choice", ["Home", "DALL-E", "Huggingface Diffusers"])

if choice == "Home":
    st.title("AI Image Generation App")
    with st.expander("About the App"):
        st.write("This is a simple image generation app that uses AI to generate images from a text prompt.")
elif choice == "DALL-E":
    st.subheader("Image generation using Open AI's DALL-E")
    input_prompt = st.text_input("Enter your text prompt")
    if input_prompt is not None:
        if st.button("Generate Image"):
            image_url = generate_images_using_openai(input_prompt)
            st.image(image_url, caption="Generated by DALL-E")
elif choice == "Huggingface Diffusers":
    st.subheader("Image generation using Huggingface Diffusers")
    input_prompt = st.text_input("Enter your text prompt")
    if input_prompt is not None:
        if st.button("Generate Image"):
            st.info("Generating image.....")
            image_output = generate_images_using_huggingface_diffusers(input_prompt)
            st.success("Image Generated Successfully")
            st.image(image_output, caption="Generated by Huggingface Diffusers")

4.3. Speech Tasks

4.3.1. Speech Synthesis (TTS)

import torch
import streamlit as st
# This example uses coqui-tts; install it with `pip install tts`
from TTS.api import TTS
import tempfile
import os

device = "cuda" if torch.cuda.is_available() else "cpu"

# Choose a model
model_name = 'tts_models/en/jenny/jenny'
tts = TTS(model_name).to(device)

st.title('Coqui TTS')

# Text input
text_to_speak = st.text_area('Entire article text here:', '')

# Synthesize and play on click
if st.button('Listen'):
    if text_to_speak:
        # Temporary path for the synthesized audio file
        temp_audio_path = './temp_audio.wav'
        # Synthesize speech to a file with tts_to_file()
        tts.tts_to_file(text=text_to_speak, file_path=temp_audio_path)
        # Play the audio
        st.audio(temp_audio_path, format='audio/wav')
        os.unlink(temp_audio_path)

4.3.2. Speech-to-Text

import logging
import logging.handlers
import queue
import threading
import time
import urllib.request
import os
from collections import deque
from pathlib import Path
from typing import List

import av
import numpy as np
import pydub
import streamlit as st
from twilio.rest import Client

from streamlit_webrtc import WebRtcMode, webrtc_streamer

HERE = Path(__file__).parent

logger = logging.getLogger(__name__)


# This code is based on https://github.com/streamlit/demo-self-driving/blob/230245391f2dda0cb464008195a470751c01770b/streamlit_app.py#L48  # noqa: E501
def download_file(url, download_to: Path, expected_size=None):
    # Don't download the file twice.
    # (If possible, verify the download using the file length.)
    if download_to.exists():
        if expected_size:
            if download_to.stat().st_size == expected_size:
                return
        else:
            st.info(f"{url} is already downloaded.")
            if not st.button("Download again?"):
                return

    download_to.parent.mkdir(parents=True, exist_ok=True)

    # These are handles to two visual elements to animate.
    weights_warning, progress_bar = None, None
    try:
        weights_warning = st.warning("Downloading %s..." % url)
        progress_bar = st.progress(0)
        with open(download_to, "wb") as output_file:
            with urllib.request.urlopen(url) as response:
                length = int(response.info()["Content-Length"])
                counter = 0.0
                MEGABYTES = 2.0 ** 20.0
                while True:
                    data = response.read(8192)
                    if not data:
                        break
                    counter += len(data)
                    output_file.write(data)

                    # We perform animation by overwriting the elements.
                    weights_warning.warning(
                        "Downloading %s... (%6.2f/%6.2f MB)"
                        % (url, counter / MEGABYTES, length / MEGABYTES)
                    )
                    progress_bar.progress(min(counter / length, 1.0))
    # Finally, we remove these visual elements by calling .empty().
    finally:
        if weights_warning is not None:
            weights_warning.empty()
        if progress_bar is not None:
            progress_bar.empty()


# This code is based on https://github.com/whitphx/streamlit-webrtc/blob/c1fe3c783c9e8042ce0c95d789e833233fd82e74/sample_utils/turn.py
@st.cache_data  # type: ignore
def get_ice_servers():
    """Use Twilio's TURN server because Streamlit Community Cloud has changed
    its infrastructure and WebRTC connection cannot be established without TURN server now.  # noqa: E501

    We considered Open Relay Project (https://www.metered.ca/tools/openrelay/) too,
    but it is not stable and hardly works as some people reported like https://github.com/aiortc/aiortc/issues/832#issuecomment-1482420656  # noqa: E501

    See https://github.com/whitphx/streamlit-webrtc/issues/1213
    """
    # Ref: https://www.twilio.com/docs/stun-turn/api
    try:
        account_sid = os.environ["TWILIO_ACCOUNT_SID"]
        auth_token = os.environ["TWILIO_AUTH_TOKEN"]
    except KeyError:
        logger.warning(
            "Twilio credentials are not set. Fallback to a free STUN server from Google."  # noqa: E501
        )
        return [{"urls": ["stun:stun.l.google.com:19302"]}]

    client = Client(account_sid, auth_token)
    token = client.tokens.create()
    return token.ice_servers


def main():
    st.header("Real Time Speech-to-Text")
    st.markdown(
        """
This demo app is using [DeepSpeech](https://github.com/mozilla/DeepSpeech),
an open speech-to-text engine.

A pre-trained model released with
[v0.9.3](https://github.com/mozilla/DeepSpeech/releases/tag/v0.9.3),
trained on American English is being served.
"""
    )

    # https://github.com/mozilla/DeepSpeech/releases/tag/v0.9.3
    MODEL_URL = "https://github.com/mozilla/DeepSpeech/releases/download/v0.9.3/deepspeech-0.9.3-models.pbmm"  # noqa
    LANG_MODEL_URL = "https://github.com/mozilla/DeepSpeech/releases/download/v0.9.3/deepspeech-0.9.3-models.scorer"  # noqa
    MODEL_LOCAL_PATH = HERE / "models/deepspeech-0.9.3-models.pbmm"
    LANG_MODEL_LOCAL_PATH = HERE / "models/deepspeech-0.9.3-models.scorer"

    download_file(MODEL_URL, MODEL_LOCAL_PATH, expected_size=188915987)
    download_file(LANG_MODEL_URL, LANG_MODEL_LOCAL_PATH, expected_size=953363776)

    lm_alpha = 0.931289039105002
    lm_beta = 1.1834137581510284
    beam = 100

    sound_only_page = "Sound only (sendonly)"
    with_video_page = "With video (sendrecv)"
    app_mode = st.selectbox("Choose the app mode", [sound_only_page, with_video_page])

    if app_mode == sound_only_page:
        app_sst(
            str(MODEL_LOCAL_PATH), str(LANG_MODEL_LOCAL_PATH), lm_alpha, lm_beta, beam
        )
    elif app_mode == with_video_page:
        app_sst_with_video(
            str(MODEL_LOCAL_PATH), str(LANG_MODEL_LOCAL_PATH), lm_alpha, lm_beta, beam
        )


def app_sst(model_path: str, lm_path: str, lm_alpha: float, lm_beta: float, beam: int):
    webrtc_ctx = webrtc_streamer(
        key="speech-to-text",
        mode=WebRtcMode.SENDONLY,
        audio_receiver_size=1024,
        rtc_configuration={"iceServers": get_ice_servers()},
        media_stream_constraints={"video": False, "audio": True},
    )

    status_indicator = st.empty()

    if not webrtc_ctx.state.playing:
        return

    status_indicator.write("Loading...")
    text_output = st.empty()
    stream = None

    while True:
        if webrtc_ctx.audio_receiver:
            if stream is None:
                from deepspeech import Model

                model = Model(model_path)
                model.enableExternalScorer(lm_path)
                model.setScorerAlphaBeta(lm_alpha, lm_beta)
                model.setBeamWidth(beam)

                stream = model.createStream()

                status_indicator.write("Model loaded.")

            sound_chunk = pydub.AudioSegment.empty()
            try:
                audio_frames = webrtc_ctx.audio_receiver.get_frames(timeout=1)
            except queue.Empty:
                time.sleep(0.1)
                status_indicator.write("No frame arrived.")
                continue

            status_indicator.write("Running. Say something!")

            for audio_frame in audio_frames:
                sound = pydub.AudioSegment(
                    data=audio_frame.to_ndarray().tobytes(),
                    sample_width=audio_frame.format.bytes,
                    frame_rate=audio_frame.sample_rate,
                    channels=len(audio_frame.layout.channels),
                )
                sound_chunk += sound

            if len(sound_chunk) > 0:
                sound_chunk = sound_chunk.set_channels(1).set_frame_rate(
                    model.sampleRate()
                )
                buffer = np.array(sound_chunk.get_array_of_samples())
                stream.feedAudioContent(buffer)
                text = stream.intermediateDecode()
                text_output.markdown(f"**Text:** {text}")
        else:
            status_indicator.write("AudioReceiver is not set. Abort.")
            break


def app_sst_with_video(
    model_path: str, lm_path: str, lm_alpha: float, lm_beta: float, beam: int
):
    frames_deque_lock = threading.Lock()
    frames_deque: deque = deque([])

    async def queued_audio_frames_callback(
        frames: List[av.AudioFrame],
    ) -> av.AudioFrame:
        with frames_deque_lock:
            frames_deque.extend(frames)

        # Return empty frames to be silent.
        new_frames = []
        for frame in frames:
            input_array = frame.to_ndarray()
            new_frame = av.AudioFrame.from_ndarray(
                np.zeros(input_array.shape, dtype=input_array.dtype),
                layout=frame.layout.name,
            )
            new_frame.sample_rate = frame.sample_rate
            new_frames.append(new_frame)

        return new_frames

    webrtc_ctx = webrtc_streamer(
        key="speech-to-text-w-video",
        mode=WebRtcMode.SENDRECV,
        queued_audio_frames_callback=queued_audio_frames_callback,
        rtc_configuration={"iceServers": get_ice_servers()},
        media_stream_constraints={"video": True, "audio": True},
    )

    status_indicator = st.empty()

    if not webrtc_ctx.state.playing:
        return

    status_indicator.write("Loading...")
    text_output = st.empty()
    stream = None

    while True:
        if webrtc_ctx.state.playing:
            if stream is None:
                from deepspeech import Model

                model = Model(model_path)
                model.enableExternalScorer(lm_path)
                model.setScorerAlphaBeta(lm_alpha, lm_beta)
                model.setBeamWidth(beam)

                stream = model.createStream()

                status_indicator.write("Model loaded.")

            sound_chunk = pydub.AudioSegment.empty()

            audio_frames = []
            with frames_deque_lock:
                while len(frames_deque) > 0:
                    frame = frames_deque.popleft()
                    audio_frames.append(frame)

            if len(audio_frames) == 0:
                time.sleep(0.1)
                status_indicator.write("No frame arrived.")
                continue

            status_indicator.write("Running. Say something!")

            for audio_frame in audio_frames:
                sound = pydub.AudioSegment(
                    data=audio_frame.to_ndarray().tobytes(),
                    sample_width=audio_frame.format.bytes,
                    frame_rate=audio_frame.sample_rate,
                    channels=len(audio_frame.layout.channels),
                )
                sound_chunk += sound

            if len(sound_chunk) > 0:
                sound_chunk = sound_chunk.set_channels(1).set_frame_rate(
                    model.sampleRate()
                )
                buffer = np.array(sound_chunk.get_array_of_samples())
                stream.feedAudioContent(buffer)
                text = stream.intermediateDecode()
                text_output.markdown(f"**Text:** {text}")
        else:
            status_indicator.write("Stopped.")
            break


if __name__ == "__main__":
    import os

    DEBUG = os.environ.get("DEBUG", "false").lower() not in ["false", "no", "0"]

    logging.basicConfig(
        format="[%(asctime)s] %(levelname)7s from %(name)s in %(pathname)s:%(lineno)d: "
        "%(message)s",
        force=True,
    )

    logger.setLevel(level=logging.DEBUG if DEBUG else logging.INFO)

    st_webrtc_logger = logging.getLogger("streamlit_webrtc")
    st_webrtc_logger.setLevel(logging.DEBUG)

    fsevents_logger = logging.getLogger("fsevents")
    fsevents_logger.setLevel(logging.WARNING)

    main()

References

【1】API Reference - Streamlit Docs

【2】andfanilo/streamlit-lottie: Streamlit component to render Lottie animations (github.com); turner-anderson/streamlit-cropper: A simple image cropper for Streamlit (github.com)

【3】awetomate/text-to-speech-streamlit: Text-to-Speech solution using Google's Cloud TTS API and a Streamlit front end (github.com)

【4】Using streamlit for an STT / TTS model demo? - 🧩 Streamlit Components - Streamlit

【5】AI-App/Streamlit-TTS (github.com)

【6】Building a Voice Assistant using ChatGPT API | Vahid's ML-Blog (vahidmirjalili.com)

【7】streamlit/llm-examples: Streamlit LLM app examples for getting started (github.com)

【8】whitphx/streamlit-stt-app: Real time web based Speech-to-Text app with Streamlit (github.com)


