changed comments, made streaming in a loop

This commit is contained in:
farhan 2020-12-29 16:38:14 +00:00
parent 72821c4641
commit 5473052519
3 changed files with 92 additions and 85 deletions

View File

@ -60,6 +60,7 @@ func GetNodeInfo(endpoint string, o RunningInstance) (RunningInstance) {
err = json.NewDecoder(pleromastodon_api_resp.Body).Decode(&nodeinfo) err = json.NewDecoder(pleromastodon_api_resp.Body).Decode(&nodeinfo)
if err == nil { if err == nil {
o.Software = nodeinfo.Software.Name o.Software = nodeinfo.Software.Name
o.Version = nodeinfo.Software.Version
o.LastRun = time.Now().Format(time.RFC3339) o.LastRun = time.Now().Format(time.RFC3339)
defer pleromastodon_api_resp.Body.Close() defer pleromastodon_api_resp.Body.Close()
return o return o
@ -87,15 +88,12 @@ func GetNodeInfo(endpoint string, o RunningInstance) (RunningInstance) {
indexstr := string(indexbin) indexstr := string(indexbin)
if strings.Contains(indexstr, "Pleroma") || strings.Contains(indexstr, "Soapbox") { if strings.Contains(indexstr, "Pleroma") || strings.Contains(indexstr, "Soapbox") {
log.Print("Manual view: Pleroma" + endpoint)
o.Software = "pleroma" o.Software = "pleroma"
o.Version = "guess" o.Version = "guess"
} else if strings.Contains(indexstr, "Mastodon") { } else if strings.Contains(indexstr, "Mastodon") {
log.Print("Manual view: Mastodon" + endpoint)
o.Software = "mastodon" o.Software = "mastodon"
o.Version = "guess" o.Version = "guess"
} else if strings.Contains(indexstr, "Gab") { } else if strings.Contains(indexstr, "Gab") {
log.Print("Manual view: Gab" + endpoint)
o.Software = "gab" o.Software = "gab"
o.Version = "guess" o.Version = "guess"
} }
@ -115,12 +113,12 @@ func StartInstance(endpoint string) {
} }
if o.Software == "pleroma" { if o.Software == "pleroma" {
log.Print("Starting " + endpoint + " as " + o.Software) log.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version)
o.CaptureType = "Poll" o.CaptureType = "Poll"
UpdateRunner(endpoint, o) UpdateRunner(endpoint, o)
PollMastodonPleroma(endpoint, &o) PollMastodonPleroma(endpoint, &o)
} else if o.Software == "mastodon" { } else if o.Software == "mastodon" {
log.Print("Starting " + endpoint + " as " + o.Software) log.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version)
o.CaptureType = "Stream" o.CaptureType = "Stream"
UpdateRunner(endpoint, o) UpdateRunner(endpoint, o)
StreamMastodon(endpoint, &o) StreamMastodon(endpoint, &o)

View File

@ -90,9 +90,11 @@ func check_post(uri string) (PostJson, error) {
selectRet := pool.QueryRow(context.Background(), "SELECT id, inReplyTo, published, summary, content, normalized, attributedto, posthash, received_at FROM posts WHERE id = $1", uri) selectRet := pool.QueryRow(context.Background(), "SELECT id, inReplyTo, published, summary, content, normalized, attributedto, posthash, received_at FROM posts WHERE id = $1", uri)
err := selectRet.Scan(&postjson.ID, &postjson.InReplyTo, &postjson.Published, &postjson.Summary, &postjson.Content, &postjson.normalized, &postjson.AttributedTo, &postjson.posthash, &postjson.receivedAt) err := selectRet.Scan(&postjson.ID, &postjson.InReplyTo, &postjson.Published, &postjson.Summary, &postjson.Content, &postjson.normalized, &postjson.AttributedTo, &postjson.posthash, &postjson.receivedAt)
if err == nil { if err == nil {
return postjson, nil return postjson, nil
} }
endslash := strings.Index(uri[8:], "/") endslash := strings.Index(uri[8:], "/")
if endslash == -1 { if endslash == -1 {
return postjson, errors.New("Invalid URI " + uri) return postjson, errors.New("Invalid URI " + uri)
@ -141,7 +143,7 @@ func check_post(uri string) (PostJson, error) {
_, err = pool.Exec(context.Background(), "INSERT INTO posts (id, inreplyto, published, summary, content, normalized, attributedto, posthash, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)", postjson.ID, postjson.InReplyTo, postjson.Published, postjson.Summary, postjson.Content, postjson.normalized, postjson.AttributedTo, postjson.posthash, postjson.instance) _, err = pool.Exec(context.Background(), "INSERT INTO posts (id, inreplyto, published, summary, content, normalized, attributedto, posthash, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)", postjson.ID, postjson.InReplyTo, postjson.Published, postjson.Summary, postjson.Content, postjson.normalized, postjson.AttributedTo, postjson.posthash, postjson.instance)
if err != nil { if err != nil {
log.Print("INSERT posts error of " + uri + ": ", err) // log.Print("INSERT posts error of " + uri + ": ", err)
return postjson, err return postjson, err
} }
@ -191,7 +193,7 @@ func check_user(uri string) (UserJson, error) {
_, err = pool.Exec(context.Background(), "INSERT INTO accounts (id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", userjson.ID, userjson.Type, userjson.Inbox, userjson.Outbox, userjson.Followers, userjson.Following, userjson.Url, userjson.PreferredUsername, userjson.Name, userjson.Summary, userjson.Icon.Url, userjson.Image.Url, userjson.PublicKey.PublicKeyPem, userjson.instance) _, err = pool.Exec(context.Background(), "INSERT INTO accounts (id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", userjson.ID, userjson.Type, userjson.Inbox, userjson.Outbox, userjson.Followers, userjson.Following, userjson.Url, userjson.PreferredUsername, userjson.Name, userjson.Summary, userjson.Icon.Url, userjson.Image.Url, userjson.PublicKey.PublicKeyPem, userjson.instance)
if err != nil { if err != nil {
log.Print("INSERT accounts error: ", err) // log.Print("INSERT accounts error: ", err)
return userjson, err return userjson, err
} }

163
stream.go
View File

@ -15,99 +15,106 @@ func StreamMastodon(endpoint string, o *RunningInstance) {
//var client_id string //var client_id string
//var client_secret string //var client_secret string
var oauthData OAuth var oauthData OAuth
var err error
api_timeline := "https://" + endpoint + "/api/v1/streaming/public" for {
req, err := http.NewRequest("GET", api_timeline, nil)
if err != nil {
log.Print("Unable to create new request")
return
}
for _, extaccount := range settings.Externalaccounts {
if extaccount.Endpoint == endpoint {
get_client(endpoint, o)
err = get_client(endpoint, o)
if err != nil {
log.Fatal("Unable to register client: ", err)
}
oauthData, err = oauth_login(endpoint, o, extaccount.Username, extaccount.Password)
if err != nil {
log.Print("Unable to login: ", err)
return
}
req.Header.Add("Authorization", oauthData.Access_token)
api_timeline := "https://" + endpoint + "/api/v1/streaming/public"
req, err := http.NewRequest("GET", api_timeline, nil)
if err != nil {
log.Print("Unable to create new request")
return
} }
}
resp, err := http_client.Do(req) for _, extaccount := range settings.Externalaccounts {
if err != nil { if extaccount.Endpoint == endpoint {
log.Print("Unable to stream " + api_timeline + ": ", err) get_client(endpoint, o)
return
}
defer resp.Body.Close()
ri_mutex.Lock() err = get_client(endpoint, o)
m := runninginstances[endpoint]
m.Status = RUNNING
m.LastRun = "Streaming"
runninginstances[endpoint] = m
ri_mutex.Unlock()
s := bufio.NewScanner(resp.Body)
var name string
for s.Scan() {
line := s.Text()
token := strings.SplitN(line, ":", 2)
var newpost ReportPost
if len(token) != 2 {
continue
}
switch strings.TrimSpace(token[0]) {
case "event":
name = strings.TrimSpace(token[1])
continue
case "data":
switch name {
case "update":
jsoner := token[1][1:]
err := json.Unmarshal([]byte(jsoner), &newpost)
if err != nil { if err != nil {
log.Fatal("Unable to unmarshal with error: ", err) log.Fatal("Unable to register client: ", err)
}
oauthData, err = oauth_login(endpoint, o, extaccount.Username, extaccount.Password)
if err != nil {
log.Print("Unable to login: ", err)
return
}
req.Header.Add("Authorization", oauthData.Access_token)
}
}
resp, err := http_client.Do(req)
if err != nil {
log.Print("Unable to stream " + api_timeline + ": ", err)
return
}
defer resp.Body.Close()
ri_mutex.Lock()
m := runninginstances[endpoint]
m.Status = RUNNING
m.LastRun = "Streaming"
runninginstances[endpoint] = m
ri_mutex.Unlock()
s := bufio.NewScanner(resp.Body)
var name string
for s.Scan() {
line := s.Text()
token := strings.SplitN(line, ":", 2)
var newpost ReportPost
if len(token) != 2 {
continue
}
switch strings.TrimSpace(token[0]) {
case "event":
name = strings.TrimSpace(token[1])
continue
case "data":
switch name {
case "update":
jsoner := token[1][1:]
err := json.Unmarshal([]byte(jsoner), &newpost)
if err != nil {
continue
log.Fatal("Unable to unmarshal with error: ", err)
}
default:
continue
} }
default: default:
continue continue
} }
default:
continue
}
go check_post(newpost.Uri) go check_post(newpost.Uri)
at_sign := strings.Index(newpost.Account.Acct, "@") at_sign := strings.Index(newpost.Account.Acct, "@")
newinstance := newpost.Account.Acct[at_sign+1:] newinstance := newpost.Account.Acct[at_sign+1:]
if settings.Crawl == true && stringexists(endpoint, settings.Banned) == false { if settings.Crawl == true && stringexists(endpoint, settings.Banned) == false {
ri_mutex.Lock() ri_mutex.Lock()
o, exists := runninginstances[newinstance] o, exists := runninginstances[newinstance]
if exists == false || o.Status == KEEPALIVE { if exists == false || o.Status == KEEPALIVE {
m := RunningInstance{} m := RunningInstance{}
runninginstances[newinstance] = m runninginstances[newinstance] = m
go StartInstance(newinstance) go StartInstance(newinstance)
}
ri_mutex.Unlock()
} }
ri_mutex.Unlock()
} }
time.Sleep(time.Minute * 30)
} }
ri_mutex.Lock() // ri_mutex.Lock()
m = runninginstances[endpoint] // m = runninginstances[endpoint]
m.LastRun = time.Now().Format(time.RFC3339) // m.LastRun = time.Now().Format(time.RFC3339)
m.Status = STREAM_ENDED // m.Status = STREAM_ENDED
runninginstances[endpoint] = m // runninginstances[endpoint] = m
ri_mutex.Unlock() // ri_mutex.Unlock()
} }