From f460900f41fa2969500b907d376ebae80ac6152c Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Sun, 25 Oct 2020 23:44:55 +0000 Subject: [PATCH] writes to database, delays based on message type --- poll/engine.go | 56 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/poll/engine.go b/poll/engine.go index 79d0dd9..3f73c43 100644 --- a/poll/engine.go +++ b/poll/engine.go @@ -2,12 +2,12 @@ package main import ( "github.com/microcosm-cc/bluemonday" -// "github.com/jackc/pgx" + "github.com/jackc/pgx/pgxpool" "encoding/json" "crypto/sha1" "io/ioutil" "net/http" -// "context" + "context" "strings" "time" "fmt" @@ -19,8 +19,9 @@ import ( type PollMessage struct { from string - status string + status int min_id string + numposts int } // Parsing Unmarshal JSON type @@ -90,7 +91,7 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor resp, err := http.Get(api_timeline) if err != nil { log.Fatal(err) - pollMessageChan <- PollMessage{endpoint, "get_failure", ""} + pollMessageChan <- PollMessage{endpoint, resp.StatusCode, "" , 0} } body, err := ioutil.ReadAll(resp.Body) @@ -98,9 +99,10 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor if err != nil { log.Fatal(err) panic(err) - pollMessageChan <- PollMessage{endpoint, "unmarshal_crash", ""} + pollMessageChan <- PollMessage{endpoint, resp.StatusCode, "", 0} } + numposts := 0 for _, newpost := range newposts { posthash := sha1.New() @@ -124,9 +126,10 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor if newpost.Id > min_id { min_id = newpost.Id } + numposts = numposts + 1 } - pollMessageChan <- PollMessage{endpoint, "ok", min_id} + pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, numposts} } func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) { @@ -166,7 +169,18 @@ func DeferRun(pollmessage PollMessage, runninginstances *[]RunningInstance, repo } } - time.Sleep(time.Second * 2) + delay := 2 + if pollmessage.status == 200 && pollmessage.numposts <= 10 { + delay = 2 + } else if pollmessage.status == 200 && pollmessage.numposts > 10 { + delay = 5 + } else if pollmessage.status == 429 { + delay = 30 + } else { + fmt.Println("error, status code is: ", pollmessage.status) + os.Exit(1) + } + time.Sleep(time.Second * time.Duration(delay)) go StartInstancePoll(pollmessage.from, min_id, reportPostChan, pollMessageChan) } @@ -185,9 +199,23 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportIns // go StartGetPeers(endpoint, reportInstanceChan) } -func writePost(reportpost ReportPost) { +func writePost(pool *pgxpool.Pool, reportpost ReportPost) { fmt.Println("Writing post", reportpost) // sqlWritePost := `INSERT INTO post (url, content, strippedcontent, posthash) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (posthash) DO NOTHING` + + conn, err := pool.Acquire(context.Background()) + if err != nil { + fmt.Println("Error acquiring connection:", err) + os.Exit(1) + } + defer conn.Release() + + _, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, strippedcontent, posthash) VALUES ($1, $2, $3, $4) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.StrippedContent, reportpost.Posthash) + if err != nil { + fmt.Println("Error on channel???") + fmt.Println(err) + os.Exit(1) + } } func main() { @@ -201,11 +229,11 @@ func main() { pollMessageChan := make (chan PollMessage, 100) // Setup Database -// dbconn, err := pgx.Connect(context.Background(), "postgres://postgres@127.0.0.1/tutorial") -// if err != nil { -// fmt.Println(err) -// os.Exit(1) -// } + pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/tutorial") + if err != nil { + fmt.Fprintln(os.Stderr, "Unable to connect to database:", err) + os.Exit(1) + } l, err := net.Listen("tcp", "127.0.0.1:5555") if err != nil { @@ -245,7 +273,7 @@ func main() { } go DeferRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) case v := <-reportPostChan: // New Post - go writePost( v) + go writePost(pool, v) case w := <-reportInstanceChan: // Start a new instance NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) }