client added to runninginstances, can be shared across goroutines

adding basic post verification scaffolding
This commit is contained in:
farhan 2020-12-17 01:15:17 +00:00
parent 6c6d23a340
commit 064acdcf2d
4 changed files with 101 additions and 39 deletions

View File

@ -19,6 +19,7 @@ const (
BAD_NODEINFO = 604 BAD_NODEINFO = 604
UNSUPPORTED_INSTANCE = 605 UNSUPPORTED_INSTANCE = 605
STREAM_ENDED = 606 STREAM_ENDED = 606
KEEPALIVE = 607
) )
// Parsing Unmarshal JSON type // Parsing Unmarshal JSON type

View File

@ -12,6 +12,17 @@ import (
"log" "log"
) )
type ImageData struct {
Type string `"json:type"`
Url string `"json:url"`
}
type PublicKeyData struct {
Id string `"json:id"`
Owner string `"json:owner"`
PublicKeyPem string `"json:publicKeyPem"`
}
type UserInfo struct { type UserInfo struct {
Id string `"json:id"` Id string `"json:id"`
Type string `"json:type"` Type string `"json:type"`
@ -21,9 +32,12 @@ type UserInfo struct {
Outbox string `"json:outbox"` Outbox string `"json:outbox"`
Featured string `"json:featured"` Featured string `"json:featured"`
PreferredUsername string `"json:preferredUsername"` PreferredUsername string `"json:preferredUsername"`
PublicKey PublicKeyData `"json:publicKeyPem"`
Name string `"json:name"` Name string `"json:name"`
Summary string `"json:summary"` Summary string `"json:summary"`
Url string `"json:Url"` Url string `"json:Url"`
// ManuallyApprovesFollowers string `"json:manuallyApprovesFollowers"` // ManuallyApprovesFollowers string `"json:manuallyApprovesFollowers"`
// Discoverable bool `"json:discoverable"` // Discoverable bool `"json:discoverable"`
} }
@ -53,7 +67,6 @@ func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) {
} }
defer resp.Body.Close() defer resp.Body.Close()
// content, err := ioutil.ReadAll(resp.Body)
err = json.NewDecoder(resp.Body).Decode(&userinfo) err = json.NewDecoder(resp.Body).Decode(&userinfo)
if err != nil { if err != nil {
return UserInfo{}, err return UserInfo{}, err
@ -66,7 +79,6 @@ func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) {
func fetch_post(http_client http.Client, uri string) (PostInfo, error) { func fetch_post(http_client http.Client, uri string) (PostInfo, error) {
var postinfo PostInfo var postinfo PostInfo
// http_client := http.Client{}
req, err := http.NewRequest(http.MethodGet, uri, nil) req, err := http.NewRequest(http.MethodGet, uri, nil)
if err != nil { if err != nil {
return PostInfo{}, err return PostInfo{}, err
@ -80,7 +92,6 @@ func fetch_post(http_client http.Client, uri string) (PostInfo, error) {
} }
defer resp.Body.Close() defer resp.Body.Close()
// content, err := ioutil.ReadAll(resp.Body)
err = json.NewDecoder(resp.Body).Decode(&postinfo) err = json.NewDecoder(resp.Body).Decode(&postinfo)
if err != nil { if err != nil {
return PostInfo{}, err return PostInfo{}, err
@ -149,8 +160,8 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
m.LastRun = time.Now().Format(time.RFC3339) m.LastRun = time.Now().Format(time.RFC3339)
resp, err := http_client.Do(req) resp, err := http_client.Do(req)
if err != nil { if err != nil {
ri_mutex.Lock()
m.Status = CLIENT_ISSUE m.Status = CLIENT_ISSUE
ri_mutex.Lock()
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
log.Fatal("Failure here", err.Error()) log.Fatal("Failure here", err.Error())
@ -161,8 +172,8 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
log.Print("Delaying " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") log.Print("Delaying " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay")
_, _ = ioutil.ReadAll(resp.Body) _, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = resp.StatusCode m.Status = resp.StatusCode
ri_mutex.Lock()
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
if unprocess_error > 5 { if unprocess_error > 5 {
@ -175,8 +186,8 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
log.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay") log.Print("Suspending " + endpoint + ", gave status ", resp.StatusCode, ", 1 hour delay")
_, _ = ioutil.ReadAll(resp.Body) _, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = 765 m.Status = 765
ri_mutex.Lock()
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
time.Sleep(time.Second * 3600) time.Sleep(time.Second * 3600)
@ -185,8 +196,8 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
log.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode) log.Print("Terminating " + endpoint + ", gave status ", resp.StatusCode)
_, _ = ioutil.ReadAll(resp.Body) _, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = resp.StatusCode m.Status = resp.StatusCode
ri_mutex.Lock()
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
return return
@ -195,8 +206,8 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
err = json.NewDecoder(resp.Body).Decode(&newposts) err = json.NewDecoder(resp.Body).Decode(&newposts)
if err != nil { if err != nil {
if parsing_error > 5 { if parsing_error > 5 {
ri_mutex.Lock()
m.Status = BAD_RESPONSE m.Status = BAD_RESPONSE
ri_mutex.Lock()
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
log.Print("Giving up on " + endpoint) log.Print("Giving up on " + endpoint)
@ -207,8 +218,8 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
} }
resp.Body.Close() // Release as soon as done resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = RUNNING m.Status = RUNNING
ri_mutex.Lock()
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
@ -218,24 +229,20 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
} }
at_sign := strings.Index(newpost.Account.Acct, "@") at_sign := strings.Index(newpost.Account.Acct, "@")
newinstance := newpost.Account.Acct[at_sign+1:] newinstance := newpost.Account.Acct[at_sign+1:]
// Trust the post if it comes from the same source // Trust the post if it comes from the same source
if newinstance == endpoint { if newinstance != endpoint {
// fmt.Println("Do not need to verify" + newpost.Account.Acct[at_sign+1:])
} else {
ri_mutex.Lock() ri_mutex.Lock()
o, exist := runninginstances[newinstance] o, exist := runninginstances[newinstance]
ri_mutex.Unlock() ri_mutex.Unlock()
if exist == false { if exist == false {
log.Print("Didn't exist so adding...")
o := RunningInstance{} o := RunningInstance{}
new_client := http.Client{} new_client := http.Client{}
o.client = new_client o.client = new_client
o.Status = KEEPALIVE
ri_mutex.Lock() ri_mutex.Lock()
runninginstances[newinstance] = o runninginstances[newinstance] = o
log.Print(runninginstances)
ri_mutex.Unlock() ri_mutex.Unlock()
} else {
log.Print("Exists! ", o)
} }
realuser, err := fetch_user_info(o.client, newpost.Account.Url) realuser, err := fetch_user_info(o.client, newpost.Account.Url)
@ -251,7 +258,6 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
newpost.Account.Display_name = realuser.Name newpost.Account.Display_name = realuser.Name
newpost.Content = realpost.Content newpost.Content = realpost.Content
newpost.Created_at = realpost.Published newpost.Created_at = realpost.Published
log.Print("Updated! for " + newinstance)
} }
posthash := sha1.New() posthash := sha1.New()
@ -304,7 +310,7 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
} }
ri_mutex.Lock() ri_mutex.Lock()
o, exists := runninginstances[newinstance] o, exists := runninginstances[newinstance]
if exists == false || o.Status == NEW_INSTANCE { if exists == false || o.Status == KEEPALIVE {
m := RunningInstance{} m := RunningInstance{}
runninginstances[newinstance] = m runninginstances[newinstance] = m
go StartInstance(newinstance, reportPostChan) go StartInstance(newinstance, reportPostChan)

View File

@ -61,8 +61,8 @@ func StreamMastodon(endpoint string, reportPostChan chan ReportPost) {
ri_mutex.Lock() ri_mutex.Lock()
m := runninginstances[endpoint] m := runninginstances[endpoint]
m.Status = RUNNING m.Status = RUNNING
runninginstances[endpoint] = m
m.LastRun = "Streaming" m.LastRun = "Streaming"
runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
s := bufio.NewScanner(resp.Body) s := bufio.NewScanner(resp.Body)
@ -96,9 +96,41 @@ func StreamMastodon(endpoint string, reportPostChan chan ReportPost) {
if newpost.Account.Acct == "" { if newpost.Account.Acct == "" {
continue continue
} }
posthash := sha1.New()
at_sign := strings.Index(newpost.Account.Acct, "@") at_sign := strings.Index(newpost.Account.Acct, "@")
newinstance := newpost.Account.Acct[at_sign+1:]
// Trust the post if it comes from the same source
if newinstance != endpoint {
ri_mutex.Lock()
o, exist := runninginstances[newinstance]
ri_mutex.Unlock()
if exist == false {
o := RunningInstance{}
new_client := http.Client{}
o.client = new_client
o.Status = KEEPALIVE
ri_mutex.Lock()
runninginstances[newinstance] = o
ri_mutex.Unlock()
}
realuser, err := fetch_user_info(o.client, newpost.Account.Url)
if err != nil {
continue
}
realpost, err := fetch_post(o.client, newpost.Uri)
if err != nil {
continue
}
// Minor verification for now...
newpost.Account.Display_name = realuser.Name
newpost.Content = realpost.Content
newpost.Created_at = realpost.Published
}
posthash := sha1.New()
if at_sign == -1 { if at_sign == -1 {
at_sign = len(newpost.Account.Acct) at_sign = len(newpost.Account.Acct)
@ -137,10 +169,9 @@ func StreamMastodon(endpoint string, reportPostChan chan ReportPost) {
reportPostChan <- newpost reportPostChan <- newpost
if settings.Crawl == true && StringExists(endpoint, settings.Banned) == false { if settings.Crawl == true && StringExists(endpoint, settings.Banned) == false {
newinstance := newpost.Account.Acct[at_sign+1:]
ri_mutex.Lock() ri_mutex.Lock()
_, exists := runninginstances[newinstance] o, exists := runninginstances[newinstance]
if exists == false { if exists == false || o.Status == KEEPALIVE {
m := RunningInstance{} m := RunningInstance{}
runninginstances[newinstance] = m runninginstances[newinstance] = m
go StartInstance(newinstance, reportPostChan) go StartInstance(newinstance, reportPostChan)
@ -151,8 +182,9 @@ func StreamMastodon(endpoint string, reportPostChan chan ReportPost) {
} }
ri_mutex.Lock() ri_mutex.Lock()
m = runninginstances[endpoint]
m.LastRun = time.Now().Format(time.RFC3339)
m.Status = STREAM_ENDED m.Status = STREAM_ENDED
runninginstances[endpoint] = m runninginstances[endpoint] = m
m.LastRun = time.Now().Format(time.RFC3339)
ri_mutex.Unlock() ri_mutex.Unlock()
} }

View File

@ -9,6 +9,7 @@ import (
"html" "html"
"time" "time"
"strings" "strings"
"crypto/sha1"
"os" "os"
) )
@ -96,6 +97,8 @@ func webfinger(w http.ResponseWriter, r *http.Request) {
func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc { func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
fmt.Println("PATH --> ", r.URL.Path) fmt.Println("PATH --> ", r.URL.Path)
body, err := ioutil.ReadAll(r.Body) body, err := ioutil.ReadAll(r.Body)
@ -112,26 +115,55 @@ func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc {
switch findtype.Type { switch findtype.Type {
case "Create": case "Create":
var createobject CreateObject var createobject CreateObject
var newpost ReportPost
err = json.Unmarshal(findtype.Object, &createobject) err = json.Unmarshal(findtype.Object, &createobject)
if err != nil { if err != nil {
fmt.Println("Ignore the post here") return
} }
fmt.Println("Content: ", createobject.Content)
var newpost ReportPost
newpost.Uri = findtype.Id newpost.Uri = findtype.Id
start_slashes := strings.Index(createobject.Actor, "//") + 2
end_slashes := strings.Index(createobject.Actor[start_slashes:], "/")
newinstance := createobject.Actor[start_slashes:start_slashes+end_slashes]
// For now we are just verifying the user
ri_mutex.Lock()
o, exist := runninginstances[newinstance]
if exist == false {
o := RunningInstance{}
new_client := http.Client{}
o.client = new_client
o.Status = KEEPALIVE
runninginstances[newinstance] = o
}
ri_mutex.Unlock()
// This only needs to be done if the user does not exist in the database
realuser, err := fetch_user_info(o.client, createobject.Actor)
if err != nil {
return
}
posthash := sha1.New()
fmt.Fprint(posthash, newpost.Uri)
fmt.Fprint(posthash, newpost.normalized)
fmt.Fprint(posthash, newpost.Account.Acct)
fmt.Fprint(posthash, newpost.Account.Display_name)
newpost.posthash = posthash.Sum(nil)
newpost.Content = createobject.Content newpost.Content = createobject.Content
newpost.Created_at = findtype.Published newpost.Created_at = findtype.Published
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ") newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ") newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")
newpost.Account.Acct = "aaa@bbb" newpost.Account.Acct = realuser.PreferredUsername + "@" + newinstance
newpost.Account.Avatar = "ttt" newpost.Account.Avatar = "ttt"
newpost.Account.Bot = false newpost.Account.Bot = false
newpost.Account.Created_at = time.Now().Format(time.RFC3339) newpost.Account.Created_at = time.Now().Format(time.RFC3339)
newpost.Account.Display_name = "Fqrhqn" newpost.Account.Display_name = realuser.Name
newpost.Account.Url = "qwerty" newpost.Account.Url = createobject.Actor
reportPostChan <- newpost reportPostChan <- newpost
case "Like": case "Like":
@ -141,15 +173,6 @@ func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc {
default: default:
fmt.Println("Others --> " + findtype.Type) fmt.Println("Others --> " + findtype.Type)
} }
fmt.Println("Actor: ", findtype.Actor)
fmt.Println("Cc: ", findtype.Cc)
fmt.Println("Id: ", findtype.Id)
fmt.Println("Published: ", findtype.Published)
fmt.Println("To: ", findtype.To)
fmt.Println("Type: ", findtype.Type)
fmt.Println("Object: ", string(findtype.Object))
} }
} }