package main

import (
	"bufio"
	"encoding/json"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
	"gitlab.com/khanzf/fedilogue/shared"
)

// MisskeyReply is the outer envelope decoded from each message read off the
// Misskey streaming websocket.
type MisskeyReply struct {
	Type string           `json:"type"`
	Body MisskeyReplyBody `json:"body"`
}

// MisskeyReplyBody is the channel-level payload nested inside a MisskeyReply.
type MisskeyReplyBody struct {
	Channel string          `json:"channel"`
	Type    string          `json:"type"`
	Body    MisskeyNoteBody `json:"body"`
}

// MisskeyNoteBody holds the identifiers of a streamed note. StreamMisskey
// treats a non-empty Uri as a remote note and otherwise builds a local note
// URL from Id.
type MisskeyNoteBody struct {
	Uri string `json:"uri"` // Remote Note
	Id  string `json:"id"`  // Local note
}

// MisskeyRequest is the message sent over the websocket to subscribe to a
// channel.
type MisskeyRequest struct {
	Type string             `json:"type"`
	Body MisskeyRequestBody `json:"body"`
}

// MisskeyRequestBody names the channel to connect to and the client-chosen
// connection id.
type MisskeyRequestBody struct {
	Channel string `json:"channel"`
	Id      string `json:"id"`
	// NOTE(review): Params has no json tag, so it is marshalled as "Params".
	// Confirm whether the server expects a lowercase "params" key.
	Params MisskeyRequestParams
}

// MisskeyRequestParams is the (currently empty) parameter set of a channel
// subscription.
type MisskeyRequestParams struct {
}

// StreamMisskey connects to the streaming websocket of the Misskey instance
// at endpoint, subscribes to the "globalTimeline" channel and hands every
// received note URI to check_activity. Hosts found in remote note URIs are
// passed to CheckInstance.
//
// The function returns when dialing, subscribing or reading fails. When a
// message cannot be decoded the connection is closed and re-established after
// ten minutes.
func StreamMisskey(endpoint string) {
	logDebug("StreamMisskey for ", endpoint)

	u := url.URL{
		Scheme: "wss",
		Host:   endpoint,
		Path:   "/streaming",
	}

	var misskeyrequest MisskeyRequest
	misskeyrequest.Type = "connect"
	misskeyrequest.Body.Channel = "globalTimeline"
	misskeyrequest.Body.Id = uuid.New().String()

	// The subscription request never changes, so it is encoded once up front
	// instead of on every reconnect.
	subscription, err := json.Marshal(misskeyrequest)
	if err != nil {
		logErr("Unable to encode misskey channel subscription for ", endpoint, " :", err)
		return
	}

	for {
		// Create a new WebSocket connection.
		ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
		if err != nil {
			logErr("Error dialing misskey webSocket to ", endpoint, " :", err)
			return
		}
		logDebug("Misskey websocket connection created: ", endpoint)

		ri_mutex.Lock()
		m := runninginstances[endpoint]
		m.Status = shared.RUNNING
		m.LastRun = "Streaming"
		runninginstances[endpoint] = m
		ri_mutex.Unlock()

		// Subscribe to the channel.
		if err = ws.WriteMessage(websocket.TextMessage, subscription); err != nil {
			logErr("Error sending misskey channel subscription: ", endpoint)
			ws.Close() // do not leak the connection on the error path
			return
		}
		logDebug("Successfully sent misskey channel subscription: ", endpoint)

		// Read messages until the stream breaks or a message is undecodable.
		for {
			logDebug("Starting Misskey Stream loop for ", endpoint)
			_, message, err := ws.ReadMessage()
			if err != nil {
				logErr("Misskey stream broken: ", endpoint)
				ws.Close() // do not leak the connection on the error path
				return
			}
			logDebug("Ending Misskey Stream loop for ", endpoint)

			var misskeyreply MisskeyReply
			if err = json.Unmarshal(message, &misskeyreply); err != nil {
				logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err)
				// Leaves the read loop: the connection is recycled below.
				break
			}

			note := misskeyreply.Body.Body
			var newactivity string
			if note.Uri != "" {
				// Remote message: the note carries its own URI.
				newactivity = note.Uri
				if matchset := re.FindStringSubmatch(newactivity); matchset != nil {
					newinstance := matchset[1]
					logDebug("Checking new instance from Misskey Stream: ", newinstance)
					go CheckInstance(newinstance, endpoint)
				}
			} else {
				// Local message: build the URL from the local note id.
				newactivity = "https://" + endpoint + "/notes/" + note.Id
			}

			logDebug("Misskey new URI ", newactivity, " from instance: ", endpoint)
			go check_activity(newactivity)
		}

		// Close the WebSocket connection and wait before reconnecting.
		ws.Close()
		time.Sleep(time.Minute * 10)
	}
}

// StreamPleroma connects to the public streaming websocket of the Pleroma
// instance at endpoint and hands the URI of every "update" event to
// check_activity. Hosts found in those URIs are passed to CheckInstance.
//
// Dialing is attempted up to ten times per connection; the function returns
// when all ten attempts fail. When reading or decoding a message fails the
// connection is closed and a new one is dialed immediately.
func StreamPleroma(endpoint string) {
	wssURL := "wss://" + endpoint + "/api/v1/streaming/?stream=public"

	var pleromaHeader shared.PleromaStreamHeader
	var newactivity shared.ReportActivity
	var err error

	for {
		var tries int
		var ws *websocket.Conn
		for tries = 0; tries < 10; tries++ {
			ws, _, err = websocket.DefaultDialer.Dial(wssURL, nil)
			if err == nil {
				break
			}
		}
		if tries == 10 {
			logWarn("Unable to connect to " + endpoint + " after 10 tries, exiting")
			return
		}

		ri_mutex.Lock()
		m := runninginstances[endpoint]
		m.Status = shared.RUNNING
		m.LastRun = "Streaming"
		runninginstances[endpoint] = m
		ri_mutex.Unlock()

		for {
			logDebug("Starting Pleroma Stream loop for ", endpoint)
			_, p, err := ws.ReadMessage()
			if err != nil {
				logErr("Unable to read message from Pleroma stream: ", endpoint, " Err: ", err)
				break
			}

			if err = json.Unmarshal(p, &pleromaHeader); err != nil {
				logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err)
				break
			}

			switch pleromaHeader.Event {
			case "update":
				if err = json.Unmarshal([]byte(pleromaHeader.Payload), &newactivity); err != nil {
					logErr("Unable to parse data from " + endpoint + ", but still connected.")
					// This break only leaves the switch; the read loop goes on.
					break
				}
				go check_activity(newactivity.Uri)
				if matchset := re.FindStringSubmatch(newactivity.Uri); matchset != nil {
					newinstance := matchset[1]
					logDebug("Checking new instance from Pleroma Stream: ", newinstance)
					go CheckInstance(newinstance, endpoint)
				}
			default:
				logDebug("Unimplemented pleroma stream activity: ", pleromaHeader.Event)
				continue
			}
			logDebug("Ending Pleroma stream loop for ", endpoint)
		}

		ws.Close()
		// time.Sleep(time.Minute * 10)
	}
}

// StreamMastodon reads the public streaming endpoint of the Mastodon instance
// at endpoint line by line and hands the URI of every "update" event to
// check_activity. Hosts found in those URIs are passed to CheckInstance.
//
// If settings.Externalaccounts has an entry for endpoint, the client is
// registered and logged in using o, and the resulting access token is sent in
// the Authorization header. Connecting is attempted up to ten times with a
// five-minute pause after each failure.
//
// When the stream ends, the function reconnects after ten minutes if at least
// one update has been decoded since it started; otherwise it returns.
func StreamMastodon(endpoint string, o *shared.RunningInstance) {
	streamClient := BuildClient(endpoint)

	var oauthData OAuth
	var retry bool

	apiTimeline := "https://" + endpoint + "/api/v1/streaming/public"

	for {
		req, err := http.NewRequest("GET", apiTimeline, nil)
		if err != nil {
			logFatal("Unable to create new request for " + endpoint + ", exiting.")
			return
		}
		// Only touch the request after the error check; req is nil on failure.
		req.Header.Set("User-Agent", "Tusky")

		for _, extaccount := range settings.Externalaccounts {
			if extaccount.Endpoint != endpoint {
				continue
			}

			if err = get_client(endpoint, o); err != nil {
				logWarn("Unable to register client: ", err)
			}

			oauthData, err = oauth_login(endpoint, o, extaccount.Username, extaccount.Password)
			if err != nil {
				logWarn("Unable to login: ", err)
				return
			}
			// NOTE(review): the token is sent without a scheme prefix such as
			// "Bearer " - confirm that Access_token already carries it.
			req.Header.Add("Authorization", oauthData.Access_token)
		}

		var resp *http.Response
		for tries := 0; tries < 10; tries++ {
			resp, err = streamClient.Do(req)
			if err == nil {
				break
			}
			logWarn("Failure connecting to "+req.URL.Scheme+"://"+req.URL.Host+req.URL.Path+", attempt ", tries+1, ", sleeping for 5 minutes, ", err)
			time.Sleep(time.Minute * 5)
		}
		if err != nil {
			logErr("Unable to stream "+apiTimeline+": ", err)
			return
		}

		ri_mutex.Lock()
		m := runninginstances[endpoint]
		m.Status = shared.RUNNING
		m.LastRun = "Streaming"
		runninginstances[endpoint] = m
		ri_mutex.Unlock()

		s := bufio.NewScanner(resp.Body)
		var name string // most recent "event:" value; applies to the next "data:" line
		for s.Scan() {
			logDebug("Starting Mastodon stream loop for ", endpoint)

			token := strings.SplitN(s.Text(), ":", 2)
			if len(token) != 2 {
				continue
			}

			switch strings.TrimSpace(token[0]) {
			case "event":
				name = strings.TrimSpace(token[1])
				continue
			case "data":
				// Handled below.
			default:
				continue
			}

			if name != "update" {
				continue
			}
			// The payload follows a one-character separator, so anything
			// shorter than two characters has no JSON to decode (and slicing
			// an empty string at [1:] would panic).
			if len(token[1]) < 2 {
				continue
			}

			var newactivity shared.ReportActivity
			if err := json.Unmarshal([]byte(token[1][1:]), &newactivity); err != nil {
				logDebug("Unable to parse data from " + endpoint + ", but still connected.")
				continue
			}
			retry = true

			go check_activity(newactivity.Uri)
			if matchset := re.FindStringSubmatch(newactivity.Uri); matchset != nil {
				newinstance := matchset[1]
				logDebug("Checking new instance from Mastodon Stream: ", newinstance)
				go CheckInstance(newinstance, endpoint)
			}
			logDebug("Ending Mastodon stream loop for ", endpoint)
		}
		if err := s.Err(); err != nil {
			logWarn("Mastodon stream for "+endpoint+" ended with error: ", err)
		}

		// Close the body now rather than with defer: a defer inside this loop
		// would keep every previous response open until the function returns.
		resp.Body.Close()

		if !retry {
			break
		}
		time.Sleep(time.Minute * 10)
	}
}