package service

import (
	"encoding/csv"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"time"

	"go.uber.org/zap"

	"mtp20_assisted/core"
	"mtp20_assisted/global"
	"mtp20_assisted/model"
	"mtp20_assisted/utils"
)

const (
	// wmsReckonTmpDir is the local scratch directory that downloaded FTP files
	// are staged in. It is wiped at the start of every import run.
	wmsReckonTmpDir = "./.ftp_tmp"

	// wmsReckonCSVColumns is the number of leading CSV columns the importer
	// reads from every record (indexes 0 through 11).
	wmsReckonCSVColumns = 12

	// wmsReckonInsertSQL inserts one settlement order. All CSV-derived values
	// are bound as parameters so file content can never alter the statement.
	wmsReckonInsertSQL = `
		insert into GZ_BSCRECKONORDER (
			OrderID, UserKey, WMSOrderID, ContractNo, ReckonMonth,
			ServiceFee, StorageFee, Premium, PowerFee, CustomsFee,
			TotalFee, Applicant, ApplicantTime, ImportFile,
			UserID, AccountID, PayStatus
		) values (
			SEQ_GZ_BSCRECKONORDER.nextval, ?, ?, ?, ?,
			?, ?, ?, ?, ?,
			?, ?, ?, ?,
			?, ?, 2
		)`
)

// InitTimer starts the background poller for the WMS settlement import.
//
// The trigger time of day is read from FTP.TimeAt ("H:M:S") and the polling
// interval, in minutes, from FTP.Duration. On every tick that falls after
// today's trigger time, ImportWMSReckon is run synchronously inside the
// polling goroutine, so two imports never overlap.
//
// It returns an error when FTP.TimeAt cannot be parsed. When the interval is
// not positive no poller is started; a warning is logged and nil is returned.
// The goroutine has no stop mechanism and lives as long as the process.
func InitTimer() (err error) {
	hour, minute, sec, err := parseWMSReckonTimeAt(global.M2A_CONFIG.FTP.TimeAt)
	if err != nil {
		return err
	}

	interval := time.Duration(global.M2A_CONFIG.FTP.Duration) * time.Minute
	if interval <= 0 {
		// A ticker cannot be created with a non-positive period; starting a
		// goroutine here would only leave it blocked forever.
		global.M2A_LOG.Warn("WMS结算单导入:轮询间隔无效,定时任务未启动", zap.Duration("interval", interval))
		return nil
	}

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for range ticker.C {
			// Take a single timestamp so year/month/day cannot disagree when
			// the tick lands right at midnight.
			now := time.Now()
			targetTime := time.Date(now.Year(), now.Month(), now.Day(), hour, minute, sec, 0, time.Local)
			if now.After(targetTime) {
				global.M2A_LOG.Info("WMS结算单导入:启动-ImportWMSReckon")
				ImportWMSReckon()
			}
		}
	}()
	return nil
}

// parseWMSReckonTimeAt splits a "H:M:S" string into its three integer parts.
// Parts beyond the third are ignored. Values are not range-checked; time.Date
// normalizes out-of-range components.
func parseWMSReckonTimeAt(timeAt string) (hour, minute, sec int, err error) {
	parts := strings.Split(timeAt, ":")
	if len(parts) < 3 {
		return 0, 0, 0, fmt.Errorf("parsing FTP.TimeAt %q: want H:M:S", timeAt)
	}

	var fields [3]int
	for i := range fields {
		fields[i], err = strconv.Atoi(strings.TrimSpace(parts[i]))
		if err != nil {
			return 0, 0, 0, fmt.Errorf("parsing FTP.TimeAt %q: %w", timeAt, err)
		}
	}
	return fields[0], fields[1], fields[2], nil
}

// ImportWMSReckon imports WMS settlement orders from the configured FTP folder.
//
// It connects to FTP, stages every listed entry in a local temp directory,
// parses it as CSV, validates and inserts each data row, and finally moves
// files that produced at least one inserted row into the FTP backup folder.
// All failures are logged; nothing is returned to the caller.
func ImportWMSReckon() {
	// The AES key is the same for every row, so decode it once per run.
	key, err := hex.DecodeString(utils.AESSecretKey)
	if err != nil {
		global.M2A_LOG.Error("解析AES密钥失败,", zap.String("err", err.Error()))
		return
	}

	// Connect to FTP.
	if global.M2A_FTP, err = core.FTP(); err != nil {
		global.M2A_LOG.Error("连接FTP失败, err:", zap.Error(err))
		return
	}
	defer global.M2A_FTP.Quit()

	// Recreate the temp directory used to stage downloaded files. A failed
	// cleanup is tolerated because each download truncates its target file.
	if err = os.RemoveAll(wmsReckonTmpDir); err != nil {
		global.M2A_LOG.Warn("清理FTP临时目录失败,", zap.String("err", err.Error()))
	}
	if err = os.MkdirAll(wmsReckonTmpDir, os.ModePerm); err != nil {
		global.M2A_LOG.Error("创建FTP临时目录失败,", zap.String("err", err.Error()))
		return
	}

	// List the files on the FTP server.
	entries, err := global.M2A_FTP.List(global.M2A_CONFIG.FTP.Folder)
	if err != nil {
		global.M2A_LOG.Error("FTP获取文件列表失败, err:", zap.Error(err))
		return
	}
	global.M2A_LOG.Info("WMS结算单导入:FTP获取文件列表成功")

	for _, entry := range entries {
		importWMSReckonFile(entry.Name, key)
	}
}

// importWMSReckonFile downloads one FTP entry, imports its CSV rows and, if at
// least one row was inserted, archives the file to the backup folder.
func importWMSReckonFile(name string, key []byte) {
	localPath := wmsReckonTmpDir + "/" + name

	if !downloadWMSReckonFile(name, localPath) {
		return
	}

	rows, err := readWMSReckonCSV(localPath)
	if err != nil {
		global.M2A_LOG.Error("读取CSV文件失败,", zap.String("file", name), zap.String("err", err.Error()))
		return
	}

	imported := 0
	for i, row := range rows {
		if i == 0 {
			// The first record is the column header.
			continue
		}
		if importWMSReckonRow(row, name, key) {
			imported++
		}
	}

	// Archive once per file, and only when something was actually imported;
	// files with no accepted rows stay in place.
	if imported > 0 {
		archiveWMSReckonFile(name, localPath)
	}
}

// downloadWMSReckonFile copies the named file from the FTP source folder to
// localPath. It logs the failure and returns false if any step fails.
func downloadWMSReckonFile(name, localPath string) bool {
	src, err := global.M2A_FTP.Retr(global.M2A_CONFIG.FTP.Folder + "/" + name)
	if err != nil {
		global.M2A_LOG.Error("获取FTP源文件失败,", zap.String("file", name), zap.String("err", err.Error()))
		return false
	}
	// The retrieve response must be closed on every path: leaving it open makes
	// later FTP commands on the same connection fail (seen as a stray
	// "229 Entering Extended Passive Mode" error on upload).
	defer src.Close()

	dst, err := os.Create(localPath)
	if err != nil {
		global.M2A_LOG.Error("创建FTP临时文件失败,", zap.String("file", name), zap.String("err", err.Error()))
		return false
	}

	_, err = io.Copy(dst, src)
	// Close unconditionally so the handle is released, and surface a close
	// error (unflushed data) when the copy itself succeeded.
	if closeErr := dst.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		global.M2A_LOG.Error("下载FTP文件失败,", zap.String("file", name), zap.String("err", err.Error()))
		return false
	}
	return true
}

// readWMSReckonCSV reads every record of the comma-separated file at path.
func readWMSReckonCSV(path string) ([][]string, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	reader := csv.NewReader(f)
	reader.Comma = ','
	return reader.ReadAll()
}

// importWMSReckonRow validates one CSV record and inserts it as a settlement
// order. It reports whether the row was inserted; every rejection is logged.
//
// Columns, in order: 0 UserKey, 1 WMSOrderID, 2 ContractNo, 3 ReckonMonth,
// 4 ServiceFee, 5 StorageFee, 6 Premium, 7 PowerFee, 8 CustomsFee,
// 9 TotalFee, 10 Applicant, 11 ApplicantTime.
func importWMSReckonRow(row []string, fileName string, key []byte) bool {
	if len(row) < wmsReckonCSVColumns {
		global.M2A_LOG.Error("CSV记录列数不足,", zap.String("file", fileName), zap.Int("columns", len(row)))
		return false
	}

	// Member unified credit code: encrypt it to look the user up by CARDNUM.
	userKey := row[0]
	encrypted, err := utils.AESEncrypt([]byte(userKey), key)
	if err != nil || encrypted == nil {
		if err == nil {
			err = errors.New("DataNoFound")
		}
		global.M2A_LOG.Error("加密会员统一信用代码失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
		return false
	}

	cardNum := hex.EncodeToString(encrypted)
	userinfo := model.Userinfo{CARDNUM: cardNum}
	found, err := global.M2A_DB.Get(&userinfo)
	if err != nil || !found {
		if err == nil {
			err = errors.New("DataNoFound")
		}
		// Log the hex form; the raw ciphertext is arbitrary binary.
		global.M2A_LOG.Error("用户唯一标识在数据库中无对应记录或读取数据库失败,", zap.String("UserKey", cardNum), zap.String("err", err.Error()))
		return false
	}

	// WMS settlement serial number must not already exist. A failed lookup
	// rejects the row rather than being mistaken for "no duplicate".
	wmsOrderID := row[1]
	byOrderID := model.Gzbscreckonorder{WMSORDERID: wmsOrderID}
	found, err = global.M2A_DB.Get(&byOrderID)
	if err != nil {
		global.M2A_LOG.Error("查询WMS结算单流水号失败,", zap.String("WMSOrderID", wmsOrderID), zap.String("err", err.Error()))
		return false
	}
	if found {
		global.M2A_LOG.Error("WMS结算单流水号已在数据库中存在,", zap.String("WMSOrderID", wmsOrderID))
		return false
	}

	// The pair (member unified credit code, month yyyyMM) must be unique too.
	reckonMonth := row[3]
	byUserMonth := model.Gzbscreckonorder{USERKEY: userKey, RECKONMONTH: reckonMonth}
	found, err = global.M2A_DB.Get(&byUserMonth)
	if err != nil {
		global.M2A_LOG.Error("查询会员统一信用代码及月份失败,", zap.String("UserKey", userKey), zap.String("ReckonMonth", reckonMonth), zap.String("err", err.Error()))
		return false
	}
	if found {
		global.M2A_LOG.Error("会员统一信用代码及月份已在数据库中存在,", zap.String("UserKey", userKey), zap.String("ReckonMonth", reckonMonth))
		return false
	}

	// Total fee must be present, numeric and non-zero.
	totalFeeStr := row[9]
	if totalFeeStr == "" {
		global.M2A_LOG.Error("总费用为空,", zap.String("UserKey", userKey))
		return false
	}
	if totalFee, err := strconv.ParseFloat(totalFeeStr, 64); err != nil || totalFee == 0.0 {
		global.M2A_LOG.Error("总费用值错误,", zap.String("UserKey", userKey), zap.String("totalFee", totalFeeStr))
		return false
	}

	// Resolve the user's fund account; the first match is used.
	taaccounts := make([]model.Taaccount, 0)
	if err := global.M2A_DB.Where("USERID = ?", userinfo.USERID).Find(&taaccounts); err != nil || len(taaccounts) == 0 {
		if err == nil {
			err = errors.New("DataNoFound")
		}
		global.M2A_LOG.Error("获取资金账户失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
		return false
	}

	// Write to the database. The fee columns are bound as the original CSV
	// text so the stored decimal is exactly what the file contained.
	if _, err := global.M2A_DB.Exec(wmsReckonInsertSQL,
		row[0], row[1], row[2], row[3],
		row[4], row[5], row[6], row[7], row[8],
		totalFeeStr, row[10], row[11], fileName,
		userinfo.USERID, taaccounts[0].ACCOUNTID,
	); err != nil {
		global.M2A_LOG.Error("FTP数据写入数据库失败,", zap.String("UserKey", row[0]), zap.String("err", err.Error()))
		return false
	}
	return true
}

// archiveWMSReckonFile uploads the staged copy at localPath to the FTP backup
// folder and then deletes the original from the source folder. The source is
// deleted only after the backup upload succeeded, so a failed upload never
// loses the file.
func archiveWMSReckonFile(name, localPath string) {
	f, err := os.Open(localPath)
	if err != nil {
		global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", name), zap.String("err", err.Error()))
		return
	}
	defer f.Close()

	if err := global.M2A_FTP.Stor(global.M2A_CONFIG.FTP.FolderBakup+"/"+name, f); err != nil {
		global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", name), zap.String("err", err.Error()))
		return
	}
	if err := global.M2A_FTP.Delete(global.M2A_CONFIG.FTP.Folder + "/" + name); err != nil {
		global.M2A_LOG.Error("删除FTP源文件失败,", zap.String("file", name), zap.String("err", err.Error()))
	}
}