From abc199b9ac9c149092053fccbcd9ec7ee67f4682 Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Fri, 11 Dec 2020 17:20:44 +0000 Subject: [PATCH] added streaming for mastodon streaming authentication does not refresh yet... oh, and migrated development from freebsd to linux --- poll/Makefile | 2 +- poll/fedilogue.go | 2 +- poll/instance.go | 205 ++------------------------------------------- poll/poll.go | 206 ++++++++++++++++++++++++++++++++++++++++++++++ poll/stream.go | 147 +++++++++++++++++++++++++++++++++ 5 files changed, 361 insertions(+), 201 deletions(-) create mode 100644 poll/poll.go create mode 100644 poll/stream.go diff --git a/poll/Makefile b/poll/Makefile index caae96d..edffc3d 100644 --- a/poll/Makefile +++ b/poll/Makefile @@ -1,4 +1,4 @@ -GOFILES = fedilogue.go ctl.go headers.go instance.go web.go db.go config.go oauth.go +GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go build: go build -o fedilogue $(GOFILES) diff --git a/poll/fedilogue.go b/poll/fedilogue.go index de293b3..c809d7a 100644 --- a/poll/fedilogue.go +++ b/poll/fedilogue.go @@ -38,7 +38,7 @@ func main() { ri_mutex.Lock() _, exists := runninginstances[endpoint] if exists == true { - log.Println("Already exists: " + endpoint) + log.Print("Already exists: " + endpoint) } else { runninginstances[endpoint] = RunningInstance{} go StartInstance(endpoint, reportPostChan) diff --git a/poll/instance.go b/poll/instance.go index bf3d03d..a454d95 100644 --- a/poll/instance.go +++ b/poll/instance.go @@ -3,213 +3,17 @@ package main import ( "github.com/microcosm-cc/bluemonday" "encoding/json" - "crypto/sha1" "io/ioutil" "net/http" "strings" "regexp" - "html" "time" - "fmt" "log" ) var p *bluemonday.Policy var spaceReg *regexp.Regexp -func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) { - newposts := make([]ReportPost, 0) - - min_id := "" - - http_client := http.Client{} - parsing_error := 0 - unprocess_error := 0 - use_auth := false - - var last_refresh int64 - var client_id string - var client_secret string - var oauthData OAuth - var err error - - for _, extaccount := range settings.Externalaccounts { - if extaccount.Endpoint == endpoint { - use_auth = true - register_client(endpoint, &http_client) - - client_id, client_secret, err = register_client(endpoint, &http_client); - if err != nil { - log.Fatal("Unable to register client: ", err) - } - - oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret) - if err != nil { - log.Print("Unable to login: ", err) - return - } - last_refresh = time.Now().Unix() - - } - } - - - for { - ri_mutex.Lock() - m := runninginstances[endpoint] - ri_mutex.Unlock() - - api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + min_id - req, err := http.NewRequest("GET", api_timeline, nil) - if err != nil { - log.Print("Unable to create new request") - return - } - - if use_auth == true { - if time.Now().Unix() > last_refresh + oauthData.Expires_in { - oauthData, err = oauth_refresh(endpoint, client_id, client_secret, oauthData.Refresh_token) - if err != nil { - log.Print("Unable to refresh: ", err) - return - } - last_refresh = time.Now().Unix() - } - req.Header.Add("Authorization", oauthData.Access_token) - } - - m.LastRun = time.Now().Format(time.RFC3339) - resp, err := http_client.Do(req) - if err != nil { - ri_mutex.Lock() - m.Status = CLIENT_ISSUE - runninginstances[endpoint] = m - ri_mutex.Unlock() - log.Fatal("Failure here", err.Error()) - return - } - - if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds - log.Print("Delaying " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") - _, _ = ioutil.ReadAll(resp.Body) - resp.Body.Close() // Release as soon as done - ri_mutex.Lock() - m.Status = resp.StatusCode - runninginstances[endpoint] = m - ri_mutex.Unlock() - if unprocess_error > 5 { - log.Print("Exiting for " + endpoint) - } - unprocess_error = unprocess_error + 1 - time.Sleep(time.Second * 30) - continue - } else if resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour - log.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") - _, _ = ioutil.ReadAll(resp.Body) - resp.Body.Close() // Release as soon as done - ri_mutex.Lock() - m.Status = 765 - runninginstances[endpoint] = m - ri_mutex.Unlock() - time.Sleep(time.Second * 3600) - continue - } else if resp.StatusCode != 200 { // Crash - log.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode) - _, _ = ioutil.ReadAll(resp.Body) - resp.Body.Close() // Release as soon as done - ri_mutex.Lock() - m.Status = resp.StatusCode - runninginstances[endpoint] = m - ri_mutex.Unlock() - return - } - - err = json.NewDecoder(resp.Body).Decode(&newposts) - if err != nil { - if parsing_error > 5 { - ri_mutex.Lock() - m.Status = BAD_RESPONSE - runninginstances[endpoint] = m - ri_mutex.Unlock() - log.Print("Giving up on " + endpoint) - return - } - parsing_error = parsing_error + 1 - time.Sleep(time.Second * 30) - } - resp.Body.Close() // Release as soon as done - - ri_mutex.Lock() - m.Status = RUNNING - runninginstances[endpoint] = m - ri_mutex.Unlock() - - for _, newpost := range newposts { - if newpost.Account.Acct == "" { - continue - } - posthash := sha1.New() - - at_sign := strings.Index(newpost.Account.Acct, "@") - - if at_sign == -1 { - at_sign = len(newpost.Account.Acct) - newpost.Account.Acct += "@" + endpoint - } - - // Calculate the post hash - fmt.Fprint(posthash, newpost.Url) - fmt.Fprint(posthash, newpost.normalized) - fmt.Fprint(posthash, newpost.Account.Acct) - fmt.Fprint(posthash, newpost.Account.Display_name) - newpost.posthash = posthash.Sum(nil) - - newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) - newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ") - newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ") - - // Validate time - t, err := time.Parse(time.RFC3339, newpost.Created_at) - if err != nil { - newpost.Created_at = time.Now().Format(time.RFC3339) - } - if t.Unix() < 0 { - newpost.Created_at = time.Now().Format(time.RFC3339) - } - - t, err = time.Parse(time.RFC3339, newpost.Account.Created_at) - if err != nil { - newpost.Account.Created_at = time.Now().Format(time.RFC3339) - } - if t.Unix() < 0 { - newpost.Account.Created_at = time.Now().Format(time.RFC3339) - } - - reportPostChan <- newpost - - // Check min_id - if newpost.Id > min_id { - min_id = newpost.Id - } - - // Only done if we are crawling - if settings.Crawl == true || StringExists(endpoint, settings.Banned) == false { - newinstance := newpost.Account.Acct[at_sign+1:] - ri_mutex.Lock() - _, exists := runninginstances[newinstance] - if exists == false { - m := RunningInstance{} - runninginstances[newinstance] = m - go StartInstance(newinstance, reportPostChan) - } - - ri_mutex.Unlock() - } - } - time.Sleep(time.Second * 10) - } -} - // Change this to return a proper "err" func GetNodeInfo(endpoint string) (NodeInfo) { /* Checking order @@ -280,9 +84,12 @@ func StartInstance(endpoint string, reportPostChan chan ReportPost) { return } - if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" { - log.Print("Starting " + endpoint + " as Mastodon/Pleroma instance") - go PollMastodonPleroma(endpoint, reportPostChan) + if nodeinfo.Software.Name == "pleroma" { + log.Print("Starting " + endpoint + " as " + nodeinfo.Software.Name) + PollMastodonPleroma(endpoint, reportPostChan) + } else if nodeinfo.Software.Name == "mastodon" { + log.Print("Starting " + endpoint + " as " + nodeinfo.Software.Name) + StreamMastodon(endpoint, reportPostChan) } } diff --git a/poll/poll.go b/poll/poll.go new file mode 100644 index 0000000..a514924 --- /dev/null +++ b/poll/poll.go @@ -0,0 +1,206 @@ +package main + +import ( + "encoding/json" + "crypto/sha1" + "io/ioutil" + "net/http" + "strings" + "html" + "time" + "fmt" + "log" +) + +func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) { + newposts := make([]ReportPost, 0) + + min_id := "" + + http_client := http.Client{} + parsing_error := 0 + unprocess_error := 0 + use_auth := false + + var last_refresh int64 + var client_id string + var client_secret string + var oauthData OAuth + var err error + + for _, extaccount := range settings.Externalaccounts { + if extaccount.Endpoint == endpoint { + use_auth = true + register_client(endpoint, &http_client) + + client_id, client_secret, err = register_client(endpoint, &http_client); + if err != nil { + log.Fatal("Unable to register client: ", err) + } + + oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret) + if err != nil { + log.Print("Unable to login: ", err) + return + } + last_refresh = time.Now().Unix() + + } + } + + + for { + ri_mutex.Lock() + m := runninginstances[endpoint] + ri_mutex.Unlock() + + api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + min_id + req, err := http.NewRequest("GET", api_timeline, nil) + if err != nil { + log.Print("Unable to create new request") + return + } + + if use_auth == true { + if time.Now().Unix() > last_refresh + oauthData.Expires_in { + oauthData, err = oauth_refresh(endpoint, client_id, client_secret, oauthData.Refresh_token) + if err != nil { + log.Print("Unable to refresh: ", err) + return + } + last_refresh = time.Now().Unix() + } + req.Header.Add("Authorization", oauthData.Access_token) + } + + m.LastRun = time.Now().Format(time.RFC3339) + resp, err := http_client.Do(req) + if err != nil { + ri_mutex.Lock() + m.Status = CLIENT_ISSUE + runninginstances[endpoint] = m + ri_mutex.Unlock() + log.Fatal("Failure here", err.Error()) + return + } + + if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds + log.Print("Delaying " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") + _, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() // Release as soon as done + ri_mutex.Lock() + m.Status = resp.StatusCode + runninginstances[endpoint] = m + ri_mutex.Unlock() + if unprocess_error > 5 { + log.Print("Exiting for " + endpoint) + } + unprocess_error = unprocess_error + 1 + time.Sleep(time.Second * 30) + continue + } else if resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour + log.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") + _, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() // Release as soon as done + ri_mutex.Lock() + m.Status = 765 + runninginstances[endpoint] = m + ri_mutex.Unlock() + time.Sleep(time.Second * 3600) + continue + } else if resp.StatusCode != 200 { // Crash + log.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode) + _, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() // Release as soon as done + ri_mutex.Lock() + m.Status = resp.StatusCode + runninginstances[endpoint] = m + ri_mutex.Unlock() + return + } + + err = json.NewDecoder(resp.Body).Decode(&newposts) + if err != nil { + if parsing_error > 5 { + ri_mutex.Lock() + m.Status = BAD_RESPONSE + runninginstances[endpoint] = m + ri_mutex.Unlock() + log.Print("Giving up on " + endpoint) + return + } + parsing_error = parsing_error + 1 + time.Sleep(time.Second * 30) + } + resp.Body.Close() // Release as soon as done + + ri_mutex.Lock() + m.Status = RUNNING + runninginstances[endpoint] = m + ri_mutex.Unlock() + + for _, newpost := range newposts { + if newpost.Account.Acct == "" { + continue + } + posthash := sha1.New() + + at_sign := strings.Index(newpost.Account.Acct, "@") + + if at_sign == -1 { + at_sign = len(newpost.Account.Acct) + newpost.Account.Acct += "@" + endpoint + } + + // Calculate the post hash + fmt.Fprint(posthash, newpost.Url) + fmt.Fprint(posthash, newpost.normalized) + fmt.Fprint(posthash, newpost.Account.Acct) + fmt.Fprint(posthash, newpost.Account.Display_name) + newpost.posthash = posthash.Sum(nil) + + newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) + newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ") + newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ") + + // Validate time + t, err := time.Parse(time.RFC3339, newpost.Created_at) + if err != nil { + newpost.Created_at = time.Now().Format(time.RFC3339) + } + if t.Unix() < 0 { + newpost.Created_at = time.Now().Format(time.RFC3339) + } + + t, err = time.Parse(time.RFC3339, newpost.Account.Created_at) + if err != nil { + newpost.Account.Created_at = time.Now().Format(time.RFC3339) + } + if t.Unix() < 0 { + newpost.Account.Created_at = time.Now().Format(time.RFC3339) + } + + reportPostChan <- newpost + + // Check min_id + if newpost.Id > min_id { + min_id = newpost.Id + } + + // Only done if we are crawling + if settings.Crawl == true || StringExists(endpoint, settings.Banned) == false { + newinstance := newpost.Account.Acct[at_sign+1:] + ri_mutex.Lock() + _, exists := runninginstances[newinstance] + if exists == false { + m := RunningInstance{} + runninginstances[newinstance] = m + go StartInstance(newinstance, reportPostChan) + } + + ri_mutex.Unlock() + } + } + time.Sleep(time.Second * 10) + } +} diff --git a/poll/stream.go b/poll/stream.go new file mode 100644 index 0000000..d47eaac --- /dev/null +++ b/poll/stream.go @@ -0,0 +1,147 @@ +package main + +import ( + "net/http" + "encoding/json" + "crypto/sha1" + "strings" + "bufio" + "time" + "html" + "log" + "fmt" +) + +func StreamMastodon(endpoint string, reportPostChan chan ReportPost) { + http_client := http.Client{} +// use_auth := false + +// var last_refresh int64 + var client_id string + var client_secret string + var oauthData OAuth + var err error + + api_timeline := "https://" + endpoint + "/api/v1/streaming/public" + req, err := http.NewRequest("GET", api_timeline, nil) + if err != nil { + log.Print("Unable to create new request") + return + } + + for _, extaccount := range settings.Externalaccounts { + if extaccount.Endpoint == endpoint { +// use_auth = true + register_client(endpoint, &http_client) + + client_id, client_secret, err = register_client(endpoint, &http_client); + if err != nil { + log.Fatal("Unable to register client: ", err) + } + + oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret) + if err != nil { + log.Print("Unable to login: ", err) + return + } + // This needs to updated with the time +// last_refresh := time.Now().Unix() + _ = time.Now().Unix() + + + req.Header.Add("Authorization", oauthData.Access_token) + + } + } + + resp, err := http_client.Do(req) + if err != nil { + log.Fatal("Error occured for " + api_timeline) + } + defer resp.Body.Close() + + s := bufio.NewScanner(resp.Body) + var name string + for s.Scan() { + line := s.Text() + token := strings.SplitN(line, ":", 2) + var newpost ReportPost + if len(token) != 2 { + continue + } + switch strings.TrimSpace(token[0]) { + case "event": + name = strings.TrimSpace(token[1]) + continue + case "data": + switch name { + case "update": + jsoner := token[1][1:] + err := json.Unmarshal([]byte(jsoner), &newpost) + if err != nil { + log.Fatal("Unable to unmarshal with error: ", err) + } + default: + continue + } + default: + continue + } + + if newpost.Account.Acct == "" { + continue + } + posthash := sha1.New() + + at_sign := strings.Index(newpost.Account.Acct, "@") + + if at_sign == -1 { + at_sign = len(newpost.Account.Acct) + newpost.Account.Acct += "@" + endpoint + } + + // Calculate the post hash + fmt.Fprint(posthash, newpost.Url) + fmt.Fprint(posthash, newpost.normalized) + fmt.Fprint(posthash, newpost.Account.Acct) + fmt.Fprint(posthash, newpost.Account.Display_name) + newpost.posthash = posthash.Sum(nil) + + newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) + newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ") + newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ") + + // Validate time + t, err := time.Parse(time.RFC3339, newpost.Created_at) + if err != nil { + newpost.Created_at = time.Now().Format(time.RFC3339) + } + if t.Unix() < 0 { + newpost.Created_at = time.Now().Format(time.RFC3339) + } + + t, err = time.Parse(time.RFC3339, newpost.Account.Created_at) + if err != nil { + newpost.Account.Created_at = time.Now().Format(time.RFC3339) + } + if t.Unix() < 0 { + newpost.Account.Created_at = time.Now().Format(time.RFC3339) + } + + // Reporting post + reportPostChan <- newpost + + if settings.Crawl == true && StringExists(endpoint, settings.Banned) == false { + newinstance := newpost.Account.Acct[at_sign+1:] + ri_mutex.Lock() + _, exists := runninginstances[newinstance] + if exists == false { + m := RunningInstance{} + runninginstances[newinstance] = m + go StartInstance(newinstance, reportPostChan) + } + + ri_mutex.Unlock() + } + } +}