Skip to content

Redis Predicate

Install the Redis Predicate

Terminal window
flowcore component add library/predicate-redis
Or install the Redis Predicate manually

First install the following dependencies:

Terminal window
bun add @flowcore/sdk-transformer-core ioredis

Then add the following code to your lib folder:

import type { TransformerSuccessHandler, WebhookPredicate } from "@flowcore/sdk-transformer-core"
import Redis from "ioredis"
/**
 * Configuration for the Redis predicate checker and notifier.
 */
export type RedisPredicateOptions = {
  /** URL of the Redis server to connect to. */
  redisUrl: string
  /**
   * Prefix of the per-event Redis keys; the event ID is appended after a colon.
   * @example "scenario-api:event-cache"
   */
  redisEventIdKey: string
}
/**
 * Builds the webhook predicate that reports whether an event has been marked in Redis.
 *
 * The predicate reads the key `<redisEventIdKey>:<eventId>` and resolves to `true`
 * only when a non-empty value is stored under it.
 *
 * @param redis - The Redis client used for the lookup.
 * @param options - Supplies `redisEventIdKey`, the prefix of the key that is read.
 * @returns An async predicate that takes an event ID and resolves to a boolean.
 */
export function createPredicateRedisChecker(redis: Redis, options: RedisPredicateOptions): WebhookPredicate {
  // The prefix is read on every call, not captured once when the predicate is created.
  const keyFor = (eventId: string): string => `${options.redisEventIdKey}:${eventId}`

  return async (eventId: string) => {
    const stored = await redis?.get(keyFor(eventId))
    return Boolean(stored)
  }
}
/**
 * Creates the transformer success handler that marks an event in Redis.
 *
 * The handler writes the value "1" under the key `<redisEventIdKey>:<eventId>`
 * with an expiry, so the marker disappears on its own once the TTL elapses.
 *
 * @param redis - The Redis client used for the write.
 * @param options - Supplies `redisEventIdKey`, the prefix of the key that is written.
 * @param ttlSeconds - Lifetime of the marker key in whole seconds. Defaults to 60.
 * @returns An async handler that takes a payload carrying `eventId`, sets the key and resolves to `true`.
 * @throws RangeError if `ttlSeconds` is not a positive integer.
 */
export function createPredicateRedisNotifier(
  redis: Redis,
  options: RedisPredicateOptions,
  ttlSeconds = 60,
): TransformerSuccessHandler {
  // Fail at setup time rather than on every write with an invalid expiry.
  if (!Number.isInteger(ttlSeconds) || ttlSeconds <= 0) {
    throw new RangeError(`ttlSeconds must be a positive integer, got ${ttlSeconds}`)
  }

  return async (payload: { eventId: string }) => {
    await redis.set(`${options.redisEventIdKey}:${payload.eventId}`, "1", "EX", ttlSeconds)
    return true
  }
}
/**
 * Wires up a Redis-backed checker/notifier pair that shares one Redis client.
 *
 * @param options - `redisUrl` is handed to the Redis client constructor; the whole
 * object is forwarded to both the checker and the notifier.
 * @returns An object whose `check` is the predicate built by `createPredicateRedisChecker`
 * and whose `notify` is the handler built by `createPredicateRedisNotifier`.
 */
export function predicateRedisFactory(options: RedisPredicateOptions) {
  const client = new Redis(options.redisUrl)
  const check = createPredicateRedisChecker(client, options)
  const notify = createPredicateRedisNotifier(client, options)
  return { check, notify }
}

Add the following to your docker-compose.yml file:

docker-compose.yml
# Redis instance for the Redis predicate, published on the default port.
services:
  redis:
    image: redis:7.2-alpine
    ports:
      - "6379:6379"
    # Healthy once redis-cli gets a reply to PING.
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 10

Usage

// Set up the predicate check
const predicateCheck = predicateRedisFactory({
  // Redis connection URLs use the redis:// scheme (rediss:// for TLS), not http://
  redisUrl: "redis://localhost:6379",
  redisEventIdKey: "my-api:event-cache",
})

Usage with @flowcore/sdk-transformer-core

flowcore.ts
import env from "@/env"
import { predicateRedisFactory } from "@/lib/redis-predicate-factory"
import { transformerFactory } from "@/lib/transformer-factory"
import { WebhookBuilder } from "@flowcore/sdk-transformer-core"

// Set up the predicate check, backed by Redis
const predicateCheck = predicateRedisFactory({
  redisUrl: env.REDIS_URL,
  redisEventIdKey: env.REDIS_KEY_PATTERN,
})

// Set up the webhook client, which uses the predicate's check function
export const webhookClient = new WebhookBuilder({
  baseUrl: env.FLOWCORE_WEBHOOK_BASE_URL,
  tenant: env.FLOWCORE_TENANT,
  dataCore: env.FLOWCORE_DATA_CORE,
  apiKey: env.FLOWCORE_API_KEY,
})
  .withPredicate({
    predicate: predicateCheck.check,
  })
  .withRetry()
  .factory()

// Set up the transformer client, which reports successes through the predicate's notify function
export const transformerClient = transformerFactory({
  onSuccess: predicateCheck.notify,
  secret: env.TRANSFORMER_SECRET,
})

Here, transformerFactory is just a thin wrapper around the TransformerBuilder class that makes it easier to reuse. Whether you use it is up to you.

transformer-factory.ts
import { TransformerBuilder, type TransformerSuccessHandler } from "@flowcore/sdk-transformer-core"
/**
 * Settings applied to every builder produced by `transformerFactory`.
 */
export type TransformerOptions = {
  /** Handler registered on each builder via `withSuccessHandler`. */
  onSuccess: TransformerSuccessHandler
  /** Secret registered on each builder via `withSecret`. */
  secret: string
}
/**
 * Creates a function that produces pre-configured transformer builders.
 *
 * @param options - The success handler and secret applied to every builder.
 * @returns A function that takes a flow type and returns a new `TransformerBuilder`
 * for it, with the success handler and secret already set.
 */
export const transformerFactory = (options: TransformerOptions) => (flowType: string) => {
  const builder = new TransformerBuilder(flowType)
  return builder.withSuccessHandler(options.onSuccess).withSecret(options.secret)
}

Running tests

For running tests, we recommend adding the following Redis service to your test compose file (docker-compose.test.yml):

docker-compose.test.yml
# Redis instance for test runs, published on the default port.
services:
  test-redis:
    image: redis:7.2-alpine
    ports:
      - "6379:6379"
    # Healthy once redis-cli gets a reply to PING.
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 10