Skip to main content

Installation

go get github.com/aptly-cloud/go

Initialization

Initialize the Brook client with your API key:
package main

import (
    "log"

    aptly "github.com/aptly-cloud/go"
)

// main builds a Brook client from a Config value and cleans the client up
// when the function returns.
func main() {
    cfg := aptly.Config{APIKey: "your-api-key"}

    client, err := aptly.NewClient(cfg)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    // Registered only once NewClient has succeeded.
    defer client.Cleanup()
}

Configuration Options

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| APIKey | string | Yes | Your API key from the console |
Recommended: Use Server Keys. Go applications typically run in backend environments, so we recommend using Server Keys for them. Always store your server key in an environment variable and never commit it to version control.

Connecting to the Server

Establish a connection to the Brook server:
// Open the connection; abort if it cannot be established
if err := client.Connect(); err != nil {
    log.Fatalf("Failed to connect: %v", err)
}

// Report whether the client is currently connected
fmt.Println("Connected:", client.IsConnected())

// Report the current connection state
// ("connected", "connecting", "disconnected", etc.)
fmt.Println("Status:", client.GetConnectionStatus())

// Report this client's ID
fmt.Println("Client ID:", client.GetClientID())

Subscribing to Messages

Subscribe to a channel/topic to receive real-time messages:
// Look up the channel for this topic (it is created if needed)
channel, err := client.Realtime().Channel("my-topic")
if err != nil {
    log.Fatalf("Failed to create channel: %v", err)
}

// Handler invoked for each message received on the channel.
// metadata contains:
// - Offset: Message sequence number
// - Timestamp: When message was sent (Unix milliseconds)
// - Replay: Boolean indicating if this is a replayed message
// - Channel: Channel name
onMessage := func(data interface{}, metadata aptly.MessageMetadata) {
    fmt.Printf("Message: %v\n", data)
    fmt.Printf("Metadata: %+v\n", metadata)
}

// Register the handler; Stream hands back the function that unsubscribes it
unsubscribe, err := channel.Stream(onMessage)
if err != nil {
    log.Fatalf("Failed to subscribe: %v", err)
}
defer unsubscribe()

// Alternatively, call unsubscribe() directly as soon as the subscription
// is no longer needed
unsubscribe()

Message Metadata

Every message comes with metadata that provides context:
| Field | Type | Description |
| --- | --- | --- |
| Offset | int64 | Message sequence number in the channel |
| Timestamp | int64 | Unix timestamp in milliseconds when message was sent |
| Replay | bool | true if this is a missed message being replayed |
| Channel | string | Channel name |
channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
    fmt.Printf("Message data: %v\n", data)

    // Print every metadata field in turn
    fmt.Printf("Offset: %d\n", metadata.Offset)
    fmt.Printf("Timestamp: %d\n", metadata.Timestamp)
    fmt.Printf("Replay: %v\n", metadata.Replay)
    fmt.Printf("Channel: %s\n", metadata.Channel)

    // Only replayed messages get the extra notice below
    if !metadata.Replay {
        return
    }
    fmt.Printf("This is a replayed message from offset: %d\n", metadata.Offset)
})

Type-Safe Message Handling

You can type-assert message data to your specific types:
type ChatMessage struct {
    User string `json:"user"`
    Text string `json:"text"`
    Time int64  `json:"time"`
}

channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
    // Type assert to map first
    if dataMap, ok := data.(map[string]interface{}); ok {
        // Extract fields
        if user, ok := dataMap["user"].(string); ok {
            fmt.Printf("User: %s\n", user)
        }
        if text, ok := dataMap["text"].(string); ok {
            fmt.Printf("Message: %s\n", text)
        }
    }
})

Publishing Messages

Using the SDK

Publish messages through the WebSocket connection:
// Get the channel; do not publish on it if creation failed
channel, err := client.Realtime().Channel("my-topic")
if err != nil {
    log.Fatalf("Failed to create channel: %v", err)
}

// Publish a simple message
if err := channel.Publish(map[string]interface{}{
    "text": "Hello World!",
}); err != nil {
    log.Printf("Failed to publish: %v", err)
}

// Publish structured data
if err := channel.Publish(map[string]interface{}{
    "type": "notification",
    "user": "john",
    "data": map[string]interface{}{
        "count": 42,
    },
}); err != nil {
    log.Printf("Failed to publish: %v", err)
}

// Using the alias method (same as Publish); it returns an error too
if err := channel.Send(map[string]string{
    "text": "Same as publish()",
}); err != nil {
    log.Printf("Failed to send: %v", err)
}

Using HTTP (REST API)

Publish messages via HTTP when WebSocket is not available:
// Publish via HTTP
result, err := client.PublishHTTP("my-topic", map[string]string{
    "text": "Hello from HTTP!",
})
if err != nil {
    log.Fatalf("Failed to publish via HTTP: %v", err)
}

fmt.Printf("Published: %+v\n", result)

Using Standard HTTP Client

You can also use Go’s standard HTTP client:
import (
    "bytes"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
)

// Prepare request body
body := map[string]interface{}{
    "channel": "my-topic",
    "message": map[string]string{
        "text": "Hello from standard HTTP!",
    },
}

jsonData, err := json.Marshal(body)
if err != nil {
    log.Fatalf("Failed to encode request body: %v", err)
}

// Create request
req, err := http.NewRequest(http.MethodPost, "https://connect.aptly.cloud/realtime", bytes.NewReader(jsonData))
if err != nil {
    log.Fatalf("Failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-api-key", "your-server-key")

// Send request. The variable is named httpClient so it does not clash with
// the SDK client, and the timeout stops an unresponsive server from blocking
// the call indefinitely.
httpClient := &http.Client{Timeout: 10 * time.Second}
resp, err := httpClient.Do(req)
if err != nil {
    log.Fatal(err)
}
defer resp.Body.Close()

fmt.Println("Status:", resp.Status)

// A nil error only means the HTTP exchange completed; the server may still
// have rejected the publish.
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
    log.Printf("Publish was not accepted: %s", resp.Status)
}

Observing Connection Status

Monitor connection status changes to handle reconnections and errors:
// Subscribe to connection changes
unsubscribe := client.OnConnectivityChange(func(status aptly.ConnectionState) {
    fmt.Printf("Connection status changed to: %s\n", status)

    // One case per documented connection state
    switch status {
    case aptly.StateConnecting:
        fmt.Println("Attempting to connect...")
    case aptly.StateAuthenticating:
        fmt.Println("Authenticating with API key...")
    case aptly.StateConnected:
        fmt.Println("Connected and ready to send/receive messages")
    case aptly.StateReconnecting:
        fmt.Println("Connection lost, attempting to reconnect...")
    case aptly.StateDisconnected:
        fmt.Println("Disconnected from server")
    case aptly.StateFailed:
        fmt.Println("Connection failed")
    case aptly.StateUnauthorized:
        fmt.Println("Authentication failed")
    default:
        // Keeps states this example does not know about visible
        fmt.Printf("Unhandled connection state: %s\n", status)
    }
})
defer unsubscribe()

// Alternatively, unsubscribe explicitly as soon as you are done
unsubscribe()

Connection States

| State | Description |
| --- | --- |
| StateDisconnected | Not connected to server |
| StateConnecting | Initial connection attempt in progress |
| StateAuthenticating | Authenticating with API key |
| StateConnected | Successfully connected and authenticated |
| StateReconnecting | Attempting to reconnect after connection loss |
| StateUnauthorized | Authentication failed (invalid API key) |
| StateFailed | Connection failed |

Complete Example

View Complete Example

See a full working example with all features on GitHub

Advanced Features

Getting Client Information

// Get unique client ID
clientID := client.GetClientID()
fmt.Println("Client ID:", clientID)

// Check if authenticated
isAuth := client.IsAuthenticated()
fmt.Println("Authenticated:", isAuth)

// Get active channels
channels := client.GetActiveChannels()
fmt.Printf("Active channels: %v\n", channels)

// Get detailed statistics
stats := client.GetStats()
fmt.Printf("Connection stats: %+v\n", stats.Connection)
fmt.Printf("Channel stats: %+v\n", stats.Channels)
fmt.Printf("Active channel count: %d\n", stats.ActiveChannels)

Channel Statistics

// Get the channel; its statistics are only meaningful if creation succeeded
channel, err := client.Realtime().Channel("my-topic")
if err != nil {
    log.Fatalf("Failed to create channel: %v", err)
}

stats := channel.GetStats()
fmt.Printf("Channel stats: %+v\n", stats)
// Output:
// {
//   Name: "my-topic",
//   StreamHandlers: 1,
//   ConnectionState: "connected"
// }

Working with Multiple Channels

// Create multiple channels
channel1, err := client.Realtime().Channel("channel-1")
if err != nil {
    log.Fatalf("Failed to create channel-1: %v", err)
}
channel2, err := client.Realtime().Channel("channel-2")
if err != nil {
    log.Fatalf("Failed to create channel-2: %v", err)
}

// Subscribe to both. Each defer is registered only after its Stream call
// has succeeded, so the result of a failed Stream call is never deferred.
unsub1, err := channel1.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
    fmt.Printf("Channel 1: %v\n", data)
})
if err != nil {
    log.Fatalf("Failed to subscribe to channel-1: %v", err)
}
defer unsub1()

unsub2, err := channel2.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
    fmt.Printf("Channel 2: %v\n", data)
})
if err != nil {
    log.Fatalf("Failed to subscribe to channel-2: %v", err)
}
defer unsub2()

// Publish to different channels
if err := channel1.Publish(map[string]string{"text": "Message to channel 1"}); err != nil {
    log.Printf("Failed to publish to channel-1: %v", err)
}
if err := channel2.Publish(map[string]string{"text": "Message to channel 2"}); err != nil {
    log.Printf("Failed to publish to channel-2: %v", err)
}

Graceful Shutdown

import (
    "os"
    "os/signal"
    "syscall"
)

// Set up signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Wait for signal
<-sigChan

// Cleanup and disconnect
client.Cleanup()
fmt.Println("Shutdown complete")

Using Context for Cancellation

import "context"

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// done is closed when the goroutine below has finished, including its
// deferred unsubscribe
done := make(chan struct{})

// Use context with goroutines
go func() {
    defer close(done)

    channel, err := client.Realtime().Channel("my-topic")
    if err != nil {
        fmt.Printf("Failed to create channel: %v\n", err)
        return
    }
    unsubscribe, err := channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
        fmt.Printf("Message: %v\n", data)
    })
    if err != nil {
        fmt.Printf("Failed to subscribe: %v\n", err)
        return
    }
    defer unsubscribe()

    <-ctx.Done()
    fmt.Println("Context cancelled, cleaning up...")
}()

// Cancel when done, then wait for the goroutine; without the wait the
// surrounding function could return before the deferred unsubscribe runs
time.Sleep(30 * time.Second)
cancel()
<-done

Error Handling

// Connection errors
if err := client.Connect(); err != nil {
    log.Printf("Connection error: %v", err)
}

// Channel creation errors
channel, err := client.Realtime().Channel("")
if err != nil {
    log.Printf("Invalid channel name: %v", err)
}

// Publishing errors. The channel is only used when it was created
// successfully; it must not be touched after a failed Channel call.
if err == nil {
    if err := channel.Publish(data); err != nil {
        log.Printf("Publish failed: %v", err)

        // Fallback to HTTP
        if result, httpErr := client.PublishHTTP("my-topic", data); httpErr != nil {
            log.Printf("HTTP publish also failed: %v", httpErr)
        } else {
            fmt.Printf("Published via HTTP: %+v\n", result)
        }
    }
}

// HTTP publishing errors
response, err := client.PublishHTTP("channel", data)
if err != nil {
    log.Printf("HTTP publish failed: %v", err)
} else {
    // The response is only meaningful when err is nil
    fmt.Printf("Published via HTTP: %+v\n", response)
}

Best Practices

Always handle connection state changes to provide feedback when connection is lost or restored. Use OnConnectivityChange() to monitor connection health.
Don’t forget to clean up: call defer client.Cleanup() and defer unsubscribe() so that resources are released properly and memory leaks are avoided.
Message replay automatically handles missed messages when you reconnect, ensuring no data loss. Check metadata.Replay to distinguish between new and replayed messages.

Additional Recommendations

  1. Use defer for cleanup: Always use defer to ensure cleanup functions are called
  2. Handle errors: Check errors from all SDK methods
  3. Type assertions: Safely type-assert message data before using it
  4. Goroutine safety: All SDK operations are goroutine-safe, but handle your own data structures carefully
  5. Context integration: Use Go contexts for better cancellation control

Thread Safety

The Go SDK is designed to be thread-safe and can be safely used across multiple goroutines:
// Safe to use from multiple goroutines
go func() {
    if err := channel.Publish(map[string]string{"text": "From goroutine 1"}); err != nil {
        log.Printf("Goroutine 1: publish failed: %v", err)
    }
}()

go func() {
    if err := channel.Publish(map[string]string{"text": "From goroutine 2"}); err != nil {
        log.Printf("Goroutine 2: publish failed: %v", err)
    }
}()

// Safe to subscribe from multiple goroutines
for i := 0; i < 5; i++ {
    go func(id int) {
        // ch is local to this goroutine and does not shadow the shared channel
        ch, err := client.Realtime().Channel(fmt.Sprintf("channel-%d", id))
        if err != nil {
            log.Printf("Goroutine %d: failed to create channel: %v", id, err)
            return
        }
        // The unsubscribe function is discarded for brevity; keep it if the
        // subscription has to be removed later
        if _, err := ch.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
            fmt.Printf("Goroutine %d received: %v\n", id, data)
        }); err != nil {
            log.Printf("Goroutine %d: failed to subscribe: %v", id, err)
        }
    }(i)
}

Testing

Here’s how to write tests for applications using the Brook SDK:
package main

import (
    "testing"
    "time"

    aptly "github.com/aptly-cloud/go"
)

// TestBrookIntegration performs a publish/subscribe round trip on
// "test-channel": it creates a client, connects, subscribes, publishes one
// message and waits up to five seconds for the subscription handler to run.
func TestBrookIntegration(t *testing.T) {
    // Create client
    client, err := aptly.NewClient(aptly.Config{
        APIKey: "test-api-key",
    })
    if err != nil {
        t.Fatalf("Failed to create client: %v", err)
    }
    defer client.Cleanup()

    // Connect
    if err := client.Connect(); err != nil {
        t.Fatalf("Failed to connect: %v", err)
    }

    // Create channel
    channel, err := client.Realtime().Channel("test-channel")
    if err != nil {
        t.Fatalf("Failed to create channel: %v", err)
    }

    // Test subscription. The channel is buffered and written with a
    // non-blocking send so the handler can never block: with an unbuffered
    // channel, a second delivery (or one arriving after the timeout below)
    // would leave the handler stuck because nobody is receiving any more.
    received := make(chan struct{}, 1)
    unsubscribe, err := channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
        select {
        case received <- struct{}{}:
        default:
            // A delivery has already been recorded; nothing more to signal.
        }
    })
    if err != nil {
        t.Fatalf("Failed to subscribe: %v", err)
    }
    defer unsubscribe()

    // Test publishing
    if err := channel.Publish(map[string]string{"test": "data"}); err != nil {
        t.Fatalf("Failed to publish: %v", err)
    }

    // Wait for message
    select {
    case <-received:
        t.Log("Message received successfully")
    case <-time.After(5 * time.Second):
        t.Fatal("Timeout waiting for message")
    }
}

API Reference

Client Methods

  • NewClient(config Config) (*Client, error) - Create new client
  • Connect() error - Connect to server
  • Disconnect() - Disconnect from server
  • Cleanup() - Clean up resources
  • Realtime() *Realtime - Get realtime interface
  • Channel(name string) (*Channel, error) - Create/get channel
  • IsConnected() bool - Check connection status
  • IsAuthenticated() bool - Check authentication status
  • GetClientID() string - Get client ID
  • GetConnectionStatus() ConnectionState - Get connection state
  • GetActiveChannels() []string - Get active channel names
  • GetStats() Stats - Get statistics
  • PublishHTTP(channel string, message interface{}) (map[string]interface{}, error) - Publish via HTTP
  • OnConnectivityChange(callback ConnectivityCallback) func() - Subscribe to connectivity changes

Channel Methods

  • Stream(callback MessageHandler) (func(), error) - Subscribe to messages
  • Unstream(callback MessageHandler) - Unsubscribe callback
  • Publish(message interface{}) error - Publish message via WebSocket
  • Send(message interface{}) error - Alias for Publish
  • Close() - Close channel
  • GetStats() ChannelStats - Get channel statistics
  • Resubscribe() - Manually resubscribe (usually automatic)

Types

// Config holds the settings passed to NewClient when creating a client.
type Config struct {
    // APIKey is your API key from the console (required).
    APIKey           string
    // NOTE(review): presumably turns on verbose SDK logging — confirm.
    Verbose          bool
    // NOTE(review): presumably overrides the server endpoint — confirm.
    Endpoint         string
    // NOTE(review): presumably bounds how long reconnection is attempted — confirm.
    ReconnectTimeout time.Duration
}

// MessageMetadata is passed to a MessageHandler alongside each message and
// describes that message.
type MessageMetadata struct {
    // Offset is the message sequence number in the channel.
    Offset    int64
    // Timestamp is when the message was sent, as Unix milliseconds.
    Timestamp int64
    // Replay is true if this is a missed message being replayed.
    Replay    bool
    // Channel is the channel name.
    Channel   string
}

// ConnectionState represents connection status. It is the type returned by
// GetConnectionStatus and passed to a ConnectivityCallback.
type ConnectionState string

// Connection states a client can report.
const (
    // Not connected to server.
    StateDisconnected    ConnectionState = "disconnected"
    // Initial connection attempt in progress.
    StateConnecting      ConnectionState = "connecting"
    // Authenticating with API key.
    StateAuthenticating  ConnectionState = "authenticating"
    // Successfully connected and authenticated.
    StateConnected       ConnectionState = "connected"
    // Attempting to reconnect after connection loss.
    StateReconnecting    ConnectionState = "reconnecting"
    // Connection failed.
    StateFailed          ConnectionState = "failed"
    // Authentication failed (invalid API key).
    StateUnauthorized    ConnectionState = "unauthorized"
)

// Callback types

// MessageHandler is the callback passed to Channel.Stream; it receives the
// message data together with that message's metadata.
type MessageHandler func(data interface{}, metadata MessageMetadata)

// ConnectivityCallback is the callback passed to OnConnectivityChange; it
// receives the connection state the client changed to.
type ConnectivityCallback func(status ConnectionState)

Next Steps