275 lines
7.1 KiB
Go
275 lines
7.1 KiB
Go
package mq
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/Azure/go-amqp"
|
|
|
|
"bank-emulator/internal/bank"
|
|
)
|
|
|
|
const (
|
|
requestQueue = "bank.requests"
|
|
responseQueue = "bank.responses"
|
|
)
|
|
|
|
type Command struct {
|
|
CorrelationID string `json:"correlationId"`
|
|
Operation string `json:"operation"`
|
|
Payload string `json:"payload"`
|
|
ReplyQueue string `json:"replyQueue"`
|
|
}
|
|
|
|
type Reply struct {
|
|
CorrelationID string `json:"correlationId"`
|
|
OK bool `json:"ok"`
|
|
Payload string `json:"payload,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
func RunConsumer(ctx context.Context, url string, b *bank.Bank) error {
|
|
conn, err := amqp.Dial(ctx, url, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("dial: %w", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
session, err := conn.NewSession(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("session: %w", err)
|
|
}
|
|
|
|
receiver, err := session.NewReceiver(ctx, requestQueue, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("receiver: %w", err)
|
|
}
|
|
defer receiver.Close(ctx)
|
|
|
|
sender, err := session.NewSender(ctx, responseQueue, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("sender: %w", err)
|
|
}
|
|
defer sender.Close(ctx)
|
|
|
|
log.Printf("[AMQP] consumer started on %s", requestQueue)
|
|
|
|
for {
|
|
msg, err := receiver.Receive(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("receive: %w", err)
|
|
}
|
|
|
|
body, err := extractBody(msg)
|
|
if err != nil {
|
|
log.Printf("[AMQP] body extract failed: %v", err)
|
|
receiver.AcceptMessage(ctx, msg)
|
|
continue
|
|
}
|
|
|
|
var cmd Command
|
|
if err := json.Unmarshal([]byte(body), &cmd); err != nil {
|
|
log.Printf("[AMQP] decode failed: %v", err)
|
|
receiver.AcceptMessage(ctx, msg)
|
|
continue
|
|
}
|
|
|
|
corrID := pickCorrelationID(msg, cmd.CorrelationID)
|
|
reply := dispatch(b, cmd)
|
|
reply.CorrelationID = corrID
|
|
|
|
log.Printf("[AMQP] op=%s corr=%s ok=%v err=%q", cmd.Operation, corrID, reply.OK, reply.Error)
|
|
|
|
replyJSON, _ := json.Marshal(reply)
|
|
replyMsg := &amqp.Message{
|
|
Value: string(replyJSON),
|
|
Properties: &amqp.MessageProperties{
|
|
CorrelationID: corrID,
|
|
},
|
|
}
|
|
|
|
sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
if err := sender.Send(sendCtx, replyMsg, nil); err != nil {
|
|
log.Printf("[AMQP] reply send failed for %s: %v", corrID, err)
|
|
}
|
|
cancel()
|
|
|
|
receiver.AcceptMessage(ctx, msg)
|
|
}
|
|
}
|
|
|
|
func extractBody(msg *amqp.Message) (string, error) {
|
|
if msg.Value != nil {
|
|
if s, ok := msg.Value.(string); ok {
|
|
return s, nil
|
|
}
|
|
}
|
|
if len(msg.Data) > 0 {
|
|
return string(msg.Data[0]), nil
|
|
}
|
|
return "", fmt.Errorf("empty body")
|
|
}
|
|
|
|
func pickCorrelationID(msg *amqp.Message, fallback string) string {
|
|
if msg.Properties != nil && msg.Properties.CorrelationID != nil {
|
|
if s, ok := msg.Properties.CorrelationID.(string); ok && s != "" {
|
|
return s
|
|
}
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
func dispatch(b *bank.Bank, cmd Command) Reply {
|
|
switch cmd.Operation {
|
|
case "VALIDATE":
|
|
var p struct {
|
|
CardNumber string `json:"cardNumber"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
if err := b.ValidateCard(p.CardNumber); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
return Reply{OK: true}
|
|
|
|
case "INIT_BIND":
|
|
var p struct {
|
|
CardNumber string `json:"cardNumber"`
|
|
CVV string `json:"cvv"`
|
|
Expiry string `json:"expiry"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
s, err := b.InitiateBind(p.CardNumber, p.CVV, p.Expiry)
|
|
if err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
if s == nil || s.SessionID == "" {
|
|
return Reply{OK: false, Error: "bank created empty 3DS session"}
|
|
}
|
|
return Reply{OK: true, Payload: s.SessionID}
|
|
|
|
case "CONFIRM_3DS":
|
|
var p struct {
|
|
SessionID string `json:"sessionId"`
|
|
Code string `json:"code"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
if err := b.Confirm3DS(p.SessionID, p.Code); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
return Reply{OK: true}
|
|
|
|
case "INIT_CHARGE":
|
|
var p struct {
|
|
CardNumber string `json:"cardNumber"`
|
|
Amount float64 `json:"amount"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
s, err := b.Charge(p.CardNumber, p.Amount)
|
|
if err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
return Reply{OK: true, Payload: s.SessionID}
|
|
|
|
case "COMPLETE_CHARGE":
|
|
var p struct {
|
|
SessionID string `json:"sessionId"`
|
|
Amount float64 `json:"amount"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
if err := b.CompleteCharge(p.SessionID, p.Amount); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
return Reply{OK: true}
|
|
|
|
case "PREPARE_CHARGE":
|
|
var p struct {
|
|
CorrelationID string `json:"correlationId"`
|
|
CardNumber string `json:"cardNumber"`
|
|
Amount float64 `json:"amount"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
ch, err := b.PrepareCharge(p.CorrelationID, p.CardNumber, p.Amount)
|
|
if err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
out, _ := json.Marshal(ch)
|
|
return Reply{OK: true, Payload: string(out)}
|
|
|
|
case "COMMIT_CHARGE_2PC":
|
|
var p struct {
|
|
CorrelationID string `json:"correlationId"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
ch, err := b.CommitCharge(p.CorrelationID)
|
|
if err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
out, _ := json.Marshal(ch)
|
|
return Reply{OK: true, Payload: string(out)}
|
|
|
|
case "ROLLBACK_CHARGE":
|
|
var p struct {
|
|
CorrelationID string `json:"correlationId"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
ch, err := b.RollbackCharge(p.CorrelationID)
|
|
if err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
out, _ := json.Marshal(ch)
|
|
return Reply{OK: true, Payload: string(out)}
|
|
|
|
case "DIRECT_CHARGE":
|
|
var p struct {
|
|
CorrelationID string `json:"correlationId"`
|
|
CardNumber string `json:"cardNumber"`
|
|
Amount float64 `json:"amount"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
ch, err := b.DirectCharge(p.CorrelationID, p.CardNumber, p.Amount)
|
|
if err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
out, _ := json.Marshal(ch)
|
|
return Reply{OK: true, Payload: string(out)}
|
|
|
|
case "GET_CHARGE_STATUS":
|
|
var p struct {
|
|
CorrelationID string `json:"correlationId"`
|
|
}
|
|
if err := json.Unmarshal([]byte(cmd.Payload), &p); err != nil {
|
|
return Reply{OK: false, Error: err.Error()}
|
|
}
|
|
ch := b.GetChargeStatus(p.CorrelationID)
|
|
if ch == nil {
|
|
return Reply{OK: true, Payload: `{"status":"NOT_FOUND"}`}
|
|
}
|
|
out, _ := json.Marshal(ch)
|
|
return Reply{OK: true, Payload: string(out)}
|
|
|
|
default:
|
|
return Reply{OK: false, Error: "unknown operation: " + cmd.Operation}
|
|
}
|
|
}
|