Redis Predicate
Install the Redis Predicate
flowcore component add library/predicate-redis
Or install the Redis Predicate manually
First install the following dependencies:
bun add @flowcore/sdk-transformer-core ioredis
Then add the following code to your lib folder:
import type { TransformerSuccessHandler, WebhookPredicate } from "@flowcore/sdk-transformer-core"import { redisFactory } from "@components/redis-factory"import Redis from "ioredis"
/*** Options for the Redis predicate.* @property redisUrl - The URL of the Redis server.* @property redisEventIdKey - The key pattern for the Redis key.* @example "redisEventIdKey: scenario-api:event-cache"*/export type RedisPredicateOptions = { redisUrl: string redisEventIdKey: string}
/*** Creates a webhook predicate checker that checks if a Redis key exists for a given event ID.** @param options - The options for the Redis connection and the key pattern.* @returns A webhook predicate function that checks the existence of the Redis key and returns true if it exists.*/export function createPredicateRedisChecker(redis: Redis, options: RedisPredicateOptions): WebhookPredicate { return async (eventId: string) => { const loaded = await redis?.get(`${options.redisEventIdKey}:${eventId}`) return !!loaded }}
/*** Creates a webhook predicate notifier that sets a Redis key for a given event ID.* The key is set to expire in 60 seconds.** @param options - The options for the Redis connection and the key pattern.* @returns A webhook predicate function that sets the Redis key and returns true.*/export function createPredicateRedisNotifier(redis: Redis, options: RedisPredicateOptions): TransformerSuccessHandler { return async (payload: { eventId: string }) => { await redis.set(`${options.redisEventIdKey}:${payload.eventId}`, "1", "EX", "60") return true }}
/*** Factory function for creating a Redis predicate checker and notifier.** @param options - The options for the Redis connection and the key pattern.* @returns An object with a check function that checks the existence of the Redis key and a notify function that sets the Redis key.*/export function predicateRedisFactory(options: RedisPredicateOptions) { const redis = redisFactory(options.redisUrl)
return { check: createPredicateRedisChecker(redis, options), notify: createPredicateRedisNotifier(redis, options), }}
Add the following to your docker-compose.yml
file:
services: redis: image: redis:7.2-alpine ports: - "6379:6379" healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5s timeout: 5s retries: 10
Usage
// Setup the predicate checkconst predicateCheck = redisPredicateFactory({ redisUrl: "http://localhost:6379", redisEventIdKey: "my-api:event-cache",})
Usage with @flowcore/sdk-transformer-core
import env from "@/env"import { redisPredicateFactory } from "@/lib/redis-predicate-factory"import { transformerFactory } from "@/lib/transformer-factory"import { WebhookBuilder } from "@flowcore/sdk-transformer-core"
// Setup the predicate check, utilising Redisconst predicateCheck = redisPredicateFactory({ redisUrl: env.REDIS_URL, redisEventIdKey: env.REDIS_KEY_PATTERN,})
// Setup the webhook client, utilising the predicate checkexport const webhookClient = new WebhookBuilder({ baseUrl: env.FLOWCORE_WEBHOOK_BASE_URL, tenant: env.FLOWCORE_TENANT, dataCore: env.FLOWCORE_DATA_CORE, apiKey: env.FLOWCORE_API_KEY,}) .withPredicate({ predicate: predicateCheck.check, }) .withRetry() .factory()
// Setup the transformer client, utilising the predicate checkexport const transformerClient = transformerFactory({ onSuccess: predicateCheck.notify, secret: env.TRANSFORMER_SECRET,})
Where the transformerFactory
, is just a wrapper around the TransformerBuilder
class, to make it easier to reuse. it is up to you, if you want to use it or not.
import { TransformerBuilder, type TransformerSuccessHandler } from "@flowcore/sdk-transformer-core"
export type TransformerOptions = { onSuccess: TransformerSuccessHandler secret: string}
export const transformerFactory = (options: TransformerOptions) => { return (flowType: string) => { return new TransformerBuilder(flowType) .withSuccessHandler(options.onSuccess) .withSecret(options.secret) }}
Running tests
Then we recommend you add this to your test docker-compose.yml
file:
services: test-redis: image: redis:7.2-alpine ports: - "6379:6379" healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5s timeout: 5s retries: 10