Web植物管理系统·上位机部分
CS小麻瓜 2024-10-17 11:03:01 阅读 76
本节主要展示上位机工程,上位机部分由于采用streamlit库进行web开发,由于streamlit不支持多线程,所以会存在一定bug。欢迎大家进行批评指正。
采用streamlit构建前端所以不能直接运行python文件,而应该在根目录下启动终端,运行指令
streamlit run main.py
这个是streamlit官方文档,大家可以参考修改代码 streamlit官方API文档
这个是讲解演示视频,大家可以结合食用
工程结构
根目录
|-->History
| |-->history.py
|-->Pages
| |-->sql.py
|-->Sets
| |-->config.json
|-->SQL
| |-->Ai_sql.py
|-->mian.py
|-->STC_B.py
依赖的包
<code>streamlit==1.36.0
pandas==2.2.2
SQLAlchemy==1.4.0
PyMySQL==1.1.1
mysql==0.0.3
multiprocess==0.70.16
mysql==0.0.3
mysql-connector-python==9.0.0
mysqlclient==2.2.4
pyserial==3.5
main.py
库说明
queue
、time
、pandas
、streamlit
、atexit
、json
、os
、logging
:常用的库和工具,用于数据处理、界面展示、文件操作和日志记录。Thread
、Timer
、ThreadPoolExecutor
:用于多线程处理。Sql_client
、Process_stcB
、History
:自定义模块,用于SQL数据库操作、线程处理和历史数据管理。figure
:Bokeh库用于数据可视化。
日志配置
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",code>
handlers=[
logging.FileHandler("翠滴清韵.log"), # 日志写入文件
logging.StreamHandler() # 日志输出到控制台
]
)
全局变量
send_work
、change_work
:用于任务调度的队列。executor
、executor_stc
:线程池,用于异步任务处理。
初始化前端界面
st.session_state 用于保存不被刷新掉的数据
if "messages" not in st.session_state:
st.session_state.messages = [
{
'role': 'assistant',
'content': '您好,欢迎使用翠滴清韵👋'
}
]
if 'state' not in st.session_state:
st.session_state['state'] = 0
if 'change_thread' not in st.session_state:
st.session_state['change_thread'] = 0
if 'change_data' not in st.session_state:
st.session_state['change_data'] = None
函数定义
change_thread(data, pool)
将数据放入线程池并打印日志。
init_session_state()
初始化会话状态中的各种变量,例如植物数据、错误信息和历史记录。
time_put_work(queue, interval, message)
每隔一定时间将消息放入队列中,用于定时任务调度。
change_put_work(queue, interval, message, attempt, max_attempt)
带有重试机制的定时任务调度函数。
cur_path()
获取当前脚本的绝对路径。
read_json(abs_path, path)
从指定路径读取JSON文件并返回其内容。
write_json(abs_path, path, data)
将数据写入指定路径的JSON文件。
Sql_connect(abs_path)
创建一个SQL客户端连接,并缓存结果。
chat_actions()
处理用户输入的聊天消息,并将其添加到会话历史记录中。
refresh_plant(data)
更新会话状态中的植物数据。
add_plant_chat()
在会话历史记录中添加植物信息的聊天消息。
page(select_work_list)
主要页面布局和功能实现:
显示聊天历史。
提供输入框让用户查询植物。
提供按钮进行图表绘制或退出应用。
根据用户输入从数据库中查询植物信息,并更新界面。
cleanup()
在退出应用时清理资源,并加入退出任务到队列。
plot_state()
设置状态以触发图表绘制。
init_history()
初始化历史记录。
plot()
绘制图表,展示植物数据的变化情况:
读取数据库中的最新数据。
使用Bokeh库绘制温度、光照和土壤湿度的变化图。
history_data()
初始化会话历史记录,包括欢迎消息。
主程序逻辑
设置页面标题。初始化全局变量和会话状态。启动后台线程处理任务。定时任务调度用于更新数据。根据会话状态决定是否绘制图表。
完整代码
import queue
import time
import pandas as pd
import streamlit as st
import atexit
import json
import os
import logging
from SQL.Ai_sql import Sql_client
from threading import Timer, Thread
from STC_B import Process_stcB
import streamlit_authenticator as st_auth
from History.history import History
from bokeh.plotting import figure
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(
level=logging.INFO, # Set the logging level
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",code>
handlers=[
logging.FileHandler("翠滴清韵.log"), # Log to a file
logging.StreamHandler() # Also log to console
]
)
send_work = Queue()
change_work = Queue()
executor = ThreadPoolExecutor(max_workers=2)
executor_stc = ThreadPoolExecutor(max_workers=2)
if "messages" not in st.session_state:
st.session_state.messages = [
{
'role': 'assistant',
'content': '您好,欢迎使用翠滴清韵👋'
}
]
if 'state' not in st.session_state:
st.session_state['state'] = 0
if 'change_thread' not in st.session_state:
st.session_state['change_thread'] = 0
if 'change_data' not in st.session_state:
st.session_state['change_data'] = None
def change_thread(data, pool):
pool.put(data)
print(f"---------------------------------------------------put data {data}")
def init_session_state():
if 'sets' not in st.session_state:
st.session_state['sets'] = {
'Plant_name': '',
'Plant_light': '',
'Plant_humidity': '',
'Plant_max_temp': '',
'Plant_min_temp': ''
}
if 'error' not in st.session_state:
st.session_state['error'] = ''
if 'history' not in st.session_state:
st.session_state['history'] = history
if 'df' not in st.session_state:
st.session_state['df'] = None
def time_put_work(queue, interval, message):
Timer(interval, time_put_work, args=(queue, interval, message)).start()
queue.put(message)
def change_put_work(queue, interval, message, attempt, max_attempt):
if attempt <= max_attempt:
Timer(interval, change_put_work, args=(queue, interval, message, attempt + 1, max_attempt)).start()
queue.put(message)
print(f"第{attempt}次")
else:
return
def cur_path():
return os.path.dirname(os.path.realpath(__file__))
def read_json(abs_path, path):
with open(os.path.join(abs_path, path), 'r', encoding='utf-8') as f:code>
data = json.load(f)
logging.info(f"Loaded JSON file from {os.path.join(abs_path, path)}")
return data
def write_json(abs_path, path, data):
with open(os.path.join(abs_path, path), 'w', encoding='utf-8') as f:code>
json.dump(data, f, ensure_ascii=False, indent=4)
logging.info(f"Written JSON data to {os.path.join(abs_path, path)}")
@st.cache_resource(ttl=10800)
def Sql_connect(abs_path):
sql_sets = read_json(abs_path, 'Sets/config.json')
sql_client = Sql_client(sql_sets['mysql'])
logging.info('Connected to SQL database')
return sql_client
def chat_actions():
st.session_state['his']['chat_data'].append(
{
'role': "user",
'content': st.session_state['chat_input']
}
)
def refresh_plant(data):
st.session_state.sets = {
'Plant_name': data.iloc[0, 0],
'Plant_light': data.iloc[0, 4],
'Plant_humidity': data.iloc[0, 3],
'Plant_max_temp': data.iloc[0, 1],
'Plant_min_temp': data.iloc[0, 2]
}
logging.info(f"Refreshed plant data for: {data.iloc[0, 0]}")
def add_plant_chat():
st.session_state['his']['chat_data'].append(
{
'role': "assistant",
'content': F"已为您找到该植物信息\n植物名称:{st.session_state.sets['Plant_name']}\n 适宜光照:{st.session_state.sets['Plant_light']}\n适宜湿度:{st.session_state.sets['Plant_humidity']}\n适宜最大温度:{st.session_state.sets['Plant_max_temp']}\n适宜最小温度:{st.session_state.sets['Plant_min_temp']}\n请确认是否修改"
}
)
def page(select_work_list):
init_session_state()
with st.sidebar:
Tables = st.container(height=500)
with Tables:
for msg in st.session_state['his']['chat_data']:
with st.chat_message(name=msg['role']):
Tables.write(msg['content'])
query = st.chat_input('请输入您的植物名称', on_submit=chat_actions, key='chat_input')code>
bt1, bt2 = st.columns(2)
with bt1:
st.button('绘图', on_click=plot_state, use_container_width=True)
with bt2:
st.button('退出', on_click=cleanup, use_container_width=True)
if query:
user_input = query
if user_input == '确认修改' or user_input == '确认' or user_input == '666':
send_data = {
'info': 'change_data',
'limit': [
st.session_state.sets['Plant_max_temp'],
st.session_state.sets['Plant_light'],
st.session_state.sets['Plant_humidity'],
]
}
st.session_state['change_data'] = send_data
st.session_state['plot'] = 1
st.rerun()
else:
data = sql_client.pd_Readsql(f"SELECT * FROM plant_data WHERE name IN ('{user_input}');")
if data.empty:
st.session_state.error = '未在数据库中找到该植物,请重新输入,或者手动添加该植物'
st.session_state['state'] = 0
logging.warning(f"Plant not found: {user_input}")
else:
refresh_plant(data)
add_plant_chat()
st.rerun()
if st.session_state.error != '':
st.error(st.session_state.error)
logging.error(f"Error: {st.session_state.error}")
st.session_state.error = ''
def cleanup():
logging.info("Exiting main process")
send_work.put({'info': 'exit', 'data': None})
stc_thread.join()
def plot_state():
st.session_state['state'] = 1
@st.cache_data(ttl=10800)
def init_history():
key = History()
return key
def plot():
main_status = st.status('绘图中...', expanded=True)
stat_time = time.time()
main_status.write("第一步,读取数据")
main_status.caption("读取出最新的十条数据")
sql_Data = sql_client.pd_Readsql("SELECT * FROM data ORDER BY 时间 DESC LIMIT 10;")
main_status.dataframe(sql_Data)
main_status.write("第二步,绘制图表")
main_status.caption("绘制温度、光照、土壤湿度的变化图")
sql_Data['时间'] = pd.to_datetime(sql_Data['时间'])
temperature_data = sql_Data['温度']
x = sql_Data['时间']
print(f"我的x {x}")
print(f"时间{x}")
p = figure(title="1分钟内温度数据", x_axis_label="时间", y_axis_label="摄氏度", plot_width=600, plot_height=400,code>
x_axis_type='datetime', y_range=(30, 40))code>
p.line(x, temperature_data, legend_label="温度", line_width=2, color="yellow", line_alpha=0.8, line_dash=[8, 4])code>
p.circle(x, temperature_data, size=4, color='red', alpha=0.8)code>
for i, temp in enumerate(temperature_data):
p.text(x=x.iloc[i], y=temperature_data.iloc[i], text=[str(temp)], text_baseline="bottom", text_align="center",code>
angle=0)
main_status.bokeh_chart(p)
light_p = figure(title="1分钟内光照数据", x_axis_label="时间", y_axis_label="光照", plot_width=600, plot_height=400,code>
x_axis_type='datetime', y_range=(0, 100))code>
light_data = sql_Data['光照']
light_p.line(x, light_data, legend_label="光照", line_width=2, color="green", line_alpha=0.8, line_dash=[8, 4])code>
light_p.circle(x, light_data, size=4, color='blue', alpha=0.8)code>
for i, light in enumerate(light_data):
light_p.text(x=x.iloc[i], y=light_data.iloc[i], text=[str(light)], text_baseline="bottom", text_align="center",code>
angle=0)
main_status.bokeh_chart(light_p)
humidity_p = figure(title="1分钟内土壤湿度数据", x_axis_label="时间", y_axis_label="土壤湿度", plot_width=600,code>
plot_height=400, x_axis_type='datetime', y_range=(0, 5))code>
humidity_data = sql_Data['土壤湿度']
humidity_p.line(x, humidity_data, legend_label="土壤湿度", line_width=2, color="red", line_alpha=0.8,code>
line_dash=[8, 4])
humidity_p.circle(x, humidity_data, size=4, color='green', alpha=0.8)code>
for i, humidity in enumerate(humidity_data):
humidity_p.text(x=x.iloc[i], y=humidity_data.iloc[i], text=[str(humidity)], text_baseline="bottom",code>
text_align="center", angle=0)code>
main_status.bokeh_chart(humidity_p)
all_time = time.time() - stat_time
main_status.update(label=f"分析完成! 总耗时:{all_time}s", state="complete", expanded=True)code>
@st.cache_data(ttl=10800)
def history_data():
his = {
'chat_data': [
{
'role': 'assistant',
'content': '您好,欢迎使用翠滴清韵👋'
}
],
'df_data': [
]
}
return his
if __name__ == '__main__':
global abs_path, sql_client
st.set_page_config(
page_title='翠滴清韵'code>
)
session_state = st.session_state
abs_path = cur_path()
logging.info("Starting Streamlit application")
sql_client = Sql_connect(abs_path)
His = history_data()
if 'his' not in session_state:
st.session_state['his'] = His
history = init_history()
if 'thread' not in session_state:
st.session_state['thread'] = 0
if 'plot' not in session_state:
st.session_state['plot'] = 0
if session_state['thread'] == 0:
st.header('欢迎使用:green[翠滴清韵] :sunglasses:')
st.subheader('您的智能植物管理助手')
temp = [{'role': 'assistant', 'content': '您好,欢迎使用翠滴清韵👋'}]
stc_thread = Thread(target=Process_stcB, args=(send_work, change_work,))
stc_thread.daemon = True
stc_thread.start()
time.sleep(1)
st.session_state['thread'] = 1
logging.info("Started STC_B process")
if session_state['plot'] == 1:
executor.submit(change_thread, st.session_state['change_data'], change_work)
st.session_state['plot'] = 0
# if st.session_state['change_thread'] == 0:
# executor.submit(change_thread, {1,2})
# st.session_state['change_thread'] = 1
time_put_work(
send_work, 6,
{
'role': 'streamlit',
'info': 'select_data'
}
)
atexit.register(cleanup)
page(send_work)
if st.session_state['state'] != 0:
plot()
STC_B.py
该系统用于通过串口通信与 STC_B 设备交互,获取和处理传感器数据,并将数据存储到 MySQL 数据库中。系统主要包括以下功能模块:
日志记录:记录系统运行情况和错误信息。串口通信:与 STC_B 设备进行数据交换。数据处理:将原始数据转换为有用的格式。数据库操作:将处理后的数据存储到 MySQL 数据库中。配置管理:从配置文件中读取数据库连接信息和其他设置。
数据库写入函数
这个函数将数据插入到MySQL数据库中。sets
包含数据库连接信息,data
是待插入的数据。使用了异常处理来捕捉和记录数据库操作中的错误。
def write2sql(sets, data):
try:
conn = mysql.connector.connect(
host=sets['host'],
user=sets['user'],
password=sets['password'],
database=sets['database']
)
cursor = conn.cursor()
sql = "INSERT INTO data (温度, 光照, 土壤湿度, 时间) VALUES (%s, %s, %s ,%s);"
cursor.execute(sql, data)
conn.commit()
print("Data inserted successfully!")
except mysql.connector.Error as e:
conn.rollback()
print("Error inserting data into MySQL database:", e)
finally:
conn.close()
STC_B类
__init__
:初始化串口通信。如果找到可用的串口,则尝试连接;否则,设置为未连接状态。b2s
:将字节数据转换为十六进制字符串。verify_connect
:发送验证数据以确认设备连接是否正常。数据头为0xAA 55send_Data
:发送数据并返回设备的响应。manage_data
:处理从设备返回的数据,将其转换为实际的测量值。
class stc_B:
def __init__(self):
def b2s(self, data):
def verify_connect(self):
def send_Data(self, data):
def manage_data(self, data):
数据处理函数
def int2hex(data):
def wash_data(data):
int2hex
:将数据转换为十六进制的字节流。wash_data
:处理从设备获取的数据,将其转换为十进制。
注意事项
确保 MySQL 数据库配置正确。确保 STC_B 设备正常工作并连接到计算机。配置文件 Sets/conf吧vig.json
必须存在并包含有效的数据库连接信息。
完整代码
import serial
import time
import serial.tools.list_ports
import logging
from datetime import datetime
import os
import json
import mysql.connector
logging.basicConfig(
level=logging.INFO, # Set the logging level
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",code>
handlers=[
logging.FileHandler("STC_B.log"), # Log to a file
logging.StreamHandler() # Also log to console
]
)
def write2sql(sets, data):
try:
conn = mysql.connector.connect(
host=sets['host'],
user=sets['user'],
password=sets['password'],
database=sets['database']
)
cursor = conn.cursor()
sql = "INSERT INTO data (温度, 光照, 土壤湿度, 时间) VALUES (%s, %s, %s ,%s);"
cursor.execute(sql, data)
conn.commit()
print("Data inserted successfully!")
except mysql.connector.Error as e:
conn.rollback()
print("Error inserting data into MySQL database:", e)
finally:
conn.close()
class stc_B:
def __init__(self):
ports = serial.tools.list_ports.comports()
if ports:
port = ports[0]
self.ser = serial.Serial(port.device, 9600)
else:
self.ser = '未连接到下位机'
def b2s(self, data):
return ''.join([f'{ch:0>2x}' for ch in data]).upper()
def verify_connect(self):
# 修改发送的数据,开头两个字节是 0xaa 和 0x55
data = bytes([0xaa, 0x55, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
while self.ser.in_waiting:
time.sleep(0.1)
self.ser.write(data)
time.sleep(0.1) # 等待接收
re_data = self.ser.read_all()
sdata = self.b2s(re_data)
print(f"re_data: {re_data}, sdata: {sdata}")
return sdata
def send_Data(self, data):
while self.ser.in_waiting:
time.sleep(0.1)
self.ser.write(data)
time.sleep(0.1)
re_data = self.ser.read_all()
sdata = self.b2s(re_data)
print(f"re_data: {re_data}, sdata: {sdata}")
return sdata
def manage_data(self, data):
# 用于处理返回数据
if data[:4] != 'AA55':
return "数据返回错误"
if data[4:6] == '01' or data[4:6] == '02':
# 将下标从6-21的字符串两个一组切分开
pairs = [data[i:i + 2] for i in range(6, 22, 2)]
# 用eval还原原本数据类型
restored_data = [eval(f"0x{pair}") for pair in pairs]
result = [
restored_data[0] * 10 + restored_data[1] + restored_data[2] * 0.1,
restored_data[3] * 100 + restored_data[4] * 10 + restored_data[5],
restored_data[6] + restored_data[7] * 0.1,
]
return restored_data
else:
return "数据返回错误"
def int2hex(data):
if data[1] < 100:
data[1] = '0' + str(data[1])
s = ''.join(str(i) for i in data).replace('.', '')
byte_list = [bytes([0xaa, 0x55, 0x02])]
for i in s:
byte_list.append(bytes([int(i)]))
return b''.join(byte_list)
def wash_data(data):
res = []
res.append(data[0] * 10 + data[1] + data[2] * 0.1)
res.append(data[3] * 100 + data[4] * 10 + data[5])
res.append(data[6] + data[7] * 0.1)
return res
def cur_path():
return os.path.dirname(os.path.realpath(__file__))
def read_json(abs_path, path):
with open(os.path.join(abs_path, path), 'r', encoding='utf-8') as f:code>
data = json.load(f)
logging.info(f"Loaded JSON file from {os.path.join(abs_path, path)}")
return data
def Process_stcB(send_Work, change_work):
port = stc_B()
abs_path = cur_path()
sql_sets = read_json(abs_path, 'Sets/config.json')
all_data = {
'温度': [],
'土壤湿度': [],
'光照': [],
'时间': []
}
if port.ser != '未连接到下位机':
respond = port.verify_connect()
if respond == 'AA55000102030405060708':
logging.info("Connected to STC_B")
else:
logging.error("Failed to connect to STC_B")
return
else:
logging.error("Failed to connect to STC_B")
return
while True:
while send_Work.empty() and change_work.empty():
time.sleep(0.1)
print("send_Work.empty()", send_Work.empty())
print("back_work.empty()", change_work.empty())
if not change_work.empty():
print('-'*50)
change2data = change_work.get()
print("chage2data", change2data)
limit = change2data['limit']
data = int2hex(limit)
while port.ser.in_waiting:
time.sleep(0.1)
respond = port.send_Data(data)
eval_respond = port.manage_data(respond)
print("修改后的阈值: ", eval_respond)
data2process = send_Work.get()
print("data2process", data2process)
if data2process['info'] == 'exit':
while port.ser.in_waiting:
time.sleep(0.1)
logging.info("Exiting STC_B process")
break
elif data2process['info'] == 'select_data':
print("!!!!!!!!!!!!!!!!",data2process['role'])
data = bytes([0xaa, 0x55, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01])
respond = port.send_Data(data)
eval_respond = port.manage_data(respond)
print("eval_respond",eval_respond)
re_data = wash_data(eval_respond)
print(time.time(), re_data)
re_data.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
write2sql(sql_sets['mysql'], re_data)
sql .py
导入库
streamlit
:用于构建交互式 web 应用。json
:用于处理 JSON 文件的读取和写入。os
:用于操作系统相关功能,如路径操作。Sql_client
:从 SQL.Ai_sql
模块导入的一个类,用于与数据库交互。pandas
:用于数据操作和分析。datetime
:用于处理日期和时间。
函数定义
main
:构建 Streamlit 应用的主要逻辑,包括两个标签页(tab1
和 tab2
)
def main(sql_client, abs_path):
tab1, tab2 = st.tabs(['SQL代码', '表单操作'])
with (tab1):
st.title('数据库操作')
st.caption('您可以在此对数据库进行直接操作')
st.write("以下是最近2min中内的数据")
database_name = Sql_client.inspector.get_schema_names()[4]
table_names = Sql_client.inspector.get_table_names()
print(database_name)
print(f"table_names: {table_names}")
col1, col2 = st.columns(2)
with col1:
st.selectbox(
label='Database',code>
options=[database_name]
)
with col2:
cur_table = st.selectbox(
label='Tables',code>
options=table_names
)
cur_df = Sql_client.execute_Sql(f"SELECT * from {cur_table} ORDER BY 时间 DESC LIMIT 20;")
print(cur_df)
st.dataframe(
data=cur_df,
height=400,
width=800
)
st.write("请选择时间查看周围数据")
result = Sql_client.execute_Sql(f"SELECT 时间 from data")
time_data = result.fetchall()
time_df = pd.DataFrame(time_data, columns=['时间'])
time_options = pd.to_datetime(time_df['时间']).dt.strftime('%Y-%m-%d %H:%M:%S').tolist()
cur_time_str = st.selectbox(
label='时间',code>
options=time_options
)
before_time = Sql_client.execute_Sql(f"""
SELECT *
FROM data
WHERE 时间 < '{cur_time_str}'
ORDER BY 时间 DESC
LIMIT 5;
""")
after_time = Sql_client.execute_Sql(f"""
SELECT *
FROM data
WHERE 时间 > '{cur_time_str}'
ORDER BY 时间 ASC
LIMIT 5;
""")
before_time_df = pd.DataFrame(before_time.fetchall(), columns=before_time.keys())
after_time_df = pd.DataFrame(after_time.fetchall(), columns=after_time.keys())
combined_df = pd.concat([before_time_df, after_time_df]).reset_index(drop=True)
st.dataframe(combined_df, width=800)
with tab2:
st.write("设置阈值")
col1, col2 = st.columns(2)
with col1:
st.write("这是一个SQL代码生成页面")
st.text_area("请输入要保存的SQL代码", height=450, key="Sql_code")code>
st.write("您输入的SQL代码为:")
st.write(st.session_state.Sql_code)
说明文档
1. 功能概述
该应用允许用户:
查看数据库中的表格数据。根据时间选择查看周围的数据。生成和保存 SQL 代码。
2. 依赖库
streamlit
:用于创建 web 界面。json
:用于读取和写入 JSON 文件。os
:用于路径操作。pandas
:用于数据处理。datetime
:用于时间处理。Sql_client
:自定义的数据库操作类。
3. 函数说明
cur_path
:获取当前工作目录。read_json
:从指定路径读取 JSON 文件并返回数据。write_json
:将数据写入 JSON 文件。Sql_connect
:连接数据库并返回 Sql_client
实例。main
:构建应用的主要界面和功能,包括数据库操作和 SQL 代码生成。
4. 使用说明
标签页1(SQL代码):展示数据库中表格的最新数据,允许用户选择时间查看前后数据。标签页2(表单操作):提供一个文本区域用于输入和保存 SQL 代码。
5. 注意事项
确保 config.json
配置文件存在于指定路径中,并包含正确的数据库配置信息。数据库表结构需要与 SQL 查询语句匹配。
完整代码
import streamlit as st
import json
import os
from SQL.Ai_sql import Sql_client
import pandas as pd
from datetime import datetime
def cur_path():
current_dir = os.path.abspath(os.curdir)
return current_dir
def read_json(abs_path, path):
with open(os.path.join(abs_path, path), 'r', encoding='utf-8') as f:code>
data = json.load(f)
return data
def write_json(abs_path, path, data):
with open(os.path.join(abs_path, path), 'w', encoding='utf-8') as f:code>
json.dump(data, f, ensure_ascii=False, indent=4)
@st.cache_resource(ttl=10800)
def Sql_connect(abs_path):
sql_sets = read_json(abs_path, 'Sets/config.json')
sql_client = Sql_client(sql_sets['mysql'])
print('connect to sql')
return sql_client
def main(sql_client, abs_path):
tab1, tab2 = st.tabs(['SQL代码', '表单操作'])
with (tab1):
st.title('数据库操作')
st.caption('您可以在此对数据库进行直接操作')
st.write("以下是最近2min中内的数据")
database_name = Sql_client.inspector.get_schema_names()[4]
table_names = Sql_client.inspector.get_table_names()
print(database_name)
print(f"table_names: {table_names}")
col1, col2 = st.columns(2)
with col1:
st.selectbox(
label='Database',code>
options=[database_name]
)
with col2:
cur_table = st.selectbox(
label='Tables',code>
options=table_names
)
cur_df = Sql_client.execute_Sql(f"SELECT * from {cur_table} ORDER BY 时间 DESC LIMIT 20;")
print(cur_df)
st.dataframe(
data=cur_df,
height=400,
width=800
)
st.write("请选择时间查看周围数据")
result = Sql_client.execute_Sql(f"SELECT 时间 from data")
time_data = result.fetchall()
time_df = pd.DataFrame(time_data, columns=['时间'])
time_options = pd.to_datetime(time_df['时间']).dt.strftime('%Y-%m-%d %H:%M:%S').tolist()
cur_time_str = st.selectbox(
label='时间',code>
options=time_options
)
# around_time = Sql_client.execute_Sql(
# f"SELECT * from data WHERE 时间 = '{cur_time_str}' ORDER BY 时间 DESC LIMIT 10;")
#
# around_time_df = pd.DataFrame(around_time.fetchall(), columns=around_time.keys())
# st.dataframe(around_time_df)
before_time = Sql_client.execute_Sql(f"""
SELECT *
FROM data
WHERE 时间 < '{cur_time_str}'
ORDER BY 时间 DESC
LIMIT 5;
""")
# 查询用户选择的时间之后的五条数据
after_time = Sql_client.execute_Sql(f"""
SELECT *
FROM data
WHERE 时间 > '{cur_time_str}'
ORDER BY 时间 ASC
LIMIT 5;
""")
before_time_df = pd.DataFrame(before_time.fetchall(), columns=before_time.keys())
after_time_df = pd.DataFrame(after_time.fetchall(), columns=after_time.keys())
# 合并前五条和后五条数据
combined_df = pd.concat([before_time_df, after_time_df]).reset_index(drop=True)
st.dataframe(combined_df, width=800)
with tab2:
st.write("设置阈值")
col1, col2 = st.Acolumns(2)
with col1:
st.write("这是一个SQL代码生成页面")
st.text_area("请输入要保存的SQL代码", height=450, key="Sql_code")code>
st.write("您输入的SQL代码为:")
st.write(st.session_state.Sql_code)
if __name__ == "__main__":
abs_path = cur_path()
print(abs_path)
Sql_client = Sql_connect(abs_path)
main(Sql_client, abs_path)
config.json
MySQL数据库配置项
{
"mysql":{
"host": "localhost",
"user": "root",
"password": "***",
"database": "plants",
"port": 3306
}
}
Ai_sql.py
导入库
mysql
:用于 MySQL 数据库操作(这里没有直接使用,可能用于兼容性或其他目的)。create_engine
:用于创建数据库引擎的 SQLAlchemy 函数。text
:用于构建 SQL 语句。inspect
:用于数据库结构的检查(从 sqlalchemy
中导入)。MetaData
:用于描述数据库中的表结构。pandas
:用于处理和分析数据。SQLAlchemyError
:用于捕获 SQLAlchemy 异常。Table
、insert
:用于构建和执行 SQL 插入操作。
Sql_client 类
初始化
class Sql_client():
def __init__(self, db_sets):
self.engine = create_engine(
f'mysql+pymysql://{db_sets["user"]}:{db_sets["password"]}@{db_sets["host"]}:{db_sets["port"]}/{db_sets["database"]}',
echo=False, encoding='utf-8'code>
)
self.inspector = Inspector.from_engine(self.engine)
self.metadata = MetaData(bind=self.engine)
db_sets
:包含数据库连接信息的字典。self.engine
:创建一个 SQLAlchemy 引擎,用于连接 MySQL 数据库。self.inspector
:用于检查数据库结构的 Inspector
实例。self.metadata
:用于存储表结构信息的 MetaData
实例。
执行sql语句函数
def execute_Sql(self, sql_code):
conn = self.engine.connect()
operation = text(sql_code)
results = conn.execute(operation)
conn.close()
return results
执行任意的 SQL 语句,并返回执行结果。
读取 SQL 数据 pd_Readsql
def pd_Readsql(self, sql_code):
conn = self.engine.connect()
df = pd.read_sql(sql=sql_code, con=conn.connection)
conn.close()
return df
执行 SQL 查询并将结果读取到 Pandas DataFrame 中,方便进一步的数据处理。
获取表名 get_table_names
def get_table_names(self):
table_names = self.inspector.get_table_names()
return [table for table in table_names]
获取数据库中所有表的名称。
获取表的 DDL get_table_ddls
def get_table_ddls(self, table_name):
with self.engine.connect() as conn:
ddl_query = f"SHOW CREATE TABLE `{table_name}`"
result = conn.execute(ddl_query)
table_ddl = result.fetchone()[1]
return table_ddl
获取指定表的 DDL(数据定义语言)语句,用于描述表的结构。
读取所有表的 DDL read_tables_DDL
def get_table_ddls(self, table_name):
with self.engine.connect() as conn:
ddl_query = f"SHOW CREATE TABLE `{table_name}`"
result = conn.execute(ddl_query)
table_ddl = result.fetchone()[1]
return table_ddl
获取数据库中所有表的 DDL 语句,并将其存储在字典中。
插入数据 insert_data
def insert_data(self, table_name, data_dict):
try:
table = Table(table_name, self.metadata, autoload_with=self.engine)
stmt = insert(table).values(data_dict)
with self.engine.connect() as conn:
with conn.begin(): # 使用事务上下文管理
conn.execute(stmt)
return f"数据成功插入到表 '{table_name}' 中"
except SQLAlchemyError as e:
return f"数据插入失败: {e}"
将数据插入到指定的表中。使用事务管理以确保数据一致性。捕获并返回可能发生的 SQLAlchemy 错误。
1. 功能概述
Sql_client
类提供了与 MySQL 数据库交互的功能,包括执行 SQL 查询、读取数据、获取表结构信息以及插入数据。它使用 SQLAlchemy 作为数据库操作的工具。
2. 依赖库
mysql
:用于 MySQL 数据库操作。sqlalchemy
:用于数据库引擎创建、SQL 执行、表结构检查等。pandas
:用于数据处理。SQLAlchemyError
:用于异常处理。
3. 类和方法说明
Sql_client 类
__init__(self, db_sets)
:初始化数据库连接。db_sets
包含数据库连接所需的配置信息(用户、密码、主机、端口、数据库名)。
execute_Sql(self, sql_code)
:执行给定的 SQL 语句,并返回结果。适用于执行任意 SQL 操作。
pd_Readsql(self, sql_code)
:执行 SQL 查询并将结果读取到 Pandas DataFrame 中,便于数据分析。
get_table_names(self)
:获取数据库中所有表的名称列表。
get_table_ddls(self, table_name)
:获取指定表的 DDL(数据定义语言)语句,描述表的结构。
read_tables_DDL(self)
:获取所有表的 DDL 语句,并将结果以字典形式返回。字典的键为表名,值为对应的 DDL 语句。
insert_data(self, table_name, data_dict)
:将数据插入到指定的表中。data_dict
为数据字典,键为列名,值为对应的值。使用事务确保数据一致性。
4. 使用说明
初始化:创建 Sql_client
实例时传入数据库配置信息。执行 SQL:使用 execute_Sql
方法执行 SQL 语句。读取数据:使用 pd_Readsql
方法执行 SQL 查询并获取 DataFrame。获取表信息:使用 get_table_names
和 get_table_ddls
方法获取表名称和表结构。插入数据:使用 insert_data
方法将数据插入到指定表中。
5. 注意事项
确保 db_sets
中的配置信息正确无误,以便成功连接到数据库。使用 execute_Sql
和 pd_Readsql
时,要确保 SQL 语句的正确性和安全性,防止 SQL 注入。insert_data
方法在插入数据时会自动开启事务,如果发生错误,会自动回滚。
完整代码
import mysql
from sqlalchemy import create_engine, text,inspect, MetaData
from sqlalchemy.engine.reflection import Inspector
import pandas as pd
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import Table, MetaData, insert
class Sql_client():
def __init__(self, db_sets):
self.engine = create_engine(
f'mysql+pymysql://{db_sets["user"]}:{db_sets["password"]}@{db_sets["host"]}:{db_sets["port"]}/{db_sets["database"]}',
echo=False, encoding='utf-8'code>
)
self.inspector = Inspector.from_engine(self.engine)
self.metadata = MetaData(bind=self.engine)
def execute_Sql(self, sql_code): # 执行sql语句
conn = self.engine.connect()
operation = text(sql_code)
results = conn.execute(operation)
conn.close()
return results
def pd_Readsql(self,sql_code): # 读取sql数据
conn = self.engine.connect()
df = pd.read_sql(sql=sql_code,con=conn.connection)
conn.close()
return df
def get_table_names(self):
table_names = self.inspector.get_table_names()
return [table for table in table_names]
def get_table_ddls(self,table_name):
with self.engine.connect() as conn:
ddl_query = f"SHOW CREATE TABLE `{table_name}`"
result = conn.execute(ddl_query)
table_ddl = result.fetchone()[1]
return table_ddl
def read_tables_DDL(self): #读取所有表的DDL
ddl_dict = {}
table_names = self.get_table_names()
print(f"DDL_table_names\n:{table_names}")
for table_name in table_names:
ddl = self.get_table_ddls(table_name)
ddl_dict[table_name] = ddl
return ddl_dict
def insert_data(self, table_name, data_dict):
try:
table = Table(table_name, self.metadata, autoload_with=self.engine)
stmt = insert(table).values(data_dict)
with self.engine.connect() as conn:
with conn.begin(): # 使用事务上下文管理
conn.execute(stmt)
return f"数据成功插入到表 '{table_name}' 中"
except SQLAlchemyError as e:
return f"数据插入失败: {e}"
history.py
1. 功能概述
这段代码包含了一个用于处理历史数据和聊天记录的类 History
,以及一个异步函数 write_sql
,用于将数据写入 MySQL 数据库。主要功能包括:
将数据异步写入数据库。记录和管理聊天历史。处理和存储传感器数据及其时间戳。查询最后一条用户输入的聊天记录。
2. 依赖库
pymysql
:用于连接和操作 MySQL 数据库。pandas
:用于数据处理(尽管在此代码中未实际使用)。asyncio
:用于执行异步操作。
3. 函数和类说明
异步函数 write_sql
async def write_sql(D):
db = pymysql.connect(
host='localhost',code>
user='root',code>
password='Hby20040627!!',code>
database='plants',code>
charset='utf8'code>
)
cursor = db.cursor()
sql = "INSERT INTO data (温度, 光照, 土壤湿度, 时间) VALUES (%s, %s, %s, %s)"
cursor.execute(sql, D)
db.commit()
db.close()
功能:将数据异步写入 MySQL 数据库中的 data
表。参数:D
是一个包含四个元素的元组,分别对应表的四个字段(温度、光照、土壤湿度和时间)。操作:
连接到本地 MySQL 数据库。使用游标执行 INSERT
语句,将数据插入表中。提交事务并关闭数据库连接。
类 History
class History():
def __init__(self):
self.chat_history = [
{
'role': 'assistant',
'content': '您好,欢迎使用翠滴清韵👋'
}
]
self.data_history = []
self.data_time = []
功能:管理聊天记录和数据记录。属性:
chat_history
:聊天记录列表,初始包含一条欢迎消息。data_history
:存储传感器数据的列表。data_time
:存储数据时间戳的列表。
方法
add_data(self, data)
def add_data(self, data):
self.data_history.append(data)
if len(self.data_history) > 10 and len(self.data_time) > 10:
res = []
for i in range(len(self.data_history)):
res.append((self.data_history[i][0], self.data_history[i][1], self.data_history[i][2], self.data_time[i]))
asyncio.run(write_sql(res))
功能:添加数据到 data_history
列表中。逻辑:
当 data_history
和 data_time
列表中的条目超过 10 条时,将这些数据准备为一个列表 res
。使用 asyncio.run
执行异步函数 write_sql
将数据写入数据库。
add_time(self, time)
def add_time(self, time):
self.data_time.append(time)
功能:添加时间戳到 data_time
列表中。
add_chat(self, role, content)
def add_chat(self, role, content):
self.chat_history.append(
{
'role': role,
'content': content
}
)
功能:添加聊天记录。参数:
role
:角色(如用户或助手)。content
:聊天内容。
last_query(self)
def last_query(self):
for i in range(len(self.chat_history) - 1, -1, -1):
if self.chat_history[i]['role'] == 'user':
return self.chat_history[i]['content']
return self.chat_history[-1]['content']
功能:返回最后一条用户输入的聊天记录。如果没有用户输入的记录,则返回最后一条聊天记录。逻辑:从 chat_history
列表中逆序查找用户角色的记录,并返回其内容。
4. 使用说明
创建 History
实例:实例化 History
类以开始记录聊天和数据。
添加数据:
使用 add_data(data)
方法将传感器数据添加到历史记录中。如果数据超过 10 条,将异步写入数据库。使用 add_time(time)
方法添加时间戳。
添加聊天记录:
使用 add_chat(role, content)
方法记录聊天记录,role
可以是 'user'
或 'assistant'
,content
是聊天内容。
查询最后一条用户输入:
使用 last_query()
方法获取最近一条用户输入的聊天记录内容。
完整代码
import pymysql
import pandas
import asyncio
async def write_sql(D):
db = pymysql.connect(
host='localhost',code>
user='root',code>
password='Hby20040627!!',code>
database='plants',code>
charset='utf8'code>
)
cursor = db.cursor()
sql = "INSERT INTO data (温度, 光照, 土壤湿度, 时间) VALUES (%s, %s, %s, %s)"
cursor.execute(sql, D)
db.commit()
db.close()
class History():
def __init__(self):
self.chat_history = [
{
'role': 'assistant',
'content': '您好,欢迎使用翠滴清韵👋'
}
]
self.data_history = []
self.data_time = []
def add_data(self, data):
self.data_history.append(data)
if len(self.data_history) > 10 and len(self.data_time) > 10:
res = []
for i in range(len(self.data_history)):
res.append((self.data_history[i][0], self.data_history[i][1], self.data_history[i][2], self.data_time[i]))
asyncio.run(write_sql(res))
def add_time(self, time):
self.data_time.append(time)
def add_chat(self, role, content):
self.chat_history.append(
{
'role': role,
'content': content
}
)
def last_query(self):
for i in range(len(self.chat_history) - 1, -1, -1):
if self.chat_history[i]['role'] == 'user':
return self.chat_history[i]['content']
return self.chat_history[-1]['content']
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。