zhou.xiaoning 2 rokov pred
rodič
commit
1bcf24b885
12 zmenil súbory, kde vykonal 336 pridanie a 13 odobranie
  1. 2 1
      .gitignore
  2. 13 3
      config.yaml
  3. 1 0
      config/config.go
  4. 11 0
      config/ftp.go
  5. 26 0
      core/ftp.go
  6. 8 9
      core/viper.go
  7. 4 0
      global/global.go
  8. 3 0
      go.mod
  9. 5 0
      go.sum
  10. 18 0
      main.go
  11. 31 0
      model/database.go
  12. 214 0
      service/ftp.go

+ 2 - 1
.gitignore

@@ -2,4 +2,5 @@ log/
 .tmp/
 __debug_bin.exe
 bin/
-static/~$*
+static/~$*
+.ftp_tmp/

+ 13 - 3
config.yaml

@@ -16,17 +16,27 @@ oracle:
   address: '192.168.31.88'
   name: 'orcl'
   port: '1521'
-  user: 'mtp2_test167'
+  user: 'mtp2_test104'
   pwd: 'muchinfo'
   max-idle-conns: 10
   max-open-conns: 100
 
 # rabbitmq configuration
 rabbitmq:
-  url: 'amqp://guest:guest@192.168.31.167:5020/test'
+  url: 'amqp://guest:guest@192.168.31.104:5020/test'
   exchange: 'entry'
 
 # system configuration
 system:
   env: 'develop'  # "develop" & "public", Change to "develop" to skip authentication for development mode
-  store-path: '/home/pub/image/uploadFile' # JAVA文件上传目录 <- 已作废
+  store-path: '/home/pub/image/uploadFile' # JAVA文件上传目录 <- 已作废
+
+# ftp configuration
+ftp:
+  address: '218.17.158.45'
+  port: '23110'
+  name: 'wmsuser'
+  pwd: 'Much*ws#230410'
+  time-at: 5 # 每天执行小时数,0-23
+  folder: 'wms_reckon'
+  folder-bakup: 'wms_reckon_bk'

+ 1 - 0
config/config.go

@@ -5,4 +5,5 @@ type Server struct {
 	Oracle   Oracle   `mapstructure:"oracle" json:"oracle" yaml:"oracle"`
 	Rabbitmq Rabbitmq `mapstructure:"rabbitmq" json:"rabbitmq" yaml:"rabbitmq"`
 	System   System   `mapstructure:"system" json:"system" yaml:"system"`
+	FTP      FTP      `mapstructure:"ftp" json:"ftp" yaml:"ftp"`
 }

+ 11 - 0
config/ftp.go

@@ -0,0 +1,11 @@
+package config
+
+type FTP struct {
+	Address     string `mapstructure:"address" json:"address" yaml:"address"`
+	Port        string `mapstructure:"port" json:"port" yaml:"port"`
+	Name        string `mapstructure:"name" json:"name" yaml:"name"`
+	PWD         string `mapstructure:"pwd" json:"pwd" yaml:"pwd"`
+	TimeAt      int    `mapstructure:"time-at" json:"time-at" yaml:"time-at"`
+	Folder      string `mapstructure:"folder" json:"folder" yaml:"folder"`
+	FolderBakup string `mapstructure:"folder-bakup" json:"folder-bakup" yaml:"folder-bakup"`
+}

+ 26 - 0
core/ftp.go

@@ -0,0 +1,26 @@
+package core
+
+import (
+	"fmt"
+	"mtp20_assisted/global"
+
+	"github.com/jlaffaye/ftp"
+	"go.uber.org/zap"
+)
+
+// FTP 连接FTP
+func FTP() (client *ftp.ServerConn, err error) {
+	// 连接FTP服务器
+	if client, err = ftp.Dial(fmt.Sprintf("%v:%v", global.M2A_CONFIG.FTP.Address, global.M2A_CONFIG.FTP.Port)); err != nil {
+		global.M2A_LOG.Error("ftp connect failed, err:", zap.Error(err))
+		return
+	}
+
+	// 登录FTP服务器
+	if err = client.Login(global.M2A_CONFIG.FTP.Name, global.M2A_CONFIG.FTP.PWD); err != nil {
+		global.M2A_LOG.Error("ftp login failed, err:", zap.Error(err))
+		return
+	}
+
+	return
+}

+ 8 - 9
core/viper.go

@@ -1,7 +1,6 @@
 package core
 
 import (
-	"flag"
 	"fmt"
 	"mtp20_assisted/global"
 
@@ -13,14 +12,14 @@ import (
 func Viper(path ...string) *viper.Viper {
 	var config string
 
-	if len(path) == 0 {
-		flag.StringVar(&config, "c", "", "choose config file.")
-		flag.Parse()
-		config = "config.yaml"
-	} else { // 函数传递的可变参数的第一个值赋值于config
-		config = path[0]
-		fmt.Printf("您正在使用func Viper()传递的值,config的路径为%s\n", config)
-	}
+	// if len(path) == 0 {
+	// 	flag.StringVar(&config, "c", "", "choose config file.")
+	// 	flag.Parse()
+	config = "config.yaml"
+	// } else { // 函数传递的可变参数的第一个值赋值于config
+	// 	config = path[0]
+	// 	fmt.Printf("您正在使用func Viper()传递的值,config的路径为%s\n", config)
+	// }
 
 	v := viper.New()
 	v.SetConfigFile(config)

+ 4 - 0
global/global.go

@@ -6,6 +6,8 @@ import (
 	"github.com/spf13/viper"
 	"go.uber.org/zap"
 	"xorm.io/xorm"
+
+	"github.com/jlaffaye/ftp"
 )
 
 var (
@@ -18,4 +20,6 @@ var (
 	M2A_MSGPROCESSER map[string]MsgProcesser // 总线信息处理器,key - 主题
 
 	M2A_DONE chan bool
+
+	M2A_FTP *ftp.ServerConn
 )

+ 3 - 0
go.mod

@@ -6,6 +6,7 @@ require (
 	github.com/fsnotify/fsnotify v1.6.0
 	github.com/gofrs/uuid v4.0.0+incompatible
 	github.com/golang/protobuf v1.5.2
+	github.com/jlaffaye/ftp v0.1.0
 	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
 	github.com/mattn/go-oci8 v0.1.1
 	github.com/nguyenthenguyen/docx v0.0.0-20220721043308-1903da0ef37d
@@ -20,6 +21,8 @@ require (
 require (
 	github.com/goccy/go-json v0.8.1 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
+	github.com/hashicorp/errwrap v1.0.0 // indirect
+	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/jonboulle/clockwork v0.3.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect

+ 5 - 0
go.sum

@@ -215,11 +215,14 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
 github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
 github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
 github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
+github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
 github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
 github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
 github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
 github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
+github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
 github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
 github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
 github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
@@ -289,6 +292,8 @@ github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0f
 github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
 github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
+github.com/jlaffaye/ftp v0.1.0 h1:DLGExl5nBoSFoNshAUHwXAezXwXBvFdx7/qwhucWNSE=
+github.com/jlaffaye/ftp v0.1.0/go.mod h1:hhq4G4crv+nW2qXtNYcuzLeOudG92Ps37HEKeg2e3lE=
 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
 github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg=

+ 18 - 0
main.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"flag"
 	"mtp20_assisted/core"
 	"mtp20_assisted/global"
 	"mtp20_assisted/service"
@@ -39,6 +40,23 @@ func main() {
 	// 开始接收总线消息
 	service.StartRabbitMQReceive()
 
+	// 连接FTP
+	var err error
+	if global.M2A_FTP, err = core.FTP(); err == nil {
+		defer global.M2A_FTP.Quit()
+	}
+	// 开始FTP Timer任务
+	service.InitTimer()
+
+	// 判断输入参数
+	var ftp bool
+	flag.BoolVar(&ftp, "ftp", false, "是否立即执行FTP导入工作")
+	flag.Parse()
+	if ftp {
+		service.ImportWMSReckon()
+	}
+	// service.ImportWMSReckon()
+
 	global.M2A_DONE = make(chan bool)
 	<-global.M2A_DONE
 }

+ 31 - 0
model/database.go

@@ -415,3 +415,34 @@ type Wskhopenaccountconfig struct {
 func (r *Wskhopenaccountconfig) TableName() string {
 	return "WSKH_OPENACCOUNTCONFIG"
 }
+
+// Gzbscreckonorder 保税仓结算单表 - WMS系统导入
+type Gzbscreckonorder struct {
+	ORDERID       int64     `json:"orderid" xorm:"ORDERID"`             // 单据ID(SEQ_GZ_BSCRECKONORDER)
+	USERKEY       string    `json:"userkey" xorm:"USERKEY"`             // 用户唯一标识
+	WMSORDERID    string    `json:"wmsorderid" xorm:"WMSORDERID"`       // WMS结算单流水号
+	CONTRACTNO    string    `json:"contractno" xorm:"CONTRACTNO"`       // 合同编号
+	RECKONMONTH   string    `json:"reckonmonth" xorm:"RECKONMONTH"`     // 结算月份(yyyyMM)
+	SERVICEFEE    string    `json:"servicefee" xorm:"SERVICEFEE"`       // 分拣室服务费
+	STORAGEFEE    string    `json:"storagefee" xorm:"STORAGEFEE"`       // 仓储费
+	PREMIUM       string    `json:"premium" xorm:"PREMIUM"`             // 保险费
+	POWERFEE      string    `json:"powerfee" xorm:"POWERFEE"`           // 分拣室电费
+	CUSTOMSFEE    string    `json:"customsfee" xorm:"CUSTOMSFEE"`       // 报关费
+	TOTALFEE      float64   `json:"totalfee" xorm:"TOTALFEE"`           // 合计费用
+	APPLICANT     string    `json:"applicant" xorm:"APPLICANT"`         // 申请人
+	APPLICANTTIME string    `json:"applicanttime" xorm:"APPLICANTTIME"` // 申请时间
+	IMPORTFILE    string    `json:"importfile" xorm:"IMPORTFILE"`       // 导入文件名
+	IMPORTTIME    time.Time `json:"importtime" xorm:"IMPORTTIME"`       // 导入时间
+	USERID        int64     `json:"userid" xorm:"USERID"`               // 用户ID
+	ACCOUNTID     int64     `json:"accountid" xorm:"ACCOUNTID"`         // 资金账户ID
+	PAYSTATUS     int32     `json:"paystatus" xorm:"PAYSTATUS"`         // 支付状态 - 2:待支付 3:已支付(枚举:GZBSCPayStatus)
+	CONFIRMTIME   time.Time `json:"confirmtime" xorm:"CONFIRMTIME"`     // 确认时间
+	PAYTIME       time.Time `json:"paytime" xorm:"PAYTIME"`             // 支付时间
+	HANDLESTATUS  int32     `json:"handlestatus" xorm:"HANDLESTATUS"`   // 处理状态
+	PAYTRADEDATE  string    `json:"paytradedate" xorm:"PAYTRADEDATE"`   // 支付交易日(yyyyMMdd)
+}
+
+// TableName is GZ_BSCRECKONORDER
+func (r *Gzbscreckonorder) TableName() string {
+	return "GZ_BSCRECKONORDER"
+}

+ 214 - 0
service/ftp.go

@@ -0,0 +1,214 @@
+package service
+
+import (
+	"fmt"
+	"io"
+	"mtp20_assisted/global"
+	"mtp20_assisted/model"
+	"mtp20_assisted/utils"
+	"os"
+	"strconv"
+	"time"
+
+	"encoding/csv"
+	"encoding/hex"
+
+	"go.uber.org/zap"
+)
+
+func InitTimer() (err error) {
+	loc, err := time.LoadLocation("Local")
+	targetTime := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), global.M2A_CONFIG.FTP.TimeAt, 0, 0, 0, loc)
+
+	// 启动协程处理任务
+	go func() {
+		for range time.Tick(24 * time.Hour) {
+			// 计算时间差,等待执行时间
+			dur := time.Until(targetTime)
+			if dur < 0 {
+				targetTime = targetTime.Add(24 * time.Hour)
+				dur = time.Until(targetTime)
+			}
+			time.Sleep(dur)
+
+			// 执行函数
+			ImportWMSReckon()
+		}
+	}()
+
+	return
+}
+
+// ImportWMSReckon 导入WMS结算单
+func ImportWMSReckon() {
+	// 暂存ftp文件的临时目录
+	os.RemoveAll("./.ftp_tmp")
+	os.Mkdir("./.ftp_tmp", os.ModePerm)
+
+	// 列出FTP服务器上的文件列表
+	entries, err := global.M2A_FTP.List(global.M2A_CONFIG.FTP.Folder)
+	if err != nil {
+		global.M2A_LOG.Error("FTP获取文件列表失败, err:", zap.Error(err))
+		return
+	}
+	for _, entry := range entries {
+		// 下载FTP服务器上的文件
+		srcFile, err := global.M2A_FTP.Retr(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name)
+		if err != nil {
+			global.M2A_LOG.Error("获取FTP源文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
+			continue
+		}
+		dstFile, err := os.Create("./.ftp_tmp/" + entry.Name)
+		if err != nil {
+			global.M2A_LOG.Error("创建FTP临时文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
+			continue
+		}
+		_, err = io.Copy(dstFile, srcFile)
+		if err != nil {
+			global.M2A_LOG.Error("下载FTP文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
+			continue
+		}
+		srcFile.Close()
+
+		// 创建CSV Reader
+		dstFile.Close()
+		dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
+		// defer dstFile.Close()
+		reader := csv.NewReader(dstFile)
+		// 设置分隔符为逗号
+		reader.Comma = ','
+		// 读取所有行
+		rows, e := reader.ReadAll()
+		if e != nil {
+			err = e
+			global.M2A_LOG.Error("读取CSV文件失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
+			continue
+		}
+		for i, row := range rows {
+			if i == 0 { // 第一行是列头
+				continue
+			}
+
+			// 检验记录
+			// 会员统一信用代码
+			userKey := row[0]
+			key, _ := hex.DecodeString(utils.AESSecretKey)
+			var userKeyEncrypted []byte
+			if userKeyEncrypted, err = utils.AESEncrypt([]byte(userKey), key); err != nil || userKeyEncrypted == nil {
+				global.M2A_LOG.Error("加密会员统一信用代码失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
+				continue
+			}
+			userinfo := model.Userinfo{
+				CARDNUM: hex.EncodeToString(userKeyEncrypted),
+			}
+			has := false
+			if has, err = global.M2A_DB.Get(&userinfo); err != nil || !has {
+				global.M2A_LOG.Error("用户唯一标识在数据库中无对应记录或读取数据库失败,", zap.String("UserKey", string(userKeyEncrypted)), zap.String("err", err.Error()))
+				continue
+			}
+
+			// WMS结算单流水号
+			wmsOrderID := row[1]
+			gzbscreckonorder := model.Gzbscreckonorder{
+				WMSORDERID: wmsOrderID,
+			}
+			has = false
+			if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
+				global.M2A_LOG.Error("WMS结算单流水号已在数据库中存在,", zap.String("WMSOrderID", wmsOrderID))
+				continue
+			}
+
+			// “会员统一信用代码”+ “月份(yyyyMM)“
+			reckonMonth := row[3]
+			gzbscreckonorder = model.Gzbscreckonorder{
+				USERKEY:     userKey,
+				RECKONMONTH: reckonMonth,
+			}
+			has = false
+			if has, _ = global.M2A_DB.Get(&gzbscreckonorder); has {
+				global.M2A_LOG.Error("会员统一信用代码及月份已在数据库中存在,", zap.String("UserKey", userKey), zap.String("ReckonMonth", reckonMonth))
+				continue
+			}
+
+			// 总费用
+			totalFeeStr := row[9]
+			if totalFeeStr == "" {
+				global.M2A_LOG.Error("总费用为空,", zap.String("UserKey", userKey))
+				continue
+			}
+			totalFee, err := strconv.ParseFloat(totalFeeStr, 64)
+			if err != nil || totalFee == 0.0 {
+				global.M2A_LOG.Error("总费用值错误,", zap.String("UserKey", userKey), zap.String("totalFee", totalFeeStr))
+				continue
+			}
+
+			// 获取资金账户ID
+			taaccounts := make([]model.Taaccount, 0)
+			if err := global.M2A_DB.Where("USERID = ?", userinfo.USERID).Find(&taaccounts); err != nil || len(taaccounts) == 0 {
+				global.M2A_LOG.Error("获取资金账户失败,", zap.String("UserKey", userKey), zap.String("err", err.Error()))
+				continue
+			}
+
+			// 写入数据库
+			sql := fmt.Sprintf(`
+				insert into GZ_BSCRECKONORDER 
+				(
+					OrderID,
+					UserKey,
+					WMSOrderID,
+					ContractNo,
+					ReckonMonth,
+					ServiceFee,
+					StorageFee,
+					Premium,
+					PowerFee,
+					CustomsFee,
+					TotalFee,
+					Applicant,
+					ApplicantTime,
+					ImportFile,
+					UserID,
+					AccountID,
+					PayStatus
+				) values 
+				(
+					SEQ_GZ_BSCRECKONORDER.nextval,
+					'%v',
+					'%v',
+					'%v',
+					'%v',
+					'%v',
+					'%v',
+					'%v',
+					'%v',
+					'%v',
+					%v,
+					'%v',
+					'%v',
+					'%v',
+					%v,
+					%v,
+					2
+				)
+			`, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11],
+				entry.Name, userinfo.USERID, taaccounts[0].ACCOUNTID)
+			if _, err := global.M2A_DB.Exec(sql); err != nil {
+				global.M2A_LOG.Error("FTP数据写入数据库失败,", zap.String("UserKey", row[0]), zap.String("err", err.Error()))
+				continue
+			}
+
+			// 移入FTP备份文件夹
+			dstFile.Close()
+			dstFile, _ = os.Open("./.ftp_tmp/" + entry.Name)
+			// defer dstFile.Close()
+			// 注意上传之前需要把之前的关闭,不然会报类似 移入FTP备份文件夹失败,	{"file": "xxxxxxxxxxxx_202304.csv", "err": "229 Entering Extended Passive Mode (|||23113|)."}
+			if err = global.M2A_FTP.Stor(global.M2A_CONFIG.FTP.FolderBakup+"/"+entry.Name, dstFile); err != nil {
+				global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
+			}
+			if err = global.M2A_FTP.Delete(global.M2A_CONFIG.FTP.Folder + "/" + entry.Name); err != nil {
+				global.M2A_LOG.Error("移入FTP备份文件夹失败,", zap.String("file", entry.Name), zap.String("err", err.Error()))
+			}
+			dstFile.Close()
+		}
+	}
+}