Flowcore Pathways
Flowcore Pathways is a TypeScript library for building event-driven applications on the Flowcore platform. It provides type-safe event publishing, schema validation with Zod, webhook routing, and handler registration — all through a single builder API.
Architecture
Events flow through the Flowcore platform in a pipeline:
Publish ──▶ Store ──▶ Transform ──▶ Webhook ──▶ Handler │ │ │ │ │ write() Flowcore Scenarios HTTP POST handle() Platform to your appPathways covers the Publish and Handler sides of this pipeline, giving you type-safe contracts for both producing and consuming events.
Installation
bun add @flowcore/pathwaysnpm install @flowcore/pathwaysdeno add npm:@flowcore/pathwaysGetting Started
Initialize the builder with your Flowcore connection details:
import { PathwaysBuilder } from "@flowcore/pathways"
const pathways = new PathwaysBuilder({ baseUrl: "https://api.flowcore.io", tenant: "your-tenant", dataCore: "your-data-core", apiKey: "your-api-key",})Registering Event Types
Register each event type with a flow type, event type name, and a Zod schema for validation:
import { z } from "zod"
const userCreatedSchema = z.object({ id: z.string(), name: z.string(), email: z.string().email(),})
pathways.register({ flowType: "user", eventType: "created", schema: userCreatedSchema,})You can also configure retry behavior per registration:
pathways.register({ flowType: "order", eventType: "placed", schema: orderPlacedSchema, maxRetries: 3, retryDelayMs: 500,})Publishing Events
Use write() to publish events. The data is validated against the registered schema at runtime.
// Single eventconst eventId = await pathways.write("user/created", { data: { id: "user-123", name: "John Doe", email: "john@example.com", },})With Metadata
const eventId = await pathways.write("user/created", { data: { id: "user-123", name: "John Doe", email: "john@example.com", }, metadata: { correlationId: "corr-789", source: "registration-service", },})Fire-and-Forget
Skip waiting for the event to be acknowledged by the platform:
const eventId = await pathways.write("user/created", { data: userData, options: { fireAndForget: true, },})Batch Writes
const eventIds = await pathways.write("user/created", { batch: true, data: [userData1, userData2, userData3],})Handling Events
Register handlers that are invoked when events arrive via webhooks:
pathways.handle("user/created", async (event) => { const { id, name, email } = event.payload
await createUserAccount(id, name, email) console.log(`Processed event: ${event.eventId}`)})The event object includes:
eventId— unique event identifierpayload— type-safe data matching the registered schemametadata— optional correlation and source information
Event Contracts
Define your event contracts as reusable schemas to share between publishers and handlers:
import { z } from "zod"
// Flow type: userexport const userCreatedSchema = z.object({ id: z.string(), name: z.string(), email: z.string().email(),})
export const userUpdatedSchema = z.object({ id: z.string(), changes: z.record(z.unknown()),})
// Flow type: orderexport const orderPlacedSchema = z.object({ orderId: z.string(), userId: z.string(), items: z.array(z.object({ productId: z.string(), quantity: z.number(), price: z.number(), })), total: z.number(),})Register them all at startup:
pathways.register({ flowType: "user", eventType: "created", schema: userCreatedSchema,})
pathways.register({ flowType: "user", eventType: "updated", schema: userUpdatedSchema,})
pathways.register({ flowType: "order", eventType: "placed", schema: orderPlacedSchema,})Webhook Integration
Flowcore delivers events to your application via webhooks. Use PathwayRouter to verify secrets and route events to the correct handler.
import { PathwayRouter } from "@flowcore/pathways"
const WEBHOOK_SECRET = "your-webhook-secret"const router = new PathwayRouter(pathways, WEBHOOK_SECRET)
async function handleWebhook(req: Request): Promise<Response> { const event = await req.json() const secret = req.headers.get("X-Webhook-Secret")
try { await router.processEvent(event, secret) return new Response("Event processed", { status: 200 }) } catch (error) { return new Response("Error processing event", { status: 500 }) }}Error Handling
Per-Pathway Errors
pathways.onError("user/created", (error, event) => { console.error(`Failed to process user/created: ${error.message}`) reportToMonitoring(error, event)})Global Error Handler
pathways.onAnyError((error, event, pathway) => { console.error(`Error in ${pathway}: ${error.message}`)})Event Observability
Subscribe to lifecycle hooks for logging and monitoring:
// Before processingpathways.subscribe("user/created", (event) => { console.log(`Starting: ${event.eventId}`)}, "before")
// After processingpathways.subscribe("user/created", (event) => { console.log(`Completed: ${event.eventId}`)}, "after")Configuration Files
When using the Flowcore CLI, data cores and scenarios are defined in YAML configuration files.
Data Core Definition
flowcore: "2"tenant: "your-tenant"dataCore: "your-data-core"flowTypes: - name: "user" eventTypes: - name: "created" - name: "updated" - name: "order" eventTypes: - name: "placed" - name: "fulfilled"Scenario Definition
flowcore: "2"tenant: "your-tenant"scenario: "your-scenario"transformers: - flowType: "user" eventType: "created" targetUrl: "https://your-app.com/webhooks/flowcore" secret: "${{secrets.WEBHOOK_SECRET}}"