Farhan Khan
a50916eb36
some web work lost track of what's going on and I'm not accountable to anyone...so screw it
159 lines
3.7 KiB
Go
159 lines
3.7 KiB
Go
package main
|
|
|
|
import (
|
|
"net/http"
|
|
"encoding/json"
|
|
"crypto/sha1"
|
|
"strings"
|
|
"bufio"
|
|
"time"
|
|
"html"
|
|
"log"
|
|
"fmt"
|
|
)
|
|
|
|
func StreamMastodon(endpoint string, reportPostChan chan ReportPost) {
|
|
http_client := http.Client{}
|
|
|
|
var client_id string
|
|
var client_secret string
|
|
var oauthData OAuth
|
|
var err error
|
|
|
|
api_timeline := "https://" + endpoint + "/api/v1/streaming/public"
|
|
req, err := http.NewRequest("GET", api_timeline, nil)
|
|
if err != nil {
|
|
log.Print("Unable to create new request")
|
|
return
|
|
}
|
|
|
|
for _, extaccount := range settings.Externalaccounts {
|
|
if extaccount.Endpoint == endpoint {
|
|
// use_auth = true
|
|
get_client(endpoint, &http_client)
|
|
|
|
client_id, client_secret, err = get_client(endpoint, &http_client);
|
|
if err != nil {
|
|
log.Fatal("Unable to register client: ", err)
|
|
}
|
|
|
|
oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret)
|
|
if err != nil {
|
|
log.Print("Unable to login: ", err)
|
|
return
|
|
}
|
|
// This needs to updated with the time
|
|
// last_refresh := time.Now().Unix()
|
|
_ = time.Now().Unix()
|
|
|
|
|
|
req.Header.Add("Authorization", oauthData.Access_token)
|
|
|
|
}
|
|
}
|
|
|
|
resp, err := http_client.Do(req)
|
|
if err != nil {
|
|
log.Fatal("Error occured for " + api_timeline)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
ri_mutex.Lock()
|
|
m := runninginstances[endpoint]
|
|
m.Status = RUNNING
|
|
runninginstances[endpoint] = m
|
|
m.LastRun = "Streaming"
|
|
ri_mutex.Unlock()
|
|
|
|
s := bufio.NewScanner(resp.Body)
|
|
var name string
|
|
for s.Scan() {
|
|
line := s.Text()
|
|
token := strings.SplitN(line, ":", 2)
|
|
var newpost ReportPost
|
|
if len(token) != 2 {
|
|
continue
|
|
}
|
|
switch strings.TrimSpace(token[0]) {
|
|
case "event":
|
|
name = strings.TrimSpace(token[1])
|
|
continue
|
|
case "data":
|
|
switch name {
|
|
case "update":
|
|
jsoner := token[1][1:]
|
|
err := json.Unmarshal([]byte(jsoner), &newpost)
|
|
if err != nil {
|
|
log.Fatal("Unable to unmarshal with error: ", err)
|
|
}
|
|
default:
|
|
continue
|
|
}
|
|
default:
|
|
continue
|
|
}
|
|
|
|
if newpost.Account.Acct == "" {
|
|
continue
|
|
}
|
|
posthash := sha1.New()
|
|
|
|
at_sign := strings.Index(newpost.Account.Acct, "@")
|
|
|
|
if at_sign == -1 {
|
|
at_sign = len(newpost.Account.Acct)
|
|
newpost.Account.Acct += "@" + endpoint
|
|
}
|
|
|
|
// Calculate the post hash
|
|
fmt.Fprint(posthash, newpost.Uri)
|
|
fmt.Fprint(posthash, newpost.normalized)
|
|
fmt.Fprint(posthash, newpost.Account.Acct)
|
|
fmt.Fprint(posthash, newpost.Account.Display_name)
|
|
newpost.posthash = posthash.Sum(nil)
|
|
|
|
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
|
|
newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
|
|
newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")
|
|
|
|
// Validate time
|
|
t, err := time.Parse(time.RFC3339, newpost.Created_at)
|
|
if err != nil {
|
|
newpost.Created_at = time.Now().Format(time.RFC3339)
|
|
}
|
|
if t.Unix() < 0 {
|
|
newpost.Created_at = time.Now().Format(time.RFC3339)
|
|
}
|
|
|
|
t, err = time.Parse(time.RFC3339, newpost.Account.Created_at)
|
|
if err != nil {
|
|
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
|
|
}
|
|
if t.Unix() < 0 {
|
|
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
|
|
}
|
|
|
|
// Reporting post
|
|
reportPostChan <- newpost
|
|
|
|
if settings.Crawl == true && StringExists(endpoint, settings.Banned) == false {
|
|
newinstance := newpost.Account.Acct[at_sign+1:]
|
|
ri_mutex.Lock()
|
|
_, exists := runninginstances[newinstance]
|
|
if exists == false {
|
|
m := RunningInstance{}
|
|
runninginstances[newinstance] = m
|
|
go StartInstance(newinstance, reportPostChan)
|
|
}
|
|
|
|
ri_mutex.Unlock()
|
|
}
|
|
}
|
|
|
|
ri_mutex.Lock()
|
|
m.Status = STREAM_ENDED
|
|
runninginstances[endpoint] = m
|
|
m.LastRun = time.Now().Format(time.RFC3339)
|
|
ri_mutex.Unlock()
|
|
}
|