
Getting Started

CQRS + Event Sourced To-Do app in 5 minutes

Flowcore is a simplified approach to CQRS and Event Sourcing that uses tools every web developer already knows.

You define your event architecture in a simple YAML file. Before you emit a JSON event to Flowcore, you run your business logic checks against real-time read models. When you receive the event back from Flowcore, you update your read models.
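The loop described above can be sketched entirely in memory. This is a minimal illustration under stated assumptions, not Flowcore's API: the `Map`, the array, and the function names `emit`, `handle`, and `renameTodo` are hypothetical stand-ins for the read model, the event log, and your own application code.

```typescript
// Hypothetical in-memory stand-ins. In the tutorial these roles are played by
// PostgreSQL (read model) and Flowcore (immutable event log and fan-out).
type TodoEvent =
  | { type: "todo-item.created.v0"; payload: { id: string; title: string } }
  | { type: "todo-item.renamed.v0"; payload: { id: string; newTitle: string } };

const readModel = new Map<string, { title: string }>();
const eventLog: TodoEvent[] = [];

// Read side: update the read model when an event comes back.
function handle(event: TodoEvent): void {
  switch (event.type) {
    case "todo-item.created.v0":
      readModel.set(event.payload.id, { title: event.payload.title });
      break;
    case "todo-item.renamed.v0": {
      const todo = readModel.get(event.payload.id);
      if (todo) todo.title = event.payload.newTitle;
      break;
    }
  }
}

// Stand-in for emitting to Flowcore: append to the log, then fan out.
function emit(event: TodoEvent): void {
  eventLog.push(event);
  handle(event);
}

// Write side: check the business rule against the read model BEFORE emitting.
function renameTodo(id: string, newTitle: string): boolean {
  if (!readModel.has(id)) return false; // rule: the todo must already exist
  emit({ type: "todo-item.renamed.v0", payload: { id, newTitle } });
  return true;
}
```

Note that `renameTodo` never replays past events to decide whether the rename is allowed. It only consults the current read model, which is the point of the approach.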

No Kafka. No Domain Driven Design. No current-state rebuild/rehydration to validate user actions.

Flowcore is responsible for storing the events that you emit in immutable event logs and fanning them out to subscribed services, either in single-digit milliseconds or in seconds, depending on your needs.

We act as a powerful event processing infrastructure that guarantees immutable storage, real-time fan-out, automatic hot and cold storage management, and effortless replay so you can focus on writing business logic instead of building event infrastructure.

Flow of Flowcore

Prerequisites for tutorial

Step 1: Initialize the Project

Create a new directory and initialize a Bun project:

mkdir flowcore-todo-tutorial
cd flowcore-todo-tutorial
bun init -y

Install the required dependencies:

bun add @flowcore/pathways zod postgres

Next, log in to Flowcore using the CLI. The login flow uses port 3000, so make sure no other service is running on that port:

flowcore login

This will authenticate you with Flowcore and allow you to run subsequent commands.

Step 2: Create an HTTP Server

In index.ts, set up a simple HTTP server with a POST /api/transformer/ endpoint that receives events from Flowcore. The Flowcore Pathways library proxies each incoming event to the matching .handle() handler in your code:

import type { FlowcoreLegacyEvent } from "@flowcore/pathways";
import { pathwaysRouter } from "./pathways";
import "./handlers";

const server = Bun.serve({
  port: 3000,
  async fetch(req) {
    const url = new URL(req.url);
    // POST endpoint to receive events from Flowcore platform
    if (
      req.method === "POST" &&
      (url.pathname === "/api/transformer" ||
        url.pathname === "/api/transformer/")
    ) {
      try {
        const body = await req.json();
        const event = body as FlowcoreLegacyEvent;
        const secret = req.headers.get("x-secret") ?? "";
        await pathwaysRouter.processEvent(event, secret);
        return new Response("OK", {
          status: 200,
          headers: { "Content-Type": "text/plain" },
        });
      } catch (error) {
        console.error("❌ Error processing Flowcore event:", error);
        return new Response(
          JSON.stringify({ error: (error as Error).message }),
          {
            status: 500,
            headers: { "Content-Type": "application/json" },
          }
        );
      }
    }
    return new Response("Not Found", { status: 404 });
  },
});

console.log(`🚀 Server running on http://localhost:${server.port}`);
console.log(
  `📡 Transformer endpoint: http://localhost:${server.port}/api/transformer`
);

Step 3: Define Your Event Architecture

Understanding Flowcore Concepts

  • Data Core: Think of this as a project or Git repository - it’s the top-level container for your events
  • Flow Type: Similar to an aggregate in traditional event sourcing, but simpler - it’s just a title for grouping related immutable event logs e.g. TodoItem
  • Event Type: This is where Flowcore differs significantly from classic event sourcing:
    • In classic event sourcing: You have one immutable event log per aggregate instance (e.g., one stream per TodoItem, like TodoItem-001, TodoItem-002, etc.). The intent of an individual event (create, update, or delete) is indicated by the event type field on the JSON object.
    • In Flowcore: You have one immutable event log per event type. Instead of TodoItem instances, you have TodoItemCreated, TodoItemUpdated, and TodoItemDeleted as separate immutable event logs. The intent of an event is indicated by the log it is stored in.
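The difference between the two layouts can be shown by partitioning the same events in two ways. This is a conceptual sketch only: `StoredEvent` and `groupBy` are hypothetical names introduced for illustration and say nothing about how Flowcore stores events internally.

```typescript
// Hypothetical event shape used only for this comparison.
interface StoredEvent {
  aggregateId: string; // e.g. "TodoItem-001"
  eventType: string; // e.g. "TodoItemCreated"
}

// Partition events into named logs, preserving their original order.
function groupBy(
  events: StoredEvent[],
  key: (event: StoredEvent) => string
): Map<string, StoredEvent[]> {
  const logs = new Map<string, StoredEvent[]>();
  for (const event of events) {
    const name = key(event);
    const log = logs.get(name) ?? [];
    log.push(event);
    logs.set(name, log);
  }
  return logs;
}

const events: StoredEvent[] = [
  { aggregateId: "TodoItem-001", eventType: "TodoItemCreated" },
  { aggregateId: "TodoItem-002", eventType: "TodoItemCreated" },
  { aggregateId: "TodoItem-001", eventType: "TodoItemUpdated" },
];

// Classic event sourcing: one log per aggregate instance.
const perInstance = groupBy(events, (e) => e.aggregateId);

// Flowcore: one log per event type.
const perEventType = groupBy(events, (e) => e.eventType);
```

With the first grouping, the number of logs grows with the number of todo items. With the second, it stays fixed at the number of event types you define.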

Create a flowcore.yaml file to define your event architecture:

# cli "flowcore data-core apply -f flowcore.yaml"
version: 1
tenant: YOUR_TENANT_NAME # <-- replace
dataCore:
  name: todo-app
  flowTypes:
    todo-items:
      eventTypes:
        todo-item.created.v0:
          description: "A todo item was created (full snapshot)"
        todo-item.renamed.v0:
          description: "The title of a todo item was changed"
        todo-item.completed.v0:
          description: "A todo item was marked as done"
        todo-item.reopened.v0:
          description: "A completed todo item was reopened"
        todo-item.deleted.v0:
          description: "A todo item was deleted"

Apply this configuration to create Flowcore primitives:

flowcore data-core apply -f flowcore.yaml

Create schemas.ts to define the contract of your events, that is, the structure each event payload must have in your code:

import { z } from "zod";

/* full snapshot on create */
export const todoCreated = z.object({
  id: z.string(),
  title: z.string(),
  description: z.string().optional(),
  done: z.literal(false),
});

/* deltas thereafter */
export const todoRenamed = z.object({ id: z.string(), newTitle: z.string() });
export const todoCompleted = z.object({ id: z.string() });
export const todoReopened = z.object({ id: z.string() });
export const todoDeleted = z.object({ id: z.string() });

Step 4: Set Up Local Development

Create this flowcore.local.yaml file for local development:

tenant: YOUR_TENANT_NAME # <-- replace
development:
  proxyEndpoints:
    todo-app:
      dataCore: todo-app
      flowType: todo-items
      events:
        - todo-item.created.v0
        - todo-item.renamed.v0
        - todo-item.completed.v0
        - todo-item.reopened.v0
        - todo-item.deleted.v0
      endpoints:
        - "" # leave this empty. POST /api/transformer/ is the default endpoint.

You can run the following command now to check that it opens the WebSocket connection. You will run it again in Step 8 when testing the entire flow.

flowcore scenario local -f flowcore.local.yaml -s now -e http://localhost:3000/api/transformer -H 'X-Secret: 1234'

This configures Flowcore to fan out events to your local endpoint in real time, so you do not need a tunneling tool such as ngrok.

Step 5: Database Setup

Create a docker-compose.yaml file to run PostgreSQL:

version: "3.8"
services:
  postgres:
    container_name: todo-app-postgres
    image: postgres:15
    environment:
      POSTGRES_DB: pathway_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    ports:
      - "5433:5432"
    volumes:
      - ./init-db:/docker-entrypoint-initdb.d

Why PostgreSQL? We need a database for two purposes:

  1. Application data: Store our todo items
  2. Pathway state: Flowcore uses this to track the state of the .write() -> .handle() flow, ensuring events make it from the write side to your event handlers
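To make the idea of pathway state more concrete, here is a conceptual sketch of a store that records which events have already reached a handler. It is an assumption-laden illustration and not the @flowcore/pathways implementation: the class `InMemoryPathwayState`, its methods, and the `deliver` function are hypothetical, and the real library persists its state in PostgreSQL rather than in memory.

```typescript
// Conceptual sketch only: NOT the @flowcore/pathways implementation.
class InMemoryPathwayState {
  private processed = new Set<string>();

  isProcessed(eventId: string): boolean {
    return this.processed.has(eventId);
  }

  markProcessed(eventId: string): void {
    this.processed.add(eventId);
  }
}

// Runs the handler at most once per event id, even if the same event
// is delivered more than once. Returns true when the handler actually ran.
function deliver(
  state: InMemoryPathwayState,
  eventId: string,
  handler: () => void
): boolean {
  if (state.isProcessed(eventId)) return false; // duplicate delivery, skip
  handler();
  state.markProcessed(eventId);
  return true;
}
```

A writer could consult the same state to find out whether an event it emitted has been handled yet, which is one way to read "tracking the .write() -> .handle() flow".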

Start the database:

docker-compose up -d

This is the URL that you can use to connect to the local PostgreSQL database:

postgresql://postgres:postgres@localhost:5433/pathway_db

Create the todo table:

CREATE TABLE IF NOT EXISTS todo (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  title VARCHAR(255) NOT NULL,
  description VARCHAR(1000),
  done BOOLEAN NOT NULL DEFAULT false,
  created_at TIMESTAMP DEFAULT NOW() NOT NULL,
  updated_at TIMESTAMP DEFAULT NOW() NOT NULL
);

Step 6: Set Up Flowcore Pathways

Create an API Key

Generate an API key for your application to authenticate with Flowcore:

flowcore auth new key --tenant YOUR_TENANT_NAME todo-app-key

This will create a new API key that you’ll use in your application code.

In a real application, store this key securely, for example in an environment variable or a secrets manager. To keep the tutorial short, the next code snippet adds it directly to the code.
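One common way to keep the key out of source code is to read it from an environment variable and fail fast when it is missing. The helper below is a minimal sketch: `requireEnv` and the variable name `FLOWCORE_API_KEY` are hypothetical and not part of the Flowcore tooling.

```typescript
// Hypothetical helper, not part of @flowcore/pathways.
type Env = Record<string, string | undefined>;

// Returns the value of a required variable, or throws a descriptive error
// if it is unset or blank, so a missing key is caught at startup.
function requireEnv(name: string, env: Env): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Possible usage in pathways.ts (the variable name is an assumption):
// const apiKey = requireEnv("FLOWCORE_API_KEY", process.env);
```

Passing the environment in as an argument keeps the helper easy to test without touching the real process environment.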

Configure Pathways

Create pathways.ts to configure Flowcore. The Flowcore Pathways library proxies events arriving at the POST /api/transformer/ endpoint to the .handle() handlers in your code. It is also used to emit events to Flowcore through the .write() method.

import {
  PathwayRouter,
  PathwaysBuilder,
  createPostgresPathwayState,
} from "@flowcore/pathways";
import {
  todoCreated,
  todoRenamed,
  todoCompleted,
  todoReopened,
  todoDeleted,
} from "./schemas";
import postgres from "postgres";

const apiKey = "YOUR_GENERATED_API_KEY"; // <--- replace with generated api key
const postgresUrl = "postgresql://postgres:postgres@localhost:5433/pathway_db";

// Initialize the database connection
export const sql = postgres(postgresUrl);

export const pathways = new PathwaysBuilder({
  baseUrl: "https://webhook.api.flowcore.io",
  tenant: "YOUR_TENANT_NAME", // <--------- replace with your tenant name
  dataCore: "todo-app",
  apiKey,
})
  .withPathwayState(
    createPostgresPathwayState({ connectionString: postgresUrl })
  )
  .register({
    flowType: "todo-items",
    eventType: "todo-item.created.v0",
    schema: todoCreated,
  })
  .register({
    flowType: "todo-items",
    eventType: "todo-item.renamed.v0",
    schema: todoRenamed,
  })
  .register({
    flowType: "todo-items",
    eventType: "todo-item.completed.v0",
    schema: todoCompleted,
  })
  .register({
    flowType: "todo-items",
    eventType: "todo-item.reopened.v0",
    schema: todoReopened,
  })
  .register({
    flowType: "todo-items",
    eventType: "todo-item.deleted.v0",
    schema: todoDeleted,
  });

// The second argument is the shared secret expected in the X-Secret header
export const pathwaysRouter = new PathwayRouter(pathways, "1234");

Create handlers

Create a handlers.ts file that listens for incoming events and applies them to the todo table in the PostgreSQL database.

import { pathways, sql } from "./pathways";

pathways.handle("todo-items/todo-item.created.v0", async ({ payload }) => {
  console.log("Todo Item CREATED", payload);
  await sql`
    INSERT INTO todo (id, title, description, done)
    VALUES (${payload.id}, ${payload.title}, ${payload.description ?? null}, ${
    payload.done
  })
  `;
});

pathways.handle("todo-items/todo-item.renamed.v0", async ({ payload }) => {
  console.log("Todo Item RENAMED", payload);
  await sql`
    UPDATE todo
    SET title = ${payload.newTitle}
    WHERE id = ${payload.id}
  `;
});

pathways.handle("todo-items/todo-item.completed.v0", async ({ payload }) => {
  console.log("Todo Item COMPLETED", payload);
  await sql`UPDATE todo SET done = true WHERE id = ${payload.id}`;
});

pathways.handle("todo-items/todo-item.reopened.v0", async ({ payload }) => {
  console.log("Todo Item REOPENED", payload);
  await sql`UPDATE todo SET done = false WHERE id = ${payload.id}`;
});

pathways.handle("todo-items/todo-item.deleted.v0", async ({ payload }) => {
  console.log("Todo Item DELETED", payload);
  await sql`DELETE FROM todo WHERE id = ${payload.id}`;
});

Step 7: Create a Demo Script

Create write-demo.ts to send test events:

import { sleep } from "bun";
import { pathways, sql } from "./pathways";

const id = crypto.randomUUID();

// Optional helper: call `await dump("after create")` after any write
// to watch the read model change step by step.
async function dump(label: string) {
  await sleep(300); // give Flowcore -> handler -> DB time
  const [row] = await sql`SELECT * FROM todo WHERE id = ${id}`;
  console.log(`\n🔎 ${label}`);
  console.table(row);
}

await pathways.write("todo-items/todo-item.created.v0", {
  data: {
    id,
    title: "Buy milk",
    description: "2 litres, semi-skimmed",
    done: false,
  },
});
await pathways.write("todo-items/todo-item.renamed.v0", {
  data: { id, newTitle: "Buy oat milk" },
});
await pathways.write("todo-items/todo-item.completed.v0", {
  data: { id },
});
await pathways.write("todo-items/todo-item.reopened.v0", {
  data: { id },
});

console.log("\n✅ demo events sent");
await sql.end();
process.exit(0);
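The dump helper waits a fixed 300 ms for the event to travel from Flowcore through the handler into the database. Because the read model is updated asynchronously, a fixed delay can be too short or needlessly long. A possible alternative is to poll until the expected row appears. The `waitFor` helper below is a hypothetical sketch, not part of Bun or @flowcore/pathways, and its default values are arbitrary.

```typescript
// Hypothetical polling helper: retries `probe` until it returns a value
// other than undefined, or gives up after a fixed number of attempts.
async function waitFor<T>(
  probe: () => Promise<T | undefined>,
  { attempts = 20, delayMs = 50 } = {}
): Promise<T> {
  for (let i = 0; i < attempts; i++) {
    const result = await probe();
    if (result !== undefined) return result;
    // Pause before the next attempt to avoid hammering the database.
    await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
  }
  throw new Error(`Condition not met after ${attempts} attempts`);
}

// Possible usage inside write-demo.ts (sketch, relies on `sql` and `id`):
// const row = await waitFor(async () => {
//   const [r] = await sql`SELECT * FROM todo WHERE id = ${id}`;
//   return r;
// });
```

Polling trades a little extra code for a demo that neither races the handler nor waits longer than it has to.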

Step 8: Test the Complete Flow

1. Start your HTTP server:

bun index.ts

2. In another terminal, start the local proxy to connect Flowcore to your local endpoint:

flowcore scenario local -f flowcore.local.yaml -s now -e http://localhost:3000/api/transformer -H 'X-Secret: 1234'

3. In a third terminal, send test events:

bun write-demo.ts

4. Truncate the table and replay the events:

If you look at the todo table in PostgreSQL now, it should be populated. Truncate the table (for example with TRUNCATE TABLE todo;), then run this command to replay every event from the beginning:

flowcore scenario local -f flowcore.yaml -f flowcore.local.yaml -s first -e http://localhost:3000/api/transformer -H 'X-Secret: 1234'

The events get replayed from the beginning in the correct order and the database is repopulated.
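Replay works because the read model is a pure function of the ordered event log. The sketch below mirrors the five handlers from Step 6 in memory, assuming simplified event shapes: `DemoEvent`, `Todo`, and `replay` are hypothetical names, and the real handlers write to PostgreSQL instead of a `Map`.

```typescript
// Simplified, hypothetical shapes that mirror the tutorial's five event types.
interface Todo {
  id: string;
  title: string;
  done: boolean;
}

type DemoEvent =
  | { type: "created"; id: string; title: string }
  | { type: "renamed"; id: string; newTitle: string }
  | { type: "completed"; id: string }
  | { type: "reopened"; id: string }
  | { type: "deleted"; id: string };

// Rebuilds the read model from scratch by applying events in order.
function replay(events: DemoEvent[]): Map<string, Todo> {
  const todos = new Map<string, Todo>();
  for (const event of events) {
    const todo = todos.get(event.id);
    switch (event.type) {
      case "created":
        todos.set(event.id, { id: event.id, title: event.title, done: false });
        break;
      case "renamed":
        if (todo) todo.title = event.newTitle;
        break;
      case "completed":
        if (todo) todo.done = true;
        break;
      case "reopened":
        if (todo) todo.done = false;
        break;
      case "deleted":
        todos.delete(event.id);
        break;
    }
  }
  return todos;
}
```

Feeding the same ordered events into an empty read model always produces the same result, which is why truncating the table and replaying from the first event restores it.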