mirror of
https://github.com/yshtcn/ServerChanPush2TelegramBot.git
synced 2025-12-11 16:50:22 +08:00
409 lines
16 KiB
Python
409 lines
16 KiB
Python
from flask import Flask, request, jsonify
|
|
import requests
|
|
import logging
|
|
import json
|
|
import re
|
|
from urllib.parse import unquote
|
|
from datetime import datetime
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import base64
|
|
from html import escape
|
|
|
|
# 获取当前日期
|
|
current_date = datetime.now().strftime("%Y-%m-%d")
|
|
|
|
# 获取当前工作目录
|
|
base_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
# 定义数据和日志目录
|
|
data_dir = os.path.join(base_dir, "data")
|
|
log_dir = os.path.join(data_dir, "log")
|
|
|
|
# 创建必要的目录
|
|
os.makedirs(data_dir, exist_ok=True)
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
|
|
# 检查配置文件是否存在,如果不存在则复制示例配置文件并退出程序
|
|
config_path = os.path.join(data_dir, 'bot_config.json')
|
|
example_config_path = os.path.join(base_dir, 'bot_config_example.json')
|
|
if not os.path.exists(config_path):
|
|
if os.path.exists(example_config_path):
|
|
shutil.copyfile(example_config_path, config_path)
|
|
logging.error(f"Configuration file not found. {example_config_path} copied to {config_path}. Exiting.")
|
|
sys.exit(1)
|
|
else:
|
|
logging.error(f"Configuration file not found and example config file {example_config_path} does not exist. Exiting.")
|
|
sys.exit(1)
|
|
|
|
# 创建一个处理器,该处理器每天午夜都会创建一个新的日志文件
|
|
handler = TimedRotatingFileHandler(os.path.join(log_dir, f"received_requests_StartAt{current_date}.log"), when="midnight", interval=1, backupCount=10, encoding='utf-8')
|
|
handler.suffix = "To%Y-%m-%d.log"
|
|
|
|
# 配置日志
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
handlers=[handler],
|
|
format="%(asctime)s - %(levelname)s - %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S"
|
|
)
|
|
|
|
# 初始化 Flask 应用
|
|
app = Flask(__name__)
|
|
|
|
# 读取配置文件
|
|
def load_config():
|
|
with open(config_path, 'r', encoding='utf-8') as f:
|
|
config = json.load(f)
|
|
for bot_config in config:
|
|
api_url_template = bot_config.get('api_url', '')
|
|
main_bot_id = bot_config.get('main_bot_id', '')
|
|
if '[AUTO_REPLACE_MAIN_BOT_ID]' in api_url_template:
|
|
bot_config['api_url'] = api_url_template.replace('[AUTO_REPLACE_MAIN_BOT_ID]', main_bot_id)
|
|
return config
|
|
|
|
# 保存接收到的请求数据
|
|
def save_received_data(received_url, received_data):
|
|
try:
|
|
current_date = datetime.now().strftime("%Y-%m-%d")
|
|
with open(os.path.join(log_dir, f"received_data_{current_date}.json"), "a", encoding='utf-8') as f:
|
|
json.dump({"received_url": received_url, "received_data": received_data}, f, ensure_ascii=False)
|
|
f.write("\n")
|
|
except Exception as e:
|
|
logging.error(f"Failed to save received data: {e}")
|
|
|
|
# 保存发送的请求数据
|
|
def save_sent_data(api_url, payload):
|
|
try:
|
|
current_date = datetime.now().strftime("%Y-%m-%d")
|
|
with open(os.path.join(log_dir, f"sent_data_{current_date}.json"), "a", encoding='utf-8') as f:
|
|
json.dump({"sent_url": api_url, "sent_data": payload}, f, ensure_ascii=False)
|
|
f.write("\n")
|
|
except Exception as e:
|
|
logging.error(f"Failed to save sent data: {e}")
|
|
|
|
def unescape_url(escaped_url: str) -> str:
|
|
return escaped_url.replace("\\/", "/")
|
|
|
|
def convert_str_gbk_to_utf8(text_str):
|
|
try:
|
|
return text_str
|
|
except:
|
|
return text_str # 如果转换失败,则返回原始字符串
|
|
|
|
|
|
def send_messages_in_batches(batch_size=10):
|
|
"""
|
|
批量发送待发送的消息,每次最多发送 batch_size 条。
|
|
:param batch_size: 每批次发送的最大消息数量
|
|
:return: 成功发送的数量和剩余待发送消息数量
|
|
"""
|
|
pending_messages = read_pending_messages()
|
|
if not pending_messages:
|
|
return {"message": "No pending messages to send", "pending_messages_count": 0}
|
|
|
|
to_send = pending_messages[:batch_size] # 取出前 batch_size 条消息
|
|
remaining_messages = pending_messages[batch_size:] # 保留剩余的消息
|
|
failed_messages = []
|
|
|
|
for msg in to_send:
|
|
success, _ = send_telegram_message(
|
|
msg['bot_id'],
|
|
msg['chat_id'],
|
|
msg['title'],
|
|
msg.get('desp'),
|
|
msg.get('url')
|
|
)
|
|
if not success:
|
|
failed_messages.append(msg)
|
|
|
|
# 将失败的消息与剩余未尝试发送的消息合并,并写回文件
|
|
remaining_messages.extend(failed_messages)
|
|
write_pending_messages(remaining_messages)
|
|
|
|
return {
|
|
"message": "Batch processing completed",
|
|
"successfully_sent_count": len(to_send) - len(failed_messages),
|
|
"remaining_pending_messages_count": len(remaining_messages)
|
|
}
|
|
|
|
# 读取待发送的消息
|
|
def read_pending_messages():
|
|
try:
|
|
with open(os.path.join(data_dir, "pending_messages.json"), "r") as f:
|
|
encoded_messages = json.load(f)
|
|
decoded_messages = []
|
|
for msg in encoded_messages:
|
|
decoded_msg = {
|
|
key: base64.b64decode(value).decode('utf-8') if value is not None and key in ['bot_id', 'chat_id', 'title', 'desp', 'url'] else value
|
|
for key, value in msg.items()
|
|
}
|
|
decoded_messages.append(decoded_msg)
|
|
return decoded_messages
|
|
except FileNotFoundError:
|
|
return []
|
|
except json.JSONDecodeError:
|
|
return []
|
|
|
|
|
|
# 写入待发送的消息
|
|
def write_pending_messages(messages):
|
|
encoded_messages = []
|
|
for msg in messages:
|
|
encoded_msg = {
|
|
key: base64.b64encode(value.encode('utf-8')).decode('utf-8') if value is not None and key in ['bot_id', 'chat_id', 'title', 'desp', 'url'] else value
|
|
for key, value in msg.items()
|
|
}
|
|
encoded_messages.append(encoded_msg)
|
|
with open(os.path.join(data_dir, "pending_messages.json"), "w") as f:
|
|
json.dump(encoded_messages, f, ensure_ascii=False)
|
|
|
|
|
|
# 发送 Telegram 消息
|
|
def send_telegram_message(bot_id, chat_id, title, desp=None, url=None):
|
|
all_bots_config = load_config()
|
|
found = False
|
|
delimiter = None
|
|
api_url = None
|
|
proxies = None
|
|
|
|
for config in all_bots_config:
|
|
main_bot_id = config['main_bot_id']
|
|
main_chat_id = config.get('main_chat_id', '')
|
|
sub_bots = config['sub_bots']
|
|
api_url = config.get('api_url', None)
|
|
proxies = config.get('proxies', None)
|
|
if bot_id == main_bot_id and chat_id == main_chat_id:
|
|
for sub_bot in sub_bots:
|
|
for keyword in sub_bot['keywords']:
|
|
keyword_decode = keyword.decode('utf-8') if isinstance(keyword, bytes) else keyword
|
|
title_decode = title.decode('utf-8') if isinstance(title, bytes) else title
|
|
if keyword_decode.lower() in title_decode.lower():
|
|
bot_id = sub_bot['bot_id']
|
|
chat_id = sub_bot['chat_id']
|
|
delimiter = sub_bot.get('delimiter')
|
|
found = True
|
|
break
|
|
if found:
|
|
break
|
|
if found:
|
|
break
|
|
|
|
api_url = api_url or f"https://api.telegram.org/bot{bot_id}/sendMessage"
|
|
text = title
|
|
text += f"\n\n{(desp.split(delimiter)[0] if delimiter and desp else desp) if desp else ''}"
|
|
text = text.rstrip()
|
|
|
|
text = escape(text) # 处理 HTML 标签
|
|
text = re.sub(r'<[^>]+>', '', text) # 移除所有 HTML 标签
|
|
|
|
# 添加对消息长度的检查和拆分
|
|
max_length = 4096 # Telegram 允许的最大消息长度
|
|
messages = []
|
|
while len(text) > max_length:
|
|
part = text[:max_length]
|
|
last_space = part.rfind(' ')
|
|
if last_space != -1:
|
|
part = part[:last_space]
|
|
messages.append(part)
|
|
text = text[len(part):].strip()
|
|
messages.append(text)
|
|
|
|
if url:
|
|
for i in range(len(messages)):
|
|
if i == len(messages) - 1:
|
|
messages[i] += f"\n\n<a href=\"{url}\">详情:</a>{url}"
|
|
messages[i] = unescape_url(messages[i])
|
|
|
|
success = True
|
|
for msg in messages:
|
|
payload = {
|
|
'chat_id': chat_id,
|
|
'text': msg,
|
|
'parse_mode': 'HTML',
|
|
'disable_web_page_preview': False
|
|
}
|
|
|
|
try:
|
|
response = requests.post(api_url, data=payload, proxies=proxies, timeout=2)
|
|
logging.info(f"response: {response.text}")
|
|
if response.status_code == 200 and response.json().get("ok"):
|
|
converted_sent_data = convert_str_gbk_to_utf8(str(payload))
|
|
save_sent_data(api_url, converted_sent_data)
|
|
else:
|
|
success = False
|
|
except requests.RequestException as e:
|
|
logging.error(f"Failed to send message: {e}")
|
|
success = False
|
|
|
|
return success, None if success else {"error": "Failed to send all parts of the message"}
|
|
|
|
|
|
@app.route('/', methods=['GET', 'POST'])
|
|
def index():
|
|
received_url = request.url
|
|
received_url = unquote(received_url)
|
|
received_data = request.form.to_dict() if request.form else None
|
|
|
|
# Save received request data
|
|
converted_received_data = convert_str_gbk_to_utf8(str(received_data))
|
|
save_received_data(received_url, converted_received_data)
|
|
|
|
logging.info(f"Received URL: {received_url}")
|
|
logging.info(f"Received POST Data: {received_data}")
|
|
|
|
bot_id = request.args.get('bot_id') or (received_data.get('bot_id') if received_data else None)
|
|
chat_id = request.args.get('chat_id') or (received_data.get('chat_id') if received_data else None)
|
|
title = request.args.get('title') or (received_data.get('title') if received_data else None)
|
|
desp = request.args.get('desp') or (received_data.get('desp') if received_data else None)
|
|
url = request.args.get('url') or (received_data.get('url') if received_data else None)
|
|
|
|
# Initialize empty list for error messages
|
|
error_list = []
|
|
|
|
# Check if bot_id, chat_id, and title are empty
|
|
if bot_id is None:
|
|
error_list.append("bot_id is a required field.")
|
|
if chat_id is None:
|
|
error_list.append("chat_id is a required field.")
|
|
if title is None:
|
|
error_list.append("title is a required field.")
|
|
|
|
# If error_list is not empty, return error messages and 400 status code
|
|
if error_list:
|
|
TestStatus = request.args.get('TestStatus') or (received_data.get('TestStatus') if received_data else None)
|
|
if TestStatus is None:
|
|
return jsonify({"error": error_list}), 400
|
|
else:
|
|
pending_messages = read_pending_messages()
|
|
pending_count = len(pending_messages)
|
|
|
|
if TestStatus == '1':
|
|
return jsonify({"ok": "the test passed"}), 200
|
|
elif TestStatus == '2':
|
|
if pending_count == 0:
|
|
return jsonify({"ok": "the test passed", "pending_messages_count": pending_count}), 200
|
|
else:
|
|
return jsonify({"ok": "the test passed", "pending_messages_count": pending_count}), 202
|
|
elif TestStatus == '3':
|
|
pending_messages = read_pending_messages()
|
|
pending_count = len(pending_messages)
|
|
if pending_messages:
|
|
new_pending_messages = []
|
|
for msg in pending_messages:
|
|
success, _ = send_telegram_message(msg['bot_id'], msg['chat_id'], msg['title'], msg['desp'], msg.get('url'))
|
|
if not success:
|
|
new_pending_messages.append(msg)
|
|
write_pending_messages(new_pending_messages)
|
|
return jsonify({"ok": "re-sent pending messages", "pending_messages_count": pending_count, "remaining_pending_messages_count": len(new_pending_messages)}), 200
|
|
else:
|
|
return jsonify({"ok": "no pending messages to re-send", "pending_messages_count": pending_count}), 200
|
|
elif TestStatus == '4':
|
|
pending_messages = read_pending_messages()
|
|
pending_count = len(pending_messages)
|
|
|
|
if pending_count == 0:
|
|
return jsonify({
|
|
"ok": "no pending messages to re-send",
|
|
"pending_messages_count": pending_count,
|
|
"remaining_pending_messages_count": 0,
|
|
"successfully_sent_count": 0
|
|
}), 200
|
|
|
|
result = send_messages_in_batches(batch_size=3)
|
|
remaining_messages = read_pending_messages()
|
|
remaining_count = len(remaining_messages)
|
|
|
|
return jsonify({
|
|
"ok": "batch processed",
|
|
"pending_messages_count": pending_count,
|
|
"remaining_pending_messages_count": remaining_count,
|
|
"successfully_sent_count": result["successfully_sent_count"]
|
|
}), 200
|
|
elif TestStatus == '5':
|
|
pending_messages = read_pending_messages()
|
|
if not pending_messages:
|
|
return jsonify({
|
|
"ok": "no pending messages",
|
|
"messages": []
|
|
}), 200
|
|
|
|
# Create a new list without bot_id for each message
|
|
safe_messages = []
|
|
for msg in pending_messages:
|
|
safe_msg = {
|
|
'chat_id': msg['chat_id'],
|
|
'title': msg['title'],
|
|
'desp': msg['desp'],
|
|
'url': msg.get('url')
|
|
}
|
|
safe_messages.append(safe_msg)
|
|
|
|
return jsonify({
|
|
"ok": "pending messages retrieved",
|
|
"messages": safe_messages,
|
|
"total_count": len(safe_messages),
|
|
"clear_messages_url": request.host_url + "?TestStatus=6"
|
|
}), 200
|
|
elif TestStatus == '6':
|
|
# Clear all pending messages
|
|
write_pending_messages([])
|
|
return jsonify({
|
|
"ok": "all pending messages cleared",
|
|
"remaining_messages_count": 0
|
|
}), 200
|
|
|
|
# Original message sending logic continues...
|
|
pending_messages = read_pending_messages()
|
|
|
|
success, response = send_telegram_message(bot_id, chat_id, title, desp, url)
|
|
|
|
if success:
|
|
pending_messages = read_pending_messages()
|
|
pending_count = len(pending_messages)
|
|
|
|
if pending_count == 0:
|
|
return jsonify({
|
|
"ok": "send message success",
|
|
"response": response,
|
|
}), 200
|
|
|
|
result = send_messages_in_batches(batch_size=2)
|
|
remaining_messages = read_pending_messages()
|
|
remaining_count = len(remaining_messages)
|
|
|
|
return jsonify({
|
|
"ok": "send message success and batch processed",
|
|
"pending_messages_count": pending_count,
|
|
"remaining_pending_messages_count": remaining_count,
|
|
"successfully_sent_count": result["successfully_sent_count"]
|
|
}), 200
|
|
else:
|
|
current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
if "【This is a delayed message】" not in desp:
|
|
if desp is None:
|
|
desp = f"【This is a delayed message】\n\nTimestamp: {current_timestamp}"
|
|
else:
|
|
desp = desp + f"\n\n【This is a delayed message】\n\nTimestamp: {current_timestamp}"
|
|
|
|
pending_messages.append({
|
|
'bot_id': bot_id,
|
|
'chat_id': chat_id,
|
|
'title': title,
|
|
'desp': desp,
|
|
'url': url
|
|
})
|
|
write_pending_messages(pending_messages)
|
|
return jsonify({"error": "Failed to send message, added to pending list"}), 202
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
config = load_config()
|
|
port = config[0].get("port", 5000)
|
|
app.run(host='0.0.0.0', port=port)
|
|
|