# mongodb_tools.py
import os
import platform
import subprocess
from datetime import datetime
from urllib.parse import quote_plus

import openpyxl
import pymongo
from openpyxl.styles import PatternFill

from services.quote_query_entity import QuoteQueryEntity
  8. class MongoDBTools:
  9. @staticmethod
  10. def query_mongodb_data(query_entity: QuoteQueryEntity):
  11. if query_entity is None:
  12. print("param error")
  13. return
  14. try:
  15. tools = MongoDBTools()
  16. client = tools.connect_mongodb(query_entity)
  17. if client:
  18. # 查询行情数据
  19. records, diff_records = tools.get_quote_data_by_type(client, query_entity)
  20. # 导出数据到excel
  21. # 默认为按价格
  22. file_name_pre = 'price_'
  23. if query_entity.query_type == 2:
  24. # 按时间
  25. file_name_pre = 'time_'
  26. if records is not None and len(records) > 0:
  27. tools.export_to_excel(records, diff_records, query_entity.goods_code, file_name_pre)
  28. print("time diff count: " + str(int(len(diff_records) /2)))
  29. client.close()
  30. except Exception as e:
  31. print(f"MongoDB 查询失败:{e}")
  32. @staticmethod
  33. def read_files(file_folder, file_extend, limit_num):
  34. # 处理表单数据,如果需要做查询操作
  35. # 查询文件夹中的 xlsx 文件并按生成时间倒序排列
  36. files = []
  37. for filename in os.listdir(file_folder):
  38. if filename.endswith(file_extend):
  39. file_path = os.path.join(file_folder, filename)
  40. created_time = os.path.getmtime(file_path)
  41. files.append({
  42. 'filename': filename,
  43. 'created_time': datetime.fromtimestamp(created_time).strftime('%Y-%m-%d %H:%M:%S')
  44. })
  45. # 按照文件创建时间倒序排序
  46. file_list = sorted(files, key=lambda x: x['created_time'], reverse=True)
  47. # 取出最新的N个文件
  48. recent_files = file_list[:limit_num]
  49. return recent_files
  50. def connect_mongodb(self, query_entity: QuoteQueryEntity):
  51. # 创建 MongoDB 连接 URI
  52. uri = f"mongodb://{query_entity.username}:{query_entity.password}@{query_entity.host}:{query_entity.port}/{query_entity.db_name}"
  53. try:
  54. # 替换为你的 MongoDB 连接字符串
  55. # 默认本地运行的 MongoDB 连接地址
  56. client = pymongo.MongoClient(uri)
  57. print("连接 MongoDB 成功!")
  58. return client
  59. except Exception as e:
  60. print(f"连接 MongoDB 失败:{e}")
  61. return None
  62. def get_quote_data(self, client, query_entity: QuoteQueryEntity):
  63. try:
  64. # 选择数据库(如果不存在,则会自动创建)
  65. db = client[query_entity.db_name]
  66. # 选择集合(类似关系型数据库中的表)
  67. collection = db[query_entity.col_name]
  68. query = {}
  69. if query_entity.start_time is not None and len(query_entity.start_time) > 0 and query_entity.end_time is not None and len(query_entity.end_time) > 0:
  70. query = {
  71. "GC": query_entity.goods_code,
  72. "SAT": {
  73. "$gte": query_entity.start_time, # Greater than or equal to start_time
  74. "$lte": query_entity.end_time # Less than or equal to end_time
  75. }
  76. }
  77. else:
  78. query = {
  79. "GC": query_entity.goods_code
  80. }
  81. latest_records = None
  82. # 查询记录 record_num = 0 或 none ,取所有记录
  83. if query_entity.record_num is None or query_entity.record_num == 0:
  84. latest_records = list(
  85. collection.find(query).sort("_id", -1)
  86. )
  87. # record_num > 0, 取最新的N条
  88. if query_entity.record_num is not None and query_entity.record_num > 0:
  89. latest_records = list(
  90. collection.find(query).sort("_id", -1)
  91. .limit(query_entity.record_num) # 取最新 N 条
  92. )
  93. return latest_records
  94. except Exception as e:
  95. print(f"数据库操作失败:{e}")
  96. def get_quote_data_by_type(self, client, query_entity: QuoteQueryEntity):
  97. try:
  98. latest_records = self.get_quote_data(client, query_entity)
  99. if latest_records is None or len(latest_records) == 0:
  100. print("no records!")
  101. return None, None
  102. # 初始化变量
  103. previous = None
  104. previous_bid = None
  105. previous_sat = None
  106. # 定义时间格式
  107. sta_format = "%Y-%m-%d %H:%M:%S"
  108. diff_records = []
  109. # 遍历记录,查找 BID 差值绝对值大于 500 的记录
  110. for record in latest_records:
  111. # print("record info:", record)
  112. current_bid = record.get("Bid")
  113. current_sat = record.get("SAT")
  114. record["Color"] = '0'
  115. if query_entity.query_type == 1:
  116. # 1: 按价差(买价)
  117. if current_bid is not None and previous_bid is not None:
  118. difference = abs(current_bid - previous_bid)
  119. if abs(difference) > query_entity.diff_value:
  120. previous["Color"] = "1"
  121. record["Color"] = "1"
  122. diff_records.append(previous)
  123. diff_records.append(record)
  124. elif query_entity.query_type == 2:
  125. # 2-按时间差
  126. if current_sat is not None and previous_sat is not None:
  127. try:
  128. pre_sta_date = datetime.strptime(str(previous_sat), sta_format)
  129. cur_sta_date = datetime.strptime(current_sat, sta_format)
  130. difference = (cur_sta_date - pre_sta_date).total_seconds()
  131. if abs(difference) > query_entity.diff_value:
  132. previous["Color"] = "1"
  133. record["Color"] = "1"
  134. diff_records.append(previous)
  135. diff_records.append(record)
  136. except Exception as e:
  137. continue
  138. previous = record
  139. previous_bid = current_bid
  140. previous_sat = current_sat
  141. return latest_records, diff_records
  142. except Exception as e:
  143. print(f"数据库操作失败:{e}")
  144. def export_to_excel(self, records, diff_records, goods_code, file_name_pre):
  145. if records is None:
  146. return
  147. # 创建一个 Excel 文件
  148. wb = openpyxl.Workbook()
  149. ws = wb.active
  150. ws.title = "Full Data"
  151. # 更新样式
  152. ws = self.update_sheet_style(ws, records)
  153. # 添加sheet2
  154. if diff_records is not None and len(diff_records) > 0:
  155. # 创建 sheet2 并填充数据
  156. ws_filter = wb.create_sheet('Filter Data') # 创建新工作表 'Sheet2'
  157. ws_filter = self.update_sheet_style(ws_filter, diff_records)
  158. # 设置第二个工作表为默认激活工作表
  159. wb.active = 1 # 激活 'Sheet2',index 从 0 开始,1 表示第二个工作表
  160. # 保存 Excel 文件
  161. # 目标文件的目录
  162. dir_path = os.path.join('static', 'quote_data')
  163. # 检查目录是否存在,如果不存在则创建
  164. if not os.path.exists(dir_path):
  165. os.makedirs(dir_path)
  166. file_name = os.path.join(dir_path, file_name_pre + goods_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx")
  167. wb.save(file_name)
  168. print("quote date export to:" + file_name)
  169. # 打开excel文件
  170. # open_excel(file_name)
  171. def update_sheet_style(self, ws, records):
  172. if ws is None:
  173. return None
  174. # 设置黄色标记的填充样式
  175. yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")
  176. # 写入表头
  177. ws.append(["GC", "SAT", "PE", "Bid", "Ask", "Color"])
  178. # 设置列宽
  179. ws.column_dimensions['A'].width = 20 # GC 列的宽度
  180. ws.column_dimensions['B'].width = 30 # SAT 列的宽度
  181. ws.column_dimensions['C'].width = 20 # PE 列的宽度
  182. ws.column_dimensions['D'].width = 20 # Bid 列的宽度
  183. ws.column_dimensions['E'].width = 20 # Ask 列的宽度
  184. ws.column_dimensions['F'].width = 20 # Color 列的宽度
  185. # 启用筛选功能
  186. ws.auto_filter.ref = ws.dimensions # 激活自动筛选
  187. # 写入数据并根据 color 属性设置行颜色
  188. for row in records:
  189. row_values = [row["GC"], row["SAT"], row["PE"], row["Bid"], row["Ask"], row["Color"]]
  190. ws.append(row_values)
  191. # 如果 color 是 "yellow",则标记该行的颜色为黄色
  192. if row["Color"] == "1":
  193. # 获取当前行的行号
  194. row_num = ws.max_row
  195. # 为当前行的所有单元格设置背景颜色
  196. for cell in ws[row_num]:
  197. cell.fill = yellow_fill
  198. return ws
  199. # 自动打开 Excel 文件
  200. def open_excel(self, file_path):
  201. system_name = platform.system()
  202. if system_name == "Windows":
  203. os.startfile(file_path) # 在 Windows 上使用 os.startfile 打开文件
  204. elif system_name == "Darwin": # macOS
  205. os.system(f"open {file_path}")
  206. elif system_name == "Linux":
  207. os.system(f"xdg-open {file_path}") # 在 Linux 上使用 xdg-open 打开文件