
# Copyright 2025 - Oumi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Analysis pipeline for orchestrating multiple analyzers."""

import logging
from collections.abc import Iterable
from pathlib import Path
from typing import TYPE_CHECKING, Any, TypeVar

import tiktoken
from pydantic import BaseModel
from tqdm import tqdm

if TYPE_CHECKING:
    import pandas as pd

from oumi.analyze.base import (
    ConversationAnalyzer,
    DatasetAnalyzer,
    MessageAnalyzer,
    PreferenceAnalyzer,
)
from oumi.core.types.conversation import Conversation

logger = logging.getLogger(__name__)

_TOKENIZER_ATTR = "tokenizer"
_CACHE_FILENAME = "analysis_results.json"

# Type aliases for consistency
AnyAnalyzer = (
    MessageAnalyzer[Any]
    | ConversationAnalyzer[Any]
    | DatasetAnalyzer[Any]
    | PreferenceAnalyzer[Any]
)
AnalysisResults = dict[str, list[BaseModel] | BaseModel]
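
# Shape sketch of AnalysisResults (illustrative): per-item analyzers store a
# list of result models, dataset-level analyzers store a single model. The
# analyzer names and result models below are hypothetical.
#
#     {
#         "LengthAnalyzer": [LengthResult(...), LengthResult(...)],  # per item
#         "VocabStatsAnalyzer": VocabStats(...),                     # whole dataset
#     }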

T = TypeVar("T")


class AnalysisPipeline:
    """Pipeline for orchestrating multiple analyzers on a dataset.

    The AnalysisPipeline manages running multiple analyzers on conversations,
    handling different analyzer scopes appropriately, and providing unified
    access to results.

    The pipeline can inject shared resources (like tokenizers) into analyzers
    that need them, ensuring consistent configuration across the analysis.

    Note:
        PreferenceAnalyzers are not run by `run()`. Use `run_preference()`
        separately to analyze preference pairs (chosen/rejected conversations).

    Example:
        >>> from oumi.analyze import AnalysisPipeline, LengthAnalyzer
        >>>
        >>> pipeline = AnalysisPipeline(
        ...     analyzers=[LengthAnalyzer()],
        ...     cache_dir="./analysis_cache",
        ... )
        >>> results = pipeline.run(conversations)

    Args:
        analyzers: List of analyzer instances to run.
        cache_dir: Optional directory for caching results.
        tokenizer: Optional tokenizer to inject into analyzers that need one.
            If None, uses tiktoken with the specified encoding as the default.
        tiktoken_encoding: Tiktoken encoding to use when no tokenizer is
            provided. Defaults to "cl100k_base" (GPT-4 encoding).
    """

    def __init__(
        self,
        analyzers: list[AnyAnalyzer],
        cache_dir: str | Path | None = None,
        tokenizer: Any | None = None,
        tiktoken_encoding: str = "cl100k_base",
    ):
        """Initialize the analysis pipeline.

        Args:
            analyzers: List of analyzer instances to run.
            cache_dir: Optional directory for caching results.
            tokenizer: Optional tokenizer to inject into analyzers that need
                one. Must have an `encode(text) -> list` method. If None,
                tiktoken is used as the default.
            tiktoken_encoding: Tiktoken encoding to use when no tokenizer is
                provided.
        """
        self.analyzers = analyzers
        self.cache_dir = Path(cache_dir) if cache_dir else None

        if tokenizer is not None:
            self._tokenizer = tokenizer
        else:
            self._tokenizer = tiktoken.get_encoding(tiktoken_encoding)

        self._results: AnalysisResults = {}
        self._conversations: list[Conversation] = []
        self._message_to_conversation_idx: list[int] = []

        # Bucket analyzers by scope so each group can be run appropriately.
        self._message_analyzers: list[MessageAnalyzer[Any]] = []
        self._conversation_analyzers: list[ConversationAnalyzer[Any]] = []
        self._dataset_analyzers: list[DatasetAnalyzer[Any]] = []
        self._preference_analyzers: list[PreferenceAnalyzer[Any]] = []
        for analyzer in analyzers:
            self._inject_tokenizer(analyzer)
            if isinstance(analyzer, MessageAnalyzer):
                self._message_analyzers.append(analyzer)
            elif isinstance(analyzer, ConversationAnalyzer):
                self._conversation_analyzers.append(analyzer)
            elif isinstance(analyzer, DatasetAnalyzer):
                self._dataset_analyzers.append(analyzer)
            elif isinstance(analyzer, PreferenceAnalyzer):
                self._preference_analyzers.append(analyzer)

    def _inject_tokenizer(self, analyzer: AnyAnalyzer) -> None:
        """Set the shared tokenizer on an analyzer that exposes a `tokenizer`
        attribute but has not been given one."""
        if hasattr(analyzer, _TOKENIZER_ATTR):
            current = getattr(analyzer, _TOKENIZER_ATTR)
            if current is None:
                setattr(analyzer, _TOKENIZER_ATTR, self._tokenizer)
                logger.debug(
                    f"Injected tokenizer into {analyzer.__class__.__name__}"
                )
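
    # Construction sketch (illustrative comment, not executed): any tokenizer
    # exposing `encode(text) -> list` can replace the tiktoken default; a
    # Hugging Face tokenizer is one assumed option.
    #
    #     from transformers import AutoTokenizer
    #
    #     tok = AutoTokenizer.from_pretrained("gpt2")
    #     pipeline = AnalysisPipeline(analyzers=[LengthAnalyzer()], tokenizer=tok)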

    def run(
        self,
        conversations: list[Conversation],
    ) -> AnalysisResults:
        """Run all analyzers on the provided conversations.

        Note:
            PreferenceAnalyzers are not run by this method. Use
            `run_preference()` separately to analyze preference pairs.

        Args:
            conversations: List of conversations to analyze.

        Returns:
            Dictionary mapping analyzer names to their results.

            - For ConversationAnalyzer: list of results (one per conversation)
            - For MessageAnalyzer: list of results (one per message)
            - For DatasetAnalyzer: single result for the entire dataset
        """
        self._conversations = conversations
        self._results = {}

        # Record which conversation each message belongs to, in message order.
        self._message_to_conversation_idx = []
        for conv_idx, conv in enumerate(conversations):
            for _ in conv.messages:
                self._message_to_conversation_idx.append(conv_idx)

        logger.info(
            f"Running analysis pipeline with {len(self.analyzers)} analyzers "
            f"on {len(conversations)} conversations"
        )

        self._run_conversation_analyzers(conversations)
        self._run_message_analyzers(conversations)
        self._run_dataset_analyzers(conversations)

        logger.info(f"Analysis complete: {len(self._results)} analyzer results")

        if self.cache_dir:
            self._save_cache()

        return self._results
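
    # Usage sketch for `run()`: results are keyed by analyzer name (analyzer_id
    # or class name). `LengthAnalyzer` follows the class docstring example; the
    # `token_count` field is hypothetical and depends on its result model.
    #
    #     results = pipeline.run(conversations)
    #     per_item = results["LengthAnalyzer"]  # one result per conversation or
    #                                           # per message, depending on scope
    #     counts = [r.token_count for r in per_item]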

    def run_preference(
        self,
        pairs: list[tuple[Conversation, Conversation]],
    ) -> AnalysisResults:
        """Run preference analyzers on conversation pairs.

        Args:
            pairs: List of (chosen, rejected) conversation tuples.

        Returns:
            Dictionary mapping analyzer names to their results.
        """
        results: AnalysisResults = {}

        sorted_analyzers = self._topological_sort(self._preference_analyzers)
        for analyzer in sorted_analyzers:
            self._inject_dependencies(analyzer)
            name = self._get_analyzer_name(analyzer)
            logger.debug(f"Running preference analyzer: {name}")
            try:
                analyzer_results = analyzer.analyze_batch(pairs)
                results[name] = analyzer_results
                logger.debug(f"  Completed {name}: {len(analyzer_results)} results")
            except Exception as e:
                logger.error(f"  Failed {name}: {e}")
                raise

        return results
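
    # Usage sketch for `run_preference()`: pairs are (chosen, rejected)
    # Conversation tuples, e.g. zipped from a preference dataset. The variable
    # names below are assumptions.
    #
    #     pairs = list(zip(chosen_convs, rejected_convs))
    #     pref_results = pipeline.run_preference(pairs)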

    def to_dataframe(self) -> "pd.DataFrame":
        """Convert cached results to a pandas DataFrame.

        Returns:
            DataFrame with one row per conversation, columns for each metric.

        Raises:
            RuntimeError: If no results are cached (run() not called).
        """
        from oumi.analyze.utils.dataframe import to_analysis_dataframe

        if not self._results:
            raise RuntimeError(
                "No results available. Call run() first to analyze conversations."
            )

        return to_analysis_dataframe(
            self._conversations,
            self._results,
            message_to_conversation_idx=self._message_to_conversation_idx,
        )
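
    # Usage sketch: call `run()` before `to_dataframe()`. The frame has one row
    # per conversation, so standard pandas aggregation applies directly.
    #
    #     pipeline.run(conversations)
    #     df = pipeline.to_dataframe()
    #     print(df.describe())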

    def load_cache(self) -> bool:
        """Load results from cache directory.

        Note:
            Loaded results are raw dictionaries, not Pydantic model instances.
            Use `get_cached_result()` to reconstruct typed results if needed,
            or access raw data directly via `self.results`.

        Returns:
            True if cache was loaded successfully, False otherwise.
        """
        import json

        if not self.cache_dir:
            return False

        results_path = self.cache_dir / _CACHE_FILENAME
        if not results_path.exists():
            logger.debug(f"No cache found at {results_path}")
            return False

        try:
            with open(results_path) as f:
                self._results = json.load(f)
            logger.debug(f"Loaded cached results from {results_path}")
            return True
        except Exception as e:
            logger.warning(f"Failed to load cache: {e}")
            return False
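
    # Cache round-trip sketch: `run()` saves results to cache_dir automatically,
    # so a later pipeline pointed at the same directory can reload the raw
    # (dict-valued) results instead of re-running analyzers.
    #
    #     pipeline = AnalysisPipeline(analyzers=analyzers, cache_dir="./analysis_cache")
    #     if not pipeline.load_cache():
    #         pipeline.run(conversations)  # populates and writes the cache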

    @property
    def results(self) -> AnalysisResults:
        """Get the cached analysis results.

        Returns:
            Dictionary mapping analyzer names to results.
        """
        return self._results

    @property
    def conversations(self) -> list[Conversation]:
        """Get the analyzed conversations.

        Returns:
            List of conversations that were analyzed.
        """
        return self._conversations

    @property
    def message_to_conversation_idx(self) -> list[int]:
        """Get the mapping from message index to conversation index."""
        return self._message_to_conversation_idx

    def get_analyzer(self, name: str) -> AnyAnalyzer | None:
        """Get an analyzer by name.

        Args:
            name: Name of the analyzer to find.

        Returns:
            Analyzer instance or None if not found.
        """
        for analyzer in self.analyzers:
            if self._get_analyzer_name(analyzer) == name:
                return analyzer
        return None
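
    # Lookup sketch: analyzers are found by analyzer_id or class name.
    #
    #     length = pipeline.get_analyzer("LengthAnalyzer")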

    # -------------------------------------------------------------------------
    # Private helper methods
    # -------------------------------------------------------------------------

    def _run_conversation_analyzers(self, conversations: list[Conversation]) -> None:
        """Run all conversation-level analyzers in dependency order."""
        sorted_analyzers = self._topological_sort(self._conversation_analyzers)
        for analyzer in self._iter_with_progress(
            sorted_analyzers, "Running conversation analyzers"
        ):
            self._inject_dependencies(analyzer)
            self._run_single_analyzer(
                analyzer, lambda a: a.analyze_batch(conversations), is_batch=True
            )

    def _run_message_analyzers(self, conversations: list[Conversation]) -> None:
        """Run all message-level analyzers in dependency order."""
        if not self._message_analyzers:
            return

        all_messages = [msg for conv in conversations for msg in conv.messages]

        sorted_analyzers = self._topological_sort(self._message_analyzers)
        for analyzer in self._iter_with_progress(
            sorted_analyzers, "Running message analyzers"
        ):
            self._inject_dependencies(analyzer)
            self._run_single_analyzer(
                analyzer, lambda a: a.analyze_batch(all_messages), is_batch=True
            )

    def _run_dataset_analyzers(self, conversations: list[Conversation]) -> None:
        """Run all dataset-level analyzers in dependency order."""
        sorted_analyzers = self._topological_sort(self._dataset_analyzers)
        for analyzer in self._iter_with_progress(
            sorted_analyzers, "Running dataset analyzers"
        ):
            self._inject_dependencies(analyzer)
            self._run_single_analyzer(
                analyzer, lambda a: a.analyze(conversations), is_batch=False
            )

    def _run_single_analyzer(
        self,
        analyzer: AnyAnalyzer,
        run_func: Any,
        is_batch: bool,
    ) -> None:
        """Run a single analyzer and store results.

        Args:
            analyzer: The analyzer to run.
            run_func: Function that takes the analyzer and returns results.
            is_batch: Whether the result is a list (batch) or single value.
        """
        name = self._get_analyzer_name(analyzer)
        scope = self._get_analyzer_scope(analyzer)
        logger.debug(f"Running {scope} analyzer: {name}")
        try:
            result = run_func(analyzer)
            self._results[name] = result
            if is_batch and isinstance(result, list):
                logger.debug(f"  Completed {name}: {len(result)} results")
            else:
                logger.debug(f"  Completed {name}")
        except Exception as e:
            logger.error(f"  Failed {name}: {e}")
            raise

    def _topological_sort(self, analyzers: list[T]) -> list[T]:
        """Sort analyzers by dependencies using topological sort.

        Raises:
            ValueError: If there's a circular dependency.
        """
        from graphlib import CycleError, TopologicalSorter

        if not analyzers:
            return []

        name_to_analyzer: dict[str, T] = {}
        for analyzer in analyzers:
            name = self._get_analyzer_name(analyzer)  # type: ignore[arg-type]
            name_to_analyzer[name] = analyzer

        all_names = set(name_to_analyzer.keys())

        graph: dict[str, set[str]] = {}
        for analyzer in analyzers:
            name = self._get_analyzer_name(analyzer)  # type: ignore[arg-type]
            depends_on = getattr(analyzer, "depends_on", None) or []
            graph[name] = {dep for dep in depends_on if dep in all_names}

        try:
            sorter = TopologicalSorter(graph)
            sorted_names = list(sorter.static_order())
        except CycleError as e:
            raise ValueError(f"Circular dependency detected: {e}") from e

        return [name_to_analyzer[name] for name in sorted_names]

    def _inject_dependencies(self, analyzer: AnyAnalyzer) -> None:
        """Inject dependency results into a derived analyzer.

        If the analyzer has a `depends_on` attribute listing dependency names,
        and a `set_dependencies` method, this will pass the results from those
        dependencies to the analyzer.

        Args:
            analyzer: The derived analyzer to inject dependencies into.
        """
        depends_on = getattr(analyzer, "depends_on", None)
        if not depends_on:
            return

        if not hasattr(analyzer, "set_dependencies"):
            logger.warning(
                f"Analyzer {self._get_analyzer_name(analyzer)} has depends_on "
                f"but no set_dependencies method"
            )
            return

        dependency_results: dict[str, list[BaseModel] | BaseModel] = {}
        for dep_name in depends_on:
            if dep_name in self._results:
                dependency_results[dep_name] = self._results[dep_name]
            else:
                logger.warning(
                    f"Dependency '{dep_name}' not found for analyzer "
                    f"'{self._get_analyzer_name(analyzer)}'"
                )

        analyzer.set_dependencies(dependency_results)  # type: ignore[union-attr]

    def _iter_with_progress(self, items: list[T], desc: str) -> Iterable[T]:
        """Iterate with progress bar."""
        return tqdm(items, desc=desc, unit="analyzer")

    def _get_analyzer_name(self, analyzer: AnyAnalyzer) -> str:
        """Get the name for an analyzer (analyzer_id or class name)."""
        if analyzer.analyzer_id is not None:
            return analyzer.analyzer_id
        return analyzer.__class__.__name__

    def _get_analyzer_scope(self, analyzer: AnyAnalyzer) -> str:
        """Get the scope name for an analyzer."""
        return analyzer.get_scope()

    def _save_cache(self) -> None:
        """Save results to cache directory."""
        import json

        if not self.cache_dir:
            return

        self.cache_dir.mkdir(parents=True, exist_ok=True)
        results_path = self.cache_dir / _CACHE_FILENAME

        serialized = {}
        for name, result in self._results.items():
            if isinstance(result, list):
                serialized[name] = [r.model_dump() for r in result]
            else:
                serialized[name] = result.model_dump()

        with open(results_path, "w") as f:
            json.dump(serialized, f, indent=2, default=str)

        logger.debug(f"Cached results to {results_path}")
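
# Dependency sketch: a derived analyzer declares upstream analyzers by name via
# `depends_on` and receives their stored results through `set_dependencies`
# before it runs. `RatioAnalyzer` and `RatioResult` are hypothetical; only the
# `depends_on`/`set_dependencies` protocol used above comes from this module.
#
#     class RatioAnalyzer(ConversationAnalyzer[RatioResult]):
#         depends_on = ["LengthAnalyzer"]
#
#         def set_dependencies(self, results):
#             # results maps dependency name -> that analyzer's stored output
#             self._lengths = results["LengthAnalyzer"]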