Skip to content

In-Memory Predicate

Install the Redis Predicate

Terminal window
flowcore component add library/predicate-in-memory
Or install the In-Memory Predicate manually

First install the following dependencies:

Terminal window
bun add @flowcore/sdk-transformer-core

Then add the following code to your lib folder:

import type { TransformerSuccessHandler, WebhookPredicate } from "@flowcore/sdk-transformer-core"
const DEFAULT_TTL_MS = 60000 // 1 minute
let inMemoryCache: TTLMap
/**
* Options for the in-memory predicate.
* @property ttlMs - The time-to-live for cache entries in milliseconds. If not specified, the default TTL is used, which is 1 minute.
*/
export type PredicateInMemoryOptions = {
ttlMs?: number
}
/**
* Creates a webhook predicate checker that checks if an event ID exists in the in-memory cache.
*
* This function initializes the in-memory cache if it's not already initialized. It then returns a webhook predicate function that checks the existence of the event ID in the cache.
*
* @returns A webhook predicate function that checks the existence of the event ID in the cache.
*/
export function createPredicateInMemoryChecker(): WebhookPredicate {
if (!inMemoryCache) {
inMemoryCache = new TTLMap()
}
return async (eventId: string) => inMemoryCache.has(eventId)
}
/**
* Creates a webhook predicate notifier that sets an event ID in the in-memory cache.
*
* This function initializes the in-memory cache if it's not already initialized. It then returns a webhook predicate function that sets the event ID in the cache with an optional TTL.
*
* @param options - The options for the in-memory predicate.
* @returns A webhook predicate function that sets the event ID in the cache.
*/
export function createPredicateInMemoryNotifier(options?: PredicateInMemoryOptions): TransformerSuccessHandler {
if (!inMemoryCache) {
inMemoryCache = new TTLMap()
}
return async (payload: { eventId: string }) => {
inMemoryCache.set(payload.eventId, "1", options?.ttlMs ?? DEFAULT_TTL_MS)
}
}
/**
* Factory function for creating an in-memory predicate checker and notifier.
*
* This function returns an object with two methods: check and notify. The check method is a webhook predicate function that checks the existence of an event ID in the cache. The notify method is a webhook predicate function that sets an event ID in the cache.
*
* @param options - The options for the in-memory predicate.
* @returns An object with check and notify methods.
*/
export function predicateinMemoryFactory(options?: PredicateInMemoryOptions) {
return {
check: createPredicateInMemoryChecker(),
notify: createPredicateInMemoryNotifier(options),
}
}
/**
* A map that stores key-value pairs with a time-to-live (TTL) for each entry.
*
* This class provides a way to store data with a limited lifetime. Each entry in the map
* can be set to expire after a specified amount of time. The map automatically removes
* expired entries.
*
* Inspired by: https://gist.github.com/maman/557aa2b3b55bc971cf415c0c8369c874
*/
export class TTLMap {
private timeoutData: Record<string, Timer>
private data: Map<string, string>
constructor() {
this.timeoutData = {}
this.data = new Map()
}
private clearInstancedTimeout(key: string) {
clearTimeout(this.timeoutData[key])
delete this.timeoutData[key]
}
/**
* Retrieves the value associated with a given key.
*
* This method returns the value stored in the map for the specified key. If the key
* does not exist or has expired, it returns undefined.
*
* @param key - The key for which to retrieve the value.
* @returns The value associated with the key, or undefined if the key does not exist or has expired.
*/
get(key: string) {
return this.data.get(key)
}
/**
* Checks if a key exists in the map.
*
* This method returns true if the specified key exists in the map and has not expired,
* otherwise it returns false.
*
* @param key - The key to check.
* @returns True if the key exists and has not expired, otherwise false.
*/
has(key: string) {
return this.data.has(key)
}
/**
* Sets a key-value pair in the map with an optional TTL.
*
* This method sets a new key-value pair in the map. If a TTL is specified, the entry
* will expire after the specified amount of time. If the key already exists, its value
* is updated and its TTL is reset.
*
* @param key - The key for the new entry.
* @param val - The value for the new entry.
* @param ttl - The time-to-live for the new entry in milliseconds. If not specified, the entry does not expire.
* @returns This instance of TTLMap.
*/
set(key: string, val: string, ttl?: number) {
this.clearInstancedTimeout(key)
this.data.set(key, val)
const timeout = setTimeout(() => {
this.clearInstancedTimeout(key)
this.data.delete(key)
this.delete(key)
}, ttl)
this.timeoutData[key] = timeout
return this
}
/**
* Deletes a key-value pair from the map.
*
* This method removes the specified key-value pair from the map. If the key exists
* and has not expired, it returns true; otherwise, it returns false.
*
* @param key - The key to delete.
* @returns True if the key existed and was deleted, otherwise false.
*/
delete(key: string) {
this.clearInstancedTimeout(key)
if (this.data.has(key)) {
this.data.delete(key)
return true
}
this.data.delete(key)
return false
}
/**
* Clears all key-value pairs from the map.
*
* This method removes all entries from the map and stops all associated timers.
*/
clear() {
this.data.clear()
for (const timeout of Object.values(this.timeoutData)) {
clearTimeout(timeout)
}
this.timeoutData = {}
}
}

Usage with @flowcore/sdk-transformer-core

webhook-builder.ts
import env from "@/env"
import { predicateInMemoryFactory } from "@/lib/predicate-in-memory"
import { WebhookBuilder } from "@flowcore/sdk-transformer-core"
export const webhookBuilder = new WebhookBuilder({
baseUrl: env.FLOWCORE_WEBHOOK_BASE_URL,
tenant: env.FLOWCORE_TENANT,
dataCore: env.FLOWCORE_DATA_CORE,
apiKey: env.FLOWCORE_API_KEY,
})
.withPredicate({
predicate: predicateInMemoryFactory().check,
})
.withRetry()
.factory()
transformer-builder.ts
import env from "@/env"
import { predicateInMemoryFactory } from "@/lib/predicate-in-memory"
import { transformerFactory } from "@/lib/transformer-factory"
export const transformerBuilder = transformerFactory({
onSuccess: predicateInMemoryFactory().notify,
secret: env.TRANSFORMER_SECRET,
})

Where the transformerFactory, is just a wrapper around the TransformerBuilder class, to make it easier to reuse. it is up to you, if you want to use it or not.

transformer-factory.ts
import { TransformerBuilder, type TransformerSuccessHandler } from "@flowcore/sdk-transformer-core"
export type TransformerOptions = {
onSuccess: TransformerSuccessHandler
secret: string
}
export const transformerFactory = (options: TransformerOptions) => {
return (flowType: string) => {
return new TransformerBuilder(flowType)
.withSuccessHandler(options.onSuccess)
.withSecret(options.secret)
}
}