diff --git a/internal/mq/recurring_handler.go b/internal/mq/recurring_handler.go index c6258e7..856c8d7 100644 --- a/internal/mq/recurring_handler.go +++ b/internal/mq/recurring_handler.go @@ -49,7 +49,7 @@ func (h *RecurringTransactionHandler) Handle(ctx context.Context, task *DelayedT // 幂等性检查:防止同一笔交易重复执行 // 使用 Redis SETNX 锁,锁 24 小时(足以覆盖重试窗口) // TaskID 本身包含 recurringID 和执行时间戳,是天然的幂等 Key - lockKey := fmt.Sprintf("novault:lock:recurring:%s", task.ID) + lockKey := GenerateLockKey(task.ID) // 使用 TaskQueue 提供的公开方法获取锁 isNew, err := h.taskQueue.AcquireLock(ctx, lockKey, 24*time.Hour) if err != nil { diff --git a/internal/mq/task_queue.go b/internal/mq/task_queue.go index e5ceb20..53d5820 100644 --- a/internal/mq/task_queue.go +++ b/internal/mq/task_queue.go @@ -18,6 +18,13 @@ const ( TaskTypeRecurringTransaction TaskType = "recurring_transaction" // TaskTypeAllocationRule 分配规则执行任务 TaskTypeAllocationRule TaskType = "allocation_rule" + + // Redis Key Prefixes + KeyPrefixDefault = "novault:tasks" + KeySuffixDelayed = ":delayed" + KeySuffixReady = ":ready" + KeySuffixProcessing = ":processing" + KeyPrefixLock = "novault:lock:" ) // DelayedTask 延迟任务结构 @@ -50,17 +57,22 @@ type TaskQueue struct { // NewTaskQueue 创建任务队列实例 func NewTaskQueue(client *redis.Client, keyPrefix string) *TaskQueue { if keyPrefix == "" { - keyPrefix = "novault:tasks" + keyPrefix = KeyPrefixDefault } return &TaskQueue{ client: client, keyPrefix: keyPrefix, - delayedKey: keyPrefix + ":delayed", - readyKey: keyPrefix + ":ready", - processingKey: keyPrefix + ":processing", + delayedKey: keyPrefix + KeySuffixDelayed, + readyKey: keyPrefix + KeySuffixReady, + processingKey: keyPrefix + KeySuffixProcessing, } } +// GenerateLockKey 生成分布式锁 Key +func GenerateLockKey(taskID string) string { + return KeyPrefixLock + "recurring:" + taskID +} + // Schedule 调度延迟任务 func (q *TaskQueue) Schedule(ctx context.Context, task *DelayedTask) error { // 序列化任务