Skip to content

The agent class

This page documents the Agent class, which runs the main loop of the agent. To learn about the configuration objects used to specify the behavior of an agent, see the agent configuration reference page.

sweagent.agent.agents.RetryAgent

RetryAgent(config: RetryAgentConfig)

Bases: AbstractAgent

Source code in sweagent/agent/agents.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def __init__(self, config: RetryAgentConfig):
    # Always copy config to avoid shared state between different instances
    self.config = config.model_copy(deep=True)
    self._hooks = []
    self._i_attempt = 0
    self.logger = get_logger("swea-agent", emoji="🤠")
    self._agent: DefaultAgent | None = None
    self._attempt_data: list[dict[str, Any]] = []
    self._total_instance_attempt_stats = InstanceStats()
    """Note that total_instance_attempt_stats only accumulates the states of the sub-agent,
    not the reviewer. Use self._total_instance_stats for the total stats.
    """
    self._chook = CombinedAgentHook()
    self._traj_path: Path | None = None
    self._problem_statement: ProblemStatement | None = None
    self._env: SWEEnv | None = None
    self._output_dir: Path | None = None
    self._rloop: ScoreRetryLoop | ChooserRetryLoop | None = None

config instance-attribute

config = model_copy(deep=True)

logger instance-attribute

logger = get_logger('swea-agent', emoji='🤠')

add_hook

add_hook(hook: AbstractAgentHook) -> None
Source code in sweagent/agent/agents.py
243
244
245
def add_hook(self, hook: AbstractAgentHook) -> None:
    self._chook.add_hook(hook)
    self._hooks.append(hook)

from_config classmethod

from_config(config: RetryAgentConfig) -> Self
Source code in sweagent/agent/agents.py
239
240
241
@classmethod
def from_config(cls, config: RetryAgentConfig) -> Self:
    return cls(config)

get_trajectory_data

get_trajectory_data(choose: bool) -> dict[str, Any]

Get all data that we save in .traj files.

Source code in sweagent/agent/agents.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def get_trajectory_data(self, choose: bool) -> dict[str, Any]:
    """Get all data that we save in .traj files."""
    assert self._rloop is not None

    data = {
        "attempts": self._attempt_data,
    }

    if choose:
        try:
            best_attempt_idx = self._rloop.get_best()
        except Exception as e:
            self.logger.critical(f"Error getting best attempt index: {e}. Setting to 0.", exc_info=True)
            best_attempt_idx = 0
        data |= copy.deepcopy(self._attempt_data[best_attempt_idx])  # type: ignore
        data["info"]["best_attempt_idx"] = best_attempt_idx
        data["info"]["rloop_model_stats"] = self._rloop.review_model_stats.model_dump()
        # Overwrite model stats with total stats
        data["info"]["model_stats"] = self._total_instance_stats.model_dump()
        if isinstance(self._rloop, ChooserRetryLoop):
            data["info"]["chooser"] = (
                self._rloop._chooser_output.model_dump() if self._rloop._chooser_output else {}
            )
    return data

run

run(env: SWEEnv, problem_statement: ProblemStatement | ProblemStatementConfig, output_dir: Path = Path('.')) -> AgentRunResult

Run the agent on a problem instance. This method contains the main loop that repeatedly calls self._step until the problem is solved.

Parameters:

Name Type Description Default
env SWEEnv

The environment to run the agent on.

required
problem_statement ProblemStatement | ProblemStatementConfig

The problem statement to run the agent on.

required
output_dir Path

Directory to save the trajectory to

Path('.')
Source code in sweagent/agent/agents.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def run(
    self,
    env: SWEEnv,
    problem_statement: ProblemStatement | ProblemStatementConfig,
    output_dir: Path = Path("."),
) -> AgentRunResult:
    """Run the agent on a problem instance. This method contains the
    main loop that repeatedly calls `self._step` until the problem is solved.

    Args:
        env: The environment to run the agent on.
        problem_statement: The problem statement to run the agent on.
        output_dir: Directory to save the trajectory to
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    self.setup(env=env, problem_statement=problem_statement, output_dir=output_dir)
    assert self._rloop is not None

    # Run action/observation loop
    self._chook.on_run_start()
    step_output = StepOutput()
    self._setup_agent()
    assert self._agent is not None
    while not step_output.done:
        step_output = self.step()
        self.save_trajectory(choose=False)
        if step_output.done:
            self._rloop.on_submit(
                ReviewSubmission(
                    trajectory=self._agent.trajectory,
                    info=self._agent.info,
                    model_stats=self._agent.model.stats,
                )
            )
            if isinstance(self._rloop, ScoreRetryLoop):
                self._agent.info["review"] = self._rloop.reviews[-1].model_dump()  # type: ignore
            self._finalize_agent_run()
            self.save_trajectory(choose=False)
            if self._rloop.retry():
                assert self._env is not None
                self._next_attempt()
                step_output.done = False
    self.save_trajectory(choose=True)  # call again after we finalized
    self._chook.on_run_done(trajectory=self._agent.trajectory, info=self._agent.info)

    self.logger.info("Trajectory saved to %s", self._traj_path)

    # Here we want to return the "global" information (e.g., submission should
    # be the best submission instead of the last one, etc.), so we get it from the traj file
    data = self.get_trajectory_data(choose=True)
    return AgentRunResult(info=data["info"], trajectory=data["trajectory"])

save_trajectory

save_trajectory(choose: bool) -> None
Source code in sweagent/agent/agents.py
337
338
339
340
def save_trajectory(self, choose: bool) -> None:
    data = self.get_trajectory_data(choose=choose)
    assert self._traj_path is not None
    self._traj_path.write_text(json.dumps(data, indent=2))

setup

setup(env: SWEEnv, problem_statement: ProblemStatement | ProblemStatementConfig, output_dir: Path = Path('.')) -> None

Setup the retry agent for a new problem instance. This is mostly a bookkeeping step.

Source code in sweagent/agent/agents.py
247
248
249
250
251
252
253
254
255
256
257
258
def setup(
    self, env: SWEEnv, problem_statement: ProblemStatement | ProblemStatementConfig, output_dir: Path = Path(".")
) -> None:
    """Setup the retry agent for a new problem instance.
    This is mostly a bookkeeping step.
    """
    self._total_instance_attempt_stats = InstanceStats()
    self._problem_statement = problem_statement
    self._traj_path = output_dir / (self._problem_statement.id + ".traj")
    self._env = env
    self._output_dir = output_dir
    self._rloop = get_retry_loop_from_config(self.config.retry_loop, problem_statement=problem_statement)

step

step() -> StepOutput

Step the agent of the current attempt. Attempt autosubmit if an error occurs (though all errors should already be handled by the attempt agent).

Source code in sweagent/agent/agents.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def step(self) -> StepOutput:
    """Step the agent of the current attempt.
    Attempt autosubmit if an error occurs (though all errors should already be handled by the attempt agent).
    """
    assert self._agent is not None
    # Failsafe cost check, this should not actually happen, because the sub-agent should have already been
    # initialized with the correct cost limit to not exceed the total cost limit. Using factor of 1.1, because
    # sub-agent might only catch the cost limit after attempting.
    if self._total_instance_stats.instance_cost > 1.1 * self.config.retry_loop.cost_limit > 0:
        msg = "Total instance cost exceeded cost limit. This should not happen, please report this. Triggering autosubmit."
        self.logger.critical(msg)
        return self._agent.attempt_autosubmission_after_error(step=StepOutput())
    try:
        step = self._agent.step()
    except Exception as e:
        msg = "Error in agent step: %s. This really shouldn't happen, please report this. Triggering autosubmit."
        self.logger.critical(msg, e, exc_info=True)
        step = self._agent.attempt_autosubmission_after_error(step=StepOutput())
    return step

sweagent.agent.agents.DefaultAgent

DefaultAgent(*, templates: TemplateConfig, tools: ToolHandler, history_processors: list[HistoryProcessor], model: AbstractModel, max_requeries: int = 3, name: str = 'main', _catch_errors: bool = True, _always_require_zero_exit_code: bool = False, action_sampler_config: ActionSamplerConfig | None = None)

Bases: AbstractAgent

The agent handles the behaviour of the model and how it interacts with the environment.

To run the agent, either call self.run or self.setup and then self.step in a loop.

Source code in sweagent/agent/agents.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
def __init__(
    self,
    *,
    templates: TemplateConfig,
    tools: ToolHandler,
    history_processors: list[HistoryProcessor],
    model: AbstractModel,
    max_requeries: int = 3,
    name: str = "main",
    _catch_errors: bool = True,
    _always_require_zero_exit_code: bool = False,
    action_sampler_config: ActionSamplerConfig | None = None,
):
    """The agent handles the behaviour of the model and how it interacts with the environment.

    To run the agent, either call `self.run` or `self.setup` and then `self.step` in a loop.
    """
    self._catch_errors = _catch_errors
    self._always_require_zero_exit_code = _always_require_zero_exit_code
    self.name = name
    self.model = model
    self.templates = templates
    self.tools = tools
    if isinstance(self.model, HumanThoughtModel):
        self.tools.config.parse_function = ThoughtActionParser()
    elif isinstance(self.model, HumanModel):
        self.tools.config.parse_function = ActionOnlyParser()
    self.history_processors = history_processors
    self.max_requeries = max_requeries
    self.logger = get_logger("swea-agent", emoji="🤠")
    # Set in run method
    self._env: SWEEnv | None = None
    self._problem_statement: ProblemStatement | ProblemStatementConfig | None = None
    self.traj_path: Path | None = None

    #: The following three attributes collect the information about how the agent
    #: solved the problem.
    self.history = []
    self._trajectory = []
    self.info = AgentInfo()

    self._chook = CombinedAgentHook()

    self._replay_config: BaseModel | None = None
    """This can be set to a RunSingleConfig from the Run instance whenever possible.
    It can be used to replay the agent's trajectory in an environment.
    """

    self._action_sampler: AbstractActionSampler | None = None
    if action_sampler_config is not None:
        self._action_sampler = action_sampler_config.get(self.model, self.tools)

    #: Count how many timeout errors have occurred consecutively. Kills agent
    #: after 5 of them.
    self._n_consecutive_timeouts = 0
    self._total_execution_time = 0.0

history instance-attribute

history = []

history_processors instance-attribute

history_processors = history_processors

info instance-attribute

info = AgentInfo()

logger instance-attribute

logger = get_logger('swea-agent', emoji='🤠')

max_requeries instance-attribute

max_requeries = max_requeries

messages property

messages: list[dict[str, Any]]

Return the history of the agent for this attempt since the last reset, processed through all history processors.

model instance-attribute

model = model

name instance-attribute

name = name

replay_config property writable

replay_config: BaseModel | None

templates instance-attribute

templates = templates

tools instance-attribute

tools = tools

traj_path instance-attribute

traj_path: Path | None = None

trajectory property

trajectory: Trajectory

add_demonstrations_to_history

add_demonstrations_to_history() -> None

Add demonstrations to history

Source code in sweagent/agent/agents.py
559
560
561
562
def add_demonstrations_to_history(self) -> None:
    """Add demonstrations to history"""
    for demonstration_path in self.templates.demonstrations:
        self._add_demonstration_to_history(demonstration_path)

add_hook

add_hook(hook: AbstractAgentHook) -> None

Add hook to agent

Source code in sweagent/agent/agents.py
468
469
470
471
def add_hook(self, hook: AbstractAgentHook) -> None:
    """Add hook to agent"""
    hook.on_init(agent=self)
    self._chook.add_hook(hook)

add_instance_template_to_history

add_instance_template_to_history(state: dict[str, str]) -> None

Add observation to history, as well as the instance template or demonstrations if we're at the start of a new attempt.

Source code in sweagent/agent/agents.py
690
691
692
693
694
695
696
697
698
699
700
701
702
def add_instance_template_to_history(self, state: dict[str, str]) -> None:
    """Add observation to history, as well as the instance template or demonstrations if we're
    at the start of a new attempt.
    """
    templates: list[str] = []
    # Determine observation template based on what prior observation was
    assert self.history[-1]["role"] == "system" or self.history[-1].get("is_demo", False)
    # Show instance template if prev. obs. was initial system message
    templates = [self.templates.instance_template]
    if self.templates.strategy_template is not None:
        templates.append(self.templates.strategy_template)

    self._add_templated_messages_to_history(templates, **state)  # type: ignore

add_step_to_history

add_step_to_history(step: StepOutput) -> None

Adds a step (command that was run and output) to the model history

Source code in sweagent/agent/agents.py
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
def add_step_to_history(self, step: StepOutput) -> None:
    """Adds a step (command that was run and output) to the model history"""
    self._append_history(
        {
            "role": "assistant",
            "content": step.output,
            "thought": step.thought,
            "action": step.action,
            "agent": self.name,
            "tool_calls": step.tool_calls,
            "message_type": "action",
        },
    )

    elided_chars = 0
    if step.observation.strip() == "":
        # Show no output template if observation content was empty
        templates = [self.templates.next_step_no_output_template]
    elif len(step.observation) > self.templates.max_observation_length:
        templates = [self.templates.next_step_truncated_observation_template]
        elided_chars = len(step.observation) - self.templates.max_observation_length
        step.observation = step.observation[: self.templates.max_observation_length]
    else:
        # Show standard output template if there is observation content
        templates = [self.templates.next_step_template]
    self._add_templated_messages_to_history(
        templates,
        observation=step.observation,
        elided_chars=elided_chars,
        max_observation_length=self.templates.max_observation_length,
        tool_call_ids=step.tool_call_ids,
        **step.state,
    )

add_step_to_trajectory

add_step_to_trajectory(step: StepOutput) -> None
Source code in sweagent/agent/agents.py
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
def add_step_to_trajectory(self, step: StepOutput) -> None:
    trajectory_step = TrajectoryStep(
        {
            "action": step.action,
            "observation": step.observation,
            "response": step.output,
            "thought": step.thought,
            "execution_time": step.execution_time,
            "state": step.state,
            "messages": self.messages,
            "extra_info": step.extra_info,
        },
    )
    self.trajectory.append(trajectory_step)

add_system_message_to_history

add_system_message_to_history() -> None

Add system message to history

Source code in sweagent/agent/agents.py
550
551
552
553
554
555
556
557
def add_system_message_to_history(self) -> None:
    """Add system message to history"""
    assert self._problem_statement is not None
    system_msg = Template(self.templates.system_template).render(**self._get_format_dict())
    self.logger.info(f"SYSTEM ({self.name})\n{system_msg}")
    self._append_history(
        {"role": "system", "content": system_msg, "agent": self.name, "message_type": "system_prompt"}
    )

attempt_autosubmission_after_error

attempt_autosubmission_after_error(step: StepOutput) -> StepOutput

For most exceptions, we attempt to still extract the patch and submit that. This means we send the submit command to the runtime and parse the output.

Source code in sweagent/agent/agents.py
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
def attempt_autosubmission_after_error(self, step: StepOutput) -> StepOutput:
    """For most exceptions, we attempt to still extract the patch and submit that.
    This means we send the `submit` command to the runtime and parse the output.
    """
    self.logger.warning("Attempting autosubmission after error")
    step = step.model_copy(deep=True)
    step.done = True
    assert self._env is not None
    if not asyncio.run(self._env.deployment.is_alive(timeout=10)):
        # The agent is dead. This is very bad. Maybe we can take a 'diff' that was saved
        # for a previous step? (if running with diff in tools)
        self.logger.error("Runtime is no longer alive")
        try:
            last_trajectory_step = self.trajectory[-1]
        except IndexError:
            self.logger.info("No last trajectory step to extract patch from")
            return step
        if "diff" not in last_trajectory_step["state"]:
            self.logger.info("No diff in last trajectory step state, cannot autosubmit")
            return step
        diff = last_trajectory_step["state"]["diff"]
        self.logger.info("Using diff from last trajectory step to autosubmit")
        step.submission = diff
        if step.submission:
            step.observation = "Environment died unexpectedly. Exited (autosubmitted)"
            step.exit_status = f"submitted ({step.exit_status})"
        else:
            self.logger.info("Diff from last traj step empty.")
        return step
    # Let us manually run the submission command and collect the output
    repo_name = "/"
    if self._env.repo is not None:
        repo_name = f"/{self._env.repo.repo_name}"
    submission_command = "git add -A && git diff --cached > /root/model.patch"
    self.logger.info("Executing submission command %s in %s", submission_command, repo_name)
    try:
        self._env.execute_command(submission_command, check=True, cwd=repo_name)
    except Exception as e:
        self.logger.error("Failed to execute submission command, got %s", e)
    # There's still hope for the submission, because the `/root/model.patch` file might have been
    # generated by the state command
    step = self.handle_submission(step, observation="", force_submission=True)
    if step.submission:
        self.logger.info("Exiting with autosubmission")
        step.observation = "Exited (autosubmitted)"
    return step

forward

forward(history: list[dict[str, str]]) -> StepOutput

Forward the model without handling errors.

All exceptions raised will contain the StepOutput object with some of the attributes set.

Parameters:

Name Type Description Default
history list[dict[str, str]]

history to query the model with

required

Returns:

Name Type Description
step_output StepOutput

step output

Source code in sweagent/agent/agents.py
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
def forward(self, history: list[dict[str, str]]) -> StepOutput:
    """Forward the model without handling errors.

    All exceptions raised will contain the `StepOutput` object
    with some of the attributes set.

    Args:
        history: history to query the model with

    Returns:
        step_output: step output
    """
    if self._total_execution_time > self.tools.config.total_execution_timeout:
        raise _TotalExecutionTimeExceeded()

    # we continuously add actions, output etc. to the step object
    # because some of the specific exception handling requires some of these
    # attributes (e.g., if we want to requery the model for a bash syntax error, we
    # need to have the previous model output to format the requery template)
    step = StepOutput()
    try:
        # Forward model and get actions
        self._chook.on_model_query(messages=history, agent=self.name)
        # todo: Add all options to the extra info
        if self._action_sampler is not None:
            assert self._problem_statement is not None
            best = self._action_sampler.get_action(
                problem_statement=self._problem_statement,
                trajectory=self.trajectory,
                history=history,
            )
            output = best.completion
            # todo: Handle history and trajectory
            step.extra_info.update(best.extra_info)
        else:
            output = self.model.query(history)  # type: ignore
        step.output = output["message"]
        # todo: Can't I override the parser in __init__?
        step.thought, step.action = self.tools.parse_actions(output)
        if output.get("tool_calls") is not None:
            step.tool_call_ids = [call["id"] for call in output["tool_calls"]]
            step.tool_calls = output["tool_calls"]
        self.logger.info(f"💭 THOUGHT\n{step.thought}\n\n🎬 ACTION\n{step.action.strip()}")
        self._chook.on_actions_generated(step=step)
        return self.handle_action(step)
    except Exception as e:
        if step.action == step.thought == "":
            # Probably the parsing failed/no action included. Let's still fill in thought
            # so that trajectory viewers have something to show us for this step.
            step.thought = step.output
        # Attach the step object to the exception
        e.step = step  # type: ignore
        raise

forward_with_handling

forward_with_handling(history: list[dict[str, str]]) -> StepOutput

Forward the model and handle errors, requerying the model if we can. For example, if the model outputs a bash command that has syntax errors, we will not execute it but requery the model for a corrected command.

Note: This will update the trajectory, but not the history.

Parameters:

Name Type Description Default
history list[dict[str, str]]

history to forward

required

Returns:

Name Type Description
step_output StepOutput

step output

Source code in sweagent/agent/agents.py
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
def forward_with_handling(self, history: list[dict[str, str]]) -> StepOutput:
    """Forward the model and handle errors, requerying the model if we can.
    For example, if the model outputs a bash command that has syntax errors,
    we will not execute it but requery the model for a corrected command.

    Note: This will update the trajectory, but not the history.

    Args:
        history: history to forward

    Returns:
        step_output: step output
    """

    def handle_error_with_autosubmission(exit_status: str, message: str) -> StepOutput:
        """Attempts to autosubmit (extract patch from the environment) and stops the loop."""
        self.logger.warning(message)
        return self.attempt_autosubmission_after_error(
            StepOutput(
                thought=message,
                exit_status=exit_status,
                output=message,
                done=True,
            )
        )

    def handle_error_with_retry(exception: Exception, template: str, n_requeries: int) -> list[dict[str, str]]:
        """Requeries the model if the error is a format/blocklist/bash syntax error."""
        self.logger.warning("Requerying model after %s (%dth requery)", type(exception).__name__, n_requeries)
        step: StepOutput = getattr(exception, "step", StepOutput())
        self.add_step_to_trajectory(step)
        exception_message = getattr(exception, "message", "")
        if not exception_message:
            try:
                exception_message = exception.args[0]
            except (IndexError, AttributeError):
                pass
        return self.get_model_requery_history(
            error_template=template,
            **step.to_template_format_dict(),
            **getattr(exception, "extra_info", {}),
            exception_message=exception_message,
        )

    n_format_fails = 0
    while n_format_fails < self.max_requeries:
        try:
            return self.forward(history)

        # Errors that are raised

        except KeyboardInterrupt:
            raise

        # Errors that cause requery

        except FormatError as e:
            n_format_fails += 1
            history = handle_error_with_retry(
                exception=e, template=self.tools.config.format_error_template, n_requeries=n_format_fails
            )
        except _BlockedActionError as e:
            n_format_fails += 1
            history = handle_error_with_retry(
                exception=e, template=self.tools.config.filter.blocklist_error_template, n_requeries=n_format_fails
            )
        except ContentPolicyViolationError:
            self.logger.warning("Content policy violation, trying to resample")
            n_format_fails += 1
            # Try if simply resampling helps here
            pass
        except BashIncorrectSyntaxError as e:
            n_format_fails += 1
            history = handle_error_with_retry(
                exception=e,
                template=self.templates.shell_check_error_template,
                n_requeries=n_format_fails,
            )
        except _RetryWithOutput as e:
            history = handle_error_with_retry(
                exception=e,
                template=self.templates.next_step_template,
                n_requeries=n_format_fails,
            )
        except _RetryWithoutOutput:
            pass
            # Requery with the same template as the last step

        # Errors that cause exit
        except _TotalExecutionTimeExceeded:
            self.logger.exception("Exiting due to total execution time exceeded", exc_info=True)
            return handle_error_with_autosubmission(
                "exit_total_execution_time",
                "Exit due to total execution time exceeded",
            )

        except CommandTimeoutError:
            self.logger.exception("Exiting due to multiple consecutive command timeouts", exc_info=True)
            return handle_error_with_autosubmission(
                "exit_command_timeout",
                "Exit due to multiple consecutive command timeouts",
            )

        except ContextWindowExceededError:
            return handle_error_with_autosubmission(
                "exit_context",
                "Exit due to context window",
            )
        except CostLimitExceededError:
            return handle_error_with_autosubmission(
                "exit_cost",
                "Exit due to cost limit",
            )
        except RetryError as e:
            self.logger.exception(f"Exiting due to retry error: {e}", exc_info=True)
            return handle_error_with_autosubmission(
                "exit_api",
                f"Exit due to retry error: {e}",
            )
        except SwerexException as e:
            self.logger.exception(f"Exiting due to environment error: {e}", exc_info=True)
            return handle_error_with_autosubmission(
                "exit_environment_error",
                f"Exit due to environment error: {e}",
            )
        except RuntimeError as e:
            self.logger.exception(f"Exiting due to runtime error: {e}", exc_info=True)
            return handle_error_with_autosubmission(
                "exit_error",
                f"Exit due to runtime error: {e}",
            )
        except Exception as e:
            self.logger.exception(f"Exiting due to unknown error: {e}", exc_info=True)
            return handle_error_with_autosubmission(
                "exit_error",
                f"Exit due to unknown error: {e}",
            )
    self.logger.exception(
        "Exit due to repeated format/blocklist/bash syntax errors",
        exc_info=True,
    )
    return handle_error_with_autosubmission(
        "exit_format",
        "Exit due to repeated format/blocklist/bash syntax errors",
    )

from_config classmethod

from_config(config: DefaultAgentConfig) -> Self
Source code in sweagent/agent/agents.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
@classmethod
def from_config(cls, config: DefaultAgentConfig) -> Self:
    # To ensure that all models stay completely independent, we deepcopy the
    # model config, because it lives on as a property in the model, tools, etc.
    config = config.model_copy(deep=True)
    model = get_model(config.model, config.tools)
    return cls(
        templates=config.templates,
        tools=ToolHandler(config.tools),
        history_processors=config.history_processors,
        model=model,
        max_requeries=config.max_requeries,
        action_sampler_config=config.action_sampler,
    )

get_model_requery_history

get_model_requery_history(error_template: str, *, output: str, **kwargs: str | int | float | bool | None) -> list[dict[str, str]]

Ask the model to correct after a hitting one of the following errors:

  1. Malformatted output (could not parse action)
  2. Blocked action (command is on the blocklist)
  3. Bash command syntax error

At the time this function is called, the proposed action and observation are not part of the history yet.

This function adds temporary history based on the error template and queries the model. If the model is able to correct itself, the records of the mistakes will not be part of the history (but they are saved in the trajectory).

Parameters:

Name Type Description Default
error_template str

error template

required
output str

model output

required
**kwargs str | int | float | bool | None

keyword arguments to be passed to the error template

{}

Returns:

Type Description
list[dict[str, str]]

model output after requery

Source code in sweagent/agent/agents.py
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
def get_model_requery_history(
    self, error_template: str, *, output: str, **kwargs: str | int | float | bool | None
) -> list[dict[str, str]]:
    """Ask the model to correct after a hitting one of the following errors:

    1. Malformatted output (could not parse action)
    2. Blocked action (command is on the blocklist)
    3. Bash command syntax error

    At the time this function is called, the proposed action and observation are not part of the history
    yet.

    This function adds temporary history based on the error template and queries the model.
    If the model is able to correct itself, the records of the mistakes will not be part of the history
    (but they are saved in the trajectory).

    Args:
        error_template: error template
        output: model output
        **kwargs: keyword arguments to be passed to the error template

    Returns:
        model output after requery
    """
    format_dict = {**kwargs, **self._get_format_dict()}
    error_template = Template(error_template).render(**format_dict)

    self.logger.warning(f"{error_template}")

    return self.messages + [
        {"role": "assistant", "content": output, "agent": self.name},
        {"role": "user", "content": error_template, "agent": self.name},
    ]

get_trajectory_data

get_trajectory_data() -> dict[str, Any]

Get all data that we save in .traj files.

Source code in sweagent/agent/agents.py
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
def get_trajectory_data(self) -> dict[str, Any]:
    """Get all data that we save in .traj files."""

    assert self._env is not None
    # The deepcopy here is important because else the
    # data["info"]["model_stats"] update will create havoc!
    attempt_data = copy.deepcopy(
        {
            "trajectory": self.trajectory,
            "history": self.history,
            "info": self.info,
        }
    )
    attempt_data["replay_config"] = self.replay_config.model_dump_json() if self.replay_config is not None else None
    attempt_data["environment"] = self._env.name
    return attempt_data

handle_action

handle_action(step: StepOutput) -> StepOutput

Runs an action proposed by the agent in the environment and returns the corresponding output.

Parameters:

Name Type Description Default
action

command to run in bash shell

required
output

output from model (only used for error handling)

required

Returns:

Name Type Description
action_execution_output StepOutput

action execution output

Source code in sweagent/agent/agents.py
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
def handle_action(self, step: StepOutput) -> StepOutput:
    """Runs an action proposed by the agent in the environment and returns the corresponding output.

    Args:
        action: command to run in bash shell
        output: output from model (only used for error handling)

    Returns:
        action_execution_output: action execution output
    """
    if self.tools.should_block_action(step.action):
        raise _BlockedActionError()

    if step.action.strip() == "exit":
        self.logger.info("Exiting agent")
        step.done = True
        step.observation = "Exited"
        step.exit_status = "exit_command"
        assert self._env is not None
        step.state = self.tools.get_state(env=self._env)  # for history
        return step

    assert self._env is not None
    self._chook.on_action_started(step=step)
    execution_t0 = time.perf_counter()
    run_action: str = self.tools.guard_multiline_input(step.action).strip()
    try:
        step.observation = self._env.communicate(
            input=run_action,
            timeout=self.tools.config.execution_timeout,
            check="raise" if self._always_require_zero_exit_code else "ignore",
        )
    except CommandTimeoutError:
        try:
            if self._n_consecutive_timeouts >= self.tools.config.max_consecutive_execution_timeouts:
                msg = "Exiting agent due to too many consecutive execution timeouts"
                self.logger.critical(msg)
                raise
            self._env.interrupt_session()
            self._n_consecutive_timeouts += 1
        except Exception as f:
            self.logger.exception("Failed to interrupt session after command timeout: %s", f, exc_info=True)
            raise
        step.observation = Template(self.templates.command_cancelled_timeout_template).render(
            **self._get_format_dict(),
            timeout=self.tools.config.execution_timeout,
            command=run_action,
        )
    else:
        self._n_consecutive_timeouts = 0
    step.execution_time = time.perf_counter() - execution_t0
    self._total_execution_time += step.execution_time
    self._chook.on_action_executed(step=step)
    step.state = self.tools.get_state(env=self._env)

    if RETRY_WITH_OUTPUT_TOKEN in step.observation:
        step.observation = step.observation.replace(RETRY_WITH_OUTPUT_TOKEN, "")
        raise _RetryWithOutput()
    elif RETRY_WITHOUT_OUTPUT_TOKEN in step.observation:
        step.observation = step.observation.replace(RETRY_WITHOUT_OUTPUT_TOKEN, "")
        raise _RetryWithoutOutput()

    return self.handle_submission(step)

handle_submission

handle_submission(step: StepOutput, *, observation='', force_submission: bool = False) -> StepOutput

Check if there was a submission in the observation and handle it.

Parameters:

Name Type Description Default
step StepOutput
required
observation

If specified, will use this rather than stepobservation

''
force_submission bool

If True, will always submit even if no submission is found

False

Returns:

Name Type Description
step StepOutput

step with submission and observation updated (if submission was found)

Source code in sweagent/agent/agents.py
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
def handle_submission(self, step: StepOutput, *, observation="", force_submission: bool = False) -> StepOutput:
    """Check if there was a submission in the observation and handle it.

    Args:
        step:
        observation: If specified, will use this rather than stepobservation
        force_submission: If True, will always submit even if no submission is found

    Returns:
        step: step with submission and observation updated (if submission was found)
    """
    step = step.model_copy(deep=True)
    assert self.tools is not None
    is_submission = self.tools.check_for_submission_cmd(observation or step.observation)
    if is_submission or force_submission:
        assert self._env is not None
        try:
            submission = self._env.read_file("/root/model.patch", encoding="utf-8", errors="backslashreplace")
        except FileNotFoundError:
            self.logger.warning("Submission file not found, no submission was made")
            return step
        except Exception as e:
            self.logger.exception("Failed to read submission file, got %s", e)
            return step
        if submission.strip() != "":
            step.submission = submission
        else:
            step.submission = None
        step.observation = submission
        if not step.exit_status:
            step.exit_status = "submitted"
        elif step.submission:
            step.exit_status = f"submitted ({step.exit_status})"
        step.done = True
        self.logger.info(f"Found submission: {submission}")
    return step

run

run(env: SWEEnv, problem_statement: ProblemStatement | ProblemStatementConfig, output_dir: Path = Path('.')) -> AgentRunResult

Run the agent on a problem instance. This method contains the main loop that repeatedly calls self._step until the problem is solved.

Parameters:

Name Type Description Default
setup_args

Arguments to pass to the agent's setup method.

required
env SWEEnv

The environment to run the agent on.

required
traj_dir

Directory to save the trajectory to

required
Source code in sweagent/agent/agents.py
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
def run(
    self,
    env: SWEEnv,
    problem_statement: ProblemStatement | ProblemStatementConfig,
    output_dir: Path = Path("."),
) -> AgentRunResult:
    """Run the agent on a problem instance. This method contains the
    main loop that repeatedly calls `self._step` until the problem is solved.

    Args:
        setup_args: Arguments to pass to the agent's setup method.
        env: The environment to run the agent on.
        traj_dir: Directory to save the trajectory to
    """
    self.setup(env=env, problem_statement=problem_statement, output_dir=output_dir)

    # Run action/observation loop
    self._chook.on_run_start()
    step_output = StepOutput()
    while not step_output.done:
        step_output = self.step()
        self.save_trajectory()
    self._chook.on_run_done(trajectory=self.trajectory, info=self.info)

    self.logger.info("Trajectory saved to %s", self.traj_path)

    # Here we want to return the "global" information (e.g., submission should
    # be the best submission instead of the last one, etc.), so we get it from the traj file
    data = self.get_trajectory_data()
    return AgentRunResult(info=data["info"], trajectory=data["trajectory"])

save_trajectory

save_trajectory() -> None

Save the trajectory to disk. This includes the history, the environment state, and the model stats.

Source code in sweagent/agent/agents.py
721
722
723
724
725
726
727
728
729
def save_trajectory(
    self,
) -> None:
    """Save the trajectory to disk.
    This includes the history, the environment state, and the model stats.
    """
    data = self.get_trajectory_data()
    assert self.traj_path is not None
    self.traj_path.write_text(json.dumps(data, indent=2))

setup

setup(env: SWEEnv, problem_statement: ProblemStatement | ProblemStatementConfig, output_dir: Path = Path('.')) -> None

Setup the agent for a new instance. This includes formatting the system message and adding demonstrations to the history.

This method is called by self.run.

Source code in sweagent/agent/agents.py
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
def setup(
    self,
    env: SWEEnv,
    problem_statement: ProblemStatement | ProblemStatementConfig,
    output_dir: Path = Path("."),
) -> None:
    """Setup the agent for a new instance. This includes
    formatting the system message and adding demonstrations to the history.

    This method is called by `self.run`.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    self._problem_statement = problem_statement
    self._env = env
    iid = self._problem_statement.id
    self.logger.info("Setting up agent for instance %s", iid)

    # Save/reset some attributes
    self.traj_path = output_dir / (self._problem_statement.id + ".traj")
    self.logger.info("Trajectory will be saved to %s", self.traj_path)

    self._chook.on_tools_installation_started()
    self.tools.install(self._env)
    self._chook.on_setup_attempt()
    self.info = AgentInfo()
    self.info["swe_agent_hash"] = get_agent_commit_hash()
    self.info["swe_agent_version"] = __version__
    self.info["swe_rex_version"] = get_rex_version()
    self.info["swe_rex_hash"] = get_rex_commit_hash()
    assert self._env is not None
    assert self._problem_statement is not None
    self._env.set_env_variables({"PROBLEM_STATEMENT": self._problem_statement.get_problem_statement()})
    self.add_system_message_to_history()
    self.add_demonstrations_to_history()
    self.add_instance_template_to_history(state=self.tools.get_state(self._env))
    self._chook.on_setup_done()

step

step() -> StepOutput

Run a step of the agent. This is a wrapper around self.forward_with_handling with additional bookkeeping:

  1. Update message history with performed action and observation
  2. Update trajectory with the final executed result
  3. Update the info dictionary

Returns:

Name Type Description
step_output StepOutput

step output (same as the output of self.forward_with_handling)

Source code in sweagent/agent/agents.py
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
def step(self) -> StepOutput:
    """Run a step of the agent. This is a wrapper around `self.forward_with_handling`
    with additional bookkeeping:

    1. Update message history with performed action and observation
    2. Update trajectory with the final executed result
    3. Update the info dictionary

    Returns:
        step_output: step output (same as the output of `self.forward_with_handling`)
    """

    assert self._env is not None
    self._chook.on_step_start()

    n_step = len(self.trajectory) + 1
    self.logger.info("=" * 25 + f" STEP {n_step} " + "=" * 25)
    step_output = self.forward_with_handling(self.messages)
    self.add_step_to_history(step_output)

    self.info["submission"] = step_output.submission
    self.info["exit_status"] = step_output.exit_status  # type: ignore
    self.info.update(self._get_edited_files_with_context(patch=step_output.submission or ""))  # type: ignore
    self.info["model_stats"] = self.model.stats.model_dump()

    self.add_step_to_trajectory(step_output)

    self._chook.on_step_done(step=step_output, info=self.info)
    return step_output