Web植物管理系统·上位机部分

CS小麻瓜 2024-10-17 11:03:01 阅读 76

本节主要展示上位机工程,上位机部分由于采用streamlit库进行web开发,由于streamlit不支持多线程,所以会存在一定bug。欢迎大家进行批评指正。

采用streamlit构建前端所以不能直接运行python文件,而应该在根目录下启动终端,运行指令

 streamlit run main.py 

这个是streamlit官方文档,大家可以参考修改代码 streamlit官方API文档

这个是讲解演示视频,大家可以结合食用 


工程结构

根目录

        |-->History

        |        |-->history.py

        |-->Pages

        |        |-->sql.py

        |-->Sets

        |        |-->config.json

        |-->SQL

        |        |-->Ai_sql.py

        |-->mian.py

        |-->STC_B.py

 依赖的包

<code>streamlit==1.36.0

pandas==2.2.2

SQLAlchemy==1.4.0

PyMySQL==1.1.1

mysql==0.0.3

multiprocess==0.70.16

mysql==0.0.3

mysql-connector-python==9.0.0

mysqlclient==2.2.4

pyserial==3.5


main.py

库说明

queuetimepandasstreamlitatexitjsonoslogging:常用的库和工具,用于数据处理、界面展示、文件操作和日志记录。ThreadTimerThreadPoolExecutor:用于多线程处理。Sql_clientProcess_stcBHistory:自定义模块,用于SQL数据库操作、线程处理和历史数据管理。figure:Bokeh库用于数据可视化。

日志配置

logging.basicConfig(

level=logging.INFO,

format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",code>

handlers=[

logging.FileHandler("翠滴清韵.log"), # 日志写入文件

logging.StreamHandler() # 日志输出到控制台

]

)

全局变量 

send_workchange_work:用于任务调度的队列。executorexecutor_stc:线程池,用于异步任务处理。

初始化前端界面

st.session_state 用于保存不被刷新掉的数据

if "messages" not in st.session_state:

st.session_state.messages = [

{

'role': 'assistant',

'content': '您好,欢迎使用翠滴清韵👋'

}

]

if 'state' not in st.session_state:

st.session_state['state'] = 0

if 'change_thread' not in st.session_state:

st.session_state['change_thread'] = 0

if 'change_data' not in st.session_state:

st.session_state['change_data'] = None

 函数定义

change_thread(data, pool)

将数据放入线程池并打印日志。

init_session_state()

初始化会话状态中的各种变量,例如植物数据、错误信息和历史记录。

time_put_work(queue, interval, message)

每隔一定时间将消息放入队列中,用于定时任务调度。

change_put_work(queue, interval, message, attempt, max_attempt)

带有重试机制的定时任务调度函数。

cur_path()

获取当前脚本的绝对路径。

read_json(abs_path, path)

从指定路径读取JSON文件并返回其内容。

write_json(abs_path, path, data)

将数据写入指定路径的JSON文件。

Sql_connect(abs_path)

创建一个SQL客户端连接,并缓存结果。

chat_actions()

处理用户输入的聊天消息,并将其添加到会话历史记录中。

refresh_plant(data)

更新会话状态中的植物数据。

add_plant_chat()

在会话历史记录中添加植物信息的聊天消息。

page(select_work_list)

主要页面布局和功能实现:

显示聊天历史。

提供输入框让用户查询植物。

提供按钮进行图表绘制或退出应用。

根据用户输入从数据库中查询植物信息,并更新界面。

cleanup()

在退出应用时清理资源,并加入退出任务到队列。

plot_state()

设置状态以触发图表绘制。

init_history()

初始化历史记录。

plot()

绘制图表,展示植物数据的变化情况:

读取数据库中的最新数据。

使用Bokeh库绘制温度、光照和土壤湿度的变化图。

history_data()

初始化会话历史记录,包括欢迎消息。

主程序逻辑

设置页面标题。初始化全局变量和会话状态。启动后台线程处理任务。定时任务调度用于更新数据。根据会话状态决定是否绘制图表。

完整代码

import queue

import time

import pandas as pd

import streamlit as st

import atexit

import json

import os

import logging

from SQL.Ai_sql import Sql_client

from threading import Timer, Thread

from STC_B import Process_stcB

import streamlit_authenticator as st_auth

from History.history import History

from bokeh.plotting import figure

from queue import Queue

from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(

level=logging.INFO, # Set the logging level

format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",code>

handlers=[

logging.FileHandler("翠滴清韵.log"), # Log to a file

logging.StreamHandler() # Also log to console

]

)

send_work = Queue()

change_work = Queue()

executor = ThreadPoolExecutor(max_workers=2)

executor_stc = ThreadPoolExecutor(max_workers=2)

if "messages" not in st.session_state:

st.session_state.messages = [

{

'role': 'assistant',

'content': '您好,欢迎使用翠滴清韵👋'

}

]

if 'state' not in st.session_state:

st.session_state['state'] = 0

if 'change_thread' not in st.session_state:

st.session_state['change_thread'] = 0

if 'change_data' not in st.session_state:

st.session_state['change_data'] = None

def change_thread(data, pool):

pool.put(data)

print(f"---------------------------------------------------put data {data}")

def init_session_state():

if 'sets' not in st.session_state:

st.session_state['sets'] = {

'Plant_name': '',

'Plant_light': '',

'Plant_humidity': '',

'Plant_max_temp': '',

'Plant_min_temp': ''

}

if 'error' not in st.session_state:

st.session_state['error'] = ''

if 'history' not in st.session_state:

st.session_state['history'] = history

if 'df' not in st.session_state:

st.session_state['df'] = None

def time_put_work(queue, interval, message):

Timer(interval, time_put_work, args=(queue, interval, message)).start()

queue.put(message)

def change_put_work(queue, interval, message, attempt, max_attempt):

if attempt <= max_attempt:

Timer(interval, change_put_work, args=(queue, interval, message, attempt + 1, max_attempt)).start()

queue.put(message)

print(f"第{attempt}次")

else:

return

def cur_path():

return os.path.dirname(os.path.realpath(__file__))

def read_json(abs_path, path):

with open(os.path.join(abs_path, path), 'r', encoding='utf-8') as f:code>

data = json.load(f)

logging.info(f"Loaded JSON file from {os.path.join(abs_path, path)}")

return data

def write_json(abs_path, path, data):

with open(os.path.join(abs_path, path), 'w', encoding='utf-8') as f:code>

json.dump(data, f, ensure_ascii=False, indent=4)

logging.info(f"Written JSON data to {os.path.join(abs_path, path)}")

@st.cache_resource(ttl=10800)

def Sql_connect(abs_path):

sql_sets = read_json(abs_path, 'Sets/config.json')

sql_client = Sql_client(sql_sets['mysql'])

logging.info('Connected to SQL database')

return sql_client

def chat_actions():

st.session_state['his']['chat_data'].append(

{

'role': "user",

'content': st.session_state['chat_input']

}

)

def refresh_plant(data):

st.session_state.sets = {

'Plant_name': data.iloc[0, 0],

'Plant_light': data.iloc[0, 4],

'Plant_humidity': data.iloc[0, 3],

'Plant_max_temp': data.iloc[0, 1],

'Plant_min_temp': data.iloc[0, 2]

}

logging.info(f"Refreshed plant data for: {data.iloc[0, 0]}")

def add_plant_chat():

st.session_state['his']['chat_data'].append(

{

'role': "assistant",

'content': F"已为您找到该植物信息\n植物名称:{st.session_state.sets['Plant_name']}\n 适宜光照:{st.session_state.sets['Plant_light']}\n适宜湿度:{st.session_state.sets['Plant_humidity']}\n适宜最大温度:{st.session_state.sets['Plant_max_temp']}\n适宜最小温度:{st.session_state.sets['Plant_min_temp']}\n请确认是否修改"

}

)

def page(select_work_list):

init_session_state()

with st.sidebar:

Tables = st.container(height=500)

with Tables:

for msg in st.session_state['his']['chat_data']:

with st.chat_message(name=msg['role']):

Tables.write(msg['content'])

query = st.chat_input('请输入您的植物名称', on_submit=chat_actions, key='chat_input')code>

bt1, bt2 = st.columns(2)

with bt1:

st.button('绘图', on_click=plot_state, use_container_width=True)

with bt2:

st.button('退出', on_click=cleanup, use_container_width=True)

if query:

user_input = query

if user_input == '确认修改' or user_input == '确认' or user_input == '666':

send_data = {

'info': 'change_data',

'limit': [

st.session_state.sets['Plant_max_temp'],

st.session_state.sets['Plant_light'],

st.session_state.sets['Plant_humidity'],

]

}

st.session_state['change_data'] = send_data

st.session_state['plot'] = 1

st.rerun()

else:

data = sql_client.pd_Readsql(f"SELECT * FROM plant_data WHERE name IN ('{user_input}');")

if data.empty:

st.session_state.error = '未在数据库中找到该植物,请重新输入,或者手动添加该植物'

st.session_state['state'] = 0

logging.warning(f"Plant not found: {user_input}")

else:

refresh_plant(data)

add_plant_chat()

st.rerun()

if st.session_state.error != '':

st.error(st.session_state.error)

logging.error(f"Error: {st.session_state.error}")

st.session_state.error = ''

def cleanup():

logging.info("Exiting main process")

send_work.put({'info': 'exit', 'data': None})

stc_thread.join()

def plot_state():

st.session_state['state'] = 1

@st.cache_data(ttl=10800)

def init_history():

key = History()

return key

def plot():

main_status = st.status('绘图中...', expanded=True)

stat_time = time.time()

main_status.write("第一步,读取数据")

main_status.caption("读取出最新的十条数据")

sql_Data = sql_client.pd_Readsql("SELECT * FROM data ORDER BY 时间 DESC LIMIT 10;")

main_status.dataframe(sql_Data)

main_status.write("第二步,绘制图表")

main_status.caption("绘制温度、光照、土壤湿度的变化图")

sql_Data['时间'] = pd.to_datetime(sql_Data['时间'])

temperature_data = sql_Data['温度']

x = sql_Data['时间']

print(f"我的x {x}")

print(f"时间{x}")

p = figure(title="1分钟内温度数据", x_axis_label="时间", y_axis_label="摄氏度", plot_width=600, plot_height=400,code>

x_axis_type='datetime', y_range=(30, 40))code>

p.line(x, temperature_data, legend_label="温度", line_width=2, color="yellow", line_alpha=0.8, line_dash=[8, 4])code>

p.circle(x, temperature_data, size=4, color='red', alpha=0.8)code>

for i, temp in enumerate(temperature_data):

p.text(x=x.iloc[i], y=temperature_data.iloc[i], text=[str(temp)], text_baseline="bottom", text_align="center",code>

angle=0)

main_status.bokeh_chart(p)

light_p = figure(title="1分钟内光照数据", x_axis_label="时间", y_axis_label="光照", plot_width=600, plot_height=400,code>

x_axis_type='datetime', y_range=(0, 100))code>

light_data = sql_Data['光照']

light_p.line(x, light_data, legend_label="光照", line_width=2, color="green", line_alpha=0.8, line_dash=[8, 4])code>

light_p.circle(x, light_data, size=4, color='blue', alpha=0.8)code>

for i, light in enumerate(light_data):

light_p.text(x=x.iloc[i], y=light_data.iloc[i], text=[str(light)], text_baseline="bottom", text_align="center",code>

angle=0)

main_status.bokeh_chart(light_p)

humidity_p = figure(title="1分钟内土壤湿度数据", x_axis_label="时间", y_axis_label="土壤湿度", plot_width=600,code>

plot_height=400, x_axis_type='datetime', y_range=(0, 5))code>

humidity_data = sql_Data['土壤湿度']

humidity_p.line(x, humidity_data, legend_label="土壤湿度", line_width=2, color="red", line_alpha=0.8,code>

line_dash=[8, 4])

humidity_p.circle(x, humidity_data, size=4, color='green', alpha=0.8)code>

for i, humidity in enumerate(humidity_data):

humidity_p.text(x=x.iloc[i], y=humidity_data.iloc[i], text=[str(humidity)], text_baseline="bottom",code>

text_align="center", angle=0)code>

main_status.bokeh_chart(humidity_p)

all_time = time.time() - stat_time

main_status.update(label=f"分析完成! 总耗时:{all_time}s", state="complete", expanded=True)code>

@st.cache_data(ttl=10800)

def history_data():

his = {

'chat_data': [

{

'role': 'assistant',

'content': '您好,欢迎使用翠滴清韵👋'

}

],

'df_data': [

]

}

return his

if __name__ == '__main__':

global abs_path, sql_client

st.set_page_config(

page_title='翠滴清韵'code>

)

session_state = st.session_state

abs_path = cur_path()

logging.info("Starting Streamlit application")

sql_client = Sql_connect(abs_path)

His = history_data()

if 'his' not in session_state:

st.session_state['his'] = His

history = init_history()

if 'thread' not in session_state:

st.session_state['thread'] = 0

if 'plot' not in session_state:

st.session_state['plot'] = 0

if session_state['thread'] == 0:

st.header('欢迎使用:green[翠滴清韵] :sunglasses:')

st.subheader('您的智能植物管理助手')

temp = [{'role': 'assistant', 'content': '您好,欢迎使用翠滴清韵👋'}]

stc_thread = Thread(target=Process_stcB, args=(send_work, change_work,))

stc_thread.daemon = True

stc_thread.start()

time.sleep(1)

st.session_state['thread'] = 1

logging.info("Started STC_B process")

if session_state['plot'] == 1:

executor.submit(change_thread, st.session_state['change_data'], change_work)

st.session_state['plot'] = 0

# if st.session_state['change_thread'] == 0:

# executor.submit(change_thread, {1,2})

# st.session_state['change_thread'] = 1

time_put_work(

send_work, 6,

{

'role': 'streamlit',

'info': 'select_data'

}

)

atexit.register(cleanup)

page(send_work)

if st.session_state['state'] != 0:

plot()


STC_B.py

该系统用于通过串口通信与 STC_B 设备交互,获取和处理传感器数据,并将数据存储到 MySQL 数据库中。系统主要包括以下功能模块:

日志记录:记录系统运行情况和错误信息。串口通信:与 STC_B 设备进行数据交换。数据处理:将原始数据转换为有用的格式。数据库操作:将处理后的数据存储到 MySQL 数据库中。配置管理:从配置文件中读取数据库连接信息和其他设置。

数据库写入函数

这个函数将数据插入到MySQL数据库中。sets包含数据库连接信息,data是待插入的数据。使用了异常处理来捕捉和记录数据库操作中的错误。

def write2sql(sets, data):

try:

conn = mysql.connector.connect(

host=sets['host'],

user=sets['user'],

password=sets['password'],

database=sets['database']

)

cursor = conn.cursor()

sql = "INSERT INTO data (温度, 光照, 土壤湿度, 时间) VALUES (%s, %s, %s ,%s);"

cursor.execute(sql, data)

conn.commit()

print("Data inserted successfully!")

except mysql.connector.Error as e:

conn.rollback()

print("Error inserting data into MySQL database:", e)

finally:

conn.close()

 STC_B类

__init__:初始化串口通信。如果找到可用的串口,则尝试连接;否则,设置为未连接状态。b2s:将字节数据转换为十六进制字符串。verify_connect:发送验证数据以确认设备连接是否正常。数据头为0xAA 55send_Data:发送数据并返回设备的响应。manage_data:处理从设备返回的数据,将其转换为实际的测量值。

class stc_B:

def __init__(self):

def b2s(self, data):

def verify_connect(self):

def send_Data(self, data):

def manage_data(self, data):

 数据处理函数

def int2hex(data):

def wash_data(data):

int2hex:将数据转换为十六进制的字节流。wash_data:处理从设备获取的数据,将其转换为十进制。

注意事项

确保 MySQL 数据库配置正确。确保 STC_B 设备正常工作并连接到计算机。配置文件 Sets/conf吧vig.json 必须存在并包含有效的数据库连接信息。

完整代码

import serial

import time

import serial.tools.list_ports

import logging

from datetime import datetime

import os

import json

import mysql.connector

logging.basicConfig(

level=logging.INFO, # Set the logging level

format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",code>

handlers=[

logging.FileHandler("STC_B.log"), # Log to a file

logging.StreamHandler() # Also log to console

]

)

def write2sql(sets, data):

try:

conn = mysql.connector.connect(

host=sets['host'],

user=sets['user'],

password=sets['password'],

database=sets['database']

)

cursor = conn.cursor()

sql = "INSERT INTO data (温度, 光照, 土壤湿度, 时间) VALUES (%s, %s, %s ,%s);"

cursor.execute(sql, data)

conn.commit()

print("Data inserted successfully!")

except mysql.connector.Error as e:

conn.rollback()

print("Error inserting data into MySQL database:", e)

finally:

conn.close()

class stc_B:

def __init__(self):

ports = serial.tools.list_ports.comports()

if ports:

port = ports[0]

self.ser = serial.Serial(port.device, 9600)

else:

self.ser = '未连接到下位机'

def b2s(self, data):

return ''.join([f'{ch:0>2x}' for ch in data]).upper()

def verify_connect(self):

# 修改发送的数据,开头两个字节是 0xaa 和 0x55

data = bytes([0xaa, 0x55, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

while self.ser.in_waiting:

time.sleep(0.1)

self.ser.write(data)

time.sleep(0.1) # 等待接收

re_data = self.ser.read_all()

sdata = self.b2s(re_data)

print(f"re_data: {re_data}, sdata: {sdata}")

return sdata

def send_Data(self, data):

while self.ser.in_waiting:

time.sleep(0.1)

self.ser.write(data)

time.sleep(0.1)

re_data = self.ser.read_all()

sdata = self.b2s(re_data)

print(f"re_data: {re_data}, sdata: {sdata}")

return sdata

def manage_data(self, data):

# 用于处理返回数据

if data[:4] != 'AA55':

return "数据返回错误"

if data[4:6] == '01' or data[4:6] == '02':

# 将下标从6-21的字符串两个一组切分开

pairs = [data[i:i + 2] for i in range(6, 22, 2)]

# 用eval还原原本数据类型

restored_data = [eval(f"0x{pair}") for pair in pairs]

result = [

restored_data[0] * 10 + restored_data[1] + restored_data[2] * 0.1,

restored_data[3] * 100 + restored_data[4] * 10 + restored_data[5],

restored_data[6] + restored_data[7] * 0.1,

]

return restored_data

else:

return "数据返回错误"

def int2hex(data):

if data[1] < 100:

data[1] = '0' + str(data[1])

s = ''.join(str(i) for i in data).replace('.', '')

byte_list = [bytes([0xaa, 0x55, 0x02])]

for i in s:

byte_list.append(bytes([int(i)]))

return b''.join(byte_list)

def wash_data(data):

res = []

res.append(data[0] * 10 + data[1] + data[2] * 0.1)

res.append(data[3] * 100 + data[4] * 10 + data[5])

res.append(data[6] + data[7] * 0.1)

return res

def cur_path():

return os.path.dirname(os.path.realpath(__file__))

def read_json(abs_path, path):

with open(os.path.join(abs_path, path), 'r', encoding='utf-8') as f:code>

data = json.load(f)

logging.info(f"Loaded JSON file from {os.path.join(abs_path, path)}")

return data

def Process_stcB(send_Work, change_work):

port = stc_B()

abs_path = cur_path()

sql_sets = read_json(abs_path, 'Sets/config.json')

all_data = {

'温度': [],

'土壤湿度': [],

'光照': [],

'时间': []

}

if port.ser != '未连接到下位机':

respond = port.verify_connect()

if respond == 'AA55000102030405060708':

logging.info("Connected to STC_B")

else:

logging.error("Failed to connect to STC_B")

return

else:

logging.error("Failed to connect to STC_B")

return

while True:

while send_Work.empty() and change_work.empty():

time.sleep(0.1)

print("send_Work.empty()", send_Work.empty())

print("back_work.empty()", change_work.empty())

if not change_work.empty():

print('-'*50)

change2data = change_work.get()

print("chage2data", change2data)

limit = change2data['limit']

data = int2hex(limit)

while port.ser.in_waiting:

time.sleep(0.1)

respond = port.send_Data(data)

eval_respond = port.manage_data(respond)

print("修改后的阈值: ", eval_respond)

data2process = send_Work.get()

print("data2process", data2process)

if data2process['info'] == 'exit':

while port.ser.in_waiting:

time.sleep(0.1)

logging.info("Exiting STC_B process")

break

elif data2process['info'] == 'select_data':

print("!!!!!!!!!!!!!!!!",data2process['role'])

data = bytes([0xaa, 0x55, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01])

respond = port.send_Data(data)

eval_respond = port.manage_data(respond)

print("eval_respond",eval_respond)

re_data = wash_data(eval_respond)

print(time.time(), re_data)

re_data.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

write2sql(sql_sets['mysql'], re_data)


sql .py

导入库

streamlit:用于构建交互式 web 应用。json:用于处理 JSON 文件的读取和写入。os:用于操作系统相关功能,如路径操作。Sql_client:从 SQL.Ai_sql 模块导入的一个类,用于与数据库交互。pandas:用于数据操作和分析。datetime:用于处理日期和时间。

函数定义

main:构建 Streamlit 应用的主要逻辑,包括两个标签页(tab1tab2

def main(sql_client, abs_path):

tab1, tab2 = st.tabs(['SQL代码', '表单操作'])

with (tab1):

st.title('数据库操作')

st.caption('您可以在此对数据库进行直接操作')

st.write("以下是最近2min中内的数据")

database_name = Sql_client.inspector.get_schema_names()[4]

table_names = Sql_client.inspector.get_table_names()

print(database_name)

print(f"table_names: {table_names}")

col1, col2 = st.columns(2)

with col1:

st.selectbox(

label='Database',code>

options=[database_name]

)

with col2:

cur_table = st.selectbox(

label='Tables',code>

options=table_names

)

cur_df = Sql_client.execute_Sql(f"SELECT * from {cur_table} ORDER BY 时间 DESC LIMIT 20;")

print(cur_df)

st.dataframe(

data=cur_df,

height=400,

width=800

)

st.write("请选择时间查看周围数据")

result = Sql_client.execute_Sql(f"SELECT 时间 from data")

time_data = result.fetchall()

time_df = pd.DataFrame(time_data, columns=['时间'])

time_options = pd.to_datetime(time_df['时间']).dt.strftime('%Y-%m-%d %H:%M:%S').tolist()

cur_time_str = st.selectbox(

label='时间',code>

options=time_options

)

before_time = Sql_client.execute_Sql(f"""

SELECT *

FROM data

WHERE 时间 < '{cur_time_str}'

ORDER BY 时间 DESC

LIMIT 5;

""")

after_time = Sql_client.execute_Sql(f"""

SELECT *

FROM data

WHERE 时间 > '{cur_time_str}'

ORDER BY 时间 ASC

LIMIT 5;

""")

before_time_df = pd.DataFrame(before_time.fetchall(), columns=before_time.keys())

after_time_df = pd.DataFrame(after_time.fetchall(), columns=after_time.keys())

combined_df = pd.concat([before_time_df, after_time_df]).reset_index(drop=True)

st.dataframe(combined_df, width=800)

with tab2:

st.write("设置阈值")

col1, col2 = st.columns(2)

with col1:

st.write("这是一个SQL代码生成页面")

st.text_area("请输入要保存的SQL代码", height=450, key="Sql_code")code>

st.write("您输入的SQL代码为:")

st.write(st.session_state.Sql_code)

说明文档 

1. 功能概述

该应用允许用户:

查看数据库中的表格数据。根据时间选择查看周围的数据。生成和保存 SQL 代码。

2. 依赖库

streamlit:用于创建 web 界面。json:用于读取和写入 JSON 文件。os:用于路径操作。pandas:用于数据处理。datetime:用于时间处理。Sql_client:自定义的数据库操作类。

3. 函数说明

cur_path:获取当前工作目录。read_json:从指定路径读取 JSON 文件并返回数据。write_json:将数据写入 JSON 文件。Sql_connect:连接数据库并返回 Sql_client 实例。main:构建应用的主要界面和功能,包括数据库操作和 SQL 代码生成。

4. 使用说明

标签页1(SQL代码):展示数据库中表格的最新数据,允许用户选择时间查看前后数据。标签页2(表单操作):提供一个文本区域用于输入和保存 SQL 代码。

5. 注意事项

确保 config.json 配置文件存在于指定路径中,并包含正确的数据库配置信息。数据库表结构需要与 SQL 查询语句匹配。

完整代码

import streamlit as st

import json

import os

from SQL.Ai_sql import Sql_client

import pandas as pd

from datetime import datetime

def cur_path():

current_dir = os.path.abspath(os.curdir)

return current_dir

def read_json(abs_path, path):

with open(os.path.join(abs_path, path), 'r', encoding='utf-8') as f:code>

data = json.load(f)

return data

def write_json(abs_path, path, data):

with open(os.path.join(abs_path, path), 'w', encoding='utf-8') as f:code>

json.dump(data, f, ensure_ascii=False, indent=4)

@st.cache_resource(ttl=10800)

def Sql_connect(abs_path):

sql_sets = read_json(abs_path, 'Sets/config.json')

sql_client = Sql_client(sql_sets['mysql'])

print('connect to sql')

return sql_client

def main(sql_client, abs_path):

tab1, tab2 = st.tabs(['SQL代码', '表单操作'])

with (tab1):

st.title('数据库操作')

st.caption('您可以在此对数据库进行直接操作')

st.write("以下是最近2min中内的数据")

database_name = Sql_client.inspector.get_schema_names()[4]

table_names = Sql_client.inspector.get_table_names()

print(database_name)

print(f"table_names: {table_names}")

col1, col2 = st.columns(2)

with col1:

st.selectbox(

label='Database',code>

options=[database_name]

)

with col2:

cur_table = st.selectbox(

label='Tables',code>

options=table_names

)

cur_df = Sql_client.execute_Sql(f"SELECT * from {cur_table} ORDER BY 时间 DESC LIMIT 20;")

print(cur_df)

st.dataframe(

data=cur_df,

height=400,

width=800

)

st.write("请选择时间查看周围数据")

result = Sql_client.execute_Sql(f"SELECT 时间 from data")

time_data = result.fetchall()

time_df = pd.DataFrame(time_data, columns=['时间'])

time_options = pd.to_datetime(time_df['时间']).dt.strftime('%Y-%m-%d %H:%M:%S').tolist()

cur_time_str = st.selectbox(

label='时间',code>

options=time_options

)

# around_time = Sql_client.execute_Sql(

# f"SELECT * from data WHERE 时间 = '{cur_time_str}' ORDER BY 时间 DESC LIMIT 10;")

#

# around_time_df = pd.DataFrame(around_time.fetchall(), columns=around_time.keys())

# st.dataframe(around_time_df)

before_time = Sql_client.execute_Sql(f"""

SELECT *

FROM data

WHERE 时间 < '{cur_time_str}'

ORDER BY 时间 DESC

LIMIT 5;

""")

# 查询用户选择的时间之后的五条数据

after_time = Sql_client.execute_Sql(f"""

SELECT *

FROM data

WHERE 时间 > '{cur_time_str}'

ORDER BY 时间 ASC

LIMIT 5;

""")

before_time_df = pd.DataFrame(before_time.fetchall(), columns=before_time.keys())

after_time_df = pd.DataFrame(after_time.fetchall(), columns=after_time.keys())

# 合并前五条和后五条数据

combined_df = pd.concat([before_time_df, after_time_df]).reset_index(drop=True)

st.dataframe(combined_df, width=800)

with tab2:

st.write("设置阈值")

col1, col2 = st.Acolumns(2)

with col1:

st.write("这是一个SQL代码生成页面")

st.text_area("请输入要保存的SQL代码", height=450, key="Sql_code")code>

st.write("您输入的SQL代码为:")

st.write(st.session_state.Sql_code)

if __name__ == "__main__":

abs_path = cur_path()

print(abs_path)

Sql_client = Sql_connect(abs_path)

main(Sql_client, abs_path)


 config.json

 MySQL数据库配置项

{

"mysql":{

"host": "localhost",

"user": "root",

"password": "***",

"database": "plants",

"port": 3306

}

}


Ai_sql.py

导入库

mysql:用于 MySQL 数据库操作(这里没有直接使用,可能用于兼容性或其他目的)。create_engine:用于创建数据库引擎的 SQLAlchemy 函数。text:用于构建 SQL 语句。inspect:用于数据库结构的检查(从 sqlalchemy 中导入)。MetaData:用于描述数据库中的表结构。pandas:用于处理和分析数据。SQLAlchemyError:用于捕获 SQLAlchemy 异常。Tableinsert:用于构建和执行 SQL 插入操作。

Sql_client 类

初始化

class Sql_client():

def __init__(self, db_sets):

self.engine = create_engine(

f'mysql+pymysql://{db_sets["user"]}:{db_sets["password"]}@{db_sets["host"]}:{db_sets["port"]}/{db_sets["database"]}',

echo=False, encoding='utf-8'code>

)

self.inspector = Inspector.from_engine(self.engine)

self.metadata = MetaData(bind=self.engine)

db_sets:包含数据库连接信息的字典。self.engine:创建一个 SQLAlchemy 引擎,用于连接 MySQL 数据库。self.inspector:用于检查数据库结构的 Inspector 实例。self.metadata:用于存储表结构信息的 MetaData 实例。

执行sql语句函数

def execute_Sql(self, sql_code):

conn = self.engine.connect()

operation = text(sql_code)

results = conn.execute(operation)

conn.close()

return results

执行任意的 SQL 语句,并返回执行结果。

读取 SQL 数据 pd_Readsql

def pd_Readsql(self, sql_code):

conn = self.engine.connect()

df = pd.read_sql(sql=sql_code, con=conn.connection)

conn.close()

return df

执行 SQL 查询并将结果读取到 Pandas DataFrame 中,方便进一步的数据处理。

获取表名 get_table_names

def get_table_names(self):

table_names = self.inspector.get_table_names()

return [table for table in table_names]

获取数据库中所有表的名称。

获取表的 DDL get_table_ddls

def get_table_ddls(self, table_name):

with self.engine.connect() as conn:

ddl_query = f"SHOW CREATE TABLE `{table_name}`"

result = conn.execute(ddl_query)

table_ddl = result.fetchone()[1]

return table_ddl

获取指定表的 DDL(数据定义语言)语句,用于描述表的结构。

读取所有表的 DDL read_tables_DDL

def get_table_ddls(self, table_name):

with self.engine.connect() as conn:

ddl_query = f"SHOW CREATE TABLE `{table_name}`"

result = conn.execute(ddl_query)

table_ddl = result.fetchone()[1]

return table_ddl

获取数据库中所有表的 DDL 语句,并将其存储在字典中。

插入数据 insert_data

def insert_data(self, table_name, data_dict):

try:

table = Table(table_name, self.metadata, autoload_with=self.engine)

stmt = insert(table).values(data_dict)

with self.engine.connect() as conn:

with conn.begin(): # 使用事务上下文管理

conn.execute(stmt)

return f"数据成功插入到表 '{table_name}' 中"

except SQLAlchemyError as e:

return f"数据插入失败: {e}"

将数据插入到指定的表中。使用事务管理以确保数据一致性。捕获并返回可能发生的 SQLAlchemy 错误。

1. 功能概述

Sql_client 类提供了与 MySQL 数据库交互的功能,包括执行 SQL 查询、读取数据、获取表结构信息以及插入数据。它使用 SQLAlchemy 作为数据库操作的工具。

2. 依赖库

mysql:用于 MySQL 数据库操作。sqlalchemy:用于数据库引擎创建、SQL 执行、表结构检查等。pandas:用于数据处理。SQLAlchemyError:用于异常处理。

3. 类和方法说明

Sql_client 类

__init__(self, db_sets):初始化数据库连接。db_sets 包含数据库连接所需的配置信息(用户、密码、主机、端口、数据库名)。

execute_Sql(self, sql_code):执行给定的 SQL 语句,并返回结果。适用于执行任意 SQL 操作。

pd_Readsql(self, sql_code):执行 SQL 查询并将结果读取到 Pandas DataFrame 中,便于数据分析。

get_table_names(self):获取数据库中所有表的名称列表。

get_table_ddls(self, table_name):获取指定表的 DDL(数据定义语言)语句,描述表的结构。

read_tables_DDL(self):获取所有表的 DDL 语句,并将结果以字典形式返回。字典的键为表名,值为对应的 DDL 语句。

insert_data(self, table_name, data_dict):将数据插入到指定的表中。data_dict 为数据字典,键为列名,值为对应的值。使用事务确保数据一致性。

4. 使用说明

初始化:创建 Sql_client 实例时传入数据库配置信息。执行 SQL:使用 execute_Sql 方法执行 SQL 语句。读取数据:使用 pd_Readsql 方法执行 SQL 查询并获取 DataFrame。获取表信息:使用 get_table_namesget_table_ddls 方法获取表名称和表结构。插入数据:使用 insert_data 方法将数据插入到指定表中。

5. 注意事项

确保 db_sets 中的配置信息正确无误,以便成功连接到数据库。使用 execute_Sqlpd_Readsql 时,要确保 SQL 语句的正确性和安全性,防止 SQL 注入。insert_data 方法在插入数据时会自动开启事务,如果发生错误,会自动回滚。

完整代码

import mysql

from sqlalchemy import create_engine, text,inspect, MetaData

from sqlalchemy.engine.reflection import Inspector

import pandas as pd

from sqlalchemy.exc import SQLAlchemyError

from sqlalchemy import Table, MetaData, insert

class Sql_client():

def __init__(self, db_sets):

self.engine = create_engine(

f'mysql+pymysql://{db_sets["user"]}:{db_sets["password"]}@{db_sets["host"]}:{db_sets["port"]}/{db_sets["database"]}',

echo=False, encoding='utf-8'code>

)

self.inspector = Inspector.from_engine(self.engine)

self.metadata = MetaData(bind=self.engine)

def execute_Sql(self, sql_code): # 执行sql语句

conn = self.engine.connect()

operation = text(sql_code)

results = conn.execute(operation)

conn.close()

return results

def pd_Readsql(self,sql_code): # 读取sql数据

conn = self.engine.connect()

df = pd.read_sql(sql=sql_code,con=conn.connection)

conn.close()

return df

def get_table_names(self):

table_names = self.inspector.get_table_names()

return [table for table in table_names]

def get_table_ddls(self,table_name):

with self.engine.connect() as conn:

ddl_query = f"SHOW CREATE TABLE `{table_name}`"

result = conn.execute(ddl_query)

table_ddl = result.fetchone()[1]

return table_ddl

def read_tables_DDL(self): #读取所有表的DDL

ddl_dict = {}

table_names = self.get_table_names()

print(f"DDL_table_names\n:{table_names}")

for table_name in table_names:

ddl = self.get_table_ddls(table_name)

ddl_dict[table_name] = ddl

return ddl_dict

def insert_data(self, table_name, data_dict):

try:

table = Table(table_name, self.metadata, autoload_with=self.engine)

stmt = insert(table).values(data_dict)

with self.engine.connect() as conn:

with conn.begin(): # 使用事务上下文管理

conn.execute(stmt)

return f"数据成功插入到表 '{table_name}' 中"

except SQLAlchemyError as e:

return f"数据插入失败: {e}"


history.py

1. 功能概述

这段代码包含了一个用于处理历史数据和聊天记录的类 History,以及一个异步函数 write_sql,用于将数据写入 MySQL 数据库。主要功能包括:

将数据异步写入数据库。记录和管理聊天历史。处理和存储传感器数据及其时间戳。查询最后一条用户输入的聊天记录。

2. 依赖库

pymysql:用于连接和操作 MySQL 数据库。pandas:用于数据处理(尽管在此代码中未实际使用)。asyncio:用于执行异步操作。

3. 函数和类说明

异步函数 write_sql

async def write_sql(D):

db = pymysql.connect(

host='localhost',code>

user='root',code>

password='Hby20040627!!',code>

database='plants',code>

charset='utf8'code>

)

cursor = db.cursor()

sql = "INSERT INTO data (温度, 光照, 土壤湿度, 时间) VALUES (%s, %s, %s, %s)"

cursor.execute(sql, D)

db.commit()

db.close()

功能:将数据异步写入 MySQL 数据库中的 data 表。参数D 是一个包含四个元素的元组,分别对应表的四个字段(温度、光照、土壤湿度和时间)。操作

连接到本地 MySQL 数据库。使用游标执行 INSERT 语句,将数据插入表中。提交事务并关闭数据库连接。

History

class History():

def __init__(self):

self.chat_history = [

{

'role': 'assistant',

'content': '您好,欢迎使用翠滴清韵👋'

}

]

self.data_history = []

self.data_time = []

功能:管理聊天记录和数据记录。属性

chat_history:聊天记录列表,初始包含一条欢迎消息。data_history:存储传感器数据的列表。data_time:存储数据时间戳的列表。

方法

add_data(self, data)

def add_data(self, data):

self.data_history.append(data)

if len(self.data_history) > 10 and len(self.data_time) > 10:

res = []

for i in range(len(self.data_history)):

res.append((self.data_history[i][0], self.data_history[i][1], self.data_history[i][2], self.data_time[i]))

asyncio.run(write_sql(res))

功能:添加数据到 data_history 列表中。逻辑

data_historydata_time 列表中的条目超过 10 条时,将这些数据准备为一个列表 res。使用 asyncio.run 执行异步函数 write_sql 将数据写入数据库。

add_time(self, time)

def add_time(self, time):

self.data_time.append(time)

功能:添加时间戳到 data_time 列表中。

add_chat(self, role, content)

def add_chat(self, role, content):

self.chat_history.append(

{

'role': role,

'content': content

}

)

功能:添加聊天记录。参数

role:角色(如用户或助手)。content:聊天内容。

last_query(self)

def last_query(self):

for i in range(len(self.chat_history) - 1, -1, -1):

if self.chat_history[i]['role'] == 'user':

return self.chat_history[i]['content']

return self.chat_history[-1]['content']

功能:返回最后一条用户输入的聊天记录。如果没有用户输入的记录,则返回最后一条聊天记录。逻辑:从 chat_history 列表中逆序查找用户角色的记录,并返回其内容。

4. 使用说明

创建 History 实例:实例化 History 类以开始记录聊天和数据。

添加数据

使用 add_data(data) 方法将传感器数据添加到历史记录中。如果数据超过 10 条,将异步写入数据库。使用 add_time(time) 方法添加时间戳。

添加聊天记录

使用 add_chat(role, content) 方法记录聊天记录,role 可以是 'user''assistant'content 是聊天内容。

查询最后一条用户输入

使用 last_query() 方法获取最近一条用户输入的聊天记录内容。

完整代码

import pymysql

import pandas

import asyncio

async def write_sql(D):

db = pymysql.connect(

host='localhost',code>

user='root',code>

password='Hby20040627!!',code>

database='plants',code>

charset='utf8'code>

)

cursor = db.cursor()

sql = "INSERT INTO data (温度, 光照, 土壤湿度, 时间) VALUES (%s, %s, %s, %s)"

cursor.execute(sql, D)

db.commit()

db.close()

class History():

def __init__(self):

self.chat_history = [

{

'role': 'assistant',

'content': '您好,欢迎使用翠滴清韵👋'

}

]

self.data_history = []

self.data_time = []

def add_data(self, data):

self.data_history.append(data)

if len(self.data_history) > 10 and len(self.data_time) > 10:

res = []

for i in range(len(self.data_history)):

res.append((self.data_history[i][0], self.data_history[i][1], self.data_history[i][2], self.data_time[i]))

asyncio.run(write_sql(res))

def add_time(self, time):

self.data_time.append(time)

def add_chat(self, role, content):

self.chat_history.append(

{

'role': role,

'content': content

}

)

def last_query(self):

for i in range(len(self.chat_history) - 1, -1, -1):

if self.chat_history[i]['role'] == 'user':

return self.chat_history[i]['content']

return self.chat_history[-1]['content']



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。