本文仅限学习交流,学习如何开发程序,如文章中使用的信息侵犯了相关方权益,请联系作者删除,作者不对下面信息负有任何责任
数据获取
通过passMark获取数据,并存入mysql数据库中
https://www.cpubenchmark.net/CPU_mega_page.html
开发程序对关键词解析并返回
代码如下,在windows下运行
from selenium import webdriver
from selenium.webdriver.common.by import By
import mysql.connector
from mysql.connector import Error
import time
def cpu():
# 初始化 WebDriver
driver = webdriver.Chrome()
try:
# 访问目标 URL
url = "https://www.cpubenchmark.net/CPU_mega_page.html"
driver.get(url)
# 等待页面加载完成(可选)
driver.implicitly_wait(10) # 等待 10 秒
# 再暂停5秒是为了手动选择all获取全部
print("开始暂停...")
time.sleep(5) # 暂停5秒
print("暂停结束,继续执行程序。")
# 定位表格元素
table = driver.find_element(By.XPATH, "//table[@class='dataTable-blue dataTable no-footer']")
# 建立数据库连接
connection = mysql.connector.connect(
host='mysqlIp',
database='dbname',
user='username',
password='password'
)
if connection.is_connected():
cursor = connection.cursor()
print("Database connection established.")
# 清空目标表
truncate_table(cursor)
# 提取表格数据
rows = table.find_elements(By.TAG_NAME, "tr")
inserted_count = 0
for row in rows:
cells = row.find_elements(By.TAG_NAME, "td")
if len(cells) >= 7:
name = cells[1].text.strip()
cores = cells[2].text.strip()
mark = cells[3].text.strip()
tdp = cells[5].text.strip()
socket = cells[6].text.strip()
category = cells[7].text.strip()
# 插入数据到数据库
insert_data(cursor, name, cores, mark, tdp, socket, category)
inserted_count += 1
print(f"Inserted data: Name={name}, Cores={cores}, Mark={mark}, TDP={tdp}, Socket={socket}, Category={category}")
# 判断是否需要交换表
if inserted_count > get_cpus_count(cursor):
swap_tables(cursor)
print("Tables swapped successfully.")
else:
print("Data count is less than existing data; no update performed.")
# 提交事务
connection.commit()
except Error as e:
print("Error while connecting to MySQL", e)
finally:
# 关闭 WebDriver 和数据库连接
driver.quit()
if connection.is_connected():
cursor.close()
connection.close()
print("MySQL connection is closed")
def truncate_table(cursor):
"""清空目标表"""
query = "TRUNCATE TABLE cpus_tmp;"
cursor.execute(query)
print("table 'cpus_tmp' TRUNCATED.")
def insert_data(cursor, name, cores, mark, tdp, socket, category):
"""插入数据到cpus_tmp表"""
print(f"start insert : Name: {name}, Cores: {cores}, Mark: {mark}, TDP: {tdp}, Socket: {socket}, Category: {category}")
query = """
INSERT INTO cpus_tmp (name, cores, mark, tdp, socket, category)
VALUES (%s, %s, %s, %s, %s, %s);
"""
values = (name, cores, mark, tdp, socket, category)
cursor.execute(query, values)
def get_cpus_count(cursor):
"""获取cpus表中的记录数"""
query = "SELECT COUNT(*) FROM cpus;"
cursor.execute(query)
result = cursor.fetchone()
return int(result[0]) if result and len(result) > 0 else 0
def swap_tables(cursor):
"""交换cpus_tmp和cpus表"""
query = """
RENAME TABLE cpus TO temp, cpus_tmp TO cpus, temp TO cpus_tmp;
"""
cursor.execute(query)
# 调用函数
cpu()
以上代码执行后将数据存入数据库,以便后面的应用使用。
开发python的web应用,并放入docker运行
将 Flask 应用程序放入 Docker 容器中并通过 Nginx 代理访问,可以按照以下步骤进行:
步骤 1:准备 Flask 应用程序
确保你的 Flask 应用程序已经准备好,并且能够正常运行。这里以一个简单的 Flask 应用为例:
from flask import Flask, request, make_response
import mysql.connector
from mysql.connector import Error
import xml.etree.ElementTree as ET
import time
import hashlib
app = Flask(__name__)
# 配置 MySQL 数据库连接参数
db_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database'
}
# 微信 Token
WECHAT_TOKEN = 'your_token_here'
def verify_signature(signature, timestamp, nonce):
""" 验证微信服务器发送的签名 """
# 将 token、timestamp 和 nonce 按照字典序排序
sorted_params = sorted([WECHAT_TOKEN, timestamp, nonce])
# 拼接字符串
concatenated_string = ''.join(sorted_params)
# 使用 SHA1 计算哈希值
hash_object = hashlib.sha1(concatenated_string.encode('utf-8'))
calculated_signature = hash_object.hexdigest()
# 比较计算得到的哈希值与 signature
return calculated_signature == signature
def get_reply(keyword):
""" 从数据库中查询关键词对应的回复信息 """
try:
connection = mysql.connector.connect(**db_config)
if connection.is_connected():
cursor = connection.cursor(dictionary=True)
query = "SELECT reply FROM data WHERE keyword = %s"
cursor.execute(query, (keyword,))
result = cursor.fetchone()
cursor.close()
return result['reply'] if result else None
except Error as e:
print(f"Error connecting to MySQL Platform: {e}")
finally:
if connection.is_connected():
connection.close()
return "查询失败,请稍后重试!"
@app.route('/wechat', methods=['GET', 'POST'])
def wechat():
if request.method == 'GET':
# 验证请求
signature = request.args.get('signature', '')
timestamp = request.args.get('timestamp', '')
nonce = request.args.get('nonce', '')
echostr = request.args.get('echostr', '')
# 验证 signature
if verify_signature(signature, timestamp, nonce):
# 如果验证通过,返回 echostr
return make_response(echostr)
else:
# 如果验证不通过,返回错误信息
return make_response("Invalid signature"), 403
elif request.method == 'POST':
# 解析 XML 格式的消息
xml_data = request.data.decode('utf-8')
root = ET.fromstring(xml_data)
# 获取消息类型
msg_type = root.find('MsgType').text
# 检查消息类型是否为 text
if msg_type == 'text':
# 获取消息内容
content = root.find('Content').text
# 设置几个关键词及其对应的回复
keywords = {
'关键词1': '关键词1的回复',
'关键词2': '关键词2的回复',
'关键词3': '关键词3的回复'
}
# 检查消息内容中是否包含关键词
for keyword, reply in keywords.items():
if keyword in content:
# 构建回复的 XML 格式消息
response_xml = f"""
<xml>
<ToUserName><![CDATA[{root.find('FromUserName').text}]]></ToUserName>
<FromUserName><![CDATA[{root.find('ToUserName').text}]]></FromUserName>
<CreateTime>{int(time.time())}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{reply}]]></Content>
<FuncFlag>0</FuncFlag>
</xml>
"""
return make_response(response_xml)
# 如果没有匹配的关键词,查询数据库
reply = get_reply(content)
if reply:
# 构建回复的 XML 格式消息
response_xml = f"""
<xml>
<ToUserName><![CDATA[{root.find('FromUserName').text}]]></ToUserName>
<FromUserName><![CDATA[{root.find('ToUserName').text}]]></FromUserName>
<CreateTime>{int(time.time())}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{reply}]]></Content>
<FuncFlag>0</FuncFlag>
</xml>
"""
return make_response(response_xml)
# 如果数据库查询失败,返回默认回复
default_reply = "查询失败,请稍后重试!"
response_xml = f"""
<xml>
<ToUserName><![CDATA[{root.find('FromUserName').text}]]></ToUserName>
<FromUserName><![CDATA[{root.find('ToUserName').text}]]></FromUserName>
<CreateTime>{int(time.time())}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{default_reply}]]></Content>
<FuncFlag>0</FuncFlag>
</xml>
"""
return make_response(response_xml)
# 如果不是 text 类型的消息,返回默认响应
default_response = f"""
<xml>
<ToUserName><![CDATA[{root.find('FromUserName').text}]]></ToUserName>
<FromUserName><![CDATA[{root.find('ToUserName').text}]]></FromUserName>
<CreateTime>{int(time.time())}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[仅支持文本消息]]></Content>
<FuncFlag>0</FuncFlag>
</xml>
"""
return make_response(default_response)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)
进行本地测试:
curl -X GET "http://localhost:5000/wechat?signature=9b1fc0530a7d30b8ae3004aea631b59c5c1134d2×tamp=123456789&nonce=987654321&echostr=test_echo_str22"
可以正常返回test_echo_str22,这个要注意的是signature必须是计算好的。
curl -X POST -H "Content-Type: application/xml" -d '<xml><ToUserName><![CDATA[toUser]]></ToUserName><FromUserName><![CDATA[fromUser]]></FromUserName><CreateTime>123456789</CreateTime><MsgType><![CDATA[text]]></MsgType><Content><![CDATA[59]]></Content><MsgId>1234567890123456</MsgId></xml>' http://localhost:5000/wechat
返回正常:
<xml>
<ToUserName><![CDATA[fromUser]]></ToUserName>
<FromUserName><![CDATA[toUser]]></FromUserName>
<CreateTime>1726403271</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[[5-Way CPU] AMD Ryzen 9 5950X]]></Content>
<FuncFlag>0</FuncFlag>
</xml>
步骤 2:创建 Dockerfile
在 Flask 应用所在的目录下创建一个 Dockerfile 文件,用于构建 Docker 镜像:
# 使用官方 Python 镜像作为基础镜像
FROM python:3.8-slim
# 设置工作目录
WORKDIR /app
# 将应用代码复制到容器中
COPY . .
# 安装 Flask 和其他依赖
RUN pip install --no-cache-dir flask
# 暴露端口
EXPOSE 5000
# 运行 Flask 应用
CMD ["python", "app.py"]
步骤 3:构建 Docker 镜像
在包含 Dockerfile 的目录下运行以下命令来构建 Docker 镜像:
docker build -t your_flask_app .
其中 your_flask_app 是你为 Docker 镜像指定的标签。
步骤 4:运行 Docker 容器
运行 Docker 容器,并将容器端口映射到宿主机端口:
docker run -d -p 5000:5000 your_flask_app
这将启动一个 Docker 容器,并将容器内部的 5000 端口映射到宿主机的 5000 端口。
步骤 5:配置 Nginx 作为反向代理
假设你的 Nginx 服务器已经在运行,并且监听在 80 端口。你需要在 Nginx 的配置文件中添加一条新的 location 规则,将请求转发到 Flask 应用所在的端口(在这个例子中是 5000 端口)。编辑 Nginx 配置文件(通常是 /etc/nginx/nginx.conf 或 /etc/nginx/sites-enabled/default):
server {
listen 80;
server_name example.com;
location /wechat {
proxy_pass http://localhost:5000/wechat;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
步骤 6:重新加载 Nginx 配置
重新加载 Nginx 配置,使新的 location 规则生效:
sudo systemctl reload nginx
或者,如果你使用的是 Nginx 的非系统服务管理方式:
sudo nginx -s reload
总结
通过以上步骤,将 Flask 应用程序放入 Docker 容器中,并通过 Nginx 作为反向代理来访问该应用。现在,所有的针对 /wechat 路径的请求都将被转发到运行在 Docker 容器中的 Flask 应用。
Flask 应用作为web服务,接收微信请求并从数据库中查询数据然后返回。
评论区