From 6a9ebf917e1ba16fcb2f8bd70b8f4e8d6bb71f55 Mon Sep 17 00:00:00 2001 From: Farhan Khan Date: Sat, 18 Dec 2021 04:34:04 +0000 Subject: [PATCH] Updating /api/v1/trending --- restapi/restapi.go | 173 +++++++++++++++------------------------------ 1 file changed, 58 insertions(+), 115 deletions(-) diff --git a/restapi/restapi.go b/restapi/restapi.go index 07090a3..9ae7eca 100644 --- a/restapi/restapi.go +++ b/restapi/restapi.go @@ -12,131 +12,72 @@ import ( "github.com/jackc/pgx/v4" ) -var trendingexport string -var trendingHashTags string -var trendingwords string - -var hashtagtotal map[string]interface{} -var wordstotal map[string]interface{} -//hashtagmeta := make(map[string]interface{}); +var trendsText string func enableCors(w *http.ResponseWriter) { (*w).Header().Set("Access-Control-Allow-Origin", "*") } -func getTrendingHashtags() { - sql := "SELECT UNNEST(activities.hashtags) as hashtags, count(actors.id) " + - "FROM activities " + - "LEFT JOIN actors ON activities.document->>'attributedTo'=actors.document->>'id' " + - "WHERE actors.bot=TRUE " + - //"AND activities.identifiedat > LOCALTIMESTAMP - INTERVAL '30 MINUTES' " + - "GROUP BY hashtags ORDER BY count DESC LIMIT 20" +func runMetrics() { + hashtagtotal := runTrendingMetrics() + totalJson := make(map[string]interface{}) + totalJson["hashtags"] = hashtagtotal - for { - rows, err := pool.Query(context.Background(), sql) + data, err := json.Marshal(totalJson) + if err != nil { + log.Fatalf("error marshaling combined activity: %v\n", err) + } + + trendsText = string(data) +} + +func runTrendingMetrics() map[string]interface{} { + sql := `SELECT UNNEST(activities.hashtags) as hashtags, count(actors.id) +from activities +LEFT JOIN actors ON activities.document->>'attributedTo'=actors.document->>'id' +WHERE actors.bot=false +AND activities.identifiedat > LOCALTIMESTAMP - INTERVAL '30 MINUTES' +GROUP BY hashtags ORDER BY count DESC LIMIT 20;` + + rows, err := pool.Query(context.Background(), sql) + if err != nil { + panic(err) + } + + hashtagitems := make([]interface{}, 0); + hashcount := 0 + for rows.Next() { + var hashtag string + var count int + + err = rows.Scan(&hashtag, &count) if err != nil { panic(err) } - hashtagitems := make([]interface{}, 0); - hashcount := 0 - for rows.Next() { - var hashtag string - var count int - - err = rows.Scan(&hashtag, &count) - if err != nil { - panic(err) - } - - hashtagitem := make(map[string]interface{}) - hashtagitem["hashtag"] = hashtag - hashtagitem["count"] = count - hashtagitems = append(hashtagitems, hashtagitem) - hashcount = hashcount + 1 - } - rows.Close() - - hashtagtotal = make(map[string]interface{}); - hashtagtotal["count"] = hashcount - hashtagtotal["datetime"] = time.Now().UTC() - hashtagtotal["items"] = hashtagitems - - totalJson := make(map[string]interface{}) - totalJson["hashtags"] = hashtagtotal - - data, err := json.Marshal(totalJson) - if err != nil { - log.Fatalf("error marshaling combined activity: %v\n", err) - } - - trendingHashTags = string(data) - time.Sleep(time.Minute * 30) - + hashtagitem := make(map[string]interface{}) + hashtagitem["hashtag"] = hashtag + hashtagitem["count"] = count + hashtagitems = append(hashtagitems, hashtagitem) + hashcount = hashcount + 1 } + rows.Close() + + hashtagtotal := make(map[string]interface{}); + hashtagtotal["count"] = hashcount + hashtagtotal["datetime"] = time.Now().UTC() + hashtagtotal["items"] = hashtagitems + + return hashtagtotal } -func gettrends() { - for { - rows, err := pool.Query(context.Background(), "SELECT word, ndoc FROM ts_stat($$ SELECT normalized_tsvector FROM activities WHERE activities.identifiedat > current_timestamp - interval '60 minutes' $$) ORDER BY ndoc DESC LIMIT 10") - if err != nil { - panic(err) - } - - trenditems := make([]interface{}, 0) - - for rows.Next() { - var word string - var ndoc int - - err = rows.Scan(&word, &ndoc) - if err != nil { - panic(err) - } - - trenditem := make(map[string]interface{}) - trenditem["ndoc"] = ndoc - trenditem["word"] = word - trenditems = append(trenditems, trenditem) - } - rows.Close() - - totalJson := make(map[string]interface{}) - totalJson["trends"] = trenditems - - data, err := json.Marshal(totalJson) - if err != nil { - log.Fatalf("error marshaling combined activity: %v\n", err) - } - - trendingexport = string(data) - - time.Sleep(time.Second * 60) - } -} - - -func - - - - -func gettrending(w http.ResponseWriter, r *http.Request) { +// GET handlers +func getTrending(w http.ResponseWriter, r *http.Request) { enableCors(&w) -// fmt.Fprintf(w, "%s", trendingexport) + fmt.Fprintf(w, "%s", trendsText) } -func gettrendinghashtags(w http.ResponseWriter, r *http.Request) { - enableCors(&w) - fmt.Fprintf(w, "%s", trendingHashTags) -} - -func gettrendingwords(w http.ResponseWriter, r *http.Request) { - enableCors(&w) - fmt.Fprintf(w, "%s", trendingexport) -} - -func getsearch(w http.ResponseWriter, r *http.Request) { +func getSearch(w http.ResponseWriter, r *http.Request) { enableCors(&w) searchkeys, exists_search := r.URL.Query()["s"] offsetkeys, exists_offset := r.URL.Query()["o"] @@ -218,13 +159,15 @@ func getsearch(w http.ResponseWriter, r *http.Request) { func main() { pool = getDbPool() - go gettrends() - go getTrendingHashtags() + go func() { + for { + runMetrics() + time.Sleep(30 * time.Second) + } + }() - http.HandleFunc("/api/v1/search", getsearch) - http.HandleFunc("/api/v1/trending", gettrending) - http.HandleFunc("/api/v1/trending/hashtags", gettrendinghashtags) - http.HandleFunc("/api/v1/trending/words", gettrendingwords) + http.HandleFunc("/api/v1/search", getSearch) + http.HandleFunc("/api/v1/trending", getTrending) log.Print("Starting HTTP inbox on port http://0.0.0.0:6431") log.Fatal(http.ListenAndServe("0.0.0.0:6431", nil)) }