mirror of
https://github.com/Alexander-D-Karpov/concord.git
synced 2026-03-16 22:04:15 +03:00
636 lines
15 KiB
Go
636 lines
15 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/aes"
|
|
"crypto/cipher"
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
authv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/auth/v1"
|
|
callv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/call/v1"
|
|
membershipv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/membership/v1"
|
|
roomsv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/rooms/v1"
|
|
usersv1 "github.com/Alexander-D-Karpov/concord/api/gen/go/users/v1"
|
|
"golang.org/x/crypto/hkdf"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
const (
|
|
mediaHeaderSize = 24
|
|
nonceSize = 12
|
|
nonceBaseSize = 4
|
|
|
|
pktHello = 0x01
|
|
pktWelcome = 0x02
|
|
pktAudio = 0x03
|
|
pktVideo = 0x04
|
|
pktPing = 0x05
|
|
pktPong = 0x06
|
|
pktBye = 0x07
|
|
|
|
codecOpus = 1
|
|
codecH264 = 2
|
|
)
|
|
|
|
var (
|
|
grpcAddr = flag.String("grpc", envOr("GRPC_API_URL", "localhost:9090"), "gRPC API address")
|
|
useTLS = flag.Bool("tls", envOr("USE_TLS", "false") == "true", "TLS for gRPC")
|
|
numClients = flag.Int("clients", 3, "simulated clients")
|
|
testDur = flag.Duration("duration", 30*time.Second, "test duration")
|
|
sendVideo = flag.Bool("video", false, "send fake video packets")
|
|
roomName = flag.String("room", "stress-test-room", "room name")
|
|
baseHandle = flag.String("handle", "stressbot", "base handle")
|
|
pw = flag.String("password", "testtest123", "bot password")
|
|
audioRateMs = flag.Int("audio-rate", 20, "audio interval ms")
|
|
videoRateMs = flag.Int("video-rate", 33, "video interval ms")
|
|
rateLimitBypassToken = flag.String("rl-bypass-token", envOr("RATE_LIMIT_BYPASS_TOKEN", ""), "rate limit bypass token")
|
|
)
|
|
|
|
func envOr(key, def string) string {
|
|
if v := os.Getenv(key); v != "" {
|
|
return v
|
|
}
|
|
return def
|
|
}
|
|
|
|
type stats struct {
|
|
audioSent atomic.Uint64
|
|
videoSent atomic.Uint64
|
|
audioRecv atomic.Uint64
|
|
videoRecv atomic.Uint64
|
|
pongRecv atomic.Uint64
|
|
welcomeOK atomic.Uint64
|
|
errors atomic.Uint64
|
|
bytesOut atomic.Uint64
|
|
bytesIn atomic.Uint64
|
|
rttSamples []time.Duration
|
|
rttMu sync.Mutex
|
|
}
|
|
|
|
func (s *stats) addRTT(d time.Duration) {
|
|
s.rttMu.Lock()
|
|
s.rttSamples = append(s.rttSamples, d)
|
|
s.rttMu.Unlock()
|
|
}
|
|
|
|
func (s *stats) summary() string {
|
|
s.rttMu.Lock()
|
|
defer s.rttMu.Unlock()
|
|
var avg, mn, mx time.Duration
|
|
if n := len(s.rttSamples); n > 0 {
|
|
mn = s.rttSamples[0]
|
|
for _, r := range s.rttSamples {
|
|
avg += r
|
|
if r < mn {
|
|
mn = r
|
|
}
|
|
if r > mx {
|
|
mx = r
|
|
}
|
|
}
|
|
avg /= time.Duration(n)
|
|
}
|
|
return fmt.Sprintf(
|
|
"audio_tx=%d video_tx=%d audio_rx=%d video_rx=%d pongs=%d welcomes=%d errs=%d out=%dKB in=%dKB rtt(avg=%v min=%v max=%v n=%d)",
|
|
s.audioSent.Load(), s.videoSent.Load(),
|
|
s.audioRecv.Load(), s.videoRecv.Load(),
|
|
s.pongRecv.Load(), s.welcomeOK.Load(), s.errors.Load(),
|
|
s.bytesOut.Load()/1024, s.bytesIn.Load()/1024,
|
|
avg, mn, mx, len(s.rttSamples),
|
|
)
|
|
}
|
|
|
|
type bot struct {
|
|
idx int
|
|
handle string
|
|
userID string
|
|
token string
|
|
roomID string
|
|
voiceToken string
|
|
udpHost string
|
|
udpPort int
|
|
ssrc uint32
|
|
videoSSRC uint32
|
|
screenSSRC uint32
|
|
keyMaterial []byte
|
|
keyID byte
|
|
conn *net.UDPConn
|
|
st *stats
|
|
ready chan struct{}
|
|
}
|
|
|
|
func withRateLimitBypass(ctx context.Context) context.Context {
|
|
if *rateLimitBypassToken == "" {
|
|
return ctx
|
|
}
|
|
return metadata.AppendToOutgoingContext(ctx, "x-concord-ratelimit-bypass", *rateLimitBypassToken)
|
|
}
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
log.SetFlags(log.Ltime | log.Lmicroseconds)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
rpcBaseCtx := withRateLimitBypass(ctx)
|
|
defer cancel()
|
|
|
|
sig := make(chan os.Signal, 1)
|
|
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
|
go func() { <-sig; log.Println("shutting down..."); cancel() }()
|
|
|
|
var dialOpt grpc.DialOption
|
|
if *useTLS {
|
|
dialOpt = grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, ""))
|
|
} else {
|
|
dialOpt = grpc.WithTransportCredentials(insecure.NewCredentials())
|
|
}
|
|
|
|
conn, err := grpc.NewClient(*grpcAddr, dialOpt)
|
|
if err != nil {
|
|
log.Fatalf("grpc dial: %v", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
authC := authv1.NewAuthServiceClient(conn)
|
|
usersC := usersv1.NewUsersServiceClient(conn)
|
|
roomsC := roomsv1.NewRoomsServiceClient(conn)
|
|
callC := callv1.NewCallServiceClient(conn)
|
|
memberC := membershipv1.NewMembershipServiceClient(conn)
|
|
|
|
st := &stats{}
|
|
bots := make([]*bot, *numClients)
|
|
|
|
for i := 0; i < *numClients; i++ {
|
|
h := fmt.Sprintf("%s%d", *baseHandle, i)
|
|
log.Printf("[SETUP] auth %s", h)
|
|
tok, err := loginOrRegister(rpcBaseCtx, authC, h, *pw)
|
|
if err != nil {
|
|
log.Fatalf("auth %s: %v", h, err)
|
|
}
|
|
|
|
selfCtx := withAuth(rpcBaseCtx, tok)
|
|
self, err := usersC.GetSelf(selfCtx, &usersv1.GetSelfRequest{})
|
|
if err != nil {
|
|
log.Fatalf("getSelf %s: %v", h, err)
|
|
}
|
|
|
|
bots[i] = &bot{
|
|
idx: i,
|
|
handle: h,
|
|
userID: self.Id,
|
|
token: tok,
|
|
st: st,
|
|
ready: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
log.Printf("[SETUP] creating room %q", *roomName)
|
|
ownerCtx := withAuth(rpcBaseCtx, bots[0].token)
|
|
|
|
roomResp, err := roomsC.CreateRoom(ownerCtx, &roomsv1.CreateRoomRequest{
|
|
Name: *roomName,
|
|
Description: "voice stress test",
|
|
})
|
|
if err != nil {
|
|
log.Printf("[SETUP] create room failed (may exist): %v — listing", err)
|
|
lr, err2 := roomsC.ListRoomsForUser(ownerCtx, &roomsv1.ListRoomsForUserRequest{})
|
|
if err2 != nil {
|
|
log.Fatalf("list rooms: %v", err2)
|
|
}
|
|
found := false
|
|
for _, r := range lr.Rooms {
|
|
if r.Name == *roomName {
|
|
for _, b := range bots {
|
|
b.roomID = r.Id
|
|
}
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
log.Fatalf("room %q not found", *roomName)
|
|
}
|
|
} else {
|
|
for _, b := range bots {
|
|
b.roomID = roomResp.Id
|
|
}
|
|
}
|
|
log.Printf("[SETUP] room_id=%s", bots[0].roomID)
|
|
|
|
for i := 1; i < *numClients; i++ {
|
|
log.Printf("[SETUP] inviting bot %d (%s) to room", i, bots[i].userID)
|
|
_, err := memberC.Invite(ownerCtx, &membershipv1.InviteRequest{
|
|
RoomId: bots[0].roomID,
|
|
UserId: bots[i].userID,
|
|
})
|
|
if err != nil {
|
|
log.Printf("[SETUP] invite bot %d failed (may already be member): %v", i, err)
|
|
} else {
|
|
botCtx := withAuth(rpcBaseCtx, bots[i].token)
|
|
invites, err := memberC.ListRoomInvites(botCtx, &membershipv1.ListRoomInvitesRequest{})
|
|
if err == nil {
|
|
for _, inv := range invites.Incoming {
|
|
if inv.RoomId == bots[0].roomID {
|
|
_, _ = memberC.AcceptRoomInvite(botCtx, &membershipv1.AcceptRoomInviteRequest{InviteId: inv.Id})
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, b := range bots {
|
|
log.Printf("[SETUP] bot %s joining voice", b.handle)
|
|
bCtx := withAuth(rpcBaseCtx, b.token)
|
|
vr, err := callC.JoinVoice(bCtx, &callv1.JoinVoiceRequest{
|
|
RoomId: b.roomID,
|
|
AudioOnly: !*sendVideo,
|
|
})
|
|
if err != nil {
|
|
log.Fatalf("join voice %s: %v", b.handle, err)
|
|
}
|
|
b.voiceToken = vr.VoiceToken
|
|
b.udpHost = vr.Endpoint.Host
|
|
b.udpPort = int(vr.Endpoint.Port)
|
|
b.keyMaterial = vr.Crypto.KeyMaterial
|
|
if len(vr.Crypto.KeyId) > 0 {
|
|
b.keyID = vr.Crypto.KeyId[0]
|
|
}
|
|
log.Printf("[SETUP] bot %s: endpoint=%s:%d participants=%d", b.handle, b.udpHost, b.udpPort, len(vr.Participants))
|
|
}
|
|
|
|
log.Printf("[TEST] starting %d bots for %v", *numClients, *testDur)
|
|
|
|
testCtx, testCancel := context.WithTimeout(ctx, *testDur)
|
|
defer testCancel()
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
for _, b := range bots {
|
|
if err := b.dial(); err != nil {
|
|
log.Fatalf("dial %s: %v", b.handle, err)
|
|
}
|
|
if err := b.hello(); err != nil {
|
|
log.Fatalf("hello %s: %v", b.handle, err)
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(b *bot) { defer wg.Done(); b.recvLoop(testCtx) }(b)
|
|
|
|
wg.Add(1)
|
|
go func(b *bot) { defer wg.Done(); b.audioLoop(testCtx) }(b)
|
|
|
|
if *sendVideo {
|
|
wg.Add(1)
|
|
go func(b *bot) { defer wg.Done(); b.vidLoop(testCtx) }(b)
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(b *bot) { defer wg.Done(); b.pingLoop(testCtx) }(b)
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
tick := time.NewTicker(5 * time.Second)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-tick.C:
|
|
log.Printf("[STATS] %s", st.summary())
|
|
case <-testCtx.Done():
|
|
tick.Stop()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
|
|
for _, b := range bots {
|
|
bye := make([]byte, 5)
|
|
bye[0] = pktBye
|
|
binary.BigEndian.PutUint32(bye[1:], b.ssrc)
|
|
b.conn.Write(bye)
|
|
b.conn.Close()
|
|
}
|
|
|
|
for _, b := range bots {
|
|
bCtx := withAuth(rpcBaseCtx, b.token)
|
|
callC.LeaveVoice(bCtx, &callv1.LeaveVoiceRequest{RoomId: b.roomID})
|
|
}
|
|
|
|
log.Println("========== FINAL ==========")
|
|
log.Println(st.summary())
|
|
|
|
if st.errors.Load() > 0 {
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func loginOrRegister(ctx context.Context, c authv1.AuthServiceClient, h, p string) (string, error) {
|
|
r, err := c.LoginPassword(ctx, &authv1.LoginPasswordRequest{Handle: h, Password: p})
|
|
if err == nil {
|
|
return r.AccessToken, nil
|
|
}
|
|
r2, err2 := c.Register(ctx, &authv1.RegisterRequest{Handle: h, Password: p, DisplayName: "Bot " + h})
|
|
if err2 != nil {
|
|
return "", fmt.Errorf("login: %v; register: %v", err, err2)
|
|
}
|
|
return r2.AccessToken, nil
|
|
}
|
|
|
|
func withAuth(ctx context.Context, token string) context.Context {
|
|
return metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
|
|
}
|
|
|
|
func (b *bot) dial() error {
|
|
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", b.udpHost, b.udpPort))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c, err := net.DialUDP("udp", nil, addr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.SetReadBuffer(4 << 20)
|
|
c.SetWriteBuffer(4 << 20)
|
|
b.conn = c
|
|
return nil
|
|
}
|
|
|
|
func (b *bot) hello() error {
|
|
payload := map[string]interface{}{
|
|
"token": b.voiceToken,
|
|
"protocol": 1,
|
|
"codec": "opus",
|
|
"room_id": b.roomID,
|
|
"user_id": b.userID,
|
|
"video_enabled": *sendVideo,
|
|
"video_codec": "h264",
|
|
"crypto": map[string]interface{}{
|
|
"aead": "aes-256-gcm",
|
|
"key_id": []byte{b.keyID},
|
|
},
|
|
}
|
|
js, _ := json.Marshal(payload)
|
|
pkt := append([]byte{pktHello}, js...)
|
|
_, err := b.conn.Write(pkt)
|
|
return err
|
|
}
|
|
|
|
func (b *bot) recvLoop(ctx context.Context) {
|
|
buf := make([]byte, 64*1024)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
b.conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
|
|
n, err := b.conn.Read(buf)
|
|
if err != nil {
|
|
if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
|
continue
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
b.st.errors.Add(1)
|
|
continue
|
|
}
|
|
|
|
if n < 1 {
|
|
continue
|
|
}
|
|
|
|
b.st.bytesIn.Add(uint64(n))
|
|
|
|
switch buf[0] {
|
|
case pktWelcome:
|
|
var w map[string]interface{}
|
|
if err := json.Unmarshal(buf[1:n], &w); err != nil {
|
|
log.Printf("[BOT %d] bad welcome json (%d bytes): %v", b.idx, n, err)
|
|
continue
|
|
}
|
|
if v, ok := w["ssrc"].(float64); ok {
|
|
b.ssrc = uint32(v)
|
|
}
|
|
if v, ok := w["video_ssrc"].(float64); ok {
|
|
b.videoSSRC = uint32(v)
|
|
}
|
|
if v, ok := w["screen_ssrc"].(float64); ok {
|
|
b.screenSSRC = uint32(v)
|
|
}
|
|
b.st.welcomeOK.Add(1)
|
|
log.Printf("[BOT %d] welcome ssrc=%d video=%d screen=%d", b.idx, b.ssrc, b.videoSSRC, b.screenSSRC)
|
|
select {
|
|
case <-b.ready:
|
|
default:
|
|
close(b.ready)
|
|
}
|
|
|
|
case pktAudio:
|
|
b.st.audioRecv.Add(1)
|
|
|
|
case pktVideo:
|
|
b.st.videoRecv.Add(1)
|
|
|
|
case pktPong:
|
|
if n >= 9 {
|
|
sent := int64(binary.BigEndian.Uint64(buf[1:9]))
|
|
rtt := time.Duration(time.Now().UnixMilli()-sent) * time.Millisecond
|
|
b.st.pongRecv.Add(1)
|
|
b.st.addRTT(rtt)
|
|
}
|
|
|
|
case pktHello, pktBye:
|
|
// ignore control packets not needed by the stress tool
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bot) audioLoop(ctx context.Context) {
|
|
select {
|
|
case <-b.ready:
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(10 * time.Second):
|
|
log.Printf("[BOT %d] timeout waiting for welcome, skipping audio", b.idx)
|
|
return
|
|
}
|
|
|
|
if b.ssrc == 0 {
|
|
log.Printf("[BOT %d] ssrc=0, skip audio", b.idx)
|
|
return
|
|
}
|
|
|
|
tick := time.NewTicker(time.Duration(*audioRateMs) * time.Millisecond)
|
|
defer tick.Stop()
|
|
|
|
fakeOpus := make([]byte, 160)
|
|
rand.Read(fakeOpus)
|
|
|
|
var seq uint16
|
|
var ctr uint64
|
|
var ts uint32
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-tick.C:
|
|
pkt := b.mediaPkt(pktAudio, 0, codecOpus, b.ssrc, seq, ts, ctr, fakeOpus)
|
|
if _, err := b.conn.Write(pkt); err != nil {
|
|
b.st.errors.Add(1)
|
|
} else {
|
|
b.st.audioSent.Add(1)
|
|
b.st.bytesOut.Add(uint64(len(pkt)))
|
|
}
|
|
seq++
|
|
ctr++
|
|
ts += 960
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bot) vidLoop(ctx context.Context) {
|
|
select {
|
|
case <-b.ready:
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(3 * time.Second):
|
|
return
|
|
}
|
|
|
|
if b.videoSSRC == 0 {
|
|
return
|
|
}
|
|
|
|
tick := time.NewTicker(time.Duration(*videoRateMs) * time.Millisecond)
|
|
defer tick.Stop()
|
|
|
|
fake := make([]byte, 800)
|
|
rand.Read(fake)
|
|
|
|
var seq uint16
|
|
var ctr uint64
|
|
var ts uint32
|
|
var fc int
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-tick.C:
|
|
var flags uint8
|
|
if fc%90 == 0 {
|
|
flags = 0x01
|
|
}
|
|
pkt := b.mediaPkt(pktVideo, flags, codecH264, b.videoSSRC, seq, ts, ctr, fake)
|
|
if _, err := b.conn.Write(pkt); err != nil {
|
|
b.st.errors.Add(1)
|
|
} else {
|
|
b.st.videoSent.Add(1)
|
|
b.st.bytesOut.Add(uint64(len(pkt)))
|
|
}
|
|
seq++
|
|
ctr++
|
|
ts += 3000
|
|
fc++
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bot) pingLoop(ctx context.Context) {
|
|
tick := time.NewTicker(5 * time.Second)
|
|
defer tick.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-tick.C:
|
|
pkt := make([]byte, 9)
|
|
pkt[0] = pktPing
|
|
binary.BigEndian.PutUint64(pkt[1:], uint64(time.Now().UnixMilli()))
|
|
b.conn.Write(pkt)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *bot) mediaPkt(typ, flags, codec uint8, ssrc uint32, seq uint16, ts uint32, ctr uint64, payload []byte) []byte {
|
|
hdr := make([]byte, mediaHeaderSize)
|
|
hdr[0] = typ
|
|
hdr[1] = flags
|
|
hdr[2] = b.keyID
|
|
hdr[3] = codec
|
|
binary.BigEndian.PutUint16(hdr[4:6], seq)
|
|
binary.BigEndian.PutUint32(hdr[6:10], ts)
|
|
binary.BigEndian.PutUint32(hdr[10:14], ssrc)
|
|
binary.BigEndian.PutUint64(hdr[14:22], ctr)
|
|
|
|
enc := b.seal(hdr, payload, ssrc, ctr)
|
|
out := make([]byte, mediaHeaderSize+len(enc))
|
|
copy(out, hdr)
|
|
copy(out[mediaHeaderSize:], enc)
|
|
return out
|
|
}
|
|
|
|
func (b *bot) seal(aad, plaintext []byte, ssrc uint32, counter uint64) []byte {
|
|
if len(b.keyMaterial) != 32 {
|
|
return plaintext
|
|
}
|
|
nonce := b.nonce(ssrc, counter)
|
|
block, err := aes.NewCipher(b.keyMaterial)
|
|
if err != nil {
|
|
return plaintext
|
|
}
|
|
gcm, err := cipher.NewGCM(block)
|
|
if err != nil {
|
|
return plaintext
|
|
}
|
|
return gcm.Seal(nil, nonce, plaintext, aad)
|
|
}
|
|
|
|
func (b *bot) nonce(ssrc uint32, counter uint64) []byte {
|
|
nb := deriveNonceBase(b.keyMaterial, b.roomID, ssrc, b.keyID)
|
|
n := make([]byte, nonceSize)
|
|
copy(n[:nonceBaseSize], nb)
|
|
binary.BigEndian.PutUint64(n[nonceBaseSize:], counter)
|
|
return n
|
|
}
|
|
|
|
func deriveNonceBase(keyMaterial []byte, roomID string, ssrc uint32, keyID byte) []byte {
|
|
info := []byte("nonce-base\x00")
|
|
info = append(info, []byte(roomID)...)
|
|
info = append(info, keyID)
|
|
buf := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(buf, ssrc)
|
|
info = append(info, buf...)
|
|
|
|
reader := hkdf.New(sha256.New, keyMaterial, nil, info)
|
|
out := make([]byte, nonceBaseSize)
|
|
io.ReadFull(reader, out)
|
|
return out
|
|
}
|