bank-emul/internal/mq/consumer.go

275 lines
7.1 KiB
Go

package mq
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/Azure/go-amqp"
"bank-emulator/internal/bank"
)
// AMQP addresses used by RunConsumer.
const (
// requestQueue is the address RunConsumer opens its receiver on.
requestQueue = "bank.requests"
// responseQueue is the address RunConsumer opens its sender on; every reply
// is sent there.
responseQueue = "bank.responses"
)
// Command is the JSON envelope decoded from the body of each request message.
type Command struct {
// CorrelationID is used for the reply only when the AMQP message properties
// carry no non-empty string correlation ID (see pickCorrelationID).
CorrelationID string `json:"correlationId"`
// Operation names the bank operation to run; dispatch switches on it.
Operation string `json:"operation"`
// Payload is a JSON document encoded as a string; dispatch decodes it into
// an operation-specific struct.
Payload string `json:"payload"`
// ReplyQueue is decoded but not read anywhere in this file.
// NOTE(review): replies always go to responseQueue — confirm that is intended.
ReplyQueue string `json:"replyQueue"`
}
// Reply is the JSON envelope sent back for every successfully decoded Command.
type Reply struct {
// CorrelationID is filled in by RunConsumer after dispatch returns.
CorrelationID string `json:"correlationId"`
// OK is true when the operation completed without error.
OK bool `json:"ok"`
// Payload carries the operation result: a session ID or a JSON document,
// depending on the operation. Omitted from the JSON when empty.
Payload string `json:"payload,omitempty"`
// Error carries the error text when OK is false. Omitted from the JSON when
// empty.
Error string `json:"error,omitempty"`
}
// RunConsumer dials the AMQP broker at url, receives commands from
// requestQueue, runs each one against b through dispatch and sends the
// JSON-encoded Reply to responseQueue.
//
// Messages are processed one at a time. A message whose body cannot be
// extracted or decoded is logged and accepted without a reply. A message is
// also accepted when sending its reply fails; the failure is only logged.
//
// RunConsumer blocks until setup fails or receiver.Receive returns an error,
// and returns that error wrapped with the name of the failing step ("dial",
// "session", "receiver", "sender" or "receive"). It never returns nil.
func RunConsumer(ctx context.Context, url string, b *bank.Bank) error {
	// ioTimeout bounds each reply send and each close during shutdown.
	const ioTimeout = 5 * time.Second

	conn, err := amqp.Dial(ctx, url, nil)
	if err != nil {
		return fmt.Errorf("dial: %w", err)
	}
	defer conn.Close()

	// shutdown closes a link or session with its own deadline. The deferred
	// closes typically run after ctx has been cancelled, and a Close given an
	// already-cancelled context would not wait for the close to complete.
	shutdown := func(what string, closeFn func(context.Context) error) {
		closeCtx, cancel := context.WithTimeout(context.Background(), ioTimeout)
		defer cancel()
		if err := closeFn(closeCtx); err != nil {
			log.Printf("[AMQP] %s close failed: %v", what, err)
		}
	}

	session, err := conn.NewSession(ctx, nil)
	if err != nil {
		return fmt.Errorf("session: %w", err)
	}
	defer shutdown("session", session.Close)

	receiver, err := session.NewReceiver(ctx, requestQueue, nil)
	if err != nil {
		return fmt.Errorf("receiver: %w", err)
	}
	defer shutdown("receiver", receiver.Close)

	sender, err := session.NewSender(ctx, responseQueue, nil)
	if err != nil {
		return fmt.Errorf("sender: %w", err)
	}
	defer shutdown("sender", sender.Close)

	// ack accepts msg and logs a failed settlement instead of dropping the
	// error silently.
	ack := func(msg *amqp.Message) {
		if err := receiver.AcceptMessage(ctx, msg); err != nil {
			log.Printf("[AMQP] accept failed: %v", err)
		}
	}

	log.Printf("[AMQP] consumer started on %s", requestQueue)
	for {
		msg, err := receiver.Receive(ctx, nil)
		if err != nil {
			return fmt.Errorf("receive: %w", err)
		}

		body, err := extractBody(msg)
		if err != nil {
			log.Printf("[AMQP] body extract failed: %v", err)
			ack(msg)
			continue
		}

		var cmd Command
		if err := json.Unmarshal([]byte(body), &cmd); err != nil {
			log.Printf("[AMQP] decode failed: %v", err)
			ack(msg)
			continue
		}

		corrID := pickCorrelationID(msg, cmd.CorrelationID)
		reply := dispatch(b, cmd)
		reply.CorrelationID = corrID
		log.Printf("[AMQP] op=%s corr=%s ok=%v err=%q", cmd.Operation, corrID, reply.OK, reply.Error)

		replyJSON, err := json.Marshal(reply)
		if err != nil {
			// The operation has already run, so the message is still accepted;
			// there is simply nothing valid to send back.
			log.Printf("[AMQP] reply encode failed for %s: %v", corrID, err)
			ack(msg)
			continue
		}

		replyMsg := &amqp.Message{
			Value: string(replyJSON),
			Properties: &amqp.MessageProperties{
				CorrelationID: corrID,
			},
		}
		sendCtx, cancel := context.WithTimeout(ctx, ioTimeout)
		if err := sender.Send(sendCtx, replyMsg, nil); err != nil {
			log.Printf("[AMQP] reply send failed for %s: %v", corrID, err)
		}
		cancel()

		// Accepted even when the reply could not be sent: the operation has
		// already been applied to b.
		ack(msg)
	}
}
// extractBody returns the body of msg as a string.
//
// The amqp-value section is checked first: a string value is returned as is
// (even when empty) and a non-empty []byte value is converted to a string.
// Otherwise the first data section is returned. Any further data sections are
// ignored. An "empty body" error is returned when none of these is present.
func extractBody(msg *amqp.Message) (string, error) {
	switch v := msg.Value.(type) {
	case string:
		return v, nil
	case []byte:
		// Accept a binary amqp-value body as well as a string one.
		if len(v) > 0 {
			return string(v), nil
		}
	}
	if len(msg.Data) > 0 {
		return string(msg.Data[0]), nil
	}
	return "", fmt.Errorf("empty body")
}
// pickCorrelationID chooses the correlation ID to put on a reply. The ID from
// the message properties is used when it is a non-empty string; in every other
// case (no properties, no ID, non-string ID, empty string) fallback is
// returned.
func pickCorrelationID(msg *amqp.Message, fallback string) string {
	props := msg.Properties
	if props == nil {
		return fallback
	}
	// The comma-ok assertion also yields false for a nil CorrelationID.
	id, isString := props.CorrelationID.(string)
	if !isString || id == "" {
		return fallback
	}
	return id
}
// dispatch runs the bank operation named by cmd.Operation against b and
// returns the outcome as a Reply. The returned Reply has no CorrelationID; the
// caller sets it.
//
// cmd.Payload is decoded as JSON into an operation-specific struct. A decode
// error, an error returned by the bank, or a failure to encode the result
// yields OK=false with the error text in Error. An unrecognised operation
// yields OK=false with "unknown operation: <name>".
//
// On success Payload is:
//   - empty for VALIDATE, CONFIRM_3DS and COMPLETE_CHARGE;
//   - the session ID for INIT_BIND and INIT_CHARGE;
//   - the JSON-encoded result for PREPARE_CHARGE, COMMIT_CHARGE_2PC,
//     ROLLBACK_CHARGE, DIRECT_CHARGE and GET_CHARGE_STATUS
//     (`{"status":"NOT_FOUND"}` when GetChargeStatus returns nil).
func dispatch(b *bank.Bank, cmd Command) Reply {
	raw := []byte(cmd.Payload)

	switch cmd.Operation {
	case "VALIDATE":
		var p struct {
			CardNumber string `json:"cardNumber"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		if err := b.ValidateCard(p.CardNumber); err != nil {
			return dispatchFailure(err)
		}
		return Reply{OK: true}

	case "INIT_BIND":
		var p struct {
			CardNumber string `json:"cardNumber"`
			CVV        string `json:"cvv"`
			Expiry     string `json:"expiry"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		s, err := b.InitiateBind(p.CardNumber, p.CVV, p.Expiry)
		if err != nil {
			return dispatchFailure(err)
		}
		if s == nil || s.SessionID == "" {
			return Reply{OK: false, Error: "bank created empty 3DS session"}
		}
		return Reply{OK: true, Payload: s.SessionID}

	case "CONFIRM_3DS":
		var p struct {
			SessionID string `json:"sessionId"`
			Code      string `json:"code"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		if err := b.Confirm3DS(p.SessionID, p.Code); err != nil {
			return dispatchFailure(err)
		}
		return Reply{OK: true}

	case "INIT_CHARGE":
		var p struct {
			CardNumber string  `json:"cardNumber"`
			Amount     float64 `json:"amount"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		s, err := b.Charge(p.CardNumber, p.Amount)
		if err != nil {
			return dispatchFailure(err)
		}
		// Same guard as INIT_BIND, so a missing session becomes an error
		// reply instead of a nil dereference.
		// NOTE(review): assumes Charge returns a pointer like InitiateBind — confirm.
		if s == nil || s.SessionID == "" {
			return Reply{OK: false, Error: "bank created empty charge session"}
		}
		return Reply{OK: true, Payload: s.SessionID}

	case "COMPLETE_CHARGE":
		var p struct {
			SessionID string  `json:"sessionId"`
			Amount    float64 `json:"amount"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		if err := b.CompleteCharge(p.SessionID, p.Amount); err != nil {
			return dispatchFailure(err)
		}
		return Reply{OK: true}

	case "PREPARE_CHARGE":
		var p struct {
			CorrelationID string  `json:"correlationId"`
			CardNumber    string  `json:"cardNumber"`
			Amount        float64 `json:"amount"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		ch, err := b.PrepareCharge(p.CorrelationID, p.CardNumber, p.Amount)
		if err != nil {
			return dispatchFailure(err)
		}
		return dispatchJSONReply(ch)

	case "COMMIT_CHARGE_2PC":
		var p struct {
			CorrelationID string `json:"correlationId"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		ch, err := b.CommitCharge(p.CorrelationID)
		if err != nil {
			return dispatchFailure(err)
		}
		return dispatchJSONReply(ch)

	case "ROLLBACK_CHARGE":
		var p struct {
			CorrelationID string `json:"correlationId"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		ch, err := b.RollbackCharge(p.CorrelationID)
		if err != nil {
			return dispatchFailure(err)
		}
		return dispatchJSONReply(ch)

	case "DIRECT_CHARGE":
		var p struct {
			CorrelationID string  `json:"correlationId"`
			CardNumber    string  `json:"cardNumber"`
			Amount        float64 `json:"amount"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		ch, err := b.DirectCharge(p.CorrelationID, p.CardNumber, p.Amount)
		if err != nil {
			return dispatchFailure(err)
		}
		return dispatchJSONReply(ch)

	case "GET_CHARGE_STATUS":
		var p struct {
			CorrelationID string `json:"correlationId"`
		}
		if err := json.Unmarshal(raw, &p); err != nil {
			return dispatchFailure(err)
		}
		ch := b.GetChargeStatus(p.CorrelationID)
		if ch == nil {
			return Reply{OK: true, Payload: `{"status":"NOT_FOUND"}`}
		}
		return dispatchJSONReply(ch)

	default:
		return Reply{OK: false, Error: "unknown operation: " + cmd.Operation}
	}
}

// dispatchFailure builds the failure Reply carrying err's message.
func dispatchFailure(err error) Reply {
	return Reply{OK: false, Error: err.Error()}
}

// dispatchJSONReply builds a success Reply whose Payload is v encoded as JSON.
// If v cannot be encoded it returns a failure Reply instead of claiming
// success with an empty payload.
func dispatchJSONReply(v any) Reply {
	out, err := json.Marshal(v)
	if err != nil {
		return Reply{OK: false, Error: "encode result: " + err.Error()}
	}
	return Reply{OK: true, Payload: string(out)}
}