293 lines
7.1 KiB
Go
293 lines
7.1 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/gorilla/websocket"
|
|
"git.farhan.codes/farhan/fedilogue/shared"
|
|
)
|
|
|
|
type MisskeyReply struct {
|
|
Type string `json:"type"`
|
|
Body MisskeyReplyBody `json:"body"`
|
|
}
|
|
|
|
type MisskeyReplyBody struct {
|
|
Channel string `json:"channel"`
|
|
Type string `json:"type"`
|
|
Body MisskeyNoteBody `json:"body"`
|
|
}
|
|
|
|
// MisskeyNoteBody holds the two note identifiers StreamMisskey uses to build
// an activity URI. Any other fields of the incoming JSON object are ignored.
type MisskeyNoteBody struct {
	// Uri is set for a remote note and is used as the activity URI as-is.
	Uri string `json:"uri"`
	// Id identifies a local note; it is only consulted when Uri is empty.
	Id string `json:"id"`
}
|
|
|
|
type MisskeyRequest struct {
|
|
Type string `json:"type"`
|
|
Body MisskeyRequestBody `json:"body"`
|
|
}
|
|
|
|
// MisskeyRequestBody is the "body" object of a MisskeyRequest.
type MisskeyRequestBody struct {
	// Channel is the channel to subscribe to; StreamMisskey uses
	// "globalTimeline".
	Channel string `json:"channel"`
	// Id is the caller-chosen identifier of the subscription; StreamMisskey
	// fills it with a freshly generated UUID.
	Id string `json:"id"`
	// Params holds channel parameters. It is tagged explicitly so that it is
	// encoded under the lowercase "params" key like every other field,
	// instead of the Go field name "Params".
	Params MisskeyRequestParams `json:"params"`
}

// MisskeyRequestParams is the parameter object of a channel subscription.
// It has no fields, so it encodes as an empty JSON object.
type MisskeyRequestParams struct {
}
|
|
|
|
func StreamMisskey(endpoint string) {
|
|
logDebug("StreamMisskey for ", endpoint)
|
|
u := url.URL{
|
|
Scheme: "wss",
|
|
Host: endpoint,
|
|
Path: "/streaming",
|
|
}
|
|
|
|
var misskeyrequest MisskeyRequest
|
|
misskeyrequest.Type = "connect"
|
|
misskeyrequest.Body.Channel = "globalTimeline"
|
|
misskeyrequest.Body.Id = uuid.New().String()
|
|
|
|
for {
|
|
misskeyrequeststream, err := json.Marshal(misskeyrequest)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
for {
|
|
// Create a new WebSocket connection.
|
|
ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
|
if err != nil {
|
|
logErr("Error dialing misskey webSocket to ", endpoint, " :", err)
|
|
return
|
|
}
|
|
logDebug("Misskey websocket connection created: ", endpoint)
|
|
|
|
ri_mutex.Lock()
|
|
m := runninginstances[endpoint]
|
|
m.Status = shared.RUNNING
|
|
m.LastRun = "Streaming"
|
|
runninginstances[endpoint] = m
|
|
ri_mutex.Unlock()
|
|
|
|
// Send a message to the server.
|
|
err = ws.WriteMessage(websocket.TextMessage, misskeyrequeststream)
|
|
if err != nil {
|
|
logErr("Error sending misskey channel subscription: ", endpoint)
|
|
return
|
|
}
|
|
logDebug("Successfully sent misskey channel subscription: ", endpoint)
|
|
|
|
// Read a message from the server.
|
|
for {
|
|
logDebug("Starting Misskey Stream loop for ", endpoint)
|
|
_, message, err := ws.ReadMessage()
|
|
if err != nil {
|
|
logErr("Misskey stream broken: ", endpoint)
|
|
return
|
|
}
|
|
|
|
// Print the message to the console.
|
|
logDebug("Ending Misskey Stream loop for ", endpoint)
|
|
var misskeyreply MisskeyReply
|
|
err = json.Unmarshal(message, &misskeyreply)
|
|
if err != nil {
|
|
logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err)
|
|
break
|
|
}
|
|
// newactivity := misskeyreply.Body.Body
|
|
|
|
var newactivity string
|
|
|
|
if misskeyreply.Body.Body.Uri != "" { // Remote Message
|
|
newactivity = misskeyreply.Body.Body.Uri
|
|
matchset := re.FindStringSubmatch(newactivity)
|
|
if matchset != nil {
|
|
newinstance := matchset[1]
|
|
logDebug("Checking new instance from Misskey Stream: ", newinstance)
|
|
go CheckInstance(newinstance, endpoint)
|
|
}
|
|
} else { // Local Message
|
|
newactivity = "https://" + endpoint + "/notes/" + misskeyreply.Body.Body.Id
|
|
}
|
|
|
|
logDebug("Misskey new URI ", newactivity, " from instance: ", endpoint)
|
|
|
|
go check_activity(newactivity)
|
|
}
|
|
// Close the WebSocket connection.
|
|
ws.Close()
|
|
time.Sleep(time.Minute * 10)
|
|
}
|
|
}
|
|
}
|
|
|
|
func StreamPleroma(endpoint string) {
|
|
wss_url := "wss://" + endpoint + "/api/v1/streaming/?stream=public"
|
|
var pleromaHeader shared.PleromaStreamHeader
|
|
var newactivity shared.ReportActivity
|
|
var err error
|
|
|
|
for {
|
|
var tries int
|
|
var ws *websocket.Conn
|
|
for tries = 0; tries < 10; tries++ {
|
|
ws, _, err = websocket.DefaultDialer.Dial(wss_url, nil)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
if tries == 10 {
|
|
logWarn("Unable to connect to " + endpoint + " after 10 tries, exiting")
|
|
return
|
|
}
|
|
|
|
ri_mutex.Lock()
|
|
m := runninginstances[endpoint]
|
|
m.Status = shared.RUNNING
|
|
m.LastRun = "Streaming"
|
|
runninginstances[endpoint] = m
|
|
ri_mutex.Unlock()
|
|
|
|
for {
|
|
logDebug("Starting Pleroma Stream loop for ", endpoint)
|
|
_, p, err := ws.ReadMessage()
|
|
if err != nil {
|
|
logErr("Unable to read message from Pleroma stream: ", endpoint, " Err: ", err)
|
|
break
|
|
}
|
|
|
|
err = json.Unmarshal(p, &pleromaHeader)
|
|
if err != nil {
|
|
logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err)
|
|
break
|
|
}
|
|
|
|
switch pleromaHeader.Event {
|
|
case "update":
|
|
err = json.Unmarshal([]byte(pleromaHeader.Payload), &newactivity)
|
|
if err != nil {
|
|
logErr("Unable to parse data from " + endpoint + ", but still connected.")
|
|
break
|
|
}
|
|
go check_activity(newactivity.Uri)
|
|
matchset := re.FindStringSubmatch(newactivity.Uri)
|
|
if matchset != nil {
|
|
newinstance := matchset[1]
|
|
logDebug("Checking new instance from Pleroma Stream: ", newinstance)
|
|
go CheckInstance(newinstance, endpoint)
|
|
}
|
|
default:
|
|
logDebug("Unimplemented pleroma stream activity: ", pleromaHeader.Event)
|
|
continue
|
|
}
|
|
logDebug("Ending Pleroma stream loop for ", endpoint)
|
|
}
|
|
ws.Close()
|
|
// time.Sleep(time.Minute * 10)
|
|
}
|
|
}
|
|
|
|
func StreamMastodon(endpoint string, o *shared.RunningInstance) {
|
|
stream_client := BuildClient(endpoint)
|
|
|
|
var retry bool
|
|
|
|
api_timeline := "https://" + endpoint + "/api/v1/streaming/public"
|
|
|
|
for {
|
|
req, err := http.NewRequest("GET", api_timeline, nil)
|
|
req.Header.Set("User-Agent", "Tusky")
|
|
if err != nil {
|
|
logFatal("Unable to create new request for " + endpoint + ", exiting.")
|
|
return
|
|
}
|
|
|
|
var resp *http.Response
|
|
|
|
for tries := 0; tries < 10; tries++ {
|
|
resp, err = stream_client.Do(req)
|
|
if err != nil {
|
|
time.Sleep(time.Minute * 5)
|
|
logWarn("Failure connecting to "+req.URL.Scheme+"://"+req.URL.Host+req.URL.Path+", attempt ", tries+1, ", sleeping for 5 minutes, ", err)
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
if err != nil {
|
|
logErr("Unable to stream "+api_timeline+": ", err)
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
ri_mutex.Lock()
|
|
m := runninginstances[endpoint]
|
|
m.Status = shared.RUNNING
|
|
m.LastRun = "Streaming"
|
|
runninginstances[endpoint] = m
|
|
ri_mutex.Unlock()
|
|
|
|
s := bufio.NewScanner(resp.Body)
|
|
var name string
|
|
for s.Scan() {
|
|
logDebug("Starting Mastodon stream loop for ", endpoint)
|
|
line := s.Text()
|
|
token := strings.SplitN(line, ":", 2)
|
|
var newactivity shared.ReportActivity
|
|
if len(token) != 2 {
|
|
continue
|
|
}
|
|
|
|
switch strings.TrimSpace(token[0]) {
|
|
case "event":
|
|
name = strings.TrimSpace(token[1])
|
|
continue
|
|
case "data":
|
|
switch name {
|
|
case "update":
|
|
if len(token) >= 2 && len(token[1]) >= 2 {
|
|
continue
|
|
}
|
|
jsondata := token[1][1:]
|
|
err := json.Unmarshal([]byte(jsondata), &newactivity)
|
|
if err != nil {
|
|
logDebug("Unable to parse data from " + endpoint + ", but still connected.")
|
|
continue
|
|
}
|
|
retry = true
|
|
default:
|
|
continue
|
|
}
|
|
default:
|
|
continue
|
|
}
|
|
|
|
go check_activity(newactivity.Uri)
|
|
|
|
matchset := re.FindStringSubmatch(newactivity.Uri)
|
|
if matchset != nil {
|
|
newinstance := matchset[1]
|
|
logDebug("Checking new instance from Mastodon Stream: ", newinstance)
|
|
go CheckInstance(newinstance, endpoint)
|
|
}
|
|
logDebug("Ending Mastodon stream loop for ", endpoint)
|
|
}
|
|
if retry == true {
|
|
time.Sleep(time.Minute * 10)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
}
|