Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions packages/redis-worker/src/fair-queue/concurrency.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,30 @@ export class ConcurrencyManager {
await pipeline.exec();
}

/**
 * Frees the concurrency slots held by a batch of messages.
 *
 * All removals are queued on one Redis pipeline and executed together, which is
 * cheaper than invoking release() once per message.
 *
 * @param messages - Queue descriptor and message ID for each slot to free. An empty
 *   array returns immediately without creating a pipeline.
 */
async releaseBatch(
  messages: Array<{ queue: QueueDescriptor; messageId: string }>
): Promise<void> {
  if (messages.length === 0) {
    return;
  }

  const batch = this.redis.pipeline();

  messages.forEach(({ queue, messageId }) => {
    // Drop the message from the concurrency set of every configured group.
    for (const group of this.groups) {
      const scopeId = group.extractGroupId(queue);
      batch.srem(this.keys.concurrencyKey(group.name, scopeId), messageId);
    }
  });

  await batch.exec();
}

/**
* Get current concurrency for a specific group.
*/
Expand Down
27 changes: 25 additions & 2 deletions packages/redis-worker/src/fair-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1343,13 +1343,36 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
let totalReclaimed = 0;

for (let shardId = 0; shardId < this.shardCount; shardId++) {
const reclaimed = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
queueKey: this.keys.queueKey(queueId),
queueItemsKey: this.keys.queueItemsKey(queueId),
masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)),
}));

totalReclaimed += reclaimed;
// Release concurrency for all reclaimed messages in a single batch
// This is critical: when a message times out, its concurrency slot must be freed
// so the message can be processed again when it's re-claimed from the queue
if (this.concurrencyManager && reclaimedMessages.length > 0) {
try {
await this.concurrencyManager.releaseBatch(
reclaimedMessages.map((msg) => ({
queue: {
id: msg.queueId,
tenantId: msg.tenantId,
metadata: msg.metadata ?? {},
},
messageId: msg.messageId,
}))
);
} catch (error) {
this.logger.error("Failed to release concurrency for reclaimed messages", {
count: reclaimedMessages.length,
error: error instanceof Error ? error.message : String(error),
});
}
}

totalReclaimed += reclaimedMessages.length;
}

if (totalReclaimed > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,12 @@ describe("Race Condition Tests", () => {
await new Promise((resolve) => setTimeout(resolve, 300));

// Try to reclaim (should find nothing because heartbeat extended the deadline)
const reclaimed = await manager.reclaimTimedOut(0, (queueId) => ({
const reclaimedMessages = await manager.reclaimTimedOut(0, (queueId) => ({
queueKey: keys.queueKey(queueId),
queueItemsKey: keys.queueItemsKey(queueId),
masterQueueKey: keys.masterQueueKey(0),
}));
reclaimResults.push(reclaimed);
reclaimResults.push(reclaimedMessages.length);
}

// Heartbeats should have kept the message alive
Expand Down
251 changes: 250 additions & 1 deletion packages/redis-worker/src/fair-queue/tests/visibility.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { describe, expect } from "vitest";
import { redisTest } from "@internal/testcontainers";
import { createRedisClient } from "@internal/redis";
import { VisibilityManager, DefaultFairQueueKeyProducer } from "../index.js";
import type { FairQueueKeyProducer } from "../types.js";
import type { FairQueueKeyProducer, ReclaimedMessageInfo } from "../types.js";

describe("VisibilityManager", () => {
let keys: FairQueueKeyProducer;
Expand Down Expand Up @@ -597,5 +597,254 @@ describe("VisibilityManager", () => {
}
);
});

describe("reclaimTimedOut", () => {
// A claimed message whose visibility timeout lapses must be handed back to its queue,
// and reclaimTimedOut must report the IDs/metadata needed to release its concurrency.
redisTest(
  "should return reclaimed message info with tenantId for concurrency release",
  { timeout: 10000 },
  async ({ redisOptions }) => {
    keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

    const manager = new VisibilityManager({
      redis: redisOptions,
      keys,
      shardCount: 1,
      defaultTimeoutMs: 100, // Very short timeout
    });

    const redis = createRedisClient(redisOptions);

    try {
      const queueId = "tenant:t1:queue:reclaim-test";
      const queueKey = keys.queueKey(queueId);
      const queueItemsKey = keys.queueItemsKey(queueId);
      const masterQueueKey = keys.masterQueueKey(0);

      // Add and claim a message
      const messageId = "reclaim-msg";
      const storedMessage = {
        id: messageId,
        queueId,
        tenantId: "t1",
        payload: { id: 1, value: "test" },
        timestamp: Date.now() - 1000,
        attempt: 1,
        metadata: { orgId: "org-123" },
      };

      await redis.zadd(queueKey, storedMessage.timestamp, messageId);
      await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

      // Claim with very short timeout
      const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
      expect(claimResult.claimed).toBe(true);

      // Wait for timeout to expire
      await new Promise((resolve) => setTimeout(resolve, 150));

      // Reclaim should return the message info
      const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
        queueKey: keys.queueKey(qId),
        queueItemsKey: keys.queueItemsKey(qId),
        masterQueueKey,
      }));

      expect(reclaimedMessages).toHaveLength(1);
      expect(reclaimedMessages[0]).toEqual({
        messageId,
        queueId,
        tenantId: "t1",
        metadata: { orgId: "org-123" },
      });

      // Verify message is back in queue
      const queueCount = await redis.zcard(queueKey);
      expect(queueCount).toBe(1);

      // Verify message is back in queue with its original timestamp (not the deadline)
      const queueMessages = await redis.zrange(queueKey, 0, -1, "WITHSCORES");
      expect(queueMessages[0]).toBe(messageId);
      // Number() parses the whole score string and yields NaN (failing the assertion)
      // if the score entry is absent, so no non-null assertion is needed.
      expect(Number(queueMessages[1])).toBe(storedMessage.timestamp);

      // Verify message is no longer in-flight
      const inflightCount = await manager.getTotalInflightCount();
      expect(inflightCount).toBe(0);
    } finally {
      // Release both connections even when an assertion above throws.
      await manager.close();
      await redis.quit();
    }
  }
);

// A message claimed with the manager's long default timeout must not be reclaimed
// when reclaimTimedOut runs right after the claim.
redisTest(
  "should return empty array when no messages have timed out",
  { timeout: 10000 },
  async ({ redisOptions }) => {
    keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

    const manager = new VisibilityManager({
      redis: redisOptions,
      keys,
      shardCount: 1,
      defaultTimeoutMs: 60000, // Long timeout
    });

    const redis = createRedisClient(redisOptions);

    try {
      const queueId = "tenant:t1:queue:no-timeout";
      const queueKey = keys.queueKey(queueId);
      const queueItemsKey = keys.queueItemsKey(queueId);
      const masterQueueKey = keys.masterQueueKey(0);

      // Add and claim a message with long timeout
      const messageId = "long-timeout-msg";
      const storedMessage = {
        id: messageId,
        queueId,
        tenantId: "t1",
        payload: { id: 1 },
        timestamp: Date.now() - 1000,
        attempt: 1,
      };

      await redis.zadd(queueKey, storedMessage.timestamp, messageId);
      await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

      // No explicit timeout argument: the claim uses the manager's default.
      await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1");

      // Reclaim should return empty array (message hasn't timed out)
      const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
        queueKey: keys.queueKey(qId),
        queueItemsKey: keys.queueItemsKey(qId),
        masterQueueKey,
      }));

      expect(reclaimedMessages).toHaveLength(0);
    } finally {
      // Release both connections even when an assertion above throws.
      await manager.close();
      await redis.quit();
    }
  }
);

// Two messages from different tenants time out on the same shard; a single
// reclaimTimedOut call must return info for both of them.
redisTest(
  "should reclaim multiple timed-out messages and return all their info",
  { timeout: 10000 },
  async ({ redisOptions }) => {
    keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

    const manager = new VisibilityManager({
      redis: redisOptions,
      keys,
      shardCount: 1,
      defaultTimeoutMs: 100,
    });

    const redis = createRedisClient(redisOptions);

    try {
      const masterQueueKey = keys.masterQueueKey(0);

      // Add and claim messages for two different tenants
      for (const tenant of ["t1", "t2"]) {
        const queueId = `tenant:${tenant}:queue:multi-reclaim`;
        const queueKey = keys.queueKey(queueId);
        const queueItemsKey = keys.queueItemsKey(queueId);

        const messageId = `msg-${tenant}`;
        const storedMessage = {
          id: messageId,
          queueId,
          tenantId: tenant,
          payload: { id: 1 },
          timestamp: Date.now() - 1000,
          attempt: 1,
        };

        await redis.zadd(queueKey, storedMessage.timestamp, messageId);
        await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

        await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
      }

      // Wait for timeout
      await new Promise((resolve) => setTimeout(resolve, 150));

      // Reclaim should return both messages
      const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
        queueKey: keys.queueKey(qId),
        queueItemsKey: keys.queueItemsKey(qId),
        masterQueueKey,
      }));

      expect(reclaimedMessages).toHaveLength(2);

      // Verify both tenants are represented; sorted so the assertion does not depend
      // on the order in which messages are reclaimed.
      const tenantIds = reclaimedMessages.map((m: ReclaimedMessageInfo) => m.tenantId).sort();
      expect(tenantIds).toEqual(["t1", "t2"]);
    } finally {
      // Release both connections even when an assertion above throws.
      await manager.close();
      await redis.quit();
    }
  }
);

// When the in-flight payload cannot be parsed, reclaimTimedOut must still return the
// message, with tenantId derived from the queue ID and empty metadata.
// NOTE: only the corrupted-JSON case is exercised here; a missing in-flight entry
// (also named in the title) is not covered by this test.
redisTest(
  "should use fallback tenantId extraction when message data is missing or corrupted",
  { timeout: 10000 },
  async ({ redisOptions }) => {
    keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

    const manager = new VisibilityManager({
      redis: redisOptions,
      keys,
      shardCount: 1,
      defaultTimeoutMs: 100,
    });

    const redis = createRedisClient(redisOptions);

    try {
      const queueId = "tenant:t1:queue:fallback-test";
      const queueKey = keys.queueKey(queueId);
      const queueItemsKey = keys.queueItemsKey(queueId);
      const masterQueueKey = keys.masterQueueKey(0);
      const inflightDataKey = keys.inflightDataKey(0);

      // Add and claim a message
      const messageId = "fallback-msg";
      const storedMessage = {
        id: messageId,
        queueId,
        tenantId: "t1",
        payload: { id: 1 },
        timestamp: Date.now() - 1000,
        attempt: 1,
        metadata: { orgId: "org-123" },
      };

      await redis.zadd(queueKey, storedMessage.timestamp, messageId);
      await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

      // Claim the message
      const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
      expect(claimResult.claimed).toBe(true);

      // Corrupt the in-flight data by setting invalid JSON
      await redis.hset(inflightDataKey, messageId, "not-valid-json{{{");

      // Wait for timeout
      await new Promise((resolve) => setTimeout(resolve, 150));

      // Reclaim should still work using fallback extraction
      const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
        queueKey: keys.queueKey(qId),
        queueItemsKey: keys.queueItemsKey(qId),
        masterQueueKey,
      }));

      expect(reclaimedMessages).toHaveLength(1);
      expect(reclaimedMessages[0]).toEqual({
        messageId,
        queueId,
        tenantId: "t1", // Extracted from queueId via fallback
        metadata: {}, // Empty metadata since we couldn't parse the stored message
      });
    } finally {
      // Release both connections even when an assertion above throws.
      await manager.close();
      await redis.quit();
    }
  }
);
});
});

19 changes: 19 additions & 0 deletions packages/redis-worker/src/fair-queue/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,25 @@ export interface ConcurrencyCheckResult {
blockedBy?: ConcurrencyState;
}

// ============================================================================
// Visibility Types
// ============================================================================

/**
* Describes one message that was reclaimed after its visibility timeout expired.
* Carries the identifiers needed to build a queue descriptor and release the
* message's concurrency slot once it has been returned to the queue.
*/
export interface ReclaimedMessageInfo {
/** ID of the reclaimed message. */
messageId: string;
/** ID of the queue the message belongs to. */
queueId: string;
/** Tenant ID of the message, needed for concurrency release. */
tenantId: string;
/** Additional metadata for concurrency group extraction. Optional: consumers should handle it being absent. */
metadata?: Record<string, unknown>;
}

// ============================================================================
// Scheduler Types
// ============================================================================
Expand Down
Loading