Removed Poll channel, merged into InstanceReport channel

Found a bug where the main routine was sending to a channel it also
read from. If full, this resulted in the main channel blocking
Added pperf, removed Peer "hunting" code
This commit is contained in:
farhan 2020-10-31 04:24:53 +00:00
parent 1d71e8217a
commit 4050e336c3

View File

@ -3,6 +3,7 @@ package main
import (
"github.com/microcosm-cc/bluemonday"
"github.com/jackc/pgx/pgxpool"
_ "net/http/pprof"
"encoding/binary"
"encoding/json"
"crypto/sha1"
@ -20,21 +21,16 @@ import (
)
const (
NEW_INSTANCE = 0
INSTANCE_ERROR = -1
NEW_INSTANCE = 0
RUNNING = 200
TOOMANYREQUESTS = 429
CLIENT_ISSUE = 600
ONION_PROTOCOL = 601
UNMARSHAL_ERROR = 602
NO_CONNECTION = 603
)
type PollMessage struct {
from string
status int
min_id string
numposts int
}
// Parsing Unmarshal JSON type
type ReportPost struct {
@ -60,10 +56,12 @@ type AccountType struct {
}
// Used to report a new instance to main
type ReportInstance struct {
from string
type InstanceReport struct {
endpoint string
status int
min_id string
numposts int
}
// Instance's new min_id value
@ -94,7 +92,7 @@ type ResponseBack struct {
RunningInstances []RunningInstance `json:"RunningInstances"`
}
func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance) {
func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport) {
sizebyte := make([]byte, 4)
var commandmap CommandMap
@ -120,16 +118,14 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, r
switch commandmap.Type {
case "status":
fmt.Println("Status")
responseback.Message = "Ok"
case "add":
fmt.Println("Add instance: " + commandmap.Endpoint)
var q ReportInstance
q.from = ""
var q InstanceReport
q.endpoint = commandmap.Endpoint
q.status = NEW_INSTANCE
reportInstanceChan <- q
instanceReportChan <- q
responseback.Message = "Added " + commandmap.Endpoint
case "suspend":
@ -144,8 +140,6 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, r
responseback.Type = "status"
responseback.RunningInstances = *runninginstances
fmt.Println(responseback)
responsebytes, err := json.Marshal(responseback)
if err != nil {
fmt.Println("Error: ", err)
@ -155,7 +149,6 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, r
n = len(responsebytes)
binary.LittleEndian.PutUint32(sizebyte, uint32(n))
fmt.Println(sizebyte)
commandClient.Write(sizebyte)
responsebyte, err := json.Marshal(responseback)
@ -165,7 +158,6 @@ func handleClient(commandClient net.Conn, runninginstances *[]RunningInstance, r
}
commandClient.Write(responsebyte)
commandClient.Close()
}
@ -189,40 +181,34 @@ func AppendIfMissing(hay []string, needle string) []string {
return append(hay, needle)
}
func StartInstancePoll(endpoint string, min_id string, reportPostChan chan ReportPost, pollMessageChan chan PollMessage, reportInstanceChan chan ReportInstance) {
func StartInstancePoll(instancereport InstanceReport, reportPostChan chan ReportPost, instanceReportChan chan InstanceReport) {
p := bluemonday.NewPolicy()
newposts := make([]ReportPost, 0)
// Only placing this here to later have the option of using an HTTP client via a SOCKS5 Tor proxy
if strings.Contains(endpoint, ".onion") == true {
reportInstanceChan <- ReportInstance{endpoint, endpoint, ONION_PROTOCOL}
// Only placing this here to later have the option of using
// an HTTP client via a SOCKS5 Tor proxy
if strings.Contains(instancereport.endpoint, ".onion") == true {
instanceReportChan <- InstanceReport{instancereport.endpoint, ONION_PROTOCOL, "", 0}
return
}
api_timeline := "https://" + endpoint + "/api/v1/timelines/public?min_id=" + min_id
api_timeline := "https://" + instancereport.endpoint + "/api/v1/timelines/public?min_id=" + instancereport.min_id
resp, err := http.Get(api_timeline)
if err != nil {
reportInstanceChan <- ReportInstance{endpoint, endpoint, CLIENT_ISSUE}
instanceReportChan <- InstanceReport{instancereport.endpoint, CLIENT_ISSUE, "", 0}
return
}
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &newposts)
if err != nil {
fmt.Println("Unmarshal 3");
// Perhaps get rid of this if-condition?
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
reportInstanceChan <- ReportInstance{endpoint, endpoint, resp.StatusCode}
} else if resp.StatusCode >= 500 && resp.StatusCode < 600 {
reportInstanceChan <- ReportInstance{endpoint, endpoint, resp.StatusCode}
} else {
reportInstanceChan <- ReportInstance{endpoint, endpoint, UNMARSHAL_ERROR}
}
//log.Fatal(err)
// instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, "", 0}
instanceReportChan <- InstanceReport{instancereport.endpoint, 999, "", 0}
return
}
newinstances := make([]string, 0)
min_id := ""
numposts := 0
for _, newpost := range newposts {
posthash := sha1.New()
@ -231,7 +217,7 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor
if at_sign == -1 {
at_sign = len(newpost.Account.Acct)
newpost.Account.Acct += "@" + endpoint
newpost.Account.Acct += "@" + instancereport.endpoint
}
// Calculate the post hash
@ -252,28 +238,29 @@ func StartInstancePoll(endpoint string, min_id string, reportPostChan chan Repor
numposts = numposts + 1
newinstance := newpost.Account.Acct[at_sign+1:]
newinstances = AppendIfMissing(newinstances, newinstance)
}
for _, newinstance := range newinstances {
var q ReportInstance
q.from = endpoint
var q InstanceReport
q.endpoint = newinstance
q.status = NEW_INSTANCE
reportInstanceChan <- q
q.min_id = ""
q.numposts = 0
instanceReportChan <- q
}
pollMessageChan <- PollMessage{endpoint, resp.StatusCode, min_id, numposts}
instanceReportChan <- InstanceReport{instancereport.endpoint, resp.StatusCode, min_id, numposts}
}
func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) {
/*
func StartGetPeers(endpoint string, instanceReportChan chan InstanceReport) {
var newpeers []string
api_peers := "https://" + endpoint + "/api/v1/instance/peers"
resp, err := http.Get(api_peers)
if err != nil {
reportInstanceChan <- ReportInstance{endpoint, endpoint, NO_CONNECTION}
instanceReportChan <- InstanceReport{endpoint, NO_CONNECTION, "", 0}
return
// os.Exit(1)
}
@ -284,48 +271,42 @@ func StartGetPeers(endpoint string, reportInstanceChan chan ReportInstance) {
if err != nil {
fmt.Println("Unmarshal 1");
log.Fatal(err)
reportInstanceChan <- ReportInstance{endpoint, endpoint, UNMARSHAL_ERROR}
instanceReportChan <- InstanceReport{endpoint, endpoint, UNMARSHAL_ERROR, "", 0}
return
}
for _, newpeer := range newpeers {
var q ReportInstance
q.from = endpoint
var q InstanceReport
q.endpoint = newpeer
q.status = NEW_INSTANCE
reportInstanceChan <- q
instanceReportChan <- q
}
}
*/
func DeferPollRun(pollmessage PollMessage, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) {
var min_id string
for _, runninginstance := range *runninginstances {
if runninginstance.Endpoint == pollmessage.from {
min_id = runninginstance.Min_id
break
}
}
func DeferPollRun(instancereport InstanceReport, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
delay := 10
if pollmessage.status == 200 && pollmessage.numposts <= 10 {
if instancereport.status == 200 && instancereport.numposts <= 10 {
delay = 10
} else if pollmessage.status == 200 && pollmessage.numposts > 10 {
} else if instancereport.status == 200 && instancereport.numposts > 10 {
delay = 15
} else if pollmessage.status == 429 {
} else if instancereport.status == 429 {
delay = 30
} else {
fmt.Println("error, status code is: ", pollmessage.status)
fmt.Println("error, status code is ------------->: ", instancereport.status)
os.Exit(1)
}
time.Sleep(time.Second * time.Duration(delay))
go StartInstancePoll(pollmessage.from, min_id, reportPostChan, pollMessageChan, reportInstanceChan)
go StartInstancePoll(instancereport, reportPostChan, instanceReportChan)
}
// Change this to return a proper "err"
func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) {
api_nodeinfo := "https://" + endpoint + "/nodeinfo/2.0.json"
resp, err := http.Get(api_nodeinfo)
http_client := http.Client{Timeout: 5 * time.Second}
resp, err := http_client.Get(api_nodeinfo)
if err != nil {
return
}
@ -333,39 +314,49 @@ func GetNodeInfo(endpoint string, nodeinfo *NodeInfo) {
body, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &nodeinfo)
if err != nil {
fmt.Println("Unmarshal 2");
// fmt.Println("Unmarshal 2");
return
}
}
func NewInstance(endpoint string, runninginstances *[]RunningInstance, reportInstanceChan chan ReportInstance, reportPostChan chan ReportPost, pollMessageChan chan PollMessage) {
func NewInstance(endpoint string, runninginstances *[]RunningInstance, instanceReportChan chan InstanceReport, reportPostChan chan ReportPost) {
var nodeinfo NodeInfo
// Check node type
GetNodeInfo(endpoint, &nodeinfo)
if nodeinfo.Software.Name == "" {
fmt.Println("was blank: ", nodeinfo.Software.Name)
var q ReportInstance
q.from = ""
q.endpoint = endpoint
q.status = INSTANCE_ERROR
reportInstanceChan <- q
}
if endpoint == "" {
return
}
// No repeats
for _, runninginstance := range *runninginstances {
if runninginstance.Endpoint == endpoint {
return
}
}
// Check node type
GetNodeInfo(endpoint, &nodeinfo)
if nodeinfo.Software.Name == "" {
go func() {
var q InstanceReport
q.endpoint = endpoint
q.status = INSTANCE_ERROR
instanceReportChan <- q
return
}()
}
newinstance := RunningInstance{endpoint, "", "", NEW_INSTANCE}
*runninginstances = append(*runninginstances, newinstance)
if nodeinfo.Software.Name == "pleroma" || nodeinfo.Software.Name == "mastodon" {
go StartInstancePoll(endpoint, "", reportPostChan, pollMessageChan, reportInstanceChan)
var newinstancereport InstanceReport
newinstancereport.endpoint = endpoint
newinstancereport.status = 0
newinstancereport.min_id = ""
newinstancereport.numposts = 0
go StartInstancePoll(newinstancereport, reportPostChan, instanceReportChan)
// fmt.Println("Temporarily disabled Peer Hunting")
// go StartGetPeers(endpoint, reportInstanceChan)
// go StartGetPeers(endpoint, instanceReportChan)
}
}
@ -415,25 +406,24 @@ func writePost(pool *pgxpool.Pool, reportpost ReportPost) {
}
}
func SuspendInstance(suspendinstance ReportInstance, runninginstances *[]RunningInstance) {
func SuspendInstance(suspendinstance InstanceReport, runninginstances *[]RunningInstance) {
for i, runninginstance := range *runninginstances {
if runninginstance.Endpoint == suspendinstance.endpoint {
fmt.Println("Updated status to ", suspendinstance.status)
(*runninginstances)[i].Status = suspendinstance.status
return
}
}
}
func main() {
func engine() {
// Current instances
runninginstances := make([]RunningInstance, 0)
// Initial Setup
reportPostChan := make(chan ReportPost, 100)
reportInstanceChan := make(chan ReportInstance, 100)
pollMessageChan := make (chan PollMessage, 100)
reportPostChan := make(chan ReportPost, 2000)
instanceReportChan := make(chan InstanceReport, 100)
// Setup Database
pool, err := pgxpool.Connect(context.Background(), "postgres://postgres@127.0.0.1/fedilogue")
@ -449,13 +439,13 @@ func main() {
}
defer l.Close()
for {
commandClient := make(chan net.Conn)
go func(l net.Listener) {
for {
c, err := l.Accept()
if err != nil {
fmt.Println("Error on accept")
commandClient <- nil
return
}
@ -466,24 +456,29 @@ func main() {
for {
select {
case c := <-commandClient: // New client connection
go handleClient(c, &runninginstances, reportInstanceChan)
case p := <-pollMessageChan: // A poller ended
for i, runninginstance := range runninginstances {
if runninginstance.Endpoint == p.from {
runninginstances[i].Min_id = p.min_id
}
}
go DeferPollRun(p, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan)
go handleClient(c, &runninginstances, instanceReportChan)
case v := <-reportPostChan: // New Post
go writePost(pool, v)
case w := <-reportInstanceChan: // Start or suspend instance
case w := <-instanceReportChan: // Start or suspend instance
if w.status == NEW_INSTANCE {
NewInstance(w.endpoint, &runninginstances, reportInstanceChan, reportPostChan, pollMessageChan)
NewInstance(w.endpoint, &runninginstances, instanceReportChan, reportPostChan)
} else if w.status == RUNNING || w.status == TOOMANYREQUESTS {
for i, runninginstance := range runninginstances {
if runninginstance.Endpoint == w.endpoint {
runninginstances[i].Min_id = w.min_id
runninginstances[i].Status = w.status
}
}
go DeferPollRun(w, &runninginstances, instanceReportChan, reportPostChan)
} else {
fmt.Println("Error here failure ", w.status)
SuspendInstance(w, &runninginstances)
}
}
}
}
}
func main() {
go engine()
log.Println("serving on port 8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}