Merge branch 'main' into mchok-cli-nested-phases-in-validate-and-processors
sfc-gh-mchok authored Nov 13, 2024
2 parents a25569c + afd611e commit 01213ad
Showing 10 changed files with 375 additions and 35 deletions.
@@ -1,6 +1,7 @@
from __future__ import annotations

import json
import re
from pathlib import Path
from textwrap import dedent
from typing import List, Literal, Optional, Union
@@ -67,6 +68,7 @@
from snowflake.cli.api.project.schemas.v1.native_app.package import DistributionOptions
from snowflake.cli.api.project.schemas.v1.native_app.path_mapping import PathMapping
from snowflake.cli.api.project.util import (
SCHEMA_AND_NAME,
append_test_resource_suffix,
extract_schema,
identifier_to_show_like_pattern,
@@ -141,6 +143,15 @@ def transform_artifacts(

return transformed_artifacts

@field_validator("stage")
@classmethod
def validate_source_stage(cls, input_value: str):
if not re.match(SCHEMA_AND_NAME, input_value):
raise ValueError(
"Incorrect value for stage of native_app. Expected format for this field is {schema_name}.{stage_name} "
)
return input_value


class ApplicationPackageEntity(EntityBase[ApplicationPackageEntityModel]):
"""
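The new field_validator above rejects stage values that are not schema-qualified. As a rough sketch of the intent (the regex below is an assumed stand-in; the real SCHEMA_AND_NAME pattern is defined in snowflake.cli.api.project.util):

# Sketch only: SCHEMA_AND_NAME here is an assumed stand-in used to illustrate
# validate_source_stage; the real pattern lives in snowflake.cli.api.project.util.
import re

SCHEMA_AND_NAME = r"^[\w$]+\.[\w$]+$"  # assumed shape: {schema_name}.{stage_name}


def is_valid_source_stage(value: str) -> bool:
    return re.match(SCHEMA_AND_NAME, value) is not None


print(is_valid_source_stage("my_schema.my_stage"))  # True: passes the validator
print(is_valid_source_stage("just_stage"))          # False: validator raises ValueError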
29 changes: 29 additions & 0 deletions src/snowflake/cli/_plugins/spcs/common.py
@@ -95,5 +95,34 @@ def handle_object_already_exists(
raise error


def filter_log_timestamp(log: str, include_timestamps: bool) -> str:
if include_timestamps:
return log
else:
return log.split(" ", 1)[1] if " " in log else log


def new_logs_only(prev_log_records: list[str], new_log_records: list[str]) -> list[str]:
    # Sort the log records; because each line starts with an ISO 8601
    # timestamp, lexicographic order matches chronological order,
    # e.g.: 2024-10-22T01:12:29.873896187Z Count: 1
new_log_records_sorted = sorted(new_log_records)

# Get the first new log record to establish the overlap point
first_new_log_record = new_log_records_sorted[0]

# Traverse previous logs in reverse and remove duplicates from new logs
for prev_log in reversed(prev_log_records):
# Stop if the previous log is earlier than the first new log
if prev_log < first_new_log_record:
break

# Remove matching previous logs from the new logs list
if prev_log in new_log_records_sorted:
new_log_records_sorted.remove(prev_log)

return new_log_records_sorted


class NoPropertiesProvidedError(ClickException):
pass
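For orientation, a small usage sketch of the two new helpers; the timestamps are invented for the example, and the helpers are imported from the module shown above:

# Illustration only; the timestamps are made up.
from snowflake.cli._plugins.spcs.common import filter_log_timestamp, new_logs_only

prev = [
    "2024-10-22T01:12:29.873896187Z Count: 1",
    "2024-10-22T01:12:30.000000000Z Count: 2",
]
new = [
    "2024-10-22T01:12:30.000000000Z Count: 2",  # overlaps with the previous batch
    "2024-10-22T01:12:31.000000000Z Count: 3",
]

print(new_logs_only(prev, new))
# ['2024-10-22T01:12:31.000000000Z Count: 3']

print(filter_log_timestamp("2024-10-22T01:12:31.000000000Z Count: 3", include_timestamps=False))
# Count: 3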
65 changes: 53 additions & 12 deletions src/snowflake/cli/_plugins/spcs/services/commands.py
@@ -14,9 +14,9 @@

from __future__ import annotations

-import sys
import itertools
from pathlib import Path
-from typing import List, Optional
from typing import Generator, Iterable, List, Optional, cast

import typer
from click import ClickException
@@ -26,7 +26,6 @@
)
from snowflake.cli._plugins.object.common import CommentOption, Tag, TagOption
from snowflake.cli._plugins.spcs.common import (
-    print_log_lines,
validate_and_set_instances,
)
from snowflake.cli._plugins.spcs.services.manager import ServiceManager
@@ -38,12 +37,15 @@
)
from snowflake.cli.api.commands.snow_typer import SnowTyperFactory
from snowflake.cli.api.constants import ObjectType
from snowflake.cli.api.exceptions import IncompatibleParametersError
from snowflake.cli.api.identifiers import FQN
from snowflake.cli.api.output.types import (
CommandResult,
MessageResult,
QueryJsonValueResult,
QueryResult,
SingleQueryResult,
StreamResult,
)
from snowflake.cli.api.project.util import is_valid_object_name

@@ -221,20 +223,59 @@ def logs(
num_lines: int = typer.Option(
500, "--num-lines", help="Number of lines to retrieve."
),
since_timestamp: Optional[str] = typer.Option(
"", "--since", help="Timestamp to retrieve logs from"
),
include_timestamps: bool = typer.Option(
False, "--include-timestamps", help="Include timestamps in logs", is_flag=True
),
follow: bool = typer.Option(
False, "--follow", "-f", help="Continue polling for logs.", is_flag=True
),
follow_interval: int = typer.Option(
2,
"--follow-interval",
help="Polling interval in seconds when using the --follow flag",
),
**options,
):
"""
Retrieves local logs from a service container.
"""
-    results = ServiceManager().logs(
-        service_name=name.identifier,
-        instance_id=instance_id,
-        container_name=container_name,
-        num_lines=num_lines,
-    )
-    cursor = results.fetchone()
-    logs = next(iter(cursor)).split("\n")
-    print_log_lines(sys.stdout, name, "0", logs)
if follow:
if num_lines != 500:
raise IncompatibleParametersError(["--follow", "--num-lines"])

manager = ServiceManager()

if follow:
stream: Iterable[CommandResult] = (
MessageResult(log_batch)
for log_batch in manager.stream_logs(
service_name=name.identifier,
container_name=container_name,
instance_id=instance_id,
num_lines=num_lines,
since_timestamp=since_timestamp,
include_timestamps=include_timestamps,
interval_seconds=follow_interval,
)
)
stream = itertools.chain(stream, [MessageResult("")])
else:
stream = (
MessageResult(log)
for log in manager.logs(
service_name=name.identifier,
container_name=container_name,
instance_id=instance_id,
num_lines=num_lines,
since_timestamp=since_timestamp,
include_timestamps=include_timestamps,
)
)

return StreamResult(cast(Generator[CommandResult, None, None], stream))


@app.command(requires_connection=True)
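Conceptually, the logs command now returns a stream instead of a single query result: each log line (or, with --follow, each polled batch) is wrapped in a MessageResult, and the whole generator is handed to StreamResult so output can be emitted as it is produced. A minimal sketch of that shape, with a toy generator standing in for ServiceManager (fake_logs is a hypothetical name, not part of the CLI):

# Sketch of the streaming shape used by the command; fake_logs() is a
# hypothetical stand-in for ServiceManager().logs() / stream_logs().
from typing import Generator, cast

from snowflake.cli.api.output.types import CommandResult, MessageResult, StreamResult


def fake_logs() -> Generator[str, None, None]:
    yield "2024-10-22T01:12:29Z line one"
    yield "2024-10-22T01:12:30Z line two"


stream = (MessageResult(line) for line in fake_logs())
result = StreamResult(cast(Generator[CommandResult, None, None], stream))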
65 changes: 62 additions & 3 deletions src/snowflake/cli/_plugins/spcs/services/manager.py
@@ -15,14 +15,17 @@
from __future__ import annotations

import json
import time
from pathlib import Path
from typing import List, Optional

import yaml
from snowflake.cli._plugins.object.common import Tag
from snowflake.cli._plugins.spcs.common import (
NoPropertiesProvidedError,
filter_log_timestamp,
handle_object_already_exists,
new_logs_only,
strip_empty_lines,
)
from snowflake.cli.api.constants import DEFAULT_SIZE_LIMIT_MB, ObjectType
@@ -133,12 +136,68 @@ def status(self, service_name: str) -> SnowflakeCursor:
return self.execute_query(f"CALL SYSTEM$GET_SERVICE_STATUS('{service_name}')")

def logs(
-        self, service_name: str, instance_id: str, container_name: str, num_lines: int
self,
service_name: str,
instance_id: str,
container_name: str,
num_lines: int,
since_timestamp: str = "",
include_timestamps: bool = False,
):
-        return self.execute_query(
-            f"call SYSTEM$GET_SERVICE_LOGS('{service_name}', '{instance_id}', '{container_name}', {num_lines});"
cursor = self.execute_query(
f"call SYSTEM$GET_SERVICE_LOGS('{service_name}', '{instance_id}', '{container_name}', "
f"{num_lines}, False, '{since_timestamp}', {include_timestamps});"
)

for log in cursor.fetchall():
yield log[0] if isinstance(log, tuple) else log

def stream_logs(
self,
service_name: str,
instance_id: str,
container_name: str,
num_lines: int,
since_timestamp: str,
include_timestamps: bool,
interval_seconds: int,
):
try:
prev_timestamp = since_timestamp
prev_log_records: List[str] = []

while True:
raw_log_blocks = [
log
for log in self.logs(
service_name=service_name,
instance_id=instance_id,
container_name=container_name,
num_lines=num_lines,
since_timestamp=prev_timestamp,
include_timestamps=True,
)
]

new_log_records = []
for block in raw_log_blocks:
new_log_records.extend(block.split("\n"))

new_log_records = [line for line in new_log_records if line.strip()]

if new_log_records:
dedup_log_records = new_logs_only(prev_log_records, new_log_records)
for log in dedup_log_records:
yield filter_log_timestamp(log, include_timestamps)

prev_timestamp = dedup_log_records[-1].split(" ", 1)[0]
prev_log_records = dedup_log_records

time.sleep(interval_seconds)

except KeyboardInterrupt:
return

def upgrade_spec(self, service_name: str, spec_path: Path):
spec = self._read_yaml(spec_path)
query = f"alter service {service_name} from specification $$ {spec} $$"
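For context, a hedged sketch of how stream_logs is intended to be consumed: it re-runs SYSTEM$GET_SERVICE_LOGS every interval_seconds, drops lines already seen in the previous batch via new_logs_only, and yields only the new lines until interrupted. The service and container names below are hypothetical, and a configured Snowflake connection is assumed:

# Hypothetical usage; "echo_service" and "main" are example names, and a
# working connection is assumed to be available to ServiceManager.
from snowflake.cli._plugins.spcs.services.manager import ServiceManager

manager = ServiceManager()
for line in manager.stream_logs(
    service_name="echo_service",
    instance_id="0",
    container_name="main",
    num_lines=500,
    since_timestamp="",
    include_timestamps=False,
    interval_seconds=2,
):
    print(line)  # keeps polling until stopped with Ctrl+C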
22 changes: 14 additions & 8 deletions tests/__snapshots__/test_help_messages.ambr
@@ -7293,14 +7293,20 @@
| [required] |
+------------------------------------------------------------------------------+
+- Options --------------------------------------------------------------------+
| * --container-name TEXT Name of the container. |
| [required] |
| * --instance-id TEXT ID of the service instance, starting |
| with 0. |
| [required] |
| --num-lines INTEGER Number of lines to retrieve. |
| [default: 500] |
| --help -h Show this message and exit. |
| * --container-name TEXT Name of the container. |
| [required] |
| * --instance-id TEXT ID of the service instance, |
| starting with 0. |
| [required] |
| --num-lines INTEGER Number of lines to retrieve. |
| [default: 500] |
| --since TEXT Timestamp to retrieve logs from |
| --include-timestamps Include timestamps in logs |
| --follow -f Continue polling for logs. |
| --follow-interval INTEGER Polling interval in seconds when |
| using the --follow flag |
| [default: 2] |
| --help -h Show this message and exit. |
+------------------------------------------------------------------------------+
+- Connection configuration ---------------------------------------------------+
| --connection,--environment -c TEXT Name of the connection, as |
21 changes: 20 additions & 1 deletion tests/project/test_project_definition_v2.py
@@ -84,14 +84,33 @@
"bundle_root": "",
"deploy_root": "",
"generated_root": "",
"stage": "stage",
"stage": "schema.stage",
"scratch_stage": "scratch_stage",
"distribution": "internal",
}
}
},
None,
],
[
{
"entities": {
"pkg": {
"type": "application package",
"identifier": "",
"artifacts": [],
"manifest": "",
"bundle_root": "",
"deploy_root": "",
"generated_root": "",
"stage": "just_stage",
"scratch_stage": "scratch_stage",
"distribution": "internal",
}
}
},
"Incorrect value for stage of native_app. Expected format for this field is {schema_name}.{stage_name}",
],
[
{
"entities": {
