concord/internal/voice/session/session.go

536 lines
10 KiB
Go

package session
import (
"net"
"sync"
"sync/atomic"
"time"
"github.com/Alexander-D-Karpov/concord/internal/voice/crypto"
)
type Session struct {
ID uint32
UserID string
RoomID string
SSRC uint32 // Audio
VideoSSRC uint32 // Camera
ScreenSSRC uint32 // Screen Share
addr *net.UDPAddr
lastActivity time.Time
Crypto *crypto.SessionCrypto
Muted bool
VideoEnabled bool
ScreenSharing bool
Speaking bool
Subscriptions map[uint32]bool
mu sync.RWMutex
AudioSeq uint16
VideoSeq uint16
ScreenSeq uint16
AudioTS uint32
VideoTS uint32
retransmitBufs map[uint32]*RetransmitBuffer
}
func cloneUDPAddr(addr *net.UDPAddr) *net.UDPAddr {
if addr == nil {
return nil
}
cp := *addr
if addr.IP != nil {
cp.IP = append(net.IP(nil), addr.IP...)
}
return &cp
}
func udpAddrEqual(a, b *net.UDPAddr) bool {
switch {
case a == nil && b == nil:
return true
case a == nil || b == nil:
return false
}
return a.Port == b.Port && a.Zone == b.Zone && a.IP.Equal(b.IP)
}
func udpAddrKey(addr *net.UDPAddr) string {
if addr == nil {
return ""
}
return addr.String()
}
func (s *Session) GetAddr() *net.UDPAddr {
s.mu.RLock()
defer s.mu.RUnlock()
return cloneUDPAddr(s.addr)
}
func (s *Session) LastActivity() time.Time {
s.mu.RLock()
defer s.mu.RUnlock()
return s.lastActivity
}
func (s *Session) touchLocked(now time.Time) {
s.lastActivity = now
}
func (s *Session) AddrChanged(addr *net.UDPAddr) bool {
s.mu.RLock()
defer s.mu.RUnlock()
return !udpAddrEqual(s.addr, addr)
}
func (s *Session) replaceAddr(addr *net.UDPAddr) (oldKey, newKey string, changed bool) {
s.mu.Lock()
defer s.mu.Unlock()
if udpAddrEqual(s.addr, addr) {
return "", "", false
}
oldKey = udpAddrKey(s.addr)
s.addr = cloneUDPAddr(addr)
newKey = udpAddrKey(s.addr)
return oldKey, newKey, true
}
func (s *Session) SetMuted(muted bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.Muted = muted
}
func (s *Session) SetVideoEnabled(enabled bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.VideoEnabled = enabled
}
func (s *Session) SetScreenSharing(enabled bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.ScreenSharing = enabled
}
func (s *Session) SetSpeaking(speaking bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.Speaking = speaking
}
func (s *Session) NextAudioSeq() uint16 {
s.mu.Lock()
defer s.mu.Unlock()
seq := s.AudioSeq
s.AudioSeq++
return seq
}
func (s *Session) NextVideoSeq() uint16 {
s.mu.Lock()
defer s.mu.Unlock()
seq := s.VideoSeq
s.VideoSeq++
return seq
}
func (s *Session) StoreForRetransmit(ssrc uint32, seq uint16, data []byte) {
s.mu.RLock()
buf := s.retransmitBufs[ssrc]
s.mu.RUnlock()
if buf == nil {
s.mu.Lock()
if s.retransmitBufs[ssrc] == nil {
s.retransmitBufs[ssrc] = NewRetransmitBuffer(500 * time.Millisecond)
}
buf = s.retransmitBufs[ssrc]
s.mu.Unlock()
}
buf.Store(seq, data)
}
func (s *Session) GetForRetransmit(ssrc uint32, seq uint16) []byte {
s.mu.RLock()
buf := s.retransmitBufs[ssrc]
s.mu.RUnlock()
if buf == nil {
return nil
}
return buf.Get(seq)
}
type RetransmitBuffer struct {
mu sync.RWMutex
packets map[uint16]*CachedPacket
maxAge time.Duration
}
type CachedPacket struct {
Data []byte
Timestamp time.Time
Sequence uint16
}
func NewRetransmitBuffer(maxAge time.Duration) *RetransmitBuffer {
return &RetransmitBuffer{
packets: make(map[uint16]*CachedPacket),
maxAge: maxAge,
}
}
func (rb *RetransmitBuffer) Store(seq uint16, data []byte) {
rb.mu.Lock()
defer rb.mu.Unlock()
dataCopy := make([]byte, len(data))
copy(dataCopy, data)
now := time.Now()
rb.packets[seq] = &CachedPacket{
Data: dataCopy,
Timestamp: now,
Sequence: seq,
}
for s, p := range rb.packets {
if now.Sub(p.Timestamp) > rb.maxAge {
delete(rb.packets, s)
}
}
}
func (rb *RetransmitBuffer) Get(seq uint16) []byte {
rb.mu.RLock()
defer rb.mu.RUnlock()
if p, ok := rb.packets[seq]; ok {
if time.Since(p.Timestamp) <= rb.maxAge {
return p.Data
}
}
return nil
}
// roomEntry keeps a mutable session map and an immutable snapshot slice.
// Writers rebuild the snapshot only on join/leave.
// Readers just load the snapshot with zero allocation.
type roomEntry struct {
sessions map[uint32]*Session
snapshot atomic.Value // stores []*Session
}
func newRoomEntry() *roomEntry {
r := &roomEntry{
sessions: make(map[uint32]*Session),
}
r.snapshot.Store([]*Session(nil))
return r
}
func (r *roomEntry) rebuildSnapshotLocked() {
snap := make([]*Session, 0, len(r.sessions))
for _, s := range r.sessions {
snap = append(snap, s)
}
r.snapshot.Store(snap)
}
func (r *roomEntry) Snapshot() []*Session {
v := r.snapshot.Load()
if v == nil {
return nil
}
return v.([]*Session)
}
type Manager struct {
mu sync.RWMutex
sessions map[uint32]*Session // sessionID -> session
userRoom map[string]*Session // "user:room" -> session
roomMap map[string]*roomEntry
ssrcMap map[uint32]*Session // SSRC -> session
addrMap map[string]*Session // "ip:port" -> session
nextID uint32
nextSSRC uint32
}
func NewManager() *Manager {
return &Manager{
sessions: make(map[uint32]*Session),
userRoom: make(map[string]*Session),
roomMap: make(map[string]*roomEntry),
ssrcMap: make(map[uint32]*Session),
addrMap: make(map[string]*Session),
nextID: 1000,
nextSSRC: 2000,
}
}
func userRoomKey(userID, roomID string) string { return userID + ":" + roomID }
func (m *Manager) CreateSession(
userID, roomID string,
addr *net.UDPAddr,
sessionCrypto *crypto.SessionCrypto,
videoEnabled bool,
) *Session {
m.mu.Lock()
defer m.mu.Unlock()
now := time.Now()
sessionID := m.nextID
m.nextID++
audioSSRC := m.nextSSRC
m.nextSSRC++
videoSSRC := m.nextSSRC
m.nextSSRC++
screenSSRC := m.nextSSRC
m.nextSSRC++
sess := &Session{
ID: sessionID,
UserID: userID,
RoomID: roomID,
SSRC: audioSSRC,
VideoSSRC: videoSSRC,
ScreenSSRC: screenSSRC,
addr: cloneUDPAddr(addr),
lastActivity: now,
Crypto: sessionCrypto,
Muted: false,
VideoEnabled: videoEnabled,
ScreenSharing: false,
Speaking: false,
retransmitBufs: make(map[uint32]*RetransmitBuffer),
}
m.sessions[sessionID] = sess
m.userRoom[userRoomKey(userID, roomID)] = sess
room := m.roomMap[roomID]
if room == nil {
room = newRoomEntry()
m.roomMap[roomID] = room
}
room.sessions[sessionID] = sess
room.rebuildSnapshotLocked()
m.ssrcMap[audioSSRC] = sess
m.ssrcMap[videoSSRC] = sess
m.ssrcMap[screenSSRC] = sess
if sess.addr != nil {
m.addrMap[udpAddrKey(sess.addr)] = sess
}
return sess
}
func (m *Manager) BindAddr(sessionID uint32, addr *net.UDPAddr) {
if addr == nil {
return
}
m.mu.Lock()
defer m.mu.Unlock()
sess := m.sessions[sessionID]
if sess == nil {
return
}
oldKey, newKey, changed := sess.replaceAddr(addr)
if !changed {
return
}
if oldKey != "" {
delete(m.addrMap, oldKey)
}
if newKey != "" {
m.addrMap[newKey] = sess
}
}
func (m *Manager) Touch(sessionID uint32) {
m.mu.RLock()
sess := m.sessions[sessionID]
m.mu.RUnlock()
if sess == nil {
return
}
now := time.Now()
sess.mu.Lock()
sess.touchLocked(now)
sess.mu.Unlock()
}
func (m *Manager) GetSession(sessionID uint32) *Session {
m.mu.RLock()
defer m.mu.RUnlock()
return m.sessions[sessionID]
}
func (m *Manager) GetBySSRC(ssrc uint32) *Session {
m.mu.RLock()
defer m.mu.RUnlock()
return m.ssrcMap[ssrc]
}
func (m *Manager) GetByAddr(addr *net.UDPAddr) *Session {
if addr == nil {
return nil
}
m.mu.RLock()
defer m.mu.RUnlock()
return m.addrMap[udpAddrKey(addr)]
}
func (m *Manager) GetSessionByUserInRoom(userID, roomID string) *Session {
m.mu.RLock()
defer m.mu.RUnlock()
return m.userRoom[userRoomKey(userID, roomID)]
}
// Hot-path read: no slice allocation.
func (m *Manager) GetRoomSessions(roomID string) []*Session {
m.mu.RLock()
room := m.roomMap[roomID]
m.mu.RUnlock()
if room == nil {
return nil
}
return room.Snapshot()
}
func (m *Manager) RemoveSession(sessionID uint32) {
m.mu.Lock()
defer m.mu.Unlock()
sess := m.sessions[sessionID]
if sess == nil {
return
}
delete(m.sessions, sessionID)
delete(m.userRoom, userRoomKey(sess.UserID, sess.RoomID))
delete(m.ssrcMap, sess.SSRC)
delete(m.ssrcMap, sess.VideoSSRC)
delete(m.ssrcMap, sess.ScreenSSRC)
if sess.addr != nil {
delete(m.addrMap, udpAddrKey(sess.addr))
}
if room := m.roomMap[sess.RoomID]; room != nil {
delete(room.sessions, sessionID)
if len(room.sessions) == 0 {
delete(m.roomMap, sess.RoomID)
} else {
room.rebuildSnapshotLocked()
}
}
}
func (m *Manager) CleanupInactive(timeout time.Duration) []uint32 {
m.mu.Lock()
defer m.mu.Unlock()
now := time.Now()
var removed []uint32
touchedRooms := make(map[string]*roomEntry)
for sessionID, sess := range m.sessions {
last := sess.LastActivity()
if now.Sub(last) <= timeout {
continue
}
removed = append(removed, sessionID)
delete(m.sessions, sessionID)
delete(m.userRoom, userRoomKey(sess.UserID, sess.RoomID))
delete(m.ssrcMap, sess.SSRC)
delete(m.ssrcMap, sess.VideoSSRC)
delete(m.ssrcMap, sess.ScreenSSRC)
if sess.addr != nil {
delete(m.addrMap, udpAddrKey(sess.addr))
}
if room := m.roomMap[sess.RoomID]; room != nil {
delete(room.sessions, sessionID)
if len(room.sessions) == 0 {
delete(m.roomMap, sess.RoomID)
} else {
touchedRooms[sess.RoomID] = room
}
}
}
for _, room := range touchedRooms {
room.rebuildSnapshotLocked()
}
return removed
}
func (m *Manager) GetActiveSessions(activeWithin time.Duration) []*Session {
cutoff := time.Now().Add(-activeWithin)
m.mu.RLock()
defer m.mu.RUnlock()
out := make([]*Session, 0, len(m.sessions))
for _, s := range m.sessions {
if s.LastActivity().After(cutoff) {
out = append(out, s)
}
}
return out
}
func (s *Session) UpdateSubscriptions(subs []uint32) {
s.mu.Lock()
defer s.mu.Unlock()
if len(subs) == 0 {
s.Subscriptions = nil
return
}
s.Subscriptions = make(map[uint32]bool, len(subs))
for _, ssrc := range subs {
s.Subscriptions[ssrc] = true
}
}
func (s *Session) IsSubscribedTo(ssrc uint32) bool {
s.mu.RLock()
defer s.mu.RUnlock()
if s.Subscriptions == nil {
return true
}
return s.Subscriptions[ssrc]
}