ftp.go 7.2 KB


  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "mtp20_assisted/core"
  7. "mtp20_assisted/global"
  8. "mtp20_assisted/model"
  9. "mtp20_assisted/utils"
  10. "os"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "encoding/csv"
  15. "encoding/hex"
  16. "go.uber.org/zap"
  17. )
  18. func InitTimer() (err error) {
  19. // 获取触发时间点
  20. timeStr := global.M2A_CONFIG.FTP.TimeAt
  21. timeArr := strings.Split(timeStr, ":")
  22. hour, _ := strconv.Atoi(timeArr[0])
  23. min, _ := strconv.Atoi(timeArr[1])
  24. sec, _ := strconv.Atoi(timeArr[2])
  25. duration := time.Duration(global.M2A_CONFIG.FTP.Duration)
  26. // 启动协程处理任务
  27. go func() {
  28. for range time.Tick(duration * time.Minute) {
  29. // for range time.Tick(1) {
  30. if loc, err := time.LoadLocation("Local"); err == nil {
  31. targetTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), hour, min, sec, 0, loc)
  32. fmt.Println(targetTime)
  33. // 计算时间差,等待执行时间
  34. // dur := time.Until(targetTime)
  35. // if dur < 0 {
  36. // targetTime = targetTime.Add(24 * time.Hour)
  37. // dur = time.Until(targetTime)
  38. // }
  39. // time.Sleep(dur)
  40. if time.Now().After(targetTime) {
  41. // 执行函数
  42. global.M2A_LOG.Info("WMS结算单导入:启动-ImportWMSReckon")
  43. ImportWMSReckon()
  44. }
  45. }
  46. }
  47. }()
  48. return
  49. }
  50. // ImportWMSReckon 导入WMS结算单
  51. func ImportWMSReckon() {
  52. // 连接FTP
  53. var err error
  54. if global.M2A_FTP, err = core.FTP(); err != nil {
  55. global.M2A_LOG.Error("连接FTP失败, err:", zap.Error(err))
  56. return
  57. }
  58. defer global.M2A_FTP.Quit()
  59. // 暂存ftp文件的临时目录
  60. os.RemoveAll("./.ftp_tmp")
  61. os.Mkdir("./.ftp_tmp", os.ModePerm)
  62. // 列出FTP服务器上的文件列表
  63. entries, err := global.M2A_FTP.List(global.M2A_CONFIG.FTP.Folder)
  64. if err != nil {
  65. global.M2A_LOG.Error("FTP获取文件列表失败, err:", zap.Error(err))
  66. return
  67. }
  68. global.M2A_LOG.Info("WMS结算单导入:FTP获取文件列表成功")
  69. for _, entry := range entries {
  70. // 下载FTP服务器上的文件
  71. srcFile, err := global.M2A_FTP.Retr(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name)
  72. if err != nil {
  73. global.M2A_LOG.Error("获取FTP源文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  74. continue
  75. }
  76. dstFile, err := os.Create("./.ftp_tmp/" + entry.Name)
  77. if err != nil {
  78. global.M2A_LOG.Error("创建FTP临时文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  79. continue
  80. }
  81. _, err = io.Copy(dstFile, srcFile)
  82. if err != nil {
  83. global.M2A_LOG.Error("下载FTP文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  84. continue
  85. }
  86. srcFile.Close()
  87. // 创建CSV Reader
  88. dstFile.Close()
  89. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  90. // defer dstFile.Close()
  91. reader := csv.NewReader(dstFile)
  92. // 设置分隔符为逗号
  93. reader.Comma = ','
  94. // 读取所有行
  95. rows, e := reader.ReadAll()
  96. if e != nil {
  97. err = e
  98. global.M2A_LOG.Error("读取CSV文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  99. continue
  100. }
  101. for i, row := range rows {
  102. if i == 0 { // 第一行是列头
  103. continue
  104. }
  105. // 检验记录
  106. // 会员统一信用代码
  107. userKey := row[0]
  108. key, _ := hex.DecodeString(utils.AESSecretKey)
  109. var userKeyEncrypted []byte
  110. if userKeyEncrypted, err = utils.AESEncrypt([]byte(userKey), key); err != nil || userKeyEncrypted == nil {
  111. if err == nil {
  112. err = errors.New("DataNoFound")
  113. }
  114. global.M2A_LOG.Error("加密会员统一信用代码失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  115. continue
  116. }
  117. userinfo := model.Userinfo{
  118. CARDNUM: hex.EncodeToString(userKeyEncrypted),
  119. }
  120. has := false
  121. if has, err = global.M2A_DB.Get(&userinfo); err != nil || !has {
  122. if err == nil {
  123. err = errors.New("DataNoFound")
  124. }
  125. global.M2A_LOG.Error("用户唯一标识在数据库中无对应记录或读取数据库失败,", zap.String("UserKey", string(userKeyEncrypted)), zap.String("err", err.Error()))
  126. continue
  127. }
  128. // WMS结算单流水号
  129. wmsOrderID := row[1]
  130. gzbscreckonorder := model.Gzbscreckonorder{
  131. WMSORDERID: wmsOrderID,
  132. }
  133. has = false
  134. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  135. global.M2A_LOG.Error("WMS结算单流水号已在数据库中存在,", zap.String("WMSOrderID", wmsOrderID))
  136. continue
  137. }
  138. // “会员统一信用代码”+ “月份(yyyyMM)“
  139. reckonMonth := row[3]
  140. gzbscreckonorder = model.Gzbscreckonorder{
  141. USERKEY: userKey,
  142. RECKONMONTH: reckonMonth,
  143. }
  144. has = false
  145. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  146. global.M2A_LOG.Error("会员统一信用代码及月份已在数据库中存在,", zap.String("UserKey", userKey), zap.String("ReckonMonth", reckonMonth))
  147. continue
  148. }
  149. // 总费用
  150. totalFeeStr := row[9]
  151. if totalFeeStr == "" {
  152. global.M2A_LOG.Error("总费用为空,", zap.String("UserKey", userKey))
  153. continue
  154. }
  155. totalFee, err := strconv.ParseFloat(totalFeeStr, 64)
  156. if err != nil || totalFee == 0.0 {
  157. global.M2A_LOG.Error("总费用值错误,", zap.String("UserKey", userKey), zap.String("totalFee", totalFeeStr))
  158. continue
  159. }
  160. // 获取资金账户ID
  161. taaccounts := make([]model.Taaccount, 0)
  162. if err := global.M2A_DB.Where("USERID = ?", userinfo.USERID).Find(&taaccounts); err != nil || len(taaccounts) == 0 {
  163. if err == nil {
  164. err = errors.New("DataNoFound")
  165. }
  166. global.M2A_LOG.Error("获取资金账户失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  167. continue
  168. }
  169. // 写入数据库
  170. sql := fmt.Sprintf(`
  171. insert into GZ_BSCRECKONORDER
  172. (
  173. OrderID,
  174. UserKey,
  175. WMSOrderID,
  176. ContractNo,
  177. ReckonMonth,
  178. ServiceFee,
  179. StorageFee,
  180. Premium,
  181. PowerFee,
  182. CustomsFee,
  183. TotalFee,
  184. Applicant,
  185. ApplicantTime,
  186. ImportFile,
  187. UserID,
  188. AccountID,
  189. PayStatus
  190. ) values
  191. (
  192. SEQ_GZ_BSCRECKONORDER.nextval,
  193. '%v',
  194. '%v',
  195. '%v',
  196. '%v',
  197. '%v',
  198. '%v',
  199. '%v',
  200. '%v',
  201. '%v',
  202. %v,
  203. '%v',
  204. '%v',
  205. '%v',
  206. %v,
  207. %v,
  208. 2
  209. )
  210. `, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11],
  211. entry.Name, userinfo.USERID, taaccounts[0].ACCOUNTID)
  212. if _, err := global.M2A_DB.Exec(sql); err != nil {
  213. global.M2A_LOG.Error("FTP数据写入数据库失败,", zap.String("UserKey", row[0]), zap.String("err", err.Error()))
  214. continue
  215. }
  216. // 移入FTP备份文件夹
  217. dstFile.Close()
  218. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  219. // defer dstFile.Close()
  220. // 注意上传之前需要把之前的关闭,不然会报类似 移入FTP备份文件夹失败, {"file": "xxxxxxxxxxxx_202304.csv", "err": "229 Entering Extended Passive Mode (|||23113|)."}
  221. if err = global.M2A_FTP.Stor(global.M2A_CONFIG.FTP.FolderBakup+"/"+entry.Name, dstFile); err != nil {
  222. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  223. }
  224. if err = global.M2A_FTP.Delete(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name); err != nil {
  225. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  226. }
  227. dstFile.Close()
  228. }
  229. }
  230. }