Getting Started
CQRS + Event Sourced To-Do app in 5 minutes
Flowcore is a simplified approach to CQRS + Event Sourcing, built on tools that every web dev is already familiar with.
You define your event architecture in a simple YAML file. Before you emit a JSON event to Flowcore, you run business-logic checks against real-time read models. When you receive the event back from Flowcore, you update your read models.
No Kafka. No Domain Driven Design. No current-state rebuild/rehydration to validate user actions.
Flowcore stores the events you emit in immutable event logs and fans them out to subscribed services in either single-digit milliseconds or seconds, depending on your needs.
We act as a powerful event processing infrastructure that guarantees immutable storage, real-time fan-out, automatic hot and cold storage management, and effortless replay, so you can focus on writing business logic instead of building event infrastructure.
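As a preview of what you will build in this tutorial, here is a minimal sketch of the write side. It assumes the pathways and sql objects you configure in the later steps; createTodo is a hypothetical helper, not part of the Flowcore API.

// Sketch only: business-logic check against the read model, then emit the event.
// `pathways` and `sql` are set up later in this tutorial; `createTodo` is hypothetical.
import { pathways, sql } from "./pathways";

async function createTodo(id: string, title: string) {
  // 1. Validate against the real-time read model (no rehydration needed)
  const [existing] = await sql`SELECT id FROM todo WHERE id = ${id}`;
  if (existing) throw new Error("Todo already exists");

  // 2. Emit the event; the .handle() handler updates the read model when it arrives
  await pathways.write("todo-items/todo-item.created.v0", {
    data: { id, title, done: false },
  });
}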
Prerequisites for this tutorial
- Flowcore Platform account to get your tenant name
- Flowcore CLI
- Bun runtime installed
- Docker for running PostgreSQL
Step 1: Initialize the Project
Create a new directory and initialize a Bun project:
mkdir flowcore-todo-tutorial
cd flowcore-todo-tutorial
bun init -y
Install the required dependencies:
bun add @flowcore/pathways zod postgres
First, log in to Flowcore using the CLI (the login flow uses port 3000, so make sure no other service is running on that port):
flowcore login
This will authenticate you with Flowcore and allow you to run subsequent commands.
Step 2: Create an HTTP Server
Inside index.ts, set up a simple HTTP server with a POST /api/transformer/ endpoint. This endpoint will receive events from Flowcore. Here we use the Flowcore Pathways library, which proxies incoming events to the .handle() handlers in your code:
import type { FlowcoreLegacyEvent } from "@flowcore/pathways";
import { pathwaysRouter } from "./pathways";
import "./handlers";

const server = Bun.serve({
  port: 3000,
  async fetch(req) {
    const url = new URL(req.url);

    // POST endpoint to receive events from Flowcore platform
    if (
      req.method === "POST" &&
      (url.pathname === "/api/transformer" || url.pathname === "/api/transformer/")
    ) {
      try {
        const body = await req.json();
        const event = body as FlowcoreLegacyEvent;
        const secret = req.headers.get("x-secret") ?? "";

        await pathwaysRouter.processEvent(event, secret);

        return new Response("OK", {
          status: 200,
          headers: { "Content-Type": "text/plain" },
        });
      } catch (error) {
        console.error("❌ Error processing Flowcore event:", error);
        return new Response(
          JSON.stringify({ error: (error as Error).message }),
          {
            status: 500,
            headers: { "Content-Type": "application/json" },
          }
        );
      }
    }

    return new Response("Not Found", { status: 404 });
  },
});

console.log(`🚀 Server running on http://localhost:${server.port}`);
console.log(
  `📡 Transformer endpoint: http://localhost:${server.port}/api/transformer`
);
Step 3: Define Your Event Architecture
Understanding Flowcore Concepts
- Data Core: Think of this as a project or Git repository - it's the top-level container for your events
- Flow Type: Similar to an aggregate in traditional event sourcing, but simpler - it's just a title for grouping related immutable event logs, e.g. TodoItem
- Event Type: This is where Flowcore differs significantly from classic event sourcing (see the sketch after this list):
  - In classic event sourcing: you have one immutable event log per aggregate instance (e.g. one stream per TodoItem, like TodoItem-001, TodoItem-002, etc.), and the intent of an individual event - create, update, or delete - is indicated by the event type field on the JSON object.
  - In Flowcore: you have one immutable event log per event type. Instead of TodoItem instances, you have TodoItemCreated, TodoItemUpdated, and TodoItemDeleted as separate immutable event logs, so the intent of an event is indicated by which immutable event log it is stored in.
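To make the difference concrete, here is a rough sketch of how the same actions land in event logs under each model. The object shapes below are illustrative only, not real Flowcore structures.

// Classic event sourcing: one stream per aggregate instance,
// with the intent carried by an event type field inside each event.
const classicStream_TodoItem001 = [
  { eventType: "created", payload: { title: "Buy milk" } },
  { eventType: "renamed", payload: { newTitle: "Buy oat milk" } },
  { eventType: "completed", payload: {} },
];

// Flowcore: one immutable event log per event type,
// with the intent carried by which log the event is stored in.
const todoItemCreatedLog = [{ id: "001", title: "Buy milk", done: false }];
const todoItemRenamedLog = [{ id: "001", newTitle: "Buy oat milk" }];
const todoItemCompletedLog = [{ id: "001" }];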
Create a flowcore.yaml file to define your event architecture:
# cli "flowcore data-core apply -f flowcore.yaml"version: 1tenant: YOUR_TENANT_NAME # <-- replacedataCore: name: todo-app flowTypes: todo-items: eventTypes: todo-item.created.v0: description: "A todo item was created (full snapshot)" todo-item.renamed.v0: description: "The title of a todo item was changed" todo-item.completed.v0: description: "A todo item was marked as done" todo-item.reopened.v0: description: "A completed todo item was reopened" todo-item.deleted.v0: description: "A todo item was deleted"
Apply this configuration to create Flowcore primitives:
flowcore data-core apply -f flowcore.yaml
Create schemas.ts to define the contract of your events. This is where you define the structure of each event payload in code.
import { z } from "zod";
/* full snapshot on create */
export const todoCreated = z.object({
  id: z.string(),
  title: z.string(),
  description: z.string().optional(),
  done: z.literal(false),
});

/* deltas thereafter */
export const todoRenamed = z.object({ id: z.string(), newTitle: z.string() });
export const todoCompleted = z.object({ id: z.string() });
export const todoReopened = z.object({ id: z.string() });
export const todoDeleted = z.object({ id: z.string() });
Step 4: Set Up Local Development
Create this flowcore.local.yaml file for local development:
tenant: YOUR_TENANT_NAME # <-- replace
development:
  proxyEndpoints:
    todo-app:
      dataCore: todo-app
      flowType: todo-items
      events:
        - todo-item.created.v0
        - todo-item.renamed.v0
        - todo-item.completed.v0
        - todo-item.reopened.v0
        - todo-item.deleted.v0
      endpoints:
        - "" # leave this empty. POST /api/transformer/ is the default endpoint.
You can run this command now to check that it starts the websocket connection - I'll remind you to run it again when we test the entire flow.
flowcore scenario local -f flowcore.local.yaml -s now -e http://localhost:3000/api/transformer -H 'X-Secret: 1234'
This configures Flowcore to fan out events to your local endpoint in real time, so you don't need a tunnelling tool like ngrok.
Step 5: Database Setup
Create a docker-compose.yaml file to run PostgreSQL:
version: "3.8"services: postgres: container_name: todo-app-postgres image: postgres:15 environment: POSTGRES_DB: pathway_db POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres ports: - "5433:5432" volumes: - ./init-db:/docker-entrypoint-initdb.d
Why PostgreSQL? We need a database for two purposes:
- Application data: Store our todo items
- Pathway state: Flowcore uses this to track the state of the .write() -> .handle() flow, ensuring events make it from the write side to your event handlers
Start the database:
docker-compose up -d
This is the URL that you can use to connect to the local PostgreSQL database:
postgresql://postgres:postgres@localhost:5433/pathway_db
Create the todo table:
CREATE TABLE IF NOT EXISTS todo (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  title VARCHAR(255) NOT NULL,
  description VARCHAR(1000),
  done BOOLEAN NOT NULL DEFAULT false,
  created_at TIMESTAMP DEFAULT NOW() NOT NULL,
  updated_at TIMESTAMP DEFAULT NOW() NOT NULL
);
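One way to run this SQL is a small one-off script with the postgres client you already installed. This is just a sketch assuming the connection URL above and a suggested file name of setup-db.ts; alternatively, save the SQL into the ./init-db folder that the docker-compose file mounts, so it runs when the container is first initialized.

// setup-db.ts - one-off script to create the todo table (sketch).
// Run with: bun setup-db.ts
import postgres from "postgres";

const sql = postgres("postgresql://postgres:postgres@localhost:5433/pathway_db");

await sql`
  CREATE TABLE IF NOT EXISTS todo (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title VARCHAR(255) NOT NULL,
    description VARCHAR(1000),
    done BOOLEAN NOT NULL DEFAULT false,
    created_at TIMESTAMP DEFAULT NOW() NOT NULL,
    updated_at TIMESTAMP DEFAULT NOW() NOT NULL
  )
`;

await sql.end();
console.log("✅ todo table created");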
Step 6: Set Up Flowcore Pathways
Create an API Key
Generate an API key for your application to authenticate with Flowcore:
flowcore auth new key --tenant YOUR_TENANT_NAME todo-app-key
This will create a new API key that you’ll use in your application code.
In a real application you should store this key securely. For now we'll add it directly to the code in the next snippet.
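If you prefer not to hard-code the key even for the tutorial, one option is to read it from an environment variable instead. A sketch - the FLOWCORE_API_KEY name is just a suggestion, not something the library requires:

// In pathways.ts: read the API key from the environment instead of hard-coding it.
// FLOWCORE_API_KEY is an arbitrary name chosen for this tutorial.
const apiKey = process.env.FLOWCORE_API_KEY ?? "";
if (!apiKey) {
  throw new Error("Set the FLOWCORE_API_KEY environment variable");
}

Then start the server with the variable set, e.g. FLOWCORE_API_KEY=<your key> bun index.ts.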
Configure Pathways
Create pathways.ts to configure Flowcore. Here you use the Flowcore Pathways library, which proxies events coming in from the POST /api/transformer/ endpoint to the .handle() handlers in your code. It is also used to emit events to Flowcore via the .write() method. The "1234" passed to PathwayRouter at the bottom is the shared secret; it matches the X-Secret header the local proxy sends.
import {
  PathwayRouter,
  PathwaysBuilder,
  createPostgresPathwayState,
} from "@flowcore/pathways";
import {
  todoCreated,
  todoRenamed,
  todoCompleted,
  todoReopened,
  todoDeleted,
} from "./schemas";
import postgres from "postgres";

const apiKey = "YOUR_GENERATED_API_KEY"; // <--- replace with generated api key
const postgresUrl = "postgresql://postgres:postgres@localhost:5433/pathway_db";

// Initialize the database connection
export const sql = postgres(postgresUrl);

export const pathways = new PathwaysBuilder({
  baseUrl: "https://webhook.api.flowcore.io",
  tenant: "YOUR_TENANT_NAME", // <--------- replace with your tenant name
  dataCore: "todo-app",
  apiKey,
})
  .withPathwayState(
    createPostgresPathwayState({ connectionString: postgresUrl })
  )
  .register({
    flowType: "todo-items",
    eventType: "todo-item.created.v0",
    schema: todoCreated,
  })
  .register({
    flowType: "todo-items",
    eventType: "todo-item.renamed.v0",
    schema: todoRenamed,
  })
  .register({
    flowType: "todo-items",
    eventType: "todo-item.completed.v0",
    schema: todoCompleted,
  })
  .register({
    flowType: "todo-items",
    eventType: "todo-item.reopened.v0",
    schema: todoReopened,
  })
  .register({
    flowType: "todo-items",
    eventType: "todo-item.deleted.v0",
    schema: todoDeleted,
  });
export const pathwaysRouter = new PathwayRouter(pathways, "1234");
Create handlers
Create a handlers.ts file where we listen for incoming events and apply them to the Postgres database.
import { pathways, sql } from "./pathways";
pathways.handle("todo-items/todo-item.created.v0", async ({ payload }) => { console.log("Todo Item CREATED", payload); await sql` INSERT INTO todo (id, title, description, done) VALUES (${payload.id}, ${payload.title}, ${payload.description ?? null}, ${ payload.done }) `;});
pathways.handle("todo-items/todo-item.renamed.v0", async ({ payload }) => { console.log("Todo Item RENAMED", payload); await sql` UPDATE todo SET title = ${payload.newTitle} WHERE id = ${payload.id} `;});
pathways.handle("todo-items/todo-item.completed.v0", async ({ payload }) => { console.log("Todo Item COMPLETED", payload); await sql`UPDATE todo SET done = true WHERE id = ${payload.id}`;});
pathways.handle("todo-items/todo-item.reopened.v0", async ({ payload }) => { console.log("Todo Item REOPENED", payload); await sql`UPDATE todo SET done = false WHERE id = ${payload.id}`;});
pathways.handle("todo-items/todo-item.deleted.v0", async ({ payload }) => { console.log("Todo Item DELETED", payload); await sql`DELETE FROM todo WHERE id = ${payload.id}`;});
Step 7: Create a Demo Script
Create write-demo.ts to send test events:
import { sleep } from "bun";
import { pathways, sql } from "./pathways";

// you can add this after each write to see more slowly what's happening.
async function dump(label: string) {
  await sleep(300); // give Flowcore -> handler -> DB time
  const [row] = await sql`SELECT * FROM todo WHERE id = ${id}`;
  console.log(`\n🔎 ${label}`);
  console.table(row);
}

const id = crypto.randomUUID();

await pathways.write("todo-items/todo-item.created.v0", {
  data: {
    id,
    title: "Buy milk",
    description: "2 litres, semi-skimmed",
    done: false,
  },
});

await pathways.write("todo-items/todo-item.renamed.v0", {
  data: { id, newTitle: "Buy oat milk" },
});

await pathways.write("todo-items/todo-item.completed.v0", {
  data: { id },
});

await pathways.write("todo-items/todo-item.reopened.v0", {
  data: { id },
});

console.log("\n✅ demo events sent");

await sql.end();
process.exit(0);
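As the comment in the script suggests, you can call dump() after each write to watch the read model change step by step, for example:

// Optional: inspect the read model right after a write.
await pathways.write("todo-items/todo-item.renamed.v0", {
  data: { id, newTitle: "Buy oat milk" },
});
await dump("after rename");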
Step 8: Test the Complete Flow
1. Start your HTTP server:
bun index.ts
2. In another terminal, start the local proxy to connect Flowcore to your local endpoint:
flowcore scenario local -f flowcore.local.yaml -s now -e http://localhost:3000/api/transformer -H 'X-Secret: 1234'
3. In a third terminal, send test events:
bun write-demo.ts
4. Truncate the database and replay:
If you look at the todo table in Postgres now, it should be populated. Truncate the table, then run this command:
flowcore scenario local -f flowcore.yaml -f flowcore.local.yaml -s first -e http://localhost:3000/api/transformer -H 'X-Secret: 1234'
The events get replayed from the beginning in the correct order and the database is repopulated.
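If you want to check the repopulated state from code rather than a database client, here is a small sketch that reuses the existing sql connection (check-todos.ts is just a suggested file name):

// check-todos.ts - print the current contents of the todo read model (sketch).
// Run with: bun check-todos.ts
import { sql } from "./pathways";

const rows = await sql`SELECT id, title, done FROM todo ORDER BY created_at`;
console.table(rows);

await sql.end();
process.exit(0);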