| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- import pymongo
- import openpyxl
- from openpyxl.styles import PatternFill
- import os
- import platform
- from datetime import datetime
- from services.quote_query_entity import QuoteQueryEntity
-
class MongoDBTools:
    """Fetch quote records from MongoDB, flag large gaps and export them to Excel.

    A "gap" is the absolute difference between two consecutive records of the
    query result (which is sorted by ``_id`` descending), measured either on
    the ``Bid`` price (``query_type == 1``) or on the ``SAT`` timestamp in
    seconds (``query_type == 2``).  Pairs whose gap exceeds
    ``query_entity.diff_value`` are highlighted in the exported workbook.
    """

    # Format of string-valued ``SAT`` timestamps.
    _SAT_FORMAT = "%Y-%m-%d %H:%M:%S"

    @staticmethod
    def query_mongodb_data(query_entity: "QuoteQueryEntity"):
        """Run the full pipeline: connect, query, flag gaps, export to Excel.

        Args:
            query_entity: Connection settings and query parameters.  When it
                is ``None`` the call only prints ``param error``.

        Returns:
            None.  Failures are reported on stdout instead of being raised.
        """
        if query_entity is None:
            print("param error")
            return

        tools = MongoDBTools()
        client = None
        try:
            client = tools.connect_mongodb(query_entity)
            if client is None:
                return

            # Fetch the quote records and the pairs that exceed the threshold.
            records, diff_records = tools.get_quote_data_by_type(client, query_entity)

            # The file prefix tells by-price exports from by-time exports.
            file_name_pre = 'time_' if query_entity.query_type == 2 else 'price_'
            if records:
                tools.export_to_excel(records, diff_records, query_entity.goods_code, file_name_pre)
                # diff_records holds two entries per flagged pair.
                print("time diff count: " + str(len(diff_records) // 2))
        except Exception as e:
            print(f"MongoDB 查询失败:{e}")
        finally:
            # Always release the connection, including on the error paths.
            if client is not None:
                client.close()

    @staticmethod
    def read_files(file_folder, file_extend, limit_num):
        """List the most recently modified files with a given extension.

        Args:
            file_folder: Directory to scan (not recursive).
            file_extend: Suffix a file name must end with, e.g. ``".xlsx"``.
            limit_num: Maximum number of entries to return.

        Returns:
            A list of dicts, newest first, each with ``filename`` and
            ``created_time`` (the file's modification time formatted as
            ``YYYY-MM-DD HH:MM:SS`` in local time).  An empty list is
            returned when the folder does not exist.
        """
        if not os.path.isdir(file_folder):
            return []

        stamped = []
        for filename in os.listdir(file_folder):
            if not filename.endswith(file_extend):
                continue
            try:
                modified = os.path.getmtime(os.path.join(file_folder, filename))
            except OSError:
                # The file vanished between listing and stat; ignore it.
                continue
            stamped.append((modified, filename))

        # Sort on the numeric timestamp so files modified within the same
        # second still come out in the right order.
        stamped.sort(key=lambda item: item[0], reverse=True)

        return [
            {
                'filename': filename,
                'created_time': datetime.fromtimestamp(modified).strftime('%Y-%m-%d %H:%M:%S'),
            }
            for modified, filename in stamped[:limit_num]
        ]

    def connect_mongodb(self, query_entity: "QuoteQueryEntity"):
        """Create a MongoDB client from the connection fields of *query_entity*.

        Args:
            query_entity: Provides ``username``, ``password``, ``host``,
                ``port`` and ``db_name``.

        Returns:
            The ``pymongo.MongoClient`` instance, or ``None`` when it could
            not be created.
        """
        from urllib.parse import quote_plus

        # Credentials must be percent-escaped, otherwise characters such as
        # '@', ':', '/' or '+' break or silently alter the URI.
        username = quote_plus(str(query_entity.username))
        password = quote_plus(str(query_entity.password))
        uri = f"mongodb://{username}:{password}@{query_entity.host}:{query_entity.port}/{query_entity.db_name}"
        try:
            # NOTE(review): MongoClient connects lazily, so this message only
            # confirms that the client object was built - confirm whether an
            # explicit ping is wanted here.
            client = pymongo.MongoClient(uri)
            print("连接 MongoDB 成功!")
            return client
        except Exception as e:
            print(f"连接 MongoDB 失败:{e}")
            return None

    def get_quote_data(self, client, query_entity: "QuoteQueryEntity"):
        """Fetch quote records for one goods code, newest ``_id`` first.

        Args:
            client: Object indexable as ``client[db_name][col_name]`` that
                yields a collection with ``find``.
            query_entity: Provides ``db_name``, ``col_name``, ``goods_code``,
                the optional ``start_time``/``end_time`` range on ``SAT`` and
                ``record_num`` (``None`` or ``0`` means "all records", a
                positive value limits the result to that many records).

        Returns:
            A list of records, or ``None`` when the query failed or
            ``record_num`` is negative.
        """
        try:
            collection = client[query_entity.db_name][query_entity.col_name]

            query = {"GC": query_entity.goods_code}
            # The time range is applied only when both bounds are given.
            if query_entity.start_time and query_entity.end_time:
                query["SAT"] = {
                    "$gte": query_entity.start_time,
                    "$lte": query_entity.end_time,
                }

            record_num = query_entity.record_num
            if record_num is not None and record_num < 0:
                # A negative count has never produced a result set.
                return None

            cursor = collection.find(query).sort("_id", -1)
            if record_num:
                cursor = cursor.limit(record_num)
            return list(cursor)
        except Exception as e:
            print(f"数据库操作失败:{e}")
            return None

    @staticmethod
    def _parse_sat(value):
        """Return *value* as a ``datetime``, or ``None`` if missing or unparseable."""
        if value is None:
            return None
        if isinstance(value, datetime):
            return value
        try:
            return datetime.strptime(str(value), MongoDBTools._SAT_FORMAT)
        except ValueError:
            return None

    def get_quote_data_by_type(self, client, query_entity: "QuoteQueryEntity"):
        """Fetch records and flag consecutive pairs whose gap exceeds the threshold.

        Every record gets a ``Color`` key: ``"1"`` when it belongs to a
        flagged pair, ``"0"`` otherwise.  A record that sits between two
        large gaps appears twice in the second list, once per pair.

        Args:
            client: Passed through to ``get_quote_data``.
            query_entity: Provides ``query_type`` (``1`` = ``Bid`` gap,
                ``2`` = ``SAT`` gap in seconds) and ``diff_value`` (the
                threshold), plus everything ``get_quote_data`` needs.

        Returns:
            ``(records, diff_records)``, or ``(None, None)`` when there are
            no records or processing failed.
        """
        try:
            latest_records = self.get_quote_data(client, query_entity)
            if not latest_records:
                print("no records!")
                return None, None

            query_type = query_entity.query_type
            threshold = query_entity.diff_value
            diff_records = []

            previous = None
            previous_bid = None
            previous_time = None

            for record in latest_records:
                current_bid = record.get("Bid")
                current_time = None
                if query_type == 2:
                    current_time = self._parse_sat(record.get("SAT"))
                record["Color"] = '0'

                gap = None
                if query_type == 1:
                    # 1: gap between consecutive bid prices.
                    if current_bid is not None and previous_bid is not None:
                        gap = abs(current_bid - previous_bid)
                elif query_type == 2:
                    # 2: gap between consecutive timestamps, in seconds.  A
                    # missing or malformed SAT only skips the pairs it takes
                    # part in; it must not block later comparisons.
                    if current_time is not None and previous_time is not None:
                        gap = abs((current_time - previous_time).total_seconds())

                if gap is not None and gap > threshold:
                    previous["Color"] = "1"
                    record["Color"] = "1"
                    diff_records.append(previous)
                    diff_records.append(record)

                previous = record
                previous_bid = current_bid
                previous_time = current_time

            return latest_records, diff_records
        except Exception as e:
            print(f"数据库操作失败:{e}")
            # Keep the two-tuple shape so callers can always unpack the result.
            return None, None

    def export_to_excel(self, records, diff_records, goods_code, file_name_pre):
        """Write the records to a timestamped workbook under ``static/quote_data``.

        The first sheet ("Full Data") holds every record.  When
        *diff_records* is non-empty a second sheet ("Filter Data") holds the
        flagged pairs and becomes the active sheet.

        Args:
            records: All records; nothing is written when this is ``None``.
            diff_records: Flagged records, may be ``None`` or empty.
            goods_code: Goods code used in the file name.
            file_name_pre: File name prefix.

        Returns:
            The path of the saved workbook, or ``None`` when *records* is
            ``None``.
        """
        if records is None:
            return None

        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "Full Data"
        self.update_sheet_style(ws, records)

        if diff_records:
            ws_filter = wb.create_sheet('Filter Data')
            self.update_sheet_style(ws_filter, diff_records)
            # Sheet indexes are zero-based, so 1 selects the filter sheet.
            wb.active = 1

        dir_path = os.path.join('static', 'quote_data')
        # exist_ok avoids the check-then-create race of a separate exists().
        os.makedirs(dir_path, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        file_name = os.path.join(dir_path, f"{file_name_pre}{goods_code}_{timestamp}.xlsx")
        wb.save(file_name)

        print("quote date export to:" + file_name)
        return file_name

    def update_sheet_style(self, ws, records):
        """Fill a worksheet with the records and highlight the flagged rows.

        Args:
            ws: Target worksheet; ``None`` is returned unchanged.
            records: Iterable of dict-like records.  Missing columns are
                written as empty cells.

        Returns:
            The worksheet that was passed in.
        """
        if ws is None:
            return None

        # Fill used for rows whose "Color" value is "1".
        yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")

        columns = ("GC", "SAT", "PE", "Bid", "Ask", "Color")
        ws.append(list(columns))

        column_widths = {'A': 20, 'B': 30, 'C': 20, 'D': 20, 'E': 20, 'F': 20}
        for letter, width in column_widths.items():
            ws.column_dimensions[letter].width = width

        # Read the header position once; data rows follow it one by one, so
        # the sheet does not have to be rescanned for every highlighted row.
        header_row = ws.max_row
        for offset, row in enumerate(records or [], start=1):
            ws.append([row.get(column) for column in columns])
            if row.get("Color") == "1":
                for cell in ws[header_row + offset]:
                    cell.fill = yellow_fill

        # Set the filter range after the data is written so that it covers
        # the data rows and not just the header.
        ws.auto_filter.ref = ws.dimensions

        return ws

    def open_excel(self, file_path):
        """Open *file_path* with the platform's default application.

        Does nothing on platforms other than Windows, macOS and Linux.
        """
        import subprocess

        system_name = platform.system()
        try:
            if system_name == "Windows":
                os.startfile(file_path)
            elif system_name == "Darwin":
                # An argument list keeps paths with spaces or shell
                # metacharacters from being interpreted by a shell.
                subprocess.run(["open", file_path], check=False)
            elif system_name == "Linux":
                subprocess.run(["xdg-open", file_path], check=False)
        except OSError as e:
            # Opening the file is a convenience; a missing launcher must not
            # fail the export.
            print(f"failed to open {file_path}: {e}")
|