mirror of
https://github.com/Alexander-D-Karpov/concord.git
synced 2026-03-16 22:04:15 +03:00
Compare commits
9 Commits
v20260226-
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 59c93d5d47 | |||
| 0adad329dc | |||
| 24e8340b88 | |||
| 029448292e | |||
| dfa99fad23 | |||
| d055a75fd3 | |||
| 512b5ec57e | |||
| 368422e21c | |||
| 1bda3a8a81 |
|
|
@ -124,6 +124,17 @@ message SearchMessagesResponse {
|
||||||
bool has_more = 2;
|
bool has_more = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message ListMessagesSinceRequest {
|
||||||
|
string room_id = 1;
|
||||||
|
string after_message_id = 2;
|
||||||
|
int32 limit = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ListMessagesSinceResponse {
|
||||||
|
repeated concord.common.v1.Message messages = 1;
|
||||||
|
bool has_more = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message MarkAsReadRequest {
|
message MarkAsReadRequest {
|
||||||
string room_id = 1;
|
string room_id = 1;
|
||||||
string message_id = 2;
|
string message_id = 2;
|
||||||
|
|
@ -235,6 +246,12 @@ service ChatService {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rpc ListMessagesSince(ListMessagesSinceRequest) returns (ListMessagesSinceResponse) {
|
||||||
|
option (google.api.http) = {
|
||||||
|
get: "/v1/rooms/{room_id}/messages/since/{after_message_id}"
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
rpc MarkAsRead(MarkAsReadRequest) returns (MarkAsReadResponse) {
|
rpc MarkAsRead(MarkAsReadRequest) returns (MarkAsReadResponse) {
|
||||||
option (google.api.http) = {
|
option (google.api.http) = {
|
||||||
post: "/v1/rooms/{room_id}/read"
|
post: "/v1/rooms/{room_id}/read"
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ message User {
|
||||||
string status = 6;
|
string status = 6;
|
||||||
string bio = 7;
|
string bio = 7;
|
||||||
string avatar_thumbnail_url = 8;
|
string avatar_thumbnail_url = 8;
|
||||||
|
string status_preference = 9;
|
||||||
}
|
}
|
||||||
|
|
||||||
message AvatarEntry {
|
message AvatarEntry {
|
||||||
|
|
|
||||||
|
|
@ -80,10 +80,19 @@ message ListDMChannelsResponse {
|
||||||
int32 total_unread = 3;
|
int32 total_unread = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message SendDMAttachment {
|
||||||
|
string filename = 1;
|
||||||
|
string content_type = 2;
|
||||||
|
bytes data = 3;
|
||||||
|
int32 width = 4;
|
||||||
|
int32 height = 5;
|
||||||
|
}
|
||||||
|
|
||||||
message SendDMRequest {
|
message SendDMRequest {
|
||||||
string channel_id = 1;
|
string channel_id = 1;
|
||||||
string content = 2;
|
string content = 2;
|
||||||
string reply_to_id = 3;
|
string reply_to_id = 3;
|
||||||
|
repeated SendDMAttachment attachments = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SendDMResponse {
|
message SendDMResponse {
|
||||||
|
|
|
||||||
28
api/proto/unfurl/v1/unfurl.proto
Normal file
28
api/proto/unfurl/v1/unfurl.proto
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package concord.unfurl.v1;
|
||||||
|
|
||||||
|
option go_package = "github.com/Alexander-D-Karpov/concord/api/gen/go/unfurl/v1;unfurlv1";
|
||||||
|
|
||||||
|
import "google/api/annotations.proto";
|
||||||
|
|
||||||
|
message UnfurlRequest {
|
||||||
|
string url = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message UnfurlResponse {
|
||||||
|
string url = 1;
|
||||||
|
string title = 2;
|
||||||
|
string description = 3;
|
||||||
|
string image = 4;
|
||||||
|
string site_name = 5;
|
||||||
|
string favicon = 6;
|
||||||
|
}
|
||||||
|
|
||||||
|
service UnfurlService {
|
||||||
|
rpc Unfurl(UnfurlRequest) returns (UnfurlResponse) {
|
||||||
|
option (google.api.http) = {
|
||||||
|
get: "/v1/unfurl"
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -11,10 +11,12 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
unfurlv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/unfurl/v1"
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/readtracking"
|
"github.com/Alexander-D-Karpov/concord/internal/readtracking"
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/security"
|
"github.com/Alexander-D-Karpov/concord/internal/security"
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/swagger"
|
"github.com/Alexander-D-Karpov/concord/internal/swagger"
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/typing"
|
"github.com/Alexander-D-Karpov/concord/internal/typing"
|
||||||
|
"github.com/Alexander-D-Karpov/concord/internal/unfurl"
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/version"
|
"github.com/Alexander-D-Karpov/concord/internal/version"
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
@ -115,10 +117,6 @@ func run() error {
|
||||||
}
|
}
|
||||||
logger.Info("migrations applied successfully")
|
logger.Info("migrations applied successfully")
|
||||||
|
|
||||||
if _, err := database.Pool.Exec(ctx, `UPDATE users SET status = 'offline'`); err != nil {
|
|
||||||
logger.Warn("failed to reset user statuses", zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
var cacheClient *cache.Cache
|
var cacheClient *cache.Cache
|
||||||
if cfg.Redis.Enabled {
|
if cfg.Redis.Enabled {
|
||||||
cacheClient, err = cache.New(
|
cacheClient, err = cache.New(
|
||||||
|
|
@ -173,10 +171,11 @@ func run() error {
|
||||||
cfg.RateLimit.RequestsPerMinute,
|
cfg.RateLimit.RequestsPerMinute,
|
||||||
cfg.RateLimit.Burst,
|
cfg.RateLimit.Burst,
|
||||||
true,
|
true,
|
||||||
|
cfg.RateLimit.BypassToken,
|
||||||
)
|
)
|
||||||
logger.Info("rate limiting enabled")
|
logger.Info("rate limiting enabled")
|
||||||
} else {
|
} else {
|
||||||
rateLimiter = ratelimit.NewLimiter(nil, 500, 100, false)
|
rateLimiter = ratelimit.NewLimiter(nil, 500, 100, false, cfg.RateLimit.BypassToken)
|
||||||
}
|
}
|
||||||
rateLimitInterceptor := ratelimit.NewInterceptor(rateLimiter)
|
rateLimitInterceptor := ratelimit.NewInterceptor(rateLimiter)
|
||||||
|
|
||||||
|
|
@ -192,9 +191,13 @@ func run() error {
|
||||||
if cacheClient != nil {
|
if cacheClient != nil {
|
||||||
usersRepo = users.NewRepositoryWithCache(database.Pool, cacheClient)
|
usersRepo = users.NewRepositoryWithCache(database.Pool, cacheClient)
|
||||||
}
|
}
|
||||||
|
|
||||||
eventsHub := events.NewHub(logger, database.Pool, aside)
|
eventsHub := events.NewHub(logger, database.Pool, aside)
|
||||||
|
|
||||||
usersService := users.NewService(usersRepo, eventsHub, cfg.Storage.Path, cfg.Storage.URL)
|
presenceManager := users.NewPresenceManager(usersRepo, eventsHub)
|
||||||
|
defer presenceManager.Stop()
|
||||||
|
|
||||||
|
usersService := users.NewService(usersRepo, eventsHub, presenceManager, cfg.Storage.Path, cfg.Storage.URL)
|
||||||
usersHandler := users.NewHandler(usersService)
|
usersHandler := users.NewHandler(usersService)
|
||||||
|
|
||||||
roomsRepo := rooms.NewRepository(database.Pool)
|
roomsRepo := rooms.NewRepository(database.Pool)
|
||||||
|
|
@ -217,9 +220,6 @@ func run() error {
|
||||||
membershipService := membership.NewService(roomsRepo, eventsHub, aside)
|
membershipService := membership.NewService(roomsRepo, eventsHub, aside)
|
||||||
membershipHandler := membership.NewHandler(membershipService)
|
membershipHandler := membership.NewHandler(membershipService)
|
||||||
|
|
||||||
presenceManager := users.NewPresenceManager(usersRepo, eventsHub)
|
|
||||||
defer presenceManager.Stop()
|
|
||||||
|
|
||||||
streamHandler := stream.NewHandler(eventsHub, presenceManager)
|
streamHandler := stream.NewHandler(eventsHub, presenceManager)
|
||||||
|
|
||||||
voiceAssignService := voiceassign.NewService(database.Pool, jwtManager, cacheClient)
|
voiceAssignService := voiceassign.NewService(database.Pool, jwtManager, cacheClient)
|
||||||
|
|
@ -231,7 +231,7 @@ func run() error {
|
||||||
if cacheClient != nil {
|
if cacheClient != nil {
|
||||||
friendsRepo = friends.NewRepositoryWithCache(database.Pool, cacheClient)
|
friendsRepo = friends.NewRepositoryWithCache(database.Pool, cacheClient)
|
||||||
}
|
}
|
||||||
friendsService := friends.NewService(friendsRepo, eventsHub, usersRepo)
|
friendsService := friends.NewService(friendsRepo, eventsHub, usersRepo, presenceManager)
|
||||||
friendsHandler := friends.NewHandler(friendsService)
|
friendsHandler := friends.NewHandler(friendsService)
|
||||||
|
|
||||||
adminService := admin.NewService(database.Pool, roomsRepo, eventsHub, logger)
|
adminService := admin.NewService(database.Pool, roomsRepo, eventsHub, logger)
|
||||||
|
|
@ -239,11 +239,14 @@ func run() error {
|
||||||
|
|
||||||
dmRepo := dm.NewRepository(database.Pool)
|
dmRepo := dm.NewRepository(database.Pool)
|
||||||
dmMsgRepo := dm.NewMessageRepository(database.Pool, snowflakeGen)
|
dmMsgRepo := dm.NewMessageRepository(database.Pool, snowflakeGen)
|
||||||
dmService := dm.NewService(dmRepo, dmMsgRepo, usersRepo, eventsHub, voiceAssignService, logger)
|
dmService := dm.NewService(dmRepo, dmMsgRepo, usersRepo, eventsHub, voiceAssignService, presenceManager, logger)
|
||||||
dmHandler := dm.NewHandler(dmService, storageService)
|
dmHandler := dm.NewHandler(dmService, storageService)
|
||||||
dmHandler.SetReadTrackingService(readTrackingSvc)
|
dmHandler.SetReadTrackingService(readTrackingSvc)
|
||||||
dmHandler.SetTypingService(typingSvc)
|
dmHandler.SetTypingService(typingSvc)
|
||||||
|
|
||||||
|
unfurlService := unfurl.NewService(cacheClient)
|
||||||
|
unfurlHandler := unfurl.NewHandler(unfurlService)
|
||||||
|
|
||||||
var oauthManager *oauth.Manager
|
var oauthManager *oauth.Manager
|
||||||
if len(cfg.Auth.OAuth) > 0 {
|
if len(cfg.Auth.OAuth) > 0 {
|
||||||
oauthManager = oauth.NewManager(cfg.Auth)
|
oauthManager = oauth.NewManager(cfg.Auth)
|
||||||
|
|
@ -311,6 +314,7 @@ func run() error {
|
||||||
friendsv1.RegisterFriendsServiceServer(grpcServer, friendsHandler)
|
friendsv1.RegisterFriendsServiceServer(grpcServer, friendsHandler)
|
||||||
adminv1.RegisterAdminServiceServer(grpcServer, adminHandler)
|
adminv1.RegisterAdminServiceServer(grpcServer, adminHandler)
|
||||||
dmv1.RegisterDMServiceServer(grpcServer, dmHandler)
|
dmv1.RegisterDMServiceServer(grpcServer, dmHandler)
|
||||||
|
unfurlv1.RegisterUnfurlServiceServer(grpcServer, unfurlHandler)
|
||||||
reflection.Register(grpcServer)
|
reflection.Register(grpcServer)
|
||||||
|
|
||||||
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.Server.GRPCPort))
|
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.Server.GRPCPort))
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/version"
|
"github.com/Alexander-D-Karpov/concord/internal/version"
|
||||||
|
"github.com/Alexander-D-Karpov/concord/internal/voice/status"
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
|
@ -80,13 +81,12 @@ func run() error {
|
||||||
sessionManager := session.NewManager()
|
sessionManager := session.NewManager()
|
||||||
roomManager := room.NewManager()
|
roomManager := room.NewManager()
|
||||||
|
|
||||||
// Router needs access to both session and room state
|
|
||||||
voiceRouter := router.NewRouter(sessionManager, logger)
|
|
||||||
|
|
||||||
// Metrics and health
|
// Metrics and health
|
||||||
metrics := telemetry.NewMetrics(logger)
|
metrics := telemetry.NewMetrics(logger)
|
||||||
telemetryLogger := telemetry.NewLogger(logger)
|
telemetryLogger := telemetry.NewLogger(logger)
|
||||||
|
|
||||||
|
voiceRouter := router.NewRouter(sessionManager, logger, metrics)
|
||||||
|
|
||||||
healthServer := health.NewServer(logger)
|
healthServer := health.NewServer(logger)
|
||||||
healthServer.RegisterCheck("sessions", func(ctx context.Context) error {
|
healthServer.RegisterCheck("sessions", func(ctx context.Context) error {
|
||||||
sessions := sessionManager.GetAllSessions()
|
sessions := sessionManager.GetAllSessions()
|
||||||
|
|
@ -147,6 +147,14 @@ func run() error {
|
||||||
)
|
)
|
||||||
netinfo.PrintAccessBanner(advertised, "Concord Voice Server")
|
netinfo.PrintAccessBanner(advertised, "Concord Voice Server")
|
||||||
|
|
||||||
|
statusSrv := status.NewServer(sessionManager, jwtManager, metrics, logger)
|
||||||
|
go func() {
|
||||||
|
err := statusSrv.Start(ctx, cfg.Voice.StatusPort)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("status server error", zap.Error(err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
var registrar *discovery.Registrar
|
var registrar *discovery.Registrar
|
||||||
if cfg.Voice.RegistryURL != "" {
|
if cfg.Voice.RegistryURL != "" {
|
||||||
publicAddr := advertised.PublicHost
|
publicAddr := advertised.PublicHost
|
||||||
|
|
|
||||||
60
go.work.sum
Normal file
60
go.work.sum
Normal file
|
|
@ -0,0 +1,60 @@
|
||||||
|
cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
|
||||||
|
cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
|
||||||
|
cloud.google.com/go/compute/metadata v0.7.0 h1:PBWF+iiAerVNe8UCHxdOt6eHLVc3ydFeOCw78U8ytSU=
|
||||||
|
cloud.google.com/go/compute/metadata v0.7.0/go.mod h1:j5MvL9PprKL39t166CoB1uVHfQMs4tFQZZcKwksXUjo=
|
||||||
|
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 h1:UQUsRi8WTzhZntp5313l+CHIAT95ojUI2lpP/ExlZa4=
|
||||||
|
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0/go.mod h1:Cz6ft6Dkn3Et6l2v2a9/RpN7epQ1GtDlO6lj8bEcOvw=
|
||||||
|
github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjHpqDjYY=
|
||||||
|
github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE=
|
||||||
|
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
|
||||||
|
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
|
||||||
|
github.com/antihax/optional v1.0.0 h1:xK2lYat7ZLaVVcIuj82J8kIro4V6kDe0AUDFboUCwcg=
|
||||||
|
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||||
|
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
|
||||||
|
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
|
||||||
|
github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
|
||||||
|
github.com/envoyproxy/go-control-plane v0.13.4/go.mod h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA=
|
||||||
|
github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
|
||||||
|
github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
|
||||||
|
github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI=
|
||||||
|
github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4=
|
||||||
|
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
|
||||||
|
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
|
||||||
|
github.com/go-jose/go-jose/v4 v4.1.2 h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI=
|
||||||
|
github.com/go-jose/go-jose/v4 v4.1.2/go.mod h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo=
|
||||||
|
github.com/golang/glog v1.2.5 h1:DrW6hGnjIhtvhOIiAKT6Psh/Kd/ldepEa81DKeiRJ5I=
|
||||||
|
github.com/golang/glog v1.2.5/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
|
||||||
|
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
|
||||||
|
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
|
||||||
|
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||||
|
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||||
|
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
|
||||||
|
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
|
||||||
|
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||||
|
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||||
|
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||||
|
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||||
|
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
|
||||||
|
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||||
|
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
|
||||||
|
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
|
||||||
|
github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
|
||||||
|
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||||
|
github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
|
||||||
|
github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
|
||||||
|
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
||||||
|
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||||
|
github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
|
||||||
|
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
|
||||||
|
github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
|
||||||
|
github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
|
||||||
|
go.opentelemetry.io/contrib/detectors/gcp v1.36.0 h1:F7q2tNlCaHY9nMKHR6XH9/qkp8FktLnIcy6jJNyOCQw=
|
||||||
|
go.opentelemetry.io/contrib/detectors/gcp v1.36.0/go.mod h1:IbBN8uAIIx734PTonTPxAxnjc2pQTxWNkwfstZ+6H2k=
|
||||||
|
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||||
|
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||||
|
golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c=
|
||||||
|
golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU=
|
||||||
|
golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q=
|
||||||
|
golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss=
|
||||||
|
golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc=
|
||||||
|
golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg=
|
||||||
|
|
@ -94,6 +94,37 @@ func (h *Handler) SendMessage(ctx context.Context, req *chatv1.SendMessageReques
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handler) ListMessagesSince(ctx context.Context, req *chatv1.ListMessagesSinceRequest) (*chatv1.ListMessagesSinceResponse, error) {
|
||||||
|
if req.RoomId == "" || req.AfterMessageId == "" {
|
||||||
|
return nil, errors.ToGRPCError(errors.BadRequest("room_id and after_message_id are required"))
|
||||||
|
}
|
||||||
|
|
||||||
|
afterID, err := strconv.ParseInt(req.AfterMessageId, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.ToGRPCError(errors.BadRequest("invalid after_message_id"))
|
||||||
|
}
|
||||||
|
|
||||||
|
limit := int(req.Limit)
|
||||||
|
if limit <= 0 || limit > 200 {
|
||||||
|
limit = 100
|
||||||
|
}
|
||||||
|
|
||||||
|
messages, hasMore, err := h.service.ListMessages(ctx, req.RoomId, nil, &afterID, limit)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.ToGRPCError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
protoMessages := make([]*commonv1.Message, len(messages))
|
||||||
|
for i, msg := range messages {
|
||||||
|
protoMessages[i] = toProtoMessage(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &chatv1.ListMessagesSinceResponse{
|
||||||
|
Messages: protoMessages,
|
||||||
|
HasMore: hasMore,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Handler) EditMessage(ctx context.Context, req *chatv1.EditMessageRequest) (*chatv1.EditMessageResponse, error) {
|
func (h *Handler) EditMessage(ctx context.Context, req *chatv1.EditMessageRequest) (*chatv1.EditMessageResponse, error) {
|
||||||
if req.RoomId == "" || req.MessageId == "" || req.Content == "" {
|
if req.RoomId == "" || req.MessageId == "" || req.Content == "" {
|
||||||
return nil, errors.ToGRPCError(errors.BadRequest("room_id, message_id and content are required"))
|
return nil, errors.ToGRPCError(errors.BadRequest("room_id, message_id and content are required"))
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -70,6 +70,7 @@ type VoiceConfig struct {
|
||||||
Secret string
|
Secret string
|
||||||
RegistryURL string
|
RegistryURL string
|
||||||
PublicHost string
|
PublicHost string
|
||||||
|
StatusPort int
|
||||||
}
|
}
|
||||||
|
|
||||||
type LoggingConfig struct {
|
type LoggingConfig struct {
|
||||||
|
|
@ -97,6 +98,7 @@ type RateLimitConfig struct {
|
||||||
Enabled bool
|
Enabled bool
|
||||||
RequestsPerMinute int
|
RequestsPerMinute int
|
||||||
Burst int
|
Burst int
|
||||||
|
BypassToken string
|
||||||
}
|
}
|
||||||
|
|
||||||
type EmailConfig struct {
|
type EmailConfig struct {
|
||||||
|
|
@ -150,6 +152,7 @@ func Load() (*Config, error) {
|
||||||
Secret: getEnv("VOICE_SECRET", "change-me-voice-server-secret"),
|
Secret: getEnv("VOICE_SECRET", "change-me-voice-server-secret"),
|
||||||
RegistryURL: getEnv("REGISTRY_URL", "localhost:9090"),
|
RegistryURL: getEnv("REGISTRY_URL", "localhost:9090"),
|
||||||
PublicHost: getEnv("VOICE_PUBLIC_HOST", "localhost"),
|
PublicHost: getEnv("VOICE_PUBLIC_HOST", "localhost"),
|
||||||
|
StatusPort: getEnvInt("VOICE_STATUS_PORT", 9092),
|
||||||
},
|
},
|
||||||
Logging: LoggingConfig{
|
Logging: LoggingConfig{
|
||||||
Level: getEnv("LOG_LEVEL", "info"),
|
Level: getEnv("LOG_LEVEL", "info"),
|
||||||
|
|
@ -169,6 +172,7 @@ func Load() (*Config, error) {
|
||||||
Enabled: getEnvBool("RATE_LIMIT_ENABLED", true),
|
Enabled: getEnvBool("RATE_LIMIT_ENABLED", true),
|
||||||
RequestsPerMinute: getEnvInt("RATE_LIMIT_REQUESTS_PER_MINUTE", 60),
|
RequestsPerMinute: getEnvInt("RATE_LIMIT_REQUESTS_PER_MINUTE", 60),
|
||||||
Burst: getEnvInt("RATE_LIMIT_BURST", 10),
|
Burst: getEnvInt("RATE_LIMIT_BURST", 10),
|
||||||
|
BypassToken: getEnv("RATE_LIMIT_BYPASS_TOKEN", ""),
|
||||||
},
|
},
|
||||||
Storage: StorageConfig{
|
Storage: StorageConfig{
|
||||||
Path: getEnv("STORAGE_PATH", "./uploads"),
|
Path: getEnv("STORAGE_PATH", "./uploads"),
|
||||||
|
|
|
||||||
|
|
@ -103,11 +103,40 @@ func (h *Handler) SendDM(ctx context.Context, req *dmv1.SendDMRequest) (*dmv1.Se
|
||||||
if req.ChannelId == "" {
|
if req.ChannelId == "" {
|
||||||
return nil, errors.ToGRPCError(errors.BadRequest("channel_id is required"))
|
return nil, errors.ToGRPCError(errors.BadRequest("channel_id is required"))
|
||||||
}
|
}
|
||||||
if req.Content == "" {
|
if req.Content == "" && len(req.Attachments) == 0 {
|
||||||
return nil, errors.ToGRPCError(errors.BadRequest("content is required"))
|
return nil, errors.ToGRPCError(errors.BadRequest("content or attachments are required"))
|
||||||
}
|
}
|
||||||
|
|
||||||
msg, err := h.service.SendMessage(ctx, req.ChannelId, req.Content, req.ReplyToId, nil, nil)
|
var attachments []DMAttachment
|
||||||
|
for _, att := range req.Attachments {
|
||||||
|
if len(att.Data) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fileInfo, err := h.storage.Store(ctx, att.Data, att.Filename, att.ContentType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.ToGRPCError(errors.BadRequest("failed to store attachment: " + err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
width := int(att.Width)
|
||||||
|
height := int(att.Height)
|
||||||
|
if fileInfo.Width > 0 {
|
||||||
|
width = fileInfo.Width
|
||||||
|
height = fileInfo.Height
|
||||||
|
}
|
||||||
|
|
||||||
|
attachments = append(attachments, DMAttachment{
|
||||||
|
ID: uuid.MustParse(fileInfo.ID),
|
||||||
|
URL: fileInfo.URL,
|
||||||
|
Filename: att.Filename,
|
||||||
|
ContentType: fileInfo.ContentType,
|
||||||
|
Size: fileInfo.Size,
|
||||||
|
Width: width,
|
||||||
|
Height: height,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
msg, err := h.service.SendMessage(ctx, req.ChannelId, req.Content, req.ReplyToId, attachments, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.ToGRPCError(err)
|
return nil, errors.ToGRPCError(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,20 +22,44 @@ type Service struct {
|
||||||
usersRepo *users.Repository
|
usersRepo *users.Repository
|
||||||
hub *events.Hub
|
hub *events.Hub
|
||||||
voiceAssign *voiceassign.Service
|
voiceAssign *voiceassign.Service
|
||||||
|
presence *users.PresenceManager
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(repo *Repository, msgRepo *MessageRepository, usersRepo *users.Repository, hub *events.Hub, voiceAssign *voiceassign.Service, logger *zap.Logger) *Service {
|
func NewService(
|
||||||
|
repo *Repository,
|
||||||
|
msgRepo *MessageRepository,
|
||||||
|
usersRepo *users.Repository,
|
||||||
|
hub *events.Hub,
|
||||||
|
voiceAssign *voiceassign.Service,
|
||||||
|
presence *users.PresenceManager,
|
||||||
|
logger *zap.Logger,
|
||||||
|
) *Service {
|
||||||
return &Service{
|
return &Service{
|
||||||
repo: repo,
|
repo: repo,
|
||||||
msgRepo: msgRepo,
|
msgRepo: msgRepo,
|
||||||
usersRepo: usersRepo,
|
usersRepo: usersRepo,
|
||||||
hub: hub,
|
hub: hub,
|
||||||
voiceAssign: voiceAssign,
|
voiceAssign: voiceAssign,
|
||||||
|
presence: presence,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) decorateChannelStatuses(channels []*DMChannelWithUser) {
|
||||||
|
for _, ch := range channels {
|
||||||
|
presence := users.StatusOffline
|
||||||
|
if s.presence != nil {
|
||||||
|
presence = s.presence.GetStatus(ch.OtherUserID)
|
||||||
|
}
|
||||||
|
|
||||||
|
ch.OtherUserStatus = users.EffectiveStatus(
|
||||||
|
users.NormalizeStatusPreference(ch.OtherUserStatus),
|
||||||
|
presence,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) GetOrCreateDM(ctx context.Context, otherUserID string) (*DMChannel, error) {
|
func (s *Service) GetOrCreateDM(ctx context.Context, otherUserID string) (*DMChannel, error) {
|
||||||
userID := interceptor.GetUserID(ctx)
|
userID := interceptor.GetUserID(ctx)
|
||||||
if userID == "" {
|
if userID == "" {
|
||||||
|
|
@ -80,7 +104,13 @@ func (s *Service) ListDMs(ctx context.Context) ([]*DMChannelWithUser, error) {
|
||||||
return nil, errors.BadRequest("invalid user id")
|
return nil, errors.BadRequest("invalid user id")
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.repo.ListByUser(ctx, userUUID)
|
channels, err := s.repo.ListByUser(ctx, userUUID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.decorateChannelStatuses(channels)
|
||||||
|
return channels, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) CloseDM(ctx context.Context, channelID string) error {
|
func (s *Service) CloseDM(ctx context.Context, channelID string) error {
|
||||||
|
|
|
||||||
|
|
@ -99,8 +99,9 @@ func (r *Repository) CreateFriendRequest(ctx context.Context, fromUserID, toUser
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO friend_requests (id, from_user_id, to_user_id, status)
|
INSERT INTO friend_requests (id, from_user_id, to_user_id, status)
|
||||||
VALUES ($1, $2, $3, $4)
|
VALUES ($1, $2, $3, $4)
|
||||||
ON CONFLICT (from_user_id, to_user_id) DO UPDATE SET status = 'pending', updated_at = NOW()
|
ON CONFLICT (from_user_id, to_user_id)
|
||||||
RETURNING created_at, updated_at
|
DO UPDATE SET status = 'pending', updated_at = NOW()
|
||||||
|
RETURNING id, created_at, updated_at
|
||||||
`
|
`
|
||||||
|
|
||||||
err := r.pool.QueryRow(ctx, query,
|
err := r.pool.QueryRow(ctx, query,
|
||||||
|
|
@ -108,7 +109,7 @@ func (r *Repository) CreateFriendRequest(ctx context.Context, fromUserID, toUser
|
||||||
req.FromUserID,
|
req.FromUserID,
|
||||||
req.ToUserID,
|
req.ToUserID,
|
||||||
req.Status,
|
req.Status,
|
||||||
).Scan(&req.CreatedAt, &req.UpdatedAt)
|
).Scan(&req.ID, &req.CreatedAt, &req.UpdatedAt)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -13,6 +13,7 @@ import (
|
||||||
friendsv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/friends/v1"
|
friendsv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/friends/v1"
|
||||||
membershipv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/membership/v1"
|
membershipv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/membership/v1"
|
||||||
roomsv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/rooms/v1"
|
roomsv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/rooms/v1"
|
||||||
|
unfurlv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/unfurl/v1"
|
||||||
usersv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/users/v1"
|
usersv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/users/v1"
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/middleware"
|
"github.com/Alexander-D-Karpov/concord/internal/middleware"
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/version"
|
"github.com/Alexander-D-Karpov/concord/internal/version"
|
||||||
|
|
@ -62,6 +63,7 @@ func (g *Gateway) Init(ctx context.Context) error {
|
||||||
callv1.RegisterCallServiceHandlerFromEndpoint,
|
callv1.RegisterCallServiceHandlerFromEndpoint,
|
||||||
friendsv1.RegisterFriendsServiceHandlerFromEndpoint,
|
friendsv1.RegisterFriendsServiceHandlerFromEndpoint,
|
||||||
dmv1.RegisterDMServiceHandlerFromEndpoint,
|
dmv1.RegisterDMServiceHandlerFromEndpoint,
|
||||||
|
unfurlv1.RegisterUnfurlServiceHandlerFromEndpoint,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, register := range handlers {
|
for _, register := range handlers {
|
||||||
|
|
@ -122,7 +124,7 @@ func (g *Gateway) Start(ctx context.Context, port int) error {
|
||||||
|
|
||||||
func customMatcher(key string) (string, bool) {
|
func customMatcher(key string) (string, bool) {
|
||||||
switch key {
|
switch key {
|
||||||
case "authorization", "x-request-id", "x-correlation-id", "grpc-timeout":
|
case "authorization", "x-request-id", "x-correlation-id", "grpc-timeout", "x-concord-ratelimit-bypass":
|
||||||
return key, true
|
return key, true
|
||||||
default:
|
default:
|
||||||
return runtime.DefaultHeaderMatcher(key)
|
return runtime.DefaultHeaderMatcher(key)
|
||||||
|
|
|
||||||
13
internal/infra/migrations/012_fts_improvements.sql
Normal file
13
internal/infra/migrations/012_fts_improvements.sql
Normal file
|
|
@ -0,0 +1,13 @@
|
||||||
|
DROP INDEX IF EXISTS idx_messages_content_search;
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_messages_fts
|
||||||
|
ON messages USING gin(to_tsvector('simple', content))
|
||||||
|
WHERE deleted_at IS NULL;
|
||||||
|
|
||||||
|
DROP INDEX IF EXISTS idx_dm_messages_content_search;
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_dm_messages_fts
|
||||||
|
ON dm_messages USING gin(to_tsvector('simple', content))
|
||||||
|
WHERE deleted_at IS NULL;
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_users_handle_lower ON users(lower(handle));
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -11,6 +11,8 @@ import (
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const BypassMetadataKey = "x-concord-ratelimit-bypass"
|
||||||
|
|
||||||
type Interceptor struct {
|
type Interceptor struct {
|
||||||
limiter *Limiter
|
limiter *Limiter
|
||||||
}
|
}
|
||||||
|
|
@ -26,6 +28,10 @@ func (i *Interceptor) Unary() grpc.UnaryServerInterceptor {
|
||||||
info *grpc.UnaryServerInfo,
|
info *grpc.UnaryServerInfo,
|
||||||
handler grpc.UnaryHandler,
|
handler grpc.UnaryHandler,
|
||||||
) (interface{}, error) {
|
) (interface{}, error) {
|
||||||
|
if i.limiter.ShouldBypass(ctx) {
|
||||||
|
return handler(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
key := i.getKey(ctx, info.FullMethod)
|
key := i.getKey(ctx, info.FullMethod)
|
||||||
|
|
||||||
allowed, err := i.limiter.Allow(ctx, key)
|
allowed, err := i.limiter.Allow(ctx, key)
|
||||||
|
|
@ -49,6 +55,11 @@ func (i *Interceptor) Stream() grpc.StreamServerInterceptor {
|
||||||
handler grpc.StreamHandler,
|
handler grpc.StreamHandler,
|
||||||
) error {
|
) error {
|
||||||
ctx := ss.Context()
|
ctx := ss.Context()
|
||||||
|
|
||||||
|
if i.limiter.ShouldBypass(ctx) {
|
||||||
|
return handler(srv, ss)
|
||||||
|
}
|
||||||
|
|
||||||
key := i.getKey(ctx, info.FullMethod)
|
key := i.getKey(ctx, info.FullMethod)
|
||||||
|
|
||||||
allowed, err := i.limiter.Allow(ctx, key)
|
allowed, err := i.limiter.Allow(ctx, key)
|
||||||
|
|
|
||||||
|
|
@ -2,12 +2,15 @@ package ratelimit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/subtle"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/infra/cache"
|
"github.com/Alexander-D-Karpov/concord/internal/infra/cache"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Limiter struct {
|
type Limiter struct {
|
||||||
|
|
@ -17,6 +20,8 @@ type Limiter struct {
|
||||||
localCache map[string]*rate.Limiter
|
localCache map[string]*rate.Limiter
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
cleanupDone chan struct{}
|
cleanupDone chan struct{}
|
||||||
|
|
||||||
|
bypassToken string
|
||||||
}
|
}
|
||||||
|
|
||||||
type LimitConfig struct {
|
type LimitConfig struct {
|
||||||
|
|
@ -24,10 +29,11 @@ type LimitConfig struct {
|
||||||
Burst int
|
Burst int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLimiter(cache *cache.Cache, requestsPerMinute, burst int, enabled bool) *Limiter {
|
func NewLimiter(cache *cache.Cache, requestsPerMinute, burst int, enabled bool, bypassToken string) *Limiter {
|
||||||
l := &Limiter{
|
l := &Limiter{
|
||||||
cache: cache,
|
cache: cache,
|
||||||
enabled: enabled,
|
enabled: enabled,
|
||||||
|
bypassToken: strings.TrimSpace(bypassToken),
|
||||||
limits: map[string]LimitConfig{
|
limits: map[string]LimitConfig{
|
||||||
"default": {
|
"default": {
|
||||||
RequestsPerMinute: requestsPerMinute,
|
RequestsPerMinute: requestsPerMinute,
|
||||||
|
|
@ -57,6 +63,30 @@ func NewLimiter(cache *cache.Cache, requestsPerMinute, burst int, enabled bool)
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Limiter) ShouldBypass(ctx context.Context) bool {
|
||||||
|
if l.bypassToken == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
md, ok := metadata.FromIncomingContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
values := md.Get(BypassMetadataKey)
|
||||||
|
if len(values) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, candidate := range values {
|
||||||
|
if subtle.ConstantTimeCompare([]byte(strings.TrimSpace(candidate)), []byte(l.bypassToken)) == 1 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (l *Limiter) Allow(ctx context.Context, key string) (bool, error) {
|
func (l *Limiter) Allow(ctx context.Context, key string) (bool, error) {
|
||||||
if !l.enabled {
|
if !l.enabled {
|
||||||
return true, nil
|
return true, nil
|
||||||
|
|
|
||||||
|
|
@ -451,8 +451,13 @@ func (r *Repository) AssignVoiceServer(ctx context.Context, roomID, serverID uui
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Repository) CreateRoomInvite(ctx context.Context, roomID, invitedUserID, invitedBy uuid.UUID) (*RoomInvite, error) {
|
func (r *Repository) CreateRoomInvite(
|
||||||
invite := &RoomInvite{
|
ctx context.Context,
|
||||||
|
roomID uuid.UUID,
|
||||||
|
invitedUserID uuid.UUID,
|
||||||
|
invitedBy uuid.UUID,
|
||||||
|
) (*RoomInvite, error) {
|
||||||
|
inv := &RoomInvite{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
RoomID: roomID,
|
RoomID: roomID,
|
||||||
InvitedUserID: invitedUserID,
|
InvitedUserID: invitedUserID,
|
||||||
|
|
@ -463,22 +468,28 @@ func (r *Repository) CreateRoomInvite(ctx context.Context, roomID, invitedUserID
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO room_invites (id, room_id, invited_user_id, invited_by, status)
|
INSERT INTO room_invites (id, room_id, invited_user_id, invited_by, status)
|
||||||
VALUES ($1, $2, $3, $4, $5)
|
VALUES ($1, $2, $3, $4, $5)
|
||||||
ON CONFLICT (room_id, invited_user_id) DO UPDATE SET
|
ON CONFLICT (room_id, invited_user_id)
|
||||||
invited_by = $4,
|
DO UPDATE SET
|
||||||
status = 'pending',
|
status = 'pending',
|
||||||
|
invited_by = EXCLUDED.invited_by,
|
||||||
updated_at = NOW()
|
updated_at = NOW()
|
||||||
RETURNING created_at, updated_at
|
RETURNING id, created_at, updated_at
|
||||||
`
|
`
|
||||||
|
|
||||||
err := r.pool.QueryRow(ctx, query,
|
err := r.pool.QueryRow(
|
||||||
invite.ID,
|
ctx,
|
||||||
invite.RoomID,
|
query,
|
||||||
invite.InvitedUserID,
|
inv.ID,
|
||||||
invite.InvitedBy,
|
inv.RoomID,
|
||||||
invite.Status,
|
inv.InvitedUserID,
|
||||||
).Scan(&invite.CreatedAt, &invite.UpdatedAt)
|
inv.InvitedBy,
|
||||||
|
inv.Status,
|
||||||
|
).Scan(&inv.ID, &inv.CreatedAt, &inv.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return invite, err
|
return inv, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Repository) GetRoomInvite(ctx context.Context, id uuid.UUID) (*RoomInvite, error) {
|
func (r *Repository) GetRoomInvite(ctx context.Context, id uuid.UUID) (*RoomInvite, error) {
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package typing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
streamv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/stream/v1"
|
streamv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/stream/v1"
|
||||||
|
|
@ -15,6 +16,8 @@ type Service struct {
|
||||||
repo *Repository
|
repo *Repository
|
||||||
hub *events.Hub
|
hub *events.Hub
|
||||||
usersRepo *users.Repository
|
usersRepo *users.Repository
|
||||||
|
rateMu sync.Mutex
|
||||||
|
lastTyped map[string]time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(repo *Repository, hub *events.Hub, usersRepo *users.Repository) *Service {
|
func NewService(repo *Repository, hub *events.Hub, usersRepo *users.Repository) *Service {
|
||||||
|
|
@ -22,10 +25,40 @@ func NewService(repo *Repository, hub *events.Hub, usersRepo *users.Repository)
|
||||||
repo: repo,
|
repo: repo,
|
||||||
hub: hub,
|
hub: hub,
|
||||||
usersRepo: usersRepo,
|
usersRepo: usersRepo,
|
||||||
|
lastTyped: make(map[string]time.Time),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const typingRateLimit = 2 * time.Second
|
||||||
|
|
||||||
|
func (s *Service) checkTypingRate(userID uuid.UUID, targetID uuid.UUID) bool {
|
||||||
|
key := userID.String() + ":" + targetID.String()
|
||||||
|
s.rateMu.Lock()
|
||||||
|
defer s.rateMu.Unlock()
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
if last, ok := s.lastTyped[key]; ok && now.Sub(last) < typingRateLimit {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
s.lastTyped[key] = now
|
||||||
|
|
||||||
|
if len(s.lastTyped) > 10000 {
|
||||||
|
cutoff := now.Add(-typingRateLimit * 2)
|
||||||
|
for k, t := range s.lastTyped {
|
||||||
|
if t.Before(cutoff) {
|
||||||
|
delete(s.lastTyped, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) StartTypingInRoom(ctx context.Context, userID, roomID uuid.UUID) error {
|
func (s *Service) StartTypingInRoom(ctx context.Context, userID, roomID uuid.UUID) error {
|
||||||
|
if !s.checkTypingRate(userID, roomID) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.repo.SetTypingInRoom(ctx, userID, roomID); err != nil {
|
if err := s.repo.SetTypingInRoom(ctx, userID, roomID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -44,6 +77,10 @@ func (s *Service) StopTypingInRoom(ctx context.Context, userID, roomID uuid.UUID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) StartTypingInDM(ctx context.Context, userID, channelID uuid.UUID, otherUserID uuid.UUID) error {
|
func (s *Service) StartTypingInDM(ctx context.Context, userID, channelID uuid.UUID, otherUserID uuid.UUID) error {
|
||||||
|
if !s.checkTypingRate(userID, channelID) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.repo.SetTypingInDM(ctx, userID, channelID); err != nil {
|
if err := s.repo.SetTypingInDM(ctx, userID, channelID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
37
internal/unfurl/handler.go
Normal file
37
internal/unfurl/handler.go
Normal file
|
|
@ -0,0 +1,37 @@
|
||||||
|
package unfurl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
unfurlv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/unfurl/v1"
|
||||||
|
"github.com/Alexander-D-Karpov/concord/internal/common/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Handler struct {
|
||||||
|
unfurlv1.UnimplementedUnfurlServiceServer
|
||||||
|
service *Service
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHandler(service *Service) *Handler {
|
||||||
|
return &Handler{service: service}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) Unfurl(ctx context.Context, req *unfurlv1.UnfurlRequest) (*unfurlv1.UnfurlResponse, error) {
|
||||||
|
if req.Url == "" {
|
||||||
|
return nil, errors.ToGRPCError(errors.BadRequest("url is required"))
|
||||||
|
}
|
||||||
|
|
||||||
|
preview, err := h.service.Unfurl(ctx, req.Url)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.ToGRPCError(errors.BadRequest("failed to unfurl: " + err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &unfurlv1.UnfurlResponse{
|
||||||
|
Url: preview.URL,
|
||||||
|
Title: preview.Title,
|
||||||
|
Description: preview.Description,
|
||||||
|
Image: preview.Image,
|
||||||
|
SiteName: preview.SiteName,
|
||||||
|
Favicon: preview.Favicon,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
193
internal/unfurl/service.go
Normal file
193
internal/unfurl/service.go
Normal file
File diff suppressed because it is too large
Load Diff
|
|
@ -31,16 +31,46 @@ type ProcessedAvatar struct {
|
||||||
Height int
|
Height int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var magicBytes = map[string][]byte{
|
||||||
|
"image/jpeg": {0xFF, 0xD8, 0xFF},
|
||||||
|
"image/png": {0x89, 0x50, 0x4E, 0x47},
|
||||||
|
"image/gif": {0x47, 0x49, 0x46},
|
||||||
|
"image/webp": {0x52, 0x49, 0x46, 0x46},
|
||||||
|
}
|
||||||
|
|
||||||
|
func ValidateImageMagic(data []byte) (string, error) {
|
||||||
|
for mime, magic := range magicBytes {
|
||||||
|
if len(data) >= len(magic) {
|
||||||
|
match := true
|
||||||
|
for i, b := range magic {
|
||||||
|
if data[i] != b {
|
||||||
|
match = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if match {
|
||||||
|
return mime, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", fmt.Errorf("unrecognized image format")
|
||||||
|
}
|
||||||
|
|
||||||
func ProcessAvatarImage(data []byte) (*ProcessedAvatar, error) {
|
func ProcessAvatarImage(data []byte) (*ProcessedAvatar, error) {
|
||||||
if len(data) > MaxAvatarBytes {
|
if len(data) > MaxAvatarBytes {
|
||||||
return nil, fmt.Errorf("image too large: %d bytes (max %d)", len(data), MaxAvatarBytes)
|
return nil, fmt.Errorf("image too large: %d bytes (max %d)", len(data), MaxAvatarBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, err := ValidateImageMagic(data); err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid image: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
src, _, err := image.Decode(bytes.NewReader(data))
|
src, _, err := image.Decode(bytes.NewReader(data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("decode image: %w", err)
|
return nil, fmt.Errorf("decode image: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Re-encoding to JPEG strips all EXIF/metadata automatically
|
||||||
bounds := src.Bounds()
|
bounds := src.Bounds()
|
||||||
w, h := bounds.Dx(), bounds.Dy()
|
w, h := bounds.Dx(), bounds.Dy()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -160,6 +160,7 @@ func toProtoUser(user *User) *commonv1.User {
|
||||||
AvatarThumbnailUrl: user.AvatarThumbnailURL,
|
AvatarThumbnailUrl: user.AvatarThumbnailURL,
|
||||||
CreatedAt: timestamppb.New(user.CreatedAt),
|
CreatedAt: timestamppb.New(user.CreatedAt),
|
||||||
Status: user.Status,
|
Status: user.Status,
|
||||||
|
StatusPreference: user.StatusPreference,
|
||||||
}
|
}
|
||||||
if user.Bio != "" {
|
if user.Bio != "" {
|
||||||
proto.Bio = user.Bio
|
proto.Bio = user.Bio
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -25,6 +25,7 @@ type User struct {
|
||||||
OAuthSubject *string
|
OAuthSubject *string
|
||||||
CreatedAt time.Time
|
CreatedAt time.Time
|
||||||
DeletedAt *time.Time
|
DeletedAt *time.Time
|
||||||
|
StatusPreference string
|
||||||
}
|
}
|
||||||
|
|
||||||
type UserAvatar struct {
|
type UserAvatar struct {
|
||||||
|
|
@ -467,3 +468,24 @@ func (r *Repository) GetLatestUserAvatar(ctx context.Context, userID uuid.UUID)
|
||||||
}
|
}
|
||||||
return av, err
|
return av, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Repository) GetStatusPreference(ctx context.Context, userID uuid.UUID) (string, error) {
|
||||||
|
var status string
|
||||||
|
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT COALESCE(status, 'online') FROM users WHERE id = $1`,
|
||||||
|
userID,
|
||||||
|
).Scan(&status)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch status {
|
||||||
|
case StatusDND:
|
||||||
|
return StatusDND, nil
|
||||||
|
case StatusOffline:
|
||||||
|
return StatusOffline, nil
|
||||||
|
default:
|
||||||
|
return StatusOnline, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -27,11 +27,11 @@ var concordeFleet = []string{
|
||||||
|
|
||||||
const (
|
const (
|
||||||
APIMajor = 0
|
APIMajor = 0
|
||||||
APIMinor = 1
|
APIMinor = 3
|
||||||
APIPatch = 0
|
APIPatch = 0
|
||||||
|
|
||||||
VoiceMajor = 0
|
VoiceMajor = 0
|
||||||
VoiceMinor = 1
|
VoiceMinor = 2
|
||||||
VoicePatch = 0
|
VoicePatch = 0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -283,6 +283,7 @@ type ParticipantLeftPayload struct {
|
||||||
RoomID string `json:"room_id"`
|
RoomID string `json:"room_id"`
|
||||||
SSRC uint32 `json:"ssrc"`
|
SSRC uint32 `json:"ssrc"`
|
||||||
VideoSSRC uint32 `json:"video_ssrc,omitempty"`
|
VideoSSRC uint32 `json:"video_ssrc,omitempty"`
|
||||||
|
ScreenSSRC uint32 `json:"screen_ssrc,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SubscribePayload struct {
|
type SubscribePayload struct {
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
190
internal/voice/status/server.go
Normal file
190
internal/voice/status/server.go
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
49
internal/voice/udp/packet_buffer.go
Normal file
49
internal/voice/udp/packet_buffer.go
Normal file
|
|
@ -0,0 +1,49 @@
|
||||||
|
package udp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
type packetBuffer struct {
|
||||||
|
buf []byte
|
||||||
|
n int
|
||||||
|
refs atomic.Int32
|
||||||
|
pool *sync.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPacketPool() sync.Pool {
|
||||||
|
p := sync.Pool{}
|
||||||
|
p.New = func() interface{} {
|
||||||
|
return &packetBuffer{
|
||||||
|
buf: make([]byte, maxPacketLen),
|
||||||
|
pool: &p,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *packetBuffer) PrepareForRead() []byte {
|
||||||
|
p.n = 0
|
||||||
|
p.refs.Store(1)
|
||||||
|
return p.buf[:cap(p.buf)]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *packetBuffer) SetLen(n int) {
|
||||||
|
p.n = n
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *packetBuffer) Bytes() []byte {
|
||||||
|
return p.buf[:p.n]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *packetBuffer) Retain() {
|
||||||
|
p.refs.Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *packetBuffer) Release() {
|
||||||
|
if p.refs.Add(-1) == 0 {
|
||||||
|
p.n = 0
|
||||||
|
p.pool.Put(p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/auth/jwt"
|
"github.com/Alexander-D-Karpov/concord/internal/auth/jwt"
|
||||||
|
|
@ -30,7 +31,7 @@ type ServerPool struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type poolJob struct {
|
type poolJob struct {
|
||||||
data []byte
|
pkt *packetBuffer
|
||||||
addr *net.UDPAddr
|
addr *net.UDPAddr
|
||||||
conn *net.UDPConn
|
conn *net.UDPConn
|
||||||
}
|
}
|
||||||
|
|
@ -61,12 +62,7 @@ func NewServerPool(
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
workChan: make(chan *poolJob, workChanSize),
|
workChan: make(chan *poolJob, workChanSize),
|
||||||
stopChan: make(chan struct{}),
|
stopChan: make(chan struct{}),
|
||||||
packetPool: sync.Pool{
|
packetPool: newPacketPool(),
|
||||||
New: func() interface{} {
|
|
||||||
b := make([]byte, maxPacketLen)
|
|
||||||
return &b
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
|
|
@ -108,6 +104,11 @@ func (p *ServerPool) ConnForPort(port int) *net.UDPConn {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ServerPool) Start(ctx context.Context) error {
|
func (p *ServerPool) Start(ctx context.Context) error {
|
||||||
|
numWorkers := runtime.NumCPU() * 2
|
||||||
|
if numWorkers < 4 {
|
||||||
|
numWorkers = 4
|
||||||
|
}
|
||||||
|
|
||||||
for i := 0; i < numWorkers; i++ {
|
for i := 0; i < numWorkers; i++ {
|
||||||
p.wg.Add(1)
|
p.wg.Add(1)
|
||||||
go p.worker()
|
go p.worker()
|
||||||
|
|
@ -136,12 +137,12 @@ func (p *ServerPool) readLoop(conn *net.UDPConn) {
|
||||||
defer p.wg.Done()
|
defer p.wg.Done()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
bufPtr := p.packetPool.Get().(*[]byte)
|
pkt := p.packetPool.Get().(*packetBuffer)
|
||||||
buf := *bufPtr
|
buf := pkt.PrepareForRead()
|
||||||
|
|
||||||
n, addr, err := conn.ReadFromUDP(buf)
|
n, addr, err := conn.ReadFromUDP(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.packetPool.Put(bufPtr)
|
pkt.Release()
|
||||||
select {
|
select {
|
||||||
case <-p.stopChan:
|
case <-p.stopChan:
|
||||||
return
|
return
|
||||||
|
|
@ -151,20 +152,19 @@ func (p *ServerPool) readLoop(conn *net.UDPConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if n > maxPacketLen {
|
if n > maxPacketLen {
|
||||||
p.packetPool.Put(bufPtr)
|
pkt.Release()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
data := make([]byte, n)
|
pkt.SetLen(n)
|
||||||
copy(data, buf[:n])
|
|
||||||
p.packetPool.Put(bufPtr)
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case p.workChan <- &poolJob{data: data, addr: addr, conn: conn}:
|
case p.workChan <- &poolJob{pkt: pkt, addr: addr, conn: conn}:
|
||||||
default:
|
default:
|
||||||
if p.metrics != nil {
|
if p.metrics != nil {
|
||||||
p.metrics.RecordPacketDropped()
|
p.metrics.RecordPacketDropped()
|
||||||
}
|
}
|
||||||
|
pkt.Release()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -176,7 +176,8 @@ func (p *ServerPool) worker() {
|
||||||
select {
|
select {
|
||||||
case job := <-p.workChan:
|
case job := <-p.workChan:
|
||||||
if job != nil {
|
if job != nil {
|
||||||
p.handler.HandlePacket(job.data, job.addr, job.conn)
|
p.handler.HandlePacketOwned(job.pkt.Bytes(), job.pkt, job.addr, job.conn)
|
||||||
|
job.pkt.Release()
|
||||||
}
|
}
|
||||||
case <-p.stopChan:
|
case <-p.stopChan:
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/Alexander-D-Karpov/concord/internal/auth/jwt"
|
"github.com/Alexander-D-Karpov/concord/internal/auth/jwt"
|
||||||
|
|
@ -27,12 +28,11 @@ type Server struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type packetJob struct {
|
type packetJob struct {
|
||||||
data []byte
|
pkt *packetBuffer
|
||||||
addr *net.UDPAddr
|
addr *net.UDPAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
numWorkers = 4
|
|
||||||
workChanSize = 10000
|
workChanSize = 10000
|
||||||
maxPacketLen = 1500
|
maxPacketLen = 1500
|
||||||
)
|
)
|
||||||
|
|
@ -78,18 +78,18 @@ func NewServer(
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
stopChan: make(chan struct{}),
|
stopChan: make(chan struct{}),
|
||||||
workChan: make(chan *packetJob, workChanSize),
|
workChan: make(chan *packetJob, workChanSize),
|
||||||
packetPool: sync.Pool{
|
packetPool: newPacketPool(),
|
||||||
New: func() interface{} {
|
|
||||||
b := make([]byte, maxPacketLen)
|
|
||||||
return &b
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) Start(ctx context.Context) error {
|
func (s *Server) Start(ctx context.Context) error {
|
||||||
s.logger.Info("UDP server starting", zap.String("address", s.conn.LocalAddr().String()))
|
s.logger.Info("UDP server starting", zap.String("address", s.conn.LocalAddr().String()))
|
||||||
|
|
||||||
|
numWorkers := runtime.NumCPU() * 2
|
||||||
|
if numWorkers < 4 {
|
||||||
|
numWorkers = 4
|
||||||
|
}
|
||||||
|
|
||||||
for i := 0; i < numWorkers; i++ {
|
for i := 0; i < numWorkers; i++ {
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.worker()
|
go s.worker()
|
||||||
|
|
@ -110,12 +110,12 @@ func (s *Server) readLoop() {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
bufPtr := s.packetPool.Get().(*[]byte)
|
pkt := s.packetPool.Get().(*packetBuffer)
|
||||||
buf := *bufPtr
|
buf := pkt.PrepareForRead()
|
||||||
|
|
||||||
n, addr, err := s.conn.ReadFromUDP(buf)
|
n, addr, err := s.conn.ReadFromUDP(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.packetPool.Put(bufPtr)
|
pkt.Release()
|
||||||
select {
|
select {
|
||||||
case <-s.stopChan:
|
case <-s.stopChan:
|
||||||
return
|
return
|
||||||
|
|
@ -125,20 +125,19 @@ func (s *Server) readLoop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if n > maxPacketLen {
|
if n > maxPacketLen {
|
||||||
s.packetPool.Put(bufPtr)
|
pkt.Release()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
data := make([]byte, n)
|
pkt.SetLen(n)
|
||||||
copy(data, buf[:n])
|
|
||||||
s.packetPool.Put(bufPtr)
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case s.workChan <- &packetJob{data: data, addr: addr}:
|
case s.workChan <- &packetJob{pkt: pkt, addr: addr}:
|
||||||
default:
|
default:
|
||||||
if s.metrics != nil {
|
if s.metrics != nil {
|
||||||
s.metrics.RecordPacketDropped()
|
s.metrics.RecordPacketDropped()
|
||||||
}
|
}
|
||||||
|
pkt.Release()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -150,7 +149,8 @@ func (s *Server) worker() {
|
||||||
select {
|
select {
|
||||||
case job := <-s.workChan:
|
case job := <-s.workChan:
|
||||||
if job != nil {
|
if job != nil {
|
||||||
s.handler.HandlePacket(job.data, job.addr, s.conn)
|
s.handler.HandlePacketOwned(job.pkt.Bytes(), job.pkt, job.addr, s.conn)
|
||||||
|
job.pkt.Release()
|
||||||
}
|
}
|
||||||
case <-s.stopChan:
|
case <-s.stopChan:
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,7 @@ INCLUDES="-I. \
|
||||||
-I$PROTO_DEPS_DIR/grpc-gateway \
|
-I$PROTO_DEPS_DIR/grpc-gateway \
|
||||||
-I$PROTO_DEPS_DIR/protobuf/src"
|
-I$PROTO_DEPS_DIR/protobuf/src"
|
||||||
|
|
||||||
for dir in common/v1 auth/v1 users/v1 rooms/v1 membership/v1 chat/v1 call/v1 registry/v1 admin/v1 friends/v1 dm/v1; do
|
for dir in common/v1 auth/v1 users/v1 rooms/v1 membership/v1 chat/v1 call/v1 registry/v1 admin/v1 friends/v1 dm/v1 unfurl/v1; do
|
||||||
echo "Generating for $dir..."
|
echo "Generating for $dir..."
|
||||||
|
|
||||||
protoc $INCLUDES \
|
protoc $INCLUDES \
|
||||||
|
|
@ -67,6 +67,7 @@ protoc $INCLUDES \
|
||||||
call/v1/*.proto \
|
call/v1/*.proto \
|
||||||
friends/v1/*.proto \
|
friends/v1/*.proto \
|
||||||
dm/v1/*.proto \
|
dm/v1/*.proto \
|
||||||
admin/v1/*.proto
|
admin/v1/*.proto \
|
||||||
|
unfurl/v1/*.proto
|
||||||
|
|
||||||
echo "Protobuf generation complete!"
|
echo "Protobuf generation complete!"
|
||||||
21
tools/voicetest/go.mod
Normal file
21
tools/voicetest/go.mod
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
module github.com/Alexander-D-Karpov/concord/tools/voicetest
|
||||||
|
|
||||||
|
go 1.25.1
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/Alexander-D-Karpov/concord v0.0.0
|
||||||
|
golang.org/x/crypto v0.43.0
|
||||||
|
google.golang.org/grpc v1.76.0
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect
|
||||||
|
golang.org/x/net v0.45.0 // indirect
|
||||||
|
golang.org/x/sys v0.37.0 // indirect
|
||||||
|
golang.org/x/text v0.34.0 // indirect
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4 // indirect
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 // indirect
|
||||||
|
google.golang.org/protobuf v1.36.10 // indirect
|
||||||
|
)
|
||||||
|
|
||||||
|
replace github.com/Alexander-D-Karpov/concord => ../../
|
||||||
42
tools/voicetest/go.sum
Normal file
42
tools/voicetest/go.sum
Normal file
|
|
@ -0,0 +1,42 @@
|
||||||
|
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||||
|
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||||
|
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||||
|
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||||
|
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||||
|
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||||
|
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||||
|
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg=
|
||||||
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4=
|
||||||
|
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||||
|
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||||
|
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
|
||||||
|
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
|
||||||
|
go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE=
|
||||||
|
go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E=
|
||||||
|
go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI=
|
||||||
|
go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg=
|
||||||
|
go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc=
|
||||||
|
go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
|
||||||
|
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
|
||||||
|
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
|
||||||
|
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
|
||||||
|
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
|
||||||
|
golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM=
|
||||||
|
golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
|
||||||
|
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
|
||||||
|
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||||
|
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
|
||||||
|
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
|
||||||
|
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||||
|
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4 h1:8XJ4pajGwOlasW+L13MnEGA8W4115jJySQtVfS2/IBU=
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4/go.mod h1:NnuHhy+bxcg30o7FnVAZbXsPHUDQ9qKWAQKCD7VxFtk=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 h1:i8QOKZfYg6AbGVZzUAY3LrNWCKF8O6zFisU9Wl9RER4=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ=
|
||||||
|
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
|
||||||
|
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
|
||||||
|
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
||||||
|
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||||
635
tools/voicetest/main.go
Normal file
635
tools/voicetest/main.go
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user