ftp.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "mtp20_assisted/core"
  7. "mtp20_assisted/global"
  8. "mtp20_assisted/model"
  9. "mtp20_assisted/utils"
  10. "os"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "encoding/csv"
  15. "encoding/hex"
  16. "go.uber.org/zap"
  17. )
  18. func InitTimer() (err error) {
  19. // 获取触发时间点
  20. timeStr := global.M2A_CONFIG.FTP.TimeAt
  21. timeArr := strings.Split(timeStr, ":")
  22. hour, _ := strconv.Atoi(timeArr[0])
  23. min, _ := strconv.Atoi(timeArr[1])
  24. sec, _ := strconv.Atoi(timeArr[2])
  25. duration := time.Duration(global.M2A_CONFIG.FTP.Duration)
  26. // 启动协程处理任务
  27. go func() {
  28. for range time.Tick(duration * time.Minute) {
  29. // for range time.Tick(1) {
  30. if loc, err := time.LoadLocation("Local"); err == nil {
  31. targetTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), hour, min, sec, 0, loc)
  32. // 计算时间差,等待执行时间
  33. // dur := time.Until(targetTime)
  34. // if dur < 0 {
  35. // targetTime = targetTime.Add(24 * time.Hour)
  36. // dur = time.Until(targetTime)
  37. // }
  38. // time.Sleep(dur)
  39. if time.Now().After(targetTime) {
  40. // 执行函数
  41. global.M2A_LOG.Info("WMS结算单导入:启动-ImportWMSReckon")
  42. ImportWMSReckon()
  43. }
  44. }
  45. }
  46. }()
  47. return
  48. }
  49. // ImportWMSReckon 导入WMS结算单
  50. func ImportWMSReckon() {
  51. // 连接FTP
  52. var err error
  53. if global.M2A_FTP, err = core.FTP(); err != nil {
  54. global.M2A_LOG.Error("连接FTP失败, err:", zap.Error(err))
  55. return
  56. }
  57. defer global.M2A_FTP.Quit()
  58. // 暂存ftp文件的临时目录
  59. os.RemoveAll("./.ftp_tmp")
  60. os.Mkdir("./.ftp_tmp", os.ModePerm)
  61. // 列出FTP服务器上的文件列表
  62. entries, err := global.M2A_FTP.List(global.M2A_CONFIG.FTP.Folder)
  63. if err != nil {
  64. global.M2A_LOG.Error("FTP获取文件列表失败, err:", zap.Error(err))
  65. return
  66. }
  67. global.M2A_LOG.Info("WMS结算单导入:FTP获取文件列表成功")
  68. for _, entry := range entries {
  69. // 下载FTP服务器上的文件
  70. srcFile, err := global.M2A_FTP.Retr(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name)
  71. if err != nil {
  72. global.M2A_LOG.Error("获取FTP源文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  73. continue
  74. }
  75. dstFile, err := os.Create("./.ftp_tmp/" + entry.Name)
  76. if err != nil {
  77. global.M2A_LOG.Error("创建FTP临时文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  78. continue
  79. }
  80. _, err = io.Copy(dstFile, srcFile)
  81. if err != nil {
  82. global.M2A_LOG.Error("下载FTP文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  83. continue
  84. }
  85. srcFile.Close()
  86. // 创建CSV Reader
  87. dstFile.Close()
  88. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  89. // defer dstFile.Close()
  90. reader := csv.NewReader(dstFile)
  91. // 设置分隔符为逗号
  92. reader.Comma = ','
  93. // 读取所有行
  94. rows, e := reader.ReadAll()
  95. if e != nil {
  96. err = e
  97. global.M2A_LOG.Error("读取CSV文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  98. continue
  99. }
  100. for i, row := range rows {
  101. if i == 0 { // 第一行是列头
  102. continue
  103. }
  104. // 检验记录
  105. // 会员统一信用代码
  106. userKey := row[0]
  107. key, _ := hex.DecodeString(utils.AESSecretKey)
  108. var userKeyEncrypted []byte
  109. if userKeyEncrypted, err = utils.AESEncrypt([]byte(userKey), key); err != nil || userKeyEncrypted == nil {
  110. if err == nil {
  111. err = errors.New("DataNoFound")
  112. }
  113. global.M2A_LOG.Error("加密会员统一信用代码失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  114. continue
  115. }
  116. userinfo := model.Userinfo{
  117. CARDNUM: hex.EncodeToString(userKeyEncrypted),
  118. }
  119. has := false
  120. if has, err = global.M2A_DB.Get(&userinfo); err != nil || !has {
  121. if err == nil {
  122. err = errors.New("DataNoFound")
  123. }
  124. global.M2A_LOG.Error("用户唯一标识在数据库中无对应记录或读取数据库失败,", zap.String("UserKey", string(userKeyEncrypted)), zap.String("err", err.Error()))
  125. continue
  126. }
  127. // WMS结算单流水号
  128. wmsOrderID := row[1]
  129. gzbscreckonorder := model.Gzbscreckonorder{
  130. WMSORDERID: wmsOrderID,
  131. }
  132. has = false
  133. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  134. global.M2A_LOG.Error("WMS结算单流水号已在数据库中存在,", zap.String("WMSOrderID", wmsOrderID))
  135. continue
  136. }
  137. // “会员统一信用代码”+ “月份(yyyyMM)“
  138. reckonMonth := row[3]
  139. gzbscreckonorder = model.Gzbscreckonorder{
  140. USERKEY: userKey,
  141. RECKONMONTH: reckonMonth,
  142. }
  143. has = false
  144. if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
  145. global.M2A_LOG.Error("会员统一信用代码及月份已在数据库中存在,", zap.String("UserKey", userKey), zap.String("ReckonMonth", reckonMonth))
  146. continue
  147. }
  148. // 总费用
  149. totalFeeStr := row[9]
  150. if totalFeeStr == "" {
  151. global.M2A_LOG.Error("总费用为空,", zap.String("UserKey", userKey))
  152. continue
  153. }
  154. totalFee, err := strconv.ParseFloat(totalFeeStr, 64)
  155. if err != nil || totalFee == 0.0 {
  156. global.M2A_LOG.Error("总费用值错误,", zap.String("UserKey", userKey), zap.String("totalFee", totalFeeStr))
  157. continue
  158. }
  159. // 获取资金账户ID
  160. taaccounts := make([]model.Taaccount, 0)
  161. if err := global.M2A_DB.Where("USERID = ?", userinfo.USERID).Find(&taaccounts); err != nil || len(taaccounts) == 0 {
  162. if err == nil {
  163. err = errors.New("DataNoFound")
  164. }
  165. global.M2A_LOG.Error("获取资金账户失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
  166. continue
  167. }
  168. // 写入数据库
  169. sql := fmt.Sprintf(`
  170. insert into GZ_BSCRECKONORDER
  171. (
  172. OrderID,
  173. UserKey,
  174. WMSOrderID,
  175. ContractNo,
  176. ReckonMonth,
  177. ServiceFee,
  178. StorageFee,
  179. Premium,
  180. PowerFee,
  181. CustomsFee,
  182. TotalFee,
  183. Applicant,
  184. ApplicantTime,
  185. ImportFile,
  186. UserID,
  187. AccountID,
  188. PayStatus
  189. ) values
  190. (
  191. SEQ_GZ_BSCRECKONORDER.nextval,
  192. '%v',
  193. '%v',
  194. '%v',
  195. '%v',
  196. '%v',
  197. '%v',
  198. '%v',
  199. '%v',
  200. '%v',
  201. %v,
  202. '%v',
  203. '%v',
  204. '%v',
  205. %v,
  206. %v,
  207. 2
  208. )
  209. `, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11],
  210. entry.Name, userinfo.USERID, taaccounts[0].ACCOUNTID)
  211. if _, err := global.M2A_DB.Exec(sql); err != nil {
  212. global.M2A_LOG.Error("FTP数据写入数据库失败,", zap.String("UserKey", row[0]), zap.String("err", err.Error()))
  213. continue
  214. }
  215. // 移入FTP备份文件夹
  216. dstFile.Close()
  217. dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
  218. // defer dstFile.Close()
  219. // 注意上传之前需要把之前的关闭,不然会报类似 移入FTP备份文件夹失败, {"file": "xxxxxxxxxxxx_202304.csv", "err": "229 Entering Extended Passive Mode (|||23113|)."}
  220. if err = global.M2A_FTP.Stor(global.M2A_CONFIG.FTP.FolderBakup+"/"+entry.Name, dstFile); err != nil {
  221. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  222. }
  223. if err = global.M2A_FTP.Delete(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name); err != nil {
  224. global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
  225. }
  226. dstFile.Close()
  227. }
  228. }
  229. }