testbus.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package busdriver
  2. import (
  3. "fmt"
  4. "net/http"
  5. "time"
  6. "github.com/gin-gonic/gin"
  7. )
  8. // TestBus 测试接口
  9. func TestBus() {
  10. MqConnect()
  11. //defer closeMQ()
  12. MqExchange()
  13. // 订阅主题
  14. err1 := SubscribeTopic("test1")
  15. err2 := SubscribeTopic("test2")
  16. err3 := SubscribeTopic("test3")
  17. if err1 != nil || err2 != nil || err3 != nil {
  18. fmt.Println("subscribe topic failed")
  19. return
  20. }
  21. MqStart()
  22. // 启动发送协程
  23. go func() {
  24. for {
  25. for _, subinfo := range SubInfos {
  26. msg := "test message:" + time.Now().Format("2006-01-02 15:04:05")
  27. Publish(subinfo.Topic, msg)
  28. }
  29. time.Sleep(5 * time.Second)
  30. }
  31. }()
  32. }
  33. // SubTopicReq 主题订阅请求
  34. type SubTopicReq struct {
  35. Topic string `json:"topic" binding:"required"`
  36. }
  37. // MsgPublishReq 消息发送请求
  38. type MsgPublishReq struct {
  39. Topic string `json:"topic" binding:"required"`
  40. Content string `json:"content" binding:"required"`
  41. }
  42. // SubTopic 主题订阅
  43. func SubTopic(c *gin.Context) {
  44. var subreq SubTopicReq
  45. err := c.ShouldBind(&subreq)
  46. if err != nil {
  47. c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
  48. }
  49. err = SubscribeTopic(subreq.Topic)
  50. if err != nil {
  51. fmt.Println("subscribe topic failed:", err)
  52. }
  53. c.SecureJSON(http.StatusOK, 0)
  54. }
  55. // MsgPublish 消息发送
  56. func MsgPublish(c *gin.Context) {
  57. var msgpublish MsgPublishReq
  58. err := c.ShouldBind(&msgpublish)
  59. if err != nil {
  60. c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
  61. }
  62. err = Publish(msgpublish.Topic, msgpublish.Content)
  63. if err != nil {
  64. fmt.Println("publish topic failed", err)
  65. }
  66. c.SecureJSON(http.StatusOK, 0)
  67. }