Flowcore Pathways

Flowcore Pathways is a TypeScript library for building event-driven applications on the Flowcore platform. It provides type-safe event publishing, schema validation with Zod, webhook routing, and handler registration — all through a single builder API.

Architecture

Events flow through the Flowcore platform in a pipeline:

Publish ──▶ Store ──▶ Transform ──▶ Webhook ──▶ Handler
   │          │           │            │           │
write()   Flowcore    Scenarios    HTTP POST   handle()
          Platform                to your app

Pathways covers the Publish and Handler sides of this pipeline, giving you type-safe contracts for both producing and consuming events.

Installation

bun add @flowcore/pathways

Getting Started

Initialize the builder with your Flowcore connection details:

import { PathwaysBuilder } from "@flowcore/pathways"

const pathways = new PathwaysBuilder({
  baseUrl: "https://api.flowcore.io",
  tenant: "your-tenant",
  dataCore: "your-data-core",
  apiKey: "your-api-key",
})

Registering Event Types

Register each event type with a flow type, event type name, and a Zod schema for validation:

import { z } from "zod"

const userCreatedSchema = z.object({
  id: z.string(),
  name: z.string(),
  email: z.string().email(),
})

pathways.register({
  flowType: "user",
  eventType: "created",
  schema: userCreatedSchema,
})
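The examples on this page register an event with a separate flowType and eventType, then refer to it as "user/created" when writing and handling. Assuming the key is simply the two names joined by a slash (inferred from those examples, not confirmed against the library), a small hypothetical helper can keep the strings in sync:

```typescript
// Hypothetical helper, not part of @flowcore/pathways.
// Assumption: a pathway key is `${flowType}/${eventType}`.
type PathwayKey<F extends string, E extends string> = `${F}/${E}`

function pathwayKey<F extends string, E extends string>(
  flowType: F,
  eventType: E,
): PathwayKey<F, E> {
  // A slash inside either part would make the joined key ambiguous.
  if (flowType.includes("/") || eventType.includes("/")) {
    throw new Error("flowType and eventType must not contain '/'")
  }
  return `${flowType}/${eventType}` as PathwayKey<F, E>
}

// The constant's type is the literal "user/created", so typos surface at compile time.
const USER_CREATED = pathwayKey("user", "created")
```

The constant could then be passed wherever the examples use the "user/created" string.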

You can also configure retry behavior per registration:

pathways.register({
  flowType: "order",
  eventType: "placed",
  schema: orderPlacedSchema,
  maxRetries: 3,
  retryDelayMs: 500,
})
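The exact semantics of maxRetries and retryDelayMs are not spelled out here. One plausible reading is that a failing attempt is retried up to maxRetries more times, with a fixed pause of retryDelayMs between attempts. The sketch below illustrates that reading with a generic, hypothetical withRetry function; it is not the library's implementation.

```typescript
// Illustrative only: a generic retry loop, not the library's implementation.
// Assumption: `maxRetries` counts retries after the first attempt and
// `retryDelayMs` is a fixed pause between attempts.
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms))

async function withRetry<T>(
  task: () => Promise<T>,
  maxRetries: number,
  retryDelayMs: number,
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await task()
    } catch (error) {
      lastError = error
      // Pause only if another attempt will follow.
      if (attempt < maxRetries) await sleep(retryDelayMs)
    }
  }
  // Every attempt failed: surface the most recent error.
  throw lastError
}
```

Under this reading, maxRetries: 3 allows at most four attempts in total.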

Publishing Events

Use write() to publish events. The data is validated against the registered schema at runtime.

// Single event
const eventId = await pathways.write("user/created", {
  data: {
    id: "user-123",
    name: "John Doe",
    email: "john@example.com",
  },
})
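To make "validated at runtime" concrete, here is a minimal sketch of the validate-then-send idea. It uses a hand-written stand-in whose result shape is modelled on Zod's safeParse; SchemaLike, userCreatedLike, and parseOrThrow are hypothetical names, and how write() itself reports a validation failure is not shown on this page.

```typescript
// Minimal stand-in for a schema, modelled on the result shape of Zod's
// `safeParse`. Hypothetical names; no library is imported here.
type ParseResult<T> =
  | { success: true; data: T }
  | { success: false; error: { message: string } }

interface SchemaLike<T> {
  safeParse(input: unknown): ParseResult<T>
}

interface UserCreated {
  id: string
  name: string
  email: string
}

// Hand-written check standing in for `userCreatedSchema`.
const userCreatedLike: SchemaLike<UserCreated> = {
  safeParse(input) {
    const value = input as Partial<Record<keyof UserCreated, unknown>> | null
    const valid =
      typeof value === "object" && value !== null &&
      typeof value.id === "string" &&
      typeof value.name === "string" &&
      typeof value.email === "string" && value.email.includes("@")
    return valid
      ? { success: true, data: value as UserCreated }
      : { success: false, error: { message: "invalid user/created payload" } }
  },
}

// Validate first, so bad data is rejected before anything is sent.
function parseOrThrow<T>(schema: SchemaLike<T>, input: unknown): T {
  const result = schema.safeParse(input)
  if (!result.success) throw new Error(result.error.message)
  return result.data
}
```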

With Metadata

const eventId = await pathways.write("user/created", {
  data: {
    id: "user-123",
    name: "John Doe",
    email: "john@example.com",
  },
  metadata: {
    correlationId: "corr-789",
    source: "registration-service",
  },
})

Fire-and-Forget

Skip waiting for the event to be acknowledged by the platform:

const eventId = await pathways.write("user/created", {
  data: userData,
  options: {
    fireAndForget: true,
  },
})

Batch Writes

const eventIds = await pathways.write("user/created", {
  batch: true,
  data: [userData1, userData2, userData3],
})
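When the array of payloads is large, it may be worth splitting it into smaller batches before writing. Whether the platform limits batch size is not stated here, so the chunk size below is an arbitrary assumption and chunk is a hypothetical helper.

```typescript
// Hypothetical helper: split a large array into fixed-size chunks so each
// chunk can be passed to a separate batch write.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer")
  }
  const chunks: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

const batches = chunk(["u1", "u2", "u3", "u4", "u5"], 2)
// batches is [["u1", "u2"], ["u3", "u4"], ["u5"]]
```

Each chunk could then be supplied as data alongside batch: true, one write() call per chunk.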

Handling Events

Register handlers that are invoked when events arrive via webhooks:

pathways.handle("user/created", async (event) => {
  const { id, name, email } = event.payload
  await createUserAccount(id, name, email)
  console.log(`Processed event: ${event.eventId}`)
})

The event object includes:

  • eventId — unique event identifier
  • payload — type-safe data matching the registered schema
  • metadata — optional correlation and source information
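The fields listed above can be sketched as a TypeScript type. The field types, and the metadata keys (taken from the metadata example earlier on this page), are assumptions; the library's own type definitions are authoritative.

```typescript
// Sketch of the handler argument as described in the list above.
// Field types are assumptions, not copied from the library.
interface EventMetadata {
  correlationId?: string
  source?: string
}

interface PathwayEvent<TPayload> {
  eventId: string
  payload: TPayload
  metadata?: EventMetadata
}

interface UserCreatedPayload {
  id: string
  name: string
  email: string
}

// Metadata is optional, so read it defensively.
function describeEvent(event: PathwayEvent<UserCreatedPayload>): string {
  const origin = event.metadata?.source ?? "unknown source"
  return `${event.eventId}: ${event.payload.email} (from ${origin})`
}
```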

Event Contracts

Define your event contracts as reusable schemas to share between publishers and handlers:

import { z } from "zod"

// Flow type: user
export const userCreatedSchema = z.object({
  id: z.string(),
  name: z.string(),
  email: z.string().email(),
})

export const userUpdatedSchema = z.object({
  id: z.string(),
  changes: z.record(z.unknown()),
})

// Flow type: order
export const orderPlacedSchema = z.object({
  orderId: z.string(),
  userId: z.string(),
  items: z.array(z.object({
    productId: z.string(),
    quantity: z.number(),
    price: z.number(),
  })),
  total: z.number(),
})

Register them all at startup:

pathways.register({
  flowType: "user",
  eventType: "created",
  schema: userCreatedSchema,
})

pathways.register({
  flowType: "user",
  eventType: "updated",
  schema: userUpdatedSchema,
})

pathways.register({
  flowType: "order",
  eventType: "placed",
  schema: orderPlacedSchema,
})
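As the number of contracts grows, the repeated register() calls can be driven from a single table. The sketch below uses structural stand-ins (Registration, Registrar) so that it runs without the library installed; registerAll is a hypothetical helper, and it assumes register() accepts the same three fields shown above.

```typescript
// Structural stand-ins so the sketch runs without the library installed.
interface Registration {
  flowType: string
  eventType: string
  schema: unknown
}

interface Registrar {
  register(registration: Registration): unknown
}

// Registers every contract once and returns the resulting pathway keys.
function registerAll(
  target: Registrar,
  contracts: readonly Registration[],
): string[] {
  const seen = new Set<string>()
  for (const contract of contracts) {
    const key = `${contract.flowType}/${contract.eventType}`
    // Catch copy-paste mistakes before they reach the builder.
    if (seen.has(key)) throw new Error(`Duplicate contract: ${key}`)
    seen.add(key)
    target.register(contract)
  }
  return Array.from(seen)
}
```

With the real builder, the call might look like registerAll(pathways, [{ flowType: "user", eventType: "created", schema: userCreatedSchema }, ...]).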

Webhook Integration

Flowcore delivers events to your application via webhooks. Use PathwayRouter to verify secrets and route events to the correct handler.

import { PathwayRouter } from "@flowcore/pathways"

const WEBHOOK_SECRET = "your-webhook-secret"
const router = new PathwayRouter(pathways, WEBHOOK_SECRET)

async function handleWebhook(req: Request): Promise<Response> {
  try {
    // Parse inside the try block so a malformed body also yields a response
    const event = await req.json()
    const secret = req.headers.get("X-Webhook-Secret")
    await router.processEvent(event, secret)
    return new Response("Event processed", { status: 200 })
  } catch (error) {
    // Log the failure so it is not silently swallowed
    console.error("Failed to process webhook event:", error)
    return new Response("Error processing event", { status: 500 })
  }
}
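The handler above answers every failure with a 500. A small pre-check can separate obviously malformed requests from processing errors before the router is involved. precheckWebhook is a hypothetical helper and the 400 and 401 status choices are assumptions; verifying the secret's value is still left to PathwayRouter.

```typescript
// Hypothetical pre-check run before handing the request to the router.
// It only rejects obviously malformed requests; checking the secret's value
// is still the router's job.
type Precheck =
  | { ok: true; event: unknown; secret: string }
  | { ok: false; status: 400 | 401; reason: string }

function precheckWebhook(
  secretHeader: string | null,
  rawBody: string,
): Precheck {
  if (!secretHeader) {
    return { ok: false, status: 401, reason: "missing X-Webhook-Secret header" }
  }
  try {
    return { ok: true, event: JSON.parse(rawBody), secret: secretHeader }
  } catch {
    return { ok: false, status: 400, reason: "body is not valid JSON" }
  }
}
```

Inside a handler, the inputs would come from req.headers.get("X-Webhook-Secret") and await req.text(); on failure, the reason and status can be returned directly as the response.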

Error Handling

Per-Pathway Errors

pathways.onError("user/created", (error, event) => {
  console.error(`Failed to process user/created: ${error.message}`)
  reportToMonitoring(error, event)
})

Global Error Handler

pathways.onAnyError((error, event, pathway) => {
  console.error(`Error in ${pathway}: ${error.message}`)
})

Event Observability

Subscribe to lifecycle hooks for logging and monitoring:

// Before processing
pathways.subscribe("user/created", (event) => {
  console.log(`Starting: ${event.eventId}`)
}, "before")

// After processing
pathways.subscribe("user/created", (event) => {
  console.log(`Completed: ${event.eventId}`)
}, "after")
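One use for the two hooks is measuring how long each event takes to process. The sketch below pairs "before" and "after" notifications by event ID; DurationTracker is a hypothetical helper, and it assumes both hooks receive the same eventId for a given event, as the examples above suggest.

```typescript
// Hypothetical helper: pair "before" and "after" notifications by event ID
// to measure how long each handler run took.
class DurationTracker {
  private readonly startedAt = new Map<string, number>()

  start(eventId: string, now: number = Date.now()): void {
    this.startedAt.set(eventId, now)
  }

  // Returns elapsed milliseconds, or undefined if no start was recorded.
  finish(eventId: string, now: number = Date.now()): number | undefined {
    const started = this.startedAt.get(eventId)
    if (started === undefined) return undefined
    this.startedAt.delete(eventId) // drop the entry so the map does not grow
    return now - started
  }
}
```

The "before" subscriber would call tracker.start(event.eventId) and the "after" subscriber tracker.finish(event.eventId). Whether the "after" hook also fires when a handler throws is not stated here, so entries for failed events may need separate cleanup.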

Configuration Files

When using the Flowcore CLI, data cores and scenarios are defined in YAML configuration files.

Data Core Definition

flowcore.data-core.yaml
flowcore: "2"
tenant: "your-tenant"
dataCore: "your-data-core"
flowTypes:
  - name: "user"
    eventTypes:
      - name: "created"
      - name: "updated"
  - name: "order"
    eventTypes:
      - name: "placed"
      - name: "fulfilled"

Scenario Definition

flowcore.scenario.yaml
flowcore: "2"
tenant: "your-tenant"
scenario: "your-scenario"
transformers:
  - flowType: "user"
    eventType: "created"
    targetUrl: "https://your-app.com/webhooks/flowcore"
    secret: "${{secrets.WEBHOOK_SECRET}}"