Compare commits

..

2 Commits

Author SHA1 Message Date
59c93d5d47 welcome update 2026-03-13 15:35:24 +03:00
0adad329dc major voice updates, added voice stresser, minor updates 2026-03-13 15:08:44 +03:00
21 changed files with 1606 additions and 463 deletions

View File

@ -171,10 +171,11 @@ func run() error {
cfg.RateLimit.RequestsPerMinute, cfg.RateLimit.RequestsPerMinute,
cfg.RateLimit.Burst, cfg.RateLimit.Burst,
true, true,
cfg.RateLimit.BypassToken,
) )
logger.Info("rate limiting enabled") logger.Info("rate limiting enabled")
} else { } else {
rateLimiter = ratelimit.NewLimiter(nil, 500, 100, false) rateLimiter = ratelimit.NewLimiter(nil, 500, 100, false, cfg.RateLimit.BypassToken)
} }
rateLimitInterceptor := ratelimit.NewInterceptor(rateLimiter) rateLimitInterceptor := ratelimit.NewInterceptor(rateLimiter)

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/Alexander-D-Karpov/concord/internal/version" "github.com/Alexander-D-Karpov/concord/internal/version"
"github.com/Alexander-D-Karpov/concord/internal/voice/status"
"github.com/joho/godotenv" "github.com/joho/godotenv"
"go.uber.org/zap" "go.uber.org/zap"
@ -80,13 +81,12 @@ func run() error {
sessionManager := session.NewManager() sessionManager := session.NewManager()
roomManager := room.NewManager() roomManager := room.NewManager()
// Router needs access to both session and room state
voiceRouter := router.NewRouter(sessionManager, logger)
// Metrics and health // Metrics and health
metrics := telemetry.NewMetrics(logger) metrics := telemetry.NewMetrics(logger)
telemetryLogger := telemetry.NewLogger(logger) telemetryLogger := telemetry.NewLogger(logger)
voiceRouter := router.NewRouter(sessionManager, logger, metrics)
healthServer := health.NewServer(logger) healthServer := health.NewServer(logger)
healthServer.RegisterCheck("sessions", func(ctx context.Context) error { healthServer.RegisterCheck("sessions", func(ctx context.Context) error {
sessions := sessionManager.GetAllSessions() sessions := sessionManager.GetAllSessions()
@ -147,6 +147,14 @@ func run() error {
) )
netinfo.PrintAccessBanner(advertised, "Concord Voice Server") netinfo.PrintAccessBanner(advertised, "Concord Voice Server")
statusSrv := status.NewServer(sessionManager, jwtManager, metrics, logger)
go func() {
err := statusSrv.Start(ctx, cfg.Voice.StatusPort)
if err != nil {
logger.Error("status server error", zap.Error(err))
}
}()
var registrar *discovery.Registrar var registrar *discovery.Registrar
if cfg.Voice.RegistryURL != "" { if cfg.Voice.RegistryURL != "" {
publicAddr := advertised.PublicHost publicAddr := advertised.PublicHost

60
go.work.sum Normal file
View File

@ -0,0 +1,60 @@
cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
cloud.google.com/go/compute/metadata v0.7.0 h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU=
cloud.google.com/go/compute/metadata v0.7.0/go.mod h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 h1:UQUsRi8WTzhZntp5313l+CHIAT95ojUI2lpP/ExlZa4=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0/go.mod h1:Cz6ft6Dkn3Et6l2v2a9/RpN7epQ1GtDlO6lj8bEcOvw=
github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjHpqDjYY=
github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/antihax/optional v1.0.0 h1:xK2lYat7ZLaVVcIuj82J8kIro4V6kDe0AUDFboUCwcg=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
github.com/envoyproxy/go-control-plane v0.13.4/go.mod h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA=
github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI=
github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4=
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
github.com/go-jose/go-jose/v4 v4.1.2 h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI=
github.com/go-jose/go-jose/v4 v4.1.2/go.mod h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo=
github.com/golang/glog v1.2.5 h1:DrW6hGnjIhtvhOIiAKT6Psh/Kd/ldepEa81DKeiRJ5I=
github.com/golang/glog v1.2.5/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
go.opentelemetry.io/contrib/detectors/gcp v1.36.0 h1:F7q2tNlCaHY9nMKHR6XH9/qkp8FktLnIcy6jJNyOCQw=
go.opentelemetry.io/contrib/detectors/gcp v1.36.0/go.mod h1:IbBN8uAIIx734PTonTPxAxnjc2pQTxWNkwfstZ+6H2k=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c=
golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU=
golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q=
golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss=
golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc=
golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg=

View File

@ -70,6 +70,7 @@ type VoiceConfig struct {
Secret string Secret string
RegistryURL string RegistryURL string
PublicHost string PublicHost string
StatusPort int
} }
type LoggingConfig struct { type LoggingConfig struct {
@ -97,6 +98,7 @@ type RateLimitConfig struct {
Enabled bool Enabled bool
RequestsPerMinute int RequestsPerMinute int
Burst int Burst int
BypassToken string
} }
type EmailConfig struct { type EmailConfig struct {
@ -150,6 +152,7 @@ func Load() (*Config, error) {
Secret: getEnv("VOICE_SECRET", "change-me-voice-server-secret"), Secret: getEnv("VOICE_SECRET", "change-me-voice-server-secret"),
RegistryURL: getEnv("REGISTRY_URL", "localhost:9090"), RegistryURL: getEnv("REGISTRY_URL", "localhost:9090"),
PublicHost: getEnv("VOICE_PUBLIC_HOST", "localhost"), PublicHost: getEnv("VOICE_PUBLIC_HOST", "localhost"),
StatusPort: getEnvInt("VOICE_STATUS_PORT", 9092),
}, },
Logging: LoggingConfig{ Logging: LoggingConfig{
Level: getEnv("LOG_LEVEL", "info"), Level: getEnv("LOG_LEVEL", "info"),
@ -169,6 +172,7 @@ func Load() (*Config, error) {
Enabled: getEnvBool("RATE_LIMIT_ENABLED", true), Enabled: getEnvBool("RATE_LIMIT_ENABLED", true),
RequestsPerMinute: getEnvInt("RATE_LIMIT_REQUESTS_PER_MINUTE", 60), RequestsPerMinute: getEnvInt("RATE_LIMIT_REQUESTS_PER_MINUTE", 60),
Burst: getEnvInt("RATE_LIMIT_BURST", 10), Burst: getEnvInt("RATE_LIMIT_BURST", 10),
BypassToken: getEnv("RATE_LIMIT_BYPASS_TOKEN", ""),
}, },
Storage: StorageConfig{ Storage: StorageConfig{
Path: getEnv("STORAGE_PATH", "./uploads"), Path: getEnv("STORAGE_PATH", "./uploads"),

View File

@ -124,7 +124,7 @@ func (g *Gateway) Start(ctx context.Context, port int) error {
func customMatcher(key string) (string, bool) { func customMatcher(key string) (string, bool) {
switch key { switch key {
case "authorization", "x-request-id", "x-correlation-id", "grpc-timeout": case "authorization", "x-request-id", "x-correlation-id", "grpc-timeout", "x-concord-ratelimit-bypass":
return key, true return key, true
default: default:
return runtime.DefaultHeaderMatcher(key) return runtime.DefaultHeaderMatcher(key)

View File

@ -11,6 +11,8 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
const BypassMetadataKey = "x-concord-ratelimit-bypass"
type Interceptor struct { type Interceptor struct {
limiter *Limiter limiter *Limiter
} }
@ -26,6 +28,10 @@ func (i *Interceptor) Unary() grpc.UnaryServerInterceptor {
info *grpc.UnaryServerInfo, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler, handler grpc.UnaryHandler,
) (interface{}, error) { ) (interface{}, error) {
if i.limiter.ShouldBypass(ctx) {
return handler(ctx, req)
}
key := i.getKey(ctx, info.FullMethod) key := i.getKey(ctx, info.FullMethod)
allowed, err := i.limiter.Allow(ctx, key) allowed, err := i.limiter.Allow(ctx, key)
@ -49,6 +55,11 @@ func (i *Interceptor) Stream() grpc.StreamServerInterceptor {
handler grpc.StreamHandler, handler grpc.StreamHandler,
) error { ) error {
ctx := ss.Context() ctx := ss.Context()
if i.limiter.ShouldBypass(ctx) {
return handler(srv, ss)
}
key := i.getKey(ctx, info.FullMethod) key := i.getKey(ctx, info.FullMethod)
allowed, err := i.limiter.Allow(ctx, key) allowed, err := i.limiter.Allow(ctx, key)

View File

@ -2,12 +2,15 @@ package ratelimit
import ( import (
"context" "context"
"crypto/subtle"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
"github.com/Alexander-D-Karpov/concord/internal/infra/cache" "github.com/Alexander-D-Karpov/concord/internal/infra/cache"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"google.golang.org/grpc/metadata"
) )
type Limiter struct { type Limiter struct {
@ -17,6 +20,8 @@ type Limiter struct {
localCache map[string]*rate.Limiter localCache map[string]*rate.Limiter
mu sync.RWMutex mu sync.RWMutex
cleanupDone chan struct{} cleanupDone chan struct{}
bypassToken string
} }
type LimitConfig struct { type LimitConfig struct {
@ -24,10 +29,11 @@ type LimitConfig struct {
Burst int Burst int
} }
func NewLimiter(cache *cache.Cache, requestsPerMinute, burst int, enabled bool) *Limiter { func NewLimiter(cache *cache.Cache, requestsPerMinute, burst int, enabled bool, bypassToken string) *Limiter {
l := &Limiter{ l := &Limiter{
cache: cache, cache: cache,
enabled: enabled, enabled: enabled,
bypassToken: strings.TrimSpace(bypassToken),
limits: map[string]LimitConfig{ limits: map[string]LimitConfig{
"default": { "default": {
RequestsPerMinute: requestsPerMinute, RequestsPerMinute: requestsPerMinute,
@ -57,6 +63,30 @@ func NewLimiter(cache *cache.Cache, requestsPerMinute, burst int, enabled bool)
return l return l
} }
func (l *Limiter) ShouldBypass(ctx context.Context) bool {
if l.bypassToken == "" {
return false
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return false
}
values := md.Get(BypassMetadataKey)
if len(values) == 0 {
return false
}
for _, candidate := range values {
if subtle.ConstantTimeCompare([]byte(strings.TrimSpace(candidate)), []byte(l.bypassToken)) == 1 {
return true
}
}
return false
}
func (l *Limiter) Allow(ctx context.Context, key string) (bool, error) { func (l *Limiter) Allow(ctx context.Context, key string) (bool, error) {
if !l.enabled { if !l.enabled {
return true, nil return true, nil

View File

@ -48,13 +48,17 @@ func EffectiveStatus(statusPreference string, presence string) string {
return StatusOffline return StatusOffline
} }
if statusPreference == StatusDND {
if presence == StatusOffline {
return StatusOffline
}
return StatusDND
}
switch presence { switch presence {
case StatusAway: case StatusAway:
return StatusAway return StatusAway
case StatusOnline: case StatusOnline:
if statusPreference == StatusDND {
return StatusDND
}
return StatusOnline return StatusOnline
default: default:
return StatusOffline return StatusOffline
@ -351,33 +355,10 @@ func (s *Service) UpdateStatus(ctx context.Context, status string) (*User, error
return nil, errors.BadRequest("invalid user id") return nil, errors.BadRequest("invalid user id")
} }
switch status { preference := NormalizeStatusPreference(status)
case StatusAway:
if s.presence != nil {
if err := s.presence.SetAway(ctx, id); err != nil {
return nil, err
}
}
case StatusOnline, StatusDND, StatusOffline, "invisible": if err := s.repo.UpdateStatus(ctx, id, preference); err != nil {
normalized := NormalizeStatusPreference(status) return nil, err
if err := s.repo.UpdateStatus(ctx, id, normalized); err != nil {
return nil, err
}
if s.presence != nil {
if normalized != StatusOffline && s.presence.GetStatus(id) == StatusOffline {
if err := s.presence.SetOnline(ctx, id); err != nil {
return nil, err
}
} else {
s.presence.Refresh(ctx, id)
}
}
default:
return nil, errors.BadRequest("invalid status")
} }
user, err := s.repo.GetByID(ctx, id) user, err := s.repo.GetByID(ctx, id)
@ -386,6 +367,11 @@ func (s *Service) UpdateStatus(ctx context.Context, status string) (*User, error
} }
s.decorateSelfUserStatus(user) s.decorateSelfUserStatus(user)
if s.presence != nil {
s.presence.broadcastCurrentStatus(ctx, id)
}
return user, nil return user, nil
} }

View File

@ -31,7 +31,7 @@ const (
APIPatch = 0 APIPatch = 0
VoiceMajor = 0 VoiceMajor = 0
VoiceMinor = 1 VoiceMinor = 2
VoicePatch = 0 VoicePatch = 0
) )

View File

@ -279,10 +279,11 @@ type ReceiverReport struct {
} }
type ParticipantLeftPayload struct { type ParticipantLeftPayload struct {
UserID string `json:"user_id"` UserID string `json:"user_id"`
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
SSRC uint32 `json:"ssrc"` SSRC uint32 `json:"ssrc"`
VideoSSRC uint32 `json:"video_ssrc,omitempty"` VideoSSRC uint32 `json:"video_ssrc,omitempty"`
ScreenSSRC uint32 `json:"screen_ssrc,omitempty"`
} }
type SubscribePayload struct { type SubscribePayload struct {

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,49 @@
package udp
import (
"sync"
"sync/atomic"
)
type packetBuffer struct {
buf []byte
n int
refs atomic.Int32
pool *sync.Pool
}
func newPacketPool() sync.Pool {
p := sync.Pool{}
p.New = func() interface{} {
return &packetBuffer{
buf: make([]byte, maxPacketLen),
pool: &p,
}
}
return p
}
func (p *packetBuffer) PrepareForRead() []byte {
p.n = 0
p.refs.Store(1)
return p.buf[:cap(p.buf)]
}
func (p *packetBuffer) SetLen(n int) {
p.n = n
}
func (p *packetBuffer) Bytes() []byte {
return p.buf[:p.n]
}
func (p *packetBuffer) Retain() {
p.refs.Add(1)
}
func (p *packetBuffer) Release() {
if p.refs.Add(-1) == 0 {
p.n = 0
p.pool.Put(p)
}
}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"runtime"
"sync" "sync"
"github.com/Alexander-D-Karpov/concord/internal/auth/jwt" "github.com/Alexander-D-Karpov/concord/internal/auth/jwt"
@ -30,7 +31,7 @@ type ServerPool struct {
} }
type poolJob struct { type poolJob struct {
data []byte pkt *packetBuffer
addr *net.UDPAddr addr *net.UDPAddr
conn *net.UDPConn conn *net.UDPConn
} }
@ -61,12 +62,7 @@ func NewServerPool(
metrics: metrics, metrics: metrics,
workChan: make(chan *poolJob, workChanSize), workChan: make(chan *poolJob, workChanSize),
stopChan: make(chan struct{}), stopChan: make(chan struct{}),
packetPool: sync.Pool{ packetPool: newPacketPool(),
New: func() interface{} {
b := make([]byte, maxPacketLen)
return &b
},
},
} }
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
@ -108,6 +104,11 @@ func (p *ServerPool) ConnForPort(port int) *net.UDPConn {
} }
func (p *ServerPool) Start(ctx context.Context) error { func (p *ServerPool) Start(ctx context.Context) error {
numWorkers := runtime.NumCPU() * 2
if numWorkers < 4 {
numWorkers = 4
}
for i := 0; i < numWorkers; i++ { for i := 0; i < numWorkers; i++ {
p.wg.Add(1) p.wg.Add(1)
go p.worker() go p.worker()
@ -136,12 +137,12 @@ func (p *ServerPool) readLoop(conn *net.UDPConn) {
defer p.wg.Done() defer p.wg.Done()
for { for {
bufPtr := p.packetPool.Get().(*[]byte) pkt := p.packetPool.Get().(*packetBuffer)
buf := *bufPtr buf := pkt.PrepareForRead()
n, addr, err := conn.ReadFromUDP(buf) n, addr, err := conn.ReadFromUDP(buf)
if err != nil { if err != nil {
p.packetPool.Put(bufPtr) pkt.Release()
select { select {
case <-p.stopChan: case <-p.stopChan:
return return
@ -151,20 +152,19 @@ func (p *ServerPool) readLoop(conn *net.UDPConn) {
} }
if n > maxPacketLen { if n > maxPacketLen {
p.packetPool.Put(bufPtr) pkt.Release()
continue continue
} }
data := make([]byte, n) pkt.SetLen(n)
copy(data, buf[:n])
p.packetPool.Put(bufPtr)
select { select {
case p.workChan <- &poolJob{data: data, addr: addr, conn: conn}: case p.workChan <- &poolJob{pkt: pkt, addr: addr, conn: conn}:
default: default:
if p.metrics != nil { if p.metrics != nil {
p.metrics.RecordPacketDropped() p.metrics.RecordPacketDropped()
} }
pkt.Release()
} }
} }
} }
@ -176,7 +176,8 @@ func (p *ServerPool) worker() {
select { select {
case job := <-p.workChan: case job := <-p.workChan:
if job != nil { if job != nil {
p.handler.HandlePacket(job.data, job.addr, job.conn) p.handler.HandlePacketOwned(job.pkt.Bytes(), job.pkt, job.addr, job.conn)
job.pkt.Release()
} }
case <-p.stopChan: case <-p.stopChan:
return return

File diff suppressed because it is too large Load Diff

21
tools/voicetest/go.mod Normal file
View File

@ -0,0 +1,21 @@
module github.com/Alexander-D-Karpov/concord/tools/voicetest
go 1.25.1
require (
github.com/Alexander-D-Karpov/concord v0.0.0
golang.org/x/crypto v0.43.0
google.golang.org/grpc v1.76.0
)
require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect
golang.org/x/net v0.45.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/text v0.34.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 // indirect
google.golang.org/protobuf v1.36.10 // indirect
)
replace github.com/Alexander-D-Karpov/concord => ../../

42
tools/voicetest/go.sum Normal file
View File

@ -0,0 +1,42 @@
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE=
go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E=
go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI=
go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg=
go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc=
go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM=
golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4 h1:8XJ4pajGwOlasW+L13MnEGA8W4115jJySQtVfS2/IBU=
google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4/go.mod h1:NnuHhy+bxcg30o7FnVAZbXsPHUDQ9qKWAQKCD7VxFtk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 h1:i8QOKZfYg6AbGVZzUAY3LrNWCKF8O6zFisU9Wl9RER4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ=
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=

635
tools/voicetest/main.go Normal file

File diff suppressed because it is too large Load Diff