Skip to main content

Go Client Library

The lplexc package provides a Go client for communicating with an lplex server. It handles SSE parsing, session management, auto-reconnect, PGN decoding, and mDNS discovery.

Install

go get github.com/sixfathoms/lplex/lplexc

Quick start

package main

import (
"context"
"fmt"
"log"

"github.com/sixfathoms/lplex/lplexc"
)

func main() {
client := lplexc.NewClient("http://inuc1.local:8089")

// List devices
devices, err := client.Devices(context.Background())
if err != nil {
log.Fatal(err)
}
for _, d := range devices {
fmt.Printf("%s (%s) at src=%d\n", d.ModelID, d.Manufacturer, d.Src)
}

// Subscribe to all frames
sub, err := client.Subscribe(context.Background(), nil)
if err != nil {
log.Fatal(err)
}
defer sub.Close()

for {
ev, err := sub.Next()
if err != nil {
break
}
if ev.Frame != nil {
fmt.Printf("pgn=%d src=%d data=%s\n", ev.Frame.PGN, ev.Frame.Src, ev.Frame.Data)
}
}
}

Client options

client := lplexc.NewClient("http://inuc1.local:8089",
lplexc.WithLogger(slog.Default()),
lplexc.WithPoolSize(20),
lplexc.WithBackoff(lplexc.BackoffConfig{
InitialInterval: 2 * time.Second,
MaxInterval: 1 * time.Minute,
MaxRetries: 0, // unlimited
}),
)
OptionDescription
WithHTTPClient(c)Use a custom *http.Client
WithTransport(t)Set a custom http.RoundTripper
WithPoolSize(n)Max idle connections per host
WithLogger(l)Structured logger (*slog.Logger)
WithBackoff(b)Reconnection backoff config

Ephemeral subscription

Subscribe opens a GET /events SSE stream. Returns a *Subscription that yields Event values.

filter := &lplexc.Filter{
PGNs: []uint32{129025, 130306},
Manufacturers: []string{"Garmin"},
}

sub, err := client.Subscribe(ctx, filter)
if err != nil {
log.Fatal(err)
}
defer sub.Close()

for {
ev, err := sub.Next()
if err != nil {
break // io.EOF on stream end, or context cancellation
}
if ev.Frame != nil {
fmt.Printf("seq=%d pgn=%d\n", ev.Frame.Seq, ev.Frame.PGN)
}
}

Auto-reconnecting subscription

SubscribeReconnect returns a channel that automatically reconnects with exponential backoff on disconnect.

ch := client.SubscribeReconnect(ctx, &lplexc.Filter{
PGNs: []uint32{129025},
})

for ev := range ch {
if ev.Frame != nil {
fmt.Println(ev.Frame.PGN, ev.Frame.Data)
}
}
// Channel closes when ctx is cancelled

Watch (decoded PGN stream)

Watch combines auto-reconnect with PGN decoding. It filters to a single PGN and decodes every frame into its typed Go struct.

ch, err := client.Watch(ctx, 129025) // Position Rapid Update
if err != nil {
log.Fatal(err)
}

for wv := range ch {
pos := wv.Value.(pgn.PositionRapidUpdate)
fmt.Printf("lat=%.6f lon=%.6f\n", pos.Latitude, pos.Longitude)
}

The PGN must be registered in pgn.Registry. Unknown PGNs return an error.

Buffered sessions

Create a session for reliable delivery with replay:

session, err := client.CreateSession(ctx, lplexc.SessionConfig{
ClientID: "my-pipeline",
BufferTimeout: "PT5M",
Filter: &lplexc.Filter{
Manufacturers: []string{"Victron"},
},
})
if err != nil {
log.Fatal(err)
}

fmt.Printf("resuming from cursor=%d, head=%d\n", session.Info().Cursor, session.Info().Seq)

sub, err := session.Subscribe(ctx)
if err != nil {
log.Fatal(err)
}
defer sub.Close()

lastSeq := uint64(0)
for {
ev, err := sub.Next()
if err != nil {
break
}
if ev.Frame != nil {
lastSeq = ev.Frame.Seq
process(ev.Frame)
}
}

// ACK what we processed
if lastSeq > 0 {
_ = session.Ack(ctx, lastSeq)
}

Session API

MethodDescription
client.CreateSession(ctx, cfg)Create or reconnect a session
session.Info()Get session metadata (client_id, seq, cursor, devices)
session.Subscribe(ctx)Open SSE stream with replay from cursor
session.Ack(ctx, seq)Advance cursor to this sequence
session.LastAcked()Last successfully ACK'd sequence

Device discovery

devices, err := client.Devices(ctx)
for _, d := range devices {
fmt.Printf("src=%d %s %s (packets=%d)\n",
d.Src, d.Manufacturer, d.ModelID, d.PacketCount)
}

Send frames

err := client.Send(ctx, 129025, 10, 255, 2, []byte{0x01, 0x02, 0x03})

Query on demand

RequestPGN sends an ISO Request (PGN 59904) and waits for the response. The server handles the ISO protocol and returns the first matching frame.

// Request position from all devices
frame, err := client.RequestPGN(ctx, 129025, 0xFF)
if err != nil {
log.Fatal(err)
}
fmt.Printf("pgn=%d src=%d data=%s\n", frame.PGN, frame.Src, frame.Data)

// Request product info from a specific device
frame, err = client.RequestPGN(ctx, 126996, 10)

Last-known values

Values returns the most recent frame for each (device, PGN) pair without subscribing to a stream.

values, err := client.Values(ctx, nil) // all values
for _, dv := range values {
fmt.Printf("device src=%d %s\n", dv.Source, dv.Manufacturer)
for _, v := range dv.Values {
fmt.Printf(" pgn=%d data=%s\n", v.PGN, v.Data)
}
}

// With filter
values, err = client.Values(ctx, &lplexc.Filter{PGNs: []uint32{129025}})

DecodedValues returns the same data with PGN fields decoded into named fields.

decoded, err := client.DecodedValues(ctx, nil)
for _, dv := range decoded {
for _, v := range dv.Values {
fmt.Printf(" pgn=%d %s fields=%v\n", v.PGN, v.Description, v.Fields)
}
}

mDNS discovery

Find an lplex server on the local network:

url, err := lplexc.Discover(ctx) // blocks up to 3 seconds
if err != nil {
log.Fatal("no lplex server found on the network")
}

client := lplexc.NewClient(url)

Types

type Frame struct {
Seq uint64 `json:"seq"`
Ts string `json:"ts"` // RFC 3339
Prio uint8 `json:"prio"`
PGN uint32 `json:"pgn"`
Src uint8 `json:"src"`
Dst uint8 `json:"dst"`
Data string `json:"data"` // hex-encoded
}

type Device struct {
Src uint8 `json:"src"`
Name string `json:"name"`
Manufacturer string `json:"manufacturer"`
ManufacturerCode uint16 `json:"manufacturer_code"`
DeviceClass uint8 `json:"device_class"`
DeviceFunction uint8 `json:"device_function"`
DeviceInstance uint8 `json:"device_instance"`
UniqueNumber uint32 `json:"unique_number"`
ModelID string `json:"model_id"`
SoftwareVersion string `json:"software_version"`
ModelVersion string `json:"model_version"`
ModelSerial string `json:"model_serial"`
ProductCode uint16 `json:"product_code"`
FirstSeen string `json:"first_seen"`
LastSeen string `json:"last_seen"`
PacketCount uint64 `json:"packet_count"`
ByteCount uint64 `json:"byte_count"`
}

type Filter struct {
PGNs []uint32 `json:"pgn,omitempty"`
Manufacturers []string `json:"manufacturer,omitempty"`
Instances []uint8 `json:"instance,omitempty"`
Names []string `json:"name,omitempty"`
}

type Event struct {
Frame *Frame
Device *Device
}

type WatchValue struct {
Frame Frame
Value any // type-assert to the specific PGN struct
}

type PGNValue struct {
PGN uint32 `json:"pgn"`
Ts string `json:"ts"`
Data string `json:"data"`
Seq uint64 `json:"seq"`
}

type DeviceValues struct {
Name string `json:"name"`
Source uint8 `json:"src"`
Manufacturer string `json:"manufacturer,omitempty"`
ModelID string `json:"model_id,omitempty"`
Values []PGNValue `json:"values"`
}

type DecodedPGNValue struct {
PGN uint32 `json:"pgn"`
Description string `json:"description"`
Ts string `json:"ts"`
Seq uint64 `json:"seq"`
Fields any `json:"fields"`
}

type DecodedDeviceValues struct {
Name string `json:"name"`
Source uint8 `json:"src"`
Manufacturer string `json:"manufacturer,omitempty"`
ModelID string `json:"model_id,omitempty"`
Values []DecodedPGNValue `json:"values"`
}