import pymongo import openpyxl from openpyxl.styles import PatternFill import os import platform from datetime import datetime from services.quote_query_entity import QuoteQueryEntity class MongoDBTools: @staticmethod def query_mongodb_data(query_entity: QuoteQueryEntity): if query_entity is None: print("param error") return try: tools = MongoDBTools() client = tools.connect_mongodb(query_entity) if client: # 查询行情数据 records, diff_records = tools.get_quote_data_by_type(client, query_entity) # 导出数据到excel # 默认为按价格 file_name_pre = 'price_' if query_entity.query_type == 2: # 按时间 file_name_pre = 'time_' if records is not None and len(records) > 0: tools.export_to_excel(records, diff_records, query_entity.goods_code, file_name_pre) print("time diff count: " + str(int(len(diff_records) /2))) else: print("not records") client.close() except Exception as e: print(f"MongoDB 查询失败:{e}") @staticmethod def read_files(file_folder, file_extend, limit_num): # 处理表单数据,如果需要做查询操作 # 查询文件夹中的 xlsx 文件并按生成时间倒序排列 files = [] for filename in os.listdir(file_folder): if filename.endswith(file_extend): file_path = os.path.join(file_folder, filename) created_time = os.path.getmtime(file_path) files.append({ 'filename': filename, 'created_time': datetime.fromtimestamp(created_time).strftime('%Y-%m-%d %H:%M:%S') }) # 按照文件创建时间倒序排序 file_list = sorted(files, key=lambda x: x['created_time'], reverse=True) # 取出最新的N个文件 recent_files = file_list[:limit_num] return recent_files def connect_mongodb(self, query_entity: QuoteQueryEntity): # 创建 MongoDB 连接 URI uri = f"mongodb://{query_entity.username}:{query_entity.password}@{query_entity.host}:{query_entity.port}/{query_entity.db_name}" try: # 替换为你的 MongoDB 连接字符串 # 默认本地运行的 MongoDB 连接地址 client = pymongo.MongoClient(uri) print("连接 MongoDB 成功!") return client except Exception as e: print(f"连接 MongoDB 失败:{e}") return None def get_quote_data(self, client, query_entity: QuoteQueryEntity): try: # 选择数据库(如果不存在,则会自动创建) db = client[query_entity.db_name] # 选择集合(类似关系型数据库中的表) collection = db[query_entity.col_name] query = {} if query_entity.start_time is not None and len(query_entity.start_time) > 0 and query_entity.end_time is not None and len(query_entity.end_time) > 0: start_time_tick = int(datetime.strptime(query_entity.start_time, "%Y-%m-%d %H:%M:%S").timestamp()) end_time_tick = int(datetime.strptime(query_entity.end_time, "%Y-%m-%d %H:%M:%S").timestamp()) query = { "GC": query_entity.goods_code, "AT": { "$gte": start_time_tick, # Greater than or equal to start_time "$lte": end_time_tick # Less than or equal to end_time } } else: query = { "GC": query_entity.goods_code } # 输出当前时间 print("查询MongoDB开始:", datetime.now()) latest_records = None # 查询记录 record_num = 0 或 none ,取所有记录 if query_entity.record_num is None or query_entity.record_num == 0: latest_records = list( # collection.find(query).sort("_id", -1) collection.find(query) ) # record_num > 0, 取最新的N条 if query_entity.record_num is not None and query_entity.record_num > 0: latest_records = list( # collection.find(query).sort("_id", -1) collection.find(query) .limit(query_entity.record_num) # 取最新 N 条 ) print("返回总记录数:", len(latest_records)) print("查询MongoDB结束:", datetime.now()) return latest_records except Exception as e: print(f"数据库操作失败:{e}") def get_quote_data_by_type(self, client, query_entity: QuoteQueryEntity): try: latest_records = self.get_quote_data(client, query_entity) if latest_records is None or len(latest_records) == 0: print("no records!") return None, None print("处理数据开始:", datetime.now()) # 初始化变量 previous = None previous_bid = None previous_sat = None # 定义时间格式 sta_format = "%Y-%m-%d %H:%M:%S" diff_records = [] # 遍历记录,查找 BID 差值绝对值大于 500 的记录 for record in latest_records: # print("record info:", record) current_bid = record.get("Bid") current_sat = record.get("SAT") record["Color"] = '0' record["Diff"] = '' if query_entity.query_type == 1: # 1: 按价差(买价) if current_bid is not None and previous_bid is not None: difference = abs(current_bid - previous_bid) if abs(difference) > query_entity.diff_value: previous["Color"] = "1" record["Color"] = "1" record["Diff"] = difference diff_records.append(previous) diff_records.append(record) elif query_entity.query_type == 2: # 2-按时间差 if current_sat is not None and previous_sat is not None: try: pre_sta_date = datetime.strptime(str(previous_sat), sta_format) cur_sta_date = datetime.strptime(current_sat, sta_format) difference = (cur_sta_date - pre_sta_date).total_seconds() if abs(difference) > query_entity.diff_value: previous["Color"] = "1" record["Color"] = "1" record["Diff"] = str(difference) diff_records.append(previous) diff_records.append(record) except Exception as e: continue previous = record previous_bid = current_bid previous_sat = current_sat print("处理数据结束:", datetime.now()) return latest_records, diff_records except Exception as e: print(f"数据库操作失败:{e}") def export_to_excel(self, records, diff_records, goods_code, file_name_pre): if records is None: return print("生成Excel开始:", datetime.now()) # 创建一个 Excel 文件 wb = openpyxl.Workbook() ws = wb.active ws.title = "Full Data" # 更新样式 ws = self.update_sheet_style(ws, records, 0) # 添加sheet2 if diff_records is not None and len(diff_records) > 0: # 创建 sheet2 并填充数据 ws_filter = wb.create_sheet('Filter Data') # 创建新工作表 'Sheet2' ws_filter = self.update_sheet_style(ws_filter, diff_records, 1) # 设置第二个工作表为默认激活工作表 wb.active = 1 # 激活 'Sheet2',index 从 0 开始,1 表示第二个工作表 # 保存 Excel 文件 # 目标文件的目录 dir_path = os.path.join('static', 'quote_data') # 检查目录是否存在,如果不存在则创建 if not os.path.exists(dir_path): os.makedirs(dir_path) file_name = os.path.join(dir_path, file_name_pre + goods_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx") wb.save(file_name) print("生成Excel结束:", datetime.now()) print("quote date export to:" + file_name) # 打开excel文件 # open_excel(file_name) def update_sheet_style(self, ws, records, diff_flag = 0): if ws is None: return None # 设置黄色标记的填充样式 yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid") # 写入表头 ws.append(["GC", "SAT", "PE", "Bid", "Ask", "Color", "Diff", "_id",]) # 设置列宽 ws.column_dimensions['A'].width = 20 # GC 列的宽度 ws.column_dimensions['B'].width = 30 # SAT 列的宽度 ws.column_dimensions['C'].width = 20 # PE 列的宽度 ws.column_dimensions['D'].width = 20 # Bid 列的宽度 ws.column_dimensions['E'].width = 20 # Ask 列的宽度 ws.column_dimensions['F'].width = 10 # Color 列的宽度 ws.column_dimensions['G'].width = 10 # Diff 列的宽度 ws.column_dimensions['H'].width = 30 # _id 列的宽度 # 启用筛选功能 ws.auto_filter.ref = ws.dimensions # 激活自动筛选 # 写入数据并根据 color 属性设置行颜色 for row in records: row_values = [row["GC"], row["SAT"], row["PE"], row["Bid"], row["Ask"], row["Color"], row["Diff"], str(row["_id"])] ws.append(row_values) if diff_flag == 0: # Full Data 标签, color 是 "1",则标记该行的颜色为黄色 if row["Color"] == "1": # 获取当前行的行号 row_num = ws.max_row # 为当前行的所有单元格设置背景颜色 for cell in ws[row_num]: cell.fill = yellow_fill else: # Filter Data 有Diff值的行颜色为黄色 if row["Diff"] is not None and row["Diff"] != '': # 获取当前行的行号 row_num = ws.max_row # 为当前行的所有单元格设置背景颜色 for cell in ws[row_num]: cell.fill = yellow_fill return ws # 自动打开 Excel 文件 def open_excel(self, file_path): system_name = platform.system() if system_name == "Windows": os.startfile(file_path) # 在 Windows 上使用 os.startfile 打开文件 elif system_name == "Darwin": # macOS os.system(f"open {file_path}") elif system_name == "Linux": os.system(f"xdg-open {file_path}") # 在 Linux 上使用 xdg-open 打开文件