From 546244a959c320c6e2cb25f51f68f1de6804179a Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Sun, 16 Jul 2023 05:29:40 +0000 Subject: [PATCH] Adding Misskey streaming --- {fedilogue => fedilogger}/Dockerfile | 0 {fedilogue => fedilogger}/config.go | 0 {fedilogue => fedilogger}/config.jsonc.sample | 0 {fedilogue => fedilogger}/config_test.go | 0 {fedilogue => fedilogger}/ctl.go | 0 {fedilogue => fedilogger}/db.go | 0 .../fedilogue.go => fedilogger/fedilogger.go | 0 fedilogger/fedilogue.go | 123 +++++++++++++++ {fedilogue => fedilogger}/fedilogue_test.go | 0 {fedilogue => fedilogger}/follow.go | 0 {fedilogue => fedilogger}/instance.go | 1 + {fedilogue => fedilogger}/instance_test.go | 0 {fedilogue => fedilogger}/log.go | 0 {fedilogue => fedilogger}/oauth.go | 0 {fedilogue => fedilogger}/poll.go | 0 {fedilogue => fedilogger}/retrieve.go | 0 {fedilogue => fedilogger}/retrieve_test.go | 0 {fedilogue => fedilogger}/stream.go | 147 ++++++++++++++++-- {fedilogue => fedilogger}/tables.sql | 0 {fedilogue => fedilogger}/testhelper.go | 0 {fedilogue => fedilogger}/web.go | 0 go.mod | 1 + go.sum | 2 + 23 files changed, 262 insertions(+), 12 deletions(-) rename {fedilogue => fedilogger}/Dockerfile (100%) rename {fedilogue => fedilogger}/config.go (100%) rename {fedilogue => fedilogger}/config.jsonc.sample (100%) rename {fedilogue => fedilogger}/config_test.go (100%) rename {fedilogue => fedilogger}/ctl.go (100%) rename {fedilogue => fedilogger}/db.go (100%) rename fedilogue/fedilogue.go => fedilogger/fedilogger.go (100%) create mode 100644 fedilogger/fedilogue.go rename {fedilogue => fedilogger}/fedilogue_test.go (100%) rename {fedilogue => fedilogger}/follow.go (100%) rename {fedilogue => fedilogger}/instance.go (99%) rename {fedilogue => fedilogger}/instance_test.go (100%) rename {fedilogue => fedilogger}/log.go (100%) rename {fedilogue => fedilogger}/oauth.go (100%) rename {fedilogue => fedilogger}/poll.go (100%) rename {fedilogue => fedilogger}/retrieve.go (100%) rename {fedilogue => fedilogger}/retrieve_test.go (100%) rename {fedilogue => fedilogger}/stream.go (50%) rename {fedilogue => fedilogger}/tables.sql (100%) rename {fedilogue => fedilogger}/testhelper.go (100%) rename {fedilogue => fedilogger}/web.go (100%) diff --git a/fedilogue/Dockerfile b/fedilogger/Dockerfile similarity index 100% rename from fedilogue/Dockerfile rename to fedilogger/Dockerfile diff --git a/fedilogue/config.go b/fedilogger/config.go similarity index 100% rename from fedilogue/config.go rename to fedilogger/config.go diff --git a/fedilogue/config.jsonc.sample b/fedilogger/config.jsonc.sample similarity index 100% rename from fedilogue/config.jsonc.sample rename to fedilogger/config.jsonc.sample diff --git a/fedilogue/config_test.go b/fedilogger/config_test.go similarity index 100% rename from fedilogue/config_test.go rename to fedilogger/config_test.go diff --git a/fedilogue/ctl.go b/fedilogger/ctl.go similarity index 100% rename from fedilogue/ctl.go rename to fedilogger/ctl.go diff --git a/fedilogue/db.go b/fedilogger/db.go similarity index 100% rename from fedilogue/db.go rename to fedilogger/db.go diff --git a/fedilogue/fedilogue.go b/fedilogger/fedilogger.go similarity index 100% rename from fedilogue/fedilogue.go rename to fedilogger/fedilogger.go diff --git a/fedilogger/fedilogue.go b/fedilogger/fedilogue.go new file mode 100644 index 0000000..816cd44 --- /dev/null +++ b/fedilogger/fedilogue.go @@ -0,0 +1,123 @@ +package main + +import ( + "context" + "net/http" + _ "net/http/pprof" + "regexp" + "runtime" + "sync" + "time" + + "github.com/microcosm-cc/bluemonday" + "gitlab.com/khanzf/fedilogue/shared" +) + +// Current instances +var runninginstances map[string]shared.RunningInstance +var ri_mutex = &sync.Mutex{} + +func startpprof() { + logInfo("Starting http/pprof on :7777") + logFatal(http.ListenAndServe("127.0.0.1:7777", nil)) +} + +func statusReportHandler() { + for { + StatusReport() + time.Sleep(time.Second * 60) + } +} + +/* Tests: + - TestStatusReport_empty_run + - TestStatusReport_full_content +*/ +func StatusReport() { + running := 0 + keepalive := 0 + unsupported := 0 + + mastodon := 0 + pleroma := 0 + misskey := 0 + other := 0 + ri_mutex.Lock() + for i, o := range runninginstances { + logDebug("Software ", o.Software, " Status: ", o.Status, " instance ", i) + if o.Status == 200 { + running = running + 1 + } else if o.Status == 607 { // Keepalive + keepalive = keepalive + 1 + } else if o.Status == 605 { // Unsupported instance + unsupported = unsupported + 1 + } + + if o.Software == "mastodon" && o.Status == 200 { + mastodon = mastodon + 1 + } else if o.Software == "pleroma" && o.Status == 200 { + pleroma = pleroma + 1 + } else if o.Software == "misskey" && o.Status == 200 { + misskey = misskey + 1 + } else if o.Status == 200 { + other = other + 1 + } + } + ri_mutex.Unlock() + logInfo("Running:", running, " Keepalive:", keepalive, " Unsupported:", unsupported, " Ma:", mastodon, ",P:", pleroma, ",Mi:", misskey, ",O:", other) +} + +func main() { + // Initial Setup + logInit() + runninginstances = make(map[string]shared.RunningInstance) + + getSettings() + if len(settings.Proxies) > 0 { + for i := 0; i < len(settings.Proxies); i++ { + logInfo("Using proxy: ", settings.Proxies[i].Host, ":", settings.Proxies[i].Port) + } + } + go startpprof() + + pool = getDbPool() + + p = bluemonday.NewPolicy() + spaceReg = regexp.MustCompile(`[\s\t\.]+`) + removeHTMLReg = regexp.MustCompile(`<\/?\s*br\s*>`) + re = regexp.MustCompile("^https?://([^/]*)/(.*)$") + matchurl = regexp.MustCompile("http?s://[\\w\\-]+\\.[\\w\\-]+\\S*") + staggeredStartChan = make(chan bool) + + // Start instances located in database + rows, err := pool.Query(context.Background(), "SELECT endpoint FROM instances") + if err != nil { + logErr("Unable to select from instances") + return + } + defer rows.Close() + + go staggeredStart() + go statusReportHandler() + + for rows.Next() { + var endpoint string + err = rows.Scan(&endpoint) + if err != nil { + logErr("Unable to iterate database, exiting.") + return + } + o, exists := GetRunner(endpoint) + if o.Banned == true { + continue // Banned instance + } + if exists == false { + go StartInstance(endpoint) + } + } + + go startctl() + go webmain() + + runtime.Goexit() +} diff --git a/fedilogue/fedilogue_test.go b/fedilogger/fedilogue_test.go similarity index 100% rename from fedilogue/fedilogue_test.go rename to fedilogger/fedilogue_test.go diff --git a/fedilogue/follow.go b/fedilogger/follow.go similarity index 100% rename from fedilogue/follow.go rename to fedilogger/follow.go diff --git a/fedilogue/instance.go b/fedilogger/instance.go similarity index 99% rename from fedilogue/instance.go rename to fedilogger/instance.go index e3532d3..369cfbb 100644 --- a/fedilogue/instance.go +++ b/fedilogger/instance.go @@ -256,6 +256,7 @@ func StartInstance(endpoint string) { logConn("Starting " + endpoint + " as " + o.Software + " " + o.Version) o.CaptureType = "Stream" UpdateRunner(endpoint, o) + StreamMisskey(endpoint) } else { o.Status = 605 UpdateRunner(endpoint, o) diff --git a/fedilogue/instance_test.go b/fedilogger/instance_test.go similarity index 100% rename from fedilogue/instance_test.go rename to fedilogger/instance_test.go diff --git a/fedilogue/log.go b/fedilogger/log.go similarity index 100% rename from fedilogue/log.go rename to fedilogger/log.go diff --git a/fedilogue/oauth.go b/fedilogger/oauth.go similarity index 100% rename from fedilogue/oauth.go rename to fedilogger/oauth.go diff --git a/fedilogue/poll.go b/fedilogger/poll.go similarity index 100% rename from fedilogue/poll.go rename to fedilogger/poll.go diff --git a/fedilogue/retrieve.go b/fedilogger/retrieve.go similarity index 100% rename from fedilogue/retrieve.go rename to fedilogger/retrieve.go diff --git a/fedilogue/retrieve_test.go b/fedilogger/retrieve_test.go similarity index 100% rename from fedilogue/retrieve_test.go rename to fedilogger/retrieve_test.go diff --git a/fedilogue/stream.go b/fedilogger/stream.go similarity index 50% rename from fedilogue/stream.go rename to fedilogger/stream.go index 0155893..2c4e9b8 100644 --- a/fedilogue/stream.go +++ b/fedilogger/stream.go @@ -5,13 +5,134 @@ import ( "encoding/json" "net/http" + "net/url" "strings" "time" + "github.com/google/uuid" "github.com/gorilla/websocket" "gitlab.com/khanzf/fedilogue/shared" ) +type MisskeyReply struct { + Type string `json:"type"` + Body MisskeyReplyBody `json:"body"` +} + +type MisskeyReplyBody struct { + Channel string `json:"channel"` + Type string `json:"type"` + Body MisskeyNoteBody `json:"body"` +} + +type MisskeyNoteBody struct { + Uri string `json:"uri"` // Remote Note + Id string `json:"id"` // Local note +} + +////////////////////////////// + +type MisskeyRequest struct { + Type string `json:"type"` + Body MisskeyRequestBody `json:"body"` +} + +type MisskeyRequestBody struct { + Channel string `json:"channel"` + Id string `json:"id"` + Params MisskeyRequestParams +} + +type MisskeyRequestParams struct { +} + +func StreamMisskey(endpoint string) { + logDebug("StreamMisskey for ", endpoint) + u := url.URL{ + Scheme: "wss", + Host: endpoint, + Path: "/streaming", + } + + var misskeyrequest MisskeyRequest + misskeyrequest.Type = "connect" + misskeyrequest.Body.Channel = "globalTimeline" + misskeyrequest.Body.Id = uuid.New().String() + + for { + misskeyrequeststream, err := json.Marshal(misskeyrequest) + if err != nil { + panic(err) + } + + for { + // Create a new WebSocket connection. + ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + logErr("Error dialing misskey webSocket to ", endpoint, " :", err) + return + } + logDebug("Misskey websocket connection created: ", endpoint) + + ri_mutex.Lock() + m := runninginstances[endpoint] + m.Status = shared.RUNNING + m.LastRun = "Streaming" + runninginstances[endpoint] = m + ri_mutex.Unlock() + + // Send a message to the server. + err = ws.WriteMessage(websocket.TextMessage, misskeyrequeststream) + if err != nil { + logErr("Error sending misskey channel subscription: ", endpoint) + return + } + logDebug("Successfully sent misskey channel subscription: ", endpoint) + + // Read a message from the server. + for { + logDebug("Starting Misskey Stream loop for ", endpoint) + _, message, err := ws.ReadMessage() + if err != nil { + logErr("Misskey stream broken: ", endpoint) + return + } + + // Print the message to the console. + logDebug("Ending Misskey Stream loop for ", endpoint) + var misskeyreply MisskeyReply + err = json.Unmarshal(message, &misskeyreply) + if err != nil { + logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err) + break + } + // newactivity := misskeyreply.Body.Body + + var newactivity string + + if misskeyreply.Body.Body.Uri != "" { // Remote Message + newactivity = misskeyreply.Body.Body.Uri + matchset := re.FindStringSubmatch(newactivity) + if matchset != nil { + newinstance := matchset[1] + logDebug("Checking new instance from Misskey Stream: ", newinstance) + go CheckInstance(newinstance, endpoint) + } + } else { // Local Message + newactivity = "https://" + endpoint + "/notes/" + misskeyreply.Body.Body.Id + } + + logDebug("Misskey new URI ", newactivity, " from instance: ", endpoint) + + go check_activity(newactivity) + } + // Close the WebSocket connection. + ws.Close() + time.Sleep(time.Minute * 10) + } + } +} + func StreamPleroma(endpoint string) { wss_url := "wss://" + endpoint + "/api/v1/streaming/?stream=public" var pleromaHeader shared.PleromaStreamHeader @@ -34,24 +155,24 @@ func StreamPleroma(endpoint string) { return } - ri_mutex.Lock() - m := runninginstances[endpoint] - m.Status = shared.RUNNING - m.LastRun = "Streaming" - runninginstances[endpoint] = m - ri_mutex.Unlock() + // ri_mutex.Lock() + // m := runninginstances[endpoint] + // m.Status = shared.RUNNING + // m.LastRun = "Streaming" + // runninginstances[endpoint] = m + // ri_mutex.Unlock() for { - logDebug("Starting loop for ", endpoint) + logDebug("Starting Pleroma Stream loop for ", endpoint) _, p, err := ws.ReadMessage() if err != nil { - logWarn(err) + logErr("Unable to read message from Pleroma stream: ", endpoint, " Err: ", err) break } err = json.Unmarshal(p, &pleromaHeader) if err != nil { - logDebug("Unable to parse data from " + endpoint + ", but still connected.") + logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err) break } @@ -59,7 +180,7 @@ func StreamPleroma(endpoint string) { case "update": err = json.Unmarshal([]byte(pleromaHeader.Payload), &newactivity) if err != nil { - logDebug("Unable to parse data from " + endpoint + ", but still connected.") + logErr("Unable to parse data from " + endpoint + ", but still connected.") break } go check_activity(newactivity.Uri) @@ -73,10 +194,10 @@ func StreamPleroma(endpoint string) { logDebug("Unimplemented pleroma stream activity: ", pleromaHeader.Event) continue } - logDebug("Ending loop for ", endpoint) + logDebug("Ending Pleroma stream loop for ", endpoint) } ws.Close() - time.Sleep(time.Minute * 10) + // time.Sleep(time.Minute * 10) } } @@ -143,6 +264,7 @@ func StreamMastodon(endpoint string, o *shared.RunningInstance) { s := bufio.NewScanner(resp.Body) var name string for s.Scan() { + logDebug("Ending Mastodon stream loop for ", endpoint) line := s.Text() token := strings.SplitN(line, ":", 2) var newactivity shared.ReportActivity @@ -182,6 +304,7 @@ func StreamMastodon(endpoint string, o *shared.RunningInstance) { logDebug("Checking new instance from Mastodon Stream: ", newinstance) go CheckInstance(newinstance, endpoint) } + logDebug("Ending Mastodon stream loop for ", endpoint) } if retry == true { time.Sleep(time.Minute * 10) diff --git a/fedilogue/tables.sql b/fedilogger/tables.sql similarity index 100% rename from fedilogue/tables.sql rename to fedilogger/tables.sql diff --git a/fedilogue/testhelper.go b/fedilogger/testhelper.go similarity index 100% rename from fedilogue/testhelper.go rename to fedilogger/testhelper.go diff --git a/fedilogue/web.go b/fedilogger/web.go similarity index 100% rename from fedilogue/web.go rename to fedilogger/web.go diff --git a/go.mod b/go.mod index aead069..2819b37 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require github.com/gorilla/websocket v1.5.0 require ( github.com/aymerick/douceur v0.2.0 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/gorilla/css v1.0.0 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.14.0 // indirect diff --git a/go.sum b/go.sum index 0e9d43f..a007105 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/css v1.0.0 h1:BQqNyPTi50JCFMTw/b67hByjMVXZRwGha6wxVGkeihY= github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl60c= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=