diff --git a/Makefile b/Makefile index 6ee58d0..9a41436 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go +FEDILOGUE_GOFILES = fedilogue.go ctl.go headers.go instance.go poll.go stream.go web.go db.go config.go oauth.go retrieve.go FEDICTL_GOFILES = fedictl.go headers.go build: diff --git a/db.go b/db.go index df27205..74aa1e6 100644 --- a/db.go +++ b/db.go @@ -5,8 +5,8 @@ import ( "fmt" "log" "github.com/jackc/pgx/pgxpool" - "github.com/davecgh/go-spew/spew" - "time" +// "github.com/davecgh/go-spew/spew" +// "time" ) func postHandler(reportPostChan chan ReportPost) { @@ -16,6 +16,7 @@ func postHandler(reportPostChan chan ReportPost) { } } +/* func check_user(uri string) (AccountType, error) { conn, _:= pool.Acquire(context.Background()) defer conn.Release() @@ -36,6 +37,7 @@ func check_user(uri string) (AccountType, error) { spew.Dump(accountData) return accountData, err } +*/ func writePost(reportpost ReportPost) { conn, err := pool.Acquire(context.Background()) diff --git a/fedilogue.go b/fedilogue.go index 80724b9..bb29446 100644 --- a/fedilogue.go +++ b/fedilogue.go @@ -15,6 +15,7 @@ import ( var runninginstances map[string]RunningInstance var ri_mutex = &sync.Mutex{} var pool *pgxpool.Pool +var requestconnchan chan ConnRequest func startpprof() { log.Print("Starting http/pprof on :7777") @@ -25,6 +26,7 @@ func main() { // Initial Setup reportPostChan := make(chan ReportPost) runninginstances = make(map[string]RunningInstance) + requestconnchan = make(chan ConnRequest) getSettings() go startpprof() @@ -32,7 +34,7 @@ func main() { pool = getDbPool() for i := 0; i < settings.Database.Workers; i++ { - go postHandler(reportPostChan) + go requestConn() } p = bluemonday.NewPolicy() diff --git a/poll.go b/poll.go index e0958d3..99b7bfe 100644 --- a/poll.go +++ b/poll.go @@ -1,10 +1,10 @@ package main import ( - "crypto/sha1" + //"crypto/sha1" "encoding/json" "fmt" - "html" + //"html" "io/ioutil" "log" "net/http" @@ -51,6 +51,7 @@ type PostInfo struct { Content string `"json:content"` } +/* func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) { var userinfo UserInfo accounttype, err := check_user(uri) @@ -91,7 +92,9 @@ func fetch_user_info(http_client http.Client, uri string) (UserInfo, error) { return userinfo, nil } +*/ +/* func fetch_post(http_client http.Client, uri string) (PostInfo, error) { var postinfo PostInfo @@ -115,6 +118,7 @@ func fetch_post(http_client http.Client, uri string) (PostInfo, error) { return postinfo, nil } +*/ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_client http.Client) { newposts := make([]ReportPost, 0) @@ -240,78 +244,10 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost, http_c ri_mutex.Unlock() for _, newpost := range newposts { - if newpost.Account.Acct == "" { - continue - } + fmt.Println("---------------> ", newpost.Uri) at_sign := strings.Index(newpost.Account.Acct, "@") newinstance := newpost.Account.Acct[at_sign+1:] - // Trust the post if it comes from the same source - if newinstance != endpoint { - ri_mutex.Lock() - o, exist := runninginstances[newinstance] - ri_mutex.Unlock() - if exist == false { - o := RunningInstance{} - new_client := http.Client{} - o.client = new_client - o.Status = KEEPALIVE - ri_mutex.Lock() - runninginstances[newinstance] = o - ri_mutex.Unlock() - } - - realuser, err := fetch_user_info(o.client, newpost.Account.Url) - if err != nil { - continue - } - realpost, err := fetch_post(o.client, newpost.Uri) - if err != nil { - continue - } - - // Minor verification for now... - newpost.Account.Display_name = realuser.Name - newpost.Content = realpost.Content - newpost.Created_at = realpost.Published - } - - posthash := sha1.New() - - if at_sign == -1 { - at_sign = len(newpost.Account.Acct) - newpost.Account.Acct += "@" + endpoint - } - - // Calculate the post hash - fmt.Fprint(posthash, newpost.Uri) - fmt.Fprint(posthash, newpost.normalized) - fmt.Fprint(posthash, newpost.Account.Acct) - fmt.Fprint(posthash, newpost.Account.Display_name) - newpost.posthash = posthash.Sum(nil) - - newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) - newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ") - newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ") - - // Validate time - t, err := time.Parse(time.RFC3339, newpost.Created_at) - if err != nil { - newpost.Created_at = time.Now().Format(time.RFC3339) - } - if t.Unix() < 0 { - newpost.Created_at = time.Now().Format(time.RFC3339) - } - - t, err = time.Parse(time.RFC3339, newpost.Account.Created_at) - if err != nil { - newpost.Account.Created_at = time.Now().Format(time.RFC3339) - } - if t.Unix() < 0 { - newpost.Account.Created_at = time.Now().Format(time.RFC3339) - } - - reportPostChan <- newpost // Check min_id if newpost.Id > min_id { diff --git a/retrieve.go b/retrieve.go new file mode 100644 index 0000000..d3b0bd7 --- /dev/null +++ b/retrieve.go @@ -0,0 +1,229 @@ +package main + +import ( + "context" + "fmt" + "strings" + "log" + "encoding/json" + "github.com/jackc/pgx/pgxpool" + "time" + "net/http" +// "io/ioutil" +// "github.com/davecgh/go-spew/spew" +) + +type ImageType struct { +// Type string `json:"type"` + Url string `json:"url"` +} + +type PublicKeyType struct { + PublicKeyPem string `json:"publicKeyPem"` +} + +type UserJson struct { + ID string `json:"id"` + Type string `json:"type"` + Inbox string `json:"inbox"` + Outbox string `json:"outbox"` + Followers string `json:"followers"` + Following string `json:"following"` + Url string `json:"url"` + PreferredUsername string `json:"preferredUsername"` + Name string `json:"name"` + Summary string `json:"summary"` + Icon ImageType `json:"icon"` + Image ImageType `json:"image"` + PublicKey PublicKeyType `json:"publicKey"` + + instance string +} + +type PostJson struct { + ID string `json:"id"` + InReplyTo string `json:"inReplyTo"` + + normalized string + posthash []byte + receivedAt time.Time `json:"created_at"` + + + Content string `json:"content"` + Conversation string `json:"conversation"` + Published time.Time `json:"published"` + Source string `json:"source"` + Summary string `json:"summary"` +// Ignoring tag for now + To []string `json:"to"` + Type string `json:"type"` + + Actor string `json:"actor"` + AttributedTo string `json:"attributedTo"` + + instance string +} + +type ConnRequest struct { + conn chan *pgxpool.Conn + b chan bool +} + +func requestConn() { + conn, _:= pool.Acquire(context.Background()) + defer conn.Release() + for connRequest := range requestconnchan { + fmt.Println("Sending request") + connRequest.conn <-conn + _ = <-connRequest.b + } +} + +func check_post(uri string) (PostJson, error) { +// conn, _:= pool.Acquire(context.Background()) +// defer conn.Release() + connrequest := ConnRequest{} + connrequest.conn = make(chan *pgxpool.Conn) + connrequest.b = make(chan bool) + requestconnchan <- connrequest + + myconn := <-connrequest.conn + + var postjson PostJson + + selectRet := myconn.QueryRow(context.Background(), "SELECT id, inReplyTo, published, summary, content, normalized, attributedto, posthash, received_at FROM posts WHERE id = $1", uri) + err := selectRet.Scan(&postjson.ID, &postjson.InReplyTo, &postjson.Published, &postjson.Summary, &postjson.Content, &postjson.normalized, &postjson.AttributedTo, &postjson.posthash, &postjson.receivedAt) + close(connrequest.b) + + if err == nil { + fmt.Println("First return!") + return postjson, nil + } + log.Print(uri) + + endslash := strings.Index(uri[8:], "/") + postjson.instance = uri[8:endslash+8] + + + client := http.Client{} + req, _ := http.NewRequest("GET", uri, nil) + req.Header.Add("Accept", "application/ld+json") + + resp, err := client.Do(req) + + err = json.NewDecoder(resp.Body).Decode(&postjson) + if err != nil { + return postjson, err + } + + if postjson.InReplyTo != "" && postjson.InReplyTo != uri { + log.Print("GOING INTO NEW POST: ", postjson.InReplyTo) + go check_post(postjson.InReplyTo) + } + + check_user(postjson.AttributedTo) // This must be done BEFORE the `INSERT INTO posts` below + + connrequest = ConnRequest{} + connrequest.conn = make(chan *pgxpool.Conn) + connrequest.b = make(chan bool) + requestconnchan <- connrequest + + myconn = <-connrequest.conn + + _, err = myconn.Exec(context.Background(), "INSERT INTO posts (id, inReplyTo, published, summary, content, normalized, attributedto, posthash, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)", postjson.ID, postjson.InReplyTo, postjson.Published, postjson.Summary, postjson.Content, postjson.normalized, postjson.AttributedTo, postjson.posthash, postjson.instance) + close(connrequest.b) + if err != nil { + log.Print("INSERT posts ", err) + return postjson, err + } + + for _, to := range postjson.To { + if to != "https://www.w3.org/ns/activitystreams#Public" { + log.Print("Going into: " + to) + go check_user(to) + } + } + + fmt.Println("Second return") + + return postjson, nil +} + +func check_user(uri string) (UserJson, error) { +// conn, _:= pool.Acquire(context.Background()) +// defer conn.Release() + connrequest := ConnRequest{} + connrequest.conn = make(chan *pgxpool.Conn) + connrequest.b = make(chan bool) + requestconnchan <- connrequest + + myconn := <-connrequest.conn + + var userjson UserJson + + selectRet := myconn.QueryRow(context.Background(), "SELECT id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance FROM accounts WHERE id = $1", uri) + err := selectRet.Scan(&userjson.ID, &userjson.Type, &userjson.Inbox, &userjson.Outbox, &userjson.Followers, &userjson.Following, &userjson.Url, &userjson.PreferredUsername, &userjson.Name, &userjson.Summary, &userjson.Icon.Url, &userjson.Image.Url, &userjson.PublicKey.PublicKeyPem, &userjson.instance) + close(connrequest.b) + + if err == nil { + fmt.Println("First return!") + return userjson, nil + } + endslash := strings.Index(uri[8:], "/") + userjson.instance = uri[8:endslash+8] + + client := http.Client{} + req, _ := http.NewRequest("GET", uri, nil) + req.Header.Add("Accept", "application/ld+json") + + resp, err := client.Do(req) + if err != nil { + //log.Fatal(err) + return userjson, err + } + + err = json.NewDecoder(resp.Body).Decode(&userjson) + if err != nil { + //log.Fatal(err) + return userjson, err + } + + connrequest = ConnRequest{} + connrequest.conn = make(chan *pgxpool.Conn) + connrequest.b = make(chan bool) + requestconnchan <- connrequest + + myconn = <-connrequest.conn + _, err = myconn.Exec(context.Background(), "INSERT INTO accounts (id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", userjson.ID, userjson.Type, userjson.Inbox, userjson.Outbox, userjson.Followers, userjson.Following, userjson.Url, userjson.PreferredUsername, userjson.Name, userjson.Summary, userjson.Icon.Url, userjson.Image.Url, userjson.PublicKey.PublicKeyPem, userjson.instance) + close(connrequest.b) + if err != nil { + //log.Fatal("INSERT accounts ", err) + return userjson, err + } + + fmt.Println("Second return") + + return userjson, nil +} + +//var pool *pgxpool.Pool + +/* +func main() { + getDbPool() + +// userjson := check_user("https://islamicate.space/users/fikran") +// _ = check_user("https://social.farhan.codes/users/testacct555") +// postjson := spew.Dump(userjson) + + postjson, err := check_post("https://social.farhan.codes/objects/39cb2c26-153e-4e2e-bb3e-4d6971c04df1") + + if err != nil { + log.Fatal("The error is: ", err) + } + + fmt.Println(postjson) + +// check_post("https://honk.tedunangst.com/u/tedu/h/v1Mz2rgpw1b45g99vS") +} +*/ diff --git a/stream.go b/stream.go index b8d43ba..5f1bcfa 100644 --- a/stream.go +++ b/stream.go @@ -2,10 +2,10 @@ package main import ( "bufio" - "crypto/sha1" +// "crypto/sha1" "encoding/json" - "fmt" - "html" +// "fmt" +// "html" "log" "net/http" "strings" @@ -92,81 +92,12 @@ func StreamMastodon(endpoint string, reportPostChan chan ReportPost) { continue } - if newpost.Account.Acct == "" { - continue - } + log.Print("----------> " + newpost.Uri) + go check_post(newpost.Uri) at_sign := strings.Index(newpost.Account.Acct, "@") newinstance := newpost.Account.Acct[at_sign+1:] - // Trust the post if it comes from the same source - if newinstance != endpoint { - ri_mutex.Lock() - o, exist := runninginstances[newinstance] - ri_mutex.Unlock() - if exist == false { - o := RunningInstance{} - new_client := http.Client{} - o.client = new_client - o.Status = KEEPALIVE - ri_mutex.Lock() - runninginstances[newinstance] = o - ri_mutex.Unlock() - } - - realuser, err := fetch_user_info(o.client, newpost.Account.Url) - if err != nil { - continue - } - realpost, err := fetch_post(o.client, newpost.Uri) - if err != nil { - continue - } - - // Minor verification for now... - newpost.Account.Display_name = realuser.Name - newpost.Content = realpost.Content - newpost.Created_at = realpost.Published - } - - posthash := sha1.New() - - if at_sign == -1 { - at_sign = len(newpost.Account.Acct) - newpost.Account.Acct += "@" + endpoint - } - - // Calculate the post hash - fmt.Fprint(posthash, newpost.Uri) - fmt.Fprint(posthash, newpost.normalized) - fmt.Fprint(posthash, newpost.Account.Acct) - fmt.Fprint(posthash, newpost.Account.Display_name) - newpost.posthash = posthash.Sum(nil) - - newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) - newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ") - newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ") - - // Validate time - t, err := time.Parse(time.RFC3339, newpost.Created_at) - if err != nil { - newpost.Created_at = time.Now().Format(time.RFC3339) - } - if t.Unix() < 0 { - newpost.Created_at = time.Now().Format(time.RFC3339) - } - - t, err = time.Parse(time.RFC3339, newpost.Account.Created_at) - if err != nil { - newpost.Account.Created_at = time.Now().Format(time.RFC3339) - } - if t.Unix() < 0 { - newpost.Account.Created_at = time.Now().Format(time.RFC3339) - } - - // Reporting post - reportPostChan <- newpost - if settings.Crawl == true && stringexists(endpoint, settings.Banned) == false { ri_mutex.Lock() o, exists := runninginstances[newinstance] diff --git a/web.go b/web.go index cc56389..5b19070 100644 --- a/web.go +++ b/web.go @@ -1,16 +1,16 @@ package main import ( - "crypto/sha1" +// "crypto/sha1" "encoding/json" "fmt" - "html" +// "html" "io/ioutil" "log" "net/http" "os" - "strings" - "time" +// "strings" +// "time" ) // CreateObject - Used by post web receiver @@ -122,50 +122,7 @@ func inboxHandler(reportPostChan chan ReportPost) http.HandlerFunc { } newpost.Uri = findtype.ID - - startSlashes := strings.Index(createobject.Actor, "//") + 2 - endSlashes := strings.Index(createobject.Actor[startSlashes:], "/") - newinstance := createobject.Actor[startSlashes : startSlashes+endSlashes] - - // For now we are just verifying the user - ri_mutex.Lock() - o, exist := runninginstances[newinstance] - if exist == false { - o := RunningInstance{} - newClient := http.Client{} - o.client = newClient - o.Status = KEEPALIVE - runninginstances[newinstance] = o - } - ri_mutex.Unlock() - - // This only needs to be done if the user does not exist in the database - log.Print("Actor object: ", createobject.Actor) - realuser, err := fetch_user_info(o.client, createobject.Actor) - _, _ = check_user(createobject.Actor) - if err != nil { - return - } - - posthash := sha1.New() - fmt.Fprint(posthash, newpost.Uri) - fmt.Fprint(posthash, newpost.normalized) - fmt.Fprint(posthash, newpost.Account.Acct) - fmt.Fprint(posthash, newpost.Account.Display_name) - newpost.posthash = posthash.Sum(nil) - - newpost.Content = createobject.Content - newpost.Created_at = findtype.Published - newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) - newpost.normalized = strings.ReplaceAll(newpost.normalized, "\t", " ") - newpost.normalized = spaceReg.ReplaceAllString(newpost.normalized, " ") - newpost.Account.Acct = realuser.PreferredUsername + "@" + newinstance - newpost.Account.Avatar = realuser.Icon.Url - newpost.Account.Bot = false - newpost.Account.Created_at = time.Now().Format(time.RFC3339) - newpost.Account.Display_name = realuser.Name - newpost.Account.Url = createobject.Actor - reportPostChan <- newpost + log.Print(newpost.Uri) case "Like": case "Announcement":