conn.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package pool
  2. import (
  3. "context"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. "github.com/go-redis/redis/internal/proto"
  8. )
  9. var noDeadline = time.Time{}
  10. type Conn struct {
  11. netConn net.Conn
  12. rd *proto.Reader
  13. wr *proto.Writer
  14. Inited bool
  15. pooled bool
  16. createdAt time.Time
  17. usedAt int64 // atomic
  18. }
  19. func NewConn(netConn net.Conn) *Conn {
  20. cn := &Conn{
  21. netConn: netConn,
  22. createdAt: time.Now(),
  23. }
  24. cn.rd = proto.NewReader(netConn)
  25. cn.wr = proto.NewWriter(netConn)
  26. cn.SetUsedAt(time.Now())
  27. return cn
  28. }
  29. func (cn *Conn) UsedAt() time.Time {
  30. unix := atomic.LoadInt64(&cn.usedAt)
  31. return time.Unix(unix, 0)
  32. }
  33. func (cn *Conn) SetUsedAt(tm time.Time) {
  34. atomic.StoreInt64(&cn.usedAt, tm.Unix())
  35. }
  36. func (cn *Conn) SetNetConn(netConn net.Conn) {
  37. cn.netConn = netConn
  38. cn.rd.Reset(netConn)
  39. cn.wr.Reset(netConn)
  40. }
  41. func (cn *Conn) Write(b []byte) (int, error) {
  42. return cn.netConn.Write(b)
  43. }
  44. func (cn *Conn) RemoteAddr() net.Addr {
  45. return cn.netConn.RemoteAddr()
  46. }
  47. func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
  48. tm := cn.deadline(ctx, timeout)
  49. _ = cn.netConn.SetReadDeadline(tm)
  50. return fn(cn.rd)
  51. }
  52. func (cn *Conn) WithWriter(
  53. ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
  54. ) error {
  55. tm := cn.deadline(ctx, timeout)
  56. _ = cn.netConn.SetWriteDeadline(tm)
  57. firstErr := fn(cn.wr)
  58. err := cn.wr.Flush()
  59. if err != nil && firstErr == nil {
  60. firstErr = err
  61. }
  62. return firstErr
  63. }
  64. func (cn *Conn) Close() error {
  65. return cn.netConn.Close()
  66. }
  67. func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
  68. tm := time.Now()
  69. cn.SetUsedAt(tm)
  70. if timeout > 0 {
  71. tm = tm.Add(timeout)
  72. }
  73. if ctx != nil {
  74. deadline, ok := ctx.Deadline()
  75. if ok {
  76. if timeout == 0 {
  77. return deadline
  78. }
  79. if deadline.Before(tm) {
  80. return deadline
  81. }
  82. return tm
  83. }
  84. }
  85. if timeout > 0 {
  86. return tm
  87. }
  88. return noDeadline
  89. }