2021-02-13 02:49:36 +00:00
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/jackc/pgx/v4"
)
2021-12-19 21:36:10 -05:00
// metricsText holds the JSON document that getTrending writes to clients.
// main seeds it with "[]" and runMetrics replaces it after every refresh.
// NOTE(review): it is written from a background goroutine while HTTP handlers
// read it, so every access needs synchronization.
var metricsText string

// enableCors lets browser pages on any origin read the response by setting
// Access-Control-Allow-Origin to "*".
func enableCors(w *http.ResponseWriter) {
	rw := *w
	rw.Header().Set("Access-Control-Allow-Origin", "*")
}
2021-12-18 04:34:04 +00:00
func runMetrics ( ) {
2021-12-19 21:36:10 -05:00
hashtagtotal := runTrendingHashtags ( )
wordstotal := runTrendingWords ( )
2021-12-18 04:34:04 +00:00
totalJson := make ( map [ string ] interface { } )
totalJson [ "hashtags" ] = hashtagtotal
2021-12-19 21:36:10 -05:00
totalJson [ "words" ] = wordstotal
totalJson [ "datetime" ] = time . Now ( ) . UTC ( )
2021-12-17 00:45:52 -05:00
2021-12-18 04:34:04 +00:00
data , err := json . Marshal ( totalJson )
if err != nil {
log . Fatalf ( "error marshaling combined activity: %v\n" , err )
2021-12-17 00:45:52 -05:00
}
2021-12-19 21:36:10 -05:00
metricsText = string ( data )
2021-12-18 04:34:04 +00:00
}
2021-06-30 22:28:53 +00:00
2021-12-19 21:36:10 -05:00
func runTrendingHashtags ( ) map [ string ] interface { } {
2021-12-18 04:34:04 +00:00
sql := ` SELECT UNNEST ( activities . hashtags ) as hashtags , count ( actors . id )
from activities
LEFT JOIN actors ON activities . document - >> ' attributedTo ' = actors . document - >> ' id '
WHERE actors . bot = false
AND activities . identifiedat > LOCALTIMESTAMP - INTERVAL ' 30 MINUTES '
GROUP BY hashtags ORDER BY count DESC LIMIT 20 ; `
2021-06-30 22:28:53 +00:00
2021-12-18 04:34:04 +00:00
rows , err := pool . Query ( context . Background ( ) , sql )
if err != nil {
panic ( err )
}
2021-06-30 22:28:53 +00:00
2021-12-18 04:34:04 +00:00
hashtagitems := make ( [ ] interface { } , 0 ) ;
hashcount := 0
for rows . Next ( ) {
var hashtag string
var count int
2021-06-30 22:28:53 +00:00
2021-12-18 04:34:04 +00:00
err = rows . Scan ( & hashtag , & count )
2021-06-30 22:28:53 +00:00
if err != nil {
2021-12-18 04:34:04 +00:00
panic ( err )
2021-06-30 22:28:53 +00:00
}
2021-12-18 04:34:04 +00:00
hashtagitem := make ( map [ string ] interface { } )
hashtagitem [ "hashtag" ] = hashtag
hashtagitem [ "count" ] = count
hashtagitems = append ( hashtagitems , hashtagitem )
hashcount = hashcount + 1
2021-06-30 22:28:53 +00:00
}
2021-12-18 04:34:04 +00:00
rows . Close ( )
2021-12-17 00:45:52 -05:00
2021-12-18 04:34:04 +00:00
hashtagtotal := make ( map [ string ] interface { } ) ;
hashtagtotal [ "count" ] = hashcount
hashtagtotal [ "items" ] = hashtagitems
2021-12-17 00:45:52 -05:00
2021-12-18 04:34:04 +00:00
return hashtagtotal
2021-12-17 00:45:52 -05:00
}
2021-12-19 21:36:10 -05:00
func runTrendingWords ( ) map [ string ] interface { } {
sql := ` WITH popular_words AS (
select word FROM ts_stat (
'
SELECT to_tsvector ( ' ' simple ' ' , normalized ) FROM activities
LEFT JOIN actors ON activities . document - >> ' ' attributedTo ' ' = actors . document - >> ' ' id ' '
WHERE activities . identifiedat > current_timestamp - interval ' ' 60 minutes ' '
AND actors . bot = false
'
)
WHERE length ( word ) > 3
AND NOT word in ( SELECT word FROM stopwords )
ORDER BY ndoc DESC LIMIT 100 )
SELECT concat_ws ( ' ' , a1 . word , a2 . word ) phrase , count ( * )
FROM popular_words AS a1
CROSS JOIN popular_words AS a2
CROSS JOIN activities
WHERE normalized ilike format ( ' % % % s % s % % ' , a1 . word , a2 . word )
AND identifiedat > current_timestamp - interval ' 60 minutes '
GROUP BY 1
HAVING count ( * ) > 1
2022-01-02 10:52:31 -05:00
ORDER BY 2 DESC LIMIT 20 ; `
2021-12-19 21:36:10 -05:00
rows , err := pool . Query ( context . Background ( ) , sql )
if err != nil {
panic ( err )
}
trendingitems := make ( [ ] interface { } , 0 ) ;
trendingcount := 0
for rows . Next ( ) {
var trendingword string
var count int
err = rows . Scan ( & trendingword , & count )
if err != nil {
panic ( err )
}
trendingitem := make ( map [ string ] interface { } )
trendingitem [ "trending" ] = trendingword
trendingitem [ "count" ] = count
trendingitems = append ( trendingitems , trendingitem )
trendingcount = trendingcount + 1
}
rows . Close ( )
trendingwordtotal := make ( map [ string ] interface { } ) ;
trendingwordtotal [ "count" ] = trendingcount
trendingwordtotal [ "items" ] = trendingitems
return trendingwordtotal
}
2021-12-18 04:34:04 +00:00
// GET handlers
func getTrending ( w http . ResponseWriter , r * http . Request ) {
2021-12-17 00:45:52 -05:00
enableCors ( & w )
2021-12-19 21:36:10 -05:00
fmt . Fprintf ( w , "%s" , metricsText )
2021-12-17 00:45:52 -05:00
}
2021-12-18 04:34:04 +00:00
func getSearch ( w http . ResponseWriter , r * http . Request ) {
2021-03-17 23:14:24 +00:00
enableCors ( & w )
searchkeys , exists_search := r . URL . Query ( ) [ "s" ]
offsetkeys , exists_offset := r . URL . Query ( ) [ "o" ]
2021-02-13 02:49:36 +00:00
var err error
var rows pgx . Rows
var searchKey string
2021-03-17 23:14:24 +00:00
if exists_search {
2021-02-13 02:49:36 +00:00
searchKey = searchkeys [ 0 ]
}
2021-03-17 23:14:24 +00:00
var offsetKey int
if exists_offset {
offsetKey , _ = strconv . Atoi ( offsetkeys [ 0 ] )
} else {
offsetKey = - 1
}
2021-02-13 02:49:36 +00:00
2021-03-17 23:14:24 +00:00
if exists_search && searchKey != "" {
if offsetKey == - 1 {
2021-06-30 15:00:08 +00:00
rows , err = pool . Query ( context . Background ( ) , "SELECT activities.id, activities.document, actors.document FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' WHERE activities.normalized_tsvector @@ plainto_tsquery($1) ORDER BY activities.id DESC LIMIT 10" , searchKey )
2021-03-17 23:14:24 +00:00
} else {
2021-06-30 15:00:08 +00:00
rows , err = pool . Query ( context . Background ( ) , "SELECT activities.id, activities.document, actors.document FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' WHERE activities.normalized_tsvector @@ plainto_tsquery($1) AND activities.id < $2 ORDER BY activities.id DESC LIMIT 10" , searchKey , offsetKey )
2021-03-17 23:14:24 +00:00
}
2021-02-13 02:49:36 +00:00
} else {
2021-03-17 23:14:24 +00:00
if offsetKey == - 1 {
2021-06-30 15:00:08 +00:00
rows , err = pool . Query ( context . Background ( ) , "SELECT activities.id, activities.document, actors.document FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' ORDER BY activities.id DESC LIMIT 10" )
2021-03-17 23:14:24 +00:00
} else {
2021-06-30 15:00:08 +00:00
rows , err = pool . Query ( context . Background ( ) , "SELECT activities.id, activities.document, actors.document FROM activities as activities INNER JOIN actors as actors ON activities.document->>'actor' = actors.document->>'id' AND activities.id < $1 ORDER BY activities.id DESC LIMIT 10" , offsetKey )
2021-03-17 23:14:24 +00:00
}
2021-02-13 02:49:36 +00:00
}
if err != nil {
panic ( err )
}
defer rows . Close ( )
2021-06-30 15:00:08 +00:00
var earliestid int
earliestid = 0
2021-02-13 02:49:36 +00:00
var activitiesJson [ ] map [ string ] json . RawMessage
for rows . Next ( ) {
var id int
var activityRaw string
var actorRaw string
var activityJson map [ string ] json . RawMessage
err = rows . Scan ( & id , & activityRaw , & actorRaw )
if err != nil {
panic ( err )
}
err := json . Unmarshal ( [ ] byte ( activityRaw ) , & activityJson )
if err != nil {
fmt . Println ( err )
}
2021-06-30 15:00:08 +00:00
if earliestid == 0 {
earliestid = id
} else if earliestid > id {
earliestid = id
2021-03-17 23:14:24 +00:00
}
2021-02-13 02:49:36 +00:00
activityJson [ "actor" ] = json . RawMessage ( actorRaw )
activitiesJson = append ( activitiesJson , activityJson )
}
2021-03-17 23:14:24 +00:00
requestData := make ( map [ string ] int )
2021-06-30 15:00:08 +00:00
requestData [ "earliestid" ] = earliestid
2021-03-17 23:14:24 +00:00
2021-06-30 22:28:53 +00:00
totalJson := make ( map [ string ] interface { } )
2021-03-17 23:14:24 +00:00
totalJson [ "requestdata" ] = requestData
totalJson [ "activities" ] = activitiesJson
data , err := json . Marshal ( totalJson )
2021-02-13 02:49:36 +00:00
if err != nil {
2021-12-09 21:12:00 +00:00
log . Fatalf ( "error marshaling combined activity: %v\n" , err )
2021-02-13 02:49:36 +00:00
}
fmt . Fprintf ( w , "%s" , data )
}
func main ( ) {
pool = getDbPool ( )
2021-12-19 21:36:10 -05:00
metricsText = "[]"
2021-12-18 04:34:04 +00:00
go func ( ) {
for {
runMetrics ( )
2021-12-19 21:36:10 -05:00
time . Sleep ( 10 * time . Minute )
2021-12-18 04:34:04 +00:00
}
} ( )
2021-06-30 22:28:53 +00:00
2021-12-18 04:34:04 +00:00
http . HandleFunc ( "/api/v1/search" , getSearch )
http . HandleFunc ( "/api/v1/trending" , getTrending )
2021-06-30 22:28:53 +00:00
log . Print ( "Starting HTTP inbox on port http://0.0.0.0:6431" )
2021-06-30 15:00:08 +00:00
log . Fatal ( http . ListenAndServe ( "0.0.0.0:6431" , nil ) )
2021-02-13 02:49:36 +00:00
}