From 2a4d1c8dad028da7a83f7c04b50410a6b31110ce Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Fri, 23 Oct 2020 02:10:35 +0000 Subject: [PATCH] good time to save --- poll/engine.go | 254 +++++++++++++++++++++++++++++++++++++++++++++++ poll/tables.psql | 28 ++++++ 2 files changed, 282 insertions(+) create mode 100644 poll/engine.go create mode 100644 poll/tables.psql diff --git a/poll/engine.go b/poll/engine.go new file mode 100644 index 0000000..77dd5bd --- /dev/null +++ b/poll/engine.go @@ -0,0 +1,254 @@ +package main + +import ( + "github.com/microcosm-cc/bluemonday" +// "github.com/jackc/pgx" + "encoding/json" + "crypto/sha1" + "io/ioutil" + "net/http" +// "context" + "strings" + "time" + "fmt" + "net" + "log" + "io" + "os" +) + +type PollMessage struct { + from string + status string + min_id string +} + +// Parsing Unmarshal JSON type +type ReportPost struct { + + // Retrieved values + Id string `json:"id"` + Url string `json:"url"` + Account AccountType + Content string `json:"content"` + + // Derived values + StrippedContent string + Posthash []byte +} + +type AccountType struct { + Acct string `json:"acct"` + Avatar string `json:"avatar"` + Bot bool `json:"bot"` + Created_at string `json:"created_at"` + Display_name string `json:"display_name"` + Url string `json:"url"` +} + + +// Used to report a new instance to main +type ReportInstance struct { + from string + endpoint string +} + +type RunningInstance struct { + endpoint string + min_id string +} + +func handleClient(commandClient net.Conn) { + rawCommand := make([]byte, 20) + + for { + _, err := io.ReadFull(commandClient, rawCommand) + if err != nil { + fmt.Println(err) + commandClient.Close() + } + fmt.Println(rawCommand) + } +} + +/* + * This code can be refactored per + * https://golang.org/pkg/encoding/binary/ + * But for now, this should be sufficient + */ +/* +func parseCommand(c net.Conn) { + rawCommand := make([]byte, 1) +} +*/ + +func StartInstancePoll(endpoint string, min_id string, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) { + p := bluemonday.NewPolicy() + newposts := make([]ReportPost, 0) + + api_timeline := "https://" + endpoint + "/api/v1/timelines/public?min_id=" + min_id + resp, err := http.Get(api_timeline) + if err != nil { + log.Fatal(err) + pollMessageChan <- PollMessage{endpoint, "get_failure", ""} + } + + body, err := ioutil.ReadAll(resp.Body) + err = json.Unmarshal(body, &newposts) + if err != nil { + log.Fatal(err) + panic(err) + pollMessageChan <- PollMessage{endpoint, "unmarshal_crash", ""} + } + + for _, newpost := range newposts { + posthash := sha1.New() + + if strings.Contains(newpost.Account.Acct, "@") == false { + newpost.Account.Acct += "@" + endpoint + } + // Calculate the post hash + fmt.Fprint(posthash, newpost.Url) + fmt.Fprint(posthash, newpost.Content) + fmt.Fprint(posthash, newpost.Account.Acct) + fmt.Fprint(posthash, newpost.Account.Created_at) + fmt.Fprint(posthash, newpost.Account.Display_name) + fmt.Fprint(posthash, newpost.Account.Url) + newpost.Posthash = posthash.Sum(nil) + + newpost.StrippedContent = p.Sanitize(newpost.Content) + + reportPostChan <- newpost + + // Check min_id + if newpost.Id > min_id { + min_id = newpost.Id + } + } + + pollMessageChan <- PollMessage{endpoint, "ok", min_id} +} + +func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) { + var newpeers []string + + api_peers := "https://" + endpoint + "/api/v1/instance/peers" + resp, err := http.Get(api_peers) + if err != nil { + fmt.Println("Peer instance failure") + os.Exit(1) + } + + body, err := ioutil.ReadAll(resp.Body) + + err = json.Unmarshal([]byte(body), &newpeers) + if err != nil { + fmt.Println("Unmarshal error") + log.Fatal(err) + panic(err) + } + + for _, newpeer := range newpeers { + var q ReportInstance + q.from = endpoint + q.endpoint = newpeer + reportInstanceChan <- q + } +} + +func DeferRun(pollmessage PollMessage, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) { + var min_id string + + for _, runninginstance := range *runninginstances { + if runninginstance.endpoint == pollmessage.from { + min_id = runninginstance.min_id + break + } + } + + time.Sleep(time.Second * 2) + + go StartInstancePoll(pollmessage.from, min_id, reportPostChan, pollMessageChan) +} + +func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) { + for _, runninginstance := range *runninginstances { + if runninginstance.endpoint == endpoint { + return + } + } + newinstance := RunningInstance{endpoint, ""} + *runninginstances = append(*runninginstances, newinstance) + + go StartInstancePoll(endpoint, "", reportPostChan, pollMessageChan) + fmt.Println("Temporarily disabled Peer Hunting") +// go StartGetPeers(endpoint, reportInstanceChan) +} + +func writePost(reportpost ReportPost) { + fmt.Println("Writing post", reportpost) +// sqlWritePost := `INSERT INTO post (url, content, strippedcontent, posthash) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (posthash) DO NOTHING` +} + +func main() { + + // Current instances + runninginstances := make([]RunningInstance, 0) + + // Initial Setup + reportPostChan := make(chan ReportPost, 100) + reportInstanceChan := make(chan ReportInstance, 100) + pollMessageChan := make (chan PollMessage, 100) + + // Setup Database +// dbconn, err := pgx.Connect(context.Background(), "postgres://postgres@127.0.0.1/tutorial") +// if err != nil { +// fmt.Println(err) +// os.Exit(1) +// } + + l, err := net.Listen("tcp", "127.0.0.1:5555") + if err != nil { + fmt.Println(err) + return + } + defer l.Close() + + var q ReportInstance + q.from = "" + q.endpoint = "mastodon.social" + reportInstanceChan <- q + + for { + commandClient := make(chan net.Conn) + + go func(l net.Listener) { + for { + c, err := l.Accept() + if err != nil { + commandClient <- nil + return + } + commandClient <- c + } + }(l) + + for { + select { + case c := <-commandClient: // New client connection + go handleClient(c) + case p := <-pollMessageChan: // A poller ended + for i, runninginstance := range runninginstances { + if runninginstance.endpoint == p.from { + runninginstances[i].min_id = p.min_id + } + } + DeferRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) + case v := <-reportPostChan: // New Post + go writePost( v) + case w := <-reportInstanceChan: // Start a new instance + NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) + } + } + } +} diff --git a/poll/tables.psql b/poll/tables.psql new file mode 100644 index 0000000..edead61 --- /dev/null +++ b/poll/tables.psql @@ -0,0 +1,28 @@ +CREATE TABLE accounts ( + id serial NOT NULL PRIMARY KEY, + acct VARCHAR(100) NOT NULL, + avatar VARCHAR(2083) NOT NULL, + bot boolean, + created_at VARCHAR(100) NOT NULL, + display_name VARCHAR(100) NOT NULL, + url VARCHAR(2083) NOT NULL +); + +CREATE TABLE posts ( + id serial NOT NULL PRIMARY KEY, + url VARCHAR(2083) NOT NULL, + content text, + strippedcontent text, + posthash bytea UNIQUE +); + +CREATE TABLE instances ( + id serial NOT NULL PRIMARY KEY, + endpoint VARCHAR(2083) NOT NULL, + autostart boolean, + state varchar(16), + username varchar(32), + password varchar(32), + + software varchar(50) +);