railtracks.evaluations
"""Public API of ``railtracks.evaluations``: evaluators, metrics, data-point extraction, and the ``evaluate`` runner."""

from .evaluators import JudgeEvaluator, LLMInferenceEvaluator, ToolUseEvaluator, metrics
from .point import extract_agent_data_points
from .runners._evaluate import evaluate

# Explicit public surface for `from railtracks.evaluations import *`.
__all__ = [
    "metrics",
    "evaluate",
    "extract_agent_data_points",
    "JudgeEvaluator",
    "ToolUseEvaluator",
    "LLMInferenceEvaluator",
]
def evaluate(
    data: AgentDataPoint | list[AgentDataPoint],
    evaluators: list[Evaluator],
    agent_selection: bool = True,
    agents: list[str] | None = None,
    name: str | None = None,
    payload_callback: Callable[[dict[str, Any]], Any] | None = None,
):
    """Evaluate agent data using the provided evaluators.

    Args:
        data: The agent data to be evaluated. Can be a single AgentDataPoint, a list of AgentDataPoints, or an EvaluationDataset.
        evaluators: A list of Evaluator instances to run on the data.
        agent_selection: If True and multiple agents are found in the data, prompts the user to select which agents to evaluate.
            If False, evaluates all agents without prompting.
        agents: An optional list of agent names to evaluate. If provided, only these agents will be evaluated. Overrides agent_selection if both are provided.
        name: An optional name for the evaluation, which will be included in the EvaluationResult.
        payload_callback: An optional callback function that will be called with the evaluation results payload after evaluation is complete. Can be used for custom logging, notifications, etc.

    Returns:
        A list of EvaluationResult instances, one per evaluated agent, each
        containing the results from every evaluator that ran successfully.
    """
    _check_evaluators(evaluators)

    data_dict, agents = _setup_agent_data(data, agent_selection, agents)

    evaluation_results: list[EvaluationResult] = []

    for agent_name in agents:
        logger.info(
            f"Evaluation for {agent_name} with {len(data_dict[agent_name])} data points CREATED"
        )

        evaluator_results: list[EvaluatorResult] = []

        start_time = datetime.now(timezone.utc)
        for evaluator in evaluators:
            logger.info(f"Evaluator: {evaluator.__class__.__name__} CREATED")
            try:
                result = evaluator.run(data_dict[agent_name])
            except Exception as e:
                # A failing evaluator is logged and skipped so the remaining
                # evaluators still run for this agent.
                logger.error(f"Evaluator {evaluator.__class__.__name__} FAILED: {e}")
                continue

            evaluator_results.append(result)
            logger.info(f"Evaluator: {evaluator.__class__.__name__} DONE")

        logger.info(f"Evaluation for {agent_name} DONE.")

        # Index every metric produced by the successful evaluators by its
        # identifier. (The loop variable was previously named `metrics`,
        # shadowing the package-level `metrics` name.)
        metrics_map = {
            metric.identifier: metric
            for er in evaluator_results
            for metric in er.metrics
        }

        end_time = datetime.now(timezone.utc)

        evaluation_results.append(
            EvaluationResult(
                evaluation_name=name,  # None is simply passed through
                created_at=start_time,
                completed_at=end_time,
                agents=[
                    {
                        "agent_name": agent_name,
                        "agent_node_ids": [
                            {
                                "session_id": adp.session_id,
                                "agent_node_id": adp.identifier,
                            }
                            for adp in data_dict[agent_name]
                        ],
                    }
                ],
                metrics_map=metrics_map,
                evaluator_results=evaluator_results,
            )
        )

    logger.info("Evaluation DONE.")

    if payload_callback is not None:
        # Guard each invocation separately so a failure for one result does
        # not skip the callback for the remaining results (previously the
        # whole loop was inside a single try block).
        for result in evaluation_results:
            try:
                payload_callback(payload(result))
            except Exception as e:
                logger.error(f"Failed to execute payload callback: {e}")

    # Saving is best-effort: results are still returned on failure.
    try:
        save(evaluation_results)
    except Exception as e:
        logger.error(f"Failed to save evaluation results: {e}")
    return evaluation_results
Evaluate agent data using the provided evaluators.
Arguments:
- data: The agent data to be evaluated. Can be a single AgentDataPoint, a list of AgentDataPoints, or an EvaluationDataset.
- evaluators: A list of Evaluator instances to run on the data.
- agent_selection: If True and multiple agents are found in the data, prompts the user to select which agents to evaluate. If False, evaluates all agents without prompting.
- agents: An optional list of agent names to evaluate. If provided, only these agents will be evaluated. Overrides agent_selection if both are provided.
- name: An optional name for the evaluation, which will be included in the EvaluationResult.
- payload_callback: An optional callback function that will be called with the evaluation results payload after evaluation is complete. Can be used for custom logging, notifications, etc.
Returns:
A list of EvaluationResult instances, one per evaluated agent, each containing the results from every evaluator.
def extract_agent_data_points(session_files: list[str] | str) -> list[AgentDataPoint]:
    """
    Extract AgentDataPoint instances from session JSON files.

    This function processes Railtracks session JSON files and creates AgentDataPoint
    instances for each agent execution found. It extracts agent inputs, outputs, and
    internals (including LLM metrics if available).

    Args:
        session_files: List of paths to session JSON files, a single file path, or a directory path.
            If a directory is provided, all files within it will be processed.

    Returns:
        List of AgentDataPoint instances, one for each agent execution found in the files.
        Returns empty list if no valid agent data is found.
    """
    file_paths = resolve_file_paths(session_files)
    data_points = []

    for file_path in file_paths:
        # Unreadable or invalid files are logged and skipped, never fatal.
        try:
            session_data = load_session(file_path)
        except (FileNotFoundError, ValueError) as e:
            logger.error(str(e))
            continue

        session_id = session_data.get("session_id")
        if session_id is None:
            logger.warning(f"no session_id found in file: {file_path}, skipping file.")
            continue

        runs = session_data.get("runs", [])

        # Both conditions only warn; processing continues either way.
        if len(runs) == 0:
            logger.warning(f"Session file {file_path} contains no runs")
        if len(runs) > 1:
            logger.warning(f"Session file {file_path} contains multiple runs")

        for run in runs:
            # Node records keyed by their UUID identifier.
            nodes = {
                UUID(node["identifier"]): NodeDataPoint(
                    identifier=node["identifier"],
                    node_type=NodeType(node["node_type"]),
                    name=node["name"],
                    details=node.get("details", {}),
                )
                for node in run.get("nodes", [])
            }

            # Edges keyed by (source, target); a None source marks an entry edge.
            edges: dict[tuple[UUID | None, UUID], EdgeDataPoint] = {}
            for edge in run.get("edges", []):
                key = (
                    (UUID(edge["source"]), UUID(edge["target"]))
                    if edge["source"] is not None
                    else (None, UUID(edge["target"]))
                )
                edges[key] = EdgeDataPoint(
                    identifier=UUID(edge["identifier"]),
                    source=UUID(edge["source"]) if edge["source"] is not None else None,
                    target=UUID(edge["target"]),
                    details=edge.get("details", {}),
                )

            graph, sink_list = construct_graph(edges)

            for node in nodes.values():
                if node.node_type == NodeType.AGENT:
                    # LLM call details live under details["internals"]["llm_details"];
                    # fall back to an empty LLMDetails when absent.
                    llm_details_dict = node.details.get("internals", {}).get(
                        "llm_details", []
                    )
                    llm_details = (
                        extract_llm_details(llm_details_dict)
                        if llm_details_dict
                        else LLMDetails(calls=[])
                    )

                    tool_details = extract_tool_details(
                        nodes, edges, graph, node.identifier
                    )

                    agent_input, agent_output = extract_agent_io(
                        sink_list, node, file_path
                    )

                    data_points.append(
                        AgentDataPoint(
                            identifier=node.identifier,
                            session_id=UUID(session_id),
                            agent_name=node.name,
                            agent_input=agent_input,
                            agent_output=agent_output,
                            llm_details=llm_details,
                            tool_details=tool_details,
                        )
                    )

    return data_points
Extract AgentDataPoint instances from session JSON files.
This function processes Railtracks session JSON files and creates AgentDataPoint instances for each agent execution found. It extracts agent inputs, outputs, and internals (including LLM metrics if available).
Arguments:
- session_files: List of paths to session JSON files, a single file path, or a directory path. If a directory is provided, all files within it will be processed.
Returns:
List of AgentDataPoint instances, one for each agent execution found in the files. Returns empty list if no valid agent data is found.
class JudgeEvaluator(Evaluator):
    """
    LLM-as-judge evaluator that scores AgentDataPoints against Categorical metrics.

    For each (metric, data point) pair a judge LLM is invoked with a templated
    system prompt and the agent's input/output, and its structured response is
    recorded as a MetricResult (optionally with a reasoning result).
    """

    def __init__(
        self,
        llm: rt.llm.ModelBase,
        metrics: list[Metric],
        system_prompt: str | None = None,
        timeout: float | None = None,
        reasoning: bool = True,
    ):
        """
        The JudgeEvaluator with a system prompt, LLM, metrics, timeout, and reasoning flag.

        Args:
            llm: The LLM model to be used as the judge.
            metrics: A list of Metrics to guide the evaluation. Only Categorical
                metrics are supported; others are skipped with a warning.
            system_prompt: The system prompt template for the judge LLM. When
                None, the prompt from the bundled YAML template is used.
            timeout: Optional timeout (in seconds) passed to the judge flow.
            reasoning: A flag indicating whether the judge should provide reasoning for its evaluations.
        """
        # These are config not state.
        # BUG FIX: previously every metric was first added via a dict
        # comprehension, so non-Categorical metrics were warned about as
        # "skipped" but actually kept. Only Categorical metrics are stored now.
        self._metrics: dict[str, Metric] = {}
        for m in metrics:
            if isinstance(m, Categorical):
                self._metrics[m.identifier] = m
            else:
                logger.warning(
                    f"JudgeEvaluator currently only supports Categorical metrics, metric {m.name} of type {type(m)} will be skipped."
                )
        self._llm = llm
        self._reasoning: bool = reasoning
        self._template = self._load_yaml()
        self._system_prompt = (
            system_prompt
            if system_prompt is not None
            else self._template["system_prompt"]
        )
        super().__init__()

        self.timeout = timeout
        self._judge = rt.agent_node(
            llm=self._llm,
            output_schema=JudgeResponseSchema,
            tool_nodes=[],
        )

    def run(
        self, data: list[AgentDataPoint]
    ) -> EvaluatorResult[Metric, MetricResult, CategoricalAggregateNode]:
        """Run the judge over all data points and aggregate per-metric results."""
        # Each output tuple is (metric_id, adp_id, JudgeResponseSchema).
        judge_outputs: list[tuple[str, str, JudgeResponseSchema]] = self._invoke(data)

        self.agent_data_ids = {adp.identifier for adp in data}
        results: dict[Metric, list[MetricResult]] = defaultdict(list)
        forest = AggregateForest[CategoricalAggregateNode, MetricResult]()

        for output in judge_outputs:
            metric = self._metrics[output[0]]

            metric_result = MetricResult(
                result_name=f"JudgeResult/{metric.name}",
                metric_id=metric.identifier,
                agent_data_id=[UUID(output[1])],
                value=output[2].metric_value,
            )
            results[metric].append(metric_result)
            forest.add_node(metric_result)

            if self._reasoning:
                # The judge's free-text reasoning is recorded under a separate
                # ad-hoc Metric so it does not mix with the categorical value.
                reasoning_metric = Metric(name=f"{metric.name}_reasoning")
                if output[2].reasoning is not None:
                    results[reasoning_metric].append(
                        MetricResult(
                            result_name=f"JudgeReasoning/{metric.name}",
                            metric_id=reasoning_metric.identifier,
                            agent_data_id=[UUID(output[1])],
                            value=output[2].reasoning,
                        )
                    )
                else:
                    logger.warning(
                        f"No reasoning returned for Judge Evaluator Metric: {metric.name}, AgentDataPoint ID: {output[1]}"
                    )

        self._aggregate_metrics(results, forest)

        self._result = EvaluatorResult(
            evaluator_name=self.name,
            evaluator_id=self.identifier,
            agent_data_ids=self.agent_data_ids,
            metric_results=[item for sublist in results.values() for item in sublist],
            aggregate_results=forest,
            metrics=list(self._metrics.values()),
        )
        return self._result

    def __repr__(self) -> str:
        # BUG FIX: the previous repr had a trailing ')' with no opening '('.
        return (
            f"JudgeEvaluator("
            f"llm={self._llm}, "
            f"metrics={list(self._metrics.values())}, "
            f"reasoning={self._reasoning})"
        )

    def _invoke(
        self, data: list[AgentDataPoint]
    ) -> list[tuple[str, str, JudgeResponseSchema]]:
        """Call the judge LLM for every (metric, data point) pair via an rt.Flow."""

        @rt.function_node
        async def judge_flow():
            output = []
            for metric in self._metrics.values():
                logger.info(
                    f"START Evaluating Metric: {metric.name} for {len(data)} AgentDataPoints"
                )

                for idx, adp in enumerate(data):
                    user_message = self._generate_user_prompt(adp)
                    system_message = self._generate_system_prompt(metric)
                    message_history = rt.llm.MessageHistory(
                        [
                            rt.llm.SystemMessage(system_message),
                            rt.llm.UserMessage(user_message),
                        ]
                    )
                    res = await rt.call(
                        self._judge,
                        message_history,
                    )
                    output.append(
                        (metric.identifier, str(adp.identifier), res.structured)
                    )

                    logger.info(
                        f"AgentDataPoint ID: {adp.identifier} {idx + 1}/{len(data)} DONE"
                    )

            return output

        judge_evaluator_flow = rt.Flow(
            name="JudgeEvaluatorFlow",
            entry_point=judge_flow,
            timeout=self.timeout,
            save_state=False,
        )

        return judge_evaluator_flow.invoke()

    def _aggregate_metrics(
        self,
        results: dict[Metric, list[MetricResult]],
        forest: AggregateForest[CategoricalAggregateNode, MetricResult],
    ) -> None:
        """Add one CategoricalAggregateNode root per Categorical metric.

        Numerical metrics and plain Metric instances (e.g. reasoning metrics)
        are intentionally not aggregated.
        """
        for metric in results:
            if isinstance(metric, Numerical):
                continue
            elif isinstance(metric, Categorical):
                aggregate_node = CategoricalAggregateNode(
                    name=f"Aggregate/{metric.name}",
                    metric=metric,
                    children=[val.identifier for val in results[metric]],
                    forest=forest,
                )

                forest.roots.append(aggregate_node.identifier)
                forest.add_node(aggregate_node)

    def _generate_user_prompt(self, data: AgentDataPoint) -> str:
        """Render the user prompt from the YAML template and the data point."""
        return self._template["user"].format(
            agent_input=data.agent_input,
            agent_output=data.agent_output.get("message_history", ""),
        )

    def _generate_system_prompt(self, metric: Metric) -> str:
        """Build the system prompt for one metric, appending the reasoning clause if enabled."""
        # BUG FIX: previously this re-read the template's system prompt,
        # silently ignoring a caller-supplied `system_prompt` argument.
        system_prompt: str = self._system_prompt

        system_prompt += "\n" + self._template["metric"].format(metric=str(metric))

        if self._reasoning:
            system_prompt += self._template["reasoning"]

        return system_prompt

    def _load_yaml(self):
        """Load the prompt templates bundled next to this module."""
        yaml_path = Path(__file__).parent / "judge_evaluator.yaml"
        with open(yaml_path, "r") as f:
            template = yaml.safe_load(f)

        return template

    def _get_config(self) -> dict:
        """Return a stable, comparable configuration snapshot of this evaluator."""
        return {
            "llm": self._llm.model_name(),
            "llm_provider": self._llm.model_provider(),
            "system_prompt": self._system_prompt,
            "metrics": sorted(self._metrics.keys()),
            "reasoning": self._reasoning,
        }
LLM-as-judge evaluator that scores agent data points against a set of Categorical metrics, optionally recording the judge's reasoning for each score.
31 def __init__( 32 self, 33 llm: rt.llm.ModelBase, 34 metrics: list[Metric], 35 system_prompt: str | None = None, 36 timeout: float | None = None, 37 reasoning: bool = True, 38 ): 39 """ 40 The JudgeEvaluator with a system prompt, LLM, metric, and reasoning flag. 41 42 Args: 43 system_prompt: The system prompt template for the judge LLM. 44 llm: The LLM model to be used as the judge. 45 metrics: A list of Metrics to guide the evaluation. 46 reasoning: A flag indicating whether the judge should provide reasoning for its evaluations. 47 """ 48 # These are config not state 49 self._metrics: dict[str, Metric] = {m.identifier: m for m in metrics} 50 for m in metrics: 51 if isinstance(m, Categorical): 52 self._metrics[m.identifier] = m 53 else: 54 logger.warning( 55 f"JudgeEvaluator currently only supports Categorical metrics, metric {m.name} of type {type(m)} will be skipped." 56 ) 57 self._llm = llm 58 self._reasoning: bool = reasoning 59 self._template = self._load_yaml() 60 self._system_prompt = ( 61 system_prompt 62 if system_prompt is not None 63 else self._template["system_prompt"] 64 ) 65 super().__init__() 66 67 self.timeout = timeout 68 self._judge = rt.agent_node( 69 llm=self._llm, 70 output_schema=JudgeResponseSchema, 71 tool_nodes=[], 72 )
The JudgeEvaluator with a system prompt, LLM, metric, and reasoning flag.
Arguments:
- system_prompt: The system prompt template for the judge LLM.
- llm: The LLM model to be used as the judge.
- metrics: A list of Metrics to guide the evaluation.
- reasoning: A flag indicating whether the judge should provide reasoning for its evaluations.
74 def run( 75 self, data: list[AgentDataPoint] 76 ) -> EvaluatorResult[Metric, MetricResult, CategoricalAggregateNode]: 77 # (metric_id, adp_id, JudgeResponseSchema) 78 judge_outputs: list[tuple[str, str, JudgeResponseSchema]] = self._invoke(data) 79 80 self.agent_data_ids = {adp.identifier for adp in data} 81 results: dict[Metric, list[MetricResult]] = defaultdict(list) 82 forest = AggregateForest[CategoricalAggregateNode, MetricResult]() 83 84 for output in judge_outputs: 85 metric = self._metrics[output[0]] 86 87 metric_result = MetricResult( 88 result_name=f"JudgeResult/{metric.name}", 89 metric_id=metric.identifier, 90 agent_data_id=[UUID(output[1])], 91 value=output[2].metric_value, 92 ) 93 results[metric].append(metric_result) 94 forest.add_node(metric_result) 95 96 if self._reasoning: 97 reasoning_metric = Metric(name=f"{metric.name}_reasoning") 98 if output[2].reasoning is not None: 99 results[reasoning_metric].append( 100 MetricResult( 101 result_name=f"JudgeReasoning/{metric.name}", 102 metric_id=reasoning_metric.identifier, 103 agent_data_id=[UUID(output[1])], 104 value=output[2].reasoning, 105 ) 106 ) 107 else: 108 logger.warning( 109 f"No reasoning returned for Judge Evaluator Metric: {metric.name}, AgentDataPoint ID: {output[1]}" 110 ) 111 112 self._aggregate_metrics(results, forest) 113 114 self._result = EvaluatorResult( 115 evaluator_name=self.name, 116 evaluator_id=self.identifier, 117 agent_data_ids=self.agent_data_ids, 118 metric_results=[item for sublist in results.values() for item in sublist], 119 aggregate_results=forest, 120 metrics=list(self._metrics.values()), 121 ) 122 return self._result
class ToolUseEvaluator(Evaluator):
    """
    Evaluator that analyzes tool usage patterns across agent runs.

    Computes per-call and aggregated metrics for each tool, including
    runtime, failure rate, and usage count.
    """

    def __init__(
        self,
    ):
        super().__init__()

    def run(
        self, data: list[AgentDataPoint]
    ) -> EvaluatorResult[ToolMetric, ToolMetricResult, ToolAggregateNode]:
        """
        Run the evaluator over a list of agent data points.

        Args:
            data: A list of AgentDataPoint instances to evaluate.

        Returns:
            An EvaluatorResult containing per-call metric results and
            aggregated nodes across runs.
        """
        agent_data_ids: set[UUID] = {adp.identifier for adp in data}
        forest = AggregateForest[ToolAggregateNode, ToolMetricResult]()

        results = self._extract_tool_stats(data, forest)
        self._aggregate_per_run(results, forest)
        self._aggregate_across_runs(results, forest)

        metrics = list(results.keys())

        return EvaluatorResult(
            evaluator_name=self.name,
            evaluator_id=self.identifier,
            agent_data_ids=agent_data_ids,
            metrics=metrics,
            metric_results=[item for sublist in results.values() for item in sublist],
            aggregate_results=forest,
        )

    def _extract_tool_stats(
        self,
        data: list[AgentDataPoint],
        forest: AggregateForest[ToolAggregateNode, ToolMetricResult],
    ) -> dict[ToolMetric, list[ToolMetricResult]]:
        """
        Retrieve tool usage statistics from the agent data points.
        There is no aggregation at this level, so results are at the tool call level.

        Args:
            data: A list of AgentDataPoint instances.

        Returns:
            Mapping from each ToolMetric to its per-call (or per-tool) results.
        """

        results: dict[ToolMetric, list[ToolMetricResult]] = defaultdict(list)
        # (agent_datapoint_id, tool_name): stats_dict
        stats: dict[tuple[UUID, str], ToolStats] = defaultdict(
            lambda: {"usage_count": 0, "failure_count": 0, "runtimes": []}
        )

        for datapoint in data:
            for tool in datapoint.tool_details.calls:
                tool_name = tool.name
                key = (datapoint.identifier, tool_name)

                stats[key]["usage_count"] += 1

                failed = tool.status == Status.FAILED

                # Per-call failure indicator (1.0 = failed, 0.0 = succeeded).
                # BUG FIX: this result previously reused the Runtime metric's
                # name/identifier and runtime value while being appended to
                # the ToolFailure bucket; it now records the ToolFailure
                # metric with a failure flag.
                metric_result = ToolMetricResult(
                    result_name=f"{METRICS['ToolFailure'].name}/{tool_name}",
                    agent_data_id=[datapoint.identifier],
                    metric_id=METRICS["ToolFailure"].identifier,
                    tool_name=tool_name,
                    tool_node_id=tool.identifier,
                    value=1.0 if failed else 0.0,
                )
                forest.add_node(metric_result)
                results[METRICS["ToolFailure"]].append(metric_result)

                if failed:
                    stats[key]["failure_count"] += 1
                runtime = tool.runtime

                if runtime is not None:
                    stats[key]["runtimes"].append(runtime)

                    # Per-call runtime result; only emitted when a runtime
                    # was actually recorded for the call.
                    metric_result = ToolMetricResult(
                        result_name=f"{METRICS['Runtime'].name}/{tool_name}",
                        agent_data_id=[datapoint.identifier],
                        metric_id=METRICS["Runtime"].identifier,
                        tool_name=tool_name,
                        tool_node_id=tool.identifier,
                        value=runtime,
                    )
                    forest.add_node(metric_result)
                    results[METRICS["Runtime"]].append(metric_result)

        # Per (data point, tool) derived statistics: failure rate and usage count.
        for key, tool_data in stats.items():
            adp_id, tool_name = key

            failure_rate = (
                tool_data["failure_count"] / tool_data["usage_count"]
                if tool_data["usage_count"] > 0
                else 0.0
            )

            tmr = ToolMetricResult(
                result_name=f"{METRICS['FailureRate'].name}/{tool_name}",
                agent_data_id=[adp_id],
                metric_id=METRICS["FailureRate"].identifier,
                tool_name=tool_name,
                tool_node_id=None,
                value=failure_rate,
            )
            forest.add_node(tmr)
            results[METRICS["FailureRate"]].append(tmr)

            tmr = ToolMetricResult(
                result_name=f"{METRICS['UsageCount'].name}/{tool_name}",
                agent_data_id=[adp_id],
                metric_id=METRICS["UsageCount"].identifier,
                tool_name=tool_name,
                tool_node_id=None,
                value=tool_data["usage_count"],
            )
            forest.add_node(tmr)
            results[METRICS["UsageCount"]].append(tmr)

        return results

    def _aggregate_per_run(
        self,
        results: dict[ToolMetric, list[ToolMetricResult]],
        forest: AggregateForest[ToolAggregateNode, ToolMetricResult],
    ) -> None:
        """Group per-call Runtime results into one aggregate node per (run, tool)."""
        metric_results = results[METRICS["Runtime"]]
        metric_results_by_adp_id: dict[UUID, list[ToolMetricResult]] = defaultdict(list)

        values: dict[UUID, dict[str, list[ToolMetricResult]]] = defaultdict(dict)

        for result in metric_results:
            for adp_id in result.agent_data_id:
                metric_results_by_adp_id[adp_id].append(result)

        for adp_id in metric_results_by_adp_id:
            values[adp_id] = defaultdict(list)

            for tmr in metric_results_by_adp_id[adp_id]:
                values[adp_id][tmr.tool_name].append(tmr)

            for tool_name in values[adp_id]:
                aggregate_node = ToolAggregateNode(
                    name=f"Aggregate/{METRICS['Runtime'].name}",
                    metric=METRICS["Runtime"],
                    tool_name=tool_name,
                    children=[tmr.identifier for tmr in values[adp_id][tool_name]],
                    forest=forest,
                )
                forest.roots.append(aggregate_node.identifier)
                forest.add_node(aggregate_node)

    def _aggregate_across_runs(
        self,
        results: dict[ToolMetric, list[ToolMetricResult]],
        forest: AggregateForest[ToolAggregateNode, ToolMetricResult],
    ) -> None:
        """
        Aggregates the ToolUseEvaluator metrics across runs on an agent level.
        This is a separate step from the initial extraction to allow for more flexible aggregation strategies in the future.

        Args:
            results: A dictionary of ToolMetric to list of ToolMetricResult at the tool call level.

        Returns:
            A list of ToolAggregateNode instances containing the aggregated results at the run level.
        """

        for metric in [METRICS["FailureRate"], METRICS["UsageCount"]]:
            metric_results = results[metric]
            values: dict[str, list[ToolMetricResult]] = defaultdict(list)

            for tmr in metric_results:
                values[tmr.tool_name].append(tmr)

            for tool_name, vals in values.items():
                aggregate_node = ToolAggregateNode(
                    name=f"Aggregate/{metric.name}",
                    metric=metric,
                    tool_name=tool_name,
                    children=[val.identifier for val in vals],
                    forest=forest,
                )
                forest.roots.append(aggregate_node.identifier)
                forest.add_node(aggregate_node)

        ## Aggregation of Runtime ------------------------------
        tool_breakdown = defaultdict(list)
        for root_id in forest.roots:
            agg = forest.get(root_id)
            if isinstance(agg, ToolMetricResult):
                raise ValueError(
                    f"Expected root nodes in the forest to be ToolAggregateNodes, but got {type(agg)}"
                )
            if agg.metric == METRICS["Runtime"]:
                tool_breakdown[agg.tool_name].append(agg)

        for tool_name in tool_breakdown:
            # NOTE(review): this raises whenever a tool has only one per-run
            # Runtime aggregate (e.g. a single data point) — confirm this
            # strictness is intended rather than skipping aggregation.
            if len(tool_breakdown[tool_name]) < 2:
                raise ValueError(
                    f"Expected multiple aggregate nodes for tool '{tool_name}' to perform cross-run aggregation, but found only one. Found nodes: {[agg.identifier for agg in tool_breakdown[tool_name]]}"
                )
            parent = ToolAggregateNode(
                name=f"Aggregate/{METRICS['Runtime'].name}",
                metric=METRICS["Runtime"],
                tool_name=tool_name,
                children=[
                    tool_agg.identifier for tool_agg in tool_breakdown[tool_name]
                ],
                forest=forest,
            )
            forest.add_node(parent)
            forest.roots.append(parent.identifier)
Evaluator that analyzes tool usage patterns across agent runs.
Computes per-call and aggregated metrics for each tool, including runtime, failure rate, and usage count.
61 def run( 62 self, data: list[AgentDataPoint] 63 ) -> EvaluatorResult[ToolMetric, ToolMetricResult, ToolAggregateNode]: 64 """ 65 Run the evaluator over a list of agent data points. 66 67 Args: 68 data: A list of AgentDataPoint instances to evaluate. 69 70 Returns: 71 An EvaluatorResult containing per-call metric results and 72 aggregated nodes across runs. 73 """ 74 agent_data_ids: set[UUID] = {adp.identifier for adp in data} 75 forest = AggregateForest[ToolAggregateNode, ToolMetricResult]() 76 77 results = self._extract_tool_stats(data, forest) 78 self._aggregate_per_run(results, forest) 79 self._aggregate_across_runs(results, forest) 80 81 metrics = list(results.keys()) 82 83 return EvaluatorResult( 84 evaluator_name=self.name, 85 evaluator_id=self.identifier, 86 agent_data_ids=agent_data_ids, 87 metrics=metrics, 88 metric_results=[item for sublist in results.values() for item in sublist], 89 aggregate_results=forest, 90 )
Run the evaluator over a list of agent data points.
Arguments:
- data: A list of AgentDataPoint instances to evaluate.
Returns:
An EvaluatorResult containing per-call metric results and aggregated nodes across runs.
class LLMInferenceEvaluator(Evaluator):
    """
    Evaluator that analyzes LLM inference statistics across agent runs.

    Computes per-call and aggregated metrics for each LLM invocation,
    including input/output token counts, token cost, and latency.
    """

    def __init__(
        self,
    ):
        super().__init__()

    def run(
        self, data: list[AgentDataPoint]
    ) -> EvaluatorResult[LLMMetric, LLMMetricResult, LLMInferenceAggregateNode]:
        """
        Run the evaluator over a list of agent data points.

        Args:
            data: A list of AgentDataPoint instances to evaluate.

        Returns:
            An EvaluatorResult containing per-call metric results and
            aggregated nodes grouped by model and call index.
        """
        agent_data_ids: set[UUID] = {adp.identifier for adp in data}
        forest = AggregateForest[LLMInferenceAggregateNode, LLMMetricResult]()

        results = self._retrieve_llm_states(data, forest)
        self._aggregate_metrics(results, forest)

        metrics = list(results.keys())

        return EvaluatorResult(
            evaluator_name=self.name,
            evaluator_id=self.identifier,
            agent_data_ids=agent_data_ids,
            metrics=metrics,
            metric_results=[item for sublist in results.values() for item in sublist],
            aggregate_results=forest,
        )

    def _retrieve_llm_states(
        self,
        data: list[AgentDataPoint],
        forest: AggregateForest[LLMInferenceAggregateNode, LLMMetricResult],
    ) -> dict[LLMMetric, list[LLMMetricResult]]:
        """Build one LLMMetricResult per statistic per LLM call.

        Returns:
            Mapping from each of the four LLMMetrics (InputTokens, OutputTokens,
            TokenCost, Latency) to its per-call results.
        """
        results: dict[LLMMetric, list[LLMMetricResult]] = defaultdict(list)

        # BUG FIX: create each metric exactly once, outside the loops.
        # Previously a fresh LLMMetric (with a fresh identifier) was
        # instantiated for every call, so `results` held one key per call
        # instead of one key per statistic, fragmenting the returned metric
        # list and the grouping done in _aggregate_metrics.
        input_tokens_metric = LLMMetric(name="InputTokens", min_value=0)
        output_tokens_metric = LLMMetric(name="OutputTokens", min_value=0)
        token_cost_metric = LLMMetric(name="TokenCost", min_value=0.0)
        latency_metric = LLMMetric(name="Latency", min_value=0.0)

        for datapoint in data:
            llm_details = datapoint.llm_details

            for call in llm_details.calls:
                # One result per statistic for this call; all four share the
                # same call index / model metadata.
                for metric, value in (
                    (input_tokens_metric, call.input_tokens),
                    (output_tokens_metric, call.output_tokens),
                    (token_cost_metric, call.total_cost),
                    (latency_metric, call.latency),
                ):
                    metric_result = LLMMetricResult(
                        result_name=metric.name,
                        metric_id=metric.identifier,
                        agent_data_id=[datapoint.identifier],
                        value=value,
                        llm_call_index=call.index,
                        model_name=call.model_name,
                        model_provider=call.model_provider,
                    )
                    results[metric].append(metric_result)
                    forest.add_node(metric_result)

        return results

    def _aggregate_metrics(
        self,
        results: dict[LLMMetric, list[LLMMetricResult]],
        forest: AggregateForest[LLMInferenceAggregateNode, LLMMetricResult],
    ) -> None:
        """Group numeric results by (model, provider, call index) into aggregate roots."""
        for metric in results:
            metric_results = results[metric]
            values: dict[tuple[str, str, int], list[LLMMetricResult]] = defaultdict(
                list
            )
            # Only numeric values can be aggregated; others are skipped.
            for mr in metric_results:
                if isinstance(mr.value, (int, float)):
                    key = (mr.model_name, mr.model_provider, mr.llm_call_index)
                    values[key].append(mr)

            for (model_name, model_provider, llm_call_index), vals in values.items():
                aggregate_node = LLMInferenceAggregateNode(
                    name=f"Aggregate/{metric.name}/{model_name}/{model_provider}/Call_{llm_call_index}",
                    metric=metric,
                    children=[val.identifier for val in vals],
                    model_name=model_name,
                    model_provider=model_provider,
                    llm_call_index=llm_call_index,
                    forest=forest,
                )

                forest.roots.append(aggregate_node.identifier)
                forest.add_node(aggregate_node)
Evaluator that analyzes LLM inference statistics across agent runs.
Computes per-call and aggregated metrics for each LLM invocation, including input/output token counts, token cost, and latency.
32 def run( 33 self, data: list[AgentDataPoint] 34 ) -> EvaluatorResult[LLMMetric, LLMMetricResult, LLMInferenceAggregateNode]: 35 """ 36 Run the evaluator over a list of agent data points. 37 38 Args: 39 data: A list of AgentDataPoint instances to evaluate. 40 41 Returns: 42 An EvaluatorResult containing per-call metric results and 43 aggregated nodes grouped by model and call index. 44 """ 45 agent_data_ids: set[UUID] = {adp.identifier for adp in data} 46 forest = AggregateForest[LLMInferenceAggregateNode, LLMMetricResult]() 47 48 results = self._retrieve_llm_states(data, forest) 49 self._aggregate_metrics(results, forest) 50 51 metrics = list(results.keys()) 52 53 return EvaluatorResult( 54 evaluator_name=self.name, 55 evaluator_id=self.identifier, 56 agent_data_ids=agent_data_ids, 57 metrics=metrics, 58 metric_results=[item for sublist in results.values() for item in sublist], 59 aggregate_results=forest, 60 )
Run the evaluator over a list of agent data points.
Arguments:
- data: A list of AgentDataPoint instances to evaluate.
Returns:
An EvaluatorResult containing per-call metric results and aggregated nodes grouped by model and call index.