From 5a0ae912d5c4d9d7bf7ec7ac246bf6a4bc8ce367 Mon Sep 17 00:00:00 2001 From: admin <1297598740@qq.com> Date: Wed, 28 Jan 2026 16:40:57 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E5=91=A8=E6=9C=9F?= =?UTF-8?q?=E6=80=A7=E4=BA=A4=E6=98=93=E5=A4=84=E7=90=86=E5=99=A8=E5=8F=8A?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E9=98=9F=E5=88=97=EF=BC=8C=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=91=A8=E6=9C=9F=E6=80=A7=E4=BA=A4=E6=98=93=E7=9A=84=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E5=8C=96=E5=A4=84=E7=90=86=E3=80=81=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E5=AE=89=E5=85=A8=E4=B8=8E=E5=B9=82=E7=AD=89=E6=80=A7=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/mq/recurring_handler.go | 2 +- internal/mq/task_queue.go | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/internal/mq/recurring_handler.go b/internal/mq/recurring_handler.go index c6258e7..856c8d7 100644 --- a/internal/mq/recurring_handler.go +++ b/internal/mq/recurring_handler.go @@ -49,7 +49,7 @@ func (h *RecurringTransactionHandler) Handle(ctx context.Context, task *DelayedT // 幂等性检查:防止同一笔交易重复执行 // 使用 Redis SETNX 锁,锁 24 小时(足以覆盖重试窗口) // TaskID 本身包含 recurringID 和执行时间戳,是天然的幂等 Key - lockKey := fmt.Sprintf("novault:lock:recurring:%s", task.ID) + lockKey := GenerateLockKey(task.ID) // 使用 TaskQueue 提供的公开方法获取锁 isNew, err := h.taskQueue.AcquireLock(ctx, lockKey, 24*time.Hour) if err != nil { diff --git a/internal/mq/task_queue.go b/internal/mq/task_queue.go index e5ceb20..53d5820 100644 --- a/internal/mq/task_queue.go +++ b/internal/mq/task_queue.go @@ -18,6 +18,13 @@ const ( TaskTypeRecurringTransaction TaskType = "recurring_transaction" // TaskTypeAllocationRule 分配规则执行任务 TaskTypeAllocationRule TaskType = "allocation_rule" + + // Redis Key Prefixes + KeyPrefixDefault = "novault:tasks" + KeySuffixDelayed = ":delayed" + KeySuffixReady = ":ready" + KeySuffixProcessing = ":processing" + KeyPrefixLock = "novault:lock:" ) // DelayedTask 延迟任务结构 @@ -50,17 +57,22 @@ type TaskQueue struct { // NewTaskQueue 创建任务队列实例 func NewTaskQueue(client *redis.Client, keyPrefix string) *TaskQueue { if keyPrefix == "" { - keyPrefix = "novault:tasks" + keyPrefix = KeyPrefixDefault } return &TaskQueue{ client: client, keyPrefix: keyPrefix, - delayedKey: keyPrefix + ":delayed", - readyKey: keyPrefix + ":ready", - processingKey: keyPrefix + ":processing", + delayedKey: keyPrefix + KeySuffixDelayed, + readyKey: keyPrefix + KeySuffixReady, + processingKey: keyPrefix + KeySuffixProcessing, } } +// GenerateLockKey 生成分布式锁 Key +func GenerateLockKey(taskID string) string { + return KeyPrefixLock + "recurring:" + taskID +} + // Schedule 调度延迟任务 func (q *TaskQueue) Schedule(ctx context.Context, task *DelayedTask) error { // 序列化任务