fedilogue/engine/stream.go
Farhan Khan a50916eb36 using the same http client, adding it to running instances
some web work
lost track of what's going on and I'm not accountable to anyone...so screw it
2020-12-16 02:41:45 +00:00

159 lines
3.7 KiB
Go

package main
import (
"net/http"
"encoding/json"
"crypto/sha1"
"strings"
"bufio"
"time"
"html"
"log"
"fmt"
)
// StreamMastodon connects to the public streaming timeline of the instance at
// endpoint and forwards every "update" event to reportPostChan until the
// stream ends.
//
// If settings.Externalaccounts contains an entry for endpoint, the request is
// authenticated with the token obtained through get_client and oauth_login.
// While the stream is being consumed the instance's entry in runninginstances
// is marked RUNNING; once the stream stops it is marked STREAM_ENDED together
// with the time it ended.
//
// Failures affecting only this instance (client registration, login, the HTTP
// request, a malformed event) are logged and never terminate the process, so
// the other instances keep running.
func StreamMastodon(endpoint string, reportPostChan chan ReportPost) {
	// No client-level Timeout is set on purpose: http.Client.Timeout also
	// covers reading the response body, which would cut the stream short.
	httpClient := http.Client{}

	apiTimeline := "https://" + endpoint + "/api/v1/streaming/public"
	req, err := http.NewRequest("GET", apiTimeline, nil)
	if err != nil {
		log.Print("Unable to create new request")
		return
	}

	for _, extaccount := range settings.Externalaccounts {
		if extaccount.Endpoint != endpoint {
			continue
		}
		clientID, clientSecret, err := get_client(endpoint, &httpClient)
		if err != nil {
			log.Print("Unable to register client: ", err)
			return
		}
		oauthData, err := oauth_login(endpoint, extaccount.Username, extaccount.Password, clientID, clientSecret)
		if err != nil {
			log.Print("Unable to login: ", err)
			return
		}
		// NOTE(review): the Mastodon API documents the header value as
		// "Bearer <token>"; confirm whether Access_token already carries
		// that prefix.
		req.Header.Add("Authorization", oauthData.Access_token)
		// One set of credentials is enough; stop so that duplicate account
		// entries do not add several Authorization headers.
		break
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		log.Print("Error occured for "+apiTimeline+": ", err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		ri_mutex.Lock()
		m := runninginstances[endpoint]
		m.Status = RUNNING
		m.LastRun = "Streaming"
		// Store the copy only after every field is set; the map holds
		// values, so later changes to m would not be visible in it.
		runninginstances[endpoint] = m
		ri_mutex.Unlock()

		s := bufio.NewScanner(resp.Body)
		// The default token limit is bufio.MaxScanTokenSize (64 KiB); a longer
		// event line would make Scan stop, so allow lines of up to 1 MiB.
		s.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), 1024*1024)
		relayMastodonStream(endpoint, s, reportPostChan)
	} else {
		log.Print("Unexpected response for "+apiTimeline+": ", resp.Status)
	}

	// Re-read the entry instead of reusing an earlier copy so that changes
	// made to it by other goroutines in the meantime are not overwritten.
	ri_mutex.Lock()
	m := runninginstances[endpoint]
	m.Status = STREAM_ENDED
	m.LastRun = time.Now().Format(time.RFC3339)
	runninginstances[endpoint] = m
	ri_mutex.Unlock()
}

// relayMastodonStream reads "field: value" lines from s, decodes the data of
// every "update" event into a ReportPost, sends it to reportPostChan and,
// when crawling is enabled, starts instances for newly seen hosts.
func relayMastodonStream(endpoint string, s *bufio.Scanner, reportPostChan chan ReportPost) {
	// event holds the most recent "event" field; it tells how to interpret
	// the "data" line that follows.
	var event string

	for s.Scan() {
		token := strings.SplitN(s.Text(), ":", 2)
		if len(token) != 2 {
			continue
		}
		field := strings.TrimSpace(token[0])
		// Trimming instead of dropping the first byte keeps the payload
		// intact when there is no space after the colon and cannot panic on
		// an empty value.
		value := strings.TrimSpace(token[1])

		if field == "event" {
			event = value
			continue
		}
		if field != "data" || event != "update" {
			continue
		}

		var newpost ReportPost
		if err := json.Unmarshal([]byte(value), &newpost); err != nil {
			// A single malformed event must not stop the stream.
			log.Print("Unable to unmarshal with error: ", err)
			continue
		}
		if newpost.Account.Acct == "" {
			continue
		}

		// Accounts without a host part are local to endpoint; qualify them.
		// atSign ends up at the position of the "@" in both cases.
		atSign := strings.Index(newpost.Account.Acct, "@")
		if atSign == -1 {
			atSign = len(newpost.Account.Acct)
			newpost.Account.Acct += "@" + endpoint
		}

		// Calculate the post hash.
		// NOTE(review): normalized is still empty at this point because it is
		// only filled in below, so the content does not contribute to the
		// hash. The order is kept as it was because other code may depend on
		// the resulting hash values; confirm before changing it.
		posthash := sha1.New()
		fmt.Fprint(posthash, newpost.Uri)
		fmt.Fprint(posthash, newpost.normalized)
		fmt.Fprint(posthash, newpost.Account.Acct)
		fmt.Fprint(posthash, newpost.Account.Display_name)
		newpost.posthash = posthash.Sum(nil)

		// Sanitized, lower-cased, entity-decoded content with tabs and
		// whitespace runs replaced via spaceReg.
		newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
		newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
		newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")

		// Validate time
		newpost.Created_at = rfc3339OrNow(newpost.Created_at)
		newpost.Account.Created_at = rfc3339OrNow(newpost.Account.Created_at)

		// Reporting post
		reportPostChan <- newpost

		// NOTE(review): the ban list is checked against endpoint, not against
		// the newly discovered host; confirm that this is intended.
		if settings.Crawl && !StringExists(endpoint, settings.Banned) {
			newinstance := newpost.Account.Acct[atSign+1:]
			ri_mutex.Lock()
			if _, exists := runninginstances[newinstance]; !exists {
				// Register the host before starting it so that it is not
				// started a second time by a later post.
				runninginstances[newinstance] = RunningInstance{}
				go StartInstance(newinstance, reportPostChan)
			}
			ri_mutex.Unlock()
		}
	}

	if err := s.Err(); err != nil {
		log.Print("Stream from "+endpoint+" stopped with error: ", err)
	}
}

// rfc3339OrNow returns ts unchanged when it is an RFC 3339 timestamp at or
// after the Unix epoch, and the current time in RFC 3339 format otherwise.
func rfc3339OrNow(ts string) string {
	t, err := time.Parse(time.RFC3339, ts)
	if err != nil || t.Unix() < 0 {
		return time.Now().Format(time.RFC3339)
	}
	return ts
}