Sometimes, you may need to bulk reschedule flow runs that are late - for example, if you've accidentally scheduled many flow runs of a deployment to an inactive work pool.
To do this, we can delete late flow runs and create new ones in a Scheduled state with a delay.
This example reschedules the last 3 late flow runs of a deployment named healthcheck-storage-test to run 6 hours later than their original expected start time. It also deletes any remaining late flow runs of that deployment.
```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional

from prefect import get_client
from prefect.client.schemas.filters import DeploymentFilter, FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort
from prefect.states import Scheduled


async def reschedule_late_flow_runs(
    deployment_name: str,
    delay: timedelta,
    most_recent_n: int,
    delete_remaining: bool = True,
    states: Optional[list[str]] = None,
) -> list[FlowRun]:
    if not states:
        states = ["Late"]

    async with get_client() as client:
        # Find runs of the deployment in the given states whose
        # expected start time has already passed, newest first.
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=states)),
                expected_start_time=dict(before_=datetime.now(timezone.utc)),
            ),
            deployment_filter=DeploymentFilter(name={"like_": deployment_name}),
            sort=FlowRunSort.START_TIME_DESC,
            limit=most_recent_n if not delete_remaining else None,
        )

        if not flow_runs:
            print(f"No flow runs found in states: {states!r}")
            return []

        rescheduled_flow_runs = []
        for i, run in enumerate(flow_runs):
            # Every matched run is deleted; only the first
            # `most_recent_n` are recreated with a delayed start.
            await client.delete_flow_run(flow_run_id=run.id)
            if i < most_recent_n:
                new_run = await client.create_flow_run_from_deployment(
                    deployment_id=run.deployment_id,
                    state=Scheduled(scheduled_time=run.expected_start_time + delay),
                )
                rescheduled_flow_runs.append(new_run)

        return rescheduled_flow_runs


if __name__ == "__main__":
    rescheduled_flow_runs = asyncio.run(
        reschedule_late_flow_runs(
            deployment_name="healthcheck-storage-test",
            delay=timedelta(hours=6),
            most_recent_n=3,
        )
    )

    print(f"Rescheduled {len(rescheduled_flow_runs)} flow runs")

    assert all(run.state.is_scheduled() for run in rescheduled_flow_runs)
    assert all(
        run.expected_start_time > datetime.now(timezone.utc)
        for run in rescheduled_flow_runs
    )
```
Get the last N completed flow runs from my workspace
To get the last N completed flow runs from our workspace, we can make use of read_flow_runs and prefect.client.schemas.
This example gets the last three completed flow runs from our workspace:
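A minimal sketch of such a query, assuming the same `FlowRunFilter` and `FlowRunSort` helpers used in the rescheduling example. The names `completed_state_filter` and `get_most_recent_flow_runs` are hypothetical, chosen here for illustration:

```python
from typing import Any, Optional


def completed_state_filter(states: Optional[list[str]] = None) -> dict[str, Any]:
    """Return the state part of a flow run filter as a plain dict.

    Hypothetical helper: defaults to the COMPLETED state type.
    """
    return {"type": {"any_": states or ["COMPLETED"]}}


async def get_most_recent_flow_runs(n: int = 3, states: Optional[list[str]] = None):
    """Read up to `n` flow runs in the given state types, newest end time first."""
    # Prefect imports are kept local to the coroutine that talks to the API.
    from prefect import get_client
    from prefect.client.schemas.filters import FlowRunFilter
    from prefect.client.schemas.sorting import FlowRunSort

    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(state=completed_state_filter(states)),
            sort=FlowRunSort.END_TIME_DESC,  # most recently finished first
            limit=n,
        )
```

Running `asyncio.run(get_most_recent_flow_runs())` from a script should return up to three `FlowRun` objects, ordered by end time.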
Instead of the last three from the whole workspace, you could also use the DeploymentFilter like the previous example to get the last three completed flow runs of a specific deployment.
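As a rough sketch of that variant, the query can pass a `DeploymentFilter` alongside the flow run filter. The function names below are hypothetical, and `like_` is assumed to perform a partial match on the deployment name, as in the rescheduling example:

```python
from typing import Any


def deployment_name_filter(deployment_name: str) -> dict[str, Any]:
    """Hypothetical helper: keyword arguments for a DeploymentFilter name match."""
    return {"name": {"like_": deployment_name}}


async def get_recent_completed_runs_for_deployment(deployment_name: str, n: int = 3):
    """Read up to `n` completed flow runs of one deployment, newest end time first."""
    # Prefect imports are kept local to the coroutine that talks to the API.
    from prefect import get_client
    from prefect.client.schemas.filters import DeploymentFilter, FlowRunFilter
    from prefect.client.schemas.sorting import FlowRunSort

    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(state={"type": {"any_": ["COMPLETED"]}}),
            # Restrict the results to runs created from the named deployment.
            deployment_filter=DeploymentFilter(**deployment_name_filter(deployment_name)),
            sort=FlowRunSort.END_TIME_DESC,
            limit=n,
        )
```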
Transition all running flows to cancelled via the Client
It can be cumbersome to cancel many flow runs through the UI.
You can use get_client to set multiple runs to a Cancelled state.
The code below will cancel all flow runs that are in Pending, Running, Scheduled, or Late states when the script is run.
```python
import anyio

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)
from prefect.client.schemas.objects import StateType


async def list_flow_runs_with_states(states: list[str]):
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=states)
                )
            )
        )
    return flow_runs


async def cancel_flow_runs(flow_runs):
    async with get_client() as client:
        for idx, flow_run in enumerate(flow_runs):
            print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
            # Copy the run's current state, overriding its name and type.
            state_updates = {}
            state_updates.setdefault("name", "Cancelled")
            state_updates.setdefault("type", StateType.CANCELLED)
            state = flow_run.state.copy(update=state_updates)
            await client.set_flow_run_state(flow_run.id, state, force=True)


async def bulk_cancel_flow_runs():
    states = ["Pending", "Running", "Scheduled", "Late"]
    flow_runs = await list_flow_runs_with_states(states)

    # Each read returns one batch, so repeat until no matching runs remain.
    while len(flow_runs) > 0:
        print(f"Cancelling {len(flow_runs)} flow runs\n")
        await cancel_flow_runs(flow_runs)
        flow_runs = await list_flow_runs_with_states(states)
    print("Done!")


if __name__ == "__main__":
    anyio.run(bulk_cancel_flow_runs)
```
There are other ways to filter objects like flow runs
See the filters API reference for more ways to filter flow runs and other objects in your Prefect ecosystem.
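As one illustration of combining other filter fields, the sketch below selects flow runs that carry a set of tags and started within a recent time window. It assumes the `tags` (`all_`) and `start_time` (`after_`) fields of `FlowRunFilter`; the function names and the `"prod"` tag in the usage note are hypothetical:

```python
from datetime import datetime, timedelta, timezone
from typing import Any


def recent_tagged_runs_filter(tags: list[str], since: datetime) -> dict[str, Any]:
    """Hypothetical helper: FlowRunFilter keyword arguments as plain dicts."""
    return {
        "tags": {"all_": tags},            # run must carry every listed tag
        "start_time": {"after_": since},   # run must have started after `since`
    }


async def read_recent_tagged_runs(tags: list[str], hours: int = 24):
    """Read flow runs with all `tags` that started within the last `hours`."""
    # Prefect imports are kept local to the coroutine that talks to the API.
    from prefect import get_client
    from prefect.client.schemas.filters import FlowRunFilter

    since = datetime.now(timezone.utc) - timedelta(hours=hours)
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(**recent_tagged_runs_filter(tags, since))
        )
```

For example, `asyncio.run(read_recent_tagged_runs(["prod"]))` would look for runs tagged `prod` that started in the last 24 hours.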