Skip to content

Commit

Permalink
Add import command and use UTC timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
xoltia committed Oct 17, 2023
1 parent 137693f commit 18f43bc
Show file tree
Hide file tree
Showing 6 changed files with 410 additions and 17 deletions.
1 change: 1 addition & 0 deletions cmd/botsu/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func main() {
bot.AddCommand(commands.ChartCommandData, commands.NewChartCommand(activityRepo, userRepo, guildRepo))
bot.AddCommand(commands.GuildConfigCommandData, commands.NewGuildConfigCommand(guildRepo))
bot.AddCommand(commands.ExportCommandData, commands.NewExportCommand(activityRepo))
bot.AddCommand(commands.ImportCommandData, commands.NewImportCommand(activityRepo))

log.Println("Logging in")

Expand Down
1 change: 1 addition & 0 deletions internal/activities/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Activity struct {
Meta interface{} `json:"meta"`
CreatedAt time.Time `json:"created_at"`
DeletedAt *time.Time `json:"deleted_at"`
ImportedAt *time.Time `json:"imported_at"`
}

func NewActivity() *Activity {
Expand Down
173 changes: 160 additions & 13 deletions internal/activities/repository.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"github.com/jackc/pgx/v5"

orderedmap "github.com/UTD-JLA/botsu/pkg/ordered_map"
"github.com/jackc/pgx/v5/pgxpool"
)
Expand All @@ -19,6 +21,11 @@ type UserActivityPage struct {
Page int
}

type ImportInfo struct {
Timestamp time.Time
Count int
}

type ActivityRepository struct {
pool *pgxpool.Pool
}
Expand Down Expand Up @@ -54,8 +61,128 @@ func (r *ActivityRepository) Create(ctx context.Context, activity *Activity) err
return err
}

func (r *ActivityRepository) GetTotalByUserIDGroupByVideoChannel(ctx context.Context, userID string, start, end time.Time) (orderedmap.Map[time.Duration], error) {
query := `
func (r *ActivityRepository) ImportMany(ctx context.Context, as []*Activity) error {
conn, err := r.pool.Acquire(ctx)

if err != nil {
return err
}

defer conn.Release()

columnNames := []string{
"user_id",
"guild_id",
"name",
"primary_type",
"media_type",
"duration",
"date",
"meta",
"created_at",
"deleted_at",
"imported_at",
}

now := time.Now().UTC()
rows := make([][]interface{}, len(as))

for i, a := range as {
rows[i] = []interface{}{
a.UserID,
a.GuildID,
a.Name,
a.PrimaryType,
a.MediaType,
a.Duration,
a.Date,
a.Meta,
a.CreatedAt,
a.DeletedAt,
now,
}
}

_, err = conn.CopyFrom(ctx, pgx.Identifier{"activities"}, columnNames, pgx.CopyFromRows(rows))

return err
}

func (r *ActivityRepository) UndoImportByUserIDAndTimestamp(
ctx context.Context,
userID string,
timestamp time.Time,
) (int64, error) {
conn, err := r.pool.Acquire(ctx)

if err != nil {
return 0, err
}

defer conn.Release()

const sql = `
UPDATE activities
SET deleted_at = NOW() AT TIME ZONE 'UTC'
WHERE user_id = $1
AND imported_at = $2 AT TIME ZONE 'UTC'
AND deleted_at IS NULL
`

tag, err := conn.Exec(ctx, sql, userID, timestamp)
return tag.RowsAffected(), err
}

func (r *ActivityRepository) GetRecentImportsByUserID(
ctx context.Context,
userID string,
limit int,
) ([]ImportInfo, error) {
conn, err := r.pool.Acquire(ctx)

if err != nil {
return nil, err
}

defer conn.Release()

const query = `
SELECT imported_at, COUNT(*) as count
FROM activities
WHERE user_id = $1
AND imported_at IS NOT NULL
AND deleted_at IS NULL
GROUP BY imported_at
ORDER BY imported_at DESC
LIMIT $2
`

rows, err := conn.Query(ctx, query, userID, limit)

if err != nil {
return nil, err
}

var importHistory []ImportInfo

for rows.Next() {
importInfo := ImportInfo{}
if err = rows.Scan(&importInfo.Timestamp, &importInfo.Count); err != nil {
return nil, err
}

importHistory = append(importHistory, importInfo)
}

return importHistory, nil
}

func (r *ActivityRepository) GetTotalByUserIDGroupByVideoChannel(
ctx context.Context,
userID string,
start, end time.Time,
) (orderedmap.Map[time.Duration], error) {
const query = `
SELECT
COALESCE(SUM(duration), 0) AS total_duration,
meta->>'channel_handle' AS channel_handle
Expand Down Expand Up @@ -103,8 +230,12 @@ func (r *ActivityRepository) GetTotalByUserIDGroupByVideoChannel(ctx context.Con
return channels, nil
}

func (r *ActivityRepository) GetTotalByUserIDGroupedByMonth(ctx context.Context, userID, guildID string, start, end time.Time) (orderedmap.Map[time.Duration], error) {
query := `
func (r *ActivityRepository) GetTotalByUserIDGroupedByMonth(
ctx context.Context,
userID, guildID string,
start, end time.Time,
) (orderedmap.Map[time.Duration], error) {
const query = `
SELECT
to_char(date_series.month, 'YYYY-MM') AS month,
COALESCE(SUM(duration), 0) AS total_duration
Expand Down Expand Up @@ -163,9 +294,13 @@ func (r *ActivityRepository) GetTotalByUserIDGroupedByMonth(ctx context.Context,

// Returns map of day (YYYY-MM-DD) to total duration
// filling in missing days with 0 (string formatted according to user's timezone)
func (r *ActivityRepository) GetTotalByUserIDGroupedByDay(ctx context.Context, userID, guildID string, start, end time.Time) (orderedmap.Map[time.Duration], error) {
func (r *ActivityRepository) GetTotalByUserIDGroupedByDay(
ctx context.Context,
userID, guildID string,
start, end time.Time,
) (orderedmap.Map[time.Duration], error) {
// day should be truncated to a string `YYYY-MM-DD` in the user's timezone
query := `
const query = `
SELECT
to_char(date_series.day, 'YYYY-MM-DD') AS day,
COALESCE(SUM(duration), 0) AS total_duration
Expand Down Expand Up @@ -223,7 +358,7 @@ func (r *ActivityRepository) GetTotalByUserIDGroupedByDay(ctx context.Context, u
}

func (r *ActivityRepository) GetLatestByUserID(ctx context.Context, userID, guildID string) (*Activity, error) {
query := `
const query = `
SELECT activities.id,
user_id,
guild_id,
Expand All @@ -234,6 +369,7 @@ func (r *ActivityRepository) GetLatestByUserID(ctx context.Context, userID, guil
date at time zone COALESCE(u.timezone, g.timezone, 'UTC'),
created_at,
deleted_at,
imported_at,
meta
FROM activities
LEFT JOIN users u ON activities.user_id = u.id
Expand Down Expand Up @@ -265,6 +401,7 @@ func (r *ActivityRepository) GetLatestByUserID(ctx context.Context, userID, guil
&activity.Date,
&activity.CreatedAt,
&activity.DeletedAt,
&activity.ImportedAt,
&activity.Meta,
)

Expand All @@ -276,7 +413,7 @@ func (r *ActivityRepository) GetLatestByUserID(ctx context.Context, userID, guil
}

func (r *ActivityRepository) GetByID(ctx context.Context, id uint64, guildID string) (*Activity, error) {
query := `
const query = `
SELECT activities.id,
user_id,
guild_id,
Expand All @@ -287,6 +424,7 @@ func (r *ActivityRepository) GetByID(ctx context.Context, id uint64, guildID str
date at time zone COALESCE(u.timezone, g.timezone, 'UTC'),
created_at,
deleted_at,
imported_at,
meta
FROM activities
LEFT JOIN users u ON activities.user_id = u.id
Expand Down Expand Up @@ -316,6 +454,7 @@ func (r *ActivityRepository) GetByID(ctx context.Context, id uint64, guildID str
&activity.Date,
&activity.CreatedAt,
&activity.DeletedAt,
&activity.ImportedAt,
&activity.Meta,
)

Expand All @@ -327,7 +466,7 @@ func (r *ActivityRepository) GetByID(ctx context.Context, id uint64, guildID str
}

func (r *ActivityRepository) GetAllByUserID(ctx context.Context, userID, guildID string) ([]*Activity, error) {
query := `
const query = `
SELECT activities.id,
user_id,
guild_id,
Expand All @@ -338,6 +477,7 @@ func (r *ActivityRepository) GetAllByUserID(ctx context.Context, userID, guildID
date at time zone COALESCE(u.timezone, g.timezone, 'UTC'),
created_at,
deleted_at,
imported_at,
meta
FROM activities
LEFT JOIN users u ON activities.user_id = u.id
Expand Down Expand Up @@ -378,6 +518,7 @@ func (r *ActivityRepository) GetAllByUserID(ctx context.Context, userID, guildID
&activity.Date,
&activity.CreatedAt,
&activity.DeletedAt,
&activity.ImportedAt,
&activity.Meta,
); err != nil {
return nil, err
Expand All @@ -388,8 +529,12 @@ func (r *ActivityRepository) GetAllByUserID(ctx context.Context, userID, guildID
return activities, nil
}

func (r *ActivityRepository) PageByUserID(ctx context.Context, userID, guildID string, limit, offset int) (*UserActivityPage, error) {
query := `
func (r *ActivityRepository) PageByUserID(
ctx context.Context,
userID, guildID string,
limit, offset int,
) (*UserActivityPage, error) {
const query = `
SELECT activities.id,
user_id,
guild_id,
Expand All @@ -400,6 +545,7 @@ func (r *ActivityRepository) PageByUserID(ctx context.Context, userID, guildID s
date at time zone COALESCE(u.timezone, g.timezone, 'UTC'),
created_at,
deleted_at,
imported_at,
meta,
CEIL(COUNT(*) OVER() / $3::float) AS page_count,
CEIL($4::float / $3::float) + 1 AS page
Expand Down Expand Up @@ -445,6 +591,7 @@ func (r *ActivityRepository) PageByUserID(ctx context.Context, userID, guildID s
&activity.Date,
&activity.CreatedAt,
&activity.DeletedAt,
&activity.ImportedAt,
&activity.Meta,
&page.PageCount,
&page.Page,
Expand All @@ -467,7 +614,7 @@ func (r *ActivityRepository) DeleteById(ctx context.Context, id uint64) error {

_, err = conn.Exec(ctx, `
UPDATE activities
SET deleted_at = NOW()
SET deleted_at = NOW() AT TIME ZONE 'UTC'
WHERE id = $1
`, id)

Expand Down Expand Up @@ -527,7 +674,7 @@ func (r *ActivityRepository) GetAvgSpeedByMediaTypeAndUserID(ctx context.Context

defer conn.Release()

query := `
const query = `
SELECT COALESCE(AVG((meta->'speed')::numeric), 0)
FROM activities
WHERE user_id = $1
Expand Down
Loading

0 comments on commit 18f43bc

Please sign in to comment.