Skip to content

feat: Events - Support for Test, Delivery, Redelivery and Stats #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 171 additions & 18 deletions management/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ package management

import (
"context"
"strings"
"time"
)

// EventStreamManager manages Auth0 Event Stream resources.
type EventStreamManager manager

/* ------------------------------------------------ CRUD ---------------------------------------------------------*/

// EventStream is used for event stream data.
type EventStream struct {
ID *string `json:"id,omitempty"`
Expand Down Expand Up @@ -33,31 +39,13 @@ type EventStreamDestination struct {
EventStreamDestinationConfiguration map[string]interface{} `json:"configuration,omitempty"`
}

// reset cleans up unnecessary fields based on the operation type.
func (e *EventStream) reset(op string) {
e.ID = nil
e.CreatedAt = nil
e.UpdatedAt = nil

switch op {
case "create":
e.Status = nil
default:
}
}

// EventStreamManager manages Auth0 Event Stream resources.
type EventStreamManager manager

// Create an Event Stream.
func (m *EventStreamManager) Create(ctx context.Context, e *EventStream, opts ...RequestOption) error {
e.reset("create")
return m.management.Request(ctx, "POST", m.management.URI("event-streams"), e, opts...)
}

// Update an Event Stream.
func (m *EventStreamManager) Update(ctx context.Context, id string, e *EventStream, opts ...RequestOption) error {
e.reset("update")
return m.management.Request(ctx, "PATCH", m.management.URI("event-streams", id), e, opts...)
}

Expand All @@ -77,3 +65,168 @@ func (m *EventStreamManager) Read(ctx context.Context, id string, opts ...Reques
func (m *EventStreamManager) Delete(ctx context.Context, id string, opts ...RequestOption) error {
return m.management.Request(ctx, "DELETE", m.management.URI("event-streams", id), nil, opts...)
}

/* ------------------------------------------------ UTILS ---------------------------------------------------------*/

// WithEventTypes returns a RequestOption that sets the `event_types` query parameter
// by joining the provided event type strings with commas.
// This is useful for filtering results by multiple event types in endpoints like /deliveries.
func WithEventTypes(types ...string) RequestOption {
return Parameter("event_types", strings.Join(types, ","))
}

// WithDateRange returns a slice of RequestOptions that apply a `date_from` and `date_to`
// range filter using RFC3339 timestamp format.
// This is commonly used to limit responses to a specific time window for endpoints that support temporal filtering.
func WithDateRange(from, to time.Time) []RequestOption {
return []RequestOption{
Parameter("date_from", from.Format(time.RFC3339)),
Parameter("date_to", to.Format(time.RFC3339)),
}
}

/* ------------------------------------------------ DELIVERY ---------------------------------------------------------*/

// BulkRedeliverRequest specifies filters for bulk redelivery of events.
type BulkRedeliverRequest struct {
EventTypes *[]string `json:"event_types,omitempty"`
DateFrom *string `json:"date_from,omitempty"`
DateTo *string `json:"date_to,omitempty"`
}

// EventDeliveryList represents the response from the /deliveries endpoint.
type EventDeliveryList struct {
List
Deliveries []*EventDelivery `json:"deliveries"`
}

// EventDelivery represents an individual delivery object.
type EventDelivery struct {
ID *string `json:"id,omitempty"`
EventStreamID *string `json:"event_stream_id,omitempty"`
EventType *string `json:"event_type,omitempty"`
Status *string `json:"status,omitempty"`
Attempts []*DeliveryAttempt `json:"attempts"`
Event *DeliveryEvent `json:"event,omitempty"`
}

// DeliveryAttempt represents an individual delivery attempt.
type DeliveryAttempt struct {
Status *string `json:"status,omitempty"`
ErrorMessage *string `json:"error_message,omitempty"`
Timestamp *time.Time `json:"timestamp,omitempty"`
Duration *float64 `json:"duration,omitempty"`
}

// DeliveryEvent represents the original event data structure.
type DeliveryEvent struct {
ID *string `json:"id,omitempty"`
Source *string `json:"source,omitempty"`
SpecVersion *string `json:"specversion,omitempty"`
Type *string `json:"type,omitempty"`
Time *time.Time `json:"time,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
A0Tenant *string `json:"a0tenant,omitempty"`
A0Stream *string `json:"a0stream,omitempty"`
A0Purpose *string `json:"a0purpose,omitempty"`
}

// EventDeliveryListQueryParams holds query parameters for delivery listing.
type EventDeliveryListQueryParams struct {
EventTypes *[]string `url:"event_types,omitempty"`
DateFrom *time.Time `url:"date_from,omitempty"`
DateTo *time.Time `url:"date_to,omitempty"`
From *string `url:"from,omitempty"`
Take *int `url:"take,omitempty"`
}

// ReadDelivery fetches a single failed delivery event by eventStreamID and eventID.
func (m *EventStreamManager) ReadDelivery(ctx context.Context, eventStreamID, eventID string, opts ...RequestOption) (ed *EventDelivery, err error) {
err = m.management.Request(ctx, "GET", m.management.URI("event-streams", eventStreamID, "deliveries", eventID), &ed, opts...)
return
}

// ListDeliveries retrieves failed deliveries for a given Event Stream ID.
//
// Valid query parameters include:
// - "event_types": Comma-separated list of event types to filter by (e.g. "user.created,organization.created")
// - "date_from": Start datetime (ISO 8601)
// - "date_to": End datetime (ISO 8601)
// - "take": Max results per page (int)
// - "from": Pagination token returned in `nextPageToken` via ListDeliveries call
func (m *EventStreamManager) ListDeliveries(ctx context.Context, id string, opts ...RequestOption) (edl *EventDeliveryList, err error) {
err = m.management.Request(ctx, "GET", m.management.URI("event-streams", id, "deliveries"), &edl, applyListCheckpointDefaults(opts))
return
}

// Redeliver attempts to resend a previously failed delivery.
func (m *EventStreamManager) Redeliver(ctx context.Context, eventStreamID, eventID string, opts ...RequestOption) error {
return m.management.Request(ctx, "POST", m.management.URI("event-streams", eventStreamID, "redeliver", eventID), nil, opts...)
}

// RedeliverMany allow attempting delivery for failed events - Filters are combined using AND logic.
func (m *EventStreamManager) RedeliverMany(ctx context.Context, eventStreamID string, req *BulkRedeliverRequest, opts ...RequestOption) error {
return m.management.Request(ctx, "POST", m.management.URI("event-streams", eventStreamID, "redeliver"), req, opts...)
}

/* ------------------------------------------------ TEST ---------------------------------------------------------*/

// TestEvent represents both the request payload and the response from the Test endpoint.
type TestEvent struct {
// Request Fields
EventType *string `json:"event_type"` // Required for request
Data map[string]interface{} `json:"data,omitempty"` // Optional in request

// Response Fields
ID *string `json:"id,omitempty"`
EventStreamID *string `json:"event_stream_id,omitempty"`
Status *string `json:"status,omitempty"`
Attempts []*DeliveryAttempt `json:"attempts,omitempty"`
Event *DeliveryEvent `json:"event,omitempty"`
}

// Test triggers a test event on the given event stream.
func (m *EventStreamManager) Test(ctx context.Context, id string, testEvent *TestEvent, opts ...RequestOption) error {
return m.management.Request(ctx, "POST", m.management.URI("event-streams", id, "test"), &testEvent, opts...)
}

/* ------------------------------------------------ STATS ---------------------------------------------------------*/

// EventStreamStats represents the statistics returned for an event stream.
type EventStreamStats struct {
ID *string `json:"id,omitempty"`
Name *string `json:"name,omitempty"`
Window *StatsWindow `json:"window,omitempty"`
Buckets []*time.Time `json:"buckets,omitempty"`
Metrics []*StatsMetric `json:"metrics,omitempty"`
}

// StatsWindow defines the time range and interval used for stats calculation.
type StatsWindow struct {
DateFrom *time.Time `json:"date_from,omitempty"`
DateTo *time.Time `json:"date_to,omitempty"`
BucketInterval *StatsInterval `json:"bucket_interval,omitempty"`
}

// StatsInterval represents the size of each bucket interval in seconds.
type StatsInterval struct {
ScaleFactor *int `json:"scale_factor,omitempty"`
}

// StatsMetric represents a specific metric tracked over time buckets.
type StatsMetric struct {
Name *string `json:"name,omitempty"`
WindowTotal *int `json:"window_total,omitempty"`
Type *string `json:"type,omitempty"` // e.g., "sum"
Data []*int `json:"data,omitempty"` // aligns index-wise with Buckets
}

// Stats retrieves statistics for the specified event stream, including metrics such as
// deliveries over a given time window. Optional query parameters like `date_from` and
// `date_to` can be passed using RequestOptions.
//
// For example, use WithDateRange or other filters to control the time window and granularity.
func (m *EventStreamManager) Stats(ctx context.Context, id string, opts ...RequestOption) (stats *EventStreamStats, err error) {
err = m.management.Request(ctx, "GET", m.management.URI("event-streams", id, "stats"), &stats, opts...)
return
}
Loading
Loading