325 lines
8.0 KiB
Go
325 lines
8.0 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"crypto/sha1"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"strings"
|
|
"html"
|
|
"time"
|
|
"fmt"
|
|
"log"
|
|
)
|
|
|
|
// ImageData pairs an image's type with its URL for JSON encoding and decoding.
//
// The struct tags use the canonical `json:"name"` form; the previous
// `"json:name"` spelling is not recognised by encoding/json, which made
// Marshal emit the Go field names ("Type", "Url") instead of the lower-case
// keys.
type ImageData struct {
	Type string `json:"type"`
	Url  string `json:"url"`
}
|
|
|
|
// PublicKeyData is the JSON shape of a public-key object: the key's own id,
// the id of its owner, and the PEM-encoded key material.
//
// The struct tags use the canonical `json:"name"` form; the previous
// `"json:name"` spelling is ignored by encoding/json, so Marshal emitted the
// Go field names rather than the intended keys.
type PublicKeyData struct {
	Id           string `json:"id"`
	Owner        string `json:"owner"`
	PublicKeyPem string `json:"publicKeyPem"`
}
|
|
|
|
type UserInfo struct {
|
|
Id string `"json:id"`
|
|
Type string `"json:type"`
|
|
Following string `"json:following"`
|
|
Followers string `"json:followers"`
|
|
Inbox string `"json:inbox"`
|
|
Outbox string `"json:outbox"`
|
|
Featured string `"json:featured"`
|
|
PreferredUsername string `"json:preferredUsername"`
|
|
PublicKey PublicKeyData `"json:publicKeyPem"`
|
|
Name string `"json:name"`
|
|
Summary string `"json:summary"`
|
|
Url string `"json:Url"`
|
|
|
|
|
|
// ManuallyApprovesFollowers string `"json:manuallyApprovesFollowers"`
|
|
// Discoverable bool `"json:discoverable"`
|
|
}
|
|
|
|
// PostInfo is the subset of a remote post document that fetch_post decodes.
//
// The struct tags use the canonical `json:"name"` form; the previous
// `"json:name"` spelling is ignored by encoding/json, so Marshal emitted the
// Go field names rather than the intended lower-case keys.
type PostInfo struct {
	Id        string `json:"id"`
	Type      string `json:"type"`
	Published string `json:"published"`
	Url       string `json:"url"`
	Content   string `json:"content"`
}
|
|
|
|
func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) {
|
|
var userinfo UserInfo
|
|
|
|
// http_client := http.Client{}
|
|
req, err := http.NewRequest(http.MethodGet, uri, nil)
|
|
if err != nil {
|
|
return UserInfo{}, err
|
|
}
|
|
|
|
req.Header.Set("Accept", "application/ld+json")
|
|
|
|
resp, err := http_client.Do(req)
|
|
if err != nil {
|
|
return UserInfo{}, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
err = json.NewDecoder(resp.Body).Decode(&userinfo)
|
|
if err != nil {
|
|
return UserInfo{}, err
|
|
}
|
|
|
|
return userinfo, nil
|
|
}
|
|
|
|
|
|
func fetch_post(http_client http.Client, uri string) (PostInfo, error) {
|
|
var postinfo PostInfo
|
|
|
|
req, err := http.NewRequest(http.MethodGet, uri, nil)
|
|
if err != nil {
|
|
return PostInfo{}, err
|
|
}
|
|
|
|
req.Header.Set("Accept", "application/ld+json")
|
|
|
|
resp, err := http_client.Do(req)
|
|
if err != nil {
|
|
return PostInfo{}, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
err = json.NewDecoder(resp.Body).Decode(&postinfo)
|
|
if err != nil {
|
|
return PostInfo{}, err
|
|
}
|
|
|
|
return postinfo, nil
|
|
}
|
|
|
|
func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client) {
|
|
newposts := make([]ReportPost, 0)
|
|
|
|
min_id := ""
|
|
|
|
parsing_error := 0
|
|
unprocess_error := 0
|
|
use_auth := false
|
|
|
|
var last_refresh int64
|
|
var client_id string
|
|
var client_secret string
|
|
var oauthData OAuth
|
|
var err error
|
|
|
|
for _, extaccount := range settings.Externalaccounts {
|
|
if extaccount.Endpoint == endpoint {
|
|
use_auth = true
|
|
client_id, client_secret, err = get_client(endpoint, &http_client);
|
|
if err != nil {
|
|
log.Fatal("Unable to register client: ", err)
|
|
}
|
|
|
|
oauthData, err = oauth_login(endpoint, extaccount.Username, extaccount.Password, client_id, client_secret)
|
|
if err != nil {
|
|
log.Print("Unable to login: ", err)
|
|
return
|
|
}
|
|
last_refresh = time.Now().Unix()
|
|
|
|
}
|
|
}
|
|
|
|
for {
|
|
ri_mutex.Lock()
|
|
m := runninginstances[endpoint]
|
|
ri_mutex.Unlock()
|
|
|
|
api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&since_id=" + min_id
|
|
req, err := http.NewRequest("GET", api_timeline, nil)
|
|
if err != nil {
|
|
log.Print("Unable to create new request")
|
|
return
|
|
}
|
|
|
|
if use_auth == true {
|
|
if time.Now().Unix() > last_refresh + oauthData.Expires_in {
|
|
oauthData, err = oauth_refresh(endpoint, client_id, client_secret, oauthData.Refresh_token)
|
|
if err != nil {
|
|
log.Print("Unable to refresh: ", err)
|
|
return
|
|
}
|
|
last_refresh = time.Now().Unix()
|
|
}
|
|
req.Header.Add("Authorization", oauthData.Access_token)
|
|
}
|
|
|
|
m.LastRun = time.Now().Format(time.RFC3339)
|
|
resp, err := http_client.Do(req)
|
|
if err != nil {
|
|
m.Status = CLIENT_ISSUE
|
|
ri_mutex.Lock()
|
|
runninginstances[endpoint] = m
|
|
ri_mutex.Unlock()
|
|
log.Fatal("Failure here", err.Error())
|
|
return
|
|
}
|
|
|
|
if resp.StatusCode == TOOMANYREQUESTS { // Short Delay, 30 seconds
|
|
log.Print("Delaying " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay")
|
|
_, _ = ioutil.ReadAll(resp.Body)
|
|
resp.Body.Close() // Release as soon as done
|
|
m.Status = resp.StatusCode
|
|
ri_mutex.Lock()
|
|
runninginstances[endpoint] = m
|
|
ri_mutex.Unlock()
|
|
if unprocess_error > 5 {
|
|
log.Print("Exiting for " + endpoint)
|
|
}
|
|
unprocess_error = unprocess_error + 1
|
|
time.Sleep(time.Second * 30)
|
|
continue
|
|
} else if resp.StatusCode == INTERNAL_ERROR { // Longer delay, 1 hour
|
|
log.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay")
|
|
_, _ = ioutil.ReadAll(resp.Body)
|
|
resp.Body.Close() // Release as soon as done
|
|
m.Status = 765
|
|
ri_mutex.Lock()
|
|
runninginstances[endpoint] = m
|
|
ri_mutex.Unlock()
|
|
time.Sleep(time.Second * 3600)
|
|
continue
|
|
} else if resp.StatusCode != 200 { // Crash
|
|
log.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode)
|
|
_, _ = ioutil.ReadAll(resp.Body)
|
|
resp.Body.Close() // Release as soon as done
|
|
m.Status = resp.StatusCode
|
|
ri_mutex.Lock()
|
|
runninginstances[endpoint] = m
|
|
ri_mutex.Unlock()
|
|
return
|
|
}
|
|
|
|
err = json.NewDecoder(resp.Body).Decode(&newposts)
|
|
if err != nil {
|
|
if parsing_error > 5 {
|
|
m.Status = BAD_RESPONSE
|
|
ri_mutex.Lock()
|
|
runninginstances[endpoint] = m
|
|
ri_mutex.Unlock()
|
|
log.Print("Giving up on " + endpoint)
|
|
return
|
|
}
|
|
parsing_error = parsing_error + 1
|
|
time.Sleep(time.Second * 30)
|
|
}
|
|
resp.Body.Close() // Release as soon as done
|
|
|
|
m.Status = RUNNING
|
|
ri_mutex.Lock()
|
|
runninginstances[endpoint] = m
|
|
ri_mutex.Unlock()
|
|
|
|
for _, newpost := range newposts {
|
|
if newpost.Account.Acct == "" {
|
|
continue
|
|
}
|
|
at_sign := strings.Index(newpost.Account.Acct, "@")
|
|
newinstance := newpost.Account.Acct[at_sign+1:]
|
|
|
|
// Trust the post if it comes from the same source
|
|
if newinstance != endpoint {
|
|
ri_mutex.Lock()
|
|
o, exist := runninginstances[newinstance]
|
|
ri_mutex.Unlock()
|
|
if exist == false {
|
|
o := RunningInstance{}
|
|
new_client := http.Client{}
|
|
o.client = new_client
|
|
o.Status = KEEPALIVE
|
|
ri_mutex.Lock()
|
|
runninginstances[newinstance] = o
|
|
ri_mutex.Unlock()
|
|
}
|
|
|
|
realuser, err := fetch_user_info(o.client, newpost.Account.Url)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
realpost, err := fetch_post(o.client, newpost.Uri)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// Minor verification for now...
|
|
newpost.Account.Display_name = realuser.Name
|
|
newpost.Content = realpost.Content
|
|
newpost.Created_at = realpost.Published
|
|
}
|
|
|
|
posthash := sha1.New()
|
|
|
|
if at_sign == -1 {
|
|
at_sign = len(newpost.Account.Acct)
|
|
newpost.Account.Acct += "@" + endpoint
|
|
}
|
|
|
|
// Calculate the post hash
|
|
fmt.Fprint(posthash, newpost.Uri)
|
|
fmt.Fprint(posthash, newpost.normalized)
|
|
fmt.Fprint(posthash, newpost.Account.Acct)
|
|
fmt.Fprint(posthash, newpost.Account.Display_name)
|
|
newpost.posthash = posthash.Sum(nil)
|
|
|
|
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
|
|
newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
|
|
newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")
|
|
|
|
// Validate time
|
|
t, err := time.Parse(time.RFC3339, newpost.Created_at)
|
|
if err != nil {
|
|
newpost.Created_at = time.Now().Format(time.RFC3339)
|
|
}
|
|
if t.Unix() < 0 {
|
|
newpost.Created_at = time.Now().Format(time.RFC3339)
|
|
}
|
|
|
|
t, err = time.Parse(time.RFC3339, newpost.Account.Created_at)
|
|
if err != nil {
|
|
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
|
|
}
|
|
if t.Unix() < 0 {
|
|
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
|
|
}
|
|
|
|
reportPostChan <- newpost
|
|
|
|
// Check min_id
|
|
if newpost.Id > min_id {
|
|
min_id = newpost.Id
|
|
}
|
|
|
|
// Only done if we are crawling
|
|
if settings.Crawl == true && StringExists(endpoint, settings.Banned) == false {
|
|
// Skip over this if its the same as the endpoint
|
|
if newinstance == endpoint {
|
|
continue
|
|
}
|
|
ri_mutex.Lock()
|
|
o, exists := runninginstances[newinstance]
|
|
if exists == false || o.Status == KEEPALIVE {
|
|
m := RunningInstance{}
|
|
runninginstances[newinstance] = m
|
|
go StartInstance(newinstance, reportPostChan)
|
|
}
|
|
|
|
ri_mutex.Unlock()
|
|
}
|
|
}
|
|
time.Sleep(time.Second * 10)
|
|
}
|
|
}
|