Adding Misskey streaming

This commit is contained in:
Farhan Khan 2023-07-16 05:29:40 +00:00
parent 0abc76eb04
commit 546244a959
Signed by untrusted user who does not match committer: farhan
GPG Key ID: 45FE45AD7E54F59B
23 changed files with 262 additions and 12 deletions

123
fedilogger/fedilogue.go Normal file
View File

@ -0,0 +1,123 @@
package main
import (
"context"
"net/http"
_ "net/http/pprof"
"regexp"
"runtime"
"sync"
"time"
"github.com/microcosm-cc/bluemonday"
"gitlab.com/khanzf/fedilogue/shared"
)
// Current instances
var runninginstances map[string]shared.RunningInstance
var ri_mutex = &sync.Mutex{}
func startpprof() {
logInfo("Starting http/pprof on :7777")
logFatal(http.ListenAndServe("127.0.0.1:7777", nil))
}
func statusReportHandler() {
for {
StatusReport()
time.Sleep(time.Second * 60)
}
}
/* Tests:
- TestStatusReport_empty_run
- TestStatusReport_full_content
*/
func StatusReport() {
running := 0
keepalive := 0
unsupported := 0
mastodon := 0
pleroma := 0
misskey := 0
other := 0
ri_mutex.Lock()
for i, o := range runninginstances {
logDebug("Software ", o.Software, " Status: ", o.Status, " instance ", i)
if o.Status == 200 {
running = running + 1
} else if o.Status == 607 { // Keepalive
keepalive = keepalive + 1
} else if o.Status == 605 { // Unsupported instance
unsupported = unsupported + 1
}
if o.Software == "mastodon" && o.Status == 200 {
mastodon = mastodon + 1
} else if o.Software == "pleroma" && o.Status == 200 {
pleroma = pleroma + 1
} else if o.Software == "misskey" && o.Status == 200 {
misskey = misskey + 1
} else if o.Status == 200 {
other = other + 1
}
}
ri_mutex.Unlock()
logInfo("Running:", running, " Keepalive:", keepalive, " Unsupported:", unsupported, " Ma:", mastodon, ",P:", pleroma, ",Mi:", misskey, ",O:", other)
}
func main() {
// Initial Setup
logInit()
runninginstances = make(map[string]shared.RunningInstance)
getSettings()
if len(settings.Proxies) > 0 {
for i := 0; i < len(settings.Proxies); i++ {
logInfo("Using proxy: ", settings.Proxies[i].Host, ":", settings.Proxies[i].Port)
}
}
go startpprof()
pool = getDbPool()
p = bluemonday.NewPolicy()
spaceReg = regexp.MustCompile(`[\s\t\.]+`)
removeHTMLReg = regexp.MustCompile(`<\/?\s*br\s*>`)
re = regexp.MustCompile("^https?://([^/]*)/(.*)$")
matchurl = regexp.MustCompile("http?s://[\\w\\-]+\\.[\\w\\-]+\\S*")
staggeredStartChan = make(chan bool)
// Start instances located in database
rows, err := pool.Query(context.Background(), "SELECT endpoint FROM instances")
if err != nil {
logErr("Unable to select from instances")
return
}
defer rows.Close()
go staggeredStart()
go statusReportHandler()
for rows.Next() {
var endpoint string
err = rows.Scan(&endpoint)
if err != nil {
logErr("Unable to iterate database, exiting.")
return
}
o, exists := GetRunner(endpoint)
if o.Banned == true {
continue // Banned instance
}
if exists == false {
go StartInstance(endpoint)
}
}
go startctl()
go webmain()
runtime.Goexit()
}

View File

@ -256,6 +256,7 @@ func StartInstance(endpoint string) {
logConn("Starting " + endpoint + " as " + o.Software + " " + o.Version) logConn("Starting " + endpoint + " as " + o.Software + " " + o.Version)
o.CaptureType = "Stream" o.CaptureType = "Stream"
UpdateRunner(endpoint, o) UpdateRunner(endpoint, o)
StreamMisskey(endpoint)
} else { } else {
o.Status = 605 o.Status = 605
UpdateRunner(endpoint, o) UpdateRunner(endpoint, o)

View File

@ -5,13 +5,134 @@ import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"net/url"
"strings" "strings"
"time" "time"
"github.com/google/uuid"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"gitlab.com/khanzf/fedilogue/shared" "gitlab.com/khanzf/fedilogue/shared"
) )
type MisskeyReply struct {
Type string `json:"type"`
Body MisskeyReplyBody `json:"body"`
}
type MisskeyReplyBody struct {
Channel string `json:"channel"`
Type string `json:"type"`
Body MisskeyNoteBody `json:"body"`
}
type MisskeyNoteBody struct {
Uri string `json:"uri"` // Remote Note
Id string `json:"id"` // Local note
}
//////////////////////////////
type MisskeyRequest struct {
Type string `json:"type"`
Body MisskeyRequestBody `json:"body"`
}
type MisskeyRequestBody struct {
Channel string `json:"channel"`
Id string `json:"id"`
Params MisskeyRequestParams
}
type MisskeyRequestParams struct {
}
func StreamMisskey(endpoint string) {
logDebug("StreamMisskey for ", endpoint)
u := url.URL{
Scheme: "wss",
Host: endpoint,
Path: "/streaming",
}
var misskeyrequest MisskeyRequest
misskeyrequest.Type = "connect"
misskeyrequest.Body.Channel = "globalTimeline"
misskeyrequest.Body.Id = uuid.New().String()
for {
misskeyrequeststream, err := json.Marshal(misskeyrequest)
if err != nil {
panic(err)
}
for {
// Create a new WebSocket connection.
ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
logErr("Error dialing misskey webSocket to ", endpoint, " :", err)
return
}
logDebug("Misskey websocket connection created: ", endpoint)
ri_mutex.Lock()
m := runninginstances[endpoint]
m.Status = shared.RUNNING
m.LastRun = "Streaming"
runninginstances[endpoint] = m
ri_mutex.Unlock()
// Send a message to the server.
err = ws.WriteMessage(websocket.TextMessage, misskeyrequeststream)
if err != nil {
logErr("Error sending misskey channel subscription: ", endpoint)
return
}
logDebug("Successfully sent misskey channel subscription: ", endpoint)
// Read a message from the server.
for {
logDebug("Starting Misskey Stream loop for ", endpoint)
_, message, err := ws.ReadMessage()
if err != nil {
logErr("Misskey stream broken: ", endpoint)
return
}
// Print the message to the console.
logDebug("Ending Misskey Stream loop for ", endpoint)
var misskeyreply MisskeyReply
err = json.Unmarshal(message, &misskeyreply)
if err != nil {
logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err)
break
}
// newactivity := misskeyreply.Body.Body
var newactivity string
if misskeyreply.Body.Body.Uri != "" { // Remote Message
newactivity = misskeyreply.Body.Body.Uri
matchset := re.FindStringSubmatch(newactivity)
if matchset != nil {
newinstance := matchset[1]
logDebug("Checking new instance from Misskey Stream: ", newinstance)
go CheckInstance(newinstance, endpoint)
}
} else { // Local Message
newactivity = "https://" + endpoint + "/notes/" + misskeyreply.Body.Body.Id
}
logDebug("Misskey new URI ", newactivity, " from instance: ", endpoint)
go check_activity(newactivity)
}
// Close the WebSocket connection.
ws.Close()
time.Sleep(time.Minute * 10)
}
}
}
func StreamPleroma(endpoint string) { func StreamPleroma(endpoint string) {
wss_url := "wss://" + endpoint + "/api/v1/streaming/?stream=public" wss_url := "wss://" + endpoint + "/api/v1/streaming/?stream=public"
var pleromaHeader shared.PleromaStreamHeader var pleromaHeader shared.PleromaStreamHeader
@ -34,24 +155,24 @@ func StreamPleroma(endpoint string) {
return return
} }
ri_mutex.Lock() // ri_mutex.Lock()
m := runninginstances[endpoint] // m := runninginstances[endpoint]
m.Status = shared.RUNNING // m.Status = shared.RUNNING
m.LastRun = "Streaming" // m.LastRun = "Streaming"
runninginstances[endpoint] = m // runninginstances[endpoint] = m
ri_mutex.Unlock() // ri_mutex.Unlock()
for { for {
logDebug("Starting loop for ", endpoint) logDebug("Starting Pleroma Stream loop for ", endpoint)
_, p, err := ws.ReadMessage() _, p, err := ws.ReadMessage()
if err != nil { if err != nil {
logWarn(err) logErr("Unable to read message from Pleroma stream: ", endpoint, " Err: ", err)
break break
} }
err = json.Unmarshal(p, &pleromaHeader) err = json.Unmarshal(p, &pleromaHeader)
if err != nil { if err != nil {
logDebug("Unable to parse data from " + endpoint + ", but still connected.") logErr("Unable to parse data from "+endpoint+", but still connected, err: ", err)
break break
} }
@ -59,7 +180,7 @@ func StreamPleroma(endpoint string) {
case "update": case "update":
err = json.Unmarshal([]byte(pleromaHeader.Payload), &newactivity) err = json.Unmarshal([]byte(pleromaHeader.Payload), &newactivity)
if err != nil { if err != nil {
logDebug("Unable to parse data from " + endpoint + ", but still connected.") logErr("Unable to parse data from " + endpoint + ", but still connected.")
break break
} }
go check_activity(newactivity.Uri) go check_activity(newactivity.Uri)
@ -73,10 +194,10 @@ func StreamPleroma(endpoint string) {
logDebug("Unimplemented pleroma stream activity: ", pleromaHeader.Event) logDebug("Unimplemented pleroma stream activity: ", pleromaHeader.Event)
continue continue
} }
logDebug("Ending loop for ", endpoint) logDebug("Ending Pleroma stream loop for ", endpoint)
} }
ws.Close() ws.Close()
time.Sleep(time.Minute * 10) // time.Sleep(time.Minute * 10)
} }
} }
@ -143,6 +264,7 @@ func StreamMastodon(endpoint string, o *shared.RunningInstance) {
s := bufio.NewScanner(resp.Body) s := bufio.NewScanner(resp.Body)
var name string var name string
for s.Scan() { for s.Scan() {
logDebug("Ending Mastodon stream loop for ", endpoint)
line := s.Text() line := s.Text()
token := strings.SplitN(line, ":", 2) token := strings.SplitN(line, ":", 2)
var newactivity shared.ReportActivity var newactivity shared.ReportActivity
@ -182,6 +304,7 @@ func StreamMastodon(endpoint string, o *shared.RunningInstance) {
logDebug("Checking new instance from Mastodon Stream: ", newinstance) logDebug("Checking new instance from Mastodon Stream: ", newinstance)
go CheckInstance(newinstance, endpoint) go CheckInstance(newinstance, endpoint)
} }
logDebug("Ending Mastodon stream loop for ", endpoint)
} }
if retry == true { if retry == true {
time.Sleep(time.Minute * 10) time.Sleep(time.Minute * 10)

1
go.mod
View File

@ -13,6 +13,7 @@ require github.com/gorilla/websocket v1.5.0
require ( require (
github.com/aymerick/douceur v0.2.0 // indirect github.com/aymerick/douceur v0.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/css v1.0.0 // indirect github.com/gorilla/css v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.0 // indirect github.com/jackc/pgconn v1.14.0 // indirect

2
go.sum
View File

@ -18,6 +18,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/css v1.0.0 h1:BQqNyPTi50JCFMTw/b67hByjMVXZRwGha6wxVGkeihY= github.com/gorilla/css v1.0.0 h1:BQqNyPTi50JCFMTw/b67hByjMVXZRwGha6wxVGkeihY=
github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl60c= github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl60c=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=