diff --git a/distribution/build.gradle.kts b/distribution/build.gradle.kts index e6a9ab2..83c61bb 100644 --- a/distribution/build.gradle.kts +++ b/distribution/build.gradle.kts @@ -23,6 +23,7 @@ dependencies { implementation(project(":platform:platform-persistence")) implementation(project(":platform:platform-plugins")) implementation(project(":platform:platform-security")) + implementation(project(":platform:platform-events")) implementation(project(":pbc:pbc-identity")) implementation(project(":pbc:pbc-catalog")) diff --git a/distribution/src/main/resources/db/changelog/master.xml b/distribution/src/main/resources/db/changelog/master.xml index 01a124d..54990e6 100644 --- a/distribution/src/main/resources/db/changelog/master.xml +++ b/distribution/src/main/resources/db/changelog/master.xml @@ -11,6 +11,7 @@ xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.27.xsd"> + diff --git a/distribution/src/main/resources/db/changelog/platform/001-platform-events.xml b/distribution/src/main/resources/db/changelog/platform/001-platform-events.xml new file mode 100644 index 0000000..567993a --- /dev/null +++ b/distribution/src/main/resources/db/changelog/platform/001-platform-events.xml @@ -0,0 +1,58 @@ + + + + + + Create platform__event_outbox table + + CREATE TABLE platform__event_outbox ( + id uuid PRIMARY KEY, + event_id uuid NOT NULL, + topic varchar(256) NOT NULL, + aggregate_type varchar(128) NOT NULL, + aggregate_id varchar(256) NOT NULL, + payload jsonb NOT NULL, + status varchar(16) NOT NULL DEFAULT 'PENDING', + attempts integer NOT NULL DEFAULT 0, + last_error text, + occurred_at timestamptz NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + dispatched_at timestamptz, + version bigint NOT NULL DEFAULT 0 + ); + CREATE UNIQUE INDEX platform__event_outbox_event_id_uk + ON platform__event_outbox (event_id); + CREATE INDEX platform__event_outbox_status_created_idx + ON platform__event_outbox (status, created_at); + CREATE INDEX platform__event_outbox_topic_idx + ON platform__event_outbox (topic); + + + DROP TABLE platform__event_outbox; + + + + diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1a87993..5a8bdcd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -32,6 +32,7 @@ bouncycastle = { module = "org.bouncycastle:bcprov-jdk18on", version = "1.78.1" kotlin-stdlib = { module = "org.jetbrains.kotlin:kotlin-stdlib", version.ref = "kotlin" } kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = "kotlin" } jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } +jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } # Validation jakarta-validation-api = { module = "jakarta.validation:jakarta.validation-api", version.ref = "jakartaValidation" } diff --git a/pbc/pbc-identity/build.gradle.kts b/pbc/pbc-identity/build.gradle.kts index cd5a62b..11da7d1 100644 --- a/pbc/pbc-identity/build.gradle.kts +++ b/pbc/pbc-identity/build.gradle.kts @@ -40,6 +40,7 @@ dependencies { api(project(":api:api-v1")) implementation(project(":platform:platform-persistence")) implementation(project(":platform:platform-security")) + implementation(project(":platform:platform-events")) implementation(libs.kotlin.stdlib) implementation(libs.kotlin.reflect) diff --git a/pbc/pbc-identity/src/main/kotlin/org/vibeerp/pbc/identity/application/UserService.kt b/pbc/pbc-identity/src/main/kotlin/org/vibeerp/pbc/identity/application/UserService.kt index 2dca763..5ee53eb 100644 --- a/pbc/pbc-identity/src/main/kotlin/org/vibeerp/pbc/identity/application/UserService.kt +++ b/pbc/pbc-identity/src/main/kotlin/org/vibeerp/pbc/identity/application/UserService.kt @@ -2,7 +2,9 @@ package org.vibeerp.pbc.identity.application import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional +import org.vibeerp.api.v1.event.EventBus import org.vibeerp.pbc.identity.domain.User +import org.vibeerp.pbc.identity.events.UserCreatedEvent import org.vibeerp.pbc.identity.infrastructure.UserJpaRepository import java.util.UUID @@ -19,6 +21,7 @@ import java.util.UUID @Transactional class UserService( private val users: UserJpaRepository, + private val eventBus: EventBus, ) { @Transactional(readOnly = true) @@ -40,7 +43,20 @@ class UserService( email = command.email, enabled = command.enabled, ) - return users.save(user) + val saved = users.save(user) + + // Publish AFTER save so saved.id is populated. Both writes are + // in the same @Transactional boundary, so the outbox row is + // committed atomically with the user row — see EventBusImpl + // and the architecture spec section 9. + eventBus.publish( + UserCreatedEvent( + userId = saved.id, + username = saved.username, + ), + ) + + return saved } fun update(id: UUID, command: UpdateUserCommand): User { diff --git a/pbc/pbc-identity/src/main/kotlin/org/vibeerp/pbc/identity/events/UserCreatedEvent.kt b/pbc/pbc-identity/src/main/kotlin/org/vibeerp/pbc/identity/events/UserCreatedEvent.kt new file mode 100644 index 0000000..ca5d23d --- /dev/null +++ b/pbc/pbc-identity/src/main/kotlin/org/vibeerp/pbc/identity/events/UserCreatedEvent.kt @@ -0,0 +1,57 @@ +package org.vibeerp.pbc.identity.events + +import org.vibeerp.api.v1.core.Id +import org.vibeerp.api.v1.event.DomainEvent +import java.time.Instant +import java.util.UUID + +/** + * Fired by [org.vibeerp.pbc.identity.application.UserService.create] right + * after a User row is persisted. + * + * **The event class is internal to pbc-identity** — other PBCs and + * plug-ins subscribe by **topic string**, not by class: + * + * ``` + * eventBus.subscribe(UserCreatedEvent.TOPIC) { event -> + * // event.aggregateId is the user UUID as a string + * } + * ``` + * + * Why not put this class in `api.v1.ext.identity` so subscribers can use + * `eventBus.subscribe(UserCreatedEvent::class.java, ...)`? Two reasons: + * + * 1. Every event class added to api.v1 becomes a perpetual maintenance + * commitment. Most subscribers don't need the rich shape — the + * topic name is enough to know what happened. + * + * 2. Cross-classloader subscribers (plug-ins) cannot rely on + * class-equality anyway: a plug-in built against an older api.v1 + * that knew about UserCreatedEvent would have its own copy of the + * class loaded by its plug-in classloader, and the host's + * `Class.isInstance(...)` check would be false. The string topic + * is the only stable cross-boundary identifier. + * + * If a future PBC genuinely needs typed access to the user fields it + * gets via `IdentityApi.findUserById(...)` after receiving the event, + * not by inflating this class into api.v1. + */ +data class UserCreatedEvent( + override val eventId: Id = Id(UUID.randomUUID()), + override val occurredAt: Instant = Instant.now(), + val userId: UUID, + val username: String, +) : DomainEvent { + + override val aggregateType: String = TOPIC + override val aggregateId: String = userId.toString() + + companion object { + /** + * Stable topic string for cross-PBC and cross-plug-in subscribers. + * Convention: `..`. Adding a new event + * topic is non-breaking; renaming one is a major version bump. + */ + const val TOPIC: String = "identity.user.created" + } +} diff --git a/pbc/pbc-identity/src/test/kotlin/org/vibeerp/pbc/identity/application/UserServiceTest.kt b/pbc/pbc-identity/src/test/kotlin/org/vibeerp/pbc/identity/application/UserServiceTest.kt index bf42370..ed7ad09 100644 --- a/pbc/pbc-identity/src/test/kotlin/org/vibeerp/pbc/identity/application/UserServiceTest.kt +++ b/pbc/pbc-identity/src/test/kotlin/org/vibeerp/pbc/identity/application/UserServiceTest.kt @@ -8,12 +8,16 @@ import assertk.assertions.isInstanceOf import assertk.assertions.isNotNull import assertk.assertions.isNull import io.mockk.every +import io.mockk.justRun import io.mockk.mockk import io.mockk.slot import io.mockk.verify import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.vibeerp.api.v1.event.DomainEvent +import org.vibeerp.api.v1.event.EventBus import org.vibeerp.pbc.identity.domain.User +import org.vibeerp.pbc.identity.events.UserCreatedEvent import org.vibeerp.pbc.identity.infrastructure.UserJpaRepository import java.util.Optional import java.util.UUID @@ -31,12 +35,17 @@ import java.util.UUID class UserServiceTest { private lateinit var users: UserJpaRepository + private lateinit var eventBus: EventBus private lateinit var service: UserService @BeforeEach fun setUp() { users = mockk(relaxed = false) - service = UserService(users) + eventBus = mockk(relaxed = false) + // Most tests don't care about publishing; the create-happy-path + // test below verifies it explicitly via mockk's `verify`. + justRun { eventBus.publish(any()) } + service = UserService(users, eventBus) } @Test @@ -78,6 +87,43 @@ class UserServiceTest { } @Test + fun `create publishes a UserCreatedEvent on the happy path`() { + every { users.existsByUsername("zoe") } returns false + val savedUser = slot() + every { users.save(capture(savedUser)) } answers { savedUser.captured } + val publishedEvent = slot() + justRun { eventBus.publish(capture(publishedEvent)) } + + service.create( + CreateUserCommand( + username = "zoe", + displayName = "Zoe", + email = null, + ), + ) + + verify(exactly = 1) { eventBus.publish(any()) } + assertThat(publishedEvent.captured).isInstanceOf(UserCreatedEvent::class) + val event = publishedEvent.captured as UserCreatedEvent + assertThat(event.username).isEqualTo("zoe") + assertThat(event.userId).isEqualTo(savedUser.captured.id) + assertThat(event.aggregateType).isEqualTo(UserCreatedEvent.TOPIC) + } + + @Test + fun `create does NOT publish an event when the username is duplicate`() { + every { users.existsByUsername("alice") } returns true + + runCatching { + service.create( + CreateUserCommand(username = "alice", displayName = "Alice", email = null), + ) + } + + verify(exactly = 0) { eventBus.publish(any()) } + } + + @Test fun `findByUsername returns null when no user exists`() { every { users.findByUsername("ghost") } returns null diff --git a/platform/platform-events/build.gradle.kts b/platform/platform-events/build.gradle.kts new file mode 100644 index 0000000..1c922b9 --- /dev/null +++ b/platform/platform-events/build.gradle.kts @@ -0,0 +1,49 @@ +plugins { + alias(libs.plugins.kotlin.jvm) + alias(libs.plugins.kotlin.spring) + alias(libs.plugins.kotlin.jpa) + alias(libs.plugins.spring.dependency.management) +} + +description = "vibe_erp event bus + transactional outbox. INTERNAL." + +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(21)) + } +} + +kotlin { + jvmToolchain(21) + compilerOptions { + freeCompilerArgs.add("-Xjsr305=strict") + } +} + +allOpen { + annotation("jakarta.persistence.Entity") + annotation("jakarta.persistence.MappedSuperclass") + annotation("jakarta.persistence.Embeddable") +} + +dependencies { + api(project(":api:api-v1")) + api(project(":platform:platform-persistence")) // outbox row writes use the audit base + JPA + + implementation(libs.kotlin.stdlib) + implementation(libs.kotlin.reflect) + implementation(libs.jackson.module.kotlin) + implementation(libs.jackson.datatype.jsr310) // for java.time.Instant in event payloads + + implementation(libs.spring.boot.starter) + implementation(libs.spring.boot.starter.data.jpa) + + testImplementation(libs.spring.boot.starter.test) + testImplementation(libs.junit.jupiter) + testImplementation(libs.assertk) + testImplementation(libs.mockk) +} + +tasks.test { + useJUnitPlatform() +} diff --git a/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/EventBusConfiguration.kt b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/EventBusConfiguration.kt new file mode 100644 index 0000000..b50da60 --- /dev/null +++ b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/EventBusConfiguration.kt @@ -0,0 +1,19 @@ +package org.vibeerp.platform.events + +import org.springframework.context.annotation.Configuration +import org.springframework.scheduling.annotation.EnableScheduling + +/** + * Marker configuration for the platform-events module. + * + * Activates Spring's scheduling support so [org.vibeerp.platform.events.outbox.OutboxPoller] + * actually runs. The poller's `@Scheduled` annotation is inert until + * `@EnableScheduling` is present somewhere in the application context. + * + * Lives in this module (rather than in `platform-bootstrap`) so the + * "events module is on the classpath" condition automatically pulls + * in the poller's scheduling — distribution doesn't need to know. + */ +@Configuration +@EnableScheduling +class EventBusConfiguration diff --git a/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/audit/EventAuditLogSubscriber.kt b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/audit/EventAuditLogSubscriber.kt new file mode 100644 index 0000000..dec3beb --- /dev/null +++ b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/audit/EventAuditLogSubscriber.kt @@ -0,0 +1,62 @@ +package org.vibeerp.platform.events.audit + +import jakarta.annotation.PostConstruct +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component +import org.vibeerp.api.v1.event.EventBus +import org.vibeerp.api.v1.event.EventListener +import org.vibeerp.platform.events.bus.EventBusImpl + +/** + * Logs every domain event the bus publishes, at INFO level. + * + * **Purpose in v0.5:** prove that the event bus works end-to-end. + * Without a real subscriber, "publish" looks suspiciously like "no-op + * to a database table" — this component closes the demo loop. The + * v0.5 smoke test asserts that creating a User via REST produces + * exactly one log line from this subscriber AND a row in + * `platform__event_outbox` that flips to DISPATCHED inside 10 seconds. + * + * **Future direction:** this is the natural place to plug in a real + * audit log writer (P3.x), an OpenTelemetry trace exporter, or the + * MCP event mirror that lets AI agents observe what just happened. + * The class will probably get renamed `RawEventAuditWriter` and start + * persisting to `platform__audit` rather than just logging — but the + * subscription pattern (wildcard topic, all events) stays the same. + * + * Subscribed via [EventBusImpl.subscribeToAll], which is the + * platform-internal helper for the wildcard `**` topic. Plug-ins + * cannot use the wildcard themselves because it would let one + * misbehaving plug-in observe every other plug-in's events — that's + * a deliberate restriction. + */ +@Component +class EventAuditLogSubscriber( + private val eventBus: EventBus, +) { + + private val log = LoggerFactory.getLogger(EventAuditLogSubscriber::class.java) + + @PostConstruct + fun subscribe() { + // Cast to the impl on purpose: the wildcard subscription is a + // platform-internal capability, not part of api.v1. + val impl = eventBus as? EventBusImpl + ?: error( + "EventAuditLogSubscriber requires EventBusImpl; got ${eventBus.javaClass.name}. " + + "Either platform-events autoconfiguration is broken or someone replaced the bean." + ) + impl.subscribeToAll( + EventListener { event -> + log.info( + "[event-audit] {} id={} aggregate={} occurredAt={}", + event.aggregateType, + event.eventId, + event.aggregateId, + event.occurredAt, + ) + }, + ) + log.info("EventAuditLogSubscriber subscribed to ** (wildcard)") + } +} diff --git a/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/bus/EventBusImpl.kt b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/bus/EventBusImpl.kt new file mode 100644 index 0000000..c050f57 --- /dev/null +++ b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/bus/EventBusImpl.kt @@ -0,0 +1,131 @@ +package org.vibeerp.platform.events.bus + +import com.fasterxml.jackson.databind.ObjectMapper +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Propagation +import org.springframework.transaction.annotation.Transactional +import org.vibeerp.api.v1.event.DomainEvent +import org.vibeerp.api.v1.event.EventBus +import org.vibeerp.api.v1.event.EventListener +import org.vibeerp.platform.events.outbox.EventOutboxEntry +import org.vibeerp.platform.events.outbox.EventOutboxRepository + +/** + * The framework's [EventBus] implementation. + * + * Two responsibilities, both described in the architecture spec + * section 9 ("Cross-cutting concerns — events"): + * + * 1. **Synchronous in-process delivery** to every subscriber that + * matches the event's class or topic. Happens BEFORE the + * transaction commits, inside the caller's transaction, so a + * listener that throws can roll the publisher back. (This is the + * v0.5 contract; a future "async" overload may add deferred + * delivery, but the synchronous path is the safe default.) + * + * 2. **Transactional outbox write.** The same call also persists an + * [EventOutboxEntry] row to `platform__event_outbox` in the SAME + * database transaction as the publisher's domain change. If the + * transaction commits, the row is dispatchable; if it rolls back, + * the row never exists. This is the property that lets us add a + * Kafka/NATS bridge later WITHOUT touching any PBC code: the + * bridge just polls the same table. + * + * Why both at the same time: synchronous in-process delivery is what + * core PBCs need today (e.g. "on UserCreated, send a welcome email"), + * and the outbox is what tomorrow's distributed deployment needs. + * Doing them together in `publish` means every event is exactly-once + * persisted and at-least-once delivered to in-process listeners, + * which is the standard transactional-outbox guarantee. + * + * **Failure modes:** + * • A listener throws → the exception is recorded but does NOT + * abort other listeners. The outbox row is still written. The + * publisher's transaction is unaffected unless the listener's + * exception bubbles all the way out (which we prevent here). + * • The outbox INSERT throws (DB down, constraint violation) → + * the publisher's transaction rolls back, listeners that already + * ran see no commit, the publish appears never to have happened. + * This is the desired exactly-once semantic. + */ +@Component +class EventBusImpl( + private val outbox: EventOutboxRepository, + private val objectMapper: ObjectMapper, +) : EventBus { + + private val log = LoggerFactory.getLogger(EventBusImpl::class.java) + + private val registry = ListenerRegistry() + + /** + * Publish an event. + * + * Marked `Propagation.MANDATORY` so the bus refuses to publish + * outside an existing transaction. That's intentional: a publish + * with no transaction would write the outbox row immediately and + * could escape on caller rollback — exactly the leak the outbox + * pattern is supposed to prevent. Callers that legitimately need + * to publish from outside a transaction (background jobs, + * one-shot scripts) wrap the call in a TransactionTemplate or + * an `@Transactional` boundary of their own. + */ + @Transactional(propagation = Propagation.MANDATORY) + override fun publish(event: DomainEvent) { + // 1. Persist the outbox row in the same transaction as the + // caller. This is the durability anchor. + val payload = try { + objectMapper.writeValueAsString(event) + } catch (ex: Throwable) { + // If we cannot serialize the event we cannot durably + // record it. Throw so the caller's transaction rolls back + // — better to fail loudly than to publish a half-event. + throw IllegalStateException( + "EventBus failed to serialize event ${event.javaClass.name}: ${ex.message}", + ex, + ) + } + + outbox.save( + EventOutboxEntry( + eventId = event.eventId.value, + topic = event.aggregateType, + aggregateType = event.aggregateType, + aggregateId = event.aggregateId, + payload = payload, + occurredAt = event.occurredAt, + ), + ) + + // 2. Synchronous in-process delivery. Listener exceptions are + // logged but don't propagate; the outbox row is what + // guarantees the event isn't lost. + registry.deliver(event) { ex -> + log.warn( + "EventBus listener for {} (id={}) threw {}: {}", + event.aggregateType, event.eventId, ex.javaClass.simpleName, ex.message, ex, + ) + } + } + + override fun subscribe( + eventType: Class, + listener: EventListener, + ): EventBus.Subscription = + registry.registerByClass(eventType, listener) + + override fun subscribe( + topic: String, + listener: EventListener, + ): EventBus.Subscription = + registry.registerByTopic(topic, listener) + + /** + * Internal helper for tests and the audit subscriber to wire a + * raw `EventListener` against the wildcard topic + * without going through the public api.v1 surface. + */ + internal fun subscribeToAll(listener: EventListener): EventBus.Subscription = + registry.registerByTopic(ListenerRegistry.WILDCARD, listener) +} diff --git a/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/bus/ListenerRegistry.kt b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/bus/ListenerRegistry.kt new file mode 100644 index 0000000..5aecad0 --- /dev/null +++ b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/bus/ListenerRegistry.kt @@ -0,0 +1,129 @@ +package org.vibeerp.platform.events.bus + +import org.vibeerp.api.v1.event.DomainEvent +import org.vibeerp.api.v1.event.EventBus +import org.vibeerp.api.v1.event.EventListener +import org.vibeerp.api.v1.plugin.PluginContext +import java.util.concurrent.CopyOnWriteArrayList + +/** + * In-memory holder of every active subscription on the event bus. + * + * Two indexing strategies side-by-side, both required by the api.v1 + * `EventBus` interface (see [EventBus.subscribe] overloads): + * + * 1. **By event class** — for in-process subscribers that know the + * concrete event type at compile time. Pattern matched with + * `Class.isInstance(...)` so subscribing to a base type catches + * all subtypes. + * + * 2. **By topic string** — for cross-classloader subscribers and for + * cases where the event class isn't shared (e.g. a plug-in + * subscribing to a PBC's internal event class without being able + * to import it). The topic is matched against + * [DomainEvent.aggregateType] for exact equality, plus the special + * `**` topic which catches every event. + * + * Why a [CopyOnWriteArrayList]: subscriptions mutate rarely (at plug-in + * start, occasionally) but iterate on every publish. CoW gives us + * lock-free iteration and avoids `ConcurrentModificationException` + * when a listener subscribes from inside another listener — which is + * legal even if it's a smell. + * + * **The `**` wildcard** is intentionally narrow: it matches everything, + * full stop. There is no `identity.user.*` style globbing in v0.5. + * That keeps the dispatch logic O(1) per registration check and + * avoids surprising precedence rules; richer matching can come later + * non-breakingly because it would only widen what counts as a match. + */ +internal class ListenerRegistry { + + private val byClass = CopyOnWriteArrayList>() + private val byTopic = CopyOnWriteArrayList() + + fun registerByClass( + eventType: Class, + listener: EventListener, + ): EventBus.Subscription { + val reg = ClassRegistration(eventType, listener) + byClass += reg + return SubscriptionHandle { byClass.remove(reg) } + } + + fun registerByTopic( + topic: String, + listener: EventListener, + ): EventBus.Subscription { + require(topic.isNotBlank()) { "topic must not be blank" } + val reg = TopicRegistration(topic, listener) + byTopic += reg + return SubscriptionHandle { byTopic.remove(reg) } + } + + /** + * Deliver [event] to every matching listener. Any listener that + * throws is logged by the caller (the bus) and treated as a failed + * dispatch attempt for outbox bookkeeping; we do NOT swallow + * exceptions silently here — the caller decides retry policy. + */ + fun deliver(event: DomainEvent, onListenerError: (Throwable) -> Unit) { + // Class-keyed listeners. + for (reg in byClass) { + if (reg.eventType.isInstance(event)) { + @Suppress("UNCHECKED_CAST") + val typed = reg.listener as EventListener + try { + typed.handle(event) + } catch (ex: Throwable) { + onListenerError(ex) + } + } + } + // Topic-keyed listeners. + for (reg in byTopic) { + if (reg.matches(event.aggregateType)) { + try { + reg.listener.handle(event) + } catch (ex: Throwable) { + onListenerError(ex) + } + } + } + } + + /** Used by tests only. */ + fun size(): Int = byClass.size + byTopic.size + + private data class ClassRegistration( + val eventType: Class, + val listener: EventListener, + ) + + private data class TopicRegistration( + val topic: String, + val listener: EventListener, + ) { + fun matches(eventTopic: String): Boolean = + topic == WILDCARD || topic == eventTopic + } + + private class SubscriptionHandle(private val onClose: () -> Unit) : EventBus.Subscription { + @Volatile private var closed = false + + override fun close() { + if (closed) return + closed = true + onClose() + } + } + + companion object { + /** + * Wildcard topic that subscribes to every event ever published. + * Used by audit logging, the future MCP event mirror, and + * diagnostic tooling. Plug-ins should NOT use this — they + * should subscribe to the specific topics they care about. + */ + const val WILDCARD: String = "**" + } +} diff --git a/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/EventOutboxEntry.kt b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/EventOutboxEntry.kt new file mode 100644 index 0000000..e754fb2 --- /dev/null +++ b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/EventOutboxEntry.kt @@ -0,0 +1,101 @@ +package org.vibeerp.platform.events.outbox + +import jakarta.persistence.Column +import jakarta.persistence.Entity +import jakarta.persistence.EnumType +import jakarta.persistence.Enumerated +import jakarta.persistence.Id +import jakarta.persistence.Table +import jakarta.persistence.Version +import org.hibernate.annotations.JdbcTypeCode +import org.hibernate.type.SqlTypes +import java.time.Instant +import java.util.UUID + +/** + * One row in `platform__event_outbox`. + * + * **Why an entity instead of a row class:** the outbox is mutated by the + * poller (status transitions, attempt counts, last_error) under the same + * Spring transaction model as every other PBC. Going through Hibernate + * keeps optimistic locking, audit timestamps, and `@PreUpdate` consistent + * with the rest of the framework. + * + * **Why this entity does NOT extend `AuditedJpaEntity`:** outbox rows are + * an internal framework concern and `created_by`/`updated_by` make no + * sense — the rows are written by the framework itself, not by a user. + * The audit listener would otherwise insist on a `PrincipalContext` + * binding and fail when an event is published from a code path that + * doesn't have one (background jobs, retry workers, ...). + */ +@Entity +@Table(name = "platform__event_outbox") +class EventOutboxEntry( + eventId: UUID, + topic: String, + aggregateType: String, + aggregateId: String, + payload: String, + occurredAt: Instant, +) { + + @Id + @Column(name = "id", updatable = false, nullable = false, columnDefinition = "uuid") + var id: UUID = UUID.randomUUID() + + @Column(name = "event_id", updatable = false, nullable = false, columnDefinition = "uuid") + var eventId: UUID = eventId + + @Column(name = "topic", nullable = false, length = 256) + var topic: String = topic + + @Column(name = "aggregate_type", nullable = false, length = 128) + var aggregateType: String = aggregateType + + @Column(name = "aggregate_id", nullable = false, length = 256) + var aggregateId: String = aggregateId + + @Column(name = "payload", nullable = false, columnDefinition = "jsonb") + @JdbcTypeCode(SqlTypes.JSON) + var payload: String = payload + + @Enumerated(EnumType.STRING) + @Column(name = "status", nullable = false, length = 16) + var status: OutboxStatus = OutboxStatus.PENDING + + @Column(name = "attempts", nullable = false) + var attempts: Int = 0 + + @Column(name = "last_error", nullable = true) + var lastError: String? = null + + @Column(name = "occurred_at", nullable = false) + var occurredAt: Instant = occurredAt + + @Column(name = "created_at", nullable = false) + var createdAt: Instant = Instant.now() + + @Column(name = "dispatched_at", nullable = true) + var dispatchedAt: Instant? = null + + @Version + @Column(name = "version", nullable = false) + var version: Long = 0 +} + +/** + * The lifecycle of an outbox row. + * + * Stored as a string in the DB so adding a state later (e.g. `RETRYING`, + * `DEAD`) is non-breaking for clients reading the column with raw SQL. + */ +enum class OutboxStatus { + /** Written by publish(); the poller has not picked it up yet. */ + PENDING, + + /** All in-process subscribers handled it without throwing. */ + DISPATCHED, + + /** Delivery failed; will retry until [EventOutboxEntry.attempts] hits the configured cap. */ + FAILED, +} diff --git a/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/EventOutboxRepository.kt b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/EventOutboxRepository.kt new file mode 100644 index 0000000..a9fd0f2 --- /dev/null +++ b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/EventOutboxRepository.kt @@ -0,0 +1,44 @@ +package org.vibeerp.platform.events.outbox + +import jakarta.persistence.LockModeType +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Lock +import org.springframework.data.jpa.repository.Query +import org.springframework.data.jpa.repository.QueryHints +import org.springframework.stereotype.Repository +import java.util.UUID + +/** + * Spring Data JPA repository for the event outbox. + * + * The poller's `findPending` query uses pessimistic write locking + + * `SKIP LOCKED` so two pollers (today: one; tomorrow: many) never + * dispatch the same row twice. The hint is set via `@QueryHints` + * because Spring Data has no high-level API for `SKIP LOCKED` and + * we need it to be portable to Postgres specifically. + */ +@Repository +interface EventOutboxRepository : JpaRepository { + + fun findByEventId(eventId: UUID): EventOutboxEntry? + + /** + * Pick the next batch of pending rows for dispatch. + * + * `SELECT FOR UPDATE SKIP LOCKED` semantics: rows already locked by + * a concurrent poller are skipped silently, so multiple pollers + * can run side-by-side without coordination. The transaction must + * be open for the lock to apply — that is the caller's job. + */ + @Lock(LockModeType.PESSIMISTIC_WRITE) + @QueryHints(jakarta.persistence.QueryHint(name = "jakarta.persistence.lock.timeout", value = "-2")) + @Query( + """ + SELECT e FROM EventOutboxEntry e + WHERE e.status = org.vibeerp.platform.events.outbox.OutboxStatus.PENDING + OR e.status = org.vibeerp.platform.events.outbox.OutboxStatus.FAILED + ORDER BY e.createdAt ASC + """ + ) + fun findPendingForDispatch(): List +} diff --git a/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/OutboxPoller.kt b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/OutboxPoller.kt new file mode 100644 index 0000000..14f3895 --- /dev/null +++ b/platform/platform-events/src/main/kotlin/org/vibeerp/platform/events/outbox/OutboxPoller.kt @@ -0,0 +1,76 @@ +package org.vibeerp.platform.events.outbox + +import org.slf4j.LoggerFactory +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Transactional +import java.time.Instant + +/** + * Drains the [EventOutboxEntry] table at a fixed interval. + * + * **What it does in v0.5:** picks up `PENDING` and previously-`FAILED` + * rows under a pessimistic write lock, marks them `DISPATCHED`, and + * commits. There is no actual external dispatch yet — the in-process + * delivery already happened in [org.vibeerp.platform.events.bus.EventBusImpl.publish], + * so this poller is the seam where a future Kafka/NATS bridge plugs + * in WITHOUT touching the bus, the publishers, or the subscribers. + * The seam exists; the dispatcher is a no-op TODO. + * + * **What it does NOT do (deferred):** + * • Forward events to an external broker. Hooks for that land in + * P1.7.b alongside the first hosted-mode deployment. + * • Backoff on failures. Today: every cycle re-attempts every + * `FAILED` row. A real exponential backoff lands when we have + * real external dispatchers that can fail. + * • Dead-letter queue. Same. + * • Sharding across pollers. The `SELECT FOR UPDATE SKIP LOCKED` + * query is already shard-friendly; we just don't run more than + * one poller today. + * + * The poll interval is intentionally short (5 seconds) so the smoke + * test can observe an event flip from PENDING → DISPATCHED inside + * a normal `curl-and-grep` window. Production tuning will live in + * `vibeerp.events.outbox.poll-interval-ms` later. + */ +@Component +class OutboxPoller( + private val outbox: EventOutboxRepository, +) { + + private val log = LoggerFactory.getLogger(OutboxPoller::class.java) + + @Scheduled(fixedDelayString = "PT5S", initialDelayString = "PT5S") + @Transactional + fun drain() { + val pending = outbox.findPendingForDispatch() + if (pending.isEmpty()) return + + log.debug("OutboxPoller draining {} pending event(s)", pending.size) + for (entry in pending) { + try { + // v0.5: there is no real external dispatch step, so + // every row that is well-formed succeeds immediately. + // The future Kafka/NATS bridge replaces this no-op + // with a publish call. + entry.status = OutboxStatus.DISPATCHED + entry.dispatchedAt = Instant.now() + entry.lastError = null + // Hibernate's dirty checking will UPDATE on commit; no + // explicit save needed because the entity is managed. + } catch (ex: Throwable) { + log.warn( + "OutboxPoller failed to dispatch event {} (topic={}): {}", + entry.eventId, entry.topic, ex.message, ex, + ) + entry.status = OutboxStatus.FAILED + entry.attempts += 1 + entry.lastError = ex.message?.take(MAX_ERROR_LENGTH) + } + } + } + + private companion object { + const val MAX_ERROR_LENGTH = 4000 + } +} diff --git a/platform/platform-events/src/test/kotlin/org/vibeerp/platform/events/bus/EventBusImplTest.kt b/platform/platform-events/src/test/kotlin/org/vibeerp/platform/events/bus/EventBusImplTest.kt new file mode 100644 index 0000000..8dbbd93 --- /dev/null +++ b/platform/platform-events/src/test/kotlin/org/vibeerp/platform/events/bus/EventBusImplTest.kt @@ -0,0 +1,168 @@ +package org.vibeerp.platform.events.bus + +import assertk.assertThat +import assertk.assertions.contains +import assertk.assertions.hasSize +import assertk.assertions.isEqualTo +import com.fasterxml.jackson.databind.ObjectMapper +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.verify +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.vibeerp.api.v1.core.Id +import org.vibeerp.api.v1.event.DomainEvent +import org.vibeerp.api.v1.event.EventListener +import org.vibeerp.platform.events.outbox.EventOutboxEntry +import org.vibeerp.platform.events.outbox.EventOutboxRepository +import java.time.Instant +import java.util.UUID + +/** + * Pure unit tests for [EventBusImpl]. The outbox repository is mocked + * so the test runs in milliseconds without a database. The + * `Propagation.MANDATORY` annotation that the production code uses to + * refuse no-transaction publishes is NOT enforced here because we + * bypass Spring entirely — these tests focus on dispatch logic and + * outbox persistence shape. + */ +class EventBusImplTest { + + private lateinit var outbox: EventOutboxRepository + private lateinit var bus: EventBusImpl + + @BeforeEach + fun setUp() { + outbox = mockk() + every { outbox.save(any()) } answers { firstArg() } + // ObjectMapper().findAndRegisterModules() picks up jackson-module-kotlin + // AND jackson-datatype-jsr310 from the classpath, matching what + // Spring Boot's auto-configured ObjectMapper does in production. + // A bare ObjectMapper() can't serialize java.time.Instant. + bus = EventBusImpl(outbox, ObjectMapper().findAndRegisterModules()) + } + + private fun event(topic: String = "identity.user.created"): DomainEvent = + TestEvent( + eventId = Id(UUID.randomUUID()), + occurredAt = Instant.now(), + aggregateType = topic, + aggregateId = "u-1", + ) + + // ─── publish writes the outbox row ───────────────────────────── + + @Test + fun `publish writes one outbox row containing the event payload`() { + val saved = slot() + every { outbox.save(capture(saved)) } answers { saved.captured } + + val e = event() + bus.publish(e) + + verify(exactly = 1) { outbox.save(any()) } + assertThat(saved.captured.topic).isEqualTo("identity.user.created") + assertThat(saved.captured.eventId).isEqualTo(e.eventId.value) + assertThat(saved.captured.payload).contains("identity.user.created") + } + + @Test + fun `publish with no subscribers still writes the outbox row`() { + bus.publish(event()) + verify(exactly = 1) { outbox.save(any()) } + } + + // ─── topic-based subscribers ─────────────────────────────────── + + @Test + fun `topic subscriber receives matching events`() { + val received = mutableListOf() + bus.subscribe("identity.user.created", EventListener { received += it }) + + bus.publish(event("identity.user.created")) + + assertThat(received).hasSize(1) + } + + @Test + fun `topic subscriber does not receive non-matching events`() { + val received = mutableListOf() + bus.subscribe("orders.order.created", EventListener { received += it }) + + bus.publish(event("identity.user.created")) + + assertThat(received).hasSize(0) + } + + // ─── class-based subscribers ─────────────────────────────────── + + @Test + fun `class subscriber receives instances of the registered class`() { + val received = mutableListOf() + bus.subscribe(TestEvent::class.java, EventListener { received += it }) + + bus.publish(event()) + + assertThat(received).hasSize(1) + } + + // ─── close() unsubscribes ────────────────────────────────────── + + @Test + fun `closing a topic subscription stops further deliveries`() { + val received = mutableListOf() + val sub = bus.subscribe("identity.user.created", EventListener { received += it }) + + bus.publish(event()) + sub.close() + bus.publish(event()) + + assertThat(received).hasSize(1) + } + + @Test + fun `closing a class subscription stops further deliveries`() { + val received = mutableListOf() + val sub = bus.subscribe(TestEvent::class.java, EventListener { received += it }) + + bus.publish(event()) + sub.close() + bus.publish(event()) + + assertThat(received).hasSize(1) + } + + @Test + fun `wildcard subscriber receives every event`() { + val received = mutableListOf() + bus.subscribeToAll(EventListener { received += it }) + + bus.publish(event("identity.user.created")) + bus.publish(event("orders.order.created")) + bus.publish(event("plugin.printingshop.plate.approved")) + + assertThat(received).hasSize(3) + } + + // ─── listener exception is contained ─────────────────────────── + + @Test + fun `a thrown listener does not block other listeners`() { + val good = mutableListOf() + bus.subscribe("identity.user.created", EventListener { error("boom") }) + bus.subscribe("identity.user.created", EventListener { good += it }) + + bus.publish(event()) + + assertThat(good).hasSize(1) + } +} + +/** Local concrete subtype of [DomainEvent] for these tests. */ +private data class TestEvent( + override val eventId: Id, + override val occurredAt: Instant, + override val aggregateType: String, + override val aggregateId: String, +) : DomainEvent diff --git a/platform/platform-events/src/test/kotlin/org/vibeerp/platform/events/outbox/OutboxPollerTest.kt b/platform/platform-events/src/test/kotlin/org/vibeerp/platform/events/outbox/OutboxPollerTest.kt new file mode 100644 index 0000000..f36390f --- /dev/null +++ b/platform/platform-events/src/test/kotlin/org/vibeerp/platform/events/outbox/OutboxPollerTest.kt @@ -0,0 +1,78 @@ +package org.vibeerp.platform.events.outbox + +import assertk.assertThat +import assertk.assertions.isEqualTo +import assertk.assertions.isGreaterThanOrEqualTo +import assertk.assertions.isNotNull +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.time.Instant +import java.util.UUID + +class OutboxPollerTest { + + private lateinit var repository: EventOutboxRepository + private lateinit var poller: OutboxPoller + + @BeforeEach + fun setUp() { + repository = mockk() + poller = OutboxPoller(repository) + } + + private fun pendingEntry(): EventOutboxEntry = + EventOutboxEntry( + eventId = UUID.randomUUID(), + topic = "identity.user.created", + aggregateType = "identity.user.created", + aggregateId = "u-1", + payload = "{}", + occurredAt = Instant.now(), + ) + + @Test + fun `drain marks every pending row as DISPATCHED`() { + val entry = pendingEntry() + every { repository.findPendingForDispatch() } returns listOf(entry) + + poller.drain() + + assertThat(entry.status).isEqualTo(OutboxStatus.DISPATCHED) + assertThat(entry.dispatchedAt).isNotNull() + assertThat(entry.lastError).isEqualTo(null) + } + + @Test + fun `drain with no rows is a no-op`() { + every { repository.findPendingForDispatch() } returns emptyList() + poller.drain() + verify(exactly = 1) { repository.findPendingForDispatch() } + } + + @Test + fun `drain processes multiple rows in one cycle`() { + val entries = (1..5).map { pendingEntry() } + every { repository.findPendingForDispatch() } returns entries + + poller.drain() + + entries.forEach { assertThat(it.status).isEqualTo(OutboxStatus.DISPATCHED) } + } + + @Test + fun `entry attempts and lastError can be incremented manually for retry tests`() { + // Sanity check on the entity itself: if a future poller version + // marks rows FAILED on dispatcher exception, the fields exist. + val e = pendingEntry() + e.status = OutboxStatus.FAILED + e.attempts += 1 + e.lastError = "broker unreachable" + + assertThat(e.status).isEqualTo(OutboxStatus.FAILED) + assertThat(e.attempts).isGreaterThanOrEqualTo(1) + assertThat(e.lastError).isEqualTo("broker unreachable") + } +} diff --git a/platform/platform-plugins/build.gradle.kts b/platform/platform-plugins/build.gradle.kts index d2a2bf5..b7ca25f 100644 --- a/platform/platform-plugins/build.gradle.kts +++ b/platform/platform-plugins/build.gradle.kts @@ -24,6 +24,7 @@ dependencies { implementation(libs.kotlin.stdlib) implementation(libs.kotlin.reflect) implementation(libs.jackson.module.kotlin) + implementation(project(":platform:platform-events")) // for the real EventBus injected into PluginContext implementation(libs.spring.boot.starter) implementation(libs.spring.boot.starter.web) // for @RestController on the dispatcher diff --git a/platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/DefaultPluginContext.kt b/platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/DefaultPluginContext.kt index 8d15aad..57eadc6 100644 --- a/platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/DefaultPluginContext.kt +++ b/platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/DefaultPluginContext.kt @@ -37,18 +37,24 @@ internal class DefaultPluginContext( pluginId: String, sharedRegistrar: PluginEndpointRegistrar, delegateLogger: Logger, + private val sharedEventBus: EventBus, ) : PluginContext { override val logger: PluginLogger = Slf4jPluginLogger(pluginId, delegateLogger) override val endpoints: PluginEndpointRegistrar = sharedRegistrar - // ─── Not yet implemented ─────────────────────────────────────── + /** + * Real event bus, wired in P1.7. Plug-ins can publish and subscribe; + * subscriptions are NOT automatically scoped to the plug-in's + * lifecycle in v0.5 — a plug-in that wants to drop a subscription + * on shutdown must call `subscription.close()` itself in its + * `stop()` method. (Auto-scoping lands when per-plug-in Spring + * child contexts ship.) + */ + override val eventBus: EventBus = sharedEventBus - override val eventBus: EventBus - get() = throw UnsupportedOperationException( - "PluginContext.eventBus is not yet implemented; lands in P1.7 (event bus + outbox)" - ) + // ─── Not yet implemented ─────────────────────────────────────── override val transaction: Transaction get() = throw UnsupportedOperationException( diff --git a/platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/VibeErpPluginManager.kt b/platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/VibeErpPluginManager.kt index 49ac9df..c1d645f 100644 --- a/platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/VibeErpPluginManager.kt +++ b/platform/platform-plugins/src/main/kotlin/org/vibeerp/platform/plugins/VibeErpPluginManager.kt @@ -7,6 +7,7 @@ import org.springframework.beans.factory.DisposableBean import org.springframework.beans.factory.InitializingBean import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.stereotype.Component +import org.vibeerp.api.v1.event.EventBus import org.vibeerp.platform.plugins.endpoints.PluginEndpointRegistry import org.vibeerp.platform.plugins.endpoints.ScopedPluginEndpointRegistrar import java.nio.file.Files @@ -48,6 +49,7 @@ import org.vibeerp.api.v1.plugin.Plugin as VibeErpPlugin class VibeErpPluginManager( private val properties: VibeErpPluginsProperties, private val endpointRegistry: PluginEndpointRegistry, + private val eventBus: EventBus, ) : DefaultPluginManager(Paths.get(properties.directory)), InitializingBean, DisposableBean { private val log = LoggerFactory.getLogger(VibeErpPluginManager::class.java) @@ -104,6 +106,7 @@ class VibeErpPluginManager( pluginId = pluginId, sharedRegistrar = ScopedPluginEndpointRegistrar(endpointRegistry, pluginId), delegateLogger = LoggerFactory.getLogger("plugin.$pluginId"), + sharedEventBus = eventBus, ) try { vibeErpPlugin.start(context) diff --git a/settings.gradle.kts b/settings.gradle.kts index 6fcee16..d86a89e 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -33,6 +33,9 @@ project(":platform:platform-plugins").projectDir = file("platform/platform-plugi include(":platform:platform-security") project(":platform:platform-security").projectDir = file("platform/platform-security") +include(":platform:platform-events") +project(":platform:platform-events").projectDir = file("platform/platform-events") + // ─── Packaged Business Capabilities (core PBCs) ───────────────────── include(":pbc:pbc-identity") project(":pbc:pbc-identity").projectDir = file("pbc/pbc-identity")