import pymongo import openpyxl from openpyxl.styles import PatternFill import os import platform from datetime import datetime from services.quote_query_entity import QuoteQueryEntity class MongoDBTools: @staticmethod def query_mongodb_data(query_entity: QuoteQueryEntity): if query_entity is None: print("param error") return try: tools = MongoDBTools() client = tools.connect_mongodb(query_entity) if client: # 查询行情数据 records, diff_records = tools.get_quote_data_by_type(client, query_entity) # 导出数据到excel # 默认为按价格 file_name_pre = 'price_' if query_entity.query_type == 2: # 按时间 file_name_pre = 'time_' if records is not None and len(records) > 0: tools.export_to_excel(records, diff_records, query_entity.goods_code, file_name_pre) print("time diff count: " + str(int(len(diff_records) /2))) client.close() except Exception as e: print(f"MongoDB 查询失败:{e}") @staticmethod def read_files(file_folder, file_extend, limit_num): # 处理表单数据,如果需要做查询操作 # 查询文件夹中的 xlsx 文件并按生成时间倒序排列 files = [] for filename in os.listdir(file_folder): if filename.endswith(file_extend): file_path = os.path.join(file_folder, filename) created_time = os.path.getmtime(file_path) files.append({ 'filename': filename, 'created_time': datetime.fromtimestamp(created_time).strftime('%Y-%m-%d %H:%M:%S') }) # 按照文件创建时间倒序排序 file_list = sorted(files, key=lambda x: x['created_time'], reverse=True) # 取出最新的N个文件 recent_files = file_list[:limit_num] return recent_files def connect_mongodb(self, query_entity: QuoteQueryEntity): # 创建 MongoDB 连接 URI uri = f"mongodb://{query_entity.username}:{query_entity.password}@{query_entity.host}:{query_entity.port}/{query_entity.db_name}" try: # 替换为你的 MongoDB 连接字符串 # 默认本地运行的 MongoDB 连接地址 client = pymongo.MongoClient(uri) print("连接 MongoDB 成功!") return client except Exception as e: print(f"连接 MongoDB 失败:{e}") return None def get_quote_data(self, client, query_entity: QuoteQueryEntity): try: # 选择数据库(如果不存在,则会自动创建) db = client[query_entity.db_name] # 选择集合(类似关系型数据库中的表) collection = db[query_entity.col_name] query = {} if query_entity.start_time is not None and len(query_entity.start_time) > 0 and query_entity.end_time is not None and len(query_entity.end_time) > 0: query = { "GC": query_entity.goods_code, "SAT": { "$gte": query_entity.start_time, # Greater than or equal to start_time "$lte": query_entity.end_time # Less than or equal to end_time } } else: query = { "GC": query_entity.goods_code } latest_records = None # 查询记录 record_num = 0 或 none ,取所有记录 if query_entity.record_num is None or query_entity.record_num == 0: latest_records = list( collection.find(query).sort("_id", -1) ) # record_num > 0, 取最新的N条 if query_entity.record_num is not None and query_entity.record_num > 0: latest_records = list( collection.find(query).sort("_id", -1) .limit(query_entity.record_num) # 取最新 N 条 ) return latest_records except Exception as e: print(f"数据库操作失败:{e}") def get_quote_data_by_type(self, client, query_entity: QuoteQueryEntity): try: latest_records = self.get_quote_data(client, query_entity) if latest_records is None or len(latest_records) == 0: print("no records!") return None, None # 初始化变量 previous = None previous_bid = None previous_sat = None # 定义时间格式 sta_format = "%Y-%m-%d %H:%M:%S" diff_records = [] # 遍历记录,查找 BID 差值绝对值大于 500 的记录 for record in latest_records: # print("record info:", record) current_bid = record.get("Bid") current_sat = record.get("SAT") record["Color"] = '0' if query_entity.query_type == 1: # 1: 按价差(买价) if current_bid is not None and previous_bid is not None: difference = abs(current_bid - previous_bid) if abs(difference) > query_entity.diff_value: previous["Color"] = "1" record["Color"] = "1" diff_records.append(previous) diff_records.append(record) elif query_entity.query_type == 2: # 2-按时间差 if current_sat is not None and previous_sat is not None: try: pre_sta_date = datetime.strptime(str(previous_sat), sta_format) cur_sta_date = datetime.strptime(current_sat, sta_format) difference = (cur_sta_date - pre_sta_date).total_seconds() if abs(difference) > query_entity.diff_value: previous["Color"] = "1" record["Color"] = "1" diff_records.append(previous) diff_records.append(record) except Exception as e: continue previous = record previous_bid = current_bid previous_sat = current_sat return latest_records, diff_records except Exception as e: print(f"数据库操作失败:{e}") def export_to_excel(self, records, diff_records, goods_code, file_name_pre): if records is None: return # 创建一个 Excel 文件 wb = openpyxl.Workbook() ws = wb.active ws.title = "Full Data" # 更新样式 ws = self.update_sheet_style(ws, records) # 添加sheet2 if diff_records is not None and len(diff_records) > 0: # 创建 sheet2 并填充数据 ws_filter = wb.create_sheet('Filter Data') # 创建新工作表 'Sheet2' ws_filter = self.update_sheet_style(ws_filter, diff_records) # 设置第二个工作表为默认激活工作表 wb.active = 1 # 激活 'Sheet2',index 从 0 开始,1 表示第二个工作表 # 保存 Excel 文件 # 目标文件的目录 dir_path = os.path.join('static', 'quote_data') # 检查目录是否存在,如果不存在则创建 if not os.path.exists(dir_path): os.makedirs(dir_path) file_name = os.path.join(dir_path, file_name_pre + goods_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx") wb.save(file_name) print("quote date export to:" + file_name) # 打开excel文件 # open_excel(file_name) def update_sheet_style(self, ws, records): if ws is None: return None # 设置黄色标记的填充样式 yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid") # 写入表头 ws.append(["GC", "SAT", "PE", "Bid", "Ask", "Color"]) # 设置列宽 ws.column_dimensions['A'].width = 20 # GC 列的宽度 ws.column_dimensions['B'].width = 30 # SAT 列的宽度 ws.column_dimensions['C'].width = 20 # PE 列的宽度 ws.column_dimensions['D'].width = 20 # Bid 列的宽度 ws.column_dimensions['E'].width = 20 # Ask 列的宽度 ws.column_dimensions['F'].width = 20 # Color 列的宽度 # 启用筛选功能 ws.auto_filter.ref = ws.dimensions # 激活自动筛选 # 写入数据并根据 color 属性设置行颜色 for row in records: row_values = [row["GC"], row["SAT"], row["PE"], row["Bid"], row["Ask"], row["Color"]] ws.append(row_values) # 如果 color 是 "yellow",则标记该行的颜色为黄色 if row["Color"] == "1": # 获取当前行的行号 row_num = ws.max_row # 为当前行的所有单元格设置背景颜色 for cell in ws[row_num]: cell.fill = yellow_fill return ws # 自动打开 Excel 文件 def open_excel(self, file_path): system_name = platform.system() if system_name == "Windows": os.startfile(file_path) # 在 Windows 上使用 os.startfile 打开文件 elif system_name == "Darwin": # macOS os.system(f"open {file_path}") elif system_name == "Linux": os.system(f"xdg-open {file_path}") # 在 Linux 上使用 xdg-open 打开文件