Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
timacdonald committed Nov 3, 2023
1 parent 2bc6e41 commit 31bd17c
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 21 deletions.
116 changes: 111 additions & 5 deletions src/Commands/WorkCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

use Carbon\CarbonImmutable;
use Illuminate\Cache\CacheManager;
use Illuminate\Config\Repository;
use Illuminate\Console\Command;
use Illuminate\Redis\RedisManager;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Sleep;
use Laravel\Pulse\Contracts\Ingest;
use Laravel\Pulse\Contracts\Storage;
use Laravel\Pulse\Pulse;
use Laravel\Pulse\Redis;
use Symfony\Component\Console\Attribute\AsCommand;
use Laravel\Pulse\Redis as RedisAdapter;

#[AsCommand(name: 'pulse:work')]
class WorkCommand extends Command
Expand All @@ -36,29 +41,130 @@ public function handle(
Ingest $ingest,
Storage $storage,
CacheManager $cache,
RedisManager $redisManager,
Repository $config,
): int {
$lastRestart = $cache->get('laravel:pulse:restart');

$lastTrimmedStorageAt = (new CarbonImmutable)->startOfMinute();

// todo: handle not having a redis connection configured / available.
// todo: refactor this out
$redis = new RedisAdapter($redisManager->connection(
$config->get('pulse.ingest.redis.connection')
), $config);

$lastUsageUpdatedAt = with($redis->get('laravel:pulse:usage_updated_at'), fn (?string $value) => $value === null
? null
: CarbonImmutable::createFromTimestamp($lastUsageUpdatedAt));

while (true) {
$now = new CarbonImmutable;

if ($lastRestart !== $cache->get('laravel:pulse:restart')) {
return self::SUCCESS;
}


$ingest->store($storage);


// todo: only do every 10 seconds
// todo: handle the correct DB connection
// todo: extract
$redis->pipeline(function (RedisAdapter $redis) use ($now, $lastUsageUpdatedAt) {
if ($lastUsageUpdatedAt === null) {
$this->info('Clearing existing usage keys.');

$redis->del('laravel:pulse:usage:user:7_days');
$redis->del('laravel:pulse:usage:usage:24_hours');
$redis->del('laravel:pulse:usage:usage:6_hours');
$redis->del('laravel:pulse:usage:usage:1_hour');
}

$this->info('Incrementing 7 day data.');

DB::table('pulse_requests')
->selectRaw('COUNT(*) as `count`, `user_id`')
->where(...dump(['date', '>=', ($lastUsageUpdatedAt ?: $now->subWeek())->toDateTimeString()]))
->where(...dump(['date', '<', $now->toDateTimeString()]))
->whereNotNull('user_id')
->orderBy('user_id')
->groupBy('user_id')
->each(function ($record) use ($redis) {
$redis->zincrby('laravel:pulse:usage:usage:7_days', $record->count, $record->user_id);
});

$this->info('Incrementing 24 hour data.');

DB::table('pulse_requests')
->selectRaw('COUNT(*) as `count`, `user_id`')
->where('date', '>=', ($lastUsageUpdatedAt ?: $now->subDay())->toDateTimeString())
->where('date', '<', $now->toDateTimeString())
->whereNotNull('user_id')
->orderBy('user_id')
->groupBy('user_id')
->each(function ($record) use ($redis) {
$redis->zincrby('laravel:pulse:usage:usage:24_hours', $record->count, $record->user_id);
});

$this->info('Incrementing 6 hour data.');

DB::table('pulse_requests')
->selectRaw('COUNT(*) as `count`, `user_id`')
->where('date', '>=', ($lastUsageUpdatedAt ?: $now->subHours(6))->toDateTimeString())
->where('date', '<', $now->toDateTimeString())
->whereNotNull('user_id')
->orderBy('user_id')
->groupBy('user_id')
->each(function ($record) use ($redis) {
$redis->zincrby('laravel:pulse:usage:usage:6_hours', $record->count, $record->user_id);
});

$this->info('Incrementing 1 hour data.');

DB::table('pulse_requests')
->selectRaw('COUNT(*) as `count`, `user_id`')
->where('date', '>=', ($lastUsageUpdatedAt ?: $now->subHour())->toDateTimeString())
->where('date', '<', $now->toDateTimeString())
->whereNotNull('user_id')
->orderBy('user_id')
->groupBy('user_id')
->each(function ($record) use ($redis) {
$redis->zincrby('laravel:pulse:usage:usage:1_hour', $record->count, $record->user_id);
});

if ($lastUsageUpdatedAt !== null) {
$this->info('Decrementing 7 day data.');

DB::table('pulse_requests')
->selectRaw('COUNT(*) as `count`, `user_id`')
->where(...dump(['date', '>=', $lastUsageUpdatedAt->subWeek()->toDateTimeString()]))
->where(...dump(['date', '<', $now->subWeek()->toDateTimeString()]))
->whereNotNull('user_id')
->orderBy('user_id')
->groupBy('user_id')
->each(function ($record) use ($redis) {
$redis->zincrby('laravel:pulse:usage:usage:7_days', $record->count, $record->user_id);
});

dd('done');
}

// todo decrement
});

$lastUsageUpdatedAt = $now;

$this->info('Done');

if ($now->subHour()->greaterThan($lastTrimmedStorageAt)) {
$storage->trim($pulse->tables());

$lastTrimmedStorageAt = $now;
}

$processed = $ingest->store($storage);

if ($processed === 0) {
Sleep::for(1)->second();
}
Sleep::for(10)->second();
}
}
}
34 changes: 19 additions & 15 deletions src/Ingests/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,30 @@ public function trim(): void
*/
public function store(Storage $storage): int
{
$entries = collect($this->connection()->xrange(
$this->stream,
'-',
'+',
$this->config->get('pulse.ingest.redis.chunk')
));
$count = 0;

if ($entries->isEmpty()) {
return 0;
}
while (true) {
$entries = collect($this->connection()->xrange(
$this->stream,
'-',
'+',
$this->config->get('pulse.ingest.redis.chunk')
));

$keys = $entries->keys();
if ($entries->isEmpty()) {
return $count;
}

$storage->store(
$entries->map(fn (array $payload): Entry|Update => unserialize($payload['data']))->values()
);
$keys = $entries->keys();

$this->connection()->xdel($this->stream, $keys);
$storage->store(
$entries->map(fn (array $payload): Entry|Update => unserialize($payload['data']))->values()
);

return $entries->count();
$this->connection()->xdel($this->stream, $keys);

$count = $count + $entries->count();
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/Ingests/Storage.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public function trim(): void
*/
public function store(StorageContract $store): int
{
throw new RuntimeException('The storage ingest driver does not need to process entries.');
return 0;
}
}
5 changes: 5 additions & 0 deletions src/Queries/Usage.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,9 @@ public function __invoke(Interval $interval, string $type): Collection
->filter()
->values();
}

public function warm()
{
//
}
}
24 changes: 24 additions & 0 deletions src/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ public function __construct(
//
}

/**
* Get the key.
*/
public function get(string $key): null|string|Pipeline|PhpRedis
{
return $this->client()->get($key);
}

/**
* Delete the key.
*/
public function del(string $key): int|Pipeline|PhpRedis
{
return $this->client()->del($key);
}

/**
* Increment the given members value.
*/
public function zincrby(string $key, int $increment, string $member): int|float|Pipeline|PhpRedis
{
return $this->client()->zincrby($key, $increment, $member);
}

/**
* Add an entry to the stream.
*
Expand Down

0 comments on commit 31bd17c

Please sign in to comment.