package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/jackc/pgx/v4"
)

// Cached JSON snapshots produced by the background refreshers and served by
// the trending handlers. trendingexport holds the trending-words payload and
// trendingHashTags the trending-hashtags payload. trendingwords and
// wordstotal are not referenced in this file; they are kept in case other
// files of the package use them.
var (
	trendingexport   string
	trendingHashTags string
	trendingwords    string
	hashtagtotal     map[string]interface{}
	wordstotal       map[string]interface{}
)

// trendingMu guards trendingexport, trendingHashTags and hashtagtotal, which
// are written by the refresher goroutines and read by HTTP handlers.
var trendingMu sync.RWMutex

const (
	// Refresh intervals for the two background snapshot loops.
	hashtagRefreshInterval = 30 * time.Minute
	wordsRefreshInterval   = 60 * time.Second

	trendingHashtagsQuery = "SELECT UNNEST(activities.hashtags) as hashtags, count(actors.id) " +
		"FROM activities " +
		"LEFT JOIN actors ON activities.document->>'attributedTo'=actors.document->>'id' " +
		"WHERE actors.bot=TRUE " +
		//"AND activities.identifiedat > LOCALTIMESTAMP - INTERVAL '30 MINUTES' " +
		"GROUP BY hashtags ORDER BY count DESC LIMIT 20"

	trendingWordsQuery = "SELECT word, ndoc FROM ts_stat($$ SELECT normalized_tsvector FROM activities WHERE activities.identifiedat > current_timestamp - interval '60 minutes' $$) ORDER BY ndoc DESC LIMIT 10"

	// searchSelect is the common prefix of every search query; filters and
	// ordering are appended by buildSearchQuery.
	searchSelect = "SELECT activities.id, activities.document, actors.document FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id'"
)

//hashtagmeta := make(map[string]interface{});

// enableCors allows cross-origin requests from any origin by setting the
// Access-Control-Allow-Origin header on the response.
func enableCors(w *http.ResponseWriter) {
	(*w).Header().Set("Access-Control-Allow-Origin", "*")
}

// getTrendingHashtags refreshes the trending-hashtags snapshot forever, once
// every hashtagRefreshInterval. It never returns and is meant to be started
// with `go`. A failed refresh is logged and the previous snapshot is kept, so
// a transient database error does not take the whole server down.
func getTrendingHashtags() {
	for {
		if err := refreshTrendingHashtags(context.Background()); err != nil {
			log.Printf("refreshing trending hashtags: %v", err)
		}
		time.Sleep(hashtagRefreshInterval)
	}
}

// refreshTrendingHashtags runs the hashtag query once and, on success,
// replaces hashtagtotal and trendingHashTags with the new snapshot.
func refreshTrendingHashtags(ctx context.Context) error {
	rows, err := pool.Query(ctx, trendingHashtagsQuery)
	if err != nil {
		return fmt.Errorf("querying trending hashtags: %w", err)
	}
	defer rows.Close()

	// Start from an empty (non-nil) slice so an empty result encodes as [].
	items := make([]interface{}, 0, 20)
	for rows.Next() {
		var (
			hashtag string
			count   int
		)
		if err := rows.Scan(&hashtag, &count); err != nil {
			return fmt.Errorf("scanning trending hashtag row: %w", err)
		}
		items = append(items, map[string]interface{}{
			"hashtag": hashtag,
			"count":   count,
		})
	}
	// rows.Next returning false can also mean the iteration failed.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("reading trending hashtag rows: %w", err)
	}

	total := map[string]interface{}{
		"count":    len(items),
		"datetime": time.Now().UTC(),
		"items":    items,
	}
	data, err := json.Marshal(map[string]interface{}{"hashtags": total})
	if err != nil {
		return fmt.Errorf("marshaling trending hashtags: %w", err)
	}

	trendingMu.Lock()
	hashtagtotal = total
	trendingHashTags = string(data)
	trendingMu.Unlock()
	return nil
}

// gettrends refreshes the trending-words snapshot forever, once every
// wordsRefreshInterval. It never returns and is meant to be started with
// `go`. A failed refresh is logged and the previous snapshot is kept.
func gettrends() {
	for {
		if err := refreshTrendingWords(context.Background()); err != nil {
			log.Printf("refreshing trending words: %v", err)
		}
		time.Sleep(wordsRefreshInterval)
	}
}

// refreshTrendingWords runs the ts_stat query once and, on success, replaces
// trendingexport with the new JSON snapshot.
func refreshTrendingWords(ctx context.Context) error {
	rows, err := pool.Query(ctx, trendingWordsQuery)
	if err != nil {
		return fmt.Errorf("querying trending words: %w", err)
	}
	defer rows.Close()

	items := make([]interface{}, 0, 10)
	for rows.Next() {
		var (
			word string
			ndoc int
		)
		if err := rows.Scan(&word, &ndoc); err != nil {
			return fmt.Errorf("scanning trending word row: %w", err)
		}
		items = append(items, map[string]interface{}{
			"ndoc": ndoc,
			"word": word,
		})
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("reading trending word rows: %w", err)
	}

	data, err := json.Marshal(map[string]interface{}{"trends": items})
	if err != nil {
		return fmt.Errorf("marshaling trending words: %w", err)
	}

	trendingMu.Lock()
	trendingexport = string(data)
	trendingMu.Unlock()
	return nil
}

// cachedTrendingWords returns the current trending-words snapshot.
func cachedTrendingWords() string {
	trendingMu.RLock()
	defer trendingMu.RUnlock()
	return trendingexport
}

// cachedTrendingHashtags returns the current trending-hashtags snapshot.
func cachedTrendingHashtags() string {
	trendingMu.RLock()
	defer trendingMu.RUnlock()
	return trendingHashTags
}

// writeCachedJSON sends a pre-rendered JSON snapshot with CORS enabled. The
// payload is empty until the corresponding refresher has succeeded once.
func writeCachedJSON(w http.ResponseWriter, payload string) {
	enableCors(&w)
	w.Header().Set("Content-Type", "application/json")
	// A write error here means the client went away; nothing to recover.
	fmt.Fprint(w, payload)
}

// gettrending serves the trending-words snapshot.
// NOTE(review): this returns the same payload as gettrendingwords — confirm
// that is the intended behaviour for /api/v1/trending.
func gettrending(w http.ResponseWriter, r *http.Request) {
	writeCachedJSON(w, cachedTrendingWords())
}

// gettrendinghashtags serves the trending-hashtags snapshot.
func gettrendinghashtags(w http.ResponseWriter, r *http.Request) {
	writeCachedJSON(w, cachedTrendingHashtags())
}

// gettrendingwords serves the trending-words snapshot.
func gettrendingwords(w http.ResponseWriter, r *http.Request) {
	writeCachedJSON(w, cachedTrendingWords())
}

// buildSearchQuery assembles the activity search SQL and its positional
// arguments. An empty searchKey disables the full-text filter and an
// offsetKey of -1 disables the "older than this id" filter. Results are
// always the 10 highest ids that match.
func buildSearchQuery(searchKey string, offsetKey int) (string, []interface{}) {
	query := searchSelect
	args := make([]interface{}, 0, 2)
	sep := " WHERE "
	if searchKey != "" {
		args = append(args, searchKey)
		query += sep + "activities.normalized_tsvector @@ plainto_tsquery($" + strconv.Itoa(len(args)) + ")"
		sep = " AND "
	}
	if offsetKey != -1 {
		args = append(args, offsetKey)
		query += sep + "activities.id < $" + strconv.Itoa(len(args))
	}
	return query + " ORDER BY activities.id DESC LIMIT 10", args
}

// collectActivities drains rows of (id, activity document, actor document)
// and returns the decoded activities, each with its "actor" field replaced by
// the joined actor document, plus the smallest id seen (0 when there were no
// rows). A row whose activity document cannot be decoded is logged and
// skipped, but still counts towards the smallest id so paging moves past it.
func collectActivities(rows pgx.Rows) ([]map[string]json.RawMessage, int, error) {
	// Non-nil so that an empty result encodes as [] rather than null.
	activities := make([]map[string]json.RawMessage, 0, 10)
	earliest := 0
	for rows.Next() {
		var (
			id          int
			activityRaw string
			actorRaw    string
		)
		if err := rows.Scan(&id, &activityRaw, &actorRaw); err != nil {
			return nil, 0, fmt.Errorf("scanning activity row: %w", err)
		}
		if earliest == 0 || id < earliest {
			earliest = id
		}

		var activity map[string]json.RawMessage
		if err := json.Unmarshal([]byte(activityRaw), &activity); err != nil {
			log.Printf("skipping activity %d: decoding document: %v", id, err)
			continue
		}
		if activity == nil {
			// A JSON null document decodes to a nil map; writing to it
			// would panic.
			activity = make(map[string]json.RawMessage, 1)
		}
		activity["actor"] = json.RawMessage(actorRaw)
		activities = append(activities, activity)
	}
	if err := rows.Err(); err != nil {
		return nil, 0, fmt.Errorf("reading activity rows: %w", err)
	}
	return activities, earliest, nil
}

// getsearch handles /api/v1/search.
//
// Query parameters:
//   - s: optional full-text search string; empty or absent returns the most
//     recent activities.
//   - o: optional integer; only activities with an id below it are returned.
//     Absent, empty or -1 means no offset. A non-numeric value yields 400.
//
// The response is a JSON object with "activities" (up to 10 items, newest
// first) and "requestdata.earliestid" (the smallest id in the page, 0 if the
// page is empty). Database or encoding failures yield a 500.
func getsearch(w http.ResponseWriter, r *http.Request) {
	enableCors(&w)

	params := r.URL.Query()
	searchKey := params.Get("s")
	offsetKey := -1
	if raw := params.Get("o"); raw != "" {
		n, err := strconv.Atoi(raw)
		if err != nil {
			http.Error(w, "invalid offset parameter \"o\"", http.StatusBadRequest)
			return
		}
		offsetKey = n
	}

	query, args := buildSearchQuery(searchKey, offsetKey)
	// Use the request context so the query is cancelled if the client
	// disconnects.
	rows, err := pool.Query(r.Context(), query, args...)
	if err != nil {
		log.Printf("search query failed: %v", err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	activities, earliest, err := collectActivities(rows)
	if err != nil {
		log.Printf("search failed: %v", err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}

	data, err := json.Marshal(map[string]interface{}{
		"requestdata": map[string]int{"earliestid": earliest},
		"activities":  activities,
	})
	if err != nil {
		log.Printf("error marshaling combined activity: %v", err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(data); err != nil {
		log.Printf("writing search response: %v", err)
	}
}

// main initialises the database pool, starts the two background refreshers
// and serves the API on 0.0.0.0:6431. pool and getDbPool are not declared in
// this file; presumably they live elsewhere in package main.
func main() {
	pool = getDbPool()

	go gettrends()
	go getTrendingHashtags()

	http.HandleFunc("/api/v1/search", getsearch)
	http.HandleFunc("/api/v1/trending", gettrending)
	http.HandleFunc("/api/v1/trending/hashtags", gettrendinghashtags)
	http.HandleFunc("/api/v1/trending/words", gettrendingwords)

	log.Print("Starting HTTP inbox on port http://0.0.0.0:6431")
	log.Fatal(http.ListenAndServe("0.0.0.0:6431", nil))
}