ftp.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package service
  2. import (
  3. "fmt"
  4. "io"
  5. "mtp20_assisted/core"
  6. "mtp20_assisted/global"
  7. "mtp20_assisted/model"
  8. "mtp20_assisted/utils"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "encoding/csv"
  14. "encoding/hex"
  15. "go.uber.org/zap"
  16. )
  17. func InitTimer() (err error) {
  18. // 获取触发时间点
  19. timeStr := global.M2A_CONFIG.FTP.TimeAt
  20. timeArr := strings.Split(timeStr, ":")
  21. hour, _ := strconv.Atoi(timeArr[0])
  22. min, _ := strconv.Atoi(timeArr[1])
  23. sec, _ := strconv.Atoi(timeArr[2])
  24. duration := time.Duration(global.M2A_CONFIG.FTP.Duration)
  25. // 启动协程处理任务
  26. go func() {
  27. for range time.Tick(duration * time.Minute) {
  28. // for range time.Tick(1) {
  29. if loc, err := time.LoadLocation("Local"); err == nil {
  30. targetTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), hour, min, sec, 0, loc)
  31. // 计算时间差,等待执行时间
  32. // dur := time.Until(targetTime)
  33. // if dur < 0 {
  34. // targetTime = targetTime.Add(24 * time.Hour)
  35. // dur = time.Until(targetTime)
  36. // }
  37. // time.Sleep(dur)
  38. if time.Now().Before(targetTime) {
  39. // 执行函数
  40. global.M2A_LOG.Info("WMS结算单导入:启动-ImportWMSReckon")
  41. ImportWMSReckon()
  42. }
  43. }
  44. }
  45. }()
  46. return
  47. }
  48. // ImportWMSReckon 导入WMS结算单
  49. func ImportWMSReckon() {
  50. // 连接FTP
  51. var err error
  52. if global.M2A_FTP, err = core.FTP(); err != nil {
  53. global.M2A_LOG.Error("连接FTP失败, err:", zap.Error(err))
  54. return
  55. }
  56. defer global.M2A_FTP.Quit()
  57. // 暂存ftp文件的临时目录
  58. os.RemoveAll("./.ftp_tmp")
  59. os.Mkdir("./.ftp_tmp", os.ModePerm)
  60. // 列出FTP服务器上的文件列表
  61. entries, err := global.M2A_FTP.List(global.M2A_CONFIG.FTP.Folder)
  62. if err != nil {
  63. global.M2A_LOG.Error("FTP获取文件列表失败, err:", zap.Error(err))
  64. return
  65. }
  66. global.M2A_LOG.Info("WMS结算单导入:FTP获取文件列表成功")
  67. for _, entry := range entries {
  68. // 下载FTP服务器上的文件
  69. srcFile, err := global.M2A_FTP.Retr(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name)
  70. if err != nil {
  71. global.M2A_LOG.Error("获取FTP源文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  72. continue
  73. }
  74. dstFile, err := os.Create("./.ftp_tmp/" + entry.Name)
  75. if err != nil {
  76. global.M2A_LOG.Error("创建FTP临时文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  77. continue
  78. }
  79. _, err = io.Copy(dstFile, srcFile)
  80. if err != nil {
  81. global.M2A_LOG.Error("下载FTP文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  82. continue
  83. }
  84. srcFile.Close()
  85. // 创建CSV Reader
  86. dstFile.Close()
  87. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  88. // defer dstFile.Close()
  89. reader := csv.NewReader(dstFile)
  90. // 设置分隔符为逗号
  91. reader.Comma = ','
  92. // 读取所有行
  93. rows, e := reader.ReadAll()
  94. if e != nil {
  95. err = e
  96. global.M2A_LOG.Error("读取CSV文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  97. continue
  98. }
  99. for i, row := range rows {
  100. if i == 0 { // 第一行是列头
  101. continue
  102. }
  103. // 检验记录
  104. // 会员统一信用代码
  105. userKey := row[0]
  106. key, _ := hex.DecodeString(utils.AESSecretKey)
  107. var userKeyEncrypted []byte
  108. if userKeyEncrypted, err = utils.AESEncrypt([]byte(userKey), key); err != nil || userKeyEncrypted == nil {
  109. global.M2A_LOG.Error("加密会员统一信用代码失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  110. continue
  111. }
  112. userinfo := model.Userinfo{
  113. CARDNUM: hex.EncodeToString(userKeyEncrypted),
  114. }
  115. has := false
  116. if has, err = global.M2A_DB.Get(&userinfo); err != nil || !has {
  117. global.M2A_LOG.Error("用户唯一标识在数据库中无对应记录或读取数据库失败,", zap.String("UserKey", string(userKeyEncrypted)), zap.String("err", err.Error()))
  118. continue
  119. }
  120. // WMS结算单流水号
  121. wmsOrderID := row[1]
  122. gzbscreckonorder := model.Gzbscreckonorder{
  123. WMSORDERID: wmsOrderID,
  124. }
  125. has = false
  126. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  127. global.M2A_LOG.Error("WMS结算单流水号已在数据库中存在,", zap.String("WMSOrderID", wmsOrderID))
  128. continue
  129. }
  130. // “会员统一信用代码”+ “月份(yyyyMM)“
  131. reckonMonth := row[3]
  132. gzbscreckonorder = model.Gzbscreckonorder{
  133. USERKEY: userKey,
  134. RECKONMONTH: reckonMonth,
  135. }
  136. has = false
  137. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  138. global.M2A_LOG.Error("会员统一信用代码及月份已在数据库中存在,", zap.String("UserKey", userKey), zap.String("ReckonMonth", reckonMonth))
  139. continue
  140. }
  141. // 总费用
  142. totalFeeStr := row[9]
  143. if totalFeeStr == "" {
  144. global.M2A_LOG.Error("总费用为空,", zap.String("UserKey", userKey))
  145. continue
  146. }
  147. totalFee, err := strconv.ParseFloat(totalFeeStr, 64)
  148. if err != nil || totalFee == 0.0 {
  149. global.M2A_LOG.Error("总费用值错误,", zap.String("UserKey", userKey), zap.String("totalFee", totalFeeStr))
  150. continue
  151. }
  152. // 获取资金账户ID
  153. taaccounts := make([]model.Taaccount, 0)
  154. if err := global.M2A_DB.Where("USERID = ?", userinfo.USERID).Find(&taaccounts); err != nil || len(taaccounts) == 0 {
  155. global.M2A_LOG.Error("获取资金账户失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  156. continue
  157. }
  158. // 写入数据库
  159. sql := fmt.Sprintf(`
  160. insert into GZ_BSCRECKONORDER
  161. (
  162. OrderID,
  163. UserKey,
  164. WMSOrderID,
  165. ContractNo,
  166. ReckonMonth,
  167. ServiceFee,
  168. StorageFee,
  169. Premium,
  170. PowerFee,
  171. CustomsFee,
  172. TotalFee,
  173. Applicant,
  174. ApplicantTime,
  175. ImportFile,
  176. UserID,
  177. AccountID,
  178. PayStatus
  179. ) values
  180. (
  181. SEQ_GZ_BSCRECKONORDER.nextval,
  182. '%v',
  183. '%v',
  184. '%v',
  185. '%v',
  186. '%v',
  187. '%v',
  188. '%v',
  189. '%v',
  190. '%v',
  191. %v,
  192. '%v',
  193. '%v',
  194. '%v',
  195. %v,
  196. %v,
  197. 2
  198. )
  199. `, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11],
  200. entry.Name, userinfo.USERID, taaccounts[0].ACCOUNTID)
  201. if _, err := global.M2A_DB.Exec(sql); err != nil {
  202. global.M2A_LOG.Error("FTP数据写入数据库失败,", zap.String("UserKey", row[0]), zap.String("err", err.Error()))
  203. continue
  204. }
  205. // 移入FTP备份文件夹
  206. dstFile.Close()
  207. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  208. // defer dstFile.Close()
  209. // 注意上传之前需要把之前的关闭,不然会报类似 移入FTP备份文件夹失败, {"file": "xxxxxxxxxxxx_202304.csv", "err": "229 Entering Extended Passive Mode (|||23113|)."}
  210. if err = global.M2A_FTP.Stor(global.M2A_CONFIG.FTP.FolderBakup+"/"+entry.Name, dstFile); err != nil {
  211. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  212. }
  213. if err = global.M2A_FTP.Delete(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name); err != nil {
  214. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  215. }
  216. dstFile.Close()
  217. }
  218. }
  219. }