// File: backone-manage-go/mqtt/mqtt_presence_redis_go.go (124 lines, 3.3 KiB, Go)

package main
import (
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"
"strconv"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/go-redis/redis/v8"
"golang.org/x/net/context"
)
// Runtime configuration, read from environment variables when the package is
// initialised. Each value falls back to the default given here when the
// variable is unset or empty. These should match the Django settings used in
// the original Python implementation.

// MQTT broker connection and subscription settings.
var (
	mqttHost          = getEnv("MQTT_HOST", "localhost")
	mqttPort          = getEnv("MQTT_PORT", "1883")
	mqttUser          = getEnv("MQTT_USER", "")
	mqttPass          = getEnv("MQTT_PASS", "")
	mqttTopicPresence = getEnv("MQTT_TOPIC_PRESENCE", "presence")
)

// Redis connection and key settings. All values are kept as strings and
// converted at the point of use.
var (
	redisHost     = getEnv("MQTT_REDIS_HOST", "localhost")
	redisPort     = getEnv("MQTT_REDIS_PORT", "6379")
	redisPassword = getEnv("MQTT_REDIS_PASSWORD", "")
	redisDB       = getEnv("MQTT_REDIS_DB", "0")
	redisPrefix   = getEnv("MQTT_REDIS_PREFIX", "presence") // prepended to every key as "<prefix>:"
	redisSetEX    = getEnv("MQTT_REDIS_SETEX", "86400")     // key expiry, in seconds
)
// main connects to Redis, then to the MQTT broker, subscribes to the presence
// topic and blocks forever while incoming messages are handed to
// handleMessage. Any failure to reach Redis, to connect to the broker, or to
// subscribe terminates the process via log.Fatalf.
func main() {
	ctx := context.Background()

	// Redis is set up and pinged first; the process exits if it is unreachable.
	redisOpts := &redis.Options{
		Addr:     fmt.Sprintf("%s:%s", redisHost, redisPort),
		Password: redisPassword,
		DB:       atoi(redisDB),
	}
	rdb := redis.NewClient(redisOpts)
	if pingErr := rdb.Ping(ctx).Err(); pingErr != nil {
		log.Fatalf("Could not connect to Redis: %v", pingErr)
	}

	// Every received message is forwarded to handleMessage. The subscription
	// below passes a nil callback, so messages arrive via this default handler.
	onMessage := func(_ mqtt.Client, m mqtt.Message) {
		handleMessage(ctx, rdb, m)
	}

	// The subscription is made inside the connect callback, so it is issued
	// each time the callback fires.
	onConnect := func(c mqtt.Client) {
		subToken := c.Subscribe(mqttTopicPresence, 0, nil)
		if subToken.Wait() && subToken.Error() != nil {
			log.Fatalf("Failed to subscribe: %v", subToken.Error())
		}
		log.Printf("Connected to MQTT broker %s:%s, subscribed to %s", mqttHost, mqttPort, mqttTopicPresence)
	}

	brokerURL := fmt.Sprintf("tcp://%s:%s", mqttHost, mqttPort)
	opts := mqtt.NewClientOptions()
	opts.AddBroker(brokerURL)
	opts.SetUsername(mqttUser)
	opts.SetPassword(mqttPass)
	opts.SetClientID("mqtt_presence_redis_go")
	opts.SetDefaultPublishHandler(onMessage)
	opts.OnConnect = onConnect

	client := mqtt.NewClient(opts)
	connToken := client.Connect()
	if connToken.Wait() && connToken.Error() != nil {
		log.Fatalf("Could not connect to MQTT broker: %v", connToken.Error())
	}

	// Park the main goroutine; all further work happens in client callbacks.
	select {}
}
// handleMessage records the most recent presence message of a member in Redis.
//
// The text before the first ";" in the payload (or the whole payload when
// there is no ";") is taken as the member ID and capped at 50 bytes without
// splitting a UTF-8 character. The value stored under
// "<redisPrefix>:<memberID>" is the JSON object
// {"mqtt": <raw payload>, "ts": <Unix seconds at receipt>}, written with an
// expiry of redisSetEX seconds.
//
// Messages whose member ID is empty (empty payload, or payload starting with
// ";") are logged and dropped instead of being written under the bare prefix
// key. Errors are logged and not returned, so one bad message never stops the
// subscriber.
func handleMessage(ctx context.Context, rdb *redis.Client, msg mqtt.Message) {
	const maxMemberIDLen = 50

	payload := string(msg.Payload())
	log.Printf("Received message: %s", payload)

	// Only the first field is needed, so locate the separator directly
	// instead of splitting the whole payload.
	memberID := payload
	if sep := strings.IndexByte(payload, ';'); sep >= 0 {
		memberID = payload[:sep]
	}
	if memberID == "" {
		log.Printf("Ignoring message without member ID: %q", payload)
		return
	}
	memberID = truncateAtRuneBoundary(memberID, maxMemberIDLen)
	key := fmt.Sprintf("%s:%s", redisPrefix, memberID)

	// Field order and tags reproduce the {"mqtt":...,"ts":...} layout.
	record := struct {
		MQTT string `json:"mqtt"`
		TS   int64  `json:"ts"`
	}{
		MQTT: payload,
		TS:   time.Now().Unix(),
	}
	data, err := json.Marshal(record)
	if err != nil {
		log.Printf("Failed to marshal message: %v", err)
		return
	}

	// atoi yields 0 when redisSetEX is not a valid integer.
	ttl := time.Duration(atoi(redisSetEX)) * time.Second
	if err := rdb.SetEX(ctx, key, data, ttl).Err(); err != nil {
		log.Printf("Failed to set Redis key: %v", err)
	}
}

// truncateAtRuneBoundary returns s shortened to at most limit bytes, backing
// up as needed so that a multi-byte UTF-8 character is never cut in half.
// Strings already within the limit are returned unchanged.
func truncateAtRuneBoundary(s string, limit int) string {
	if len(s) <= limit {
		return s
	}
	cut := limit
	// UTF-8 continuation bytes have the form 10xxxxxx and a character has at
	// most three of them, so stepping back three positions is always enough
	// for valid input.
	for back := 0; back < 3 && cut > 0 && s[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	if s[cut]&0xC0 == 0x80 {
		// Not valid UTF-8 around the limit; fall back to a plain byte cut.
		return s[:limit]
	}
	return s[:cut]
}
// getEnv returns the value of the environment variable named key, or
// defaultVal when that variable is unset or set to the empty string.
func getEnv(key, defaultVal string) string {
	value := os.Getenv(key)
	if value == "" {
		return defaultVal
	}
	return value
}
// atoi parses s as a base-10 int and returns 0 when s cannot be parsed,
// discarding the parse error.
func atoi(s string) int {
	if n, err := strconv.Atoi(s); err == nil {
		return n
	}
	return 0
}