zhou.xiaoning há 2 anos atrás
pai
commit
0a78883b79
2 ficheiros alterados com 16 adições e 9 exclusões
  1. 3 3
      initialize/rabbitmq.go
  2. 13 6
      publish/publish.go

+ 3 - 3
initialize/rabbitmq.go

@@ -253,8 +253,8 @@ func (t *MQProc) onNtf(funcode uint32, sessionId uint32, bytes *[]byte) {
 		// 发送信息
 		for _, item := range clients {
 			// c := clients[i]
-			// sessionId, _ := strconv.Atoi(item.SessionID)
-			// c := client.Clients[sessionId]
+			sessionId, _ := strconv.Atoi(item.SessionID)
+			c := client.Clients[sessionId]
 			// c.WriteTradeWsBuf(b)
 
 			// 分发给订阅者
@@ -263,7 +263,7 @@ func (t *MQProc) onNtf(funcode uint32, sessionId uint32, bytes *[]byte) {
 			global.M2A_Publish.Publish(publish.Topic_Trading, m)
 
 			// 给客户端通知
-			// global.M2A_LOG.Info("[S->C]给客户端通知", zap.Any("ntf", funcode), zap.Any("clients", c.LoginID), zap.Any("SessionID", c.SessionID), zap.Any("len", len(b)))
+			global.M2A_LOG.Info("[S->C]给客户端通知", zap.Any("ntf", funcode), zap.Any("clients", c.LoginID), zap.Any("SessionID", c.SessionID), zap.Any("len", len(b)))
 		}
 	}
 }

+ 13 - 6
publish/publish.go

@@ -1,6 +1,7 @@
 package publish
 
 import (
+	"fmt"
 	"sync"
 )
 
@@ -49,12 +50,18 @@ func (p *Publisher) Publish(topic Topic, data interface{}) {
 	p.mu.RLock()
 	defer p.mu.RUnlock()
 
-	subs := p.subscribers[topic]
-	for _, ch := range subs {
-		select {
-		case ch <- data:
-		default:
-			// 如果消息管道已满,直接跳过,不阻塞订阅者
+	if subs, ok := p.subscribers[topic]; ok {
+		if topic == Topic_Trading {
+			fmt.Printf("Publish-Topic_Trading 获取sub, topic: %v sub: %v \n", topic, subs)
+		}
+		for _, ch := range subs {
+			select {
+			case ch <- data:
+			default:
+				// 如果消息管道已满,直接跳过,不阻塞订阅者
+			}
 		}
+	} else {
+		fmt.Printf("Publish 获取订阅目标失败, topic: %v \n", topic)
 	}
 }