package mq import ( "context" "log" "time" "accounting-app/internal/repository" "github.com/redis/go-redis/v9" "gorm.io/gorm" ) // RecurringTaskSystem 周期性交易任务系统 type RecurringTaskSystem struct { Queue *TaskQueue Worker *TaskWorker Handler *RecurringTransactionHandler } // InitRecurringTaskSystem 初始化周期性交易任务系统 // 需要 Redis 连接来存储任务队列 func InitRecurringTaskSystem( ctx context.Context, redisClient *redis.Client, db *gorm.DB, pollInterval time.Duration, workerCount int, ) (*RecurringTaskSystem, error) { log.Println("[MQ] Initializing recurring task system...") // 创建任务队列 taskQueue := NewTaskQueue(redisClient, "novault:tasks") // 创建任务 Worker worker := NewTaskWorker(taskQueue, pollInterval, workerCount) // 创建周期性交易处理器 recurringRepo := repository.NewRecurringTransactionRepository(db) transactionRepo := repository.NewTransactionRepository(db) accountRepo := repository.NewAccountRepository(db) handler := NewRecurringTransactionHandler( db, recurringRepo, transactionRepo, accountRepo, taskQueue, ) // 注册处理器 worker.RegisterHandler(TaskTypeRecurringTransaction, handler.Handle) system := &RecurringTaskSystem{ Queue: taskQueue, Worker: worker, Handler: handler, } // 启动时调度所有活跃的周期性交易 if err := handler.ScheduleAllActive(ctx); err != nil { log.Printf("[MQ] Warning: failed to schedule active recurring transactions: %v", err) } // 处理逾期任务(补偿机制) if err := handler.ProcessOverdue(ctx); err != nil { log.Printf("[MQ] Warning: failed to process overdue transactions: %v", err) } log.Println("[MQ] Recurring task system initialized successfully") return system, nil } // Start 启动任务系统 func (s *RecurringTaskSystem) Start(ctx context.Context) { log.Println("[MQ] Starting recurring task worker...") s.Worker.Start(ctx) } // Stop 停止任务系统 func (s *RecurringTaskSystem) Stop() { log.Println("[MQ] Stopping recurring task worker...") s.Worker.Stop() } // GetStats 获取系统统计信息 func (s *RecurringTaskSystem) GetStats(ctx context.Context) map[string]interface{} { queueStats, _ := s.Queue.GetQueueStats(ctx) workerStats := s.Worker.GetStats() return map[string]interface{}{ "queue": queueStats, "worker": workerStats, } }