Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/add four pecha alignment ann transfer #4

Merged
merged 22 commits into from
Feb 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
__pycache__/
*.py[cod]
*$py.class
parse.py

# C extensions
*.so
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ classifiers = [
]

dependencies = [
"openpecha @ git+https://github.com/OpenPecha/toolkit-v2.git"
"openpecha @ git+https://github.com/OpenPecha/toolkit-v2.git@951e2ece53254a88efab7f2c6686e974c0d1d0fd"
]

[project.optional-dependencies]
Expand Down
97 changes: 97 additions & 0 deletions src/alignment_ann_transfer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from pathlib import Path
from typing import Dict, List

from openpecha.pecha import Pecha
from stam import AnnotationStore


class AlignmentTransfer:
    """Relate the segmentation annotations of two pechas by span overlap.

    The workflow implemented here is:

    1. migrate the first layer of a source pecha onto a target pecha
       (``base_update``),
    2. read the annotations of both STAM layers (``extract_anns``),
    3. map every source annotation to the target annotations whose spans
       intersect it (``map_layer_to_layer``).
    """

    def get_first_layer_path(self, pecha: Pecha) -> Path:
        """Return the first ``*.json`` file found under ``pecha.layer_path``.

        Raises:
            FileNotFoundError: if no JSON layer file exists. (A bare
                ``next()`` would leak ``StopIteration`` to the caller.)
        """
        layer_path = next(pecha.layer_path.rglob("*.json"), None)
        if layer_path is None:
            raise FileNotFoundError(
                f"No '*.json' layer file found under {pecha.layer_path}"
            )
        return layer_path

    def get_root_pechas_mapping(
        self, root_pecha: Pecha, root_display_pecha: Pecha
    ) -> Dict[int, List]:
        """
        Get segmentation mapping from root_pecha -> root_display_pecha

        Returns:
            A dict keyed by the ``root_idx_mapping`` of each root_pecha
            annotation; each value is a list of
            ``[display_idx, [display_start, display_end]]`` entries.
        """
        # Resolve the display layer BEFORE the base update: base_update is
        # expected to add another JSON layer to root_display_pecha, after
        # which "first layer" could be ambiguous.
        display_layer_path = self.get_first_layer_path(root_display_pecha)
        transferred_layer_path = self.base_update(root_pecha, root_display_pecha)

        try:
            display_layer = AnnotationStore(file=str(display_layer_path))
            transfer_layer = AnnotationStore(file=str(transferred_layer_path))
            return self.map_layer_to_layer(transfer_layer, display_layer)
        finally:
            # Always remove the temporary transferred layer, even when
            # loading or mapping fails, so the display pecha is left clean.
            if transferred_layer_path.exists():
                transferred_layer_path.unlink()

    def base_update(self, src_pecha: Pecha, tgt_pecha: Pecha) -> Path:
        """
        1. Take the layer from src pecha
        2. Migrate the layer to tgt pecha using base update

        Only the first base of each pecha is used.

        Returns:
            The path ``tgt_pecha.layer_path / <tgt base> / <src layer file>``.
            The path is computed, not verified here; presumably
            ``merge_pecha`` writes the migrated layer there.
        """
        src_base_name = list(src_pecha.bases.keys())[0]
        tgt_base_name = list(tgt_pecha.bases.keys())[0]
        tgt_pecha.merge_pecha(src_pecha, src_base_name, tgt_base_name)

        src_layer_name = self.get_first_layer_path(src_pecha).name
        return tgt_pecha.layer_path / tgt_base_name / src_layer_name

    def extract_anns(self, layer: AnnotationStore) -> Dict:
        """
        Extract annotation from layer(STAM)

        Returns:
            A dict keyed by each annotation's integer ``root_idx_mapping``
            value, holding its ``Span`` (``start``/``end`` offsets), its
            ``text`` and the same ``root_idx_mapping``.

        Raises:
            KeyError: if an annotation carries no ``root_idx_mapping`` data.
        """
        anns = {}
        for ann in layer.annotations():
            offset = ann.offset()
            metadata = {data.key().id(): str(data.value()) for data in ann}
            root_idx = int(metadata["root_idx_mapping"])
            anns[root_idx] = {
                "Span": {
                    "start": offset.begin().value(),
                    "end": offset.end().value(),
                },
                "text": str(ann),
                "root_idx_mapping": root_idx,
            }
        return anns

    @staticmethod
    def _spans_overlap(
        src_start: int, src_end: int, tgt_start: int, tgt_end: int
    ) -> bool:
        """Return True when the two spans share at least one position.

        Spans that merely touch at an edge (one ends exactly where the
        other starts) do not count as overlapping.
        """
        return tgt_start < src_end and tgt_end > src_start

    def map_layer_to_layer(
        self, src_layer: AnnotationStore, tgt_layer: AnnotationStore
    ) -> Dict[int, List]:
        """
        1. Extract annotations from source and target layers
        2. Map the annotations from source to target layer
        src_layer -> tgt_layer (One to Many)

        Returns:
            A dict sorted by source index. Every source index is present,
            mapped to a (possibly empty) list of
            ``[tgt_idx, [tgt_start, tgt_end]]`` entries in target order.
        """
        src_anns = self.extract_anns(src_layer)
        tgt_anns = self.extract_anns(tgt_layer)

        # Flatten the target spans once instead of re-reading the nested
        # dicts for every source annotation.
        tgt_spans = [
            (tgt_idx, tgt_ann["Span"]["start"], tgt_ann["Span"]["end"])
            for tgt_idx, tgt_ann in tgt_anns.items()
        ]

        mapping: Dict[int, List] = {}
        for src_idx, src_ann in src_anns.items():
            src_start = src_ann["Span"]["start"]
            src_end = src_ann["Span"]["end"]
            mapping[src_idx] = [
                [tgt_idx, [tgt_start, tgt_end]]
                for tgt_idx, tgt_start, tgt_end in tgt_spans
                if self._spans_overlap(src_start, src_end, tgt_start, tgt_end)
            ]

        # Sort the mapping by source indices
        return dict(sorted(mapping.items()))
156 changes: 5 additions & 151 deletions src/alignment_ann_transfer/commentary.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,19 @@
from typing import Dict, List, Tuple

from openpecha.pecha import Pecha
from stam import AnnotationStore

from alignment_ann_transfer import AlignmentTransfer

class CommentaryAlignmentTransfer:
def get_alignment_mapping(
    self, src_pecha: Pecha, tgt_pecha: Pecha
) -> Dict[int, List]:
    """Build the transfer-layer -> display-layer mapping for two pechas.

    Migrates the layer of ``src_pecha`` onto ``tgt_pecha`` via
    ``base_update``, loads the display and transferred layers of
    ``tgt_pecha`` and returns the result of
    ``map_display_to_transfer_layer`` for them.
    """
    # The base update must run first: it is what makes the transferred
    # layer available for get_display_transfer_layer to find.
    self.base_update(src_pecha, tgt_pecha)
    display_layer, transfer_layer = self.get_display_transfer_layer(
        src_pecha, tgt_pecha
    )
    return self.map_display_to_transfer_layer(display_layer, transfer_layer)

def get_serialized_aligned_commentary(
self, src_pecha: Pecha, tgt_pecha: Pecha, commentary_pecha: Pecha
class CommentaryAlignmentTransfer(AlignmentTransfer):
def get_serialized_commentary(
self, root_pecha: Pecha, root_display_pecha: Pecha, commentary_pecha: Pecha
):
"""
Input: map from transfer_layer -> display_layer (One to Many)
Structure in a way such as : <chapter number><display idx>commentary text
Note: From many relation in display layer, take first idx (Sefaria map limitation)
"""
map = self.get_alignment_mapping(src_pecha, tgt_pecha)
map = self.get_root_pechas_mapping(root_pecha, root_display_pecha)
layer_path = next(commentary_pecha.layer_path.rglob("*.json"))

anns = self.extract_anns(AnnotationStore(file=str(layer_path)))
Expand All @@ -36,139 +26,3 @@ def get_serialized_aligned_commentary(
display_idx = display_map[0][0]
segments.append(f"<1><{display_idx}>{commentary_text}")
return segments

def get_aligned_display_commentary(
    self, src_pecha: Pecha, tgt_pecha: Pecha, commentary_pecha: Pecha
) -> List[Dict]:
    """
    Get map from display_layer -> transfer_layer

    Returns:
        One dict per display annotation that is referenced by the mapping,
        with ``display_text`` (the display annotation's text) and
        ``commentary_text`` (list of texts of the commentary annotations
        whose index maps onto that display annotation).
    """
    # From transfer -> display map get display -> transfer map
    transfer_display_map = self.get_alignment_mapping(src_pecha, tgt_pecha)
    display_transfer_map: Dict[int, List[int]] = {}
    for t_idx, display_entries in transfer_display_map.items():
        # Each entry is [display_idx, [start, end]]; only the index matters.
        for d_idx, *_ in display_entries:
            display_transfer_map.setdefault(d_idx, []).append(t_idx)

    # Get ann texts from display and commentary layer
    display_layer, _ = self.get_display_transfer_layer(src_pecha, tgt_pecha)
    display_anns = self.extract_anns(display_layer)

    layer_path = next(commentary_pecha.layer_path.rglob("*.json"))
    commentary_anns = self.extract_anns(AnnotationStore(file=str(layer_path)))

    return [
        {
            "display_text": display_anns[d_idx]["text"],
            "commentary_text": [
                commentary_anns[t_idx]["text"] for t_idx in t_indices
            ],
        }
        for d_idx, t_indices in display_transfer_map.items()
    ]

def base_update(self, src_pecha: Pecha, tgt_pecha: Pecha):
    """Migrate the source pecha's layer onto the target pecha and load it.

    The first base of each pecha is used for the merge. The returned
    ``AnnotationStore`` is opened from
    ``tgt_pecha.layer_path / <tgt base> / <src layer file name>``.
    """
    src_base_name, tgt_base_name = (
        list(pecha.bases.keys())[0] for pecha in (src_pecha, tgt_pecha)
    )
    tgt_pecha.merge_pecha(src_pecha, src_base_name, tgt_base_name)

    # The migrated layer keeps the file name of the source pecha's layer.
    first_src_layer = next(src_pecha.layer_path.rglob("*.json"))
    migrated_layer_path = (
        tgt_pecha.layer_path / tgt_base_name / first_src_layer.name
    )
    return AnnotationStore(file=str(migrated_layer_path))

def get_display_transfer_layer(
    self, src_pecha: Pecha, tgt_pecha: Pecha
) -> Tuple[AnnotationStore, AnnotationStore]:
    """Load the display layer and the transferred layer of ``tgt_pecha``.

    The transferred layer is the first ``*.json`` file under
    ``tgt_pecha.layer_path`` whose name equals the name of the first layer
    file of ``src_pecha``; the display layer is the first ``*.json`` file
    with any other name.

    Returns:
        ``(display_layer, transferred_layer)`` as ``AnnotationStore`` objects.

    Raises:
        FileNotFoundError: if ``src_pecha`` has no layer file, or if either
            of the two layers cannot be found under ``tgt_pecha``.
            (Previously a missing layer was passed on as the path ``"None"``.)
    """
    src_layer_path = next(src_pecha.layer_path.rglob("*.json"), None)
    if src_layer_path is None:
        raise FileNotFoundError(
            f"No '*.json' layer file found under {src_pecha.layer_path}"
        )
    src_layer_name = src_layer_path.name

    # Single traversal: remember the first match of each kind.
    display_layer_path = None
    new_layer_path = None
    for layer_path in tgt_pecha.layer_path.rglob("*.json"):
        if layer_path.name == src_layer_name:
            if new_layer_path is None:
                new_layer_path = layer_path
        elif display_layer_path is None:
            display_layer_path = layer_path
        if display_layer_path is not None and new_layer_path is not None:
            break

    if display_layer_path is None:
        raise FileNotFoundError(
            f"No display layer found under {tgt_pecha.layer_path}"
        )
    if new_layer_path is None:
        raise FileNotFoundError(
            f"No transferred layer named {src_layer_name!r} found under "
            f"{tgt_pecha.layer_path}"
        )

    display_layer = AnnotationStore(file=str(display_layer_path))
    new_layer = AnnotationStore(file=str(new_layer_path))
    return (display_layer, new_layer)

def extract_anns(self, layer: AnnotationStore) -> Dict:
    """Collect the annotations of a STAM layer into a plain dict.

    The result is keyed by each annotation's integer ``root_idx_mapping``
    value and stores its ``Span`` offsets, its ``text`` and the index.
    """
    collected = {}
    for annotation in layer.annotations():
        span_start = annotation.offset().begin().value()
        span_end = annotation.offset().end().value()

        # Gather every key/value pair attached to the annotation as strings.
        metadata = {
            item.key().id(): str(item.value()) for item in annotation
        }

        record = {
            "Span": {"start": span_start, "end": span_end},
            "text": str(annotation),
            "root_idx_mapping": int(metadata["root_idx_mapping"]),
        }
        collected[int(metadata["root_idx_mapping"])] = record
    return collected

def map_display_to_transfer_layer(
    self, display_layer: AnnotationStore, transfer_layer: AnnotationStore
):
    """
    1. Extract annotations from display and transfer layer
    2. Map the annotations from display to transfer layer
    transfer_layer -> display_layer (One to Many)

    Returns:
        A dict sorted by transfer index. Every transfer index is present,
        mapped to a (possibly empty) list of
        ``[display_idx, [display_start, display_end]]`` entries.
    """
    display_anns = self.extract_anns(display_layer)
    transfer_anns = self.extract_anns(transfer_layer)

    # Flatten the display spans once; they are reused for every transfer ann.
    display_spans = [
        (d_idx, d_ann["Span"]["start"], d_ann["Span"]["end"])
        for d_idx, d_ann in display_anns.items()
    ]

    mapping: Dict = {}
    for t_idx, t_ann in transfer_anns.items():
        t_start, t_end = t_ann["Span"]["start"], t_ann["Span"]["end"]
        # Two spans are related when they share at least one position;
        # spans that only touch at an edge are deliberately excluded.
        # For integer offsets this equals the former "in between / contain /
        # no edge overlap" checks.
        mapping[t_idx] = [
            [d_idx, [d_start, d_end]]
            for d_idx, d_start, d_end in display_spans
            if d_start < t_end and d_end > t_start
        ]

    # Sort the map
    return dict(sorted(mapping.items()))
Loading