In-Memory Predicate
Install the In-Memory Predicate
flowcore component add library/predicate-in-memory
Or install the In-Memory Predicate manually
First install the following dependencies:
bun add @flowcore/sdk-transformer-core
Then add the following code to your lib folder:
import type { TransformerSuccessHandler, WebhookPredicate } from "@flowcore/sdk-transformer-core"
/** Fallback time-to-live for cache entries, in milliseconds (one minute). */
const DEFAULT_TTL_MS = 60 * 1000

/**
 * Module-level cache shared by every checker and notifier created in this module.
 * It starts out unassigned and is created on first use by the factory functions below.
 */
let inMemoryCache: TTLMap
/**
 * Configuration accepted by the in-memory predicate.
 */
export type PredicateInMemoryOptions = {
  /**
   * How long a cache entry lives, in milliseconds.
   * When omitted, the default TTL of one minute is used.
   */
  ttlMs?: number;
}
/**
 * Builds a webhook predicate backed by the shared in-memory cache.
 *
 * The module-level cache is created on first use. The returned async function
 * resolves to `true` when the given event ID is currently present in that cache
 * and to `false` otherwise.
 *
 * @returns An async predicate reporting whether an event ID is in the cache.
 */
export function createPredicateInMemoryChecker(): WebhookPredicate {
  // Lazily create the cache so the checker and the notifier share one instance.
  inMemoryCache ??= new TTLMap()

  return async (eventId: string) => {
    return inMemoryCache.has(eventId)
  }
}
/**
 * Builds a transformer success handler that records event IDs in the shared in-memory cache.
 *
 * The module-level cache is created on first use. Each time the returned handler runs,
 * it stores the payload's event ID with the value `"1"`, using `options.ttlMs` as the
 * entry's time-to-live or the one-minute default when no TTL was supplied.
 *
 * @param options - Optional settings; only `ttlMs` is read.
 * @returns An async handler that writes the payload's event ID to the cache.
 */
export function createPredicateInMemoryNotifier(options?: PredicateInMemoryOptions): TransformerSuccessHandler {
  // Lazily create the cache so the checker and the notifier share one instance.
  inMemoryCache ??= new TTLMap()

  return async ({ eventId }: { eventId: string }) => {
    // Resolved on every call, so the configured TTL wins over the default.
    const ttlMs = options?.ttlMs ?? DEFAULT_TTL_MS
    inMemoryCache.set(eventId, "1", ttlMs)
  }
}
/**
 * Creates a matching in-memory predicate checker and notifier.
 *
 * Both returned functions operate on the same module-level cache, so a `check`
 * obtained from one call of this factory sees event IDs written by a `notify`
 * obtained from another call.
 *
 * @param options - Optional settings forwarded to the notifier (for example `ttlMs`).
 * @returns An object with `check` (reports whether an event ID is cached) and
 *          `notify` (stores an event ID in the cache).
 */
export function predicateInMemoryFactory(options?: PredicateInMemoryOptions) {
  return {
    check: createPredicateInMemoryChecker(),
    notify: createPredicateInMemoryNotifier(options),
  }
}

/**
 * @deprecated Misspelled alias kept for backward compatibility; use `predicateInMemoryFactory`.
 */
export const predicateinMemoryFactory = predicateInMemoryFactory
/**
 * A string-to-string map whose entries can carry a time-to-live (TTL).
 *
 * An entry stored with a TTL is removed automatically once that many milliseconds
 * have elapsed. An entry stored without a TTL stays until it is deleted, overwritten,
 * or the map is cleared.
 *
 * Inspired by: https://gist.github.com/maman/557aa2b3b55bc971cf415c0c8369c874
 */
export class TTLMap {
  /** Pending expiry timers by entry key; only entries stored with a TTL have one. */
  private timeoutData: Map<string, ReturnType<typeof setTimeout>>
  /** The live entries. */
  private data: Map<string, string>

  constructor() {
    this.timeoutData = new Map()
    this.data = new Map()
  }

  /** Cancels and forgets the expiry timer for `key`, if one is pending. */
  private clearInstancedTimeout(key: string) {
    const pending = this.timeoutData.get(key)
    if (pending !== undefined) {
      clearTimeout(pending)
      this.timeoutData.delete(key)
    }
  }

  /**
   * Retrieves the value stored for a key.
   *
   * @param key - The key to look up.
   * @returns The stored value, or undefined if the key is absent or has expired.
   */
  get(key: string) {
    return this.data.get(key)
  }

  /**
   * Checks whether a key is currently present.
   *
   * @param key - The key to check.
   * @returns True if the key exists and has not expired, otherwise false.
   */
  has(key: string) {
    return this.data.has(key)
  }

  /**
   * Stores a key-value pair, optionally with a TTL.
   *
   * Overwriting an existing key replaces its value and discards its previous
   * expiry timer, so the TTL starts over (or is removed when `ttl` is omitted).
   *
   * @param key - The key for the entry.
   * @param val - The value for the entry.
   * @param ttl - Time-to-live in milliseconds. If not specified, the entry does not expire.
   * @returns This instance of TTLMap.
   */
  set(key: string, val: string, ttl?: number) {
    this.clearInstancedTimeout(key)
    this.data.set(key, val)

    // Only schedule an expiry when a TTL was given: setTimeout treats a missing
    // delay as "as soon as possible", which would evict the entry immediately.
    if (ttl !== undefined) {
      const timeout = setTimeout(() => {
        this.delete(key)
      }, ttl)
      this.timeoutData.set(key, timeout)
    }

    return this
  }

  /**
   * Removes a key and cancels its pending expiry timer, if any.
   *
   * @param key - The key to delete.
   * @returns True if the key existed and was deleted, otherwise false.
   */
  delete(key: string) {
    this.clearInstancedTimeout(key)
    return this.data.delete(key)
  }

  /**
   * Removes every entry and cancels all pending expiry timers.
   */
  clear() {
    for (const timeout of this.timeoutData.values()) {
      clearTimeout(timeout)
    }
    this.timeoutData.clear()
    this.data.clear()
  }
}
Usage with @flowcore/sdk-transformer-core
import env from "@/env"
import { predicateInMemoryFactory } from "@/lib/predicate-in-memory"
import { WebhookBuilder } from "@flowcore/sdk-transformer-core"

// Webhook builder configured from environment values, with the in-memory
// checker (`check`) plugged in as its predicate.
export const webhookBuilder = new WebhookBuilder({
  baseUrl: env.FLOWCORE_WEBHOOK_BASE_URL,
  tenant: env.FLOWCORE_TENANT,
  dataCore: env.FLOWCORE_DATA_CORE,
  apiKey: env.FLOWCORE_API_KEY,
})
  // The predicate looks event IDs up in the shared in-memory cache.
  .withPredicate({
    predicate: predicateInMemoryFactory().check,
  })
  // NOTE(review): retry behaviour is defined by the SDK; called with its defaults here.
  .withRetry()
  .factory()
import env from "@/env"
import { predicateInMemoryFactory } from "@/lib/predicate-in-memory"
import { transformerFactory } from "@/lib/transformer-factory"

// Transformer builder whose success handler is the in-memory notifier (`notify`),
// which writes the handled event's ID to the shared in-memory cache.
export const transformerBuilder = transformerFactory({
  onSuccess: predicateInMemoryFactory().notify,
  secret: env.TRANSFORMER_SECRET,
})
Here, transformerFactory is just a thin wrapper around the TransformerBuilder class that makes it easier to reuse. Whether you use it is up to you.
import { TransformerBuilder, type TransformerSuccessHandler } from "@flowcore/sdk-transformer-core"
/**
 * Settings shared by every transformer created through `transformerFactory`.
 */
export type TransformerOptions = {
  /** Handler registered as the builder's success handler. */
  onSuccess: TransformerSuccessHandler;
  /** Secret passed to the builder via `withSecret`. */
  secret: string;
}
/**
 * Creates a function that builds pre-configured `TransformerBuilder` instances.
 *
 * @param options - The success handler and secret applied to every builder.
 * @returns A function that takes a flow type and returns a `TransformerBuilder`
 *          for it with the success handler and secret already set.
 */
export const transformerFactory = (options: TransformerOptions) => (flowType: string) => {
  const builder = new TransformerBuilder(flowType)
  return builder.withSuccessHandler(options.onSuccess).withSecret(options.secret)
}