// Mirror of https://github.com/Alexander-D-Karpov/webring.git
// (synced 2026-03-16 22:07:41 +03:00; 655 lines, 16 KiB, Go).

package uptime
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"webring/internal/models"
|
|
"webring/internal/telegram"
|
|
)
|
|
|
|
// Tunables for the uptime checker.
const (
	// Scheduling cadence: the regular interval and the shorter one that
	// NewChecker selects when debug mode is enabled.
	defaultCheckInterval = 5 * time.Minute
	debugCheckInterval   = 5 * time.Second

	// Limits applied to the HTTP client and transport used for probes.
	httpTimeout  = 10 * time.Second
	tlsTimeout   = 10 * time.Second
	idleTimeout  = 90 * time.Second
	maxIdleConns = 100

	// serverErrorCode is the first HTTP status treated as a failure: a probe
	// counts as "up" only when the response status is below this value.
	// Despite the name it is 400, so 4xx responses count as failures too.
	serverErrorCode = 400

	// logPerm is the file mode used when the error log file is created.
	logPerm = 0o644

	// userAgent is sent as the User-Agent header of every probe request.
	userAgent = "webring-checker (+https://otor.ing)"

	// Fallbacks used when CHECKER_WORKERS / CHECKER_DOWN_THRESHOLD are unset
	// or invalid.
	defaultWorkers       = 5
	defaultDownThreshold = 3
)
|
|
|
|
type checkTask struct {
|
|
site models.Site
|
|
useProxy bool
|
|
currentUp bool
|
|
}
|
|
|
|
// checkResult is the outcome of a single probe, passed from a worker to the
// result processor.
type checkResult struct {
	siteID       int     // ID of the probed site
	siteName     string  // site name, used in owner notifications
	userID       *int    // owning user, nil when the site has none
	isUp, wasUp  bool    // probe outcome / database state when the task was scheduled
	responseTime float64 // elapsed time of the probe in seconds
	errorMsg     string  // description of the failure, empty on success
	useProxy     bool    // whether this attempt went through the proxy
	proxyError   bool    // whether the failure was classified by isProxyError
}
|
|
|
|
type Checker struct {
|
|
db *sql.DB
|
|
proxy *url.URL
|
|
proxyAlive bool
|
|
proxyMu sync.RWMutex
|
|
debug bool
|
|
notifyStates sync.Map
|
|
failureCounters sync.Map
|
|
downThreshold int
|
|
workers int
|
|
checkInterval time.Duration
|
|
taskQueue chan checkTask
|
|
resultQueue chan checkResult
|
|
wg sync.WaitGroup
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
// NotifyState records the last status that was announced for a site and when
// that happened; it is used to detect changes and to throttle notifications.
type NotifyState struct {
	LastNotifiedState bool      // true when the last recorded status was "up"
	NotifiedAt        time.Time // moment LastNotifiedState was recorded
}
|
|
|
|
func NewChecker(db *sql.DB) *Checker {
|
|
var proxyURL *url.URL
|
|
if proxyStr := os.Getenv("CHECKER_PROXY"); proxyStr != "" {
|
|
var err error
|
|
proxyURL, err = url.Parse(proxyStr)
|
|
if err != nil {
|
|
log.Printf("Warning: Invalid proxy URL provided (%s): %v. Will proceed without proxy.", proxyStr, err)
|
|
} else {
|
|
log.Printf("Using proxy: %s", proxyStr)
|
|
}
|
|
}
|
|
|
|
debug := false
|
|
if debugStr := os.Getenv("CHECKER_DEBUG"); debugStr != "" {
|
|
var err error
|
|
debug, err = strconv.ParseBool(debugStr)
|
|
if err != nil {
|
|
log.Printf("Warning: Invalid CHECKER_DEBUG value: %v", err)
|
|
}
|
|
}
|
|
|
|
workers := defaultWorkers
|
|
if workersStr := os.Getenv("CHECKER_WORKERS"); workersStr != "" {
|
|
if w, err := strconv.Atoi(workersStr); err == nil && w > 0 {
|
|
workers = w
|
|
} else {
|
|
log.Printf("Warning: Invalid CHECKER_WORKERS value: %s, using default %d", workersStr, defaultWorkers)
|
|
}
|
|
}
|
|
|
|
downThreshold := defaultDownThreshold
|
|
if thresholdStr := os.Getenv("CHECKER_DOWN_THRESHOLD"); thresholdStr != "" {
|
|
if t, err := strconv.Atoi(thresholdStr); err == nil && t > 0 {
|
|
downThreshold = t
|
|
} else {
|
|
log.Printf("Warning: Invalid CHECKER_DOWN_THRESHOLD value: %s, using default %d", thresholdStr, defaultDownThreshold)
|
|
}
|
|
}
|
|
|
|
checkInterval := defaultCheckInterval
|
|
if debug {
|
|
checkInterval = debugCheckInterval
|
|
}
|
|
|
|
if intervalStr := os.Getenv("CHECKER_INTERVAL"); intervalStr != "" {
|
|
if d, err := time.ParseDuration(intervalStr); err == nil && d >= time.Second {
|
|
checkInterval = d
|
|
} else {
|
|
log.Printf("Warning: Invalid CHECKER_INTERVAL value: %s, using %v", intervalStr, checkInterval)
|
|
}
|
|
}
|
|
|
|
checker := &Checker{
|
|
db: db,
|
|
proxy: proxyURL,
|
|
proxyAlive: true,
|
|
debug: debug,
|
|
downThreshold: downThreshold,
|
|
workers: workers,
|
|
checkInterval: checkInterval,
|
|
taskQueue: make(chan checkTask, 1000),
|
|
resultQueue: make(chan checkResult, 1000),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
checker.loadInitialNotifyStates()
|
|
checker.validateCapacity()
|
|
|
|
log.Printf("Checker initialized with down threshold: %d consecutive failures required", downThreshold)
|
|
|
|
return checker
|
|
}
|
|
|
|
func (c *Checker) validateCapacity() {
|
|
count, err := c.getSiteCount()
|
|
if err != nil {
|
|
log.Printf("Warning: Could not validate checker capacity: %v", err)
|
|
return
|
|
}
|
|
|
|
maxTasksPerInterval := float64(c.workers) * (c.checkInterval.Seconds() / httpTimeout.Seconds())
|
|
|
|
if float64(count) > maxTasksPerInterval {
|
|
log.Printf("WARNING: Checker capacity may be insufficient!")
|
|
log.Printf(" Sites to check: %d", count)
|
|
log.Printf(" Workers: %d", c.workers)
|
|
log.Printf(" Check interval: %v", c.checkInterval)
|
|
log.Printf(" HTTP timeout: %v", httpTimeout)
|
|
log.Printf(" Max sites per interval: %.0f", maxTasksPerInterval)
|
|
log.Printf(" Consider increasing CHECKER_WORKERS or check interval")
|
|
} else {
|
|
c.debugLogf("Capacity OK: %d sites, %d workers, max %.0f sites/interval",
|
|
count, c.workers, maxTasksPerInterval)
|
|
}
|
|
}
|
|
|
|
func (c *Checker) getSiteCount() (int, error) {
|
|
var count int
|
|
err := c.db.QueryRow("SELECT COUNT(*) FROM sites").Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
func (c *Checker) loadInitialNotifyStates() {
|
|
rows, err := c.db.Query("SELECT id, is_up FROM sites")
|
|
if err != nil {
|
|
log.Printf("Error loading initial site states: %v", err)
|
|
return
|
|
}
|
|
defer func() {
|
|
if cerr := rows.Close(); cerr != nil {
|
|
log.Printf("Error closing rows: %v", cerr)
|
|
}
|
|
}()
|
|
|
|
for rows.Next() {
|
|
var id int
|
|
var isUp bool
|
|
if scanErr := rows.Scan(&id, &isUp); scanErr != nil {
|
|
log.Printf("Error scanning site state: %v", scanErr)
|
|
continue
|
|
}
|
|
c.notifyStates.Store(id, &NotifyState{
|
|
LastNotifiedState: isUp,
|
|
NotifiedAt: time.Now(),
|
|
})
|
|
c.failureCounters.Store(id, 0)
|
|
}
|
|
|
|
if rowsErr := rows.Err(); rowsErr != nil {
|
|
log.Printf("Error iterating rows: %v", rowsErr)
|
|
}
|
|
}
|
|
|
|
func (c *Checker) debugLogf(format string, args ...interface{}) {
|
|
if c.debug {
|
|
log.Printf("[DEBUG] "+format, args...)
|
|
}
|
|
}
|
|
|
|
func (c *Checker) Start() {
|
|
log.Printf("Starting checker with %d workers...", c.workers)
|
|
if c.debug {
|
|
c.debugLogf("Debug mode enabled, check interval: %v", c.checkInterval)
|
|
}
|
|
|
|
for i := 0; i < c.workers; i++ {
|
|
c.wg.Add(1)
|
|
go c.worker(i)
|
|
}
|
|
|
|
c.wg.Add(1)
|
|
go c.resultProcessor()
|
|
|
|
c.wg.Add(1)
|
|
go c.scheduler()
|
|
|
|
c.wg.Wait()
|
|
}
|
|
|
|
func (c *Checker) Stop() {
|
|
close(c.stopCh)
|
|
c.wg.Wait()
|
|
}
|
|
|
|
func (c *Checker) scheduler() {
|
|
defer c.wg.Done()
|
|
|
|
c.scheduleTasks()
|
|
|
|
ticker := time.NewTicker(c.checkInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-c.stopCh:
|
|
close(c.taskQueue)
|
|
return
|
|
case <-ticker.C:
|
|
c.scheduleTasks()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Checker) scheduleTasks() {
|
|
sites, err := c.getAllSitesWithStatus()
|
|
if err != nil {
|
|
log.Printf("Error getting sites for scheduling: %v", err)
|
|
return
|
|
}
|
|
|
|
c.debugLogf("Scheduling %d sites for checking", len(sites))
|
|
|
|
c.proxyMu.RLock()
|
|
useProxy := c.proxy != nil && c.proxyAlive
|
|
c.proxyMu.RUnlock()
|
|
|
|
for _, site := range sites {
|
|
select {
|
|
case <-c.stopCh:
|
|
return
|
|
case c.taskQueue <- checkTask{site: site.Site, useProxy: useProxy, currentUp: site.IsUp}:
|
|
c.debugLogf("Scheduled site %s (ID: %d, currentUp: %t)", site.URL, site.ID, site.IsUp)
|
|
default:
|
|
log.Printf("Warning: Task queue full, skipping site %s (ID: %d)", site.URL, site.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Checker) worker(id int) {
|
|
defer c.wg.Done()
|
|
|
|
transport := &http.Transport{
|
|
TLSHandshakeTimeout: tlsTimeout,
|
|
DisableKeepAlives: false,
|
|
MaxIdleConns: maxIdleConns,
|
|
IdleConnTimeout: idleTimeout,
|
|
}
|
|
|
|
client := &http.Client{
|
|
Timeout: httpTimeout,
|
|
Transport: transport,
|
|
}
|
|
|
|
c.debugLogf("Worker %d started", id)
|
|
|
|
for task := range c.taskQueue {
|
|
c.debugLogf("Worker %d checking site %s (ID: %d, dbState: %t)", id, task.site.URL, task.site.ID, task.currentUp)
|
|
|
|
result := c.checkSite(client, transport, &task.site, task.useProxy)
|
|
result.userID = task.site.UserID
|
|
result.siteName = task.site.Name
|
|
result.wasUp = task.currentUp
|
|
|
|
select {
|
|
case c.resultQueue <- result:
|
|
case <-c.stopCh:
|
|
return
|
|
}
|
|
|
|
if !result.isUp && result.proxyError && task.useProxy {
|
|
c.debugLogf("Worker %d retrying site %s without proxy", id, task.site.URL)
|
|
retryResult := c.checkSite(client, transport, &task.site, false)
|
|
retryResult.userID = task.site.UserID
|
|
retryResult.siteName = task.site.Name
|
|
retryResult.wasUp = task.currentUp
|
|
|
|
select {
|
|
case c.resultQueue <- retryResult:
|
|
case <-c.stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
c.debugLogf("Worker %d stopped", id)
|
|
}
|
|
|
|
func (c *Checker) checkSite(client *http.Client,
|
|
transport *http.Transport, site *models.Site, useProxy bool) checkResult {
|
|
result := checkResult{
|
|
siteID: site.ID,
|
|
useProxy: useProxy,
|
|
}
|
|
|
|
if useProxy && c.proxy != nil {
|
|
transport.Proxy = http.ProxyURL(c.proxy)
|
|
} else {
|
|
transport.Proxy = nil
|
|
}
|
|
|
|
siteURL := site.URL
|
|
if !hasProtocol(siteURL) {
|
|
siteURL = "https://" + siteURL
|
|
}
|
|
|
|
start := time.Now()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), httpTimeout)
|
|
defer cancel()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", siteURL, http.NoBody)
|
|
if err != nil {
|
|
result.errorMsg = fmt.Sprintf("Error creating request: %v", err)
|
|
result.responseTime = time.Since(start).Seconds()
|
|
return result
|
|
}
|
|
req.Header.Set("User-Agent", userAgent)
|
|
|
|
resp, err := client.Do(req)
|
|
result.responseTime = time.Since(start).Seconds()
|
|
|
|
if err != nil {
|
|
result.errorMsg = fmt.Sprintf("Error checking site: %v", err)
|
|
result.proxyError = isProxyError(err)
|
|
return result
|
|
}
|
|
defer func() {
|
|
if cerr := resp.Body.Close(); cerr != nil {
|
|
c.debugLogf("Error closing response body: %v", cerr)
|
|
}
|
|
}()
|
|
|
|
result.isUp = resp.StatusCode < serverErrorCode
|
|
c.debugLogf("Checked site %s (ID: %d): status %d, isUp: %t, responseTime: %.2fs",
|
|
site.URL, site.ID, resp.StatusCode, result.isUp, result.responseTime)
|
|
return result
|
|
}
|
|
|
|
func (c *Checker) resultProcessor() {
|
|
defer c.wg.Done()
|
|
defer close(c.resultQueue)
|
|
|
|
proxyFailures := 0
|
|
proxySuccesses := 0
|
|
const proxyThreshold = 5
|
|
|
|
for {
|
|
select {
|
|
case <-c.stopCh:
|
|
return
|
|
case result, ok := <-c.resultQueue:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
shouldUpdateDB, newIsUp := c.processCheckResult(result)
|
|
|
|
if shouldUpdateDB {
|
|
c.updateSiteStatus(result.siteID, newIsUp, result.responseTime)
|
|
|
|
if !newIsUp && result.errorMsg != "" {
|
|
c.logError(fmt.Sprintf("site-%d", result.siteID), result.errorMsg)
|
|
}
|
|
|
|
c.checkAndNotifyStatusChange(result.siteID, result.userID, result.siteName, newIsUp)
|
|
} else {
|
|
c.updateSiteResponseTime(result.siteID, result.responseTime)
|
|
}
|
|
|
|
if result.useProxy {
|
|
if result.proxyError {
|
|
proxyFailures++
|
|
proxySuccesses = 0
|
|
} else if result.isUp {
|
|
proxySuccesses++
|
|
proxyFailures = 0
|
|
}
|
|
|
|
if proxyFailures >= proxyThreshold {
|
|
c.proxyMu.Lock()
|
|
if c.proxyAlive {
|
|
log.Printf("Proxy appears to be down after %d consecutive failures", proxyFailures)
|
|
c.proxyAlive = false
|
|
}
|
|
c.proxyMu.Unlock()
|
|
proxyFailures = 0
|
|
}
|
|
|
|
if proxySuccesses >= proxyThreshold {
|
|
c.proxyMu.Lock()
|
|
if !c.proxyAlive {
|
|
log.Printf("Proxy recovered after %d consecutive successes", proxySuccesses)
|
|
c.proxyAlive = true
|
|
}
|
|
c.proxyMu.Unlock()
|
|
proxySuccesses = 0
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Checker) processCheckResult(result checkResult) (shouldUpdateDB, newIsUp bool) {
|
|
dbIsUp := result.wasUp
|
|
|
|
if result.isUp {
|
|
c.failureCounters.Store(result.siteID, 0)
|
|
|
|
if !dbIsUp {
|
|
c.debugLogf("Site %d is UP (check passed), DB says DOWN, updating to UP", result.siteID)
|
|
return true, true
|
|
}
|
|
return false, true
|
|
}
|
|
|
|
if !dbIsUp {
|
|
c.debugLogf("Site %d check failed, DB already says DOWN, just updating response time", result.siteID)
|
|
return false, false
|
|
}
|
|
|
|
counterInterface, _ := c.failureCounters.LoadOrStore(result.siteID, 0)
|
|
currentCount, ok := counterInterface.(int)
|
|
if !ok {
|
|
currentCount = 0
|
|
}
|
|
newCount := currentCount + 1
|
|
c.failureCounters.Store(result.siteID, newCount)
|
|
|
|
c.debugLogf("Site %d check failed (DB says UP), failure count: %d/%d", result.siteID, newCount, c.downThreshold)
|
|
|
|
if newCount >= c.downThreshold {
|
|
c.debugLogf("Site %d reached failure threshold (%d), marking as DOWN", result.siteID, c.downThreshold)
|
|
c.failureCounters.Store(result.siteID, 0)
|
|
return true, false
|
|
}
|
|
|
|
return false, true
|
|
}
|
|
|
|
func (c *Checker) checkAndNotifyStatusChange(siteID int, userID *int, siteName string, currentIsUp bool) {
|
|
stateInterface, exists := c.notifyStates.Load(siteID)
|
|
if !exists {
|
|
c.notifyStates.Store(siteID, &NotifyState{
|
|
LastNotifiedState: currentIsUp,
|
|
NotifiedAt: time.Now(),
|
|
})
|
|
return
|
|
}
|
|
|
|
state, ok := stateInterface.(*NotifyState)
|
|
if !ok {
|
|
log.Printf("Error: invalid state type for site %d", siteID)
|
|
return
|
|
}
|
|
|
|
statusChanged := state.LastNotifiedState != currentIsUp
|
|
|
|
if statusChanged {
|
|
now := time.Now()
|
|
timeSinceLastNotification := now.Sub(state.NotifiedAt)
|
|
|
|
if timeSinceLastNotification >= 30*time.Second {
|
|
if userID != nil && *userID != 0 {
|
|
go c.notifyOwner(*userID, siteName, currentIsUp)
|
|
}
|
|
|
|
state.LastNotifiedState = currentIsUp
|
|
state.NotifiedAt = now
|
|
}
|
|
}
|
|
|
|
c.notifyStates.Store(siteID, state)
|
|
}
|
|
|
|
func (c *Checker) notifyOwner(userID int, siteName string, isUp bool) {
|
|
user, err := c.getUserByID(userID)
|
|
if err != nil {
|
|
log.Printf("Error getting user for notification: %v", err)
|
|
return
|
|
}
|
|
|
|
if user.TelegramID == 0 {
|
|
return
|
|
}
|
|
|
|
botToken := os.Getenv("TELEGRAM_BOT_TOKEN")
|
|
if botToken == "" {
|
|
return
|
|
}
|
|
|
|
var message string
|
|
if isUp {
|
|
message = telegram.RenderMessage("site_online", map[string]interface{}{
|
|
"SiteName": siteName,
|
|
})
|
|
} else {
|
|
message = telegram.RenderMessage("site_offline", map[string]interface{}{
|
|
"SiteName": siteName,
|
|
"DownThreshold": c.downThreshold,
|
|
})
|
|
}
|
|
|
|
telegram.SendMessage(botToken, user.TelegramID, message)
|
|
}
|
|
|
|
func (c *Checker) getUserByID(userID int) (*models.User, error) {
|
|
var user models.User
|
|
var telegramID sql.NullInt64
|
|
err := c.db.QueryRow(`
|
|
SELECT id, telegram_id, telegram_username, first_name, last_name, is_admin
|
|
FROM users WHERE id = $1
|
|
`, userID).Scan(&user.ID, &telegramID, &user.TelegramUsername,
|
|
&user.FirstName, &user.LastName, &user.IsAdmin)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if telegramID.Valid {
|
|
user.TelegramID = telegramID.Int64
|
|
}
|
|
|
|
return &user, nil
|
|
}
|
|
|
|
func (c *Checker) updateSiteStatus(id int, isUp bool, responseTime float64) {
|
|
_, err := c.db.Exec("UPDATE sites SET is_up = $1, last_check = $2 WHERE id = $3", isUp, responseTime, id)
|
|
if err != nil {
|
|
log.Printf("Error updating site status: %v", err)
|
|
}
|
|
}
|
|
|
|
func (c *Checker) updateSiteResponseTime(id int, responseTime float64) {
|
|
_, err := c.db.Exec("UPDATE sites SET last_check = $1 WHERE id = $2", responseTime, id)
|
|
if err != nil {
|
|
log.Printf("Error updating site response time: %v", err)
|
|
}
|
|
}
|
|
|
|
func (c *Checker) logError(siteURL, errorMsg string) {
|
|
f, err := os.OpenFile("checker_error.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, logPerm)
|
|
if err != nil {
|
|
log.Printf("Error opening log file: %v", err)
|
|
return
|
|
}
|
|
defer func() {
|
|
if cerr := f.Close(); cerr != nil {
|
|
log.Printf("Error closing log file: %v", cerr)
|
|
}
|
|
}()
|
|
|
|
if _, werr := fmt.Fprintf(f, "%s failed to respond: %s\n", siteURL, errorMsg); werr != nil {
|
|
log.Printf("Error writing to log file: %v", werr)
|
|
}
|
|
}
|
|
|
|
type siteWithStatus struct {
|
|
models.Site
|
|
IsUp bool
|
|
}
|
|
|
|
func (c *Checker) getAllSitesWithStatus() ([]siteWithStatus, error) {
|
|
rows, err := c.db.Query("SELECT id, name, url, user_id, is_up FROM sites")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
if cerr := rows.Close(); cerr != nil {
|
|
log.Printf("Error closing rows: %v", cerr)
|
|
}
|
|
}()
|
|
|
|
var sites []siteWithStatus
|
|
for rows.Next() {
|
|
var site siteWithStatus
|
|
if scanErr := rows.Scan(&site.ID, &site.Name, &site.URL, &site.UserID, &site.IsUp); scanErr != nil {
|
|
return nil, scanErr
|
|
}
|
|
sites = append(sites, site)
|
|
}
|
|
|
|
if rowsErr := rows.Err(); rowsErr != nil {
|
|
return nil, rowsErr
|
|
}
|
|
|
|
return sites, nil
|
|
}
|
|
|
|
// hasProtocol reports whether u starts with the "http://" or "https://"
// scheme prefix. The comparison ignores case, because URL schemes are
// case-insensitive, and it places no minimum on the length of what follows
// the prefix, so short URLs such as "http://a" are recognized as well.
func hasProtocol(u string) bool {
	for _, prefix := range [...]string{"http://", "https://"} {
		if len(u) >= len(prefix) && strings.EqualFold(u[:len(prefix)], prefix) {
			return true
		}
	}
	return false
}
|
|
|
|
// isProxyError reports whether err looks like a failure of the proxy rather
// than of the probed site. The decision is a case-insensitive substring match
// on the error text, so it is a heuristic. A nil error is never a proxy
// error.
func isProxyError(err error) bool {
	if err == nil {
		return false
	}

	markers := [...]string{
		// Operation name net/http's transport puts on errors from dialing
		// the proxy ("proxyconnect tcp: dial tcp ...").
		"proxyconnect",
		"cannot connect to proxy",
		"proxy refused connection",
		"no route to host",
		"proxy authentication required",
		"bad gateway",
	}

	msg := strings.ToLower(err.Error())
	for _, marker := range markers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
|