Installation
go get github.com/aptly-cloud/go
Initialization
Initialize the Brook client with your API key:
package main

import (
	"log"

	aptly "github.com/aptly-cloud/go"
)

func main() {
	client, err := aptly.NewClient(aptly.Config{
		APIKey: "your-api-key",
	})
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer client.Cleanup()
}
Configuration Options
Option   Type     Required   Description
APIKey   string   Yes        Your API key from the console
Recommended: Use Server Keys
Go applications typically run in backend environments, so we recommend using
Server Keys for them. Always store your server key in an environment variable
and never commit it to version control.
Connecting to the Server
Establish a connection to the Brook server:
// Connect to the server
if err := client.Connect(); err != nil {
	log.Fatalf("Failed to connect: %v", err)
}

// Check connection status
isConnected := client.IsConnected()
fmt.Println("Connected:", isConnected)

// Get connection status
status := client.GetConnectionStatus()
fmt.Println("Status:", status) // "connected", "connecting", "disconnected", etc.

// Get client ID
clientID := client.GetClientID()
fmt.Println("Client ID:", clientID)
Subscribing to Messages
Subscribe to a channel/topic to receive real-time messages:
// Create or get a channel
channel, err := client.Realtime().Channel("my-topic")
if err != nil {
	log.Fatalf("Failed to create channel: %v", err)
}

// Subscribe to messages
unsubscribe, err := channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
	fmt.Printf("Message: %v\n", data)
	fmt.Printf("Metadata: %+v\n", metadata)
	// Metadata contains:
	// - Offset: Message sequence number
	// - Timestamp: When message was sent (Unix milliseconds)
	// - Replay: Boolean indicating if this is a replayed message
	// - Channel: Channel name
})
if err != nil {
	log.Fatalf("Failed to subscribe: %v", err)
}

// Unsubscribe when the surrounding function returns.
// Alternatively, call unsubscribe() directly once you are done
// instead of deferring it.
defer unsubscribe()
Every message comes with metadata that provides context:
Field       Type     Description
Offset      int64    Message sequence number in the channel
Timestamp   int64    Unix timestamp in milliseconds when message was sent
Replay      bool     true if this is a missed message being replayed
Channel     string   Channel name
channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
	fmt.Printf("Message data: %v\n", data)

	// Access metadata fields
	fmt.Printf("Offset: %d\n", metadata.Offset)
	fmt.Printf("Timestamp: %d\n", metadata.Timestamp)
	fmt.Printf("Replay: %v\n", metadata.Replay)
	fmt.Printf("Channel: %s\n", metadata.Channel)

	if metadata.Replay {
		fmt.Printf("This is a replayed message from offset: %d\n", metadata.Offset)
	}
})
Type-Safe Message Handling
You can type-assert message data to your specific types:
type ChatMessage struct {
	User string `json:"user"`
	Text string `json:"text"`
	Time int64  `json:"time"`
}

channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
	// Type assert to map first
	if dataMap, ok := data.(map[string]interface{}); ok {
		// Extract fields
		if user, ok := dataMap["user"].(string); ok {
			fmt.Printf("User: %s\n", user)
		}
		if text, ok := dataMap["text"].(string); ok {
			fmt.Printf("Message: %s\n", text)
		}
	}
})
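The `ChatMessage` struct above can also be filled in one step rather than field by field. The following is a sketch, assuming the payload reaches the handler as generic decoded JSON values (as the `map[string]interface{}` assertion above suggests); `decodeChatMessage` is a hypothetical helper, not part of the SDK, and it works by re-encoding the value with `encoding/json`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChatMessage mirrors the struct from the example above.
type ChatMessage struct {
	User string `json:"user"`
	Text string `json:"text"`
	Time int64  `json:"time"`
}

// decodeChatMessage converts a generic decoded JSON value into a
// ChatMessage by marshalling it back to JSON and unmarshalling it
// into the typed struct.
func decodeChatMessage(data interface{}) (ChatMessage, error) {
	var msg ChatMessage
	raw, err := json.Marshal(data)
	if err != nil {
		return msg, fmt.Errorf("encode payload: %w", err)
	}
	if err := json.Unmarshal(raw, &msg); err != nil {
		return msg, fmt.Errorf("decode payload: %w", err)
	}
	return msg, nil
}

func main() {
	// Stand-in for the data argument a Stream handler would receive.
	payload := map[string]interface{}{
		"user": "john",
		"text": "hi",
		"time": float64(1700000000000),
	}
	msg, err := decodeChatMessage(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", msg)
}
```

The round trip costs an extra encode and decode per message, but it reuses the struct's `json` tags and reports a single error when the payload has the wrong shape.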
Publishing Messages
Using the SDK
Publish messages through the WebSocket connection:
channel, _ := client.Realtime().Channel("my-topic")

// Publish a simple message
err := channel.Publish(map[string]interface{}{
	"text": "Hello World!",
})
if err != nil {
	log.Printf("Failed to publish: %v", err)
}

// Publish structured data
err = channel.Publish(map[string]interface{}{
	"type": "notification",
	"user": "john",
	"data": map[string]interface{}{
		"count": 42,
	},
})

// Using the alias method (same as Publish)
channel.Send(map[string]string{
	"text": "Same as publish()",
})
Using HTTP (REST API)
Publish messages via HTTP when WebSocket is not available:
// Publish via HTTP
result, err := client.PublishHTTP("my-topic", map[string]string{
	"text": "Hello from HTTP!",
})
if err != nil {
	log.Fatalf("Failed to publish via HTTP: %v", err)
}
fmt.Printf("Published: %+v\n", result)
Using Standard HTTP Client
You can also use Go’s standard HTTP client:
import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// Prepare request body
body := map[string]interface{}{
	"channel": "my-topic",
	"message": map[string]string{
		"text": "Hello from standard HTTP!",
	},
}
jsonData, err := json.Marshal(body)
if err != nil {
	log.Fatal(err)
}

// Create request
req, err := http.NewRequest("POST", "https://connect.aptly.cloud/realtime", bytes.NewBuffer(jsonData))
if err != nil {
	log.Fatal(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-api-key", "your-server-key")

// Send request (named httpClient so it does not shadow the SDK client)
httpClient := &http.Client{}
resp, err := httpClient.Do(req)
if err != nil {
	log.Fatal(err)
}
defer resp.Body.Close()
fmt.Println("Status:", resp.Status)
Observing Connection Status
Monitor connection status changes to handle reconnections and errors:
// Subscribe to connection changes
unsubscribe := client.OnConnectivityChange(func(status aptly.ConnectionState) {
	fmt.Printf("Connection status changed to: %s\n", status)

	switch status {
	case aptly.StateConnected:
		fmt.Println("Connected and ready to send/receive messages")
	case aptly.StateConnecting:
		fmt.Println("Attempting to connect...")
	case aptly.StateReconnecting:
		fmt.Println("Connection lost, attempting to reconnect...")
	case aptly.StateDisconnected:
		fmt.Println("Disconnected from server")
	case aptly.StateFailed:
		fmt.Println("Connection failed")
	case aptly.StateUnauthorized:
		fmt.Println("Authentication failed")
	}
})

// Unsubscribe when the surrounding function returns.
// Alternatively, call unsubscribe() directly once you are done
// instead of deferring it.
defer unsubscribe()
Connection States
State                 Description
StateDisconnected     Not connected to server
StateConnecting       Initial connection attempt in progress
StateAuthenticating   Authenticating with API key
StateConnected        Successfully connected and authenticated
StateReconnecting     Attempting to reconnect after connection loss
StateUnauthorized     Authentication failed (invalid API key)
StateFailed           Connection failed
Complete Example
See a full working example with all features on GitHub.
Advanced Features
// Get unique client ID
clientID := client.GetClientID()
fmt.Println("Client ID:", clientID)

// Check if authenticated
isAuth := client.IsAuthenticated()
fmt.Println("Authenticated:", isAuth)

// Get active channels
channels := client.GetActiveChannels()
fmt.Printf("Active channels: %v\n", channels)

// Get detailed statistics
stats := client.GetStats()
fmt.Printf("Connection stats: %+v\n", stats.Connection)
fmt.Printf("Channel stats: %+v\n", stats.Channels)
fmt.Printf("Active channel count: %d\n", stats.ActiveChannels)
Channel Statistics
channel, _ := client.Realtime().Channel("my-topic")
stats := channel.GetStats()
fmt.Printf("Channel stats: %+v\n", stats)
// Output:
// {
//   Name: "my-topic",
//   StreamHandlers: 1,
//   ConnectionState: "connected"
// }
Working with Multiple Channels
// Create multiple channels
channel1, _ := client.Realtime().Channel("channel-1")
channel2, _ := client.Realtime().Channel("channel-2")

// Subscribe to both
unsub1, _ := channel1.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
	fmt.Printf("Channel 1: %v\n", data)
})
unsub2, _ := channel2.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
	fmt.Printf("Channel 2: %v\n", data)
})
defer unsub1()
defer unsub2()

// Publish to different channels
channel1.Publish(map[string]string{"text": "Message to channel 1"})
channel2.Publish(map[string]string{"text": "Message to channel 2"})
Graceful Shutdown
import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// Set up signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Wait for signal
<-sigChan

// Cleanup and disconnect
client.Cleanup()
fmt.Println("Shutdown complete")
Using Context for Cancellation
import (
	"context"
	"fmt"
	"time"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Use context with goroutines
go func() {
	channel, _ := client.Realtime().Channel("my-topic")
	unsubscribe, _ := channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
		fmt.Printf("Message: %v\n", data)
	})
	defer unsubscribe()

	<-ctx.Done()
	fmt.Println("Context cancelled, cleaning up...")
}()

// Cancel when done
time.Sleep(30 * time.Second)
cancel()
Error Handling
// Connection errors
if err := client.Connect(); err != nil {
	log.Printf("Connection error: %v", err)
}

// Channel creation errors (an empty name is used here to trigger one)
channel, err := client.Realtime().Channel("")
if err != nil {
	log.Printf("Invalid channel name: %v", err)
}

// Publishing errors
if err := channel.Publish(data); err != nil {
	log.Printf("Publish failed: %v", err)

	// Fallback to HTTP
	if result, httpErr := client.PublishHTTP("my-topic", data); httpErr != nil {
		log.Printf("HTTP publish also failed: %v", httpErr)
	} else {
		fmt.Printf("Published via HTTP: %+v\n", result)
	}
}

// HTTP publishing errors
if _, err := client.PublishHTTP("channel", data); err != nil {
	log.Printf("HTTP publish failed: %v", err)
}
Best Practices
Always handle connection state changes so you can give feedback when the
connection is lost or restored. Use OnConnectivityChange() to monitor
connection health.
Don’t forget to clean up: call defer client.Cleanup() and defer unsubscribe() to prevent memory leaks and release resources properly.
Message replay automatically handles missed messages when you reconnect,
ensuring no data loss. Check metadata.Replay to distinguish between new and
replayed messages.
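If a handler must not apply the same message twice, the Offset field can serve as a guard. The sketch below assumes that offsets increase within a channel, which the source describes only as a "message sequence number"; whether a replay can redeliver an already-seen offset is not stated, so this is a defensive measure. `offsetTracker` and its methods are hypothetical names, not SDK types.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetTracker remembers the highest offset processed per channel.
// It is safe to call from multiple goroutines.
type offsetTracker struct {
	mu   sync.Mutex
	last map[string]int64
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{last: make(map[string]int64)}
}

// shouldProcess reports whether offset is newer than anything seen
// on the channel so far, and records it if so.
func (t *offsetTracker) shouldProcess(channel string, offset int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if prev, ok := t.last[channel]; ok && offset <= prev {
		return false
	}
	t.last[channel] = offset
	return true
}

func main() {
	tracker := newOffsetTracker()
	// Simulated (channel, offset) pairs; offset 2 arrives twice.
	for _, off := range []int64{1, 2, 2, 3} {
		fmt.Println(off, tracker.shouldProcess("my-topic", off))
	}
}
```

Inside a Stream handler, the call would be `tracker.shouldProcess(metadata.Channel, metadata.Offset)`, returning early when it reports false.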
Additional Recommendations
Use defer for cleanup: Always use defer to ensure cleanup functions are called.
Handle errors: Check errors from all SDK methods.
Type assertions: Safely type-assert message data before using it.
Goroutine safety: All SDK operations are goroutine-safe, but handle your own data structures carefully.
Context integration: Use Go contexts for better cancellation control.
Thread Safety
The Go SDK is designed to be thread-safe and can be safely used across multiple goroutines:
// Safe to use from multiple goroutines
go func() {
	channel.Publish(map[string]string{"text": "From goroutine 1"})
}()
go func() {
	channel.Publish(map[string]string{"text": "From goroutine 2"})
}()

// Safe to subscribe from multiple goroutines
for i := 0; i < 5; i++ {
	go func(id int) {
		channel, _ := client.Realtime().Channel(fmt.Sprintf("channel-%d", id))
		channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
			fmt.Printf("Goroutine %d received: %v\n", id, data)
		})
	}(i)
}
Testing
Here’s how to write tests for applications using the Brook SDK:
package main

import (
	"testing"
	"time"

	aptly "github.com/aptly-cloud/go"
)

func TestBrookIntegration(t *testing.T) {
	// Create client
	client, err := aptly.NewClient(aptly.Config{
		APIKey: "test-api-key",
	})
	if err != nil {
		t.Fatalf("Failed to create client: %v", err)
	}
	defer client.Cleanup()

	// Connect
	if err := client.Connect(); err != nil {
		t.Fatalf("Failed to connect: %v", err)
	}

	// Create channel
	channel, err := client.Realtime().Channel("test-channel")
	if err != nil {
		t.Fatalf("Failed to create channel: %v", err)
	}

	// Test subscription. The channel is buffered and the send is
	// non-blocking so the handler never stalls if the test has
	// already stopped waiting.
	received := make(chan bool, 1)
	unsubscribe, err := channel.Stream(func(data interface{}, metadata aptly.MessageMetadata) {
		select {
		case received <- true:
		default:
		}
	})
	if err != nil {
		t.Fatalf("Failed to subscribe: %v", err)
	}
	defer unsubscribe()

	// Test publishing
	if err := channel.Publish(map[string]string{"test": "data"}); err != nil {
		t.Fatalf("Failed to publish: %v", err)
	}

	// Wait for message
	select {
	case <-received:
		t.Log("Message received successfully")
	case <-time.After(5 * time.Second):
		t.Fatal("Timeout waiting for message")
	}
}
API Reference
Client Methods
NewClient(config Config) (*Client, error) - Create new client
Connect() error - Connect to server
Disconnect() - Disconnect from server
Cleanup() - Clean up resources
Realtime() *Realtime - Get realtime interface
Channel(name string) (*Channel, error) - Create/get channel
IsConnected() bool - Check connection status
IsAuthenticated() bool - Check authentication status
GetClientID() string - Get client ID
GetConnectionStatus() ConnectionState - Get connection state
GetActiveChannels() []string - Get active channel names
GetStats() Stats - Get statistics
PublishHTTP(channel string, message interface{}) (map[string]interface{}, error) - Publish via HTTP
OnConnectivityChange(callback ConnectivityCallback) func() - Subscribe to connectivity changes
Channel Methods
Stream(callback MessageHandler) (func(), error) - Subscribe to messages
Unstream(callback MessageHandler) - Unsubscribe callback
Publish(message interface{}) error - Publish message via WebSocket
Send(message interface{}) error - Alias for Publish
Close() - Close channel
GetStats() ChannelStats - Get channel statistics
Resubscribe() - Manually resubscribe (usually automatic)
Types
// Config for client initialization
type Config struct {
	APIKey           string
	Verbose          bool
	Endpoint         string
	ReconnectTimeout time.Duration
}

// MessageMetadata provided with each message
type MessageMetadata struct {
	Offset    int64
	Timestamp int64
	Replay    bool
	Channel   string
}

// ConnectionState represents connection status
type ConnectionState string

const (
	StateDisconnected   ConnectionState = "disconnected"
	StateConnecting     ConnectionState = "connecting"
	StateAuthenticating ConnectionState = "authenticating"
	StateConnected      ConnectionState = "connected"
	StateReconnecting   ConnectionState = "reconnecting"
	StateFailed         ConnectionState = "failed"
	StateUnauthorized   ConnectionState = "unauthorized"
)

// Callback types
type MessageHandler func(data interface{}, metadata MessageMetadata)
type ConnectivityCallback func(status ConnectionState)
Next Steps