Event Ingestion
Pub/sub topics, payload contracts, subscribers, and retry semantics for the audit pipeline
Audit and activity rows are never written by application code directly. Every row arrives through Redis pub/sub. This page documents the wire contract — topic names, payload shapes, subscriber names, and how failed messages are retried.
Topics
The two topic names are produced by helpers in common/pkg/events/topics.go:
| Helper | Returns |
|---|---|
| events.BuildAuditTopic() | "audit.events" |
| events.BuildActivityTopic() | "activity.events" |
Both topics live on the same Redis cluster used by the rest of the platform's pub/sub (Watermill-backed).
Payload Contracts
AuditEventPayload
Defined in common/pkg/events/audit_events.go — published to audit.events for any admin/system mutation.
type AuditEventPayload struct {
    TenantID     string                 `json:"tenant_id"`
    ActorID      string                 `json:"actor_id"`
    ActorType    string                 `json:"actor_type,omitempty"` // "user", "system", "admin"
    Action       string                 `json:"action"`               // "created", "updated", "deleted"
    ResourceType string                 `json:"resource_type"`        // "course", "user", "role", ...
    ResourceID   string                 `json:"resource_id"`
    Module       string                 `json:"module,omitempty"`
    Description  string                 `json:"description,omitempty"`
    BeforeValue  map[string]interface{} `json:"before_value,omitempty"`
    AfterValue   map[string]interface{} `json:"after_value,omitempty"`
    IPAddress    string                 `json:"ip_address,omitempty"`
    UserAgent    string                 `json:"user_agent,omitempty"`
    Metadata     map[string]interface{} `json:"metadata,omitempty"`
    Timestamp    time.Time              `json:"timestamp"`
}
Defaults applied by the publisher:
- Timestamp is set to time.Now() if zero
- ActorType defaults to "user" if empty
ActivityEventPayload
Published to activity.events for any user-driven event:
type ActivityEventPayload struct {
    TenantID       string                 `json:"tenant_id"`
    UserID         string                 `json:"user_id"`
    ImpersonatedBy string                 `json:"impersonated_by,omitempty"`
    Title          string                 `json:"title"`  // "User logged in"
    Action         string                 `json:"action"` // "login", "enroll", ...
    Module         string                 `json:"module,omitempty"`
    Description    string                 `json:"description,omitempty"`
    Endpoint       string                 `json:"endpoint,omitempty"`
    Method         string                 `json:"method,omitempty"`
    StatusCode     int                    `json:"status_code,omitempty"`
    IPAddress      string                 `json:"ip_address,omitempty"`
    UserAgent      string                 `json:"user_agent,omitempty"`
    Metadata       map[string]interface{} `json:"metadata,omitempty"`
    Timestamp      time.Time              `json:"timestamp"`
}
The publisher sets Timestamp to now if zero. StatusCode is validated by the service layer; anything outside 100..599 is rejected.
Publishing Helpers
Both helpers live in common/pkg/events/audit_events.go:
| Function | Purpose |
|---|---|
| events.PublishAuditEvent(ctx, publisher, payload) | Validates defaults, attaches metadata, publishes to audit.events |
| events.PublishActivityEvent(ctx, publisher, payload) | Same flow for activity.events |
Both helpers attach a correlation_id (a fresh UUID) and the most relevant fields (tenant_id, actor/user, action, resource type, module) as message metadata so downstream consumers and logs can trace events across services.
If publisher is nil, the helper is a no-op — services where audit is optional can wire nil and ship without breaking.
Subscribers
Inside audit-service two subscribers run side-by-side, registered from main.go:
| Subscriber | Topic | Service |
|---|---|---|
| audit-service-audit | audit.events | AuditEventSubscriber → AuditLogService.CreateFromEvent |
| audit-service-activity | activity.events | ActivityEventSubscriber → ActivityLogService.CreateFromEvent |
The subscribers run as long-lived goroutines started by StartSubscriber(ctx). On shutdown the audit-service cancels their context and calls Close() on the underlying Redis subscriber.
Retry Semantics
Both subscribers wrap message handling with pubsub.ProcessMessagesWithRetry and use pubsub.DefaultRetryConfig:
| Setting | Value |
|---|---|
| Max retries | 3 |
| Initial delay | 5s |
| Max delay | 5m |
| Backoff factor | ×2 (exponential) |
A DefaultRetryableErrorChecker classifies error strings:
- Retryable substrings: timeout, connection, temporary, deadline exceeded, context deadline exceeded, database, network
- Non-retryable substrings: invalid, validation, not found, unauthorized, forbidden
On top of that, each subscriber adds its own non-retryable patterns:
| Subscriber | Extra non-retryable patterns |
|---|---|
| Audit | invalid tenant_id, invalid payload |
| Activity | invalid tenant_id, invalid user_id, invalid payload |
Non-retryable errors are acknowledged, so the message is not redelivered: retrying would never succeed. Retryable errors are nacked and redelivered with backoff.
Required vs Optional Fields
The service-layer CreateFromEvent enforces these requirements:
| Stream | Required | Validation failure |
|---|---|---|
| Audit | tenant_id | Returns invalid tenant_id (non-retryable) |
| Audit | parseable JSON | Returns invalid payload (non-retryable) |
| Activity | tenant_id | Returns invalid tenant_id (non-retryable) |
| Activity | user_id | Returns invalid user_id (non-retryable) |
| Activity | status_code if set must be 100–599 | Rejected before insert |
Everything else — description, before_value, after_value, ip_address, user_agent, metadata — is optional and stored as-is.
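The activity-stream checks in the table can be sketched as a single validation function. This is illustrative only: the function and error variable names are hypothetical, "required" is assumed to mean non-empty, a status_code of 0 is treated as unset, and the status-code error message is invented for the example since the table does not give one.

```go
package main

import (
	"encoding/json"
	"errors"
)

var (
	errInvalidPayload    = errors.New("invalid payload")
	errInvalidTenantID   = errors.New("invalid tenant_id")
	errInvalidUserID     = errors.New("invalid user_id")
	errInvalidStatusCode = errors.New("invalid status_code") // hypothetical message
)

// activityEvent is trimmed to the fields that are validated.
type activityEvent struct {
	TenantID   string `json:"tenant_id"`
	UserID     string `json:"user_id"`
	StatusCode int    `json:"status_code,omitempty"`
}

// validateActivity decodes a raw message and applies the required-field rules.
func validateActivity(raw []byte) (*activityEvent, error) {
	var ev activityEvent
	if err := json.Unmarshal(raw, &ev); err != nil {
		return nil, errInvalidPayload
	}
	if ev.TenantID == "" {
		return nil, errInvalidTenantID
	}
	if ev.UserID == "" {
		return nil, errInvalidUserID
	}
	// 0 means the field was omitted; any other value must be a valid HTTP status.
	if ev.StatusCode != 0 && (ev.StatusCode < 100 || ev.StatusCode > 599) {
		return nil, errInvalidStatusCode
	}
	return &ev, nil
}
```

Each of these messages contains the substring "invalid", so a substring-based checker like the one described under Retry Semantics would acknowledge the message rather than retry it.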
End-to-End Flow
┌──────────────────┐ payload ┌──────────────┐
│ publisher service│ ──────────► │ Redis topic │
│ (auth, learning,│ │ audit.events │
│ billing, ...) │ │activity.events│
└──────────────────┘ └───────┬──────┘
│
audit-service subscribers
│
▼
┌────────────────────────┐
│ Validate payload │
│ Resolve tenant_id │
│ INSERT into Postgres │
│ ack / retry as needed │
└────────────────────────┘
The publishing service does not wait for the subscriber. A briefly-down audit-service does not slow user-facing requests; messages stay in Redis until the subscriber catches up.
Operational Endpoints
The audit-service Gin engine is built via base_api.NewApiEngine from common/api/engine.go, which wires two health probes exposed at the root of the service — not behind /api/v1:
| Method | Path | Response |
|---|---|---|
| GET | / | { "status": "healthy", "timestamp": "<UTC ISO8601>" } |
| GET | /health | Same — identical handler |
There is no separate readiness (/ready) probe; liveness and readiness both resolve against /health. Neither probe checks Redis or Postgres connectivity; each returns 200 OK as soon as the HTTP server is accepting connections. Wire richer checks into your deployment's startup probe if you need them.
Patterns for Service Authors
You have two ways to publish:
- Manual publish: build a payload yourself and call the helper. Use this when you want full before_value/after_value snapshots, custom descriptions, or to publish activity events (the middleware only publishes audit events).

      _ = events.PublishAuditEvent(ctx, publisher, &events.AuditEventPayload{
          TenantID:     tenantID,
          ActorID:      userID,
          Action:       "updated",
          ResourceType: "course",
          ResourceID:   courseID,
          Module:       "learning",
          BeforeValue:  beforeMap,
          AfterValue:   afterMap,
      })

- Drop-in middleware: mount events.AuditMiddleware(publisher, "<module>") on an admin Gin route group and every POST/PUT/PATCH/DELETE auto-publishes (including failed mutations; the middleware records the response status in metadata.status_code rather than filtering on it). See Audit Middleware.
Activity events do not have a corresponding middleware — services publish them explicitly when meaningful user actions occur.
Troubleshooting
| Issue | Solution |
|---|---|
| Publish returns no error but row never appears | Check the audit-service logs for non-retryable validation rejections (most often missing tenant_id) |
| Same event appears twice | Producer published twice (e.g. middleware + manual publish on the same route); verify only one is wired |
| Subscriber stuck in retry loop | Inspect the audit-service logs — if the error string doesn't match any known pattern, it's classified as retryable; consider extending WithNonRetryableErrors |
| Correlation ID is missing | The publisher attaches one automatically — if it's missing the message bypassed PublishAuditEvent/PublishActivityEvent and was published raw |

