diff --git a/instance.go b/instance.go index aa69f8f..cfd0772 100644 --- a/instance.go +++ b/instance.go @@ -60,6 +60,7 @@ func GetNodeInfo(endpoint string, o RunningInstance) (RunningInstance) { err = json.NewDecoder(pleromastodon_api_resp.Body).Decode(&nodeinfo) if err == nil { o.Software = nodeinfo.Software.Name + o.Version = nodeinfo.Software.Version o.LastRun = time.Now().Format(time.RFC3339) defer pleromastodon_api_resp.Body.Close() return o @@ -87,15 +88,12 @@ func GetNodeInfo(endpoint string, o RunningInstance) (RunningInstance) { indexstr := string(indexbin) if strings.Contains(indexstr, "Pleroma") || strings.Contains(indexstr, "Soapbox") { - log.Print("Manual view: Pleroma" + endpoint) o.Software = "pleroma" o.Version = "guess" } else if strings.Contains(indexstr, "Mastodon") { - log.Print("Manual view: Mastodon" + endpoint) o.Software = "mastodon" o.Version = "guess" } else if strings.Contains(indexstr, "Gab") { - log.Print("Manual view: Gab" + endpoint) o.Software = "gab" o.Version = "guess" } @@ -115,12 +113,12 @@ func StartInstance(endpoint string) { } if o.Software == "pleroma" { - log.Print("Starting " + endpoint + " as " + o.Software) + log.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version) o.CaptureType = "Poll" UpdateRunner(endpoint, o) PollMastodonPleroma(endpoint, &o) } else if o.Software == "mastodon" { - log.Print("Starting " + endpoint + " as " + o.Software) + log.Print("Starting " + endpoint + " as " + o.Software + " " + o.Version) o.CaptureType = "Stream" UpdateRunner(endpoint, o) StreamMastodon(endpoint, &o) diff --git a/retrieve.go b/retrieve.go index ae0e001..dd7c5c0 100644 --- a/retrieve.go +++ b/retrieve.go @@ -90,9 +90,11 @@ func check_post(uri string) (PostJson, error) { selectRet := pool.QueryRow(context.Background(), "SELECT id, inReplyTo, published, summary, content, normalized, attributedto, posthash, received_at FROM posts WHERE id = $1", uri) err := selectRet.Scan(&postjson.ID, &postjson.InReplyTo, &postjson.Published, &postjson.Summary, &postjson.Content, &postjson.normalized, &postjson.AttributedTo, &postjson.posthash, &postjson.receivedAt) + if err == nil { return postjson, nil } + endslash := strings.Index(uri[8:], "/") if endslash == -1 { return postjson, errors.New("Invalid URI " + uri) @@ -141,7 +143,7 @@ func check_post(uri string) (PostJson, error) { _, err = pool.Exec(context.Background(), "INSERT INTO posts (id, inreplyto, published, summary, content, normalized, attributedto, posthash, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)", postjson.ID, postjson.InReplyTo, postjson.Published, postjson.Summary, postjson.Content, postjson.normalized, postjson.AttributedTo, postjson.posthash, postjson.instance) if err != nil { - log.Print("INSERT posts error of " + uri + ": ", err) +// log.Print("INSERT posts error of " + uri + ": ", err) return postjson, err } @@ -191,7 +193,7 @@ func check_user(uri string) (UserJson, error) { _, err = pool.Exec(context.Background(), "INSERT INTO accounts (id, actor_type, inbox, outbox, followers, following, url, preferredUsername, name, summary, icon, image, publicKey, instance) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", userjson.ID, userjson.Type, userjson.Inbox, userjson.Outbox, userjson.Followers, userjson.Following, userjson.Url, userjson.PreferredUsername, userjson.Name, userjson.Summary, userjson.Icon.Url, userjson.Image.Url, userjson.PublicKey.PublicKeyPem, userjson.instance) if err != nil { - log.Print("INSERT accounts error: ", err) +// log.Print("INSERT accounts error: ", err) return userjson, err } diff --git a/stream.go b/stream.go index 3c0dd1f..9aa9d20 100644 --- a/stream.go +++ b/stream.go @@ -15,99 +15,106 @@ func StreamMastodon(endpoint string, o *RunningInstance) { //var client_id string //var client_secret string var oauthData OAuth - var err error - api_timeline := "https://" + endpoint + "/api/v1/streaming/public" - req, err := http.NewRequest("GET", api_timeline, nil) - if err != nil { - log.Print("Unable to create new request") - return - } - - for _, extaccount := range settings.Externalaccounts { - if extaccount.Endpoint == endpoint { - get_client(endpoint, o) - - err = get_client(endpoint, o) - if err != nil { - log.Fatal("Unable to register client: ", err) - } - - oauthData, err = oauth_login(endpoint, o, extaccount.Username, extaccount.Password) - if err != nil { - log.Print("Unable to login: ", err) - return - } - - req.Header.Add("Authorization", oauthData.Access_token) + for { + api_timeline := "https://" + endpoint + "/api/v1/streaming/public" + req, err := http.NewRequest("GET", api_timeline, nil) + if err != nil { + log.Print("Unable to create new request") + return } - } - resp, err := http_client.Do(req) - if err != nil { - log.Print("Unable to stream " + api_timeline + ": ", err) - return - } - defer resp.Body.Close() + for _, extaccount := range settings.Externalaccounts { + if extaccount.Endpoint == endpoint { + get_client(endpoint, o) - ri_mutex.Lock() - m := runninginstances[endpoint] - m.Status = RUNNING - m.LastRun = "Streaming" - runninginstances[endpoint] = m - ri_mutex.Unlock() - - s := bufio.NewScanner(resp.Body) - var name string - for s.Scan() { - line := s.Text() - token := strings.SplitN(line, ":", 2) - var newpost ReportPost - if len(token) != 2 { - continue - } - switch strings.TrimSpace(token[0]) { - case "event": - name = strings.TrimSpace(token[1]) - continue - case "data": - switch name { - case "update": - jsoner := token[1][1:] - err := json.Unmarshal([]byte(jsoner), &newpost) + err = get_client(endpoint, o) if err != nil { - log.Fatal("Unable to unmarshal with error: ", err) + log.Fatal("Unable to register client: ", err) + } + + oauthData, err = oauth_login(endpoint, o, extaccount.Username, extaccount.Password) + if err != nil { + log.Print("Unable to login: ", err) + return + } + + req.Header.Add("Authorization", oauthData.Access_token) + + } + } + + resp, err := http_client.Do(req) + if err != nil { + log.Print("Unable to stream " + api_timeline + ": ", err) + return + } + defer resp.Body.Close() + + ri_mutex.Lock() + m := runninginstances[endpoint] + m.Status = RUNNING + m.LastRun = "Streaming" + runninginstances[endpoint] = m + ri_mutex.Unlock() + + s := bufio.NewScanner(resp.Body) + var name string + for s.Scan() { + line := s.Text() + token := strings.SplitN(line, ":", 2) + var newpost ReportPost + if len(token) != 2 { + continue + } + + + + switch strings.TrimSpace(token[0]) { + case "event": + name = strings.TrimSpace(token[1]) + continue + case "data": + switch name { + case "update": + jsoner := token[1][1:] + err := json.Unmarshal([]byte(jsoner), &newpost) + if err != nil { + continue + log.Fatal("Unable to unmarshal with error: ", err) + } + default: + continue } default: continue } - default: - continue - } - go check_post(newpost.Uri) + go check_post(newpost.Uri) - at_sign := strings.Index(newpost.Account.Acct, "@") - newinstance := newpost.Account.Acct[at_sign+1:] + at_sign := strings.Index(newpost.Account.Acct, "@") + newinstance := newpost.Account.Acct[at_sign+1:] - if settings.Crawl == true && stringexists(endpoint, settings.Banned) == false { - ri_mutex.Lock() - o, exists := runninginstances[newinstance] - if exists == false || o.Status == KEEPALIVE { - m := RunningInstance{} - runninginstances[newinstance] = m - go StartInstance(newinstance) + if settings.Crawl == true && stringexists(endpoint, settings.Banned) == false { + ri_mutex.Lock() + o, exists := runninginstances[newinstance] + if exists == false || o.Status == KEEPALIVE { + m := RunningInstance{} + runninginstances[newinstance] = m + go StartInstance(newinstance) + } + + ri_mutex.Unlock() } - - ri_mutex.Unlock() } + time.Sleep(time.Minute * 30) } - ri_mutex.Lock() - m = runninginstances[endpoint] - m.LastRun = time.Now().Format(time.RFC3339) - m.Status = STREAM_ENDED - runninginstances[endpoint] = m - ri_mutex.Unlock() +// ri_mutex.Lock() +// m = runninginstances[endpoint] +// m.LastRun = time.Now().Format(time.RFC3339) +// m.Status = STREAM_ENDED +// runninginstances[endpoint] = m +// ri_mutex.Unlock() }