writes to database, delays based on message type

This commit is contained in:
farhan 2020-10-25 23:44:55 +00:00
parent 305432e58a
commit f460900f41

View File

@ -2,12 +2,12 @@ package main
import ( import (
"github.com/microcosm-cc/bluemonday" "github.com/microcosm-cc/bluemonday"
// "github.com/jackc/pgx" "github.com/jackc/pgx/pgxpool"
"encoding/json" "encoding/json"
"crypto/sha1" "crypto/sha1"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
// "context" "context"
"strings" "strings"
"time" "time"
"fmt" "fmt"
@ -19,8 +19,9 @@ import (
type PollMessage struct { type PollMessage struct {
from string from string
status string status int
min_id string min_id string
numposts int
} }
// Parsing Unmarshal JSON type // Parsing Unmarshal JSON type
@ -90,7 +91,7 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor
resp, err := http.Get(api_timeline) resp, err := http.Get(api_timeline)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
pollMessageChan <- PollMessage{endpoint, "get_failure", ""} pollMessageChan <- PollMessage{endpoint, resp.StatusCode, "" , 0}
} }
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
@ -98,9 +99,10 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
panic(err) panic(err)
pollMessageChan <- PollMessage{endpoint, "unmarshal_crash", ""} pollMessageChan <- PollMessage{endpoint, resp.StatusCode, "", 0}
} }
numposts := 0
for _, newpost := range newposts { for _, newpost := range newposts {
posthash := sha1.New() posthash := sha1.New()
@ -124,9 +126,10 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor
if newpost.Id > min_id { if newpost.Id > min_id {
min_id = newpost.Id min_id = newpost.Id
} }
numposts = numposts + 1
} }
pollMessageChan <- PollMessage{endpoint, "ok", min_id} pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, numposts}
} }
func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) { func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) {
@ -166,7 +169,18 @@ func DeferRun(pollmessage PollMessage, runninginstances *[]RunningInstance, repo
} }
} }
time.Sleep(time.Second * 2) delay := 2
if pollmessage.status == 200 && pollmessage.numposts <= 10 {
delay = 2
} else if pollmessage.status == 200 && pollmessage.numposts > 10 {
delay = 5
} else if pollmessage.status == 429 {
delay = 30
} else {
fmt.Println("error, status code is: ", pollmessage.status)
os.Exit(1)
}
time.Sleep(time.Second * time.Duration(delay))
go StartInstancePoll(pollmessage.from, min_id, reportPostChan, pollMessageChan) go StartInstancePoll(pollmessage.from, min_id, reportPostChan, pollMessageChan)
} }
@ -185,9 +199,23 @@ func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportIns
// go StartGetPeers(endpoint, reportInstanceChan) // go StartGetPeers(endpoint, reportInstanceChan)
} }
func writePost(reportpost ReportPost) { func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
fmt.Println("Writing post", reportpost) fmt.Println("Writing post", reportpost)
// sqlWritePost := `INSERT INTO post (url, content, strippedcontent, posthash) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (posthash) DO NOTHING` // sqlWritePost := `INSERT INTO post (url, content, strippedcontent, posthash) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (posthash) DO NOTHING`
conn, err := pool.Acquire(context.Background())
if err != nil {
fmt.Println("Error acquiring connection:", err)
os.Exit(1)
}
defer conn.Release()
_, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, strippedcontent, posthash) VALUES ($1, $2, $3, $4) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.StrippedContent, reportpost.Posthash)
if err != nil {
fmt.Println("Error on channel???")
fmt.Println(err)
os.Exit(1)
}
} }
func main() { func main() {
@ -201,11 +229,11 @@ func main() {
pollMessageChan := make (chan PollMessage, 100) pollMessageChan := make (chan PollMessage, 100)
// Setup Database // Setup Database
// dbconn, err := pgx.Connect(context.Background(), "postgres://postgres@127.0.0.1/tutorial") pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/tutorial")
// if err != nil { if err != nil {
// fmt.Println(err) fmt.Fprintln(os.Stderr, "Unable to connect to database:", err)
// os.Exit(1) os.Exit(1)
// } }
l, err := net.Listen("tcp", "127.0.0.1:5555") l, err := net.Listen("tcp", "127.0.0.1:5555")
if err != nil { if err != nil {
@ -245,7 +273,7 @@ func main() {
} }
go DeferRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) go DeferRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan)
case v := <-reportPostChan: // New Post case v := <-reportPostChan: // New Post
go writePost( v) go writePost(pool, v)
case w := <-reportInstanceChan: // Start a new instance case w := <-reportInstanceChan: // Start a new instance
NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan)
} }