EduShade
Audit Module

Event Ingestion

Pub/sub topics, payload contracts, subscribers, and retry semantics for the audit pipeline

Audit and activity rows are never written by application code directly. Every row arrives through Redis pub/sub. This page documents the wire contract — topic names, payload shapes, subscriber names, and how failed messages are retried.

Topics

The two topic names are produced by helpers in common/pkg/events/topics.go:

Helper                        Returns
events.BuildAuditTopic()      "audit.events"
events.BuildActivityTopic()   "activity.events"

Both topics live on the same Redis cluster used by the rest of the platform's pub/sub (Watermill-backed).

Payload Contracts

AuditEventPayload

Defined in common/pkg/events/audit_events.go — published to audit.events for any admin/system mutation.

type AuditEventPayload struct {
    TenantID     string                 `json:"tenant_id"`
    ActorID      string                 `json:"actor_id"`
    ActorType    string                 `json:"actor_type,omitempty"`   // "user", "system", "admin"
    Action       string                 `json:"action"`                 // "created", "updated", "deleted"
    ResourceType string                 `json:"resource_type"`          // "course", "user", "role", ...
    ResourceID   string                 `json:"resource_id"`
    Module       string                 `json:"module,omitempty"`
    Description  string                 `json:"description,omitempty"`
    BeforeValue  map[string]interface{} `json:"before_value,omitempty"`
    AfterValue   map[string]interface{} `json:"after_value,omitempty"`
    IPAddress    string                 `json:"ip_address,omitempty"`
    UserAgent    string                 `json:"user_agent,omitempty"`
    Metadata     map[string]interface{} `json:"metadata,omitempty"`
    Timestamp    time.Time              `json:"timestamp"`
}

Defaults applied by the publisher:

  • Timestamp is set to time.Now() if zero
  • ActorType defaults to "user" if empty

ActivityEventPayload

Published to activity.events for any user-driven event:

type ActivityEventPayload struct {
    TenantID       string                 `json:"tenant_id"`
    UserID         string                 `json:"user_id"`
    ImpersonatedBy string                 `json:"impersonated_by,omitempty"`
    Title          string                 `json:"title"`           // "User logged in"
    Action         string                 `json:"action"`          // "login", "enroll", ...
    Module         string                 `json:"module,omitempty"`
    Description    string                 `json:"description,omitempty"`
    Endpoint       string                 `json:"endpoint,omitempty"`
    Method         string                 `json:"method,omitempty"`
    StatusCode     int                    `json:"status_code,omitempty"`
    IPAddress      string                 `json:"ip_address,omitempty"`
    UserAgent      string                 `json:"user_agent,omitempty"`
    Metadata       map[string]interface{} `json:"metadata,omitempty"`
    Timestamp      time.Time              `json:"timestamp"`
}

The publisher sets Timestamp to the current time if it is zero. StatusCode is validated by the service layer: when it is set (non-zero), any value outside 100–599 is rejected.

Publishing Helpers

Both helpers live in common/pkg/events/audit_events.go:

Function                                              Purpose
events.PublishAuditEvent(ctx, publisher, payload)     Applies defaults, attaches metadata, publishes to audit.events
events.PublishActivityEvent(ctx, publisher, payload)  Same flow, publishing to activity.events

Both helpers attach a correlation_id (a fresh UUID) and the most relevant fields (tenant_id, actor/user, action, resource type, module) as message metadata so downstream consumers and logs can trace events across services.

If publisher is nil, the helper is a no-op. Services where auditing is optional can therefore pass nil and still run normally.

Subscribers

Inside audit-service, two subscribers run side by side; both are registered in main.go:

Subscriber              Topic            Type                      Service
audit-service-audit     audit.events     AuditEventSubscriber      AuditLogService.CreateFromEvent
audit-service-activity  activity.events  ActivityEventSubscriber   ActivityLogService.CreateFromEvent

The subscribers run as long-lived goroutines started by StartSubscriber(ctx). On shutdown the audit-service cancels their context and calls Close() on the underlying Redis subscriber.

Retry Semantics

Both subscribers wrap message handling with pubsub.ProcessMessagesWithRetry and use pubsub.DefaultRetryConfig:

Setting          Value
Max retries      3
Initial delay    5s
Max delay        5m
Backoff factor   ×2 (exponential)

A DefaultRetryableErrorChecker classifies error strings:

  • Retryable substrings: timeout, connection, temporary, deadline exceeded, context deadline exceeded, database, network
  • Non-retryable substrings: invalid, validation, not found, unauthorized, forbidden

On top of that, each subscriber adds its own non-retryable patterns:

Subscriber   Extra non-retryable patterns
Audit        invalid tenant_id, invalid payload
Activity     invalid tenant_id, invalid user_id, invalid payload

Non-retryable errors are acknowledged, so the message is not redelivered; retrying would never succeed. Retryable errors are nacked and redelivered with backoff. An error whose message matches neither list is treated as retryable.

Required vs Optional Fields

The service-layer CreateFromEvent enforces these requirements:

Stream     Required                                Validation failure
Audit      tenant_id                               Returns invalid tenant_id (non-retryable)
Audit      parseable JSON                          Returns invalid payload (non-retryable)
Activity   tenant_id                               Returns invalid tenant_id (non-retryable)
Activity   user_id                                 Returns invalid user_id (non-retryable)
Activity   status_code, if set, must be 100–599    Rejected before insert

Everything else — description, before_value, after_value, ip_address, user_agent, metadata — is optional and stored as-is.

End-to-End Flow

┌───────────────────┐   payload    ┌─────────────────┐
│ publisher service │ ───────────► │   Redis topic   │
│ (auth, learning,  │              │  audit.events   │
│  billing, ...)    │              │ activity.events │
└───────────────────┘              └────────┬────────┘
                                            │
                                            ▼
                                audit-service subscribers
                                            │
                                            ▼
                               ┌─────────────────────────┐
                               │ Validate payload        │
                               │ Resolve tenant_id       │
                               │ INSERT into Postgres    │
                               │ ack / retry as needed   │
                               └─────────────────────────┘

The publishing service does not wait for the subscriber. A briefly-down audit-service does not slow user-facing requests; messages stay in Redis until the subscriber catches up.

Operational Endpoints

The audit-service Gin engine is built via base_api.NewApiEngine from common/api/engine.go, which wires two health probes exposed at the root of the service — not behind /api/v1:

Method   Path      Response
GET      /         { "status": "healthy", "timestamp": "<UTC ISO8601>" }
GET      /health   Same response (identical handler)

There is no separate readiness probe (/ready); point both liveness and readiness checks at /health. Neither probe checks Redis or Postgres connectivity: both return 200 OK as soon as the HTTP server is accepting connections. If you need dependency checks, wire them into your deployment's startup probe.

Patterns for Service Authors

You have two ways to publish:

  1. Manual publish: build the payload yourself and call the helper. Use this when you need full before_value / after_value snapshots or custom descriptions, or when publishing activity events (the middleware only publishes audit events).

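    // PublishAuditEvent returns an error; this example discards it.
    // Log it instead if publish failures need to be visible.
    _ = events.PublishAuditEvent(ctx, publisher, &events.AuditEventPayload{
        TenantID:     tenantID,
        ActorID:      userID,
        Action:       "updated",
        ResourceType: "course",
        ResourceID:   courseID,
        Module:       "learning",
        BeforeValue:  beforeMap,
        AfterValue:   afterMap,
    })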
  2. Drop-in middleware: mount events.AuditMiddleware(publisher, "<module>") on an admin Gin route group, and every POST/PUT/PATCH/DELETE request publishes an audit event automatically. Failed mutations are published too: the middleware records the response status in metadata.status_code rather than filtering on it. See Audit Middleware.

Activity events do not have a corresponding middleware — services publish them explicitly when meaningful user actions occur.

Troubleshooting

  • Publish returns no error, but the row never appears: check the audit-service logs for non-retryable validation rejections (most often a missing tenant_id).
  • Same event appears twice: the producer published twice (for example, middleware plus a manual publish on the same route). Verify that only one is wired.
  • Subscriber stuck in a retry loop: inspect the audit-service logs. An error string that matches no known pattern is classified as retryable; if the error is permanent, add its pattern through WithNonRetryableErrors.
  • Correlation ID is missing: the helpers attach one automatically, so a message without it bypassed PublishAuditEvent/PublishActivityEvent and was published raw.
