mirror of
https://github.com/Alexander-D-Karpov/concord.git
synced 2026-03-16 22:04:15 +03:00
325 lines
8.3 KiB
Go
325 lines
8.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
"runtime"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/Alexander-D-Karpov/concord/internal/version"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/status"
|
|
"github.com/joho/godotenv"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/Alexander-D-Karpov/concord/internal/auth/jwt"
|
|
"github.com/Alexander-D-Karpov/concord/internal/common/config"
|
|
"github.com/Alexander-D-Karpov/concord/internal/common/logging"
|
|
"github.com/Alexander-D-Karpov/concord/internal/common/netinfo"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/control"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/discovery"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/health"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/room"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/router"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/session"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/telemetry"
|
|
"github.com/Alexander-D-Karpov/concord/internal/voice/udp"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
func main() {
|
|
if err := run(); err != nil {
|
|
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func run() error {
|
|
_ = godotenv.Load(".env")
|
|
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
return fmt.Errorf("load config: %w", err)
|
|
}
|
|
|
|
logger, err := logging.Init(
|
|
cfg.Logging.Level,
|
|
cfg.Logging.Format,
|
|
cfg.Logging.Output,
|
|
cfg.Logging.EnableFile,
|
|
cfg.Logging.FilePath,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("init logging: %w", err)
|
|
}
|
|
defer func(logger *zap.Logger) {
|
|
if err := logger.Sync(); err != nil {
|
|
if errors.Is(err, syscall.EINVAL) || errors.Is(err, syscall.ENOTTY) {
|
|
return
|
|
}
|
|
fmt.Fprintf(os.Stderr, "error syncing logger: %v\n", err)
|
|
}
|
|
}(logger)
|
|
|
|
serverID := cfg.Voice.ServerID
|
|
if serverID == "" {
|
|
serverID = uuid.New().String()
|
|
logger.Info("generated server ID", zap.String("server_id", serverID))
|
|
}
|
|
|
|
logger.Info("starting concord-voice",
|
|
zap.String("version", version.Voice()),
|
|
zap.String("server_id", serverID),
|
|
zap.String("region", cfg.Voice.Region),
|
|
)
|
|
|
|
// Initialize core components
|
|
jwtManager := jwt.NewManager(cfg.Auth.JWTSecret, cfg.Auth.VoiceJWTSecret)
|
|
sessionManager := session.NewManager()
|
|
roomManager := room.NewManager()
|
|
|
|
// Metrics and health
|
|
metrics := telemetry.NewMetrics(logger)
|
|
telemetryLogger := telemetry.NewLogger(logger)
|
|
|
|
voiceRouter := router.NewRouter(sessionManager, logger, metrics)
|
|
|
|
healthServer := health.NewServer(logger)
|
|
healthServer.RegisterCheck("sessions", func(ctx context.Context) error {
|
|
sessions := sessionManager.GetAllSessions()
|
|
if len(sessions) > 10000 {
|
|
return fmt.Errorf("too many sessions: %d", len(sessions))
|
|
}
|
|
return nil
|
|
})
|
|
healthServer.RegisterCheck("rooms", func(ctx context.Context) error {
|
|
rooms := roomManager.GetAllRooms()
|
|
if len(rooms) > 1000 {
|
|
return fmt.Errorf("too many rooms: %d", len(rooms))
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Create UDP server pool
|
|
udpPort := cfg.Voice.UDPPortStart
|
|
portCount := cfg.Voice.UDPPortCount
|
|
if portCount <= 0 {
|
|
portCount = 50
|
|
}
|
|
if portCount > (cfg.Voice.UDPPortEnd - cfg.Voice.UDPPortStart) {
|
|
portCount = cfg.Voice.UDPPortEnd - cfg.Voice.UDPPortStart
|
|
}
|
|
|
|
udpPool, err := udp.NewServerPool(
|
|
cfg.Voice.UDPHost,
|
|
cfg.Voice.UDPPortStart,
|
|
portCount,
|
|
sessionManager,
|
|
voiceRouter,
|
|
jwtManager,
|
|
logger,
|
|
metrics,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("create UDP pool: %w", err)
|
|
}
|
|
|
|
// Create control server for registry communication
|
|
controlServer := control.NewServer(
|
|
sessionManager,
|
|
logger,
|
|
serverID,
|
|
cfg.Voice.Region,
|
|
"concord-voice",
|
|
1000,
|
|
)
|
|
|
|
// Compute advertised addresses
|
|
ctx := context.Background()
|
|
advertised := netinfo.ComputeAdvertised(
|
|
ctx,
|
|
cfg.Voice.PublicHost,
|
|
cfg.Voice.UDPHost,
|
|
udpPort,
|
|
)
|
|
netinfo.PrintAccessBanner(advertised, "Concord Voice Server")
|
|
|
|
statusSrv := status.NewServer(sessionManager, jwtManager, metrics, logger)
|
|
go func() {
|
|
err := statusSrv.Start(ctx, cfg.Voice.StatusPort)
|
|
if err != nil {
|
|
logger.Error("status server error", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
var registrar *discovery.Registrar
|
|
if cfg.Voice.RegistryURL != "" {
|
|
publicAddr := advertised.PublicHost
|
|
if publicAddr == "" {
|
|
publicAddr = advertised.LANHost
|
|
}
|
|
|
|
// Use the port from advertised, don't append again
|
|
udpAddress := fmt.Sprintf("%s:%d", publicAddr, advertised.Port)
|
|
ctrlAddress := fmt.Sprintf("%s:%d", publicAddr, cfg.Voice.ControlPort)
|
|
|
|
registrar, err = discovery.NewRegistrar(
|
|
cfg.Voice.RegistryURL,
|
|
serverID,
|
|
"concord-voice",
|
|
cfg.Voice.Region,
|
|
udpAddress,
|
|
ctrlAddress,
|
|
1000,
|
|
logger,
|
|
)
|
|
if err != nil {
|
|
logger.Warn("failed to create registrar", zap.Error(err))
|
|
} else {
|
|
if err := registrar.Register(ctx); err != nil {
|
|
logger.Warn("failed to register with main API", zap.Error(err))
|
|
} else {
|
|
logger.Info("registered with main API",
|
|
zap.String("registry_url", cfg.Voice.RegistryURL),
|
|
)
|
|
}
|
|
|
|
// Stats function for heartbeat
|
|
statsFunc := func() (int32, int32, float64, float64) {
|
|
rooms := sessionManager.GetAllRooms()
|
|
sessions := sessionManager.GetAllSessions()
|
|
|
|
var m runtime.MemStats
|
|
runtime.ReadMemStats(&m)
|
|
cpu := float64(runtime.NumGoroutine()) / 100.0
|
|
|
|
stats := metrics.GetStats()
|
|
outboundMbps := float64(stats.BytesSent) / (1024 * 1024)
|
|
|
|
return int32(len(rooms)), int32(len(sessions)), cpu, outboundMbps
|
|
}
|
|
|
|
registrar.StartHeartbeat(ctx, 30*time.Second, statsFunc)
|
|
}
|
|
}
|
|
|
|
// Create cancellable context for graceful shutdown
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
errChan := make(chan error, 4)
|
|
|
|
// Start UDP server
|
|
go func() {
|
|
logger.Info("starting UDP pool", zap.Int("start_port", cfg.Voice.UDPPortStart), zap.Int("count", portCount))
|
|
if err := udpPool.Start(ctx); err != nil {
|
|
errChan <- fmt.Errorf("UDP pool: %w", err)
|
|
}
|
|
}()
|
|
|
|
// Start control server
|
|
go func() {
|
|
logger.Info("starting control server", zap.Int("port", cfg.Voice.ControlPort))
|
|
if err := controlServer.Start(ctx, cfg.Voice.ControlPort); err != nil {
|
|
errChan <- fmt.Errorf("control server: %w", err)
|
|
}
|
|
}()
|
|
|
|
// Start metrics server
|
|
go func() {
|
|
logger.Info("starting metrics server", zap.Int("port", 9101))
|
|
if err := metrics.Start(ctx, 9101, "/metrics"); err != nil {
|
|
errChan <- fmt.Errorf("metrics server: %w", err)
|
|
}
|
|
}()
|
|
|
|
// Start health server
|
|
go func() {
|
|
logger.Info("starting health server", zap.Int("port", 8082))
|
|
if err := healthServer.Start(ctx, 8082, "/health"); err != nil {
|
|
errChan <- fmt.Errorf("health server: %w", err)
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
cleanupTicker := time.NewTicker(30 * time.Second)
|
|
statsTicker := time.NewTicker(10 * time.Second)
|
|
defer cleanupTicker.Stop()
|
|
defer statsTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-cleanupTicker.C:
|
|
removed := sessionManager.CleanupInactive(2 * time.Minute)
|
|
if len(removed) > 0 {
|
|
logger.Info("cleaned up inactive sessions",
|
|
zap.Int("count", len(removed)),
|
|
)
|
|
|
|
for _, sessionID := range removed {
|
|
telemetryLogger.LogSessionEnded(sessionID, "", "")
|
|
}
|
|
}
|
|
|
|
case <-statsTicker.C:
|
|
sessions := sessionManager.GetAllSessions()
|
|
rooms := sessionManager.GetAllRooms() // Use session manager's room tracking
|
|
|
|
metrics.SetActiveSessions(int32(len(sessions)))
|
|
metrics.SetActiveRooms(int32(len(rooms)))
|
|
|
|
stats := metrics.GetStats()
|
|
if len(sessions) > 0 {
|
|
logger.Debug("server stats",
|
|
zap.Int("active_sessions", len(sessions)),
|
|
zap.Int("active_rooms", len(rooms)),
|
|
zap.Uint64("packets_received", stats.PacketsReceived),
|
|
zap.Uint64("packets_sent", stats.PacketsSent),
|
|
zap.Uint64("bytes_received", stats.BytesReceived),
|
|
zap.Uint64("bytes_sent", stats.BytesSent),
|
|
zap.Uint64("packets_dropped", stats.PacketsDropped),
|
|
)
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait for shutdown signal
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
|
|
|
select {
|
|
case err := <-errChan:
|
|
logger.Error("server error", zap.Error(err))
|
|
return err
|
|
case sig := <-sigChan:
|
|
logger.Info("received shutdown signal", zap.String("signal", sig.String()))
|
|
}
|
|
|
|
// Graceful shutdown
|
|
logger.Info("shutting down gracefully...")
|
|
|
|
// Stop accepting new connections
|
|
cancel()
|
|
|
|
// Stop router
|
|
voiceRouter.Stop()
|
|
|
|
// Stop registrar
|
|
if registrar != nil {
|
|
registrar.Stop()
|
|
}
|
|
|
|
// Give goroutines time to finish
|
|
time.Sleep(2 * time.Second)
|
|
|
|
logger.Info("shutdown complete")
|
|
return nil
|
|
}
|