timing + client updates

This commit is contained in:
farhan 2020-11-17 19:57:39 -05:00
parent ab8e0182bb
commit a54c241970
3 changed files with 36 additions and 88 deletions

View File

@ -120,17 +120,30 @@ func main() {
case "add": case "add":
fmt.Println("Add instance") fmt.Println("Add instance")
case "status": case "status":
fmt.Println("Status\t\tLastRun\t\t\tEndpoint")
for endpoint, runninginstance := range responseback.RunningInstances { for endpoint, runninginstance := range responseback.RunningInstances {
if runninginstance.Status == 0 { if runninginstance.Status == 0 {
fmt.Fprintf(os.Stdout, "New\t") fmt.Fprintf(os.Stdout, "New\t")
fmt.Fprintf(os.Stdout, "\tNever\t\t\t")
} else if runninginstance.Status == 200 { } else if runninginstance.Status == 200 {
fmt.Fprintf(os.Stdout, "Running\t") fmt.Fprintf(os.Stdout, "Running\t")
fmt.Fprintf(os.Stdout, "\t%s\t", runninginstance.LastRun)
} else if runninginstance.Status == 429 {
fmt.Fprintf(os.Stdout, "TooManyRequests\t")
fmt.Fprintf(os.Stdout, "%s\t", runninginstance.LastRun)
} else if runninginstance.Status == 600 {
fmt.Fprintf(os.Stdout, "ClientIssue")
fmt.Fprintf(os.Stdout, "\t%s\t", runninginstance.LastRun)
} else if runninginstance.Status == 602 { } else if runninginstance.Status == 602 {
fmt.Fprintf(os.Stdout, "BadInstance") fmt.Fprintf(os.Stdout, "BadInstance")
fmt.Fprintf(os.Stdout, "\t%s\t", runninginstance.LastRun)
} else if runninginstance.Status == 605 {
fmt.Fprintf(os.Stdout, "UnsupportedNode")
fmt.Fprintf(os.Stdout, "\t%s\t", runninginstance.LastRun)
} else { } else {
fmt.Fprintf(os.Stdout, "%d\t", runninginstance.Status) fmt.Fprintf(os.Stdout, "%d\t", runninginstance.Status)
fmt.Fprintf(os.Stdout, "\t%s\t", runninginstance.LastRun)
} }
fmt.Fprintf(os.Stdout, "\t%s\t", runninginstance.LastRun)
if runninginstance.LastRun == "Queued" { if runninginstance.LastRun == "Queued" {
fmt.Fprintf(os.Stdout, "\t\t") fmt.Fprintf(os.Stdout, "\t\t")
} }

View File

@ -8,6 +8,7 @@ const (
ONION_PROTOCOL = 601 ONION_PROTOCOL = 601
BAD_RESPONSE = 602 BAD_RESPONSE = 602
BAD_NODEINFO = 604 BAD_NODEINFO = 604
UNSUPPORTED_INSTANCE = 605
) )
// Parsing Unmarshal JSON type // Parsing Unmarshal JSON type

View File

@ -12,57 +12,42 @@ import (
"fmt" "fmt"
) )
/*
func DeferPollRun(instancereport InstanceReport, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
delay := 10
if instancereport.status == RUNNING && instancereport.numposts <= 10 {
delay = 10
} else if instancereport.status == RUNNING && instancereport.numposts > 10 {
delay = 15
} else if instancereport.status == 429 {
delay = 30
} else {
fmt.Println("error, status code is ------------->: ", instancereport.status)
os.Exit(1)
}
time.Sleep(time.Second * time.Duration(delay))
StartInstancePoll(instancereport, reportPostChan, instanceReportChan)
}
*/
func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) { func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
fmt.Println("Arrived at PollMastodonPleroma for " + endpoint)
// Make this a global variable // Make this a global variable
p := bluemonday.NewPolicy() p := bluemonday.NewPolicy()
newposts := make([]ReportPost, 0) newposts := make([]ReportPost, 0)
min_id := "" min_id := ""
http_client := http.Client{Timeout: 5 * time.Second} http_client := http.Client{Timeout: 5 * time.Second}
for { for {
m := runninginstances[endpoint]
api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&min_id=" + min_id api_timeline := "https://" + endpoint + "/api/v1/timelines/public?limit=40&min_id=" + min_id
fmt.Println(api_timeline)
numposts := 0
// newinstances := make([]string, 0)
resp, err := http_client.Get(api_timeline) resp, err := http_client.Get(api_timeline)
m.LastRun = time.Now().Format("2006.01.02-15:04:05")
if err != nil { if err != nil {
ri_mutex.Lock() ri_mutex.Lock()
var m = runninginstances[endpoint]
m.Status = CLIENT_ISSUE m.Status = CLIENT_ISSUE
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
return return
} }
if resp.StatusCode == 429 {
// Apparently you just need to do this to throw away the body
_, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = TOOMANYREQUESTS
runninginstances[endpoint] = m
ri_mutex.Unlock()
time.Sleep(time.Second * 30)
continue
}
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
ri_mutex.Lock() ri_mutex.Lock()
var m = runninginstances[endpoint]
m.Status = BAD_RESPONSE m.Status = BAD_RESPONSE
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
@ -71,15 +56,18 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
err = json.Unmarshal(body, &newposts) err = json.Unmarshal(body, &newposts)
if err != nil { if err != nil {
ri_mutex.Lock() ri_mutex.Lock()
var m = runninginstances[endpoint]
m.Status = BAD_RESPONSE m.Status = BAD_RESPONSE
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
return return
} }
resp.Body.Close() resp.Body.Close() // Release as soon as done
ri_mutex.Lock()
m.Status = RUNNING
runninginstances[endpoint] = m
ri_mutex.Unlock()
for _, newpost := range newposts { for _, newpost := range newposts {
if newpost.Account.Acct == "" { if newpost.Account.Acct == "" {
@ -109,8 +97,6 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
if newpost.Id > min_id { if newpost.Id > min_id {
min_id = newpost.Id min_id = newpost.Id
} }
numposts = numposts + 1
newinstance := newpost.Account.Acct[at_sign+1:] newinstance := newpost.Account.Acct[at_sign+1:]
ri_mutex.Lock() ri_mutex.Lock()
_, exists := runninginstances[newinstance] _, exists := runninginstances[newinstance]
@ -122,11 +108,8 @@ func PollMastodonPleroma(endpoint string, reportPostChan chan ReportPost) {
ri_mutex.Unlock() ri_mutex.Unlock()
} }
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
} }
} }
// Change this to return a proper "err" // Change this to return a proper "err"
@ -153,21 +136,13 @@ func GetNodeInfo(endpoint string) (NodeInfo) {
} }
func StartInstance(endpoint string, reportPostChan chan ReportPost) { func StartInstance(endpoint string, reportPostChan chan ReportPost) {
// This might not be necessary...
// ri_mutex.Lock()
// _, exists := runninginstances[endpoint]
// fmt.Println("The exists is", exists)
// ri_mutex.Unlock()
// if exists == true {
// return
// }
nodeinfo := GetNodeInfo(endpoint) nodeinfo := GetNodeInfo(endpoint)
if nodeinfo.Software.Name == "" { if nodeinfo.Software.Name == "" {
ri_mutex.Lock()
var m = runninginstances[endpoint] var m = runninginstances[endpoint]
m.Software = "" m.Software = ""
m.LastRun = time.Now().Format("2006.01.02-15:04:05")
m.Status = UNSUPPORTED_INSTANCE
ri_mutex.Lock()
runninginstances[endpoint] = m runninginstances[endpoint] = m
ri_mutex.Unlock() ri_mutex.Unlock()
return return
@ -179,47 +154,6 @@ func StartInstance(endpoint string, reportPostChan chan ReportPost) {
} }
/*
func NewInstance(endpoint string, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
var nodeinfo NodeInfo
if endpoint == "" {
return
}
// No repeats
for _, runninginstance := range runninginstances {
if runninginstance.Endpoint == endpoint {
return
}
}
// Check node type
GetNodeInfo(endpoint, &nodeinfo)
if nodeinfo.Software.Name == "" {
go func() {
var q InstanceReport
q.endpoint = endpoint
q.status = BAD_NODEINFO
instanceReportChan <- q
}()
}
newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE, "Queued"}
runninginstances = append(runninginstances, newinstance)
if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" {
var newinstancereport InstanceReport
newinstancereport.endpoint = endpoint
newinstancereport.status = 0
newinstancereport.min_id = ""
newinstancereport.numposts = 0
go StartInstancePoll(newinstancereport, reportPostChan, instanceReportChan)
}
}
*/
/* /*
func SuspendInstance(suspendinstance InstanceReport) { func SuspendInstance(suspendinstance InstanceReport) {
for i, runninginstance := range runninginstances { for i, runninginstance := range runninginstances {