|
|
@@ -0,0 +1,241 @@
|
|
|
+import pymongo
|
|
|
+import openpyxl
|
|
|
+from openpyxl.styles import PatternFill
|
|
|
+import os
|
|
|
+import platform
|
|
|
+from datetime import datetime
|
|
|
+
|
|
|
+from services.quote_query_entity import QuoteQueryEntity
|
|
|
+
|
|
|
+class MongoDBTools:
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def query_mongodb_data(query_entity: QuoteQueryEntity):
|
|
|
+ if query_entity is None:
|
|
|
+ print("param error")
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ tools = MongoDBTools()
|
|
|
+ client = tools.connect_mongodb(query_entity)
|
|
|
+ if client:
|
|
|
+ # 查询行情数据
|
|
|
+ records, diff_records = tools.get_quote_data_by_type(client, query_entity)
|
|
|
+
|
|
|
+ # 导出数据到excel
|
|
|
+ # 默认为按价格
|
|
|
+ file_name_pre = "static\quote_data\\price_";
|
|
|
+ if query_entity.query_type == 2:
|
|
|
+ # 按时间
|
|
|
+ file_name_pre = "static\quote_data\\time_";
|
|
|
+ if records is not None and len(records) > 0:
|
|
|
+ tools.export_to_excel(records, diff_records, query_entity.goods_code, file_name_pre)
|
|
|
+ print("time diff count: " + str(int(len(diff_records) /2)))
|
|
|
+
|
|
|
+ client.close()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"MongoDB 查询失败:{e}")
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def read_files(file_folder, file_extend, limit_num):
|
|
|
+ # 处理表单数据,如果需要做查询操作
|
|
|
+ # 查询文件夹中的 xlsx 文件并按生成时间倒序排列
|
|
|
+ files = []
|
|
|
+ for filename in os.listdir(file_folder):
|
|
|
+ if filename.endswith(file_extend):
|
|
|
+ file_path = os.path.join(file_folder, filename)
|
|
|
+ created_time = os.path.getmtime(file_path)
|
|
|
+ files.append({
|
|
|
+ 'filename': filename,
|
|
|
+ 'created_time': datetime.fromtimestamp(created_time).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
+ })
|
|
|
+
|
|
|
+ # 按照文件创建时间倒序排序
|
|
|
+ file_list = sorted(files, key=lambda x: x['created_time'], reverse=True)
|
|
|
+
|
|
|
+ # 取出最新的N个文件
|
|
|
+ recent_files = file_list[:limit_num]
|
|
|
+
|
|
|
+ return recent_files
|
|
|
+
|
|
|
+ def connect_mongodb(self, query_entity: QuoteQueryEntity):
|
|
|
+ # 创建 MongoDB 连接 URI
|
|
|
+ uri = f"mongodb://{query_entity.username}:{query_entity.password}@{query_entity.host}:{query_entity.port}/{query_entity.db_name}"
|
|
|
+ try:
|
|
|
+ # 替换为你的 MongoDB 连接字符串
|
|
|
+ # 默认本地运行的 MongoDB 连接地址
|
|
|
+ client = pymongo.MongoClient(uri)
|
|
|
+ print("连接 MongoDB 成功!")
|
|
|
+ return client
|
|
|
+ except Exception as e:
|
|
|
+ print(f"连接 MongoDB 失败:{e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_quote_data(self, client, query_entity: QuoteQueryEntity):
|
|
|
+ try:
|
|
|
+ # 选择数据库(如果不存在,则会自动创建)
|
|
|
+ db = client[query_entity.db_name]
|
|
|
+
|
|
|
+ # 选择集合(类似关系型数据库中的表)
|
|
|
+ collection = db[query_entity.col_name]
|
|
|
+ query = {}
|
|
|
+
|
|
|
+ if query_entity.start_time is not None and len(query_entity.start_time) > 0 and query_entity.end_time is not None and len(query_entity.end_time) > 0:
|
|
|
+ query = {
|
|
|
+ "GC": query_entity.goods_code,
|
|
|
+ "SAT": {
|
|
|
+ "$gte": query_entity.start_time, # Greater than or equal to start_time
|
|
|
+ "$lte": query_entity.end_time # Less than or equal to end_time
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ query = {
|
|
|
+ "GC": query_entity.goods_code
|
|
|
+ }
|
|
|
+
|
|
|
+ latest_records = None
|
|
|
+ # 查询记录 record_num = 0 或 none ,取所有记录
|
|
|
+ if query_entity.record_num is None or query_entity.record_num == 0:
|
|
|
+ latest_records = list(
|
|
|
+ collection.find(query).sort("_id", -1)
|
|
|
+ )
|
|
|
+ # record_num > 0, 取最新的N条
|
|
|
+ if query_entity.record_num is not None and query_entity.record_num > 0:
|
|
|
+ latest_records = list(
|
|
|
+ collection.find(query).sort("_id", -1)
|
|
|
+ .limit(query_entity.record_num) # 取最新 N 条
|
|
|
+ )
|
|
|
+
|
|
|
+ return latest_records
|
|
|
+ except Exception as e:
|
|
|
+ print(f"数据库操作失败:{e}")
|
|
|
+
|
|
|
+ def get_quote_data_by_type(self, client, query_entity: QuoteQueryEntity):
|
|
|
+ try:
|
|
|
+ latest_records = self.get_quote_data(client, query_entity)
|
|
|
+
|
|
|
+ if latest_records is None or len(latest_records) == 0:
|
|
|
+ print("no records!")
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ # 初始化变量
|
|
|
+ previous = None
|
|
|
+ previous_bid = None
|
|
|
+ previous_sat = None
|
|
|
+ # 定义时间格式
|
|
|
+ sta_format = "%Y-%m-%d %H:%M:%S"
|
|
|
+ diff_records = []
|
|
|
+
|
|
|
+ # 遍历记录,查找 BID 差值绝对值大于 500 的记录
|
|
|
+ for record in latest_records:
|
|
|
+ # print("record info:", record)
|
|
|
+ current_bid = record.get("Bid")
|
|
|
+ current_sat = record.get("SAT")
|
|
|
+ record["Color"] = '0'
|
|
|
+ if query_entity.query_type == 1:
|
|
|
+ # 1: 按价差(买价)
|
|
|
+ if current_bid is not None and previous_bid is not None:
|
|
|
+ difference = abs(current_bid - previous_bid)
|
|
|
+ if abs(difference) > query_entity.diff_value:
|
|
|
+ previous["Color"] = "1"
|
|
|
+ record["Color"] = "1"
|
|
|
+ diff_records.append(previous)
|
|
|
+ diff_records.append(record)
|
|
|
+ elif query_entity.query_type == 2:
|
|
|
+ # 2-按时间差
|
|
|
+ if current_sat is not None and previous_sat is not None:
|
|
|
+ try:
|
|
|
+ pre_sta_date = datetime.strptime(str(previous_sat), sta_format)
|
|
|
+ cur_sta_date = datetime.strptime(current_sat, sta_format)
|
|
|
+ difference = (cur_sta_date - pre_sta_date).total_seconds()
|
|
|
+ if abs(difference) > query_entity.diff_value:
|
|
|
+ previous["Color"] = "1"
|
|
|
+ record["Color"] = "1"
|
|
|
+ diff_records.append(previous)
|
|
|
+ diff_records.append(record)
|
|
|
+ except Exception as e:
|
|
|
+ continue
|
|
|
+
|
|
|
+ previous = record
|
|
|
+ previous_bid = current_bid
|
|
|
+ previous_sat = current_sat
|
|
|
+
|
|
|
+ return latest_records, diff_records
|
|
|
+ except Exception as e:
|
|
|
+ print(f"数据库操作失败:{e}")
|
|
|
+
|
|
|
+ def export_to_excel(self, records, diff_records, goods_code, file_name_pre):
|
|
|
+ if records is None:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 创建一个 Excel 文件
|
|
|
+ wb = openpyxl.Workbook()
|
|
|
+ ws = wb.active
|
|
|
+ ws.title = "Full Data"
|
|
|
+
|
|
|
+ # 更新样式
|
|
|
+ ws = self.update_sheet_style(ws, records)
|
|
|
+
|
|
|
+ # 添加sheet2
|
|
|
+ if diff_records is not None and len(diff_records) > 0:
|
|
|
+ # 创建 sheet2 并填充数据
|
|
|
+ ws_filter = wb.create_sheet('Filter Data') # 创建新工作表 'Sheet2'
|
|
|
+ ws_filter = self.update_sheet_style(ws_filter, diff_records)
|
|
|
+
|
|
|
+ # 设置第二个工作表为默认激活工作表
|
|
|
+ wb.active = 1 # 激活 'Sheet2',index 从 0 开始,1 表示第二个工作表
|
|
|
+
|
|
|
+ # 保存 Excel 文件
|
|
|
+ file_name = file_name_pre + goods_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx"
|
|
|
+ wb.save(file_name)
|
|
|
+
|
|
|
+ print("quote date export to:" + file_name)
|
|
|
+
|
|
|
+ # 打开excel文件
|
|
|
+ # open_excel(file_name)
|
|
|
+
|
|
|
+ def update_sheet_style(self, ws, records):
|
|
|
+ if ws is None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 设置黄色标记的填充样式
|
|
|
+ yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")
|
|
|
+
|
|
|
+ # 写入表头
|
|
|
+ ws.append(["GC", "SAT", "PE", "Bid", "Ask", "Color"])
|
|
|
+
|
|
|
+ # 设置列宽
|
|
|
+ ws.column_dimensions['A'].width = 20 # GC 列的宽度
|
|
|
+ ws.column_dimensions['B'].width = 30 # SAT 列的宽度
|
|
|
+ ws.column_dimensions['C'].width = 20 # PE 列的宽度
|
|
|
+ ws.column_dimensions['D'].width = 20 # Bid 列的宽度
|
|
|
+ ws.column_dimensions['E'].width = 20 # Ask 列的宽度
|
|
|
+ ws.column_dimensions['F'].width = 20 # Color 列的宽度
|
|
|
+
|
|
|
+ # 启用筛选功能
|
|
|
+ ws.auto_filter.ref = ws.dimensions # 激活自动筛选
|
|
|
+
|
|
|
+ # 写入数据并根据 color 属性设置行颜色
|
|
|
+ for row in records:
|
|
|
+ row_values = [row["GC"], row["SAT"], row["PE"], row["Bid"], row["Ask"], row["Color"]]
|
|
|
+ ws.append(row_values)
|
|
|
+
|
|
|
+ # 如果 color 是 "yellow",则标记该行的颜色为黄色
|
|
|
+ if row["Color"] == "1":
|
|
|
+ # 获取当前行的行号
|
|
|
+ row_num = ws.max_row
|
|
|
+ # 为当前行的所有单元格设置背景颜色
|
|
|
+ for cell in ws[row_num]:
|
|
|
+ cell.fill = yellow_fill
|
|
|
+
|
|
|
+ return ws
|
|
|
+
|
|
|
+ # 自动打开 Excel 文件
|
|
|
+ def open_excel(self, file_path):
|
|
|
+ system_name = platform.system()
|
|
|
+
|
|
|
+ if system_name == "Windows":
|
|
|
+ os.startfile(file_path) # 在 Windows 上使用 os.startfile 打开文件
|
|
|
+ elif system_name == "Darwin": # macOS
|
|
|
+ os.system(f"open {file_path}")
|
|
|
+ elif system_name == "Linux":
|
|
|
+ os.system(f"xdg-open {file_path}") # 在 Linux 上使用 xdg-open 打开文件
|