railtracks.evaluations

 1from .evaluators import JudgeEvaluator, LLMInferenceEvaluator, ToolUseEvaluator, metrics
 2from .point import extract_agent_data_points
 3from .runners._evaluate import evaluate
 4
 5__all__ = [
 6    "metrics",
 7    "evaluate",
 8    "extract_agent_data_points",
 9    "JudgeEvaluator",
10    "ToolUseEvaluator",
11    "LLMInferenceEvaluator",
12]
def evaluate(
    data: AgentDataPoint | list[AgentDataPoint],
    evaluators: list[Evaluator],
    agent_selection: bool = True,
    agents: list[str] | None = None,
    name: str | None = None,
    payload_callback: Callable[[dict[str, Any]], Any] | None = None,
):
    """Evaluate agent data using the provided evaluators.

    Args:
        data: The agent data to be evaluated. Can be a single AgentDataPoint, a list of AgentDataPoints, or an EvaluationDataset.
        evaluators: A list of Evaluator instances to run on the data.
        agent_selection: If True and multiple agents are found in the data, prompts the user to select which agents to evaluate.
                         If False, evaluates all agents without prompting.
        agents: An optional list of agent names to evaluate. If provided, only these agents will be evaluated. Overrides agent_selection if both are provided.
        name: An optional name for the evaluation, which will be included in the EvaluationResult.
        payload_callback: An optional callback function that will be called with the evaluation results payload after evaluation is complete. Can be used for custom logging, notifications, etc.

    Returns:
        A list of EvaluationResult instances containing the results from each evaluator.
    """
    _check_evaluators(evaluators)

    data_dict, agents = _setup_agent_data(data, agent_selection, agents)

    evaluation_results: list[EvaluationResult] = []

    for agent_name in agents:
        logger.info(
            f"Evaluation for {agent_name} with {len(data_dict[agent_name])} data points CREATED"
        )

        evaluator_results: list[EvaluatorResult] = []

        start_time = datetime.now(timezone.utc)
        for evaluator in evaluators:
            logger.info(f"Evaluator: {evaluator.__class__.__name__} CREATED")
            try:
                result = evaluator.run(data_dict[agent_name])
            except Exception as e:
                # A failing evaluator is logged and skipped so the remaining
                # evaluators still run for this agent.
                logger.error(f"Evaluator {evaluator.__class__.__name__} FAILED: {e}")
                continue

            evaluator_results.append(result)
            logger.info(f"Evaluator: {evaluator.__class__.__name__} DONE")

        logger.info(f"Evaluation for {agent_name} DONE.")

        # Collect every metric produced by the evaluators, keyed by identifier.
        # The inner loop deliberately avoids naming a local `metrics`, which
        # would shadow the module-level `metrics` import of this package.
        metrics_map = {}
        for er in evaluator_results:
            for metric in er.metrics:
                metrics_map[metric.identifier] = metric

        end_time = datetime.now(timezone.utc)

        evaluation_results.append(
            EvaluationResult(
                evaluation_name=name or None,
                created_at=start_time,
                completed_at=end_time,
                agents=[
                    {
                        "agent_name": agent_name,
                        "agent_node_ids": [
                            {
                                "session_id": adp.session_id,
                                "agent_node_id": adp.identifier,
                            }
                            for adp in data_dict[agent_name]
                        ],
                    }
                ],
                metrics_map=metrics_map,
                evaluator_results=evaluator_results,
            )
        )

    logger.info("Evaluation DONE.")

    if payload_callback is not None:
        # Invoke the callback per result (try inside the loop) so that one
        # failing payload does not silently skip the remaining results.
        for result in evaluation_results:
            try:
                payload_callback(payload(result))
            except Exception as e:
                logger.error(f"Failed to execute payload callback: {e}")

    try:
        save(evaluation_results)
    except Exception as e:
        # Saving is best-effort: results are still returned to the caller.
        logger.error(f"Failed to save evaluation results: {e}")
    return evaluation_results

Evaluate agent data using the provided evaluators.

Arguments:
  • data: The agent data to be evaluated. Can be a single AgentDataPoint, a list of AgentDataPoints, or an EvaluationDataset.
  • evaluators: A list of Evaluator instances to run on the data.
  • agent_selection: If True and multiple agents are found in the data, prompts the user to select which agents to evaluate. If False, evaluates all agents without prompting.
  • agents: An optional list of agent names to evaluate. If provided, only these agents will be evaluated. Overrides agent_selection if both are provided.
  • name: An optional name for the evaluation, which will be included in the EvaluationResult.
  • payload_callback: An optional callback function that will be called with the evaluation results payload after evaluation is complete. Can be used for custom logging, notifications, etc.

Returns: A list of EvaluationResult instances containing the results from each evaluator.

def extract_agent_data_points(session_files: list[str] | str) -> list[AgentDataPoint]:
    """
    Extract AgentDataPoint instances from session JSON files.

    Walks each Railtracks session JSON file and builds one AgentDataPoint per
    agent execution found, pulling out the agent's inputs, outputs, and
    internals (including LLM metrics when present).

    Args:
        session_files: List of paths to session JSON files, a single file path, or a directory path.
                      If a directory is provided, all files within it will be processed.

    Returns:
        List of AgentDataPoint instances, one for each agent execution found in the files.
        Returns empty list if no valid agent data is found.
    """
    data_points: list[AgentDataPoint] = []

    for file_path in resolve_file_paths(session_files):
        try:
            session_data = load_session(file_path)
        except (FileNotFoundError, ValueError) as e:
            logger.error(str(e))
            continue

        session_id = session_data.get("session_id")
        if session_id is None:
            logger.warning(f"no session_id found in file: {file_path}, skipping file.")
            continue

        runs = session_data.get("runs", [])
        if len(runs) == 0:
            logger.warning(f"Session file {file_path} contains no runs")
        elif len(runs) > 1:
            logger.warning(f"Session file {file_path} contains multiple runs")

        for run in runs:
            # Index every node by its UUID for graph traversal below.
            nodes = {}
            for raw_node in run.get("nodes", []):
                nodes[UUID(raw_node["identifier"])] = NodeDataPoint(
                    identifier=raw_node["identifier"],
                    node_type=NodeType(raw_node["node_type"]),
                    name=raw_node["name"],
                    details=raw_node.get("details", {}),
                )

            # Edges are keyed by (source, target); a None source marks a graph entry edge.
            edges: dict[tuple[UUID | None, UUID], EdgeDataPoint] = {}
            for raw_edge in run.get("edges", []):
                source = (
                    UUID(raw_edge["source"]) if raw_edge["source"] is not None else None
                )
                target = UUID(raw_edge["target"])
                edges[(source, target)] = EdgeDataPoint(
                    identifier=UUID(raw_edge["identifier"]),
                    source=source,
                    target=target,
                    details=raw_edge.get("details", {}),
                )

            graph, sink_list = construct_graph(edges)

            for node in nodes.values():
                if node.node_type != NodeType.AGENT:
                    continue

                raw_llm_details = node.details.get("internals", {}).get(
                    "llm_details", []
                )
                llm_details = (
                    extract_llm_details(raw_llm_details)
                    if raw_llm_details
                    else LLMDetails(calls=[])
                )

                tool_details = extract_tool_details(
                    nodes, edges, graph, node.identifier
                )

                agent_input, agent_output = extract_agent_io(
                    sink_list, node, file_path
                )

                data_points.append(
                    AgentDataPoint(
                        identifier=node.identifier,
                        session_id=UUID(session_id),
                        agent_name=node.name,
                        agent_input=agent_input,
                        agent_output=agent_output,
                        llm_details=llm_details,
                        tool_details=tool_details,
                    )
                )

    return data_points

Extract AgentDataPoint instances from session JSON files.

This function processes Railtracks session JSON files and creates AgentDataPoint instances for each agent execution found. It extracts agent inputs, outputs, and internals (including LLM metrics if available).

Arguments:
  • session_files: List of paths to session JSON files, a single file path, or a directory path. If a directory is provided, all files within it will be processed.
Returns:

List of AgentDataPoint instances, one for each agent execution found in the files. Returns empty list if no valid agent data is found.

class JudgeEvaluator(Evaluator):
    """Evaluator that uses a judge LLM to score agent runs against Categorical metrics,
    optionally recording the judge's reasoning for each score."""

    def __init__(
        self,
        llm: rt.llm.ModelBase,
        metrics: list[Metric],
        system_prompt: str | None = None,
        timeout: float | None = None,
        reasoning: bool = True,
    ):
        """
        The JudgeEvaluator with a system prompt, LLM, metric, and reasoning flag.

        Args:
            system_prompt: The system prompt template for the judge LLM.
            llm: The LLM model to be used as the judge.
            metrics: A list of Metrics to guide the evaluation.
            timeout: Optional timeout (seconds) for the judge flow invocation.
            reasoning: A flag indicating whether the judge should provide reasoning for its evaluations.
        """
        # These are config not state.
        # BUG FIX: previously the dict was pre-populated with ALL metrics via a
        # comprehension, so non-Categorical metrics were warned about but never
        # actually skipped. Build the dict from Categorical metrics only.
        self._metrics: dict[str, Metric] = {}
        for m in metrics:
            if isinstance(m, Categorical):
                self._metrics[m.identifier] = m
            else:
                logger.warning(
                    f"JudgeEvaluator currently only supports Categorical metrics, metric {m.name} of type {type(m)} will be skipped."
                )
        self._llm = llm
        self._reasoning: bool = reasoning
        self._template = self._load_yaml()
        self._system_prompt = (
            system_prompt
            if system_prompt is not None
            else self._template["system_prompt"]
        )
        super().__init__()

        self.timeout = timeout
        self._judge = rt.agent_node(
            llm=self._llm,
            output_schema=JudgeResponseSchema,
            tool_nodes=[],
        )

    def run(
        self, data: list[AgentDataPoint]
    ) -> EvaluatorResult[Metric, MetricResult, CategoricalAggregateNode]:
        """Run the judge over the given data points and return the collected results.

        Args:
            data: The AgentDataPoints to judge.

        Returns:
            An EvaluatorResult with one MetricResult per (metric, data point),
            plus reasoning results when enabled, and per-metric aggregates.
        """
        # Each tuple is (metric_id, adp_id, JudgeResponseSchema).
        judge_outputs: list[tuple[str, str, JudgeResponseSchema]] = self._invoke(data)

        self.agent_data_ids = {adp.identifier for adp in data}
        results: dict[Metric, list[MetricResult]] = defaultdict(list)
        forest = AggregateForest[CategoricalAggregateNode, MetricResult]()

        # BUG FIX: reuse one reasoning Metric per base metric. Previously a
        # fresh Metric (with a fresh identifier) was created per judge output,
        # so reasoning results for the same metric could never be grouped.
        reasoning_metrics: dict[str, Metric] = {}

        for output in judge_outputs:
            metric = self._metrics[output[0]]

            metric_result = MetricResult(
                result_name=f"JudgeResult/{metric.name}",
                metric_id=metric.identifier,
                agent_data_id=[UUID(output[1])],
                value=output[2].metric_value,
            )
            results[metric].append(metric_result)
            forest.add_node(metric_result)

            if self._reasoning:
                reasoning_metric = reasoning_metrics.setdefault(
                    metric.identifier, Metric(name=f"{metric.name}_reasoning")
                )
                if output[2].reasoning is not None:
                    results[reasoning_metric].append(
                        MetricResult(
                            result_name=f"JudgeReasoning/{metric.name}",
                            metric_id=reasoning_metric.identifier,
                            agent_data_id=[UUID(output[1])],
                            value=output[2].reasoning,
                        )
                    )
                else:
                    logger.warning(
                        f"No reasoning returned for Judge Evaluator Metric: {metric.name}, AgentDataPoint ID: {output[1]}"
                    )

        self._aggregate_metrics(results, forest)

        self._result = EvaluatorResult(
            evaluator_name=self.name,
            evaluator_id=self.identifier,
            agent_data_ids=self.agent_data_ids,
            metric_results=[item for sublist in results.values() for item in sublist],
            aggregate_results=forest,
            metrics=list(self._metrics.values()),
        )
        return self._result

    def __repr__(self) -> str:
        return (
            f"JudgeEvaluator, "
            f"llm={self._llm}, "
            f"metrics={list(self._metrics.values())}, "
            f"reasoning={self._reasoning})"
        )

    def _invoke(
        self, data: list[AgentDataPoint]
    ) -> list[tuple[str, str, JudgeResponseSchema]]:
        """Call the judge LLM once per (metric, data point) inside a railtracks flow."""

        @rt.function_node
        async def judge_flow():
            output = []
            for metric in self._metrics.values():
                logger.info(
                    f"START Evaluating Metric: {metric.name} for {len(data)} AgentDataPoints"
                )

                for idx, adp in enumerate(data):
                    user_message = self._generate_user_prompt(adp)
                    system_message = self._generate_system_prompt(metric)
                    message_history = rt.llm.MessageHistory(
                        [
                            rt.llm.SystemMessage(system_message),
                            rt.llm.UserMessage(user_message),
                        ]
                    )
                    res = await rt.call(
                        self._judge,
                        message_history,
                    )
                    output.append(
                        (metric.identifier, str(adp.identifier), res.structured)
                    )

                    logger.info(
                        f"AgentDataPoint ID: {adp.identifier} {idx + 1}/{len(data)} DONE"
                    )

            return output

        judge_evaluator_flow = rt.Flow(
            name="JudgeEvaluatorFlow",
            entry_point=judge_flow,
            timeout=self.timeout,
            save_state=False,
        )

        return judge_evaluator_flow.invoke()

    def _aggregate_metrics(
        self,
        results: dict[Metric, list[MetricResult]],
        forest: AggregateForest[CategoricalAggregateNode, MetricResult],
    ) -> None:
        """Add one aggregate root per Categorical metric; other metric kinds
        (Numerical, plain reasoning Metrics) are left unaggregated."""
        for metric in results:
            if isinstance(metric, Numerical):
                continue
            elif isinstance(metric, Categorical):
                aggregate_node = CategoricalAggregateNode(
                    name=f"Aggregate/{metric.name}",
                    metric=metric,
                    children=[val.identifier for val in results[metric]],
                    forest=forest,
                )

                forest.roots.append(aggregate_node.identifier)
                forest.add_node(aggregate_node)

    def _generate_user_prompt(self, data: AgentDataPoint) -> str:
        """Render the user prompt from the YAML template for one data point."""
        return self._template["user"].format(
            agent_input=data.agent_input,
            agent_output=data.agent_output.get("message_history", ""),
        )

    def _generate_system_prompt(self, metric: Metric) -> str:
        """Render the system prompt, appending the metric description and,
        when enabled, the reasoning instructions."""
        system_prompt: str = self._template["system_prompt"]

        system_prompt += "\n" + self._template["metric"].format(metric=str(metric))

        if self._reasoning:
            system_prompt += self._template["reasoning"]

        return system_prompt

    def _load_yaml(self):
        """Load the prompt templates shipped next to this module."""
        yaml_path = Path(__file__).parent / "judge_evaluator.yaml"
        with open(yaml_path, "r") as f:
            template = yaml.safe_load(f)

        return template

    def _get_config(self) -> dict:
        """Return the evaluator's configuration for result provenance."""
        return {
            "llm": self._llm.model_name(),
            "llm_provider": self._llm.model_provider(),
            "system_prompt": self._system_prompt,
            "metrics": sorted(self._metrics.keys()),
            "reasoning": self._reasoning,
        }

Evaluator that scores agent runs with a judge LLM against Categorical metrics, optionally recording the judge's reasoning.

JudgeEvaluator( llm: railtracks.llm.ModelBase, metrics: list[railtracks.evaluations.evaluators.metrics.Metric], system_prompt: str | None = None, timeout: float | None = None, reasoning: bool = True)
31    def __init__(
32        self,
33        llm: rt.llm.ModelBase,
34        metrics: list[Metric],
35        system_prompt: str | None = None,
36        timeout: float | None = None,
37        reasoning: bool = True,
38    ):
39        """
40        The JudgeEvaluator with a system prompt, LLM, metric, and reasoning flag.
41
42        Args:
43            system_prompt: The system prompt template for the judge LLM.
44            llm: The LLM model to be used as the judge.
45            metrics: A list of Metrics to guide the evaluation.
46            reasoning: A flag indicating whether the judge should provide reasoning for its evaluations.
47        """
48        # These are config not state
49        self._metrics: dict[str, Metric] = {m.identifier: m for m in metrics}
50        for m in metrics:
51            if isinstance(m, Categorical):
52                self._metrics[m.identifier] = m
53            else:
54                logger.warning(
55                    f"JudgeEvaluator currently only supports Categorical metrics, metric {m.name} of type {type(m)} will be skipped."
56                )
57        self._llm = llm
58        self._reasoning: bool = reasoning
59        self._template = self._load_yaml()
60        self._system_prompt = (
61            system_prompt
62            if system_prompt is not None
63            else self._template["system_prompt"]
64        )
65        super().__init__()
66
67        self.timeout = timeout
68        self._judge = rt.agent_node(
69            llm=self._llm,
70            output_schema=JudgeResponseSchema,
71            tool_nodes=[],
72        )

The JudgeEvaluator with a system prompt, LLM, metric, and reasoning flag.

Arguments:
  • system_prompt: The system prompt template for the judge LLM.
  • llm: The LLM model to be used as the judge.
  • metrics: A list of Metrics to guide the evaluation.
  • reasoning: A flag indicating whether the judge should provide reasoning for its evaluations.
timeout
def run( self, data: list[railtracks.evaluations.point.AgentDataPoint]) -> railtracks.evaluations.result.evaluator_results.EvaluatorResult[Metric, MetricResult, CategoricalAggregateNode]:
 74    def run(
 75        self, data: list[AgentDataPoint]
 76    ) -> EvaluatorResult[Metric, MetricResult, CategoricalAggregateNode]:
 77        # (metric_id, adp_id, JudgeResponseSchema)
 78        judge_outputs: list[tuple[str, str, JudgeResponseSchema]] = self._invoke(data)
 79
 80        self.agent_data_ids = {adp.identifier for adp in data}
 81        results: dict[Metric, list[MetricResult]] = defaultdict(list)
 82        forest = AggregateForest[CategoricalAggregateNode, MetricResult]()
 83
 84        for output in judge_outputs:
 85            metric = self._metrics[output[0]]
 86
 87            metric_result = MetricResult(
 88                result_name=f"JudgeResult/{metric.name}",
 89                metric_id=metric.identifier,
 90                agent_data_id=[UUID(output[1])],
 91                value=output[2].metric_value,
 92            )
 93            results[metric].append(metric_result)
 94            forest.add_node(metric_result)
 95
 96            if self._reasoning:
 97                reasoning_metric = Metric(name=f"{metric.name}_reasoning")
 98                if output[2].reasoning is not None:
 99                    results[reasoning_metric].append(
100                        MetricResult(
101                            result_name=f"JudgeReasoning/{metric.name}",
102                            metric_id=reasoning_metric.identifier,
103                            agent_data_id=[UUID(output[1])],
104                            value=output[2].reasoning,
105                        )
106                    )
107                else:
108                    logger.warning(
109                        f"No reasoning returned for Judge Evaluator Metric: {metric.name}, AgentDataPoint ID: {output[1]}"
110                    )
111
112        self._aggregate_metrics(results, forest)
113
114        self._result = EvaluatorResult(
115            evaluator_name=self.name,
116            evaluator_id=self.identifier,
117            agent_data_ids=self.agent_data_ids,
118            metric_results=[item for sublist in results.values() for item in sublist],
119            aggregate_results=forest,
120            metrics=list(self._metrics.values()),
121        )
122        return self._result
class ToolUseEvaluator(Evaluator):
    """
    Evaluator that analyzes tool usage patterns across agent runs.

    Computes per-call and aggregated metrics for each tool, including
    runtime, failure rate, and usage count.
    """

    def __init__(
        self,
    ):
        super().__init__()

    def run(
        self, data: list[AgentDataPoint]
    ) -> EvaluatorResult[ToolMetric, ToolMetricResult, ToolAggregateNode]:
        """
        Run the evaluator over a list of agent data points.

        Args:
            data: A list of AgentDataPoint instances to evaluate.

        Returns:
            An EvaluatorResult containing per-call metric results and
            aggregated nodes across runs.
        """
        agent_data_ids: set[UUID] = {adp.identifier for adp in data}
        forest = AggregateForest[ToolAggregateNode, ToolMetricResult]()

        results = self._extract_tool_stats(data, forest)
        self._aggregate_per_run(results, forest)
        self._aggregate_across_runs(results, forest)

        metrics = list(results.keys())

        return EvaluatorResult(
            evaluator_name=self.name,
            evaluator_id=self.identifier,
            agent_data_ids=agent_data_ids,
            metrics=metrics,
            metric_results=[item for sublist in results.values() for item in sublist],
            aggregate_results=forest,
        )

    def _extract_tool_stats(
        self,
        data: list[AgentDataPoint],
        forest: AggregateForest[ToolAggregateNode, ToolMetricResult],
    ) -> dict[ToolMetric, list[ToolMetricResult]]:
        """
        Retrieve tool usage statistics from the agent data points.
        There is no aggregation at this level, so results are at the tool call level.

        Args:
            data: A list of AgentDataPoint instances.
            forest: The forest that every per-call result is registered in.

        Returns:
            A mapping of ToolMetric to its per-call / per-(run, tool) results.
        """

        results: dict[ToolMetric, list[ToolMetricResult]] = defaultdict(list)
        # (agent_datapoint_id, tool_name): stats_dict
        stats: dict[tuple[UUID, str], ToolStats] = defaultdict(
            lambda: {"usage_count": 0, "failure_count": 0, "runtimes": []}
        )

        for datapoint in data:
            for tool in datapoint.tool_details.calls:
                tool_name = tool.name
                key = (datapoint.identifier, tool_name)

                stats[key]["usage_count"] += 1

                failed = tool.status == Status.FAILED
                if failed:
                    stats[key]["failure_count"] += 1

                # BUG FIX: this per-call result was previously built with the
                # Runtime metric's name/identifier and the runtime as its value,
                # yet stored under the ToolFailure metric. Record an actual
                # per-call failure indicator (1.0 = failed, 0.0 = succeeded).
                failure_result = ToolMetricResult(
                    result_name=f"{METRICS['ToolFailure'].name}/{tool_name}",
                    agent_data_id=[datapoint.identifier],
                    metric_id=METRICS["ToolFailure"].identifier,
                    tool_name=tool_name,
                    tool_node_id=tool.identifier,
                    value=1.0 if failed else 0.0,
                )
                forest.add_node(failure_result)
                results[METRICS["ToolFailure"]].append(failure_result)

                runtime = tool.runtime
                if runtime is not None:
                    stats[key]["runtimes"].append(runtime)

                    metric_result = ToolMetricResult(
                        result_name=f"{METRICS['Runtime'].name}/{tool_name}",
                        agent_data_id=[datapoint.identifier],
                        metric_id=METRICS["Runtime"].identifier,
                        tool_name=tool_name,
                        tool_node_id=tool.identifier,
                        value=runtime,
                    )
                    forest.add_node(metric_result)
                    results[METRICS["Runtime"]].append(metric_result)

        # Per-(run, tool) derived metrics: failure rate and usage count.
        for key, tool_data in stats.items():
            adp_id, tool_name = key

            failure_rate = (
                tool_data["failure_count"] / tool_data["usage_count"]
                if tool_data["usage_count"] > 0
                else 0.0
            )

            tmr = ToolMetricResult(
                result_name=f"{METRICS['FailureRate'].name}/{tool_name}",
                agent_data_id=[adp_id],
                metric_id=METRICS["FailureRate"].identifier,
                tool_name=tool_name,
                tool_node_id=None,
                value=failure_rate,
            )
            forest.add_node(tmr)
            results[METRICS["FailureRate"]].append(tmr)

            tmr = ToolMetricResult(
                result_name=f"{METRICS['UsageCount'].name}/{tool_name}",
                agent_data_id=[adp_id],
                metric_id=METRICS["UsageCount"].identifier,
                tool_name=tool_name,
                tool_node_id=None,
                value=tool_data["usage_count"],
            )
            forest.add_node(tmr)
            results[METRICS["UsageCount"]].append(tmr)

        return results

    def _aggregate_per_run(
        self,
        results: dict[ToolMetric, list[ToolMetricResult]],
        forest: AggregateForest[ToolAggregateNode, ToolMetricResult],
    ) -> None:
        """Build one Runtime aggregate node per (data point, tool) pair."""
        metric_results = results[METRICS["Runtime"]]
        metric_results_by_adp_id: dict[UUID, list[ToolMetricResult]] = defaultdict(list)

        values: dict[UUID, dict[str, list[ToolMetricResult]]] = defaultdict(dict)

        for result in metric_results:
            for adp_id in result.agent_data_id:
                metric_results_by_adp_id[adp_id].append(result)

        for adp_id in metric_results_by_adp_id:
            values[adp_id] = defaultdict(list)

            for tmr in metric_results_by_adp_id[adp_id]:
                values[adp_id][tmr.tool_name].append(tmr)

            for tool_name in values[adp_id]:
                aggregate_node = ToolAggregateNode(
                    name=f"Aggregate/{METRICS['Runtime'].name}",
                    metric=METRICS["Runtime"],
                    tool_name=tool_name,
                    children=[tmr.identifier for tmr in values[adp_id][tool_name]],
                    forest=forest,
                )
                forest.roots.append(aggregate_node.identifier)
                forest.add_node(aggregate_node)

    def _aggregate_across_runs(
        self,
        results: dict[ToolMetric, list[ToolMetricResult]],
        forest: AggregateForest[ToolAggregateNode, ToolMetricResult],
    ) -> None:
        """
        Aggregates the ToolUseEvaluator metrics across runs on an agent level.
        This is a separate step from the initial extraction to allow for more flexible aggregation strategies in the future.

        Args:
            results: A dictionary of ToolMetric to list of ToolMetricResult at the tool call level.
            forest: The forest the aggregate nodes are added to.

        Returns:
            None; aggregate nodes are appended to the forest's roots in place.
        """

        for metric in [METRICS["FailureRate"], METRICS["UsageCount"]]:
            metric_results = results[metric]
            values: dict[str, list[ToolMetricResult]] = defaultdict(list)

            for tmr in metric_results:
                values[tmr.tool_name].append(tmr)

            for tool_name, vals in values.items():
                aggregate_node = ToolAggregateNode(
                    name=f"Aggregate/{metric.name}",
                    metric=metric,
                    tool_name=tool_name,
                    children=[val.identifier for val in vals],
                    forest=forest,
                )
                forest.roots.append(aggregate_node.identifier)
                forest.add_node(aggregate_node)

        ## Aggregation of Runtime ------------------------------
        tool_breakdown = defaultdict(list)
        for root_id in forest.roots:
            agg = forest.get(root_id)
            if isinstance(agg, ToolMetricResult):
                raise ValueError(
                    f"Expected root nodes in the forest to be ToolAggregateNodes, but got {type(agg)}"
                )
            if agg.metric == METRICS["Runtime"]:
                tool_breakdown[agg.tool_name].append(agg)

        for tool_name in tool_breakdown:
            # NOTE(review): this rejects single-run inputs outright — confirm
            # that evaluating one data point is genuinely unsupported here.
            if len(tool_breakdown[tool_name]) < 2:
                raise ValueError(
                    f"Expected multiple aggregate nodes for tool '{tool_name}' to perform cross-run aggregation, but found only one. Found nodes: {[agg.identifier for agg in tool_breakdown[tool_name]]}"
                )
            parent = ToolAggregateNode(
                name=f"Aggregate/{METRICS['Runtime'].name}",
                metric=METRICS["Runtime"],
                tool_name=tool_name,
                children=[
                    tool_agg.identifier for tool_agg in tool_breakdown[tool_name]
                ],
                forest=forest,
            )
            forest.add_node(parent)
            forest.roots.append(parent.identifier)

Evaluator that analyzes tool usage patterns across agent runs.

Computes per-call and aggregated metrics for each tool, including runtime, failure rate, and usage count.

def run( self, data: list[railtracks.evaluations.point.AgentDataPoint]) -> railtracks.evaluations.result.evaluator_results.EvaluatorResult[ToolMetric, ToolMetricResult, ToolAggregateNode]:
61    def run(
62        self, data: list[AgentDataPoint]
63    ) -> EvaluatorResult[ToolMetric, ToolMetricResult, ToolAggregateNode]:
64        """
65        Run the evaluator over a list of agent data points.
66
67        Args:
68            data: A list of AgentDataPoint instances to evaluate.
69
70        Returns:
71            An EvaluatorResult containing per-call metric results and
72            aggregated nodes across runs.
73        """
74        agent_data_ids: set[UUID] = {adp.identifier for adp in data}
75        forest = AggregateForest[ToolAggregateNode, ToolMetricResult]()
76
77        results = self._extract_tool_stats(data, forest)
78        self._aggregate_per_run(results, forest)
79        self._aggregate_across_runs(results, forest)
80
81        metrics = list(results.keys())
82
83        return EvaluatorResult(
84            evaluator_name=self.name,
85            evaluator_id=self.identifier,
86            agent_data_ids=agent_data_ids,
87            metrics=metrics,
88            metric_results=[item for sublist in results.values() for item in sublist],
89            aggregate_results=forest,
90        )

Run the evaluator over a list of agent data points.

Arguments:
  • data: A list of AgentDataPoint instances to evaluate.
Returns:
  An EvaluatorResult containing per-call metric results and aggregated nodes across runs.

class LLMInferenceEvaluator(railtracks.evaluations.evaluators.evaluator.Evaluator):
 19class LLMInferenceEvaluator(Evaluator):
 20    """
 21    Evaluator that analyzes LLM inference statistics across agent runs.
 22
 23    Computes per-call and aggregated metrics for each LLM invocation,
 24    including input/output token counts, token cost, and latency.
 25    """
 26
 27    def __init__(
 28        self,
 29    ):
 30        super().__init__()
 31
 32    def run(
 33        self, data: list[AgentDataPoint]
 34    ) -> EvaluatorResult[LLMMetric, LLMMetricResult, LLMInferenceAggregateNode]:
 35        """
 36        Run the evaluator over a list of agent data points.
 37
 38        Args:
 39            data: A list of AgentDataPoint instances to evaluate.
 40
 41        Returns:
 42            An EvaluatorResult containing per-call metric results and
 43            aggregated nodes grouped by model and call index.
 44        """
 45        agent_data_ids: set[UUID] = {adp.identifier for adp in data}
 46        forest = AggregateForest[LLMInferenceAggregateNode, LLMMetricResult]()
 47
 48        results = self._retrieve_llm_states(data, forest)
 49        self._aggregate_metrics(results, forest)
 50
 51        metrics = list(results.keys())
 52
 53        return EvaluatorResult(
 54            evaluator_name=self.name,
 55            evaluator_id=self.identifier,
 56            agent_data_ids=agent_data_ids,
 57            metrics=metrics,
 58            metric_results=[item for sublist in results.values() for item in sublist],
 59            aggregate_results=forest,
 60        )
 61
 62    def _retrieve_llm_states(
 63        self,
 64        data: list[AgentDataPoint],
 65        forest: AggregateForest[LLMInferenceAggregateNode, LLMMetricResult],
 66    ) -> dict[LLMMetric, list[LLMMetricResult]]:
 67        results: dict[LLMMetric, list[LLMMetricResult]] = defaultdict(list)
 68
 69        for datapoint in data:
 70            llm_details = datapoint.llm_details
 71
 72            for call in llm_details.calls:
 73                # Input Tokens
 74                metric = LLMMetric(
 75                    name="InputTokens",
 76                    min_value=0,
 77                )
 78
 79                metric_result = LLMMetricResult(
 80                    result_name="InputTokens",
 81                    metric_id=metric.identifier,
 82                    agent_data_id=[datapoint.identifier],
 83                    value=call.input_tokens,
 84                    llm_call_index=call.index,
 85                    model_name=call.model_name,
 86                    model_provider=call.model_provider,
 87                )
 88                results[metric].append(metric_result)
 89                forest.add_node(metric_result)
 90
 91                # Output Tokens
 92                metric = LLMMetric(
 93                    name="OutputTokens",
 94                    min_value=0,
 95                )
 96
 97                metric_result = LLMMetricResult(
 98                    result_name="OutputTokens",
 99                    metric_id=metric.identifier,
100                    agent_data_id=[datapoint.identifier],
101                    value=call.output_tokens,
102                    llm_call_index=call.index,
103                    model_name=call.model_name,
104                    model_provider=call.model_provider,
105                )
106                results[metric].append(metric_result)
107                forest.add_node(metric_result)
108
109                # Total Cost
110                metric = LLMMetric(
111                    name="TokenCost",
112                    min_value=0.0,
113                )
114
115                metric_result = LLMMetricResult(
116                    result_name="TokenCost",
117                    metric_id=metric.identifier,
118                    agent_data_id=[datapoint.identifier],
119                    value=call.total_cost,
120                    llm_call_index=call.index,
121                    model_name=call.model_name,
122                    model_provider=call.model_provider,
123                )
124                results[metric].append(metric_result)
125                forest.add_node(metric_result)
126
127                # Latency
128                metric = LLMMetric(
129                    name="Latency",
130                    min_value=0.0,
131                )
132                metric_result = LLMMetricResult(
133                    result_name="Latency",
134                    metric_id=metric.identifier,
135                    agent_data_id=[datapoint.identifier],
136                    value=call.latency,
137                    llm_call_index=call.index,
138                    model_name=call.model_name,
139                    model_provider=call.model_provider,
140                )
141                results[metric].append(metric_result)
142                forest.add_node(metric_result)
143
144        return results
145
146    def _aggregate_metrics(
147        self,
148        results: dict[LLMMetric, list[LLMMetricResult]],
149        forest: AggregateForest[LLMInferenceAggregateNode, LLMMetricResult],
150    ) -> None:
151        for metric in results:
152            metric_results = results[metric]
153            values: dict[tuple[str, str, int], list[LLMMetricResult]] = defaultdict(
154                list
155            )
156            for mr in metric_results:
157                if isinstance(mr.value, (int, float)):
158                    key = (mr.model_name, mr.model_provider, mr.llm_call_index)
159                    values[key].append(mr)
160
161            for (model_name, model_provider, llm_call_index), vals in values.items():
162                aggregate_node = LLMInferenceAggregateNode(
163                    name=f"Aggregate/{metric.name}/{model_name}/{model_provider}/Call_{llm_call_index}",
164                    metric=metric,
165                    children=[val.identifier for val in vals],
166                    model_name=model_name,
167                    model_provider=model_provider,
168                    llm_call_index=llm_call_index,
169                    forest=forest,
170                )
171
172                forest.roots.append(aggregate_node.identifier)
173                forest.add_node(aggregate_node)

Evaluator that analyzes LLM inference statistics across agent runs.

Computes per-call and aggregated metrics for each LLM invocation, including input/output token counts, token cost, and latency.

def run( self, data: list[railtracks.evaluations.point.AgentDataPoint]) -> railtracks.evaluations.result.evaluator_results.EvaluatorResult[LLMMetric, LLMMetricResult, LLMInferenceAggregateNode]:
32    def run(
33        self, data: list[AgentDataPoint]
34    ) -> EvaluatorResult[LLMMetric, LLMMetricResult, LLMInferenceAggregateNode]:
35        """
36        Run the evaluator over a list of agent data points.
37
38        Args:
39            data: A list of AgentDataPoint instances to evaluate.
40
41        Returns:
42            An EvaluatorResult containing per-call metric results and
43            aggregated nodes grouped by model and call index.
44        """
45        agent_data_ids: set[UUID] = {adp.identifier for adp in data}
46        forest = AggregateForest[LLMInferenceAggregateNode, LLMMetricResult]()
47
48        results = self._retrieve_llm_states(data, forest)
49        self._aggregate_metrics(results, forest)
50
51        metrics = list(results.keys())
52
53        return EvaluatorResult(
54            evaluator_name=self.name,
55            evaluator_id=self.identifier,
56            agent_data_ids=agent_data_ids,
57            metrics=metrics,
58            metric_results=[item for sublist in results.values() for item in sublist],
59            aggregate_results=forest,
60        )

Run the evaluator over a list of agent data points.

Arguments:
  • data: A list of AgentDataPoint instances to evaluate.
Returns:
  An EvaluatorResult containing per-call metric results and aggregated nodes grouped by model and call index.