Skip to content

Commit

Permalink
Merge pull request #49 from worldia/fix/stream-tracing
Browse files Browse the repository at this point in the history
Fix/stream tracing
  • Loading branch information
cyve authored Jan 24, 2024
2 parents 1b6233a + 047ec83 commit 50c380a
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 21 deletions.
10 changes: 4 additions & 6 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
"monolog/monolog": "^2.0",
"nyholm/dsn": "^2.0",
"nyholm/psr7": "^1.5",
"open-telemetry/sdk": "^1.0.1",
"open-telemetry/api": "^1.0.1",
"open-telemetry/context": "^1.0@dev",
"open-telemetry/sem-conv": "^1.0@dev",
"open-telemetry/api": ">=1.0.2",
"open-telemetry/sdk": "^1.0",
"promphp/prometheus_client_php": "^2.4",
"psr/http-client": "^1.0",
"symfony/dependency-injection": "*"
Expand All @@ -23,8 +21,8 @@
"doctrine/dbal": "^3.0",
"friends-of-phpspec/phpspec-expect": "^4.0",
"open-telemetry/sdk-contrib": "^1.0@dev",
"open-telemetry/transport-grpc": "^1.0@dev",
"open-telemetry/gen-otlp-protobuf": "^1.0@dev",
"open-telemetry/transport-grpc": "^1.0",
"open-telemetry/gen-otlp-protobuf": "^1.0",
"php-http/httplug": "^2.3",
"phpspec/phpspec": "^7.2",
"phpstan/phpstan": "^1.4",
Expand Down
6 changes: 3 additions & 3 deletions src/DependencyInjection/config/tracing/request.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

return static function (ContainerConfigurator $container) {
$container->services()
->set(Tracing\Propagation\EventSubscriber\RequestEventSubscriber::class)
->set(Propagation\EventSubscriber\RequestEventSubscriber::class)
->args([
service(Propagation\ForcableIdGenerator::class),
service(Propagation\IncomingTraceHeaderResolverInterface::class)->nullOnInvalid(),
])
->autoconfigure()

->set(Tracing\Instrumentation\EventSubscriber\RequestEventSubscriber::class)
->set(Instrumentation\EventSubscriber\RequestEventSubscriber::class)
->args([
service(TracerProviderInterface::class),
service(Instrumentation\MainSpanContextInterface::class),
Expand All @@ -52,7 +52,7 @@
])
->autoconfigure()

->set(Tracing\Instrumentation\EventSubscriber\AddUserEventSubscriber::class)
->set(Instrumentation\EventSubscriber\AddUserEventSubscriber::class)
->args([
service(Instrumentation\MainSpanContextInterface::class),
service(TokenStorageInterface::class)->nullOnInvalid(),
Expand Down
30 changes: 18 additions & 12 deletions src/Http/TracedResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\HttpClient\Exception\ServerException;
use Symfony\Component\HttpClient\Response\StreamableInterface;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;

Expand Down Expand Up @@ -93,20 +94,16 @@ public function getInfo(string $type = null): mixed

public function toStream(bool $throw = true)
{
try {
if ($throw) {
// Ensure headers arrived
$this->response->getHeaders();
}

if ($this->response instanceof StreamableInterface) {
return $this->stream = $this->response->toStream(false);
}
if ($throw) {
// Ensure headers arrived
$this->response->getHeaders();
}

return $this->stream = StreamWrapper::createResource($this->response);
} finally {
$this->endTracing();
if ($this->response instanceof StreamableInterface) {
return $this->stream = $this->response->toStream(false);
}

return $this->stream = StreamWrapper::createResource($this->response);
}

/**
Expand All @@ -129,6 +126,14 @@ public static function stream(HttpClientInterface $client, iterable $responses,
}

foreach ($client->stream($wrappedResponses, $timeout) as $r => $chunk) {
try {
if ($chunk->isLast() || $chunk->isTimeout()) {
$traceableMap[$r]->endTracing();
}
} catch (TransportExceptionInterface) {
$traceableMap[$r]->endTracing();
}

yield $traceableMap[$r] => $chunk;
}
}
Expand All @@ -150,6 +155,7 @@ protected function endTracing(): void
if (\in_array('response.body', $info['user_data']['span_attributes'] ?? [])) {
if (empty($this->content) && \is_resource($this->stream)) {
$this->content = stream_get_contents($this->stream) ?: null;
rewind($this->stream);
}
$this->span->setAttribute('response.body', $this->content);
}
Expand Down
6 changes: 6 additions & 0 deletions src/Http/TracingHttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Instrumentation\Tracing\Tracing;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;
use OpenTelemetry\SDK\Common\Time\ClockInterface;
use OpenTelemetry\SemConv\TraceAttributes;
use Symfony\Component\HttpClient\DecoratorTrait;
use Symfony\Component\HttpClient\HttpClient;
Expand Down Expand Up @@ -120,6 +121,11 @@ public function request(string $method, string $url, array $options = []): Respo
$span->setAttribute(TraceAttributes::HTTP_STATUS_CODE, $info['http_code']);
$span->setAttribute(TraceAttributes::HTTP_URL, $info['url']);

if (\array_key_exists('total_time', $info)) {
$timestamp = (int) (($info['start_time'] + $info['total_time']) * ClockInterface::NANOS_PER_SECOND);
}
$span->addEvent('http.response.headers', [], $timestamp ?? null);

if ($info['http_code'] >= 400) {
$span->setStatus(StatusCode::STATUS_ERROR);
}
Expand Down
10 changes: 10 additions & 0 deletions src/Metrics/Meter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

namespace Instrumentation\Metrics;

use OpenTelemetry\API\Metrics\AsynchronousInstrument;
use OpenTelemetry\API\Metrics\CounterInterface;
use OpenTelemetry\API\Metrics\HistogramInterface;
use OpenTelemetry\API\Metrics\MeterInterface;
use OpenTelemetry\API\Metrics\ObservableCallbackInterface;
use OpenTelemetry\API\Metrics\ObservableCounterInterface;
use OpenTelemetry\API\Metrics\ObservableGaugeInterface;
use OpenTelemetry\API\Metrics\ObservableUpDownCounterInterface;
Expand Down Expand Up @@ -131,4 +133,12 @@ public function createObservableUpDownCounter(string $name, string $unit = null,
{
throw new \LogicException(sprintf('Method %s is not implemented', __METHOD__));
}

public function batchObserve(
callable $callback,
AsynchronousInstrument $instrument,
AsynchronousInstrument ...$instruments
): ObservableCallbackInterface {
throw new \LogicException(sprintf('Method %s is not implemented', __METHOD__));
}
}

0 comments on commit 50c380a

Please sign in to comment.