Add MQTT presence to Redis integration
This commit is contained in:
123
mqtt/mqtt_presence_redis_go.go
Normal file
123
mqtt/mqtt_presence_redis_go.go
Normal file
@@ -0,0 +1,123 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
"strconv"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Configuration loaded from environment variables. These should match the
|
||||
// Django settings used in the original Python implementation.
|
||||
var (
|
||||
mqttHost = getEnv("MQTT_HOST", "localhost")
|
||||
mqttPort = getEnv("MQTT_PORT", "1883")
|
||||
mqttUser = getEnv("MQTT_USER", "")
|
||||
mqttPass = getEnv("MQTT_PASS", "")
|
||||
mqttTopicPresence = getEnv("MQTT_TOPIC_PRESENCE", "presence")
|
||||
|
||||
redisHost = getEnv("MQTT_REDIS_HOST", "localhost")
|
||||
redisPort = getEnv("MQTT_REDIS_PORT", "6379")
|
||||
redisDB = getEnv("MQTT_REDIS_DB", "0")
|
||||
|
||||
redisPrefix = getEnv("MQTT_REDIS_PREFIX", "presence")
|
||||
redisSetEX = getEnv("MQTT_REDIS_SETEX", "86400") // seconds
|
||||
redisPassword = getEnv("MQTT_REDIS_PASSWORD", "")
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
// Connect to Redis
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: fmt.Sprintf("%s:%s", redisHost, redisPort),
|
||||
Password: redisPassword,
|
||||
DB: atoi(redisDB),
|
||||
})
|
||||
|
||||
// Ping to verify connection
|
||||
if err := rdb.Ping(ctx).Err(); err != nil {
|
||||
log.Fatalf("Could not connect to Redis: %v", err)
|
||||
}
|
||||
|
||||
// MQTT client options
|
||||
opts := mqtt.NewClientOptions()
|
||||
opts.AddBroker(fmt.Sprintf("tcp://%s:%s", mqttHost, mqttPort))
|
||||
opts.SetUsername(mqttUser)
|
||||
opts.SetPassword(mqttPass)
|
||||
opts.SetClientID("mqtt_presence_redis_go")
|
||||
|
||||
// Handlers
|
||||
opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
|
||||
handleMessage(ctx, rdb, msg)
|
||||
})
|
||||
|
||||
opts.OnConnect = func(c mqtt.Client) {
|
||||
if token := c.Subscribe(mqttTopicPresence, 0, nil); token.Wait() && token.Error() != nil {
|
||||
log.Fatalf("Failed to subscribe: %v", token.Error())
|
||||
}
|
||||
log.Printf("Connected to MQTT broker %s:%s, subscribed to %s", mqttHost, mqttPort, mqttTopicPresence)
|
||||
}
|
||||
|
||||
// Create and start client
|
||||
client := mqtt.NewClient(opts)
|
||||
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
||||
log.Fatalf("Could not connect to MQTT broker: %v", token.Error())
|
||||
}
|
||||
|
||||
// Block forever
|
||||
select {}
|
||||
}
|
||||
|
||||
func handleMessage(ctx context.Context, rdb *redis.Client, msg mqtt.Message) {
|
||||
payload := string(msg.Payload())
|
||||
log.Printf("Received message: %s", payload)
|
||||
|
||||
parts := strings.Split(payload, ";")
|
||||
if len(parts) == 0 {
|
||||
return
|
||||
}
|
||||
memberID := parts[0]
|
||||
if len(memberID) > 50 {
|
||||
memberID = memberID[:50]
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s:%s", redisPrefix, memberID)
|
||||
timestamp := time.Now().Unix()
|
||||
|
||||
msgObj := map[string]interface{}{
|
||||
"mqtt": payload,
|
||||
"ts": timestamp,
|
||||
}
|
||||
data, err := json.Marshal(msgObj)
|
||||
if err != nil {
|
||||
log.Printf("Failed to marshal message: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := rdb.SetEX(ctx, key, data, time.Duration(atoi(redisSetEX))*time.Second).Err(); err != nil {
|
||||
log.Printf("Failed to set Redis key: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func getEnv(key, defaultVal string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
}
|
||||
return defaultVal
|
||||
}
|
||||
|
||||
func atoi(s string) int {
|
||||
i, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return i
|
||||
}
|
||||
Reference in New Issue
Block a user