mirror of
https://github.com/Alexander-D-Karpov/concord.git
synced 2026-03-16 22:04:15 +03:00
450 lines
12 KiB
Go
450 lines
12 KiB
Go
package udp
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"net"
|
|
|
|
voiceauth "github.com/Alexander-D-Karpov/concord/internal/voice/auth"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/crypto"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/protocol"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/router"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/session"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/telemetry"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type Handler struct {
|
|
sessionManager *session.Manager
|
|
router *router.Router
|
|
validator *voiceauth.Validator
|
|
logger *zap.Logger
|
|
metrics *telemetry.Metrics
|
|
}
|
|
|
|
func NewHandler(
|
|
sessionManager *session.Manager,
|
|
voiceRouter *router.Router,
|
|
validator *voiceauth.Validator,
|
|
logger *zap.Logger,
|
|
metrics *telemetry.Metrics,
|
|
) *Handler {
|
|
return &Handler{
|
|
sessionManager: sessionManager,
|
|
router: voiceRouter,
|
|
validator: validator,
|
|
logger: logger,
|
|
metrics: metrics,
|
|
}
|
|
}
|
|
|
|
func (h *Handler) handlePacket(data []byte, owner router.PacketOwner, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
if len(data) < 1 {
|
|
return
|
|
}
|
|
|
|
if h.metrics != nil {
|
|
h.metrics.RecordPacketReceived(uint64(len(data)))
|
|
}
|
|
|
|
switch data[0] {
|
|
case protocol.PacketTypeHello:
|
|
h.handleHello(data, addr, conn)
|
|
case protocol.PacketTypeAudio, protocol.PacketTypeVideo:
|
|
h.handleMedia(data, owner, addr, conn)
|
|
case protocol.PacketTypePing:
|
|
h.handlePing(data, addr, conn)
|
|
case protocol.PacketTypeBye:
|
|
h.handleBye(data, addr, conn)
|
|
case protocol.PacketTypeSpeaking:
|
|
h.handleSpeaking(data, addr, conn)
|
|
case protocol.PacketTypeMediaState:
|
|
h.handleMediaState(data, addr, conn)
|
|
case protocol.PacketTypeNack:
|
|
h.handleNack(data, addr, conn)
|
|
case protocol.PacketTypePli:
|
|
h.handlePli(data, addr, conn)
|
|
case protocol.PacketTypeRR:
|
|
h.handleReceiverReport(data, addr, conn)
|
|
case protocol.PacketTypeSubscribe:
|
|
h.handleSubscribe(data, addr, conn)
|
|
case protocol.PacketTypeQualityReport:
|
|
h.handleQualityReport(data, addr, conn)
|
|
}
|
|
}
|
|
|
|
func (h *Handler) HandlePacket(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
h.handlePacket(data, nil, addr, conn)
|
|
}
|
|
|
|
func (h *Handler) HandlePacketOwned(data []byte, owner router.PacketOwner, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
h.handlePacket(data, owner, addr, conn)
|
|
}
|
|
|
|
func (h *Handler) handleMedia(data []byte, owner router.PacketOwner, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
if len(data) < protocol.MediaHeaderSize {
|
|
return
|
|
}
|
|
|
|
hdr, err := protocol.ParseMediaHeader(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
sess := h.sessionManager.GetBySSRC(hdr.SSRC)
|
|
if sess == nil {
|
|
return
|
|
}
|
|
|
|
if sess.AddrChanged(addr) {
|
|
h.sessionManager.BindAddr(sess.ID, addr)
|
|
}
|
|
h.sessionManager.Touch(sess.ID)
|
|
|
|
// Store retransmit only for video.
|
|
// Audio is high-rate and usually not worth the copy + map churn.
|
|
if hdr.Type == protocol.PacketTypeVideo {
|
|
sess.StoreForRetransmit(hdr.SSRC, hdr.Sequence, data)
|
|
}
|
|
|
|
if owner != nil {
|
|
h.router.RouteMediaOwned(*hdr, data, owner, addr, conn)
|
|
} else {
|
|
h.router.RouteMediaRaw(*hdr, data, addr, conn)
|
|
}
|
|
}
|
|
|
|
func (h *Handler) handleHello(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
var hello protocol.HelloPayload
|
|
if err := json.Unmarshal(data[1:], &hello); err != nil {
|
|
h.logger.Error("failed to unmarshal hello", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
claims, err := h.validator.ValidateToken(context.Background(), hello.Token)
|
|
if err != nil {
|
|
h.logger.Warn("invalid token in hello", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
var sessionCrypto *crypto.SessionCrypto
|
|
if hello.Crypto != nil && len(hello.Crypto.KeyMaterial) == crypto.KeySize {
|
|
var keyID uint8
|
|
if len(hello.Crypto.KeyID) > 0 {
|
|
keyID = hello.Crypto.KeyID[0]
|
|
}
|
|
sessionCrypto, _ = crypto.NewSessionCrypto(hello.Crypto.KeyMaterial, hello.Crypto.NonceBase, keyID)
|
|
}
|
|
|
|
existing := h.sessionManager.GetSessionByUserInRoom(claims.UserID, claims.RoomID)
|
|
if existing != nil {
|
|
h.broadcastParticipantLeft(existing.RoomID, existing.UserID, existing.SSRC, existing.VideoSSRC, conn)
|
|
h.sessionManager.RemoveSession(existing.ID)
|
|
}
|
|
|
|
existingSessions := h.sessionManager.GetRoomSessions(claims.RoomID)
|
|
|
|
sess := h.sessionManager.CreateSession(claims.UserID, claims.RoomID, addr, sessionCrypto, hello.VideoEnabled)
|
|
|
|
welcome := protocol.WelcomePayload{
|
|
SessionID: sess.ID,
|
|
SSRC: sess.SSRC,
|
|
VideoSSRC: sess.VideoSSRC,
|
|
ScreenSSRC: sess.ScreenSSRC,
|
|
Participants: make([]protocol.ParticipantInfo, 0),
|
|
}
|
|
|
|
welcomeData, _ := json.Marshal(welcome)
|
|
out := make([]byte, 1+len(welcomeData))
|
|
out[0] = protocol.PacketTypeWelcome
|
|
copy(out[1:], welcomeData)
|
|
|
|
if err := h.send(out, addr, conn); err != nil {
|
|
h.logger.Warn("failed to send welcome", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// Send existing participants as separate small packets to the new client.
|
|
h.sendInitialParticipantSnapshot(addr, existingSessions, conn)
|
|
|
|
// Notify existing participants about the new one.
|
|
h.broadcastJoined(claims.RoomID, sess, conn)
|
|
|
|
h.logger.Info("session created",
|
|
zap.String("user_id", claims.UserID),
|
|
zap.String("room_id", claims.RoomID),
|
|
zap.Uint32("ssrc", sess.SSRC),
|
|
zap.Uint32("video_ssrc", sess.VideoSSRC),
|
|
zap.Uint32("screen_ssrc", sess.ScreenSSRC),
|
|
)
|
|
}
|
|
|
|
func (h *Handler) sendInitialParticipantSnapshot(to *net.UDPAddr, sessions []*session.Session, conn *net.UDPConn) {
|
|
for _, s := range sessions {
|
|
if s == nil {
|
|
continue
|
|
}
|
|
|
|
payload := protocol.MediaStatePayload{
|
|
SSRC: s.SSRC,
|
|
VideoSSRC: s.VideoSSRC,
|
|
ScreenSSRC: s.ScreenSSRC,
|
|
UserID: s.UserID,
|
|
RoomID: s.RoomID,
|
|
Muted: s.Muted,
|
|
VideoEnabled: s.VideoEnabled,
|
|
ScreenSharing: s.ScreenSharing,
|
|
}
|
|
|
|
data, err := json.Marshal(payload)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
pkt := make([]byte, 1+len(data))
|
|
pkt[0] = protocol.PacketTypeMediaState
|
|
copy(pkt[1:], data)
|
|
|
|
_ = h.send(pkt, to, conn)
|
|
}
|
|
}
|
|
|
|
func (h *Handler) handlePing(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
if sess := h.sessionManager.GetByAddr(addr); sess != nil {
|
|
h.sessionManager.Touch(sess.ID)
|
|
}
|
|
|
|
pong := make([]byte, len(data))
|
|
pong[0] = protocol.PacketTypePong
|
|
copy(pong[1:], data[1:])
|
|
_ = h.send(pong, addr, conn)
|
|
}
|
|
|
|
func (h *Handler) handleBye(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
if len(data) < 5 {
|
|
return
|
|
}
|
|
|
|
ssrc := binary.BigEndian.Uint32(data[1:5])
|
|
sess := h.sessionManager.GetBySSRC(ssrc)
|
|
if sess == nil {
|
|
return
|
|
}
|
|
|
|
roomID := sess.RoomID
|
|
userID := sess.UserID
|
|
videoSSRC := sess.VideoSSRC
|
|
|
|
h.sessionManager.RemoveSession(sess.ID)
|
|
h.broadcastParticipantLeft(roomID, userID, ssrc, videoSSRC, conn)
|
|
|
|
h.logger.Info("session ended",
|
|
zap.String("user_id", userID),
|
|
zap.String("room_id", roomID),
|
|
)
|
|
}
|
|
|
|
func (h *Handler) handleSpeaking(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
var speaking protocol.SpeakingPayload
|
|
if err := json.Unmarshal(data[1:], &speaking); err != nil {
|
|
return
|
|
}
|
|
|
|
sess := h.sessionManager.GetBySSRC(speaking.SSRC)
|
|
if sess == nil {
|
|
return
|
|
}
|
|
|
|
sess.SetSpeaking(speaking.Speaking)
|
|
h.sessionManager.Touch(sess.ID)
|
|
|
|
sessions := h.sessionManager.GetRoomSessions(sess.RoomID)
|
|
payload, _ := json.Marshal(speaking)
|
|
pkt := make([]byte, 1+len(payload))
|
|
pkt[0] = protocol.PacketTypeSpeaking
|
|
copy(pkt[1:], payload)
|
|
|
|
for _, s := range sessions {
|
|
if s.SSRC == speaking.SSRC {
|
|
continue
|
|
}
|
|
if to := s.GetAddr(); to != nil {
|
|
_ = h.send(pkt, to, conn)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Handler) handleMediaState(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
var ms protocol.MediaStatePayload
|
|
if err := json.Unmarshal(data[1:], &ms); err != nil {
|
|
return
|
|
}
|
|
|
|
sess := h.sessionManager.GetBySSRC(ms.SSRC)
|
|
if sess == nil {
|
|
return
|
|
}
|
|
|
|
sess.SetMuted(ms.Muted)
|
|
sess.SetVideoEnabled(ms.VideoEnabled)
|
|
sess.SetScreenSharing(ms.ScreenSharing)
|
|
h.sessionManager.Touch(sess.ID)
|
|
h.broadcastMediaState(sess.RoomID, sess, conn)
|
|
}
|
|
|
|
func (h *Handler) handleNack(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
nack, err := protocol.ParseNack(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
target := h.sessionManager.GetBySSRC(nack.SSRC)
|
|
if target == nil {
|
|
return
|
|
}
|
|
for _, seq := range nack.Sequences {
|
|
if cached := target.GetForRetransmit(nack.SSRC, seq); cached != nil {
|
|
_ = h.send(cached, addr, conn)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Handler) handlePli(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
pli, err := protocol.ParsePli(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
target := h.sessionManager.GetBySSRC(pli.SSRC)
|
|
if target == nil {
|
|
return
|
|
}
|
|
|
|
if to := target.GetAddr(); to != nil {
|
|
_ = h.send(data, to, conn)
|
|
}
|
|
}
|
|
|
|
func (h *Handler) handleReceiverReport(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
rr, err := protocol.ParseReceiverReport(data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
target := h.sessionManager.GetBySSRC(rr.SSRC)
|
|
if target == nil {
|
|
return
|
|
}
|
|
|
|
if to := target.GetAddr(); to != nil {
|
|
_ = h.send(data, to, conn)
|
|
}
|
|
}
|
|
|
|
func (h *Handler) handleSubscribe(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
sess := h.sessionManager.GetByAddr(addr)
|
|
if sess == nil {
|
|
return
|
|
}
|
|
|
|
var payload protocol.SubscribePayload
|
|
if err := json.Unmarshal(data[1:], &payload); err != nil {
|
|
return
|
|
}
|
|
|
|
sess.UpdateSubscriptions(payload.Subscriptions)
|
|
}
|
|
|
|
func (h *Handler) handleQualityReport(data []byte, addr *net.UDPAddr, conn *net.UDPConn) {
|
|
if len(data) < 2 {
|
|
return
|
|
}
|
|
|
|
sess := h.sessionManager.GetByAddr(addr)
|
|
if sess == nil {
|
|
return
|
|
}
|
|
|
|
h.sessionManager.Touch(sess.ID)
|
|
|
|
sessions := h.sessionManager.GetRoomSessions(sess.RoomID)
|
|
for _, other := range sessions {
|
|
if other.ID == sess.ID {
|
|
continue
|
|
}
|
|
if to := other.GetAddr(); to != nil {
|
|
_ = h.send(data, to, conn)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Handler) broadcastJoined(roomID string, newSess *session.Session, conn *net.UDPConn) {
|
|
sessions := h.sessionManager.GetRoomSessions(roomID)
|
|
|
|
payload := protocol.MediaStatePayload{
|
|
SSRC: newSess.SSRC, VideoSSRC: newSess.VideoSSRC, ScreenSSRC: newSess.ScreenSSRC,
|
|
UserID: newSess.UserID, RoomID: roomID,
|
|
Muted: newSess.Muted, VideoEnabled: newSess.VideoEnabled, ScreenSharing: newSess.ScreenSharing,
|
|
}
|
|
data, _ := json.Marshal(payload)
|
|
pkt := make([]byte, 1+len(data))
|
|
pkt[0] = protocol.PacketTypeMediaState
|
|
copy(pkt[1:], data)
|
|
|
|
for _, s := range sessions {
|
|
if s.ID == newSess.ID {
|
|
continue
|
|
}
|
|
if to := s.GetAddr(); to != nil {
|
|
_ = h.send(pkt, to, conn)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Handler) broadcastParticipantLeft(roomID, userID string, ssrc, videoSSRC uint32, conn *net.UDPConn) {
|
|
sessions := h.sessionManager.GetRoomSessions(roomID)
|
|
|
|
payload := protocol.ParticipantLeftPayload{UserID: userID, RoomID: roomID, SSRC: ssrc, VideoSSRC: videoSSRC}
|
|
data, _ := json.Marshal(payload)
|
|
pkt := make([]byte, 1+len(data))
|
|
pkt[0] = protocol.PacketTypeParticipantLeft
|
|
copy(pkt[1:], data)
|
|
|
|
for _, s := range sessions {
|
|
if to := s.GetAddr(); to != nil {
|
|
_ = h.send(pkt, to, conn)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Handler) broadcastMediaState(roomID string, sess *session.Session, conn *net.UDPConn) {
|
|
sessions := h.sessionManager.GetRoomSessions(roomID)
|
|
|
|
payload := protocol.MediaStatePayload{
|
|
SSRC: sess.SSRC, VideoSSRC: sess.VideoSSRC, ScreenSSRC: sess.ScreenSSRC,
|
|
UserID: sess.UserID, RoomID: roomID,
|
|
Muted: sess.Muted, VideoEnabled: sess.VideoEnabled, ScreenSharing: sess.ScreenSharing,
|
|
}
|
|
data, _ := json.Marshal(payload)
|
|
pkt := make([]byte, 1+len(data))
|
|
pkt[0] = protocol.PacketTypeMediaState
|
|
copy(pkt[1:], data)
|
|
|
|
for _, s := range sessions {
|
|
if s.ID == sess.ID {
|
|
continue
|
|
}
|
|
if to := s.GetAddr(); to != nil {
|
|
_ = h.send(pkt, to, conn)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Handler) send(data []byte, addr *net.UDPAddr, conn *net.UDPConn) error {
|
|
_, err := conn.WriteToUDP(data, addr)
|
|
if err == nil && h.metrics != nil {
|
|
h.metrics.RecordPacketSent(uint64(len(data)))
|
|
}
|
|
return err
|
|
}
|