From afadc4dc398201deac22435ef3a280260d193778 Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Tue, 27 Oct 2020 04:35:01 +0000 Subject: [PATCH] Added Peer hunting, but I might remove it or make it an option --- poll/engine.go | 106 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 72 insertions(+), 34 deletions(-) diff --git a/poll/engine.go b/poll/engine.go index 1ab9802..73da0b2 100644 --- a/poll/engine.go +++ b/poll/engine.go @@ -17,6 +17,14 @@ import ( "os" ) +const ( + NEW_INSTANCE = 0 + CLIENT_ISSUE = 600 + ONION_PROTOCOL = 601 + UNMARSHAL_ERROR = 602 + NO_CONNECTION = 603 +) + type PollMessage struct { from string status int @@ -48,16 +56,18 @@ type AccountType struct { Url string `json:"url"` } - // Used to report a new instance to main type ReportInstance struct { from string endpoint string + status int } +// Instance's new min_id value type RunningInstance struct { endpoint string min_id string + status int } func handleClient(commandClient net.Conn) { @@ -84,26 +94,36 @@ func parseCommand(c net.Conn) { } */ -func StartInstancePoll(endpoint string, min_id string, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) { +func StartInstancePoll(endpoint string, min_id string, reportPostChan chan ReportPost, pollMessageChan chan PollMessage, reportInstanceChan chan ReportInstance) { p := bluemonday.NewPolicy() newposts := make([]ReportPost, 0) - api_timeline := "https://" + endpoint + "/api/v1/timelines/public?since_id=40&min_id=" + min_id + // Only placing this here to later have the option of using an HTTP client via a SOCKS5 Tor proxy + if strings.Contains(endpoint, ".onion") == true { + reportInstanceChan <- ReportInstance{endpoint, endpoint, ONION_PROTOCOL} + return + } + + api_timeline := "https://" + endpoint + "/api/v1/timelines/public?min_id=" + min_id resp, err := http.Get(api_timeline) if err != nil { - fmt.Println("Failure to retrieve HTTPS data...") - log.Fatal(err) - pollMessageChan <- PollMessage{endpoint, resp.StatusCode, "" , 0} + reportInstanceChan <- ReportInstance{endpoint, endpoint, CLIENT_ISSUE} + return } body, err := ioutil.ReadAll(resp.Body) err = json.Unmarshal(body, &newposts) if err != nil { - pollMessageChan <- PollMessage{endpoint, resp.StatusCode, "", 0} - fmt.Println("Failure to unmarshal 1") - fmt.Println(string(body)) + // Perhaps get rid of this if-condition? + if resp.StatusCode >= 400 && resp.StatusCode < 500 { + reportInstanceChan <- ReportInstance{endpoint, endpoint, resp.StatusCode} + } else if resp.StatusCode >= 500 && resp.StatusCode < 600 { + reportInstanceChan <- ReportInstance{endpoint, endpoint, resp.StatusCode} + } else { + reportInstanceChan <- ReportInstance{endpoint, endpoint, UNMARSHAL_ERROR} + } log.Fatal(err) - panic(err) + return } numposts := 0 @@ -141,23 +161,25 @@ func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) { api_peers := "https://" + endpoint + "/api/v1/instance/peers" resp, err := http.Get(api_peers) if err != nil { - fmt.Println("Peer instance failure") - os.Exit(1) + reportInstanceChan <- ReportInstance{endpoint, endpoint, NO_CONNECTION} + return +// os.Exit(1) } body, err := ioutil.ReadAll(resp.Body) err = json.Unmarshal([]byte(body), &newpeers) if err != nil { - fmt.Println("Unmarshal error") log.Fatal(err) - panic(err) + reportInstanceChan <- ReportInstance{endpoint, endpoint, UNMARSHAL_ERROR} + return } for _, newpeer := range newpeers { var q ReportInstance q.from = endpoint q.endpoint = newpeer + q.status = NEW_INSTANCE reportInstanceChan <- q } } @@ -181,30 +203,37 @@ func DeferPollRun(pollmessage PollMessage, runninginstances *[]RunningInstance, delay = 30 } else { fmt.Println("error, status code is: ", pollmessage.status) - os.Exit(1) } time.Sleep(time.Second * time.Duration(delay)) - go StartInstancePoll(pollmessage.from, min_id, reportPostChan, pollMessageChan) + go StartInstancePoll(pollmessage.from, min_id, reportPostChan, pollMessageChan, reportInstanceChan) +} + +func GetNodeInfo(endpoint string) { } func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) { + + // Check node type + + + if endpoint == "" { + return + } for _, runninginstance := range *runninginstances { if runninginstance.endpoint == endpoint { return } } - newinstance := RunningInstance{endpoint, ""} + newinstance := RunningInstance{endpoint, "", NEW_INSTANCE} *runninginstances = append(*runninginstances, newinstance) - go StartInstancePoll(endpoint, "", reportPostChan, pollMessageChan) + go StartInstancePoll(endpoint, "", reportPostChan, pollMessageChan, reportInstanceChan) // fmt.Println("Temporarily disabled Peer Hunting") // go StartGetPeers(endpoint, reportInstanceChan) } func writePost(pool *pgxpool.Pool, reportpost ReportPost) { -// fmt.Println("Writing post", reportpost) - conn, err := pool.Acquire(context.Background()) if err != nil { fmt.Println("Error acquiring connection:", err) @@ -212,23 +241,27 @@ func writePost(pool *pgxpool.Pool, reportpost ReportPost) { } defer conn.Release() - // Insert new post if new - _, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, strippedcontent, posthash) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.StrippedContent, reportpost.Posthash) - if err != nil { - fmt.Println("Error on channel???") - fmt.Println(err) - os.Exit(1) - } - // Insert new account if new _, err = conn.Exec(context.Background(), "INSERT INTO accounts (acct, avatar, bot, created_at, display_name, url) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (acct) DO NOTHING", reportpost.Account.Acct, reportpost.Account.Avatar, reportpost.Account.Bot, reportpost.Account.Created_at, reportpost.Account.Display_name, reportpost.Account.Url) if err != nil { - fmt.Println("Error on channel???") - fmt.Println(err) - os.Exit(1) + return } -// fmt.Println(reportpost.Account) + // Insert new post if new + _, err = conn.Exec(context.Background(), "INSERT INTO posts (url, content, created_at, strippedcontent, posthash) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (posthash) DO NOTHING", reportpost.Url, reportpost.Content, reportpost.Created_at, reportpost.StrippedContent, reportpost.Posthash) + if err != nil { + return + } + +} + +func SuspendInstance(suspendinstance ReportInstance, runninginstances *[]RunningInstance) { + for _, runninginstance := range *runninginstances { + if runninginstance.endpoint == suspendinstance.endpoint { + runninginstance.status = suspendinstance.status + return + } + } } func main() { @@ -258,6 +291,7 @@ func main() { var q ReportInstance q.from = "" q.endpoint = "mastodon.social" + q.status = NEW_INSTANCE reportInstanceChan <- q for { @@ -287,8 +321,12 @@ func main() { go DeferPollRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) case v := <-reportPostChan: // New Post go writePost(pool, v) - case w := <-reportInstanceChan: // Start a new instance - NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) + case w := <-reportInstanceChan: // Start or suspend instance + if w.status == NEW_INSTANCE { + NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan) + } else { + SuspendInstance(w, &runninginstances) + } } } }