ftp.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package service
  2. import (
  3. "fmt"
  4. "io"
  5. "mtp20_assisted/global"
  6. "mtp20_assisted/model"
  7. "mtp20_assisted/utils"
  8. "os"
  9. "strconv"
  10. "time"
  11. "encoding/csv"
  12. "encoding/hex"
  13. "go.uber.org/zap"
  14. )
  15. func InitTimer() (err error) {
  16. loc, err := time.LoadLocation("Local")
  17. targetTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), global.M2A_CONFIG.FTP.TimeAt, 0, 0, 0, loc)
  18. // 启动协程处理任务
  19. go func() {
  20. for range time.Tick(24 * time.Hour) {
  21. // 计算时间差,等待执行时间
  22. dur := time.Until(targetTime)
  23. if dur < 0 {
  24. targetTime = targetTime.Add(24 * time.Hour)
  25. dur = time.Until(targetTime)
  26. }
  27. time.Sleep(dur)
  28. // 执行函数
  29. ImportWMSReckon()
  30. }
  31. }()
  32. return
  33. }
  34. // ImportWMSReckon 导入WMS结算单
  35. func ImportWMSReckon() {
  36. // 暂存ftp文件的临时目录
  37. os.RemoveAll("./.ftp_tmp")
  38. os.Mkdir("./.ftp_tmp", os.ModePerm)
  39. // 列出FTP服务器上的文件列表
  40. entries, err := global.M2A_FTP.List(global.M2A_CONFIG.FTP.Folder)
  41. if err != nil {
  42. global.M2A_LOG.Error("FTP获取文件列表失败, err:", zap.Error(err))
  43. return
  44. }
  45. for _, entry := range entries {
  46. // 下载FTP服务器上的文件
  47. srcFile, err := global.M2A_FTP.Retr(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name)
  48. if err != nil {
  49. global.M2A_LOG.Error("获取FTP源文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  50. continue
  51. }
  52. dstFile, err := os.Create("./.ftp_tmp/" + entry.Name)
  53. if err != nil {
  54. global.M2A_LOG.Error("创建FTP临时文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  55. continue
  56. }
  57. _, err = io.Copy(dstFile, srcFile)
  58. if err != nil {
  59. global.M2A_LOG.Error("下载FTP文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  60. continue
  61. }
  62. srcFile.Close()
  63. // 创建CSV Reader
  64. dstFile.Close()
  65. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  66. // defer dstFile.Close()
  67. reader := csv.NewReader(dstFile)
  68. // 设置分隔符为逗号
  69. reader.Comma = ','
  70. // 读取所有行
  71. rows, e := reader.ReadAll()
  72. if e != nil {
  73. err = e
  74. global.M2A_LOG.Error("读取CSV文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  75. continue
  76. }
  77. for i, row := range rows {
  78. if i == 0 { // 第一行是列头
  79. continue
  80. }
  81. // 检验记录
  82. // 会员统一信用代码
  83. userKey := row[0]
  84. key, _ := hex.DecodeString(utils.AESSecretKey)
  85. var userKeyEncrypted []byte
  86. if userKeyEncrypted, err = utils.AESEncrypt([]byte(userKey), key); err != nil || userKeyEncrypted == nil {
  87. global.M2A_LOG.Error("加密会员统一信用代码失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  88. continue
  89. }
  90. userinfo := model.Userinfo{
  91. CARDNUM: hex.EncodeToString(userKeyEncrypted),
  92. }
  93. has := false
  94. if has, err = global.M2A_DB.Get(&userinfo); err != nil || !has {
  95. global.M2A_LOG.Error("用户唯一标识在数据库中无对应记录或读取数据库失败,", zap.String("UserKey", string(userKeyEncrypted)), zap.String("err", err.Error()))
  96. continue
  97. }
  98. // WMS结算单流水号
  99. wmsOrderID := row[1]
  100. gzbscreckonorder := model.Gzbscreckonorder{
  101. WMSORDERID: wmsOrderID,
  102. }
  103. has = false
  104. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  105. global.M2A_LOG.Error("WMS结算单流水号已在数据库中存在,", zap.String("WMSOrderID", wmsOrderID))
  106. continue
  107. }
  108. // “会员统一信用代码”+ “月份(yyyyMM)“
  109. reckonMonth := row[3]
  110. gzbscreckonorder = model.Gzbscreckonorder{
  111. USERKEY: userKey,
  112. RECKONMONTH: reckonMonth,
  113. }
  114. has = false
  115. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  116. global.M2A_LOG.Error("会员统一信用代码及月份已在数据库中存在,", zap.String("UserKey", userKey), zap.String("ReckonMonth", reckonMonth))
  117. continue
  118. }
  119. // 总费用
  120. totalFeeStr := row[9]
  121. if totalFeeStr == "" {
  122. global.M2A_LOG.Error("总费用为空,", zap.String("UserKey", userKey))
  123. continue
  124. }
  125. totalFee, err := strconv.ParseFloat(totalFeeStr, 64)
  126. if err != nil || totalFee == 0.0 {
  127. global.M2A_LOG.Error("总费用值错误,", zap.String("UserKey", userKey), zap.String("totalFee", totalFeeStr))
  128. continue
  129. }
  130. // 获取资金账户ID
  131. taaccounts := make([]model.Taaccount, 0)
  132. if err := global.M2A_DB.Where("USERID = ?", userinfo.USERID).Find(&taaccounts); err != nil || len(taaccounts) == 0 {
  133. global.M2A_LOG.Error("获取资金账户失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  134. continue
  135. }
  136. // 写入数据库
  137. sql := fmt.Sprintf(`
  138. insert into GZ_BSCRECKONORDER
  139. (
  140. OrderID,
  141. UserKey,
  142. WMSOrderID,
  143. ContractNo,
  144. ReckonMonth,
  145. ServiceFee,
  146. StorageFee,
  147. Premium,
  148. PowerFee,
  149. CustomsFee,
  150. TotalFee,
  151. Applicant,
  152. ApplicantTime,
  153. ImportFile,
  154. UserID,
  155. AccountID,
  156. PayStatus
  157. ) values
  158. (
  159. SEQ_GZ_BSCRECKONORDER.nextval,
  160. '%v',
  161. '%v',
  162. '%v',
  163. '%v',
  164. '%v',
  165. '%v',
  166. '%v',
  167. '%v',
  168. '%v',
  169. %v,
  170. '%v',
  171. '%v',
  172. '%v',
  173. %v,
  174. %v,
  175. 2
  176. )
  177. `, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11],
  178. entry.Name, userinfo.USERID, taaccounts[0].ACCOUNTID)
  179. if _, err := global.M2A_DB.Exec(sql); err != nil {
  180. global.M2A_LOG.Error("FTP数据写入数据库失败,", zap.String("UserKey", row[0]), zap.String("err", err.Error()))
  181. continue
  182. }
  183. // 移入FTP备份文件夹
  184. dstFile.Close()
  185. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  186. // defer dstFile.Close()
  187. // 注意上传之前需要把之前的关闭,不然会报类似 移入FTP备份文件夹失败, {"file": "xxxxxxxxxxxx_202304.csv", "err": "229 Entering Extended Passive Mode (|||23113|)."}
  188. if err = global.M2A_FTP.Stor(global.M2A_CONFIG.FTP.FolderBakup+"/"+entry.Name, dstFile); err != nil {
  189. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  190. }
  191. if err = global.M2A_FTP.Delete(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name); err != nil {
  192. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  193. }
  194. dstFile.Close()
  195. }
  196. }
  197. }