96 lines
2.3 KiB
Go
96 lines
2.3 KiB
Go
package mq
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"time"
|
|
|
|
"accounting-app/internal/repository"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// RecurringTaskSystem 周期性交易任务系统
|
|
type RecurringTaskSystem struct {
|
|
Queue *TaskQueue
|
|
Worker *TaskWorker
|
|
Handler *RecurringTransactionHandler
|
|
}
|
|
|
|
// InitRecurringTaskSystem 初始化周期性交易任务系统
|
|
// 需要 Redis 连接来存储任务队列
|
|
func InitRecurringTaskSystem(
|
|
ctx context.Context,
|
|
redisClient *redis.Client,
|
|
db *gorm.DB,
|
|
pollInterval time.Duration,
|
|
workerCount int,
|
|
) (*RecurringTaskSystem, error) {
|
|
log.Println("[MQ] Initializing recurring task system...")
|
|
|
|
// 创建任务队列
|
|
taskQueue := NewTaskQueue(redisClient, "novault:tasks")
|
|
|
|
// 创建任务 Worker
|
|
worker := NewTaskWorker(taskQueue, pollInterval, workerCount)
|
|
|
|
// 创建周期性交易处理器
|
|
recurringRepo := repository.NewRecurringTransactionRepository(db)
|
|
transactionRepo := repository.NewTransactionRepository(db)
|
|
accountRepo := repository.NewAccountRepository(db)
|
|
|
|
handler := NewRecurringTransactionHandler(
|
|
db,
|
|
recurringRepo,
|
|
transactionRepo,
|
|
accountRepo,
|
|
taskQueue,
|
|
)
|
|
|
|
// 注册处理器
|
|
worker.RegisterHandler(TaskTypeRecurringTransaction, handler.Handle)
|
|
|
|
system := &RecurringTaskSystem{
|
|
Queue: taskQueue,
|
|
Worker: worker,
|
|
Handler: handler,
|
|
}
|
|
|
|
// 启动时调度所有活跃的周期性交易
|
|
if err := handler.ScheduleAllActive(ctx); err != nil {
|
|
log.Printf("[MQ] Warning: failed to schedule active recurring transactions: %v", err)
|
|
}
|
|
|
|
// 处理逾期任务(补偿机制)
|
|
if err := handler.ProcessOverdue(ctx); err != nil {
|
|
log.Printf("[MQ] Warning: failed to process overdue transactions: %v", err)
|
|
}
|
|
|
|
log.Println("[MQ] Recurring task system initialized successfully")
|
|
return system, nil
|
|
}
|
|
|
|
// Start 启动任务系统
|
|
func (s *RecurringTaskSystem) Start(ctx context.Context) {
|
|
log.Println("[MQ] Starting recurring task worker...")
|
|
s.Worker.Start(ctx)
|
|
}
|
|
|
|
// Stop 停止任务系统
|
|
func (s *RecurringTaskSystem) Stop() {
|
|
log.Println("[MQ] Stopping recurring task worker...")
|
|
s.Worker.Stop()
|
|
}
|
|
|
|
// GetStats 获取系统统计信息
|
|
func (s *RecurringTaskSystem) GetStats(ctx context.Context) map[string]interface{} {
|
|
queueStats, _ := s.Queue.GetQueueStats(ctx)
|
|
workerStats := s.Worker.GetStats()
|
|
|
|
return map[string]interface{}{
|
|
"queue": queueStats,
|
|
"worker": workerStats,
|
|
}
|
|
}
|