verifying is being done

middle layer commit
database middle layer might be redundant
screwed up tables.sql
isn't reusing connections
This commit is contained in:
farhan 2020-12-22 17:35:05 +00:00
parent ad9fe9fad2
commit e9178b1d09
7 changed files with 254 additions and 197 deletions

View File

@ -1,4 +1,4 @@
FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go
FEDICTL_GOFILES = fedictl.go headers.go FEDICTL_GOFILES = fedictl.go headers.go
build: build:

6
db.go
View File

@ -5,8 +5,8 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/jackc/pgx/pgxpool" "github.com/jackc/pgx/pgxpool"
"github.com/davecgh/go-spew/spew" // "github.com/davecgh/go-spew/spew"
"time" // "time"
) )
func postHandler(reportPostChan chan ReportPost) { func postHandler(reportPostChan chan ReportPost) {
@ -16,6 +16,7 @@ func postHandler(reportPostChan chan ReportPost) {
} }
} }
/*
func check_user(uri string) (AccountType, error) { func check_user(uri string) (AccountType, error) {
conn, _:= pool.Acquire(context.Background()) conn, _:= pool.Acquire(context.Background())
defer conn.Release() defer conn.Release()
@ -36,6 +37,7 @@ func check_user(uri string) (AccountType, error) {
spew.Dump(accountData) spew.Dump(accountData)
return accountData, err return accountData, err
} }
*/
func writePost(reportpost ReportPost) { func writePost(reportpost ReportPost) {
conn, err := pool.Acquire(context.Background()) conn, err := pool.Acquire(context.Background())

View File

@ -15,6 +15,7 @@ import (
var runninginstances map[string]RunningInstance var runninginstances map[string]RunningInstance
var ri_mutex = &sync.Mutex{} var ri_mutex = &sync.Mutex{}
var pool *pgxpool.Pool var pool *pgxpool.Pool
var requestconnchan chan ConnRequest
func startpprof() { func startpprof() {
log.Print("Starting http/pprof on :7777") log.Print("Starting http/pprof on :7777")
@ -25,6 +26,7 @@ func main() {
// Initial Setup // Initial Setup
reportPostChan := make(chan ReportPost) reportPostChan := make(chan ReportPost)
runninginstances = make(map[string]RunningInstance) runninginstances = make(map[string]RunningInstance)
requestconnchan = make(chan ConnRequest)
getSettings() getSettings()
go startpprof() go startpprof()
@ -32,7 +34,7 @@ func main() {
pool = getDbPool() pool = getDbPool()
for i := 0; i < settings.Database.Workers; i++ { for i := 0; i < settings.Database.Workers; i++ {
go postHandler(reportPostChan) go requestConn()
} }
p = bluemonday.NewPolicy() p = bluemonday.NewPolicy()

78
poll.go
View File

@ -1,10 +1,10 @@
package main package main
import ( import (
"crypto/sha1" //"crypto/sha1"
"encoding/json" "encoding/json"
"fmt" "fmt"
"html" //"html"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
@ -51,6 +51,7 @@ type PostInfo struct {
Content string `"json:content"` Content string `"json:content"`
} }
/*
func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) { func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) {
var userinfo UserInfo var userinfo UserInfo
accounttype, err := check_user(uri) accounttype, err := check_user(uri)
@ -91,7 +92,9 @@ func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) {
return userinfo, nil return userinfo, nil
} }
*/
/*
func fetch_post(http_client http.Client, uri string) (PostInfo, error) { func fetch_post(http_client http.Client, uri string) (PostInfo, error) {
var postinfo PostInfo var postinfo PostInfo
@ -115,6 +118,7 @@ func fetch_post(http_client http.Client, uri string) (PostInfo, error) {
return postinfo, nil return postinfo, nil
} }
*/
func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client) { func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client) {
newposts := make([]ReportPost, 0) newposts := make([]ReportPost, 0)
@ -240,78 +244,10 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c
ri_mutex.Unlock() ri_mutex.Unlock()
for _, newpost := range newposts { for _, newpost := range newposts {
if newpost.Account.Acct == "" { fmt.Println("---------------> ", newpost.Uri)
continue
}
at_sign := strings.Index(newpost.Account.Acct, "@") at_sign := strings.Index(newpost.Account.Acct, "@")
newinstance := newpost.Account.Acct[at_sign+1:] newinstance := newpost.Account.Acct[at_sign+1:]
// Trust the post if it comes from the same source
if newinstance != endpoint {
ri_mutex.Lock()
o, exist := runninginstances[newinstance]
ri_mutex.Unlock()
if exist == false {
o := RunningInstance{}
new_client := http.Client{}
o.client = new_client
o.Status = KEEPALIVE
ri_mutex.Lock()
runninginstances[newinstance] = o
ri_mutex.Unlock()
}
realuser, err := fetch_user_info(o.client, newpost.Account.Url)
if err != nil {
continue
}
realpost, err := fetch_post(o.client, newpost.Uri)
if err != nil {
continue
}
// Minor verification for now...
newpost.Account.Display_name = realuser.Name
newpost.Content = realpost.Content
newpost.Created_at = realpost.Published
}
posthash := sha1.New()
if at_sign == -1 {
at_sign = len(newpost.Account.Acct)
newpost.Account.Acct += "@" + endpoint
}
// Calculate the post hash
fmt.Fprint(posthash, newpost.Uri)
fmt.Fprint(posthash, newpost.normalized)
fmt.Fprint(posthash, newpost.Account.Acct)
fmt.Fprint(posthash, newpost.Account.Display_name)
newpost.posthash = posthash.Sum(nil)
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")
// Validate time
t, err := time.Parse(time.RFC3339, newpost.Created_at)
if err != nil {
newpost.Created_at = time.Now().Format(time.RFC3339)
}
if t.Unix() < 0 {
newpost.Created_at = time.Now().Format(time.RFC3339)
}
t, err = time.Parse(time.RFC3339, newpost.Account.Created_at)
if err != nil {
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
}
if t.Unix() < 0 {
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
}
reportPostChan <- newpost
// Check min_id // Check min_id
if newpost.Id > min_id { if newpost.Id > min_id {

229
retrieve.go Normal file
View File

@ -0,0 +1,229 @@
package main
import (
"context"
"fmt"
"strings"
"log"
"encoding/json"
"github.com/jackc/pgx/pgxpool"
"time"
"net/http"
// "io/ioutil"
// "github.com/davecgh/go-spew/spew"
)
// ImageType is the decoded form of a JSON image object; only its "url"
// member is kept. UserJson uses it for both its Icon and Image fields.
type ImageType struct {
// Type string `json:"type"`
// Url is the value of the object's "url" member.
Url string `json:"url"`
}
// PublicKeyType is the decoded form of the "publicKey" object of a user
// document (see UserJson.PublicKey).
type PublicKeyType struct {
// PublicKeyPem is the value of the object's "publicKeyPem" member.
PublicKeyPem string `json:"publicKeyPem"`
}
// UserJson is the representation of a remote user used by check_user: it is
// both the target of JSON decoding of the fetched user document and the
// in-memory form of a row of the accounts table.
type UserJson struct {
// ID is the "id" member; check_user uses the user's URI as the lookup key
// for the accounts.id column.
ID string `json:"id"`
// Type is stored in the accounts.actor_type column.
Type string `json:"type"`
Inbox string `json:"inbox"`
Outbox string `json:"outbox"`
Followers string `json:"followers"`
Following string `json:"following"`
Url string `json:"url"`
PreferredUsername string `json:"preferredUsername"`
Name string `json:"name"`
Summary string `json:"summary"`
// Only the Url of Icon and Image is persisted (columns icon and image).
Icon ImageType `json:"icon"`
Image ImageType `json:"image"`
// Only PublicKey.PublicKeyPem is persisted (column publicKey).
PublicKey PublicKeyType `json:"publicKey"`
// instance is unexported and therefore never filled by JSON decoding;
// check_user derives it from the host part of the user's URI.
instance string
}
// PostJson is the representation of a remote post used by check_post: it is
// both the target of JSON decoding of the fetched post document and the
// in-memory form of a row of the posts table.
//
// Unexported fields are invisible to encoding/json, so they are only ever set
// by this program (from the database or by check_post itself).
type PostJson struct {
	// ID is the "id" member; check_post uses the post's URI as the lookup
	// key for the posts.id column.
	ID        string `json:"id"`
	InReplyTo string `json:"inReplyTo"`

	// normalized and posthash are read from and written to the posts table
	// but are never populated by JSON decoding.
	normalized string
	posthash   []byte
	// receivedAt is filled from the posts.received_at column only. It used
	// to carry a `json:"created_at"` tag, which encoding/json ignores on an
	// unexported field (and which go vet reports), so the tag was dropped.
	receivedAt time.Time

	Content      string    `json:"content"`
	Conversation string    `json:"conversation"`
	Published    time.Time `json:"published"`
	Source       string    `json:"source"`
	Summary      string    `json:"summary"`
	// Ignoring tag for now
	To           []string `json:"to"`
	Type         string   `json:"type"`
	Actor        string   `json:"actor"`
	AttributedTo string   `json:"attributedTo"`

	// instance is derived by check_post from the host part of the post's
	// URI; it is not part of the JSON document.
	instance string
}
// ConnRequest is the message a caller sends on requestconnchan to borrow a
// database connection from a requestConn worker.
type ConnRequest struct {
// conn is the channel on which the worker delivers the connection.
conn chan *pgxpool.Conn
// b is the "done" signal: the worker blocks receiving from it after handing
// out the connection, and the borrowers in this file release it by closing b.
b chan bool
}
// requestConn is a database worker. It acquires a single connection from the
// package-level pool and then lends that connection, to one borrower at a
// time, to whoever sends a ConnRequest on requestconnchan. The connection is
// released back to the pool when requestconnchan is closed and drained.
//
// If the connection cannot be acquired the worker logs the error and exits
// instead of continuing with a nil connection (which previously led to a nil
// dereference here and in every borrower).
func requestConn() {
	conn, err := pool.Acquire(context.Background())
	if err != nil {
		log.Print("requestConn: acquiring database connection: ", err)
		return
	}
	defer conn.Release()

	for connRequest := range requestconnchan {
		fmt.Println("Sending request")
		connRequest.conn <- conn
		// Wait for the borrower to signal (by sending on or closing b) that
		// it is finished, so the connection is never shared concurrently.
		<-connRequest.b
	}
}
// check_post returns the post identified by uri.
//
// It first looks the post up in the posts table. On a miss (any Scan error)
// it fetches uri over HTTP as JSON, decodes it into a PostJson, makes sure the
// author is known via check_user, and inserts the post into the posts table.
// As side effects it starts a goroutine that runs check_post on the parent
// post (inReplyTo) and one goroutine per non-public addressee that runs
// check_user.
//
// On error the returned PostJson may be partially populated and should not be
// relied upon.
func check_post(uri string) (PostJson, error) {
	// Borrow a connection from a requestConn worker; closing b gives it back.
	connrequest := ConnRequest{}
	connrequest.conn = make(chan *pgxpool.Conn)
	connrequest.b = make(chan bool)
	requestconnchan <- connrequest
	myconn := <-connrequest.conn

	var postjson PostJson
	selectRet := myconn.QueryRow(context.Background(), "SELECT id, inReplyTo, published, summary, content, normalized, attributedto, posthash, received_at FROM posts WHERE id = $1", uri)
	err := selectRet.Scan(&postjson.ID, &postjson.InReplyTo, &postjson.Published, &postjson.Summary, &postjson.Content, &postjson.normalized, &postjson.AttributedTo, &postjson.posthash, &postjson.receivedAt)
	close(connrequest.b)
	if err == nil {
		fmt.Println("First return!")
		return postjson, nil
	}

	log.Print(uri)

	// Derive the instance (host) from the URI without assuming a fixed
	// "https://" prefix or the presence of a path; the previous fixed-offset
	// slicing panicked on short or slash-less URIs.
	host := uri
	if i := strings.Index(host, "://"); i >= 0 {
		host = host[i+len("://"):]
	}
	if i := strings.Index(host, "/"); i >= 0 {
		host = host[:i]
	}
	if host == "" {
		return postjson, fmt.Errorf("check_post: cannot determine instance from %q", uri)
	}
	postjson.instance = host

	// A timeout keeps an unresponsive remote server from pinning this
	// goroutine forever.
	client := http.Client{Timeout: 30 * time.Second}
	req, err := http.NewRequest("GET", uri, nil)
	if err != nil {
		return postjson, err
	}
	req.Header.Add("Accept", "application/ld+json")

	resp, err := client.Do(req)
	if err != nil {
		// resp is nil here; it must not be touched.
		return postjson, err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		resp.Body.Close()
		return postjson, fmt.Errorf("check_post: fetching %s: unexpected status %s", uri, resp.Status)
	}
	err = json.NewDecoder(resp.Body).Decode(&postjson)
	// Close right away rather than with defer: the rest of this function
	// performs further network and database work.
	resp.Body.Close()
	if err != nil {
		return postjson, err
	}

	if postjson.InReplyTo != "" && postjson.InReplyTo != uri {
		log.Print("GOING INTO NEW POST: ", postjson.InReplyTo)
		go check_post(postjson.InReplyTo)
	}

	// This must be done BEFORE the `INSERT INTO posts` below. A failure is
	// logged but not fatal: the INSERT is still attempted and reports its
	// own error.
	if _, err := check_user(postjson.AttributedTo); err != nil {
		log.Print("check_user ", postjson.AttributedTo, ": ", err)
	}

	connrequest = ConnRequest{}
	connrequest.conn = make(chan *pgxpool.Conn)
	connrequest.b = make(chan bool)
	requestconnchan <- connrequest
	myconn = <-connrequest.conn

	// normalized and posthash are not computed on this path, so their zero
	// values are what gets stored.
	_, err = myconn.Exec(context.Background(), "INSERT INTO posts (id, inReplyTo, published, summary, content, normalized, attributedto, posthash, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)", postjson.ID, postjson.InReplyTo, postjson.Published, postjson.Summary, postjson.Content, postjson.normalized, postjson.AttributedTo, postjson.posthash, postjson.instance)
	close(connrequest.b)
	if err != nil {
		log.Print("INSERT posts ", err)
		return postjson, err
	}

	for _, to := range postjson.To {
		if to != "https://www.w3.org/ns/activitystreams#Public" {
			log.Print("Going into: " + to)
			go check_user(to)
		}
	}

	fmt.Println("Second return")
	return postjson, nil
}
// check_user returns the user (actor) identified by uri.
//
// It first looks the user up in the accounts table. On a miss (any Scan
// error) it fetches uri over HTTP as JSON, decodes it into a UserJson and
// inserts it into the accounts table.
//
// On error the returned UserJson may be partially populated and should not be
// relied upon.
func check_user(uri string) (UserJson, error) {
	// Borrow a connection from a requestConn worker; closing b gives it back.
	connrequest := ConnRequest{}
	connrequest.conn = make(chan *pgxpool.Conn)
	connrequest.b = make(chan bool)
	requestconnchan <- connrequest
	myconn := <-connrequest.conn

	var userjson UserJson
	selectRet := myconn.QueryRow(context.Background(), "SELECT id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance FROM accounts WHERE id = $1", uri)
	err := selectRet.Scan(&userjson.ID, &userjson.Type, &userjson.Inbox, &userjson.Outbox, &userjson.Followers, &userjson.Following, &userjson.Url, &userjson.PreferredUsername, &userjson.Name, &userjson.Summary, &userjson.Icon.Url, &userjson.Image.Url, &userjson.PublicKey.PublicKeyPem, &userjson.instance)
	close(connrequest.b)
	if err == nil {
		fmt.Println("First return!")
		return userjson, nil
	}

	// Derive the instance (host) from the URI without assuming a fixed
	// "https://" prefix or the presence of a path; the previous fixed-offset
	// slicing panicked on short or slash-less URIs.
	host := uri
	if i := strings.Index(host, "://"); i >= 0 {
		host = host[i+len("://"):]
	}
	if i := strings.Index(host, "/"); i >= 0 {
		host = host[:i]
	}
	if host == "" {
		return userjson, fmt.Errorf("check_user: cannot determine instance from %q", uri)
	}
	userjson.instance = host

	// A timeout keeps an unresponsive remote server from pinning this
	// goroutine forever.
	client := http.Client{Timeout: 30 * time.Second}
	req, err := http.NewRequest("GET", uri, nil)
	if err != nil {
		return userjson, err
	}
	req.Header.Add("Accept", "application/ld+json")

	resp, err := client.Do(req)
	if err != nil {
		return userjson, err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		resp.Body.Close()
		return userjson, fmt.Errorf("check_user: fetching %s: unexpected status %s", uri, resp.Status)
	}
	err = json.NewDecoder(resp.Body).Decode(&userjson)
	// Close right away rather than with defer: database work still follows.
	resp.Body.Close()
	if err != nil {
		return userjson, err
	}

	connrequest = ConnRequest{}
	connrequest.conn = make(chan *pgxpool.Conn)
	connrequest.b = make(chan bool)
	requestconnchan <- connrequest
	myconn = <-connrequest.conn

	_, err = myconn.Exec(context.Background(), "INSERT INTO accounts (id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", userjson.ID, userjson.Type, userjson.Inbox, userjson.Outbox, userjson.Followers, userjson.Following, userjson.Url, userjson.PreferredUsername, userjson.Name, userjson.Summary, userjson.Icon.Url, userjson.Image.Url, userjson.PublicKey.PublicKeyPem, userjson.instance)
	close(connrequest.b)
	if err != nil {
		return userjson, err
	}

	fmt.Println("Second return")
	return userjson, nil
}
//var pool *pgxpool.Pool
/*
func main() {
getDbPool()
// userjson := check_user("https://islamicate.space/users/fikran")
// _ = check_user("https://social.farhan.codes/users/testacct555")
// postjson := spew.Dump(userjson)
postjson, err := check_post("https://social.farhan.codes/objects/39cb2c26-153e-4e2e-bb3e-4d6971c04df1")
if err != nil {
log.Fatal("The error is: ", err)
}
fmt.Println(postjson)
// check_post("https://honk.tedunangst.com/u/tedu/h/v1Mz2rgpw1b45g99vS")
}
*/

View File

@ -2,10 +2,10 @@ package main
import ( import (
"bufio" "bufio"
"crypto/sha1" // "crypto/sha1"
"encoding/json" "encoding/json"
"fmt" // "fmt"
"html" // "html"
"log" "log"
"net/http" "net/http"
"strings" "strings"
@ -92,81 +92,12 @@ func StreamMastodon(endpoint string, reportPostChan chan ReportPost) {
continue continue
} }
if newpost.Account.Acct == "" { log.Print("----------> " + newpost.Uri)
continue go check_post(newpost.Uri)
}
at_sign := strings.Index(newpost.Account.Acct, "@") at_sign := strings.Index(newpost.Account.Acct, "@")
newinstance := newpost.Account.Acct[at_sign+1:] newinstance := newpost.Account.Acct[at_sign+1:]
// Trust the post if it comes from the same source
if newinstance != endpoint {
ri_mutex.Lock()
o, exist := runninginstances[newinstance]
ri_mutex.Unlock()
if exist == false {
o := RunningInstance{}
new_client := http.Client{}
o.client = new_client
o.Status = KEEPALIVE
ri_mutex.Lock()
runninginstances[newinstance] = o
ri_mutex.Unlock()
}
realuser, err := fetch_user_info(o.client, newpost.Account.Url)
if err != nil {
continue
}
realpost, err := fetch_post(o.client, newpost.Uri)
if err != nil {
continue
}
// Minor verification for now...
newpost.Account.Display_name = realuser.Name
newpost.Content = realpost.Content
newpost.Created_at = realpost.Published
}
posthash := sha1.New()
if at_sign == -1 {
at_sign = len(newpost.Account.Acct)
newpost.Account.Acct += "@" + endpoint
}
// Calculate the post hash
fmt.Fprint(posthash, newpost.Uri)
fmt.Fprint(posthash, newpost.normalized)
fmt.Fprint(posthash, newpost.Account.Acct)
fmt.Fprint(posthash, newpost.Account.Display_name)
newpost.posthash = posthash.Sum(nil)
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")
// Validate time
t, err := time.Parse(time.RFC3339, newpost.Created_at)
if err != nil {
newpost.Created_at = time.Now().Format(time.RFC3339)
}
if t.Unix() < 0 {
newpost.Created_at = time.Now().Format(time.RFC3339)
}
t, err = time.Parse(time.RFC3339, newpost.Account.Created_at)
if err != nil {
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
}
if t.Unix() < 0 {
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
}
// Reporting post
reportPostChan <- newpost
if settings.Crawl == true && stringexists(endpoint, settings.Banned) == false { if settings.Crawl == true && stringexists(endpoint, settings.Banned) == false {
ri_mutex.Lock() ri_mutex.Lock()
o, exists := runninginstances[newinstance] o, exists := runninginstances[newinstance]

53
web.go
View File

@ -1,16 +1,16 @@
package main package main
import ( import (
"crypto/sha1" // "crypto/sha1"
"encoding/json" "encoding/json"
"fmt" "fmt"
"html" // "html"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"os" "os"
"strings" // "strings"
"time" // "time"
) )
// CreateObject - Used by post web receiver // CreateObject - Used by post web receiver
@ -122,50 +122,7 @@ func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc {
} }
newpost.Uri = findtype.ID newpost.Uri = findtype.ID
log.Print(newpost.Uri)
startSlashes := strings.Index(createobject.Actor, "//") + 2
endSlashes := strings.Index(createobject.Actor[startSlashes:], "/")
newinstance := createobject.Actor[startSlashes : startSlashes+endSlashes]
// For now we are just verifying the user
ri_mutex.Lock()
o, exist := runninginstances[newinstance]
if exist == false {
o := RunningInstance{}
newClient := http.Client{}
o.client = newClient
o.Status = KEEPALIVE
runninginstances[newinstance] = o
}
ri_mutex.Unlock()
// This only needs to be done if the user does not exist in the database
log.Print("Actor object: ", createobject.Actor)
realuser, err := fetch_user_info(o.client, createobject.Actor)
_, _ = check_user(createobject.Actor)
if err != nil {
return
}
posthash := sha1.New()
fmt.Fprint(posthash, newpost.Uri)
fmt.Fprint(posthash, newpost.normalized)
fmt.Fprint(posthash, newpost.Account.Acct)
fmt.Fprint(posthash, newpost.Account.Display_name)
newpost.posthash = posthash.Sum(nil)
newpost.Content = createobject.Content
newpost.Created_at = findtype.Published
newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content)))
newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ")
newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ")
newpost.Account.Acct = realuser.PreferredUsername + "@" + newinstance
newpost.Account.Avatar = realuser.Icon.Url
newpost.Account.Bot = false
newpost.Account.Created_at = time.Now().Format(time.RFC3339)
newpost.Account.Display_name = realuser.Name
newpost.Account.Url = createobject.Actor
reportPostChan <- newpost
case "Like": case "Like":
case "Announcement": case "Announcement":