commit 74f319a33d7ed56b02f44f45b5978104ca934024 Author: Yun Date: Sun May 3 22:37:59 2026 +0800 commit diff --git a/connpoolx.go b/connpoolx.go new file mode 100644 index 0000000..e38e0a6 --- /dev/null +++ b/connpoolx.go @@ -0,0 +1,315 @@ +package connpoolx + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + "xiaoxin-plus/pkg/langx" + + uuid "github.com/satori/go.uuid" + "github.com/zeromicro/go-zero/core/logc" +) + +// 长连接的连接池管理 + +var connList = sync.Map{} + +type Conn struct { + ctx context.Context + cancel context.CancelFunc // 取消事件 + stream StreamInterface // 连接 + userInfo UserInfo // 用户信息 + callback NotifyFunc + + mux sync.Mutex // 互斥锁 + createTime time.Time // 连接事件 + swapTime time.Time // 最后互相传递消息时间 + isClose bool // 是否关闭 +} + +type UserInfo struct { + TenantId int64 `json:"tenant_id"` + ClientId int64 `json:"client_id"` + UserId int64 `json:"user_id"` + Token string `json:"token"` + TraceId string `json:"trace_id"` // WS的用户UID +} + +type Message struct { + MsgId string `json:"msg_id"` // 每条消息独立的UUID (自动赋值) + Action string `json:"action"` // 操作 + Code int `json:"code"` // 响应码 + Message string `json:"message"` // 提示信息 + Data string `json:"data"` // 数据信息 + MsgTime int64 `json:"msg_time"` // 消息时间 + ConnId string `json:"conn_id"` // 连接的uuid + OriMsgId string `json:"ori_msg_id"` // 响应原来请求的uuid(对端传来) +} + +// 事件 +const ( + ActionEvent string = "event" // 指令操作(要求完成某件事) + ActionPing string = "ping" // 心跳 + ActionTips string = "tips" // 提示消息 + ActionMessage string = "message" // 普通消息 + ActionLogin string = "login" // 登陆消息 +) + +type StreamInterface interface { + Send(string) error // 发送 + Read() (string, error) // 读取 + Close() // 关闭 +} + +// 回调方法 +type NotifyFunc interface { + Register(wc *Conn) error // 注册用户 + Unregister(uid string) error // 取消注册 + DealEvent(uid string, data string) (string, error) // 处理事件 + DealRead(uid string, data string) error // 处理读取的消息 + Heartbeat(uid string) // 心跳事件 + BeforeSend(uid string, data string) error // 消息发送前回调(可能需要拦截) +} + +func Accept(ctx context.Context, cancel context.CancelFunc, stream StreamInterface, callback NotifyFunc, user UserInfo) error { + user.TraceId = uuid.NewV4().String() + logc.Infof(ctx, "Accept: %+v", user) + + conn := &Conn{ + ctx: ctx, + cancel: cancel, + stream: stream, + createTime: time.Now(), + swapTime: time.Now(), + userInfo: user, + callback: callback, + } + + connList.Store(user.TraceId, conn) + + logc.Infof(ctx, "Accept store succ") + + // 发送消息 + // go conn.write() + + logc.Infof(ctx, "Accept Register begin") + + err := conn.callback.Register(conn) + if err != nil { + conn.close() + logc.Errorf(ctx, "Accept Register error: %v", err) + return err + } + + // 读取消息 + go conn.read() + + // 登录响应 + b := conn.buildMessage(ActionLogin, langx.GetCode(langx.Success), user.TraceId, langx.Success, "") + conn.send(b) + + logc.Infof(ctx, "Accept finish") + + return nil +} + +func (c *Conn) read() { + for { + select { + case <-c.ctx.Done(): + return + default: + message, err := c.stream.Read() + // logc.Infof(c.ctx, "pool.read stream traceId:%+v message:%+v", c.userInfo.TraceId, message) + if err != nil { + c.close() + logc.Errorf(c.ctx, "pool.read stream err:%+v", err) + return + } + if message == "" { + continue + } + + c.mux.Lock() + c.swapTime = time.Now() + c.mux.Unlock() + + // 文字消息 + if string(message) == "ping" { + c.send("pong") + continue + } + + msg, err := c.parseMessage(message) + // logc.Infof(c.ctx, "pool.read parseMessage: message:%+v msg:%+v err:%+v\n", message, msg, err) + if err != nil { + // 无法识别 + str := c.buildMessage(ActionTips, langx.GetCode(langx.ErrorMsgUnparse), "ori_data:"+message, langx.ErrorMsgUnparse+" err:"+err.Error(), "") + c.send(str) + continue + } + + if msg.Action == ActionEvent { + data, err := c.callback.DealEvent(c.userInfo.TraceId, msg.Data) + if err != nil { + str := c.buildMessage(ActionTips, langx.GetCode(langx.ErrorMsgUnparse), "ori_data:"+message, "err:"+err.Error(), msg.MsgId) + c.send(str) + continue + } else { + str := c.buildMessage(ActionTips, langx.GetCode(langx.Success), data, langx.Success, "") + c.send(str) + continue + } + } else if msg.Action == ActionPing { + if msg.Data == "pong" { + str := c.buildMessage(ActionPing, langx.GetCode(langx.Success), "pong", langx.Success, "") + c.send(str) + continue + } + } else { + // 其他消息 + err := c.callback.DealRead(c.userInfo.TraceId, message) + if err != nil { + str := c.buildMessage(ActionTips, langx.GetCode(langx.Error), "ori_data:"+message, "方法错误", msg.MsgId) + c.send(str) + continue + } + } + } + } +} + +func (c *Conn) Send(msg Message) error { + str := c.buildMessage(msg.Action, msg.Code, msg.Data, msg.Message, msg.OriMsgId) + return c.send(str) +} + +func (c *Conn) send(data string) error { + err := c.callback.BeforeSend(c.userInfo.TraceId, data) + if err != nil { + c.close() + logc.Errorf(c.ctx, "Conn send BeforeSend error: %v", err) + return err + } + err = c.stream.Send(data) + if err != nil { + c.close() + logc.Errorf(c.ctx, "Conn send error: %v", err) + return err + } + return nil +} + +func (c *Conn) Close(msg *Message) { + if msg != nil { + str := c.buildMessage(msg.Action, msg.Code, msg.Data, msg.Message, msg.OriMsgId) + c.send(str) + } + c.close() + logc.Infof(c.ctx, "Conn Close: %+v", c.userInfo) +} + +func (c *Conn) GetUserInfo() (uuid string, user UserInfo) { + c.mux.Lock() + defer c.mux.Unlock() + return c.userInfo.TraceId, c.userInfo +} + +func (c *Conn) close() { + c.mux.Lock() + c.isClose = true + c.mux.Unlock() + + c.callback.Unregister(c.userInfo.TraceId) + + time.Sleep(time.Second * 1) + + c.stream.Close() + logc.Infof(c.ctx, "Conn close: %+v", c.userInfo) + c.cancel() +} + +// 构造消息 +func (c *Conn) buildMessage(action string, code int, data string, msg string, oriMsgId string) string { + d := Message{ + MsgId: uuid.NewV4().String(), + Action: action, + Code: code, + Message: msg, + Data: data, + MsgTime: time.Now().UnixMilli(), + ConnId: c.userInfo.TraceId, + OriMsgId: oriMsgId, + } + b, _ := json.Marshal(d) + return string(b) +} + +// 消息转换 +func (c *Conn) parseMessage(data string) (*Message, error) { + msg := Message{} + err := json.Unmarshal([]byte(data), &msg) + if err != nil { + return nil, err + } + fmt.Printf("parseMessage data:%+v msg:%+v\n", data, msg) + return &msg, nil +} + +func init() { + go func() { + for { + Heartbeat(context.Background()) + time.Sleep(time.Second * 5) + } + }() +} + +func Heartbeat(ctx context.Context) bool { + connList.Range(func(key, value interface{}) bool { + go func(key interface{}, value interface{}) { + uid, ok := key.(string) + if !ok { + return + } + c, ok := value.(*Conn) + if !ok { + return + } + + c.mux.Lock() + isClose := c.isClose + swapTime := c.swapTime + c.mux.Unlock() + + // 已关闭的链接就清除掉 + if isClose { + connList.Delete(uid) + return + } + + // 超过15s就关闭连接 + if swapTime.Add(time.Minute * 2).Before(time.Now()) { + c.close() + logc.Infof(ctx, "Heartbeat close: %+v", c.userInfo) + return + } + + // 回调事件 + c.callback.Heartbeat(uid) + + // // 15s内有发过消息不需要发心跳 + // if swapTime.Add(time.Second * 60).After(time.Now()) { + // return true + // } + + // // 发送消息 + // b, _ := buildMessage(ActionPing, lang.GetCode(lang.Success), "ping", lang.Success, "") + // wc.sendMessage(SyncTypeAsync, b) + }(key, value) + return true + }) + return true +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ebc7fc6 --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module code.yun.ink/pkg/connpoolx + +go 1.19 + +require github.com/satori/go.uuid v1.2.0 + +require gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6e011d7 --- /dev/null +++ b/go.sum @@ -0,0 +1,9 @@ +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/more/connPoolx.go b/more/connPoolx.go new file mode 100644 index 0000000..f794623 --- /dev/null +++ b/more/connPoolx.go @@ -0,0 +1,115 @@ +package more + +import ( + "sync" + + uuid "github.com/satori/go.uuid" +) + +// 同一个key对应多个连接 +// 2023年6月29日13:45:25 + +// 场景: +// 1. 同时支持多个用户在线 +// 1. 一个用户对应多个连接 +// 2. 在其中一个用户连接中发送消息,同用户其他连接也能收到 + +type connPool struct { + pool map[string]map[string]PoolValue + idx map[string]string // 索引 + mu sync.RWMutex +} + +type PoolValue interface { + Close() error +} + +// 初始化连接池 +func NewConnPool() *connPool { + return &connPool{ + pool: make(map[string]map[string]PoolValue), + idx: make(map[string]string), + mu: sync.RWMutex{}, + } +} + +// 添加连接, 返回connId +func (c *connPool) Store(key string, value PoolValue) string { + c.mu.Lock() + defer c.mu.Unlock() + val, _ := c.pool[key] + + connId := uuid.NewV4().String() + val[connId] = value + c.pool[key] = val + c.idx[connId] = key + return connId +} + +// 获取一个用户的所有连接 +func (c *connPool) LoadByKey(key string) (conns map[string]PoolValue, ok bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + valMap, ok := c.pool[key] + + return valMap, ok +} + +// 根据连接ID获取连接 +func (c *connPool) LoadByConnId(connId string) (*PoolValue, bool) { + c.mu.RLock() + defer c.mu.Unlock() + + key, ok := c.idx[connId] + if !ok { + return nil, false + } + + valMap, ok := c.pool[key] + if !ok { + return nil, false + } + + val, ok := valMap[connId] + if !ok { + return nil, false + } + return &val, true +} + +// 关闭连接 +func (c *connPool) Close(connId string) { + c.mu.Lock() + defer c.mu.Unlock() + + key, ok := c.idx[connId] + if !ok { + return + } + valMap, ok := c.pool[key] + if !ok { + return + } + val, ok := valMap[connId] + if !ok { + return + } + delete(valMap, connId) + + // 异步关闭 + go val.Close() +} + +// 遍历所有的连接 +func (c *connPool) Range(f func(key string, connId string, values PoolValue)) { + c.mu.RLock() + defer c.mu.Unlock() + + for key, val := range c.pool { + val := val + for keyPoo, valPoo := range val { + go f(key, keyPoo, valPoo) + } + } +} diff --git a/simple/connPoolx.go b/simple/connPoolx.go new file mode 100644 index 0000000..933f0e5 --- /dev/null +++ b/simple/connPoolx.go @@ -0,0 +1,59 @@ +package simple + +import ( + "errors" + "sync" +) + +// 连接池 + +// 核心思想:连接放在这里面统一管理,所有的新建与删除需要外部操作 + +// 适用场景:每一个key对应一个val + +type connPool struct { + pool sync.Map +} + +type PoolValue interface { + Close() error +} + +func NewConnPool() *connPool { + return &connPool{} +} + +// 同名将覆盖 +func (c *connPool) Store(key string, value PoolValue) { + c.pool.Store(key, value) +} + +func (c *connPool) Load(key string) (value PoolValue, ok bool) { + val, ok := c.pool.Load(key) + if !ok { + return nil, false + } + value, ok = val.(PoolValue) + if !ok { + return nil, false + } + return value, true +} + +// 关闭链接 +func (c *connPool) Close(key string) error { + val, ok := c.pool.LoadAndDelete(key) + if ok { + value, ok := val.(PoolValue) + if !ok { + return errors.New("解析异常") + } + return value.Close() + } + return nil +} + +// 遍历所有的连接 +func (c *connPool) Range(f func(key any, value any) bool) { + c.pool.Range(f) +}