From 8a12f277b185e97c7d1b4aae1dd473f02bda7dd5 Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Tue, 10 Nov 2020 21:53:46 -0500 Subject: [PATCH] oh yeah, go can do that multiple file thing --- poll/cli.go | 80 +++++++++ poll/engine.go | 459 ----------------------------------------------- poll/headers.go | 74 ++++++++ poll/instance.go | 175 ++++++++++++++++++ poll/main.go | 136 ++++++++++++++ 5 files changed, 465 insertions(+), 459 deletions(-) create mode 100644 poll/cli.go delete mode 100644 poll/engine.go create mode 100644 poll/headers.go create mode 100644 poll/instance.go create mode 100644 poll/main.go diff --git a/poll/cli.go b/poll/cli.go new file mode 100644 index 0000000..bb361a2 --- /dev/null +++ b/poll/cli.go @@ -0,0 +1,80 @@ +package main + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "net" + "log" + "io" + "os" +) + +func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport) { + + sizebyte := make([]byte, 4) + var commandmap CommandMap + var responseback ResponseBack + n, err := io.ReadFull(commandClient, sizebyte) + if n != 4 { + fmt.Println("Did not read 4 bytes, failure.") + os.Exit(1) + } + jsonsize := int(binary.LittleEndian.Uint32(sizebyte)) + jsonbyte := make([]byte, jsonsize) + n, err = io.ReadFull(commandClient, jsonbyte) + if n != jsonsize { + fmt.Println("Failued to read json size of ", n) + os.Exit(1) + } + + err = json.Unmarshal(jsonbyte, &commandmap) + if err != nil { + fmt.Println("Unable to unmarshal") + os.Exit(1) + } + + switch commandmap.Type { + case "status": + responseback.Message = "Ok" + case "add": + log.Print("Manually added instance: " + commandmap.Endpoint) + var q InstanceReport + q.endpoint = commandmap.Endpoint + q.status = NEW_INSTANCE + + instanceReportChan <- q + + responseback.Message = "Added " + commandmap.Endpoint + case "suspend": + fmt.Println("Suspend") + case "resume": + fmt.Println("Resume") + default: + fmt.Println("Something else") + } + + + responseback.Type = "status" + responseback.RunningInstances = *runninginstances + + responsebytes, err := json.Marshal(responseback) + if err != nil { + fmt.Println("Error: ", err) + os.Exit(1) + } + + n = len(responsebytes) + binary.LittleEndian.PutUint32(sizebyte, uint32(n)) + + commandClient.Write(sizebyte) + + responsebyte, err := json.Marshal(responseback) + if err != nil { + fmt.Println("Error response back: ", err) + os.Exit(1) + } + + commandClient.Write(responsebyte) + commandClient.Close() +} diff --git a/poll/engine.go b/poll/engine.go deleted file mode 100644 index f4846e6..0000000 --- a/poll/engine.go +++ /dev/null @@ -1,459 +0,0 @@ -package main - -import ( - "github.com/microcosm-cc/bluemonday" - "github.com/jackc/pgx/pgxpool" - _ "net/http/pprof" - "encoding/binary" - "encoding/json" - "crypto/sha1" - "io/ioutil" - "net/http" - "context" - "strings" - "html" - "time" - "fmt" - "net" - "log" - "io" - "os" -) - -const ( - NEW_INSTANCE = 0 - RUNNING = 200 - TOOMANYREQUESTS = 429 - CLIENT_ISSUE = 600 - ONION_PROTOCOL = 601 - BAD_RESPONSE = 602 - NO_CONNECTION = 603 - BAD_NODEINFO = 604 -) - -// Parsing Unmarshal JSON type -type ReportPost struct { - - // Retrieved values - Id string `json:"id"` - Url string `json:"url"` - Account AccountType - Content string `json:"content"` - Created_at string `json:"created_at"` - - // Derived values - normalized string - posthash []byte -} - -type AccountType struct { - Acct string `json:"acct"` - Avatar string `json:"avatar"` - Bot bool `json:"bot"` - Created_at string `json:"created_at"` - Display_name string `json:"display_name"` - Url string `json:"url"` -} - -// Used to report a new instance to main -type InstanceReport struct { - endpoint string - status int - - min_id string - numposts int -} - -// Instance's new min_id value -type RunningInstance struct { - Endpoint string `json:"endpoint"` - Software string `json:"software"` - Min_id string - Status int `json:"status"` - LastRun string `json:"lastrun"` -} - -type NodeInfoSoftware struct { - Name string `json:"name"` - Version string `json:"version"` -} - -type NodeInfo struct { - Software NodeInfoSoftware `json:"software"` -} - -type CommandMap struct { - Type string `json:"Type"` - Endpoint string `json:"Endpoint"` -} - -type ResponseBack struct { - Type string `json:"Type"` - Message string `json:"Message"` - RunningInstances []RunningInstance `json:"RunningInstances"` -} - -func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport) { - - sizebyte := make([]byte, 4) - var commandmap CommandMap - var responseback ResponseBack - n, err := io.ReadFull(commandClient, sizebyte) - if n != 4 { - fmt.Println("Did not read 4 bytes, failure.") - os.Exit(1) - } - jsonsize := int(binary.LittleEndian.Uint32(sizebyte)) - jsonbyte := make([]byte, jsonsize) - n, err = io.ReadFull(commandClient, jsonbyte) - if n != jsonsize { - fmt.Println("Failued to read json size of ", n) - os.Exit(1) - } - - err = json.Unmarshal(jsonbyte, &commandmap) - if err != nil { - fmt.Println("Unable to unmarshal") - os.Exit(1) - } - - switch commandmap.Type { - case "status": - responseback.Message = "Ok" - case "add": - log.Print("Manually added instance: " + commandmap.Endpoint) - var q InstanceReport - q.endpoint = commandmap.Endpoint - q.status = NEW_INSTANCE - - instanceReportChan <- q - - responseback.Message = "Added " + commandmap.Endpoint - case "suspend": - fmt.Println("Suspend") - case "resume": - fmt.Println("Resume") - default: - fmt.Println("Something else") - } - - - responseback.Type = "status" - responseback.RunningInstances = *runninginstances - - responsebytes, err := json.Marshal(responseback) - if err != nil { - fmt.Println("Error: ", err) - os.Exit(1) - } - - n = len(responsebytes) - binary.LittleEndian.PutUint32(sizebyte, uint32(n)) - - commandClient.Write(sizebyte) - - responsebyte, err := json.Marshal(responseback) - if err != nil { - fmt.Println("Error response back: ", err) - os.Exit(1) - } - - commandClient.Write(responsebyte) - commandClient.Close() -} - -/* - * This code can be refactored per - * https://golang.org/pkg/encoding/binary/ - * But for now, this should be sufficient - */ -/* -func parseCommand(c net.Conn) { - rawCommand := make([]byte, 1) -} -*/ - -func AppendIfMissing(hay []string, needle string) []string { - for _, ele := range hay { - if ele == needle { - return hay - } - } - return append(hay, needle) -} - -func StartInstancePoll(instancereport InstanceReport, reportPostChan chan ReportPost, instanceReportChan chan InstanceReport) { - p := bluemonday.NewPolicy() - newposts := make([]ReportPost, 0) - - // Only placing this here to later have the option of using - // an HTTP client via a SOCKS5 Tor proxy - if strings.Contains(instancereport.endpoint, ".onion") == true { - instanceReportChan <- InstanceReport{instancereport.endpoint, ONION_PROTOCOL, "", 0} - return - } - - api_timeline := "https://" + instancereport.endpoint + "/api/v1/timelines/public?min_id=" + instancereport.min_id - resp, err := http.Get(api_timeline) - if err != nil { - instanceReportChan <- InstanceReport{instancereport.endpoint, CLIENT_ISSUE, "", 0} - return - } - - body, err := ioutil.ReadAll(resp.Body) - err = json.Unmarshal(body, &newposts) - if err != nil { -// instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, "", 0} - instanceReportChan <- InstanceReport{instancereport.endpoint, BAD_RESPONSE, "", 0} - return - } - - newinstances := make([]string, 0) - min_id := "" - numposts := 0 - for _, newpost := range newposts { - - if newpost.Account.Acct == "" { - continue - } - - posthash := sha1.New() - - at_sign := strings.Index(newpost.Account.Acct, "@") - - if at_sign == -1 { - at_sign = len(newpost.Account.Acct) - newpost.Account.Acct += "@" + instancereport.endpoint - } - - // Calculate the post hash - fmt.Fprint(posthash, newpost.Url) - fmt.Fprint(posthash, newpost.normalized) - fmt.Fprint(posthash, newpost.Account.Acct) - fmt.Fprint(posthash, newpost.Account.Display_name) - newpost.posthash = posthash.Sum(nil) - - newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) - - reportPostChan <- newpost - - // Check min_id - if newpost.Id > min_id { - min_id = newpost.Id - } - numposts = numposts + 1 - - newinstance := newpost.Account.Acct[at_sign+1:] - newinstances = AppendIfMissing(newinstances, newinstance) - } - - for _, newinstance := range newinstances { - var q InstanceReport - q.endpoint = newinstance - q.status = NEW_INSTANCE - q.min_id = "" - q.numposts = 0 - instanceReportChan <- q - } - - instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, min_id, numposts} -} - -func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { - - delay := 10 - if instancereport.status == RUNNING && instancereport.numposts <= 10 { - delay = 10 - } else if instancereport.status == RUNNING && instancereport.numposts > 10 { - delay = 15 - } else if instancereport.status == 429 { - delay = 30 - } else { - fmt.Println("error, status code is ------------->: ", instancereport.status) - os.Exit(1) - } - time.Sleep(time.Second * time.Duration(delay)) - - go StartInstancePoll(instancereport, reportPostChan, instanceReportChan) -} - -// Change this to return a proper "err" -func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) { - api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json" - http_client := http.Client{Timeout: 5 * time.Second} - resp, err := http_client.Get(api_nodeinfo) - if err != nil { - return - } - - body, err := ioutil.ReadAll(resp.Body) - err = json.Unmarshal(body, &nodeinfo) - if err != nil { -// fmt.Println("Unmarshal 2"); - return - } -} - -func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { - var nodeinfo NodeInfo - - if endpoint == "" { - return - } - - // No repeats - for _, runninginstance := range *runninginstances { - if runninginstance.Endpoint == endpoint { - return - } - } - - // Check node type - GetNodeInfo(endpoint, &nodeinfo) - if nodeinfo.Software.Name == "" { - go func() { - var q InstanceReport - q.endpoint = endpoint - q.status = BAD_NODEINFO - instanceReportChan <- q - return - }() - } - - newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE, "Queued"} - *runninginstances = append(*runninginstances, newinstance) - - if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" { - var newinstancereport InstanceReport - newinstancereport.endpoint = endpoint - newinstancereport.status = 0 - newinstancereport.min_id = "" - newinstancereport.numposts = 0 - go StartInstancePoll(newinstancereport, reportPostChan, instanceReportChan) - } -} - -func writePost(pool *pgxpool.Pool, reportpost ReportPost) { - conn, err := pool.Acquire(context.Background()) - if err != nil { - log.Fatal("Error connecting to database:", err) - os.Exit(1) - } - defer conn.Release() - - // Insert new account if new - var accountid int - err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid) - if err != nil { - fmt.Println("First ", err) - fmt.Println("--------------------------") - fmt.Println("Reported error: ", err) - fmt.Println("Account Acct: ", reportpost.Account.Acct) - fmt.Println("Account Avatar: ", reportpost.Account.Avatar) - fmt.Println("Account Avatar len: ", len(reportpost.Account.Avatar)) - fmt.Println("Account Bot: ", reportpost.Account.Bot) - fmt.Println("Account Created_at: ", reportpost.Account.Created_at) - fmt.Println("Account Display: ", reportpost.Account.Display_name) - fmt.Println("Account URL: ", reportpost.Account.Url) - fmt.Println(reportpost) - fmt.Println("--------------------------") - os.Exit(1) // For now I want this to die and learn why it failed - return - } - - // Insert new post if new - _, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash) - if err != nil { // For now I want to know why this failed. - fmt.Println("Second ", err) - fmt.Println("--------------------------") - fmt.Println("Reported error: ", err) - fmt.Println("Url: ", reportpost.Url) - fmt.Println("Content: ", reportpost.Content) - fmt.Println("Created_at: ", reportpost.Created_at) - fmt.Println("normalized: ", reportpost.normalized) - fmt.Println("account_id", accountid) - fmt.Println("posthash: ", reportpost.posthash) - fmt.Println("--------------------------") - os.Exit(1) // For now I want this to die and learn why it failed - return - } -} - -func SuspendInstance(suspendinstance InstanceReport, runninginstances *[]RunningInstance) { - for i, runninginstance := range *runninginstances { - if runninginstance.Endpoint == suspendinstance.endpoint { - (*runninginstances)[i].Status = suspendinstance.status - (*runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05") - return - } - } -} - - -func engine() { - - // Current instances - runninginstances := make([]RunningInstance, 0) - - // Initial Setup - reportPostChan := make(chan ReportPost, 2000) - instanceReportChan := make(chan InstanceReport, 100) - - // Setup Database - pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue") - if err != nil { - fmt.Fprintln(os.Stderr, "Unable to connect to database:", err) - os.Exit(1) - } - - l, err := net.Listen("tcp", "127.0.0.1:5555") - if err != nil { - fmt.Println(err) - return - } - defer l.Close() - - commandClient := make(chan net.Conn) - - go func(l net.Listener) { - for { - c, err := l.Accept() - if err != nil { - fmt.Println("Error on accept") - commandClient <- nil - return - } - commandClient <- c - } - }(l) - - for { - select { - case c := <-commandClient: // New client connection - go handleClient(c, &runninginstances, instanceReportChan) - case v := <-reportPostChan: // New Post - go writePost(pool, v) - case w := <-instanceReportChan: // Start or suspend instance - if w.status == NEW_INSTANCE { - NewInstance(w.endpoint, &runninginstances, instanceReportChan, reportPostChan) - } else if w.status == RUNNING || w.status == TOOMANYREQUESTS { - for i, runninginstance := range runninginstances { - if runninginstance.Endpoint == w.endpoint { - runninginstances[i].Min_id = w.min_id - runninginstances[i].Status = w.status - runninginstances[i].LastRun = time.Now().Format("2006.01.02-15:04:05") - } - } - go DeferPollRun(w, &runninginstances, instanceReportChan, reportPostChan) - } else { - SuspendInstance(w, &runninginstances) - } - } - } -} - -func main() { - go engine() - log.Println("serving on port 8080") - log.Fatal(http.ListenAndServe(":8080", nil)) -} diff --git a/poll/headers.go b/poll/headers.go new file mode 100644 index 0000000..177297d --- /dev/null +++ b/poll/headers.go @@ -0,0 +1,74 @@ +package main + +const ( + NEW_INSTANCE = 0 + RUNNING = 200 + TOOMANYREQUESTS = 429 + CLIENT_ISSUE = 600 + ONION_PROTOCOL = 601 + BAD_RESPONSE = 602 + NO_CONNECTION = 603 + BAD_NODEINFO = 604 +) + +// Parsing Unmarshal JSON type +type ReportPost struct { + + // Retrieved values + Id string `json:"id"` + Url string `json:"url"` + Account AccountType + Content string `json:"content"` + Created_at string `json:"created_at"` + + // Derived values + normalized string + posthash []byte +} + +type AccountType struct { + Acct string `json:"acct"` + Avatar string `json:"avatar"` + Bot bool `json:"bot"` + Created_at string `json:"created_at"` + Display_name string `json:"display_name"` + Url string `json:"url"` +} + +// Used to report a new instance to main +type InstanceReport struct { + endpoint string + status int + + min_id string + numposts int +} + +// Instance's new min_id value +type RunningInstance struct { + Endpoint string `json:"endpoint"` + Software string `json:"software"` + Min_id string + Status int `json:"status"` + LastRun string `json:"lastrun"` +} + +type NodeInfoSoftware struct { + Name string `json:"name"` + Version string `json:"version"` +} + +type NodeInfo struct { + Software NodeInfoSoftware `json:"software"` +} + +type CommandMap struct { + Type string `json:"Type"` + Endpoint string `json:"Endpoint"` +} + +type ResponseBack struct { + Type string `json:"Type"` + Message string `json:"Message"` + RunningInstances []RunningInstance `json:"RunningInstances"` +} diff --git a/poll/instance.go b/poll/instance.go new file mode 100644 index 0000000..f37d161 --- /dev/null +++ b/poll/instance.go @@ -0,0 +1,175 @@ +package main + +import ( + "github.com/microcosm-cc/bluemonday" + "encoding/json" + "crypto/sha1" + "io/ioutil" + "net/http" + "strings" + "html" + "time" + "fmt" + "os" +) + +func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { + + delay := 10 + if instancereport.status == RUNNING && instancereport.numposts <= 10 { + delay = 10 + } else if instancereport.status == RUNNING && instancereport.numposts > 10 { + delay = 15 + } else if instancereport.status == 429 { + delay = 30 + } else { + fmt.Println("error, status code is ------------->: ", instancereport.status) + os.Exit(1) + } + time.Sleep(time.Second * time.Duration(delay)) + + go StartInstancePoll(instancereport, reportPostChan, instanceReportChan) +} + +func StartInstancePoll(instancereport InstanceReport, reportPostChan chan ReportPost, instanceReportChan chan InstanceReport) { + p := bluemonday.NewPolicy() + newposts := make([]ReportPost, 0) + + // Only placing this here to later have the option of using + // an HTTP client via a SOCKS5 Tor proxy + if strings.Contains(instancereport.endpoint, ".onion") == true { + instanceReportChan <- InstanceReport{instancereport.endpoint, ONION_PROTOCOL, "", 0} + return + } + + api_timeline := "https://" + instancereport.endpoint + "/api/v1/timelines/public?limit=40&min_id=" + instancereport.min_id + resp, err := http.Get(api_timeline) + if err != nil { + instanceReportChan <- InstanceReport{instancereport.endpoint, CLIENT_ISSUE, "", 0} + return + } + + body, err := ioutil.ReadAll(resp.Body) + err = json.Unmarshal(body, &newposts) + if err != nil { +// instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, "", 0} + instanceReportChan <- InstanceReport{instancereport.endpoint, BAD_RESPONSE, "", 0} + return + } + + newinstances := make([]string, 0) + min_id := "" + numposts := 0 + for _, newpost := range newposts { + + if newpost.Account.Acct == "" { + continue + } + + posthash := sha1.New() + + at_sign := strings.Index(newpost.Account.Acct, "@") + + if at_sign == -1 { + at_sign = len(newpost.Account.Acct) + newpost.Account.Acct += "@" + instancereport.endpoint + } + + // Calculate the post hash + fmt.Fprint(posthash, newpost.Url) + fmt.Fprint(posthash, newpost.normalized) + fmt.Fprint(posthash, newpost.Account.Acct) + fmt.Fprint(posthash, newpost.Account.Display_name) + newpost.posthash = posthash.Sum(nil) + + newpost.normalized = html.UnescapeString(strings.ToLower(p.Sanitize(newpost.Content))) + + reportPostChan <- newpost + + // Check min_id + if newpost.Id > min_id { + min_id = newpost.Id + } + numposts = numposts + 1 + + newinstance := newpost.Account.Acct[at_sign+1:] + newinstances = AppendIfMissing(newinstances, newinstance) + } + + for _, newinstance := range newinstances { + var q InstanceReport + q.endpoint = newinstance + q.status = NEW_INSTANCE + q.min_id = "" + q.numposts = 0 + instanceReportChan <- q + } + + instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, min_id, numposts} +} + +// Change this to return a proper "err" +func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) { + api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json" + http_client := http.Client{Timeout: 5 * time.Second} + resp, err := http_client.Get(api_nodeinfo) + if err != nil { + return + } + + body, err := ioutil.ReadAll(resp.Body) + err = json.Unmarshal(body, &nodeinfo) + if err != nil { +// fmt.Println("Unmarshal 2"); + return + } +} + +func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) { + var nodeinfo NodeInfo + + if endpoint == "" { + return + } + + // No repeats + for _, runninginstance := range *runninginstances { + if runninginstance.Endpoint == endpoint { + return + } + } + + // Check node type + GetNodeInfo(endpoint, &nodeinfo) + if nodeinfo.Software.Name == "" { + go func() { + var q InstanceReport + q.endpoint = endpoint + q.status = BAD_NODEINFO + instanceReportChan <- q + return + }() + } + + newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE, "Queued"} + *runninginstances = append(*runninginstances, newinstance) + + if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" { + var newinstancereport InstanceReport + newinstancereport.endpoint = endpoint + newinstancereport.status = 0 + newinstancereport.min_id = "" + newinstancereport.numposts = 0 + go StartInstancePoll(newinstancereport, reportPostChan, instanceReportChan) + } +} + +func SuspendInstance(suspendinstance InstanceReport, runninginstances *[]RunningInstance) { + for i, runninginstance := range *runninginstances { + if runninginstance.Endpoint == suspendinstance.endpoint { + (*runninginstances)[i].Status = suspendinstance.status + (*runninginstances)[i].LastRun = time.Now().Format("2006.01.02-15:04:05") + return + } + } +} diff --git a/poll/main.go b/poll/main.go new file mode 100644 index 0000000..3a1cf74 --- /dev/null +++ b/poll/main.go @@ -0,0 +1,136 @@ +package main + +import ( + "github.com/jackc/pgx/pgxpool" + _ "net/http/pprof" + "net/http" + "context" + "time" + "fmt" + "net" + "log" + "os" +) + +func AppendIfMissing(hay []string, needle string) []string { + for _, ele := range hay { + if ele == needle { + return hay + } + } + return append(hay, needle) +} + +func writePost(pool *pgxpool.Pool, reportpost ReportPost) { + conn, err := pool.Acquire(context.Background()) + if err != nil { + log.Fatal("Error connecting to database:", err) + os.Exit(1) + } + defer conn.Release() + + // Insert new account if new + var accountid int + err = conn.QueryRow(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT(acct) DO UPDATE SET acct=EXCLUDED.acct RETURNING id", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url).Scan(&accountid) + if err != nil { + fmt.Println("First ", err) + fmt.Println("--------------------------") + fmt.Println("Reported error: ", err) + fmt.Println("Account Acct: ", reportpost.Account.Acct) + fmt.Println("Account Avatar: ", reportpost.Account.Avatar) + fmt.Println("Account Avatar len: ", len(reportpost.Account.Avatar)) + fmt.Println("Account Bot: ", reportpost.Account.Bot) + fmt.Println("Account Created_at: ", reportpost.Account.Created_at) + fmt.Println("Account Display: ", reportpost.Account.Display_name) + fmt.Println("Account URL: ", reportpost.Account.Url) + fmt.Println(reportpost) + fmt.Println("--------------------------") + os.Exit(1) // For now I want this to die and learn why it failed + return + } + + // Insert new post if new + _, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, normalized, account_id, posthash) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.normalized, accountid, reportpost.posthash) + if err != nil { // For now I want to know why this failed. + fmt.Println("Second ", err) + fmt.Println("--------------------------") + fmt.Println("Reported error: ", err) + fmt.Println("Url: ", reportpost.Url) + fmt.Println("Content: ", reportpost.Content) + fmt.Println("Created_at: ", reportpost.Created_at) + fmt.Println("normalized: ", reportpost.normalized) + fmt.Println("account_id", accountid) + fmt.Println("posthash: ", reportpost.posthash) + fmt.Println("--------------------------") + os.Exit(1) // For now I want this to die and learn why it failed + return + } +} + +func engine() { + + // Current instances + runninginstances := make([]RunningInstance, 0) + + // Initial Setup + reportPostChan := make(chan ReportPost, 2000) + instanceReportChan := make(chan InstanceReport, 20) + + // Setup Database + pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue") + if err != nil { + fmt.Fprintln(os.Stderr, "Unable to connect to database:", err) + os.Exit(1) + } + + l, err := net.Listen("tcp", "127.0.0.1:5555") + if err != nil { + fmt.Println(err) + return + } + defer l.Close() + + commandClient := make(chan net.Conn) + + go func(l net.Listener) { + for { + c, err := l.Accept() + if err != nil { + fmt.Println("Error on accept") + commandClient <- nil + return + } + commandClient <- c + } + }(l) + + for { + select { + case c := <-commandClient: // New client connection + go handleClient(c, &runninginstances, instanceReportChan) + case v := <-reportPostChan: // New Post + go writePost(pool, v) + case w := <-instanceReportChan: // Start or suspend instance + if w.status == NEW_INSTANCE { + NewInstance(w.endpoint, &runninginstances, instanceReportChan, reportPostChan) + } else if w.status == RUNNING || w.status == TOOMANYREQUESTS { + for i, runninginstance := range runninginstances { + if runninginstance.Endpoint == w.endpoint { + runninginstances[i].Min_id = w.min_id + runninginstances[i].Status = w.status + runninginstances[i].LastRun = time.Now().Format("2006.01.02-15:04:05") + } + } + go DeferPollRun(w, &runninginstances, instanceReportChan, reportPostChan) + } else { + SuspendInstance(w, &runninginstances) + } + } + } +} + +func main() { + go engine() + log.Println("serving on port 8080") + log.Fatal(http.ListenAndServe(":8080", nil)) +}