// fedilogue/engine/stream.go

package main
import (
"net/http"
"encoding/json"
"crypto/sha1"
"strings"
"bufio"
"time"
"html"
"log"
"fmt"
)
func StreamMastodon(endpoint string, reportPostChan chan ReportPost) {
http_client := http.Client{}
var client_id string
var client_secret string
var oauthData OAuth
var err error
api_timeline := "https://" + endpoint + "/api/v1/streaming/public"
req, err := http.NewRequest("GET", api_timeline, nil)
if err != nil {
log.Print("Unable to create new request")
return
}
for _, extaccount := range settings.Externalaccounts {
if extaccount.Endpoint == endpoint {
// use_auth = true
2020-12-13 07:38:03 +00:00
get_client(endpoint, &http_client)
2020-12-13 07:38:03 +00:00
client_id, client_secret, err = get_client(endpoint, &http_client);
if err != nil {
log.Fatal("Unable to register client: ", err)
}
oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret)
if err != nil {
log.Print("Unable to login: ", err)
return
}
// This needs to updated with the time
// last_refresh := time.Now().Unix()
_ = time.Now().Unix()
req.Header.Add("Authorization", oauthData.Access_token)
}
}
resp, err := http_client.Do(req)
if err != nil {
log.Fatal("Error occured for " + api_timeline)
}
defer resp.Body.Close()
2020-12-13 08:26:50 +00:00
ri_mutex.Lock()
m := runninginstances[endpoint]
m.Status = RUNNING
runninginstances[endpoint] = m
m.LastRun = "Streaming"
ri_mutex.Unlock()
s := bufio.NewScanner(resp.Body)
var name string
for s.Scan() {
line := s.Text()
token := strings.SplitN(line, ":", 2)
var newpost ReportPost
if len(token) != 2 {
continue
}
switch strings.TrimSpace(token[0]) {
case "event":
name = strings.TrimSpace(token[1])
continue
case "data":
switch name {
case "update":
jsoner := token[1][1:]
err := json.Unmarshal([]byte(jsoner), &newpost)
if err != nil {
log.Fatal("Unable to unmarshal with error: ", err)
}
default:
continue
}
default:
continue
}
if newpost.Account.Acct == "" {
continue
}
posthash := sha1.New()
at_sign := strings.Index(newpost.Account.Acct, "@")
if at_sign == -1 {
at_sign = len(newpost.Account.Acct)
newpost.Account.Acct += "@" + endpoint
}
// Calculate the post hash
fmt.Fprint(posthash, newpost.Url)
fmt.Fprint(posthash, newpost.normalized)
fmt.Fprint(posthash, newpost.Account.Acct)
fmt.Fprint(posthash, newpost.Account.Display_name)
newpost.posthash = posthash.Sum(nil)
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")
// Validate time
t, err := time.Parse(time.RFC3339, newpost.Created_at)
if err != nil {
newpost.Created_at = time.Now().Format(time.RFC3339)
}
if t.Unix() < 0 {
newpost.Created_at = time.Now().Format(time.RFC3339)
}
t, err = time.Parse(time.RFC3339, newpost.Account.Created_at)
if err != nil {
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
}
if t.Unix() < 0 {
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
}
// Reporting post
reportPostChan <- newpost
if settings.Crawl == true && StringExists(endpoint, settings.Banned) == false {
newinstance := newpost.Account.Acct[at_sign+1:]
ri_mutex.Lock()
_, exists := runninginstances[newinstance]
if exists == false {
m := RunningInstance{}
runninginstances[newinstance] = m
go StartInstance(newinstance, reportPostChan)
}
ri_mutex.Unlock()
}
}
2020-12-13 08:26:50 +00:00
ri_mutex.Lock()
m.Status = STREAM_ENDED
runninginstances[endpoint] = m
m.LastRun = time.Now().Format(time.RFC3339)
ri_mutex.Unlock()
}