Skip to content

Commit

Permalink
Merge branch 'main' into surrogate-escape-strings
Browse files Browse the repository at this point in the history
  • Loading branch information
yunzheng authored Oct 3, 2024
2 parents d019974 + 8d6fe37 commit 426eadf
Showing 1 changed file with 46 additions and 35 deletions.
81 changes: 46 additions & 35 deletions flow/record/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,12 +523,14 @@ def get_required_fields() -> Mapping[str, RecordField]:
"""
Get required fields mapping. eg:
{
"_source": RecordField("_source", "string"),
"_classification": RecordField("_classification", "datetime"),
"_generated": RecordField("_generated", "datetime"),
"_version": RecordField("_version", "vaeint"),
}
.. code-block:: text
{
"_source": RecordField("_source", "string"),
"_classification": RecordField("_classification", "datetime"),
"_generated": RecordField("_generated", "datetime"),
"_version": RecordField("_version", "vaeint"),
}
Returns:
Mapping of required fields
Expand All @@ -540,10 +542,12 @@ def fields(self) -> Mapping[str, RecordField]:
"""
Get fields mapping (without required fields). eg:
{
"foo": RecordField("foo", "string"),
"bar": RecordField("bar", "varint"),
}
.. code-block:: text
{
"foo": RecordField("foo", "string"),
"bar": RecordField("bar", "varint"),
}
Returns:
Mapping of Record fields
Expand All @@ -556,15 +560,17 @@ def get_all_fields(self) -> Mapping[str, RecordField]:
"""
Get all fields including required meta fields. eg:
{
"ts": RecordField("ts", "datetime"),
"foo": RecordField("foo", "string"),
"bar": RecordField("bar", "varint"),
"_source": RecordField("_source", "string"),
"_classification": RecordField("_classification", "datetime"),
"_generated": RecordField("_generated", "datetime"),
"_version": RecordField("_version", "varint"),
}
.. code-block:: text
{
"ts": RecordField("ts", "datetime"),
"foo": RecordField("foo", "string"),
"bar": RecordField("bar", "varint"),
"_source": RecordField("_source", "string"),
"_classification": RecordField("_classification", "datetime"),
"_generated": RecordField("_generated", "datetime"),
"_version": RecordField("_version", "varint"),
}
Returns:
Mapping of all Record fields
Expand All @@ -591,33 +597,33 @@ def getfields(self, typename: str) -> RecordFieldSet:
return RecordFieldSet(field for field in self.fields.values() if field.typename == name)

def __call__(self, *args, **kwargs) -> Record:
"""Create a new Record initialized with `args` and `kwargs`."""
"""Create a new Record initialized with ``args`` and ``kwargs``."""
return self.recordType(*args, **kwargs)

def init_from_dict(self, rdict: dict[str, Any], raise_unknown=False) -> Record:
"""Create a new Record initialized with key, value pairs from `rdict`.
"""Create a new Record initialized with key, value pairs from ``rdict``.
If `raise_unknown=True` then fields on `rdict` that are unknown to this
If ``raise_unknown=True`` then fields on ``rdict`` that are unknown to this
RecordDescriptor will raise a TypeError exception due to initializing
with unknown keyword arguments. (default: False)
Returns:
Record with data from `rdict`
Record with data from ``rdict``
"""

if not raise_unknown:
rdict = {k: v for k, v in rdict.items() if k in self.recordType.__slots__}
return self.recordType(**rdict)

def init_from_record(self, record: Record, raise_unknown=False) -> Record:
"""Create a new Record initialized with data from another `record`.
"""Create a new Record initialized with data from another ``record``.
If `raise_unknown=True` then fields on `record` that are unknown to this
If ``raise_unknown=True`` then fields on ``record`` that are unknown to this
RecordDescriptor will raise a TypeError exception due to initializing
with unknown keyword arguments. (default: False)
Returns:
Record with data from `record`
Record with data from ``record``
"""
return self.init_from_dict(record._asdict(), raise_unknown=raise_unknown)

Expand All @@ -633,7 +639,9 @@ def extend(self, fields: Sequence[tuple[str, str]]) -> RecordDescriptor:
def get_field_tuples(self) -> tuple[tuple[str, str]]:
"""Returns a tuple containing the (typename, name) tuples, eg:
(('boolean', 'foo'), ('string', 'bar'))
.. code-block:: text
(('boolean', 'foo'), ('string', 'bar'))
Returns:
Tuple of (typename, name) tuples
Expand Down Expand Up @@ -676,7 +684,7 @@ def __repr__(self) -> str:
def definition(self, reserved: bool = True) -> str:
"""Return the RecordDescriptor as Python definition string.
If `reserved` is True it will also return the reserved fields.
If ``reserved`` is True it will also return the reserved fields.
Returns:
Descriptor definition string
Expand Down Expand Up @@ -769,7 +777,7 @@ def open_path(path: str, mode: str, clobber: bool = True) -> IO:
Args:
path: Filename or path to filename to open
mode: Could be "r", "rb" to open file for reading, "w", "wb" for writing
clobber: Overwrite file if it already exists if `clobber=True`, else raises IOError.
clobber: Overwrite file if it already exists if ``clobber=True``, else raises IOError.
"""
binary = "b" in mode
Expand Down Expand Up @@ -1040,6 +1048,8 @@ def normalize_fieldname(field_name: str) -> str:
This normalizes the name so it can still be used in flow.record.
Reserved field_names are not normalized.
.. code-block:: text
>>> normalize_fieldname("my-variable-name-with-dashes")
'my_variable_name_with_dashes'
>>> normalize_fieldname("_my_name_starting_with_underscore")
Expand Down Expand Up @@ -1100,25 +1110,26 @@ def __call__(self, *args, **kwargs):


def iter_timestamped_records(record: Record) -> Iterator[Record]:
"""Yields timestamped annotated records for each `datetime` fieldtype in `record`.
If `record` does not have any `datetime` fields the original record is returned.
"""Yields timestamped annotated records for each ``datetime`` fieldtype in ``record``.
If ``record`` does not have any ``datetime`` fields the original record is returned.
Args:
record: Record to add timestamp fields for.
Yields:
Record annotated with `ts` and `ts_description` fields for each `datetime` fieldtype.
Record annotated with ``ts`` and ``ts_description`` fields for each ``datetime`` fieldtype.
"""
# get all `datetime` fields. (excluding _generated).

# get all ``datetime`` fields. (excluding _generated).
dt_fields = record._desc.getfields("datetime")
if not dt_fields:
yield record
return

# yield a new record for each `datetime` field assigned as `ts`.
# yield a new record for each ``datetime`` field assigned as ``ts``.
record_name = record._desc.name
for field in dt_fields:
ts_record = TimestampRecord(getattr(record, field.name), field.name)
# we extend `ts_record` with original `record` so TSRecord info goes first.
# we extend ``ts_record`` with original ``record`` so TSRecord info goes first.
record = extend_record(ts_record, [record], name=record_name)
yield record

0 comments on commit 426eadf

Please sign in to comment.