mongodb_tools.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. import pymongo
  2. import openpyxl
  3. from openpyxl.styles import PatternFill
  4. import os
  5. import platform
  6. from datetime import datetime
  7. from services.quote_query_entity import QuoteQueryEntity
  class MongoDBTools:
      """Query quote records from MongoDB, flag large jumps, and export them to Excel.

      The class keeps no instance state; the instance methods only use ``self``
      to call one another.  ``query_mongodb_data`` is the end-to-end entry point.
      """

      @staticmethod
      def query_mongodb_data(query_entity: "QuoteQueryEntity"):
          """Run the full pipeline: connect, query, flag differences, export.

          Args:
              query_entity: Connection and query settings.  When it is ``None``
                  the method prints ``"param error"`` and returns.

          Returns:
              None.  Progress and failures are reported with ``print``.
          """
          if query_entity is None:
              print("param error")
              return
          tools = MongoDBTools()
          client = None
          try:
              client = tools.connect_mongodb(query_entity)
              if client is None:
                  return
              # Fetch the quote records and the subset that exceeded the threshold.
              records, diff_records = tools.get_quote_data_by_type(client, query_entity)
              # File name prefix: by price unless query_type 2 (by time) is requested.
              file_name_pre = 'time_' if query_entity.query_type == 2 else 'price_'
              if records:
                  tools.export_to_excel(records, diff_records, query_entity.goods_code, file_name_pre)
                  # diff_records holds (previous, current) pairs, hence the halving.
                  print("time diff count: " + str(len(diff_records) // 2))
              else:
                  print("not records")
          except Exception as e:
              print(f"MongoDB 查询失败:{e}")
          finally:
              # Always release the connection, including when processing or export fails.
              if client is not None:
                  client.close()

      @staticmethod
      def read_files(file_folder, file_extend, limit_num):
          """List the most recently modified files in a folder.

          Args:
              file_folder: Directory to scan (not recursive).
              file_extend: File name suffix to keep, e.g. ``".xlsx"``.
              limit_num: Maximum number of entries to return.

          Returns:
              A list of dicts with ``'filename'`` and ``'created_time'`` keys,
              newest first.  ``'created_time'`` is the file's *modification*
              time (``os.path.getmtime``) formatted as ``%Y-%m-%d %H:%M:%S``.
          """
          stamped = []
          for filename in os.listdir(file_folder):
              if not filename.endswith(file_extend):
                  continue
              modified = os.path.getmtime(os.path.join(file_folder, filename))
              stamped.append((modified, filename))
          # Sort on the raw timestamp so files written within the same second
          # still come out in the right order.
          stamped.sort(key=lambda item: item[0], reverse=True)
          return [
              {
                  'filename': filename,
                  'created_time': datetime.fromtimestamp(modified).strftime('%Y-%m-%d %H:%M:%S'),
              }
              for modified, filename in stamped[:limit_num]
          ]

      def connect_mongodb(self, query_entity: "QuoteQueryEntity"):
          """Build a MongoDB URI from ``query_entity`` and create a client.

          Args:
              query_entity: Supplies ``username``, ``password``, ``host``,
                  ``port`` and ``db_name``.

          Returns:
              A ``pymongo.MongoClient``, or ``None`` if constructing it raised.
          """
          from urllib.parse import quote_plus

          # Credentials must be percent-escaped, otherwise characters such as
          # '@', ':' or '/' in a password corrupt the URI.
          credentials = ""
          if query_entity.username:
              credentials = quote_plus(str(query_entity.username))
              if query_entity.password:
                  credentials += ":" + quote_plus(str(query_entity.password))
              credentials += "@"
          uri = f"mongodb://{credentials}{query_entity.host}:{query_entity.port}/{query_entity.db_name}"
          try:
              # NOTE(review): MongoClient connects lazily, so reaching the success
              # message only shows that the URI was accepted - confirm against
              # the pymongo version in use.
              client = pymongo.MongoClient(uri)
              print("连接 MongoDB 成功!")
              return client
          except Exception as e:
              print(f"连接 MongoDB 失败:{e}")
              return None

      def get_quote_data(self, client, query_entity: "QuoteQueryEntity"):
          """Fetch the documents matching the goods code and optional time window.

          The filter always matches ``GC`` against ``goods_code``.  When both
          ``start_time`` and ``end_time`` are non-empty (``%Y-%m-%d %H:%M:%S``)
          they are converted to integer epoch seconds and applied as an
          inclusive range on ``AT``.

          Args:
              client: Object indexable as ``client[db_name][col_name]`` whose
                  result offers ``find(query)``.
              query_entity: Supplies ``db_name``, ``col_name``, ``goods_code``,
                  ``start_time``, ``end_time`` and ``record_num``.

          Returns:
              A list of documents, or ``None`` if the query failed.  A
              ``record_num`` that is ``None``, zero or negative returns every
              match; a positive value caps the result with ``limit``.  No sort
              is applied, so the cap keeps the first N documents in the order
              the query yields them.
          """
          try:
              collection = client[query_entity.db_name][query_entity.col_name]

              query = {"GC": query_entity.goods_code}
              if query_entity.start_time and query_entity.end_time:
                  time_format = "%Y-%m-%d %H:%M:%S"
                  query["AT"] = {
                      "$gte": int(datetime.strptime(query_entity.start_time, time_format).timestamp()),
                      "$lte": int(datetime.strptime(query_entity.end_time, time_format).timestamp()),
                  }

              print("查询MongoDB开始:", datetime.now())
              cursor = collection.find(query)
              record_num = query_entity.record_num
              if record_num is not None and record_num > 0:
                  cursor = cursor.limit(record_num)
              latest_records = list(cursor)
              print("返回总记录数:", len(latest_records))
              print("查询MongoDB结束:", datetime.now())
              return latest_records
          except Exception as e:
              print(f"数据库操作失败:{e}")
              return None

      def get_quote_data_by_type(self, client, query_entity: "QuoteQueryEntity"):
          """Fetch records and flag consecutive pairs whose difference is too large.

          Every record gets ``"Color"`` (``'0'``) and ``"Diff"`` (``''``) keys.
          With ``query_type == 1`` the absolute difference of consecutive
          ``"Bid"`` values is compared with ``diff_value``; with
          ``query_type == 2`` the gap in seconds between consecutive ``"SAT"``
          timestamps (``%Y-%m-%d %H:%M:%S``) is used.  When the threshold is
          exceeded both records get ``"Color" = "1"``, the later one gets the
          difference in ``"Diff"`` (a number for type 1, a string for type 2),
          and the pair is appended to the difference list.

          A record whose value is missing or unparseable is not compared, and
          neither is the record that follows it.

          Args:
              client: Passed through to ``get_quote_data``.
              query_entity: Supplies ``query_type`` and ``diff_value`` in
                  addition to the query settings.

          Returns:
              ``(records, diff_records)``, or ``(None, None)`` when nothing was
              found or processing failed.
          """
          try:
              latest_records = self.get_quote_data(client, query_entity)
              if not latest_records:
                  print("no records!")
                  return None, None

              print("处理数据开始:", datetime.now())
              sat_format = "%Y-%m-%d %H:%M:%S"
              diff_records = []
              previous = None
              previous_bid = None
              previous_time = None

              for record in latest_records:
                  record["Color"] = '0'
                  record["Diff"] = ''
                  current_bid = record.get("Bid")
                  current_time = None
                  difference = None

                  if query_entity.query_type == 1:
                      # 1: by (bid) price difference.
                      if current_bid is not None and previous_bid is not None:
                          difference = abs(current_bid - previous_bid)
                  elif query_entity.query_type == 2:
                      # 2: by time difference.  Parse each SAT once; a value that
                      # cannot be parsed is treated like a missing one so that a
                      # single bad record cannot stall the rest of the scan.
                      current_sat = record.get("SAT")
                      if current_sat is not None:
                          try:
                              current_time = datetime.strptime(str(current_sat), sat_format)
                          except ValueError:
                              current_time = None
                      if current_time is not None and previous_time is not None:
                          difference = (current_time - previous_time).total_seconds()

                  if difference is not None and abs(difference) > query_entity.diff_value:
                      previous["Color"] = "1"
                      record["Color"] = "1"
                      record["Diff"] = str(difference) if query_entity.query_type == 2 else difference
                      diff_records.append(previous)
                      diff_records.append(record)

                  previous = record
                  previous_bid = current_bid
                  previous_time = current_time

              print("处理数据结束:", datetime.now())
              return latest_records, diff_records
          except Exception as e:
              print(f"数据库操作失败:{e}")
              # Callers unpack two values, so keep the tuple shape on failure.
              return None, None

      def export_to_excel(self, records, diff_records, goods_code, file_name_pre):
          """Write the records to a timestamped workbook under ``static/quote_data``.

          The first sheet, "Full Data", holds every record.  When
          ``diff_records`` is non-empty a second sheet, "Filter Data", holds
          them and is made the active sheet.

          Args:
              records: Rows for the full sheet; nothing is written when ``None``.
              diff_records: Rows for the filtered sheet; may be ``None`` or empty.
              goods_code: Inserted into the file name.
              file_name_pre: File name prefix.

          Returns:
              None.  The saved path is printed.
          """
          if records is None:
              return
          print("生成Excel开始:", datetime.now())
          wb = openpyxl.Workbook()
          ws = wb.active
          ws.title = "Full Data"
          self.update_sheet_style(ws, records, 0)

          if diff_records:
              ws_filter = wb.create_sheet('Filter Data')
              self.update_sheet_style(ws_filter, diff_records, 1)
              # Sheet indexes are zero-based: 1 selects the second sheet.
              wb.active = 1

          dir_path = os.path.join('static', 'quote_data')
          # exist_ok avoids the check-then-create race.
          os.makedirs(dir_path, exist_ok=True)
          timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
          file_name = os.path.join(dir_path, f"{file_name_pre}{goods_code}_{timestamp}.xlsx")
          wb.save(file_name)
          print("生成Excel结束:", datetime.now())
          print("quote date export to:" + file_name)

      def update_sheet_style(self, ws, records, diff_flag = 0):
          """Fill a worksheet with a header, the records, and yellow highlights.

          Args:
              ws: Worksheet to fill; ``None`` is returned unchanged.
              records: Iterable of dict-like rows.  Missing fields are written
                  as empty cells.
              diff_flag: ``0`` highlights rows whose ``"Color"`` is ``"1"``;
                  any other value highlights rows with a non-empty ``"Diff"``.

          Returns:
              The same worksheet, or ``None`` when ``ws`` is ``None``.
          """
          if ws is None:
              return None
          yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")

          ws.append(["GC", "SAT", "PE", "Bid", "Ask", "Color", "Diff", "_id",])
          column_widths = {'A': 20, 'B': 30, 'C': 20, 'D': 20, 'E': 20, 'F': 10, 'G': 10, 'H': 30}
          for column, width in column_widths.items():
              ws.column_dimensions[column].width = width

          for row in records:
              ws.append([
                  row.get("GC"),
                  row.get("SAT"),
                  row.get("PE"),
                  row.get("Bid"),
                  row.get("Ask"),
                  row.get("Color"),
                  row.get("Diff"),
                  str(row.get("_id", "")),
              ])
              if diff_flag == 0:
                  highlight = row.get("Color") == "1"
              else:
                  diff = row.get("Diff")
                  highlight = diff is not None and diff != ''
              if highlight:
                  # The row just appended is the worksheet's last row.
                  for cell in ws[ws.max_row]:
                      cell.fill = yellow_fill

          # Set the filter range after the data is written so that it spans the
          # data rows and not only the header.
          ws.auto_filter.ref = ws.dimensions
          return ws

      def open_excel(self, file_path):
          """Open ``file_path`` with the platform's default application.

          Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open``
          on Linux; does nothing on other platforms.
          """
          import subprocess

          system_name = platform.system()
          if system_name == "Windows":
              os.startfile(file_path)
              return
          if system_name == "Darwin":
              opener = "open"
          elif system_name == "Linux":
              opener = "xdg-open"
          else:
              return
          try:
              # An argument list keeps paths with spaces or shell metacharacters intact.
              subprocess.run([opener, file_path], check=False)
          except OSError as e:
              # Opening the file is best-effort; a missing launcher must not crash.
              print(f"open excel failed: {e}")