fedilogue/fedilogger/stream.go
2023-07-16 05:29:40 +00:00

316 lines
7.7 KiB
Go

package main
import (
"bufio"
"encoding/json"
"net/http"
"net/url"
"strings"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"gitlab.com/khanzf/fedilogue/shared"
)
// MisskeyReply is the outermost envelope of a message read from a Misskey
// streaming WebSocket.
type MisskeyReply struct {
	Type string           `json:"type"`
	Body MisskeyReplyBody `json:"body"`
}
// MisskeyReplyBody is the payload of a MisskeyReply; its own Body field
// carries the note the message is about.
type MisskeyReplyBody struct {
	Channel string          `json:"channel"`
	Type    string          `json:"type"`
	Body    MisskeyNoteBody `json:"body"`
}
// MisskeyNoteBody holds the identifying fields of a streamed note. Uri is
// set for notes that came from a remote instance; local notes are
// identified by Id alone.
type MisskeyNoteBody struct {
	// Uri is the address of a remote note.
	Uri string `json:"uri"`
	// Id is the identifier of a local note.
	Id string `json:"id"`
}
//////////////////////////////
// MisskeyRequest is the message StreamMisskey sends over the streaming
// WebSocket to subscribe to a channel.
type MisskeyRequest struct {
	Type string             `json:"type"`
	Body MisskeyRequestBody `json:"body"`
}
// MisskeyRequestBody names the channel a MisskeyRequest refers to and the
// identifier chosen by the caller for that subscription.
type MisskeyRequestBody struct {
	Channel string `json:"channel"`
	Id      string `json:"id"`
	// Params is tagged explicitly so it is encoded under the lower-case
	// "params" key, like the sibling fields; without the tag encoding/json
	// would emit it as "Params".
	Params MisskeyRequestParams `json:"params"`
}

// MisskeyRequestParams carries the channel parameters of a subscription.
// No parameters are defined, so it encodes as an empty JSON object.
type MisskeyRequestParams struct {
}
// StreamMisskey follows the global timeline of the Misskey instance at
// endpoint over its streaming WebSocket (wss://<endpoint>/streaming).
//
// Every note received is handed to check_activity in its own goroutine. A
// note that carries a remote URI is reported under that URI, and when the
// package-level pattern re matches it the captured host is passed to
// CheckInstance. A local note is reported as https://<endpoint>/notes/<id>.
// Messages that carry neither a URI nor an id are ignored.
//
// The function returns when the socket cannot be dialed, when the
// subscription cannot be sent, or when reading from the stream fails. A
// message that cannot be decoded drops the connection, which is then
// re-established after ten minutes.
func StreamMisskey(endpoint string) {
	logDebug("StreamMisskey for ", endpoint)

	u := url.URL{
		Scheme: "wss",
		Host:   endpoint,
		Path:   "/streaming",
	}

	var misskeyrequest MisskeyRequest
	misskeyrequest.Type = "connect"
	misskeyrequest.Body.Channel = "globalTimeline"
	misskeyrequest.Body.Id = uuid.New().String()

	// The subscription request is the same for every connection attempt, so
	// it is encoded once instead of on every pass through the loop.
	misskeyrequeststream, err := json.Marshal(misskeyrequest)
	if err != nil {
		// Encoding a plain struct of strings cannot fail at runtime; a
		// failure here means the request types themselves are broken.
		panic(err)
	}

	for {
		// Create a new WebSocket connection.
		ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
		if err != nil {
			logErr("Error dialing misskey webSocket to ", endpoint, " :", err)
			return
		}
		logDebug("Misskey websocket connection created: ", endpoint)

		ri_mutex.Lock()
		m := runninginstances[endpoint]
		m.Status = shared.RUNNING
		m.LastRun = "Streaming"
		runninginstances[endpoint] = m
		ri_mutex.Unlock()

		// Subscribe to the channel.
		err = ws.WriteMessage(websocket.TextMessage, misskeyrequeststream)
		if err != nil {
			logErr("Error sending misskey channel subscription: ", endpoint, " Err: ", err)
			ws.Close() // do not leak the socket on the way out
			return
		}
		logDebug("Successfully sent misskey channel subscription: ", endpoint)

		// Read messages until the stream breaks or one cannot be decoded.
		for {
			logDebug("Starting Misskey Stream loop for ", endpoint)
			_, message, err := ws.ReadMessage()
			if err != nil {
				logErr("Misskey stream broken: ", endpoint, " Err: ", err)
				ws.Close() // do not leak the socket on the way out
				return
			}
			logDebug("Ending Misskey Stream loop for ", endpoint)

			var misskeyreply MisskeyReply
			err = json.Unmarshal(message, &misskeyreply)
			if err != nil {
				logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err)
				break
			}

			note := misskeyreply.Body.Body
			var newactivity string
			switch {
			case note.Uri != "": // Remote message
				newactivity = note.Uri
				matchset := re.FindStringSubmatch(newactivity)
				if matchset != nil {
					newinstance := matchset[1]
					logDebug("Checking new instance from Misskey Stream: ", newinstance)
					go CheckInstance(newinstance, endpoint)
				}
			case note.Id != "": // Local message
				newactivity = "https://" + endpoint + "/notes/" + note.Id
			default:
				// Neither a URI nor an id: there is no note to report, and
				// building a URL from an empty id would yield a bogus one.
				continue
			}

			logDebug("Misskey new URI ", newactivity, " from instance: ", endpoint)
			go check_activity(newactivity)
		}

		// Close the WebSocket connection and wait before reconnecting.
		ws.Close()
		time.Sleep(time.Minute * 10)
	}
}
func StreamPleroma(endpoint string) {
wss_url := "wss://" + endpoint + "/api/v1/streaming/?stream=public"
var pleromaHeader shared.PleromaStreamHeader
var newactivity shared.ReportActivity
var err error
for {
var tries int
var ws *websocket.Conn
for tries = 0; tries < 10; tries++ {
ws, _, err = websocket.DefaultDialer.Dial(wss_url, nil)
if err != nil {
continue
}
break
}
if tries == 10 {
logWarn("Unable to connect to " + endpoint + " after 10 tries, exiting")
return
}
// ri_mutex.Lock()
// m := runninginstances[endpoint]
// m.Status = shared.RUNNING
// m.LastRun = "Streaming"
// runninginstances[endpoint] = m
// ri_mutex.Unlock()
for {
logDebug("Starting Pleroma Stream loop for ", endpoint)
_, p, err := ws.ReadMessage()
if err != nil {
logErr("Unable to read message from Pleroma stream: ", endpoint, " Err: ", err)
break
}
err = json.Unmarshal(p, &pleromaHeader)
if err != nil {
logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err)
break
}
switch pleromaHeader.Event {
case "update":
err = json.Unmarshal([]byte(pleromaHeader.Payload), &newactivity)
if err != nil {
logErr("Unable to parse data from " + endpoint + ", but still connected.")
break
}
go check_activity(newactivity.Uri)
matchset := re.FindStringSubmatch(newactivity.Uri)
if matchset != nil {
newinstance := matchset[1]
logDebug("Checking new instance from Pleroma Stream: ", newinstance)
go CheckInstance(newinstance, endpoint)
}
default:
logDebug("Unimplemented pleroma stream activity: ", pleromaHeader.Event)
continue
}
logDebug("Ending Pleroma stream loop for ", endpoint)
}
ws.Close()
// time.Sleep(time.Minute * 10)
}
}
// StreamMastodon follows the public timeline of the Mastodon instance at
// endpoint through its HTTP streaming API
// (https://<endpoint>/api/v1/streaming/public).
//
// If settings.Externalaccounts has an entry for endpoint, a client is
// registered and logged in first and the resulting access token is sent in
// the Authorization header; o is passed through to get_client and
// oauth_login for that purpose.
//
// The response is read line by line as "field: value" pairs. An "event"
// line sets the current event name; a "data" line that follows an "update"
// event is decoded, its URI is handed to check_activity in its own
// goroutine, and when the package-level pattern re matches the URI the
// captured host is passed to CheckInstance.
//
// Connecting is attempted up to ten times, five minutes apart. Once the
// stream ends, it is re-opened after ten minutes if at least one update has
// ever been decoded; otherwise the function returns.
func StreamMastodon(endpoint string, o *shared.RunningInstance) {
	stream_client := BuildClient(endpoint)
	var oauthData OAuth
	var retry bool

	api_timeline := "https://" + endpoint + "/api/v1/streaming/public"

	for {
		req, err := http.NewRequest("GET", api_timeline, nil)
		if err != nil {
			logFatal("Unable to create new request for " + endpoint + ", exiting.")
			return
		}
		// Only touch req after the error check: it is nil on failure.
		req.Header.Set("User-Agent", "Tusky")

		for _, extaccount := range settings.Externalaccounts {
			if extaccount.Endpoint != endpoint {
				continue
			}
			err = get_client(endpoint, o)
			if err != nil {
				logWarn("Unable to register client: ", err)
			}
			oauthData, err = oauth_login(endpoint, o, extaccount.Username, extaccount.Password)
			if err != nil {
				logWarn("Unable to login: ", err)
				return
			}
			// NOTE(review): the token is sent as-is, without a scheme
			// prefix such as "Bearer " — confirm what oauth_login stores
			// in Access_token.
			req.Header.Add("Authorization", oauthData.Access_token)
		}

		var resp *http.Response
		for tries := 0; tries < 10; tries++ {
			resp, err = stream_client.Do(req)
			if err == nil {
				break
			}
			// Announce the pause before taking it, not five minutes later.
			logWarn("Failure connecting to "+req.URL.Scheme+"://"+req.URL.Host+req.URL.Path+", attempt ", tries+1, ", sleeping for 5 minutes, ", err)
			time.Sleep(time.Minute * 5)
		}
		if err != nil {
			logErr("Unable to stream "+api_timeline+": ", err)
			return
		}

		ri_mutex.Lock()
		m := runninginstances[endpoint]
		m.Status = shared.RUNNING
		m.LastRun = "Streaming"
		runninginstances[endpoint] = m
		ri_mutex.Unlock()

		s := bufio.NewScanner(resp.Body)
		// Allow lines up to 1 MiB; the default 64 KiB limit would end the
		// stream on a single oversized event.
		s.Buffer(make([]byte, 0, 64*1024), 1024*1024)

		var name string
		for s.Scan() {
			logDebug("Starting Mastodon stream loop for ", endpoint)
			line := s.Text()
			token := strings.SplitN(line, ":", 2)
			if len(token) != 2 {
				continue
			}
			value := strings.TrimSpace(token[1])

			switch strings.TrimSpace(token[0]) {
			case "event":
				name = value
				continue
			case "data":
				// Only non-empty payloads of "update" events are processed.
				if name != "update" || value == "" {
					continue
				}
			default:
				continue
			}

			var newactivity shared.ReportActivity
			if err := json.Unmarshal([]byte(value), &newactivity); err != nil {
				logDebug("Unable to parse data from " + endpoint + ", but still connected.")
				continue
			}
			retry = true

			go check_activity(newactivity.Uri)
			matchset := re.FindStringSubmatch(newactivity.Uri)
			if matchset != nil {
				newinstance := matchset[1]
				logDebug("Checking new instance from Mastodon Stream: ", newinstance)
				go CheckInstance(newinstance, endpoint)
			}
			logDebug("Ending Mastodon stream loop for ", endpoint)
		}
		if err := s.Err(); err != nil {
			logErr("Mastodon stream broken: ", endpoint, " Err: ", err)
		}

		// Close explicitly rather than with defer: a defer inside this loop
		// would keep every body open until the function returns.
		resp.Body.Close()

		if !retry {
			break
		}
		time.Sleep(time.Minute * 10)
	}
}