Files

96 lines
2.3 KiB
Go

package mq
import (
"context"
"log"
"time"
"accounting-app/internal/repository"
"github.com/redis/go-redis/v9"
"gorm.io/gorm"
)
// RecurringTaskSystem 周期性交易任务系统
type RecurringTaskSystem struct {
Queue *TaskQueue
Worker *TaskWorker
Handler *RecurringTransactionHandler
}
// InitRecurringTaskSystem 初始化周期性交易任务系统
// 需要 Redis 连接来存储任务队列
func InitRecurringTaskSystem(
ctx context.Context,
redisClient *redis.Client,
db *gorm.DB,
pollInterval time.Duration,
workerCount int,
) (*RecurringTaskSystem, error) {
log.Println("[MQ] Initializing recurring task system...")
// 创建任务队列
taskQueue := NewTaskQueue(redisClient, "novault:tasks")
// 创建任务 Worker
worker := NewTaskWorker(taskQueue, pollInterval, workerCount)
// 创建周期性交易处理器
recurringRepo := repository.NewRecurringTransactionRepository(db)
transactionRepo := repository.NewTransactionRepository(db)
accountRepo := repository.NewAccountRepository(db)
handler := NewRecurringTransactionHandler(
db,
recurringRepo,
transactionRepo,
accountRepo,
taskQueue,
)
// 注册处理器
worker.RegisterHandler(TaskTypeRecurringTransaction, handler.Handle)
system := &RecurringTaskSystem{
Queue: taskQueue,
Worker: worker,
Handler: handler,
}
// 启动时调度所有活跃的周期性交易
if err := handler.ScheduleAllActive(ctx); err != nil {
log.Printf("[MQ] Warning: failed to schedule active recurring transactions: %v", err)
}
// 处理逾期任务(补偿机制)
if err := handler.ProcessOverdue(ctx); err != nil {
log.Printf("[MQ] Warning: failed to process overdue transactions: %v", err)
}
log.Println("[MQ] Recurring task system initialized successfully")
return system, nil
}
// Start 启动任务系统
func (s *RecurringTaskSystem) Start(ctx context.Context) {
log.Println("[MQ] Starting recurring task worker...")
s.Worker.Start(ctx)
}
// Stop 停止任务系统
func (s *RecurringTaskSystem) Stop() {
log.Println("[MQ] Stopping recurring task worker...")
s.Worker.Stop()
}
// GetStats 获取系统统计信息
func (s *RecurringTaskSystem) GetStats(ctx context.Context) map[string]interface{} {
queueStats, _ := s.Queue.GetQueueStats(ctx)
workerStats := s.Worker.GetStats()
return map[string]interface{}{
"queue": queueStats,
"worker": workerStats,
}
}