@dataclass
class FlowRunEngine(Generic[P, R]):
    """
    Synchronous engine that drives a single flow run.

    The engine creates (or adopts) a flow run, proposes state transitions
    through the client, invokes the user's flow function and records either
    its return value or the exception it raised.
    """

    flow: Union[Flow[P, R], Flow[P, Coroutine[Any, Any, R]]]
    parameters: Optional[Dict[str, Any]] = None
    flow_run: Optional[FlowRun] = None
    flow_run_id: Optional[UUID] = None
    logger: logging.Logger = field(default_factory=lambda: get_logger("engine"))
    wait_for: Optional[Iterable[PrefectFuture]] = None
    # holds the return value from the user code
    _return_value: Union[R, Type[NotSet]] = NotSet
    # holds the exception raised by the user code, if any
    _raised: Union[Exception, Type[NotSet]] = NotSet
    # True only while inside `initialize_run`; guards access to `client`
    _is_started: bool = False
    _client: Optional[SyncPrefectClient] = None
    # when True, `set_state` becomes a no-op that returns the current state
    short_circuit: bool = False
    # ensures a custom flow run name is applied at most once
    _flow_run_name_set: bool = False

    def __post_init__(self):
        """Validate required fields and default `parameters` to an empty dict."""
        if self.flow is None and self.flow_run_id is None:
            raise ValueError("Either a flow or a flow_run_id must be provided.")

        if self.parameters is None:
            self.parameters = {}

    @property
    def client(self) -> SyncPrefectClient:
        """
        The client bound by `initialize_run`.

        Raises:
            RuntimeError: if the engine has not been started.
        """
        if not self._is_started or self._client is None:
            raise RuntimeError("Engine has not started.")
        return self._client

    @property
    def state(self) -> State:
        """The current state of the flow run (requires `flow_run` to be set)."""
        return self.flow_run.state  # type: ignore

    def _resolve_parameters(self):
        """
        Replace `self.parameters` with a copy in which every value has been
        passed through `resolve_to_final_result`.

        Raises:
            UpstreamTaskError: re-raised unchanged so `begin_run` can react.
            PrefectException: for any other failure while resolving a value.
        """
        if not self.parameters:
            return {}

        resolved_parameters = {}
        for parameter, value in self.parameters.items():
            try:
                resolved_parameters[parameter] = visit_collection(
                    value,
                    visit_fn=resolve_to_final_result,
                    return_data=True,
                    max_depth=-1,
                    remove_annotations=True,
                    context={},
                )
            except UpstreamTaskError:
                raise
            except Exception as exc:
                raise PrefectException(
                    f"Failed to resolve inputs in parameter {parameter!r}. If your"
                    " parameter type is not supported, consider using the `quote`"
                    " annotation to skip resolution of inputs."
                ) from exc

        self.parameters = resolved_parameters

    def _wait_for_dependencies(self):
        """Visit every item in `wait_for` with `resolve_to_final_result`; results are discarded."""
        if not self.wait_for:
            return

        visit_collection(
            self.wait_for,
            visit_fn=resolve_to_final_result,
            return_data=False,
            max_depth=-1,
            remove_annotations=True,
            context={},
        )

    def begin_run(self) -> State:
        """
        Resolve inputs, validate parameters and propose a `Running` state.

        Returns:
            The state the run ended up in: a `Pending` state named "NotReady"
            when an upstream dependency failed, the failure state when
            parameter validation failed, otherwise the first non-pending
            state returned for the `Running` proposal.
        """
        try:
            self._resolve_parameters()
            self._wait_for_dependencies()
        except UpstreamTaskError as upstream_exc:
            state = self.set_state(
                Pending(
                    name="NotReady",
                    message=str(upstream_exc),
                ),
                # if orchestrating a run already in a pending state, force orchestration to
                # update the state name
                force=self.state.is_pending(),
            )
            return state

        # validate prior to context so that context receives validated params
        if self.flow.should_validate_parameters:
            try:
                self.parameters = self.flow.validate_parameters(self.parameters or {})
            except Exception as exc:
                message = "Validation of flow parameters failed with error:"
                # separate the message from the exception text with a space
                self.logger.error("%s %s", message, exc)
                self.handle_exception(
                    exc,
                    msg=message,
                    result_store=get_result_store().update_for_flow(
                        self.flow, _sync=True
                    ),
                )
                # freeze the failed state: every later `set_state` is a no-op
                self.short_circuit = True
                self.call_hooks()

        new_state = Running()
        state = self.set_state(new_state)
        # keep proposing `Running` for as long as the response is pending
        while state.is_pending():
            time.sleep(0.2)
            state = self.set_state(new_state)
        return state

    def set_state(self, state: State, force: bool = False) -> State:
        """
        Propose `state` for the flow run and mirror the result locally.

        Args:
            state: the state to propose.
            force: forwarded to `propose_state_sync`.

        Returns:
            The state returned by `propose_state_sync`, or the current state
            unchanged when `short_circuit` is set.
        """
        # prevents any state-setting activity
        if self.short_circuit:
            return self.state

        state = propose_state_sync(
            self.client, state, flow_run_id=self.flow_run.id, force=force
        )  # type: ignore
        self.flow_run.state = state  # type: ignore
        self.flow_run.state_name = state.name  # type: ignore
        self.flow_run.state_type = state.type  # type: ignore
        return state

    def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]":
        """
        Return the outcome of the run.

        Order of precedence: a recorded non-`State` return value, then a
        recorded exception (raised or returned depending on
        `raise_on_failure`), then the result stored on the current state.
        """
        if self._return_value is not NotSet and not isinstance(
            self._return_value, State
        ):
            if isinstance(self._return_value, BaseResult):
                _result = self._return_value.get()
            else:
                _result = self._return_value

            if asyncio.iscoroutine(_result):
                # getting the value for a BaseResult may return an awaitable
                # depending on whether the parent frame is sync or not
                _result = run_coro_as_sync(_result)
            return _result

        if self._raised is not NotSet:
            if raise_on_failure:
                raise self._raised
            return self._raised

        # This is a fall through case which leans on the existing state result mechanics to get the
        # return value. This is necessary because we currently will return a State object if the
        # the State was Prefect-created.
        # TODO: Remove the need to get the result from a State except in cases where the return value
        # is a State object.
        _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)  # type: ignore
        # state.result is a `sync_compatible` function that may or may not return an awaitable
        # depending on whether the parent frame is sync or not
        if asyncio.iscoroutine(_result):
            _result = run_coro_as_sync(_result)
        return _result

    def handle_success(self, result: R) -> R:
        """
        Convert the flow's return value into a terminal state and record it.

        Returns:
            The original `result`, unchanged.

        Raises:
            ValueError: if the current `FlowRunContext` has no result store.
        """
        result_store = getattr(FlowRunContext.get(), "result_store", None)
        if result_store is None:
            raise ValueError("Result store is not set")
        resolved_result = resolve_futures_to_states(result)
        terminal_state = run_coro_as_sync(
            return_value_to_state(
                resolved_result,
                result_store=result_store,
                write_result=should_persist_result(),
            )
        )
        self.set_state(terminal_state)
        self._return_value = resolved_result
        return result

    def handle_exception(
        self,
        exc: Exception,
        msg: Optional[str] = None,
        result_store: Optional[ResultStore] = None,
    ) -> State:
        """
        Propose a failed state built from `exc` and remember the exception.

        If the run is in a scheduled state after the proposal, a `Running`
        state is proposed so the flow can be attempted again.

        Args:
            exc: the exception raised by the flow.
            msg: state message; defaults to a generic one.
            result_store: store used for the failure; falls back to the one
                on the current `FlowRunContext`.

        Returns:
            The state returned by the last proposal.
        """
        context = FlowRunContext.get()
        terminal_state = cast(
            State,
            run_coro_as_sync(
                exception_to_failed_state(
                    exc,
                    message=msg or "Flow run encountered an exception:",
                    result_store=result_store or getattr(context, "result_store", None),
                    write_result=True,
                )
            ),
        )
        state = self.set_state(terminal_state)
        if self.state.is_scheduled():
            self.logger.info(
                (
                    f"Received non-final state {state.name!r} when proposing final"
                    f" state {terminal_state.name!r} and will attempt to run again..."
                ),
            )
            state = self.set_state(Running())
        self._raised = exc
        return state

    def handle_timeout(self, exc: TimeoutError) -> None:
        """Log the timeout, propose a `Failed` state named "TimedOut" and record `exc`."""
        if isinstance(exc, FlowRunTimeoutError):
            message = (
                f"Flow run exceeded timeout of {self.flow.timeout_seconds} second(s)"
            )
        else:
            message = f"Flow run failed due to timeout: {exc!r}"
        self.logger.error(message)
        state = Failed(
            data=exc,
            message=message,
            name="TimedOut",
        )
        self.set_state(state)
        self._raised = exc

    def handle_crash(self, exc: BaseException) -> None:
        """Log the crash, force-propose the crashed state built from `exc` and record `exc`."""
        state = run_coro_as_sync(exception_to_crashed_state(exc))
        self.logger.error(f"Crash detected! {state.message}")
        self.logger.debug("Crash details:", exc_info=exc)
        self.set_state(state, force=True)
        self._raised = exc

    def load_subflow_run(
        self,
        parent_task_run: TaskRun,
        client: SyncPrefectClient,
        context: FlowRunContext,
    ) -> Union[FlowRun, None]:
        """
        This method attempts to load an existing flow run for a subflow task
        run, if appropriate.

        An existing flow run is looked up when the parent task run is in a
        final state, unless the parent flow run is being rerun and the parent
        task run did not complete. Loading the existing run prevents the
        engine from running the subflow again.

        If no existing flow run is found, or if the subflow should be rerun,
        then no flow run is returned.
        """

        # check if the parent flow run is rerunning
        rerunning = (
            context.flow_run.run_count > 1
            if getattr(context, "flow_run", None)
            and isinstance(context.flow_run, FlowRun)
            else False
        )

        # if the parent task run is in a final state, and we are not rerunning
        # a task run that failed to complete, then retrieve an existing flow
        # run instead of creating a new one. This effectively loads a cached
        # flow run for situations where we are confident the flow should not
        # be run again.
        assert isinstance(parent_task_run.state, State)
        if parent_task_run.state.is_final() and not (
            rerunning and not parent_task_run.state.is_completed()
        ):
            # NOTE(review): ascending sort with limit=1 selects the run with the
            # earliest expected start time, not the most recent -- confirm intent.
            flow_runs = client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    parent_task_run_id={"any_": [parent_task_run.id]}
                ),
                sort=FlowRunSort.EXPECTED_START_TIME_ASC,
                limit=1,
            )
            if flow_runs:
                loaded_flow_run = flow_runs[-1]
                self._return_value = loaded_flow_run.state
                return loaded_flow_run

    def create_flow_run(self, client: SyncPrefectClient) -> FlowRun:
        """
        Create a flow run for `self.flow`, or reuse an existing subflow run.

        When called inside another flow run, a parent task run representing
        the subflow is created first and `load_subflow_run` is consulted
        before a new flow run is created.
        """
        flow_run_ctx = FlowRunContext.get()
        parameters = self.parameters or {}

        parent_task_run = None

        # this is a subflow run
        if flow_run_ctx:
            # add a task to a parent flow run that represents the execution of a subflow run
            parent_task = Task(
                name=self.flow.name, fn=self.flow.fn, version=self.flow.version
            )

            parent_task_run = run_coro_as_sync(
                parent_task.create_run(
                    flow_run_context=flow_run_ctx,
                    parameters=self.parameters,
                    wait_for=self.wait_for,
                )
            )

            # check if there is already a flow run for this subflow
            if subflow_run := self.load_subflow_run(
                parent_task_run=parent_task_run, client=client, context=flow_run_ctx
            ):
                return subflow_run

        flow_run = client.create_flow_run(
            flow=self.flow,
            parameters=self.flow.serialize_parameters(parameters),
            state=Pending(),
            parent_task_run_id=getattr(parent_task_run, "id", None),
            tags=TagsContext.get().current_tags,
        )
        if flow_run_ctx:
            parent_logger = get_run_logger(flow_run_ctx)
            parent_logger.info(
                f"Created subflow run {flow_run.name!r} for flow {self.flow.name!r}"
            )
        else:
            self.logger.info(
                f"Created flow run {flow_run.name!r} for flow {self.flow.name!r}"
            )

        return flow_run

    def call_hooks(self, state: Optional[State] = None):
        """
        Run the flow's hooks that match `state` (defaults to the current state).

        Errors raised by a hook are logged and do not stop the remaining
        hooks. Cancellation and crashed hooks can be disabled through the
        `PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS` environment variable.

        Raises:
            ValueError: if no flow run is set.
        """
        if state is None:
            state = self.state
        flow = self.flow
        flow_run = self.flow_run

        if not flow_run:
            raise ValueError("Flow run is not set")

        enable_cancellation_and_crashed_hooks = (
            os.environ.get(
                "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS", "true"
            ).lower()
            == "true"
        )

        if state.is_failed() and flow.on_failure_hooks:
            hooks = flow.on_failure_hooks
        elif state.is_completed() and flow.on_completion_hooks:
            hooks = flow.on_completion_hooks
        elif (
            enable_cancellation_and_crashed_hooks
            and state.is_cancelling()
            and flow.on_cancellation_hooks
        ):
            hooks = flow.on_cancellation_hooks
        elif (
            enable_cancellation_and_crashed_hooks
            and state.is_crashed()
            and flow.on_crashed_hooks
        ):
            hooks = flow.on_crashed_hooks
        elif state.is_running() and flow.on_running_hooks:
            hooks = flow.on_running_hooks
        else:
            hooks = None

        for hook in hooks or []:
            hook_name = _get_hook_name(hook)

            try:
                self.logger.info(
                    f"Running hook {hook_name!r} in response to entering state"
                    f" {state.name!r}"
                )
                result = hook(flow, flow_run, state)
                if asyncio.iscoroutine(result):
                    run_coro_as_sync(result)
            except Exception:
                self.logger.error(
                    f"An error was encountered while running hook {hook_name!r}",
                    exc_info=True,
                )
            else:
                self.logger.info(f"Hook {hook_name!r} finished running successfully")

    @contextmanager
    def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
        """
        Enter the contexts required to execute the flow function.

        Re-reads the flow run, enters the SIGTERM capture, optional print
        patching, task runner, `FlowRunContext` and concurrency contexts,
        switches `self.logger` to the flow run logger and applies a custom
        flow run name once.

        Args:
            client: client to use; defaults to the engine's own client.

        Raises:
            ValueError: if no flow run is set.
        """
        from prefect.utilities.engine import (
            should_log_prints,
        )

        if client is None:
            client = self.client
        if not self.flow_run:
            raise ValueError("Flow run not set")

        self.flow_run = client.read_flow_run(self.flow_run.id)
        log_prints = should_log_prints(self.flow)

        with ExitStack() as stack:
            # TODO: Explore closing task runner before completing the flow to
            # wait for futures to complete
            stack.enter_context(capture_sigterm())
            if log_prints:
                stack.enter_context(patch_print())
            task_runner = stack.enter_context(self.flow.task_runner.duplicate())
            stack.enter_context(
                FlowRunContext(
                    flow=self.flow,
                    log_prints=log_prints,
                    flow_run=self.flow_run,
                    parameters=self.parameters,
                    client=client,
                    result_store=get_result_store().update_for_flow(
                        self.flow, _sync=True
                    ),
                    task_runner=task_runner,
                    persist_result=self.flow.persist_result
                    if self.flow.persist_result is not None
                    else should_persist_result(),
                )
            )
            stack.enter_context(ConcurrencyContextV1())
            stack.enter_context(ConcurrencyContext())

            # set the logger to the flow run logger
            self.logger = flow_run_logger(flow_run=self.flow_run, flow=self.flow)

            # update the flow run name if necessary
            if not self._flow_run_name_set and self.flow.flow_run_name:
                flow_run_name = _resolve_custom_flow_run_name(
                    flow=self.flow, parameters=self.parameters
                )
                # use the same client that read the flow run above, so an
                # explicitly supplied client is honoured
                client.set_flow_run_name(
                    flow_run_id=self.flow_run.id, name=flow_run_name
                )
                self.logger.extra["flow_run_name"] = flow_run_name
                self.logger.debug(
                    f"Renamed flow run {self.flow_run.name!r} to {flow_run_name!r}"
                )
                self.flow_run.name = flow_run_name
                self._flow_run_name_set = True
            yield

    @contextmanager
    def initialize_run(self):
        """
        Enters a client context and creates a flow run if needed.

        For a pre-existing flow run, unset retry settings in its empirical
        policy are filled from the flow and pushed to the API. On exit the
        final state is logged and the client binding is cleared.
        """
        with SyncClientContext.get_or_create() as client_ctx:
            self._client = client_ctx.client
            self._is_started = True

            if not self.flow_run:
                self.flow_run = self.create_flow_run(self.client)
                flow_run_url = url_for(self.flow_run)

                if flow_run_url:
                    self.logger.info(
                        f"View at {flow_run_url}", extra={"send_to_api": False}
                    )
            else:
                # Update the empirical policy to match the flow if it is not set
                if self.flow_run.empirical_policy.retry_delay is None:
                    self.flow_run.empirical_policy.retry_delay = (
                        self.flow.retry_delay_seconds
                    )

                if self.flow_run.empirical_policy.retries is None:
                    self.flow_run.empirical_policy.retries = self.flow.retries

                self.client.update_flow_run(
                    flow_run_id=self.flow_run.id,
                    flow_version=self.flow.version,
                    empirical_policy=self.flow_run.empirical_policy,
                )

            try:
                yield self

            except TerminationSignal as exc:
                self.cancel_all_tasks()
                self.handle_crash(exc)
                raise
            except Exception:
                # regular exceptions are caught and re-raised to the user
                raise
            except (Abort, Pause):
                raise
            except GeneratorExit:
                # Do not capture generator exits as crashes
                raise
            except BaseException as exc:
                # BaseExceptions are caught and handled as crashes
                self.handle_crash(exc)
                raise
            finally:
                # If debugging, use the more complete `repr` than the usual `str` description
                display_state = (
                    repr(self.state) if PREFECT_DEBUG_MODE else str(self.state)
                )
                self.logger.log(
                    level=logging.INFO if self.state.is_completed() else logging.ERROR,
                    msg=f"Finished in state {display_state}",
                )

                self._is_started = False
                self._client = None

    def is_running(self) -> bool:
        """Return True if a flow run is set and its state is running."""
        if getattr(self, "flow_run", None) is None:
            return False
        return getattr(self, "flow_run").state.is_running()

    def is_pending(self) -> bool:
        """Return True if a flow run is set and its state is pending."""
        if getattr(self, "flow_run", None) is None:
            return False  # TODO: handle this differently?
        return getattr(self, "flow_run").state.is_pending()

    def cancel_all_tasks(self):
        """Call `cancel_all` on the flow's task runner when it provides one."""
        if hasattr(self.flow.task_runner, "cancel_all"):
            self.flow.task_runner.cancel_all()  # type: ignore

    # --------------------------
    #
    # The following methods compose the main task run loop
    #
    # --------------------------

    @contextmanager
    def start(self) -> Generator[None, None, None]:
        """Initialize the run, begin it, and fire hooks if it reached a running state."""
        with self.initialize_run():
            self.begin_run()

            if self.state.is_running():
                self.call_hooks()
            yield

    @contextmanager
    def run_context(self):
        """
        Context in which the flow function executes.

        Applies the flow's timeout, converts timeouts and ordinary exceptions
        raised inside the block into state transitions (they are not
        re-raised), and fires hooks once the run is final or cancelling.
        """
        timeout_context = timeout_async if self.flow.isasync else timeout
        # reenter the run context to ensure it is up to date for every run
        with self.setup_run_context():
            try:
                with timeout_context(
                    seconds=self.flow.timeout_seconds,
                    timeout_exc_type=FlowRunTimeoutError,
                ):
                    self.logger.debug(
                        f"Executing flow {self.flow.name!r} for flow run {self.flow_run.name!r}..."
                    )
                    yield self
            except TimeoutError as exc:
                self.handle_timeout(exc)
            except Exception as exc:
                self.logger.exception("Encountered exception during execution: %r", exc)
                self.handle_exception(exc)
            finally:
                if self.state.is_final() or self.state.is_cancelling():
                    self.call_hooks()

    def call_flow_fn(self) -> Union[R, Coroutine[Any, Any, R]]:
        """
        Convenience method to call the flow function. Returns a coroutine if the
        flow is async.

        The flow's return value is handed to `handle_success`; it is not
        returned from this method. Use `result()` to retrieve it.
        """
        if self.flow.isasync:

            async def _call_flow_fn():
                result = await call_with_parameters(self.flow.fn, self.parameters)
                self.handle_success(result)

            return _call_flow_fn()
        else:
            result = call_with_parameters(self.flow.fn, self.parameters)
            self.handle_success(result)
call_flow_fn()
Convenience method to call the flow function. Returns a coroutine if the
flow is async.
Source code in src/prefect/flow_engine.py
665 666 667 668 669 670 671 672 673 674 675 676 677 678 679
def call_flow_fn(self) -> Union[R, Coroutine[Any, Any, R]]:
    """
    Invoke the flow function with the engine's parameters.

    For a synchronous flow the function is called immediately and its
    return value is passed to `handle_success`; nothing is returned.
    For an async flow an un-awaited coroutine is returned that performs
    the same two steps once awaited.
    """
    # Synchronous flows: run right away and record the outcome.
    if not self.flow.isasync:
        self.handle_success(call_with_parameters(self.flow.fn, self.parameters))
        return None

    # Async flows: defer the call until the caller awaits the coroutine.
    async def _invoke_and_record():
        outcome = await call_with_parameters(self.flow.fn, self.parameters)
        self.handle_success(outcome)

    return _invoke_and_record()
initialize_run()
Enters a client context and creates a flow run if needed.
@contextmanager
def initialize_run(self):
    """
    Enters a client context and creates a flow run if needed.

    Yields the engine itself. While the context is active `_is_started` is
    True and `_client` is bound; both are reset on exit. On exit the final
    state is logged at INFO when completed and at ERROR otherwise.
    """
    with SyncClientContext.get_or_create() as client_ctx:
        self._client = client_ctx.client
        self._is_started = True

        if not self.flow_run:
            # no flow run supplied: create one and log its URL if available
            self.flow_run = self.create_flow_run(self.client)
            flow_run_url = url_for(self.flow_run)

            if flow_run_url:
                self.logger.info(
                    f"View at {flow_run_url}", extra={"send_to_api": False}
                )
        else:
            # Update the empirical policy to match the flow if it is not set
            if self.flow_run.empirical_policy.retry_delay is None:
                self.flow_run.empirical_policy.retry_delay = (
                    self.flow.retry_delay_seconds
                )

            if self.flow_run.empirical_policy.retries is None:
                self.flow_run.empirical_policy.retries = self.flow.retries

            self.client.update_flow_run(
                flow_run_id=self.flow_run.id,
                flow_version=self.flow.version,
                empirical_policy=self.flow_run.empirical_policy,
            )

        try:
            yield self

        except TerminationSignal as exc:
            # cancel outstanding task work, then record the crash
            self.cancel_all_tasks()
            self.handle_crash(exc)
            raise
        except Exception:
            # regular exceptions are caught and re-raised to the user
            raise
        except (Abort, Pause):
            # NOTE(review): listed after `except Exception`, so these are
            # presumably BaseException subclasses -- confirm. They are
            # re-raised without being recorded as a crash.
            raise
        except GeneratorExit:
            # Do not capture generator exits as crashes
            raise
        except BaseException as exc:
            # BaseExceptions are caught and handled as crashes
            self.handle_crash(exc)
            raise
        finally:
            # If debugging, use the more complete `repr` than the usual `str` description
            display_state = (
                repr(self.state) if PREFECT_DEBUG_MODE else str(self.state)
            )
            self.logger.log(
                level=logging.INFO if self.state.is_completed() else logging.ERROR,
                msg=f"Finished in state {display_state}",
            )

            # the engine is no longer usable until re-initialized
            self._is_started = False
            self._client = None
load_subflow_run(parent_task_run,client,context)
This method attempts to load an existing flow run for a subflow task
run, if appropriate.
If the parent task run is in a final state, we attempt to load an existing
flow run instead of creating a new one, unless the parent flow run is being
rerun and the parent task run did not complete. This will prevent the
engine from running the subflow again.
If no existing flow run is found, or if the subflow should be rerun,
then no flow run is returned.
def load_subflow_run(
    self,
    parent_task_run: TaskRun,
    client: SyncPrefectClient,
    context: FlowRunContext,
) -> Union[FlowRun, None]:
    """
    Look up an already-existing flow run for a subflow's parent task run.

    A lookup happens only when the parent task run is in a final state and
    it is not the case that the parent flow run is being rerun while the
    task run failed to complete. When a flow run is found, its state is
    stored as the engine's return value and the flow run is returned, so
    the subflow is not executed again.

    Returns None when the lookup is skipped or finds nothing.
    """
    # The parent flow run counts as "rerunning" once its run count exceeds one.
    is_rerun = False
    if getattr(context, "flow_run", None) and isinstance(context.flow_run, FlowRun):
        is_rerun = context.flow_run.run_count > 1

    parent_state = parent_task_run.state
    assert isinstance(parent_state, State)

    # Nothing to reuse while the parent task run is still in flight.
    if not parent_state.is_final():
        return None

    # A rerun of a task run that never completed must execute the subflow again.
    if is_rerun and not parent_state.is_completed():
        return None

    # Fetch a single flow run attached to this parent task run.
    matches = client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            parent_task_run_id={"any_": [parent_task_run.id]}
        ),
        sort=FlowRunSort.EXPECTED_START_TIME_ASC,
        limit=1,
    )
    if not matches:
        return None

    existing_run = matches[-1]
    self._return_value = existing_run.state
    return existing_run