ftp.go 6.7 KB


  1. package service
  2. import (
  3. "fmt"
  4. "io"
  5. "mtp20_assisted/core"
  6. "mtp20_assisted/global"
  7. "mtp20_assisted/model"
  8. "mtp20_assisted/utils"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "encoding/csv"
  14. "encoding/hex"
  15. "go.uber.org/zap"
  16. )
  17. func InitTimer() (err error) {
  18. // 获取触发时间点
  19. timeStr := global.M2A_CONFIG.FTP.TimeAt
  20. timeArr := strings.Split(timeStr, ":")
  21. hour, _ := strconv.Atoi(timeArr[0])
  22. min, _ := strconv.Atoi(timeArr[1])
  23. sec, _ := strconv.Atoi(timeArr[2])
  24. loc, err := time.LoadLocation("Local")
  25. targetTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), hour, min, sec, 0, loc)
  26. duration := time.Duration(global.M2A_CONFIG.FTP.Duration)
  27. // 启动协程处理任务
  28. go func() {
  29. for range time.Tick(duration * time.Minute) {
  30. // 计算时间差,等待执行时间
  31. dur := time.Until(targetTime)
  32. if dur < 0 {
  33. targetTime = targetTime.Add(24 * time.Hour)
  34. dur = time.Until(targetTime)
  35. }
  36. time.Sleep(dur)
  37. // 执行函数
  38. ImportWMSReckon()
  39. }
  40. }()
  41. return
  42. }
  43. // ImportWMSReckon 导入WMS结算单
  44. func ImportWMSReckon() {
  45. // 连接FTP
  46. var err error
  47. if global.M2A_FTP, err = core.FTP(); err != nil {
  48. global.M2A_LOG.Error("连接FTP失败, err:", zap.Error(err))
  49. return
  50. }
  51. defer global.M2A_FTP.Quit()
  52. // 暂存ftp文件的临时目录
  53. os.RemoveAll("./.ftp_tmp")
  54. os.Mkdir("./.ftp_tmp", os.ModePerm)
  55. // 列出FTP服务器上的文件列表
  56. entries, err := global.M2A_FTP.List(global.M2A_CONFIG.FTP.Folder)
  57. if err != nil {
  58. global.M2A_LOG.Error("FTP获取文件列表失败, err:", zap.Error(err))
  59. return
  60. }
  61. for _, entry := range entries {
  62. // 下载FTP服务器上的文件
  63. srcFile, err := global.M2A_FTP.Retr(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name)
  64. if err != nil {
  65. global.M2A_LOG.Error("获取FTP源文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  66. continue
  67. }
  68. dstFile, err := os.Create("./.ftp_tmp/" + entry.Name)
  69. if err != nil {
  70. global.M2A_LOG.Error("创建FTP临时文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  71. continue
  72. }
  73. _, err = io.Copy(dstFile, srcFile)
  74. if err != nil {
  75. global.M2A_LOG.Error("下载FTP文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  76. continue
  77. }
  78. srcFile.Close()
  79. // 创建CSV Reader
  80. dstFile.Close()
  81. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  82. // defer dstFile.Close()
  83. reader := csv.NewReader(dstFile)
  84. // 设置分隔符为逗号
  85. reader.Comma = ','
  86. // 读取所有行
  87. rows, e := reader.ReadAll()
  88. if e != nil {
  89. err = e
  90. global.M2A_LOG.Error("读取CSV文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  91. continue
  92. }
  93. for i, row := range rows {
  94. if i == 0 { // 第一行是列头
  95. continue
  96. }
  97. // 检验记录
  98. // 会员统一信用代码
  99. userKey := row[0]
  100. key, _ := hex.DecodeString(utils.AESSecretKey)
  101. var userKeyEncrypted []byte
  102. if userKeyEncrypted, err = utils.AESEncrypt([]byte(userKey), key); err != nil || userKeyEncrypted == nil {
  103. global.M2A_LOG.Error("加密会员统一信用代码失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  104. continue
  105. }
  106. userinfo := model.Userinfo{
  107. CARDNUM: hex.EncodeToString(userKeyEncrypted),
  108. }
  109. has := false
  110. if has, err = global.M2A_DB.Get(&userinfo); err != nil || !has {
  111. global.M2A_LOG.Error("用户唯一标识在数据库中无对应记录或读取数据库失败,", zap.String("UserKey", string(userKeyEncrypted)), zap.String("err", err.Error()))
  112. continue
  113. }
  114. // WMS结算单流水号
  115. wmsOrderID := row[1]
  116. gzbscreckonorder := model.Gzbscreckonorder{
  117. WMSORDERID: wmsOrderID,
  118. }
  119. has = false
  120. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  121. global.M2A_LOG.Error("WMS结算单流水号已在数据库中存在,", zap.String("WMSOrderID", wmsOrderID))
  122. continue
  123. }
  124. // “会员统一信用代码”+ “月份(yyyyMM)“
  125. reckonMonth := row[3]
  126. gzbscreckonorder = model.Gzbscreckonorder{
  127. USERKEY: userKey,
  128. RECKONMONTH: reckonMonth,
  129. }
  130. has = false
  131. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  132. global.M2A_LOG.Error("会员统一信用代码及月份已在数据库中存在,", zap.String("UserKey", userKey), zap.String("ReckonMonth", reckonMonth))
  133. continue
  134. }
  135. // 总费用
  136. totalFeeStr := row[9]
  137. if totalFeeStr == "" {
  138. global.M2A_LOG.Error("总费用为空,", zap.String("UserKey", userKey))
  139. continue
  140. }
  141. totalFee, err := strconv.ParseFloat(totalFeeStr, 64)
  142. if err != nil || totalFee == 0.0 {
  143. global.M2A_LOG.Error("总费用值错误,", zap.String("UserKey", userKey), zap.String("totalFee", totalFeeStr))
  144. continue
  145. }
  146. // 获取资金账户ID
  147. taaccounts := make([]model.Taaccount, 0)
  148. if err := global.M2A_DB.Where("USERID = ?", userinfo.USERID).Find(&taaccounts); err != nil || len(taaccounts) == 0 {
  149. global.M2A_LOG.Error("获取资金账户失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  150. continue
  151. }
  152. // 写入数据库
  153. sql := fmt.Sprintf(`
  154. insert into GZ_BSCRECKONORDER
  155. (
  156. OrderID,
  157. UserKey,
  158. WMSOrderID,
  159. ContractNo,
  160. ReckonMonth,
  161. ServiceFee,
  162. StorageFee,
  163. Premium,
  164. PowerFee,
  165. CustomsFee,
  166. TotalFee,
  167. Applicant,
  168. ApplicantTime,
  169. ImportFile,
  170. UserID,
  171. AccountID,
  172. PayStatus
  173. ) values
  174. (
  175. SEQ_GZ_BSCRECKONORDER.nextval,
  176. '%v',
  177. '%v',
  178. '%v',
  179. '%v',
  180. '%v',
  181. '%v',
  182. '%v',
  183. '%v',
  184. '%v',
  185. %v,
  186. '%v',
  187. '%v',
  188. '%v',
  189. %v,
  190. %v,
  191. 2
  192. )
  193. `, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11],
  194. entry.Name, userinfo.USERID, taaccounts[0].ACCOUNTID)
  195. if _, err := global.M2A_DB.Exec(sql); err != nil {
  196. global.M2A_LOG.Error("FTP数据写入数据库失败,", zap.String("UserKey", row[0]), zap.String("err", err.Error()))
  197. continue
  198. }
  199. // 移入FTP备份文件夹
  200. dstFile.Close()
  201. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  202. // defer dstFile.Close()
  203. // 注意上传之前需要把之前的关闭,不然会报类似 移入FTP备份文件夹失败, {"file": "xxxxxxxxxxxx_202304.csv", "err": "229 Entering Extended Passive Mode (|||23113|)."}
  204. if err = global.M2A_FTP.Stor(global.M2A_CONFIG.FTP.FolderBakup+"/"+entry.Name, dstFile); err != nil {
  205. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  206. }
  207. if err = global.M2A_FTP.Delete(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name); err != nil {
  208. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  209. }
  210. dstFile.Close()
  211. }
  212. }
  213. }