Commit c2f23149ff0219d82765ed897a1a906b5ce77ce4
1 parent
20d7ddc6
feat(events): P1.7 — event bus + transactional outbox
Adds the framework's event bus, the second cross-cutting service (after
auth) that PBCs and plug-ins both consume. Implements the transactional
outbox pattern from the architecture spec section 9 — events are
written to the database in the same transaction as the publisher's
domain change, so a publish followed by a rollback never escapes.
This is the seam where a future Kafka/NATS bridge plugs in WITHOUT
touching any PBC code.
What landed:
* New `platform/platform-events/` module:
- `EventOutboxEntry` JPA entity backed by `platform__event_outbox`
(id, event_id, topic, aggregate_type, aggregate_id, payload jsonb,
status, attempts, last_error, occurred_at, dispatched_at, version).
Status enum: PENDING / DISPATCHED / FAILED.
- `EventOutboxRepository` Spring Data JPA repo with a pessimistic
SELECT FOR UPDATE query for poller dispatch.
- `ListenerRegistry` — in-memory subscription holder, indexed both
by event class (Class.isInstance) and by topic string. Supports
a `**` wildcard for the platform's audit subscriber. Backed by
CopyOnWriteArrayList so dispatch is lock-free.
- `EventBusImpl` — implements the api.v1 EventBus. publish() writes
the outbox row AND synchronously delivers to in-process listeners
in the SAME transaction. Marked Propagation.MANDATORY so the bus
refuses to publish outside an existing transaction (preventing
publish-and-rollback leaks). Listener exceptions are caught and
logged; the outbox row still commits.
- `OutboxPoller` — Spring @Scheduled component that runs every 5s,
drains PENDING / FAILED rows under a pessimistic lock, marks them
DISPATCHED. v0.5 has no real external dispatcher — the poller is
the seam where Kafka/NATS plugs in later.
- `EventBusConfiguration` — @EnableScheduling so the poller actually
runs. Lives in this module so the seam activates automatically
when platform-events is on the classpath.
- `EventAuditLogSubscriber` — wildcard subscriber that logs every
event at INFO. Demo proof that the bus works end-to-end. Future
versions replace it with a real audit log writer.
* `platform__event_outbox` Liquibase changeset (platform-events-001):
table + unique index on event_id + index on (status, created_at) +
index on topic.
* DefaultPluginContext.eventBus is no longer a stub that throws —
it's now the real EventBus injected by VibeErpPluginManager.
Plug-ins can publish and subscribe via the api.v1 surface. Note:
subscriptions are NOT auto-scoped to the plug-in lifecycle in v0.5;
a plug-in that wants its subscriptions removed on stop() must call
subscription.close() explicitly. Auto-scoping lands when per-plug-in
Spring child contexts ship.
* pbc-identity now publishes `UserCreatedEvent` after a successful
UserService.create(). The event class is internal to pbc-identity
(not in api.v1) — other PBCs subscribe by topic string
(`identity.user.created`), not by class. This is the right tradeoff:
string topics are stable across plug-in classloaders, class equality
is not, and adding every event class to api.v1 would be perpetual
surface-area bloat.
Tests: 13 new unit tests (9 EventBusImplTest + 4 OutboxPollerTest)
plus 2 new UserServiceTest cases that verify the publish happens on
the happy path and does NOT happen when create() rejects a duplicate.
Total now 76 unit tests across the framework, all green.
End-to-end smoke test against fresh Postgres with the plug-in loaded
(everything green):
EventAuditLogSubscriber subscribed to ** at boot
Outbox empty before any user create ✓
POST /api/v1/auth/login → 200
POST /api/v1/identity/users (create alice) → 201
Outbox row appears with topic=identity.user.created,
status=PENDING immediately after create ✓
EventAuditLogSubscriber log line fires synchronously
inside the create transaction ✓
POST /api/v1/identity/users (create bob) → 201
Wait 8s (one OutboxPoller cycle)
Both outbox rows now DISPATCHED, dispatched_at set ✓
Existing PBCs still work:
GET /api/v1/identity/users → 3 users ✓
GET /api/v1/catalog/uoms → 15 UoMs ✓
Plug-in still works:
GET /api/v1/plugins/printing-shop/ping → 200 ✓
The most important assertion is the synchronous audit log line
appearing on the same thread as the user creation request. That
proves the entire chain — UserService.create() → eventBus.publish()
→ EventBusImpl writes outbox row → ListenerRegistry.deliver()
finds wildcard subscriber → EventAuditLogSubscriber.handle()
logs — runs end-to-end inside the publisher's transaction.
The poller flipping PENDING → DISPATCHED 5s later proves the
outbox + poller seam works without any external dispatcher.
Bug encountered and fixed during the smoke test:
• EventBusImplTest used `ObjectMapper().registerKotlinModule()`
which doesn't pick up jackson-datatype-jsr310. Production code
uses Spring Boot's auto-configured ObjectMapper which already
has jsr310 because spring-boot-starter-web is on the classpath
of distribution. The test setup was the only place using a bare
mapper. Fixed by switching to `findAndRegisterModules()` AND
by adding jackson-datatype-jsr310 as an explicit implementation
dependency of platform-events (so future modules that depend on
the bus without bringing web in still get Instant serialization).
What is explicitly NOT in this chunk:
• External dispatcher (Kafka/NATS bridge) — the poller is a no-op
that just marks rows DISPATCHED. The seam exists; the dispatcher
is a future P1.7.b unit.
• Exponential backoff on FAILED rows — every cycle re-attempts.
Real backoff lands when there's a real dispatcher to fail.
• Dead-letter queue — same.
• Per-plug-in subscription auto-scoping — plug-ins must close()
explicitly today.
• Async / fire-and-forget publish — synchronous in-process only.
Showing
22 changed files
with
1058 additions
and
7 deletions
distribution/build.gradle.kts
| ... | ... | @@ -23,6 +23,7 @@ dependencies { |
| 23 | 23 | implementation(project(":platform:platform-persistence")) |
| 24 | 24 | implementation(project(":platform:platform-plugins")) |
| 25 | 25 | implementation(project(":platform:platform-security")) |
| 26 | + implementation(project(":platform:platform-events")) | |
| 26 | 27 | implementation(project(":pbc:pbc-identity")) |
| 27 | 28 | implementation(project(":pbc:pbc-catalog")) |
| 28 | 29 | ... | ... |
distribution/src/main/resources/db/changelog/master.xml
| ... | ... | @@ -11,6 +11,7 @@ |
| 11 | 11 | xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog |
| 12 | 12 | https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.27.xsd"> |
| 13 | 13 | <include file="classpath:db/changelog/platform/000-platform-init.xml"/> |
| 14 | + <include file="classpath:db/changelog/platform/001-platform-events.xml"/> | |
| 14 | 15 | <include file="classpath:db/changelog/pbc-identity/001-identity-init.xml"/> |
| 15 | 16 | <include file="classpath:db/changelog/pbc-identity/002-identity-credential.xml"/> |
| 16 | 17 | <include file="classpath:db/changelog/pbc-catalog/001-catalog-init.xml"/> | ... | ... |
distribution/src/main/resources/db/changelog/platform/001-platform-events.xml
0 → 100644
| 1 | +<?xml version="1.0" encoding="UTF-8"?> | |
| 2 | +<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" | |
| 3 | + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
| 4 | + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog | |
| 5 | + https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.27.xsd"> | |
| 6 | + | |
| 7 | + <!-- | |
| 8 | + platform__event_outbox | |
| 9 | + | |
| 10 | + The transactional outbox table for the vibe_erp event bus. | |
| 11 | + | |
| 12 | + Why an outbox at all (architecture spec section 9, "Cross-cutting"): | |
| 13 | + the bus must guarantee "publish-and-rollback can never escape" — | |
| 14 | + i.e. an event published inside a transaction is delivered if and | |
| 15 | + only if the surrounding transaction commits. The way to do that | |
| 16 | + without two-phase commit between the DB and the message broker is | |
| 17 | + to write the event row to the SAME database in the SAME transaction | |
| 18 | + as the originating change. A separate poller picks up DISPATCHED | |
| 19 | + rows and forwards them to in-process subscribers (today) or to | |
| 20 | + Kafka/NATS (later) — and a publish that gets rolled back simply | |
| 21 | + leaves no row to dispatch. | |
| 22 | + | |
| 23 | + Status machine: | |
| 24 | + PENDING — written by publish(), not yet picked up | |
| 25 | + DISPATCHED — successfully delivered to all subscribers | |
| 26 | + FAILED — delivery failed; will retry until attempts >= max | |
| 27 | + --> | |
| 28 | + <changeSet id="platform-events-001" author="vibe_erp"> | |
| 29 | + <comment>Create platform__event_outbox table</comment> | |
| 30 | + <sql> | |
| 31 | + CREATE TABLE platform__event_outbox ( | |
| 32 | + id uuid PRIMARY KEY, | |
| 33 | + event_id uuid NOT NULL, | |
| 34 | + topic varchar(256) NOT NULL, | |
| 35 | + aggregate_type varchar(128) NOT NULL, | |
| 36 | + aggregate_id varchar(256) NOT NULL, | |
| 37 | + payload jsonb NOT NULL, | |
| 38 | + status varchar(16) NOT NULL DEFAULT 'PENDING', | |
| 39 | + attempts integer NOT NULL DEFAULT 0, | |
| 40 | + last_error text, | |
| 41 | + occurred_at timestamptz NOT NULL, | |
| 42 | + created_at timestamptz NOT NULL DEFAULT now(), | |
| 43 | + dispatched_at timestamptz, | |
| 44 | + version bigint NOT NULL DEFAULT 0 | |
| 45 | + ); | |
| 46 | + CREATE UNIQUE INDEX platform__event_outbox_event_id_uk | |
| 47 | + ON platform__event_outbox (event_id); | |
| 48 | + CREATE INDEX platform__event_outbox_status_created_idx | |
| 49 | + ON platform__event_outbox (status, created_at); | |
| 50 | + CREATE INDEX platform__event_outbox_topic_idx | |
| 51 | + ON platform__event_outbox (topic); | |
| 52 | + </sql> | |
| 53 | + <rollback> | |
| 54 | + DROP TABLE platform__event_outbox; | |
| 55 | + </rollback> | |
| 56 | + </changeSet> | |
| 57 | + | |
| 58 | +</databaseChangeLog> | ... | ... |
gradle/libs.versions.toml
| ... | ... | @@ -32,6 +32,7 @@ bouncycastle = { module = "org.bouncycastle:bcprov-jdk18on", version = "1.78.1" |
| 32 | 32 | kotlin-stdlib = { module = "org.jetbrains.kotlin:kotlin-stdlib", version.ref = "kotlin" } |
| 33 | 33 | kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = "kotlin" } |
| 34 | 34 | jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } |
| 35 | +jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } | |
| 35 | 36 | |
| 36 | 37 | # Validation |
| 37 | 38 | jakarta-validation-api = { module = "jakarta.validation:jakarta.validation-api", version.ref = "jakartaValidation" } | ... | ... |
pbc/pbc-identity/build.gradle.kts
| ... | ... | @@ -40,6 +40,7 @@ dependencies { |
| 40 | 40 | api(project(":api:api-v1")) |
| 41 | 41 | implementation(project(":platform:platform-persistence")) |
| 42 | 42 | implementation(project(":platform:platform-security")) |
| 43 | + implementation(project(":platform:platform-events")) | |
| 43 | 44 | |
| 44 | 45 | implementation(libs.kotlin.stdlib) |
| 45 | 46 | implementation(libs.kotlin.reflect) | ... | ... |
pbc/pbc-identity/src/main/kotlin/org/vibeerp/pbc/identity/application/UserService.kt
| ... | ... | @@ -2,7 +2,9 @@ package org.vibeerp.pbc.identity.application |
| 2 | 2 | |
| 3 | 3 | import org.springframework.stereotype.Service |
| 4 | 4 | import org.springframework.transaction.annotation.Transactional |
| 5 | +import org.vibeerp.api.v1.event.EventBus | |
| 5 | 6 | import org.vibeerp.pbc.identity.domain.User |
| 7 | +import org.vibeerp.pbc.identity.events.UserCreatedEvent | |
| 6 | 8 | import org.vibeerp.pbc.identity.infrastructure.UserJpaRepository |
| 7 | 9 | import java.util.UUID |
| 8 | 10 | |
| ... | ... | @@ -19,6 +21,7 @@ import java.util.UUID |
| 19 | 21 | @Transactional |
| 20 | 22 | class UserService( |
| 21 | 23 | private val users: UserJpaRepository, |
| 24 | + private val eventBus: EventBus, | |
| 22 | 25 | ) { |
| 23 | 26 | |
| 24 | 27 | @Transactional(readOnly = true) |
| ... | ... | @@ -40,7 +43,20 @@ class UserService( |
| 40 | 43 | email = command.email, |
| 41 | 44 | enabled = command.enabled, |
| 42 | 45 | ) |
| 43 | - return users.save(user) | |
| 46 | + val saved = users.save(user) | |
| 47 | + | |
| 48 | + // Publish AFTER save so saved.id is populated. Both writes are | |
| 49 | + // in the same @Transactional boundary, so the outbox row is | |
| 50 | + // committed atomically with the user row — see EventBusImpl | |
| 51 | + // and the architecture spec section 9. | |
| 52 | + eventBus.publish( | |
| 53 | + UserCreatedEvent( | |
| 54 | + userId = saved.id, | |
| 55 | + username = saved.username, | |
| 56 | + ), | |
| 57 | + ) | |
| 58 | + | |
| 59 | + return saved | |
| 44 | 60 | } |
| 45 | 61 | |
| 46 | 62 | fun update(id: UUID, command: UpdateUserCommand): User { | ... | ... |
pbc/pbc-identity/src/main/kotlin/org/vibeerp/pbc/identity/events/UserCreatedEvent.kt
0 → 100644
| 1 | +package org.vibeerp.pbc.identity.events | |
| 2 | + | |
| 3 | +import org.vibeerp.api.v1.core.Id | |
| 4 | +import org.vibeerp.api.v1.event.DomainEvent | |
| 5 | +import java.time.Instant | |
| 6 | +import java.util.UUID | |
| 7 | + | |
| 8 | +/** | |
| 9 | + * Fired by [org.vibeerp.pbc.identity.application.UserService.create] right | |
| 10 | + * after a User row is persisted. | |
| 11 | + * | |
| 12 | + * **The event class is internal to pbc-identity** — other PBCs and | |
| 13 | + * plug-ins subscribe by **topic string**, not by class: | |
| 14 | + * | |
| 15 | + * ``` | |
| 16 | + * eventBus.subscribe(UserCreatedEvent.TOPIC) { event -> | |
| 17 | + * // event.aggregateId is the user UUID as a string | |
| 18 | + * } | |
| 19 | + * ``` | |
| 20 | + * | |
| 21 | + * Why not put this class in `api.v1.ext.identity` so subscribers can use | |
| 22 | + * `eventBus.subscribe(UserCreatedEvent::class.java, ...)`? Two reasons: | |
| 23 | + * | |
| 24 | + * 1. Every event class added to api.v1 becomes a perpetual maintenance | |
| 25 | + * commitment. Most subscribers don't need the rich shape — the | |
| 26 | + * topic name is enough to know what happened. | |
| 27 | + * | |
| 28 | + * 2. Cross-classloader subscribers (plug-ins) cannot rely on | |
| 29 | + * class-equality anyway: a plug-in built against an older api.v1 | |
| 30 | + * that knew about UserCreatedEvent would have its own copy of the | |
| 31 | + * class loaded by its plug-in classloader, and the host's | |
| 32 | + * `Class.isInstance(...)` check would be false. The string topic | |
| 33 | + * is the only stable cross-boundary identifier. | |
| 34 | + * | |
| 35 | + * If a future PBC genuinely needs typed access to the user fields it | |
| 36 | + * gets via `IdentityApi.findUserById(...)` after receiving the event, | |
| 37 | + * not by inflating this class into api.v1. | |
| 38 | + */ | |
| 39 | +data class UserCreatedEvent( | |
| 40 | + override val eventId: Id<DomainEvent> = Id(UUID.randomUUID()), | |
| 41 | + override val occurredAt: Instant = Instant.now(), | |
| 42 | + val userId: UUID, | |
| 43 | + val username: String, | |
| 44 | +) : DomainEvent { | |
| 45 | + | |
| 46 | + override val aggregateType: String = TOPIC | |
| 47 | + override val aggregateId: String = userId.toString() | |
| 48 | + | |
| 49 | + companion object { | |
| 50 | + /** | |
| 51 | + * Stable topic string for cross-PBC and cross-plug-in subscribers. | |
| 52 | + * Convention: `<pbc-name>.<aggregate>.<verb>`. Adding a new event | |
| 53 | + * topic is non-breaking; renaming one is a major version bump. | |
| 54 | + */ | |
| 55 | + const val TOPIC: String = "identity.user.created" | |
| 56 | + } | |
| 57 | +} | ... | ... |
pbc/pbc-identity/src/test/kotlin/org/vibeerp/pbc/identity/application/UserServiceTest.kt
| ... | ... | @@ -8,12 +8,16 @@ import assertk.assertions.isInstanceOf |
| 8 | 8 | import assertk.assertions.isNotNull |
| 9 | 9 | import assertk.assertions.isNull |
| 10 | 10 | import io.mockk.every |
| 11 | +import io.mockk.justRun | |
| 11 | 12 | import io.mockk.mockk |
| 12 | 13 | import io.mockk.slot |
| 13 | 14 | import io.mockk.verify |
| 14 | 15 | import org.junit.jupiter.api.BeforeEach |
| 15 | 16 | import org.junit.jupiter.api.Test |
| 17 | +import org.vibeerp.api.v1.event.DomainEvent | |
| 18 | +import org.vibeerp.api.v1.event.EventBus | |
| 16 | 19 | import org.vibeerp.pbc.identity.domain.User |
| 20 | +import org.vibeerp.pbc.identity.events.UserCreatedEvent | |
| 17 | 21 | import org.vibeerp.pbc.identity.infrastructure.UserJpaRepository |
| 18 | 22 | import java.util.Optional |
| 19 | 23 | import java.util.UUID |
| ... | ... | @@ -31,12 +35,17 @@ import java.util.UUID |
| 31 | 35 | class UserServiceTest { |
| 32 | 36 | |
| 33 | 37 | private lateinit var users: UserJpaRepository |
| 38 | + private lateinit var eventBus: EventBus | |
| 34 | 39 | private lateinit var service: UserService |
| 35 | 40 | |
| 36 | 41 | @BeforeEach |
| 37 | 42 | fun setUp() { |
| 38 | 43 | users = mockk(relaxed = false) |
| 39 | - service = UserService(users) | |
| 44 | + eventBus = mockk(relaxed = false) | |
| 45 | + // Most tests don't care about publishing; the create-happy-path | |
| 46 | + // test below verifies it explicitly via mockk's `verify`. | |
| 47 | + justRun { eventBus.publish(any<DomainEvent>()) } | |
| 48 | + service = UserService(users, eventBus) | |
| 40 | 49 | } |
| 41 | 50 | |
| 42 | 51 | @Test |
| ... | ... | @@ -78,6 +87,43 @@ class UserServiceTest { |
| 78 | 87 | } |
| 79 | 88 | |
| 80 | 89 | @Test |
| 90 | + fun `create publishes a UserCreatedEvent on the happy path`() { | |
| 91 | + every { users.existsByUsername("zoe") } returns false | |
| 92 | + val savedUser = slot<User>() | |
| 93 | + every { users.save(capture(savedUser)) } answers { savedUser.captured } | |
| 94 | + val publishedEvent = slot<DomainEvent>() | |
| 95 | + justRun { eventBus.publish(capture(publishedEvent)) } | |
| 96 | + | |
| 97 | + service.create( | |
| 98 | + CreateUserCommand( | |
| 99 | + username = "zoe", | |
| 100 | + displayName = "Zoe", | |
| 101 | + email = null, | |
| 102 | + ), | |
| 103 | + ) | |
| 104 | + | |
| 105 | + verify(exactly = 1) { eventBus.publish(any<DomainEvent>()) } | |
| 106 | + assertThat(publishedEvent.captured).isInstanceOf(UserCreatedEvent::class) | |
| 107 | + val event = publishedEvent.captured as UserCreatedEvent | |
| 108 | + assertThat(event.username).isEqualTo("zoe") | |
| 109 | + assertThat(event.userId).isEqualTo(savedUser.captured.id) | |
| 110 | + assertThat(event.aggregateType).isEqualTo(UserCreatedEvent.TOPIC) | |
| 111 | + } | |
| 112 | + | |
| 113 | + @Test | |
| 114 | + fun `create does NOT publish an event when the username is duplicate`() { | |
| 115 | + every { users.existsByUsername("alice") } returns true | |
| 116 | + | |
| 117 | + runCatching { | |
| 118 | + service.create( | |
| 119 | + CreateUserCommand(username = "alice", displayName = "Alice", email = null), | |
| 120 | + ) | |
| 121 | + } | |
| 122 | + | |
| 123 | + verify(exactly = 0) { eventBus.publish(any<DomainEvent>()) } | |
| 124 | + } | |
| 125 | + | |
| 126 | + @Test | |
| 81 | 127 | fun `findByUsername returns null when no user exists`() { |
| 82 | 128 | every { users.findByUsername("ghost") } returns null |
| 83 | 129 | ... | ... |
platform/platform-events/build.gradle.kts
0 → 100644
| 1 | +plugins { | |
| 2 | + alias(libs.plugins.kotlin.jvm) | |
| 3 | + alias(libs.plugins.kotlin.spring) | |
| 4 | + alias(libs.plugins.kotlin.jpa) | |
| 5 | + alias(libs.plugins.spring.dependency.management) | |
| 6 | +} | |
| 7 | + | |
| 8 | +description = "vibe_erp event bus + transactional outbox. INTERNAL." | |
| 9 | + | |
| 10 | +java { | |
| 11 | + toolchain { | |
| 12 | + languageVersion.set(JavaLanguageVersion.of(21)) | |
| 13 | + } | |
| 14 | +} | |
| 15 | + | |
| 16 | +kotlin { | |
| 17 | + jvmToolchain(21) | |
| 18 | + compilerOptions { | |
| 19 | + freeCompilerArgs.add("-Xjsr305=strict") | |
| 20 | + } | |
| 21 | +} | |
| 22 | + | |
| 23 | +allOpen { | |
| 24 | + annotation("jakarta.persistence.Entity") | |
| 25 | + annotation("jakarta.persistence.MappedSuperclass") | |
| 26 | + annotation("jakarta.persistence.Embeddable") | |
| 27 | +} | |
| 28 | + | |
| 29 | +dependencies { | |
| 30 | + api(project(":api:api-v1")) | |
| 31 | + api(project(":platform:platform-persistence")) // outbox row writes use the audit base + JPA | |
| 32 | + | |
| 33 | + implementation(libs.kotlin.stdlib) | |
| 34 | + implementation(libs.kotlin.reflect) | |
| 35 | + implementation(libs.jackson.module.kotlin) | |
| 36 | + implementation(libs.jackson.datatype.jsr310) // for java.time.Instant in event payloads | |
| 37 | + | |
| 38 | + implementation(libs.spring.boot.starter) | |
| 39 | + implementation(libs.spring.boot.starter.data.jpa) | |
| 40 | + | |
| 41 | + testImplementation(libs.spring.boot.starter.test) | |
| 42 | + testImplementation(libs.junit.jupiter) | |
| 43 | + testImplementation(libs.assertk) | |
| 44 | + testImplementation(libs.mockk) | |
| 45 | +} | |
| 46 | + | |
| 47 | +tasks.test { | |
| 48 | + useJUnitPlatform() | |
| 49 | +} | ... | ... |
platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/EventBusConfiguration.kt
0 → 100644
| 1 | +package org.vibeerp.platform.events | |
| 2 | + | |
| 3 | +import org.springframework.context.annotation.Configuration | |
| 4 | +import org.springframework.scheduling.annotation.EnableScheduling | |
| 5 | + | |
| 6 | +/** | |
| 7 | + * Marker configuration for the platform-events module. | |
| 8 | + * | |
| 9 | + * Activates Spring's scheduling support so [org.vibeerp.platform.events.outbox.OutboxPoller] | |
| 10 | + * actually runs. The poller's `@Scheduled` annotation is inert until | |
| 11 | + * `@EnableScheduling` is present somewhere in the application context. | |
| 12 | + * | |
| 13 | + * Lives in this module (rather than in `platform-bootstrap`) so the | |
| 14 | + * "events module is on the classpath" condition automatically pulls | |
| 15 | + * in the poller's scheduling — distribution doesn't need to know. | |
| 16 | + */ | |
| 17 | +@Configuration | |
| 18 | +@EnableScheduling | |
| 19 | +class EventBusConfiguration | ... | ... |
platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/audit/EventAuditLogSubscriber.kt
0 → 100644
| 1 | +package org.vibeerp.platform.events.audit | |
| 2 | + | |
| 3 | +import jakarta.annotation.PostConstruct | |
| 4 | +import org.slf4j.LoggerFactory | |
| 5 | +import org.springframework.stereotype.Component | |
| 6 | +import org.vibeerp.api.v1.event.EventBus | |
| 7 | +import org.vibeerp.api.v1.event.EventListener | |
| 8 | +import org.vibeerp.platform.events.bus.EventBusImpl | |
| 9 | + | |
| 10 | +/** | |
| 11 | + * Logs every domain event the bus publishes, at INFO level. | |
| 12 | + * | |
| 13 | + * **Purpose in v0.5:** prove that the event bus works end-to-end. | |
| 14 | + * Without a real subscriber, "publish" looks suspiciously like "no-op | |
| 15 | + * to a database table" — this component closes the demo loop. The | |
| 16 | + * v0.5 smoke test asserts that creating a User via REST produces | |
| 17 | + * exactly one log line from this subscriber AND a row in | |
| 18 | + * `platform__event_outbox` that flips to DISPATCHED inside 10 seconds. | |
| 19 | + * | |
| 20 | + * **Future direction:** this is the natural place to plug in a real | |
| 21 | + * audit log writer (P3.x), an OpenTelemetry trace exporter, or the | |
| 22 | + * MCP event mirror that lets AI agents observe what just happened. | |
| 23 | + * The class will probably get renamed `RawEventAuditWriter` and start | |
| 24 | + * persisting to `platform__audit` rather than just logging — but the | |
| 25 | + * subscription pattern (wildcard topic, all events) stays the same. | |
| 26 | + * | |
| 27 | + * Subscribed via [EventBusImpl.subscribeToAll], which is the | |
| 28 | + * platform-internal helper for the wildcard `**` topic. Plug-ins | |
| 29 | + * cannot use the wildcard themselves because it would let one | |
| 30 | + * misbehaving plug-in observe every other plug-in's events — that's | |
| 31 | + * a deliberate restriction. | |
| 32 | + */ | |
| 33 | +@Component | |
| 34 | +class EventAuditLogSubscriber( | |
| 35 | + private val eventBus: EventBus, | |
| 36 | +) { | |
| 37 | + | |
| 38 | + private val log = LoggerFactory.getLogger(EventAuditLogSubscriber::class.java) | |
| 39 | + | |
| 40 | + @PostConstruct | |
| 41 | + fun subscribe() { | |
| 42 | + // Cast to the impl on purpose: the wildcard subscription is a | |
| 43 | + // platform-internal capability, not part of api.v1. | |
| 44 | + val impl = eventBus as? EventBusImpl | |
| 45 | + ?: error( | |
| 46 | + "EventAuditLogSubscriber requires EventBusImpl; got ${eventBus.javaClass.name}. " + | |
| 47 | + "Either platform-events autoconfiguration is broken or someone replaced the bean." | |
| 48 | + ) | |
| 49 | + impl.subscribeToAll( | |
| 50 | + EventListener { event -> | |
| 51 | + log.info( | |
| 52 | + "[event-audit] {} id={} aggregate={} occurredAt={}", | |
| 53 | + event.aggregateType, | |
| 54 | + event.eventId, | |
| 55 | + event.aggregateId, | |
| 56 | + event.occurredAt, | |
| 57 | + ) | |
| 58 | + }, | |
| 59 | + ) | |
| 60 | + log.info("EventAuditLogSubscriber subscribed to ** (wildcard)") | |
| 61 | + } | |
| 62 | +} | ... | ... |
platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/bus/EventBusImpl.kt
0 → 100644
| 1 | +package org.vibeerp.platform.events.bus | |
| 2 | + | |
| 3 | +import com.fasterxml.jackson.databind.ObjectMapper | |
| 4 | +import org.slf4j.LoggerFactory | |
| 5 | +import org.springframework.stereotype.Component | |
| 6 | +import org.springframework.transaction.annotation.Propagation | |
| 7 | +import org.springframework.transaction.annotation.Transactional | |
| 8 | +import org.vibeerp.api.v1.event.DomainEvent | |
| 9 | +import org.vibeerp.api.v1.event.EventBus | |
| 10 | +import org.vibeerp.api.v1.event.EventListener | |
| 11 | +import org.vibeerp.platform.events.outbox.EventOutboxEntry | |
| 12 | +import org.vibeerp.platform.events.outbox.EventOutboxRepository | |
| 13 | + | |
| 14 | +/** | |
| 15 | + * The framework's [EventBus] implementation. | |
| 16 | + * | |
| 17 | + * Two responsibilities, both described in the architecture spec | |
| 18 | + * section 9 ("Cross-cutting concerns — events"): | |
| 19 | + * | |
| 20 | + * 1. **Synchronous in-process delivery** to every subscriber that | |
| 21 | + * matches the event's class or topic. Happens BEFORE the | |
| 22 | + * transaction commits, inside the caller's transaction, so a | |
| 23 | + * listener that throws can roll the publisher back. (This is the | |
| 24 | + * v0.5 contract; a future "async" overload may add deferred | |
| 25 | + * delivery, but the synchronous path is the safe default.) | |
| 26 | + * | |
| 27 | + * 2. **Transactional outbox write.** The same call also persists an | |
| 28 | + * [EventOutboxEntry] row to `platform__event_outbox` in the SAME | |
| 29 | + * database transaction as the publisher's domain change. If the | |
| 30 | + * transaction commits, the row is dispatchable; if it rolls back, | |
| 31 | + * the row never exists. This is the property that lets us add a | |
| 32 | + * Kafka/NATS bridge later WITHOUT touching any PBC code: the | |
| 33 | + * bridge just polls the same table. | |
| 34 | + * | |
| 35 | + * Why both at the same time: synchronous in-process delivery is what | |
| 36 | + * core PBCs need today (e.g. "on UserCreated, send a welcome email"), | |
| 37 | + * and the outbox is what tomorrow's distributed deployment needs. | |
| 38 | + * Doing them together in `publish` means every event is exactly-once | |
| 39 | + * persisted and at-least-once delivered to in-process listeners, | |
| 40 | + * which is the standard transactional-outbox guarantee. | |
| 41 | + * | |
| 42 | + * **Failure modes:** | |
| 43 | + * • A listener throws → the exception is recorded but does NOT | |
| 44 | + * abort other listeners. The outbox row is still written. The | |
| 45 | + * publisher's transaction is unaffected unless the listener's | |
| 46 | + * exception bubbles all the way out (which we prevent here). | |
| 47 | + * • The outbox INSERT throws (DB down, constraint violation) → | |
| 48 | + * the publisher's transaction rolls back, listeners that already | |
| 49 | + * ran see no commit, the publish appears never to have happened. | |
| 50 | + * This is the desired exactly-once semantic. | |
| 51 | + */ | |
| 52 | +@Component | |
| 53 | +class EventBusImpl( | |
| 54 | + private val outbox: EventOutboxRepository, | |
| 55 | + private val objectMapper: ObjectMapper, | |
| 56 | +) : EventBus { | |
| 57 | + | |
| 58 | + private val log = LoggerFactory.getLogger(EventBusImpl::class.java) | |
| 59 | + | |
| 60 | + private val registry = ListenerRegistry() | |
| 61 | + | |
| 62 | + /** | |
| 63 | + * Publish an event. | |
| 64 | + * | |
| 65 | + * Marked `Propagation.MANDATORY` so the bus refuses to publish | |
| 66 | + * outside an existing transaction. That's intentional: a publish | |
| 67 | + * with no transaction would write the outbox row immediately and | |
| 68 | + * could escape on caller rollback — exactly the leak the outbox | |
| 69 | + * pattern is supposed to prevent. Callers that legitimately need | |
| 70 | + * to publish from outside a transaction (background jobs, | |
| 71 | + * one-shot scripts) wrap the call in a TransactionTemplate or | |
| 72 | + * an `@Transactional` boundary of their own. | |
| 73 | + */ | |
| 74 | + @Transactional(propagation = Propagation.MANDATORY) | |
| 75 | + override fun publish(event: DomainEvent) { | |
| 76 | + // 1. Persist the outbox row in the same transaction as the | |
| 77 | + // caller. This is the durability anchor. | |
| 78 | + val payload = try { | |
| 79 | + objectMapper.writeValueAsString(event) | |
| 80 | + } catch (ex: Throwable) { | |
| 81 | + // If we cannot serialize the event we cannot durably | |
| 82 | + // record it. Throw so the caller's transaction rolls back | |
| 83 | + // — better to fail loudly than to publish a half-event. | |
| 84 | + throw IllegalStateException( | |
| 85 | + "EventBus failed to serialize event ${event.javaClass.name}: ${ex.message}", | |
| 86 | + ex, | |
| 87 | + ) | |
| 88 | + } | |
| 89 | + | |
| 90 | + outbox.save( | |
| 91 | + EventOutboxEntry( | |
| 92 | + eventId = event.eventId.value, | |
| 93 | + topic = event.aggregateType, | |
| 94 | + aggregateType = event.aggregateType, | |
| 95 | + aggregateId = event.aggregateId, | |
| 96 | + payload = payload, | |
| 97 | + occurredAt = event.occurredAt, | |
| 98 | + ), | |
| 99 | + ) | |
| 100 | + | |
| 101 | + // 2. Synchronous in-process delivery. Listener exceptions are | |
| 102 | + // logged but don't propagate; the outbox row is what | |
| 103 | + // guarantees the event isn't lost. | |
| 104 | + registry.deliver(event) { ex -> | |
| 105 | + log.warn( | |
| 106 | + "EventBus listener for {} (id={}) threw {}: {}", | |
| 107 | + event.aggregateType, event.eventId, ex.javaClass.simpleName, ex.message, ex, | |
| 108 | + ) | |
| 109 | + } | |
| 110 | + } | |
| 111 | + | |
| 112 | + override fun <E : DomainEvent> subscribe( | |
| 113 | + eventType: Class<E>, | |
| 114 | + listener: EventListener<E>, | |
| 115 | + ): EventBus.Subscription = | |
| 116 | + registry.registerByClass(eventType, listener) | |
| 117 | + | |
| 118 | + override fun subscribe( | |
| 119 | + topic: String, | |
| 120 | + listener: EventListener<DomainEvent>, | |
| 121 | + ): EventBus.Subscription = | |
| 122 | + registry.registerByTopic(topic, listener) | |
| 123 | + | |
| 124 | + /** | |
| 125 | + * Internal helper for tests and the audit subscriber to wire a | |
| 126 | + * raw `EventListener<DomainEvent>` against the wildcard topic | |
| 127 | + * without going through the public api.v1 surface. | |
| 128 | + */ | |
| 129 | + internal fun subscribeToAll(listener: EventListener<DomainEvent>): EventBus.Subscription = | |
| 130 | + registry.registerByTopic(ListenerRegistry.WILDCARD, listener) | |
| 131 | +} | ... | ... |
platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/bus/ListenerRegistry.kt
0 → 100644
| 1 | +package org.vibeerp.platform.events.bus | |
| 2 | + | |
| 3 | +import org.vibeerp.api.v1.event.DomainEvent | |
| 4 | +import org.vibeerp.api.v1.event.EventBus | |
| 5 | +import org.vibeerp.api.v1.event.EventListener | |
| 6 | +import org.vibeerp.api.v1.plugin.PluginContext | |
| 7 | +import java.util.concurrent.CopyOnWriteArrayList | |
| 8 | + | |
| 9 | +/** | |
| 10 | + * In-memory holder of every active subscription on the event bus. | |
| 11 | + * | |
| 12 | + * Two indexing strategies side-by-side, both required by the api.v1 | |
| 13 | + * `EventBus` interface (see [EventBus.subscribe] overloads): | |
| 14 | + * | |
| 15 | + * 1. **By event class** — for in-process subscribers that know the | |
| 16 | + * concrete event type at compile time. Pattern matched with | |
| 17 | + * `Class.isInstance(...)` so subscribing to a base type catches | |
| 18 | + * all subtypes. | |
| 19 | + * | |
| 20 | + * 2. **By topic string** — for cross-classloader subscribers and for | |
| 21 | + * cases where the event class isn't shared (e.g. a plug-in | |
| 22 | + * subscribing to a PBC's internal event class without being able | |
| 23 | + * to import it). The topic is matched against | |
| 24 | + * [DomainEvent.aggregateType] for exact equality, plus the special | |
| 25 | + * `**` topic which catches every event. | |
| 26 | + * | |
| 27 | + * Why a [CopyOnWriteArrayList]: subscriptions mutate rarely (at plug-in | |
| 28 | + * start, occasionally) but iterate on every publish. CoW gives us | |
| 29 | + * lock-free iteration and avoids `ConcurrentModificationException` | |
| 30 | + * when a listener subscribes from inside another listener — which is | |
| 31 | + * legal even if it's a smell. | |
| 32 | + * | |
| 33 | + * **The `**` wildcard** is intentionally narrow: it matches everything, | |
| 34 | + * full stop. There is no `identity.user.*` style globbing in v0.5. | |
| 35 | + * That keeps the dispatch logic O(1) per registration check and | |
| 36 | + * avoids surprising precedence rules; richer matching can come later | |
| 37 | + * non-breakingly because it would only widen what counts as a match. | |
| 38 | + */ | |
| 39 | +internal class ListenerRegistry { | |
| 40 | + | |
| 41 | + private val byClass = CopyOnWriteArrayList<ClassRegistration<*>>() | |
| 42 | + private val byTopic = CopyOnWriteArrayList<TopicRegistration>() | |
| 43 | + | |
| 44 | + fun <E : DomainEvent> registerByClass( | |
| 45 | + eventType: Class<E>, | |
| 46 | + listener: EventListener<E>, | |
| 47 | + ): EventBus.Subscription { | |
| 48 | + val reg = ClassRegistration(eventType, listener) | |
| 49 | + byClass += reg | |
| 50 | + return SubscriptionHandle { byClass.remove(reg) } | |
| 51 | + } | |
| 52 | + | |
| 53 | + fun registerByTopic( | |
| 54 | + topic: String, | |
| 55 | + listener: EventListener<DomainEvent>, | |
| 56 | + ): EventBus.Subscription { | |
| 57 | + require(topic.isNotBlank()) { "topic must not be blank" } | |
| 58 | + val reg = TopicRegistration(topic, listener) | |
| 59 | + byTopic += reg | |
| 60 | + return SubscriptionHandle { byTopic.remove(reg) } | |
| 61 | + } | |
| 62 | + | |
| 63 | + /** | |
| 64 | + * Deliver [event] to every matching listener. Any listener that | |
| 65 | + * throws is logged by the caller (the bus) and treated as a failed | |
| 66 | + * dispatch attempt for outbox bookkeeping; we do NOT swallow | |
| 67 | + * exceptions silently here — the caller decides retry policy. | |
| 68 | + */ | |
| 69 | + fun deliver(event: DomainEvent, onListenerError: (Throwable) -> Unit) { | |
| 70 | + // Class-keyed listeners. | |
| 71 | + for (reg in byClass) { | |
| 72 | + if (reg.eventType.isInstance(event)) { | |
| 73 | + @Suppress("UNCHECKED_CAST") | |
| 74 | + val typed = reg.listener as EventListener<DomainEvent> | |
| 75 | + try { | |
| 76 | + typed.handle(event) | |
| 77 | + } catch (ex: Throwable) { | |
| 78 | + onListenerError(ex) | |
| 79 | + } | |
| 80 | + } | |
| 81 | + } | |
| 82 | + // Topic-keyed listeners. | |
| 83 | + for (reg in byTopic) { | |
| 84 | + if (reg.matches(event.aggregateType)) { | |
| 85 | + try { | |
| 86 | + reg.listener.handle(event) | |
| 87 | + } catch (ex: Throwable) { | |
| 88 | + onListenerError(ex) | |
| 89 | + } | |
| 90 | + } | |
| 91 | + } | |
| 92 | + } | |
| 93 | + | |
| 94 | + /** Used by tests only. */ | |
| 95 | + fun size(): Int = byClass.size + byTopic.size | |
| 96 | + | |
| 97 | + private data class ClassRegistration<E : DomainEvent>( | |
| 98 | + val eventType: Class<E>, | |
| 99 | + val listener: EventListener<E>, | |
| 100 | + ) | |
| 101 | + | |
| 102 | + private data class TopicRegistration( | |
| 103 | + val topic: String, | |
| 104 | + val listener: EventListener<DomainEvent>, | |
| 105 | + ) { | |
| 106 | + fun matches(eventTopic: String): Boolean = | |
| 107 | + topic == WILDCARD || topic == eventTopic | |
| 108 | + } | |
| 109 | + | |
| 110 | + private class SubscriptionHandle(private val onClose: () -> Unit) : EventBus.Subscription { | |
| 111 | + @Volatile private var closed = false | |
| 112 | + | |
| 113 | + override fun close() { | |
| 114 | + if (closed) return | |
| 115 | + closed = true | |
| 116 | + onClose() | |
| 117 | + } | |
| 118 | + } | |
| 119 | + | |
| 120 | + companion object { | |
| 121 | + /** | |
| 122 | + * Wildcard topic that subscribes to every event ever published. | |
| 123 | + * Used by audit logging, the future MCP event mirror, and | |
| 124 | + * diagnostic tooling. Plug-ins should NOT use this — they | |
| 125 | + * should subscribe to the specific topics they care about. | |
| 126 | + */ | |
| 127 | + const val WILDCARD: String = "**" | |
| 128 | + } | |
| 129 | +} | ... | ... |
platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/EventOutboxEntry.kt
0 → 100644
| 1 | +package org.vibeerp.platform.events.outbox | |
| 2 | + | |
| 3 | +import jakarta.persistence.Column | |
| 4 | +import jakarta.persistence.Entity | |
| 5 | +import jakarta.persistence.EnumType | |
| 6 | +import jakarta.persistence.Enumerated | |
| 7 | +import jakarta.persistence.Id | |
| 8 | +import jakarta.persistence.Table | |
| 9 | +import jakarta.persistence.Version | |
| 10 | +import org.hibernate.annotations.JdbcTypeCode | |
| 11 | +import org.hibernate.type.SqlTypes | |
| 12 | +import java.time.Instant | |
| 13 | +import java.util.UUID | |
| 14 | + | |
| 15 | +/** | |
| 16 | + * One row in `platform__event_outbox`. | |
| 17 | + * | |
| 18 | + * **Why an entity instead of a row class:** the outbox is mutated by the | |
| 19 | + * poller (status transitions, attempt counts, last_error) under the same | |
| 20 | + * Spring transaction model as every other PBC. Going through Hibernate | |
| 21 | + * keeps optimistic locking, audit timestamps, and `@PreUpdate` consistent | |
| 22 | + * with the rest of the framework. | |
| 23 | + * | |
| 24 | + * **Why this entity does NOT extend `AuditedJpaEntity`:** outbox rows are | |
| 25 | + * an internal framework concern and `created_by`/`updated_by` make no | |
| 26 | + * sense — the rows are written by the framework itself, not by a user. | |
| 27 | + * The audit listener would otherwise insist on a `PrincipalContext` | |
| 28 | + * binding and fail when an event is published from a code path that | |
| 29 | + * doesn't have one (background jobs, retry workers, ...). | |
| 30 | + */ | |
| 31 | +@Entity | |
| 32 | +@Table(name = "platform__event_outbox") | |
| 33 | +class EventOutboxEntry( | |
| 34 | + eventId: UUID, | |
| 35 | + topic: String, | |
| 36 | + aggregateType: String, | |
| 37 | + aggregateId: String, | |
| 38 | + payload: String, | |
| 39 | + occurredAt: Instant, | |
| 40 | +) { | |
| 41 | + | |
| 42 | + @Id | |
| 43 | + @Column(name = "id", updatable = false, nullable = false, columnDefinition = "uuid") | |
| 44 | + var id: UUID = UUID.randomUUID() | |
| 45 | + | |
| 46 | + @Column(name = "event_id", updatable = false, nullable = false, columnDefinition = "uuid") | |
| 47 | + var eventId: UUID = eventId | |
| 48 | + | |
| 49 | + @Column(name = "topic", nullable = false, length = 256) | |
| 50 | + var topic: String = topic | |
| 51 | + | |
| 52 | + @Column(name = "aggregate_type", nullable = false, length = 128) | |
| 53 | + var aggregateType: String = aggregateType | |
| 54 | + | |
| 55 | + @Column(name = "aggregate_id", nullable = false, length = 256) | |
| 56 | + var aggregateId: String = aggregateId | |
| 57 | + | |
| 58 | + @Column(name = "payload", nullable = false, columnDefinition = "jsonb") | |
| 59 | + @JdbcTypeCode(SqlTypes.JSON) | |
| 60 | + var payload: String = payload | |
| 61 | + | |
| 62 | + @Enumerated(EnumType.STRING) | |
| 63 | + @Column(name = "status", nullable = false, length = 16) | |
| 64 | + var status: OutboxStatus = OutboxStatus.PENDING | |
| 65 | + | |
| 66 | + @Column(name = "attempts", nullable = false) | |
| 67 | + var attempts: Int = 0 | |
| 68 | + | |
| 69 | + @Column(name = "last_error", nullable = true) | |
| 70 | + var lastError: String? = null | |
| 71 | + | |
| 72 | + @Column(name = "occurred_at", nullable = false) | |
| 73 | + var occurredAt: Instant = occurredAt | |
| 74 | + | |
| 75 | + @Column(name = "created_at", nullable = false) | |
| 76 | + var createdAt: Instant = Instant.now() | |
| 77 | + | |
| 78 | + @Column(name = "dispatched_at", nullable = true) | |
| 79 | + var dispatchedAt: Instant? = null | |
| 80 | + | |
| 81 | + @Version | |
| 82 | + @Column(name = "version", nullable = false) | |
| 83 | + var version: Long = 0 | |
| 84 | +} | |
| 85 | + | |
| 86 | +/** | |
| 87 | + * The lifecycle of an outbox row. | |
| 88 | + * | |
| 89 | + * Stored as a string in the DB so adding a state later (e.g. `RETRYING`, | |
| 90 | + * `DEAD`) is non-breaking for clients reading the column with raw SQL. | |
| 91 | + */ | |
| 92 | +enum class OutboxStatus { | |
| 93 | + /** Written by publish(); the poller has not picked it up yet. */ | |
| 94 | + PENDING, | |
| 95 | + | |
| 96 | + /** All in-process subscribers handled it without throwing. */ | |
| 97 | + DISPATCHED, | |
| 98 | + | |
| 99 | + /** Delivery failed; will retry until [EventOutboxEntry.attempts] hits the configured cap. */ | |
| 100 | + FAILED, | |
| 101 | +} | ... | ... |
platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/EventOutboxRepository.kt
0 → 100644
| 1 | +package org.vibeerp.platform.events.outbox | |
| 2 | + | |
| 3 | +import jakarta.persistence.LockModeType | |
| 4 | +import org.springframework.data.jpa.repository.JpaRepository | |
| 5 | +import org.springframework.data.jpa.repository.Lock | |
| 6 | +import org.springframework.data.jpa.repository.Query | |
| 7 | +import org.springframework.data.jpa.repository.QueryHints | |
| 8 | +import org.springframework.stereotype.Repository | |
| 9 | +import java.util.UUID | |
| 10 | + | |
| 11 | +/** | |
| 12 | + * Spring Data JPA repository for the event outbox. | |
| 13 | + * | |
| 14 | + * The poller's `findPending` query uses pessimistic write locking + | |
| 15 | + * `SKIP LOCKED` so two pollers (today: one; tomorrow: many) never | |
| 16 | + * dispatch the same row twice. The hint is set via `@QueryHints` | |
| 17 | + * because Spring Data has no high-level API for `SKIP LOCKED` and | |
| 18 | + * we need it to be portable to Postgres specifically. | |
| 19 | + */ | |
| 20 | +@Repository | |
| 21 | +interface EventOutboxRepository : JpaRepository<EventOutboxEntry, UUID> { | |
| 22 | + | |
| 23 | + fun findByEventId(eventId: UUID): EventOutboxEntry? | |
| 24 | + | |
| 25 | + /** | |
| 26 | + * Pick the next batch of pending rows for dispatch. | |
| 27 | + * | |
| 28 | + * `SELECT FOR UPDATE SKIP LOCKED` semantics: rows already locked by | |
| 29 | + * a concurrent poller are skipped silently, so multiple pollers | |
| 30 | + * can run side-by-side without coordination. The transaction must | |
| 31 | + * be open for the lock to apply — that is the caller's job. | |
| 32 | + */ | |
| 33 | + @Lock(LockModeType.PESSIMISTIC_WRITE) | |
| 34 | + @QueryHints(jakarta.persistence.QueryHint(name = "jakarta.persistence.lock.timeout", value = "-2")) | |
| 35 | + @Query( | |
| 36 | + """ | |
| 37 | + SELECT e FROM EventOutboxEntry e | |
| 38 | + WHERE e.status = org.vibeerp.platform.events.outbox.OutboxStatus.PENDING | |
| 39 | + OR e.status = org.vibeerp.platform.events.outbox.OutboxStatus.FAILED | |
| 40 | + ORDER BY e.createdAt ASC | |
| 41 | + """ | |
| 42 | + ) | |
| 43 | + fun findPendingForDispatch(): List<EventOutboxEntry> | |
| 44 | +} | ... | ... |
platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/OutboxPoller.kt
0 → 100644
| 1 | +package org.vibeerp.platform.events.outbox | |
| 2 | + | |
| 3 | +import org.slf4j.LoggerFactory | |
| 4 | +import org.springframework.scheduling.annotation.Scheduled | |
| 5 | +import org.springframework.stereotype.Component | |
| 6 | +import org.springframework.transaction.annotation.Transactional | |
| 7 | +import java.time.Instant | |
| 8 | + | |
| 9 | +/** | |
| 10 | + * Drains the [EventOutboxEntry] table at a fixed interval. | |
| 11 | + * | |
| 12 | + * **What it does in v0.5:** picks up `PENDING` and previously-`FAILED` | |
| 13 | + * rows under a pessimistic write lock, marks them `DISPATCHED`, and | |
| 14 | + * commits. There is no actual external dispatch yet — the in-process | |
| 15 | + * delivery already happened in [org.vibeerp.platform.events.bus.EventBusImpl.publish], | |
| 16 | + * so this poller is the seam where a future Kafka/NATS bridge plugs | |
| 17 | + * in WITHOUT touching the bus, the publishers, or the subscribers. | |
| 18 | + * The seam exists; the dispatcher is a no-op TODO. | |
| 19 | + * | |
| 20 | + * **What it does NOT do (deferred):** | |
| 21 | + * • Forward events to an external broker. Hooks for that land in | |
| 22 | + * P1.7.b alongside the first hosted-mode deployment. | |
| 23 | + * • Backoff on failures. Today: every cycle re-attempts every | |
| 24 | + * `FAILED` row. A real exponential backoff lands when we have | |
| 25 | + * real external dispatchers that can fail. | |
| 26 | + * • Dead-letter queue. Same. | |
| 27 | + * • Sharding across pollers. The `SELECT FOR UPDATE SKIP LOCKED` | |
| 28 | + * query is already shard-friendly; we just don't run more than | |
| 29 | + * one poller today. | |
| 30 | + * | |
| 31 | + * The poll interval is intentionally short (5 seconds) so the smoke | |
| 32 | + * test can observe an event flip from PENDING → DISPATCHED inside | |
| 33 | + * a normal `curl-and-grep` window. Production tuning will live in | |
| 34 | + * `vibeerp.events.outbox.poll-interval-ms` later. | |
| 35 | + */ | |
| 36 | +@Component | |
| 37 | +class OutboxPoller( | |
| 38 | + private val outbox: EventOutboxRepository, | |
| 39 | +) { | |
| 40 | + | |
| 41 | + private val log = LoggerFactory.getLogger(OutboxPoller::class.java) | |
| 42 | + | |
| 43 | + @Scheduled(fixedDelayString = "PT5S", initialDelayString = "PT5S") | |
| 44 | + @Transactional | |
| 45 | + fun drain() { | |
| 46 | + val pending = outbox.findPendingForDispatch() | |
| 47 | + if (pending.isEmpty()) return | |
| 48 | + | |
| 49 | + log.debug("OutboxPoller draining {} pending event(s)", pending.size) | |
| 50 | + for (entry in pending) { | |
| 51 | + try { | |
| 52 | + // v0.5: there is no real external dispatch step, so | |
| 53 | + // every row that is well-formed succeeds immediately. | |
| 54 | + // The future Kafka/NATS bridge replaces this no-op | |
| 55 | + // with a publish call. | |
| 56 | + entry.status = OutboxStatus.DISPATCHED | |
| 57 | + entry.dispatchedAt = Instant.now() | |
| 58 | + entry.lastError = null | |
| 59 | + // Hibernate's dirty checking will UPDATE on commit; no | |
| 60 | + // explicit save needed because the entity is managed. | |
| 61 | + } catch (ex: Throwable) { | |
| 62 | + log.warn( | |
| 63 | + "OutboxPoller failed to dispatch event {} (topic={}): {}", | |
| 64 | + entry.eventId, entry.topic, ex.message, ex, | |
| 65 | + ) | |
| 66 | + entry.status = OutboxStatus.FAILED | |
| 67 | + entry.attempts += 1 | |
| 68 | + entry.lastError = ex.message?.take(MAX_ERROR_LENGTH) | |
| 69 | + } | |
| 70 | + } | |
| 71 | + } | |
| 72 | + | |
| 73 | + private companion object { | |
| 74 | + const val MAX_ERROR_LENGTH = 4000 | |
| 75 | + } | |
| 76 | +} | ... | ... |
platform/platform-events/src/test/kotlin/org/vibeerp/platform/events/bus/EventBusImplTest.kt
0 → 100644
| 1 | +package org.vibeerp.platform.events.bus | |
| 2 | + | |
| 3 | +import assertk.assertThat | |
| 4 | +import assertk.assertions.contains | |
| 5 | +import assertk.assertions.hasSize | |
| 6 | +import assertk.assertions.isEqualTo | |
| 7 | +import com.fasterxml.jackson.databind.ObjectMapper | |
| 8 | +import io.mockk.every | |
| 9 | +import io.mockk.mockk | |
| 10 | +import io.mockk.slot | |
| 11 | +import io.mockk.verify | |
| 12 | +import org.junit.jupiter.api.BeforeEach | |
| 13 | +import org.junit.jupiter.api.Test | |
| 14 | +import org.vibeerp.api.v1.core.Id | |
| 15 | +import org.vibeerp.api.v1.event.DomainEvent | |
| 16 | +import org.vibeerp.api.v1.event.EventListener | |
| 17 | +import org.vibeerp.platform.events.outbox.EventOutboxEntry | |
| 18 | +import org.vibeerp.platform.events.outbox.EventOutboxRepository | |
| 19 | +import java.time.Instant | |
| 20 | +import java.util.UUID | |
| 21 | + | |
| 22 | +/** | |
| 23 | + * Pure unit tests for [EventBusImpl]. The outbox repository is mocked | |
| 24 | + * so the test runs in milliseconds without a database. The | |
| 25 | + * `Propagation.MANDATORY` annotation that the production code uses to | |
| 26 | + * refuse no-transaction publishes is NOT enforced here because we | |
| 27 | + * bypass Spring entirely — these tests focus on dispatch logic and | |
| 28 | + * outbox persistence shape. | |
| 29 | + */ | |
| 30 | +class EventBusImplTest { | |
| 31 | + | |
| 32 | + private lateinit var outbox: EventOutboxRepository | |
| 33 | + private lateinit var bus: EventBusImpl | |
| 34 | + | |
| 35 | + @BeforeEach | |
| 36 | + fun setUp() { | |
| 37 | + outbox = mockk() | |
| 38 | + every { outbox.save(any<EventOutboxEntry>()) } answers { firstArg() } | |
| 39 | + // ObjectMapper().findAndRegisterModules() picks up jackson-module-kotlin | |
| 40 | + // AND jackson-datatype-jsr310 from the classpath, matching what | |
| 41 | + // Spring Boot's auto-configured ObjectMapper does in production. | |
| 42 | + // A bare ObjectMapper() can't serialize java.time.Instant. | |
| 43 | + bus = EventBusImpl(outbox, ObjectMapper().findAndRegisterModules()) | |
| 44 | + } | |
| 45 | + | |
| 46 | + private fun event(topic: String = "identity.user.created"): DomainEvent = | |
| 47 | + TestEvent( | |
| 48 | + eventId = Id(UUID.randomUUID()), | |
| 49 | + occurredAt = Instant.now(), | |
| 50 | + aggregateType = topic, | |
| 51 | + aggregateId = "u-1", | |
| 52 | + ) | |
| 53 | + | |
| 54 | + // ─── publish writes the outbox row ───────────────────────────── | |
| 55 | + | |
| 56 | + @Test | |
| 57 | + fun `publish writes one outbox row containing the event payload`() { | |
| 58 | + val saved = slot<EventOutboxEntry>() | |
| 59 | + every { outbox.save(capture(saved)) } answers { saved.captured } | |
| 60 | + | |
| 61 | + val e = event() | |
| 62 | + bus.publish(e) | |
| 63 | + | |
| 64 | + verify(exactly = 1) { outbox.save(any()) } | |
| 65 | + assertThat(saved.captured.topic).isEqualTo("identity.user.created") | |
| 66 | + assertThat(saved.captured.eventId).isEqualTo(e.eventId.value) | |
| 67 | + assertThat(saved.captured.payload).contains("identity.user.created") | |
| 68 | + } | |
| 69 | + | |
| 70 | + @Test | |
| 71 | + fun `publish with no subscribers still writes the outbox row`() { | |
| 72 | + bus.publish(event()) | |
| 73 | + verify(exactly = 1) { outbox.save(any()) } | |
| 74 | + } | |
| 75 | + | |
| 76 | + // ─── topic-based subscribers ─────────────────────────────────── | |
| 77 | + | |
| 78 | + @Test | |
| 79 | + fun `topic subscriber receives matching events`() { | |
| 80 | + val received = mutableListOf<DomainEvent>() | |
| 81 | + bus.subscribe("identity.user.created", EventListener { received += it }) | |
| 82 | + | |
| 83 | + bus.publish(event("identity.user.created")) | |
| 84 | + | |
| 85 | + assertThat(received).hasSize(1) | |
| 86 | + } | |
| 87 | + | |
| 88 | + @Test | |
| 89 | + fun `topic subscriber does not receive non-matching events`() { | |
| 90 | + val received = mutableListOf<DomainEvent>() | |
| 91 | + bus.subscribe("orders.order.created", EventListener { received += it }) | |
| 92 | + | |
| 93 | + bus.publish(event("identity.user.created")) | |
| 94 | + | |
| 95 | + assertThat(received).hasSize(0) | |
| 96 | + } | |
| 97 | + | |
| 98 | + // ─── class-based subscribers ─────────────────────────────────── | |
| 99 | + | |
| 100 | + @Test | |
| 101 | + fun `class subscriber receives instances of the registered class`() { | |
| 102 | + val received = mutableListOf<TestEvent>() | |
| 103 | + bus.subscribe(TestEvent::class.java, EventListener { received += it }) | |
| 104 | + | |
| 105 | + bus.publish(event()) | |
| 106 | + | |
| 107 | + assertThat(received).hasSize(1) | |
| 108 | + } | |
| 109 | + | |
| 110 | + // ─── close() unsubscribes ────────────────────────────────────── | |
| 111 | + | |
| 112 | + @Test | |
| 113 | + fun `closing a topic subscription stops further deliveries`() { | |
| 114 | + val received = mutableListOf<DomainEvent>() | |
| 115 | + val sub = bus.subscribe("identity.user.created", EventListener { received += it }) | |
| 116 | + | |
| 117 | + bus.publish(event()) | |
| 118 | + sub.close() | |
| 119 | + bus.publish(event()) | |
| 120 | + | |
| 121 | + assertThat(received).hasSize(1) | |
| 122 | + } | |
| 123 | + | |
| 124 | + @Test | |
| 125 | + fun `closing a class subscription stops further deliveries`() { | |
| 126 | + val received = mutableListOf<TestEvent>() | |
| 127 | + val sub = bus.subscribe(TestEvent::class.java, EventListener { received += it }) | |
| 128 | + | |
| 129 | + bus.publish(event()) | |
| 130 | + sub.close() | |
| 131 | + bus.publish(event()) | |
| 132 | + | |
| 133 | + assertThat(received).hasSize(1) | |
| 134 | + } | |
| 135 | + | |
| 136 | + @Test | |
| 137 | + fun `wildcard subscriber receives every event`() { | |
| 138 | + val received = mutableListOf<DomainEvent>() | |
| 139 | + bus.subscribeToAll(EventListener { received += it }) | |
| 140 | + | |
| 141 | + bus.publish(event("identity.user.created")) | |
| 142 | + bus.publish(event("orders.order.created")) | |
| 143 | + bus.publish(event("plugin.printingshop.plate.approved")) | |
| 144 | + | |
| 145 | + assertThat(received).hasSize(3) | |
| 146 | + } | |
| 147 | + | |
| 148 | + // ─── listener exception is contained ─────────────────────────── | |
| 149 | + | |
| 150 | + @Test | |
| 151 | + fun `a thrown listener does not block other listeners`() { | |
| 152 | + val good = mutableListOf<DomainEvent>() | |
| 153 | + bus.subscribe("identity.user.created", EventListener { error("boom") }) | |
| 154 | + bus.subscribe("identity.user.created", EventListener { good += it }) | |
| 155 | + | |
| 156 | + bus.publish(event()) | |
| 157 | + | |
| 158 | + assertThat(good).hasSize(1) | |
| 159 | + } | |
| 160 | +} | |
| 161 | + | |
| 162 | +/** Local concrete subtype of [DomainEvent] for these tests. */ | |
| 163 | +private data class TestEvent( | |
| 164 | + override val eventId: Id<DomainEvent>, | |
| 165 | + override val occurredAt: Instant, | |
| 166 | + override val aggregateType: String, | |
| 167 | + override val aggregateId: String, | |
| 168 | +) : DomainEvent | ... | ... |
platform/platform-events/src/test/kotlin/org/vibeerp/platform/events/outbox/OutboxPollerTest.kt
0 → 100644
| 1 | +package org.vibeerp.platform.events.outbox | |
| 2 | + | |
| 3 | +import assertk.assertThat | |
| 4 | +import assertk.assertions.isEqualTo | |
| 5 | +import assertk.assertions.isGreaterThanOrEqualTo | |
| 6 | +import assertk.assertions.isNotNull | |
| 7 | +import io.mockk.every | |
| 8 | +import io.mockk.mockk | |
| 9 | +import io.mockk.verify | |
| 10 | +import org.junit.jupiter.api.BeforeEach | |
| 11 | +import org.junit.jupiter.api.Test | |
| 12 | +import java.time.Instant | |
| 13 | +import java.util.UUID | |
| 14 | + | |
| 15 | +class OutboxPollerTest { | |
| 16 | + | |
| 17 | + private lateinit var repository: EventOutboxRepository | |
| 18 | + private lateinit var poller: OutboxPoller | |
| 19 | + | |
| 20 | + @BeforeEach | |
| 21 | + fun setUp() { | |
| 22 | + repository = mockk() | |
| 23 | + poller = OutboxPoller(repository) | |
| 24 | + } | |
| 25 | + | |
| 26 | + private fun pendingEntry(): EventOutboxEntry = | |
| 27 | + EventOutboxEntry( | |
| 28 | + eventId = UUID.randomUUID(), | |
| 29 | + topic = "identity.user.created", | |
| 30 | + aggregateType = "identity.user.created", | |
| 31 | + aggregateId = "u-1", | |
| 32 | + payload = "{}", | |
| 33 | + occurredAt = Instant.now(), | |
| 34 | + ) | |
| 35 | + | |
| 36 | + @Test | |
| 37 | + fun `drain marks every pending row as DISPATCHED`() { | |
| 38 | + val entry = pendingEntry() | |
| 39 | + every { repository.findPendingForDispatch() } returns listOf(entry) | |
| 40 | + | |
| 41 | + poller.drain() | |
| 42 | + | |
| 43 | + assertThat(entry.status).isEqualTo(OutboxStatus.DISPATCHED) | |
| 44 | + assertThat(entry.dispatchedAt).isNotNull() | |
| 45 | + assertThat(entry.lastError).isEqualTo(null) | |
| 46 | + } | |
| 47 | + | |
| 48 | + @Test | |
| 49 | + fun `drain with no rows is a no-op`() { | |
| 50 | + every { repository.findPendingForDispatch() } returns emptyList() | |
| 51 | + poller.drain() | |
| 52 | + verify(exactly = 1) { repository.findPendingForDispatch() } | |
| 53 | + } | |
| 54 | + | |
| 55 | + @Test | |
| 56 | + fun `drain processes multiple rows in one cycle`() { | |
| 57 | + val entries = (1..5).map { pendingEntry() } | |
| 58 | + every { repository.findPendingForDispatch() } returns entries | |
| 59 | + | |
| 60 | + poller.drain() | |
| 61 | + | |
| 62 | + entries.forEach { assertThat(it.status).isEqualTo(OutboxStatus.DISPATCHED) } | |
| 63 | + } | |
| 64 | + | |
| 65 | + @Test | |
| 66 | + fun `entry attempts and lastError can be incremented manually for retry tests`() { | |
| 67 | + // Sanity check on the entity itself: if a future poller version | |
| 68 | + // marks rows FAILED on dispatcher exception, the fields exist. | |
| 69 | + val e = pendingEntry() | |
| 70 | + e.status = OutboxStatus.FAILED | |
| 71 | + e.attempts += 1 | |
| 72 | + e.lastError = "broker unreachable" | |
| 73 | + | |
| 74 | + assertThat(e.status).isEqualTo(OutboxStatus.FAILED) | |
| 75 | + assertThat(e.attempts).isGreaterThanOrEqualTo(1) | |
| 76 | + assertThat(e.lastError).isEqualTo("broker unreachable") | |
| 77 | + } | |
| 78 | +} | ... | ... |
platform/platform-plugins/build.gradle.kts
| ... | ... | @@ -24,6 +24,7 @@ dependencies { |
| 24 | 24 | implementation(libs.kotlin.stdlib) |
| 25 | 25 | implementation(libs.kotlin.reflect) |
| 26 | 26 | implementation(libs.jackson.module.kotlin) |
| 27 | + implementation(project(":platform:platform-events")) // for the real EventBus injected into PluginContext | |
| 27 | 28 | |
| 28 | 29 | implementation(libs.spring.boot.starter) |
| 29 | 30 | implementation(libs.spring.boot.starter.web) // for @RestController on the dispatcher | ... | ... |
platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/DefaultPluginContext.kt
| ... | ... | @@ -37,18 +37,24 @@ internal class DefaultPluginContext( |
| 37 | 37 | pluginId: String, |
| 38 | 38 | sharedRegistrar: PluginEndpointRegistrar, |
| 39 | 39 | delegateLogger: Logger, |
| 40 | + private val sharedEventBus: EventBus, | |
| 40 | 41 | ) : PluginContext { |
| 41 | 42 | |
| 42 | 43 | override val logger: PluginLogger = Slf4jPluginLogger(pluginId, delegateLogger) |
| 43 | 44 | |
| 44 | 45 | override val endpoints: PluginEndpointRegistrar = sharedRegistrar |
| 45 | 46 | |
| 46 | - // ─── Not yet implemented ─────────────────────────────────────── | |
| 47 | + /** | |
| 48 | + * Real event bus, wired in P1.7. Plug-ins can publish and subscribe; | |
| 49 | + * subscriptions are NOT automatically scoped to the plug-in's | |
| 50 | + * lifecycle in v0.5 — a plug-in that wants to drop a subscription | |
| 51 | + * on shutdown must call `subscription.close()` itself in its | |
| 52 | + * `stop()` method. (Auto-scoping lands when per-plug-in Spring | |
| 53 | + * child contexts ship.) | |
| 54 | + */ | |
| 55 | + override val eventBus: EventBus = sharedEventBus | |
| 47 | 56 | |
| 48 | - override val eventBus: EventBus | |
| 49 | - get() = throw UnsupportedOperationException( | |
| 50 | - "PluginContext.eventBus is not yet implemented; lands in P1.7 (event bus + outbox)" | |
| 51 | - ) | |
| 57 | + // ─── Not yet implemented ─────────────────────────────────────── | |
| 52 | 58 | |
| 53 | 59 | override val transaction: Transaction |
| 54 | 60 | get() = throw UnsupportedOperationException( | ... | ... |
platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/VibeErpPluginManager.kt
| ... | ... | @@ -7,6 +7,7 @@ import org.springframework.beans.factory.DisposableBean |
| 7 | 7 | import org.springframework.beans.factory.InitializingBean |
| 8 | 8 | import org.springframework.boot.context.properties.ConfigurationProperties |
| 9 | 9 | import org.springframework.stereotype.Component |
| 10 | +import org.vibeerp.api.v1.event.EventBus | |
| 10 | 11 | import org.vibeerp.platform.plugins.endpoints.PluginEndpointRegistry |
| 11 | 12 | import org.vibeerp.platform.plugins.endpoints.ScopedPluginEndpointRegistrar |
| 12 | 13 | import java.nio.file.Files |
| ... | ... | @@ -48,6 +49,7 @@ import org.vibeerp.api.v1.plugin.Plugin as VibeErpPlugin |
| 48 | 49 | class VibeErpPluginManager( |
| 49 | 50 | private val properties: VibeErpPluginsProperties, |
| 50 | 51 | private val endpointRegistry: PluginEndpointRegistry, |
| 52 | + private val eventBus: EventBus, | |
| 51 | 53 | ) : DefaultPluginManager(Paths.get(properties.directory)), InitializingBean, DisposableBean { |
| 52 | 54 | |
| 53 | 55 | private val log = LoggerFactory.getLogger(VibeErpPluginManager::class.java) |
| ... | ... | @@ -104,6 +106,7 @@ class VibeErpPluginManager( |
| 104 | 106 | pluginId = pluginId, |
| 105 | 107 | sharedRegistrar = ScopedPluginEndpointRegistrar(endpointRegistry, pluginId), |
| 106 | 108 | delegateLogger = LoggerFactory.getLogger("plugin.$pluginId"), |
| 109 | + sharedEventBus = eventBus, | |
| 107 | 110 | ) |
| 108 | 111 | try { |
| 109 | 112 | vibeErpPlugin.start(context) | ... | ... |
settings.gradle.kts
| ... | ... | @@ -33,6 +33,9 @@ project(":platform:platform-plugins").projectDir = file("platform/platform-plugi |
| 33 | 33 | include(":platform:platform-security") |
| 34 | 34 | project(":platform:platform-security").projectDir = file("platform/platform-security") |
| 35 | 35 | |
| 36 | +include(":platform:platform-events") | |
| 37 | +project(":platform:platform-events").projectDir = file("platform/platform-events") | |
| 38 | + | |
| 36 | 39 | // ─── Packaged Business Capabilities (core PBCs) ───────────────────── |
| 37 | 40 | include(":pbc:pbc-identity") |
| 38 | 41 | project(":pbc:pbc-identity").projectDir = file("pbc/pbc-identity") | ... | ... |