class Pipeline(BasePipeline):
    """Local pipeline implementation using `multiprocessing`."""

    def run(
        self,
        parameters: Optional[Dict[str, Dict[str, Any]]] = None,
        use_cache: bool = True,
    ) -> "Distiset":
        """Runs the pipeline.

        Args:
            parameters: A dictionary with the step name as the key and a dictionary with
                the runtime parameters for the step as the value. Defaults to `None`.
            use_cache: Whether to use the cache from previous pipeline runs. Defaults to
                `True`.

        Returns:
            The `Distiset` created by the pipeline.

        Raises:
            RuntimeError: If the pipeline fails to load all the steps.
        """
        try:
            mp.set_start_method("forkserver")
        except RuntimeError:
            pass

        log_queue = mp.Queue()
        setup_logging(log_queue)  # type: ignore
        self._logger = logging.getLogger("distilabel.pipeline.local")

        super().run(parameters, use_cache)

        if self._batch_manager is None:
            self._batch_manager = _BatchManager.from_dag(self.dag)

        # If the batch manager is not able to generate batches, that means that the loaded
        # `_BatchManager` from cache didn't have any remaining batches to process i.e.
        # the previous pipeline execution was completed successfully.
        if not self._batch_manager.can_generate():
            self._logger.info(
                "💾 Loaded batch manager from cache doesn't have any remaining data. Returning"
                " `Distiset` from cache data..."
            )
            stop_logging()
            return create_distiset(
                self._cache_location["data"],
                pipeline_path=self._cache_location["pipeline"],
            )

        buffer_data_path = self._cache_location["data"]
        self._logger.info(f"📝 Pipeline data will be written to '{buffer_data_path}'")
        write_buffer = _WriteBuffer(buffer_data_path, self.dag.leaf_steps)

        num_processes = len(self.dag)
        ctx = mp.get_context("forkserver")  # type: ignore
        with ctx.Manager() as manager, ctx.Pool(
            num_processes, initializer=_init_worker, initargs=(log_queue,)
        ) as pool:
            self.output_queue: "Queue[Any]" = manager.Queue()
            self.shared_info = self._create_shared_info_dict(manager)
            self._handle_keyboard_interrupt()

            # Run the steps using the pool of processes
            self._run_steps_in_loop(pool, manager, self.output_queue, self.shared_info)

            # Wait for all the steps to be loaded correctly
            if not self._all_steps_loaded():
                write_buffer.close()
                self._batch_manager = None
                stop_logging()
                raise RuntimeError(
                    "Failed to load all the steps. Could not run pipeline."
                ) from _SUBPROCESS_EXCEPTION

            # Send the "first" batches to the steps so the batches start flowing through
            # the input queues and output queue
            self._request_initial_batches()

            # Start a loop to receive the output batches from the steps
            self._run_output_queue_loop_in_thread(write_buffer)

            # Send `None` to steps `input_queue`s just in case some step is still waiting
            self._notify_steps_to_stop()

            pool.close()
            pool.join()

        write_buffer.close()
        distiset = create_distiset(
            self._cache_location["data"],
            pipeline_path=self._cache_location["pipeline"],
        )
        stop_logging()
        return distiset
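    # A minimal usage sketch of `run` (illustrative only: the step classes, step names,
    # and runtime parameters below are assumptions, not part of this module):
    #
    #     with Pipeline(name="my-pipeline") as pipeline:
    #         loader = SomeGeneratorStep(name="load_data")
    #         task = SomeTask(name="task")
    #         loader >> task
    #
    #     distiset = pipeline.run(
    #         parameters={"task": {"some_runtime_param": 42}},
    #         use_cache=True,
    #     )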
"""thread=threading.Thread(target=self._output_queue_loop,args=(write_buffer,))thread.start()thread.join()def_notify_steps_to_stop(self)->None:"""Notifies the steps to stop their infinite running loop by sending `None` to their input queues."""forstep_nameinself.dag:ifinput_queue:=self.dag.get_step(step_name).get("input_queue"):input_queue.put(None)def_output_queue_loop(self,write_buffer:"_WriteBuffer")->None:"""Loop to receive the output batches from the steps and manage the flow of the batches through the pipeline. Args: write_buffer: The write buffer to write the data from the leaf steps to disk. """whileself._batch_manager.can_generate()andnot_STOP_CALLED:# type: ignoreself._logger.debug("Waiting for output batch from step...")if(batch:=self.output_queue.get())isNone:self._logger.debug("Received `None` from output queue. Breaking loop.")breakifbatch.step_nameinself.dag.leaf_steps:write_buffer.add_batch(batch)# If `_STOP_CALLED` was set to `True` while waiting for the output queue, then# we need to handle the stop of the pipeline and break the loop to avoid# propagating the batches through the pipeline and making the stop process# slower.if_STOP_CALLED:self._handle_batch_on_stop(batch)breakself._logger.debug(f"Received batch with seq_no {batch.seq_no} from step '{batch.step_name}'"f" from output queue: {batch}")self._manage_batch_flow(batch)if_STOP_CALLED:self._handle_stop(write_buffer)def_manage_batch_flow(self,batch:"_Batch")->None:"""Checks if the step that generated the batch has more data in its buffer to generate a new batch. If there's data, then a new batch is sent to the step. If the step has no data in its buffer, then the predecessors generator steps are requested to send a new batch. Args: batch: The batch that was processed. """assertself._batch_manager,"Batch manager is not set"self._batch_manager.register_batch(batch)self._logger.debug(f"Batch {batch.seq_no} from step '{batch.step_name}' registered in batch"" manager")step:"Step"=self.dag.get_step(batch.step_name)["step"]forsuccessorinself.dag.get_step_successors(step.name):self._batch_manager.add_batch(successor,batch)# Check if the step is a generator and if there are successors that need data# from this step. This usually happens when the generator `batch_size` is smaller# than the `input_batch_size` of the successor steps.if(step.is_generatorandstep.nameinself._batch_manager.step_empty_buffers(successor)):last_batch=self._batch_manager.get_last_batch(step.name)self._send_batch_to_step(last_batch.next_batch())# type: ignoreifnew_batch:=self._batch_manager.get_batch(successor):self._send_batch_to_step(new_batch)ifstep.is_generator:return# Step has enough data on its buffers to create a new batchifnext_batch:=self._batch_manager.get_batch(step.name):self._send_batch_to_step(next_batch)return# Request more batches to the predecessors generator stepsempty_buffers=self._batch_manager.step_empty_buffers(step.name)forprevious_step_nameinempty_buffers:ifprevious_step_namenotinself.dag.root_steps:continueiflast_batch:=self._batch_manager.get_last_batch(previous_step_name):self._logger.debug(f"Step '{step.name}' input buffer for step '{previous_step_name}' is"" empty. 
    def _handle_stop(self, write_buffer: "_WriteBuffer") -> None:
        """Handles the stop of the pipeline execution, which will stop the steps from
        processing more batches and wait for the output queue to be empty, to not lose
        any data that was already processed by the steps before the stop was called.

        Args:
            write_buffer: The write buffer to write the data from the leaf steps to disk.
        """
        self._logger.debug("Handling stop of the pipeline execution...")

        # Send `None` to the input queues of all the steps to notify them to stop
        # processing batches.
        for step_name in self.dag:
            if input_queue := self.dag.get_step(step_name).get("input_queue"):
                while not input_queue.empty():
                    batch = input_queue.get()
                    self._batch_manager.add_batch(  # type: ignore
                        to_step=step_name, batch=batch, prepend=True
                    )
                    self._logger.debug(
                        f"Adding batch back to the batch manager: {batch}"
                    )
                input_queue.put(None)

        # Wait for the input queues to be empty, which means that all the steps finished
        # processing the batches that were sent before the stop flag was set.
        for step_name in self.dag:
            self._wait_step_input_queue_empty(step_name)

        # Consume the output queue until it's empty to not lose any data that was already
        # processed by the steps before stop was called.
        while not self.output_queue.empty():
            batch = self.output_queue.get()
            if batch.step_name in self.dag.leaf_steps:
                write_buffer.add_batch(batch)
            self._handle_batch_on_stop(batch)

        self._cache()

    def _handle_batch_on_stop(self, batch: "_Batch") -> None:
        """Handles a batch that was received from the output queue when the pipeline was
        stopped. It will add and register the batch in the batch manager.

        Args:
            batch: The batch to handle.
        """
        self._batch_manager.register_batch(batch)  # type: ignore
        step: "Step" = self.dag.get_step(batch.step_name)["step"]
        for successor in self.dag.get_step_successors(step.name):
            self._batch_manager.add_batch(successor, batch)  # type: ignore

    def _wait_step_input_queue_empty(self, step_name: str) -> Union["Queue[Any]", None]:
        """Waits for the input queue of a step to be empty.

        Args:
            step_name: The name of the step.

        Returns:
            The input queue of the step if the step is loaded and hasn't finished,
            `None` otherwise.
        """
        if self._check_step_not_loaded_or_finished(step_name):
            return None

        if input_queue := self.dag.get_step(step_name).get("input_queue"):
            while input_queue.qsize() != 0:
                pass
            return input_queue

    def _create_shared_info_dict(self, manager: "SyncManager") -> "DictProxy[str, Any]":
        """Creates the shared information dictionary to be used by the processes.

        Args:
            manager: The manager to create the shared information.

        Returns:
            The shared information dictionary.
        """
        # TODO: not very important, but we could use a different lock for each matter
        return manager.dict(
            **{
                _STEPS_LOADED_KEY: manager.list(),
                _STEPS_LOADED_LOCK_KEY: manager.Lock(),
                _CUDA_LLM_DEVICE_PLACEMENT_KEY: manager.dict(**{}),
                _CUDA_LLM_DEVICE_PLACEMENT_LOCK_KEY: manager.Lock(),
            }
        )
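    # The dictionary above is the usual `multiprocessing.Manager` recipe for sharing
    # mutable state between processes: proxy objects (`manager.dict()`, `manager.list()`)
    # guarded by a `manager.Lock()`. A standalone sketch of the pattern (key names are
    # illustrative, not part of this module):
    #
    #     from multiprocessing import Manager
    #
    #     with Manager() as manager:
    #         shared = manager.dict(loaded=manager.list(), lock=manager.Lock())
    #         with shared["lock"]:
    #             shared["loaded"].append("step_name")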
"""def_update_all_steps_loaded(steps_loaded:List[str])->None:with_STEPS_LOADED_LOCK:_STEPS_LOADED.update(steps_loaded)self._logger.info("β³ Waiting for all the steps to load...")previous_message=Nonewhilenot_STOP_CALLED:withself.shared_info[_STEPS_LOADED_LOCK_KEY]:steps_loaded=self.shared_info[_STEPS_LOADED_KEY]num_steps_loaded=(len(steps_loaded)ifsteps_loaded!=[_STEPS_LOADED_ERROR_CODE]else0)self._logger.debug(f"Steps loaded: {steps_loaded}")message=f"β³ Steps loaded: {num_steps_loaded}/{len(self.dag)}"ifnum_steps_loaded>0andmessage!=previous_message:self._logger.info(message)previous_message=messageifnum_steps_loaded==len(self.dag):self._logger.info("β All the steps have been loaded!")_update_all_steps_loaded(steps_loaded)returnTrueifsteps_loaded==[_STEPS_LOADED_ERROR_CODE]:self._logger.error("β Failed to load all the steps")_update_all_steps_loaded(steps_loaded)returnFalsetime.sleep(2.5)returnnot_STOP_CALLEDdef_request_initial_batches(self)->None:"""Requests the initial batches to the generator steps."""assertself._batch_manager,"Batch manager is not set"forstepinself._batch_manager._steps.values():ifbatch:=step.get_batch():self._logger.debug(f"Sending initial batch to '{step.step_name}' step: {batch}")self._send_batch_to_step(batch)forstep_nameinself.dag.root_steps:seq_no=0iflast_batch:=self._batch_manager.get_last_batch(step_name):seq_no=last_batch.seq_no+1batch=_Batch(seq_no=seq_no,step_name=step_name,last_batch=False)self._logger.debug(f"Requesting initial batch to '{step_name}' generator step: {batch}")self._send_batch_to_step(batch)def_send_batch_to_step(self,batch:"_Batch")->None:"""Sends a batch to the input queue of a step. Args: batch: The batch to send. """self._logger.debug(f"Sending batch {batch.seq_no} to step '{batch.step_name}': {batch}")input_queue=self.dag.get_step(batch.step_name)["input_queue"]input_queue.put(batch)def_run_steps_in_loop(self,pool:"Pool",manager:"SyncManager",output_queue:"Queue[_Batch]",shared_info:"DictProxy[str, Any]",)->None:"""Using the `pool`, runs the steps in the DAG in an infinite loop waiting for input batches and sending the output batches to the `output_queue`. Each `Step` is wrapped in a `_ProcessWrapper`, which will handle the lifecycle of the `Step` and the communication with the `input_queue` and `output_queue`. The `_ProcessWrapper.run` method is the target function of the process. Args: pool: The pool of processes. manager: The manager to create the queues. output_queue: The queue to send the output batches. shared_info: The shared information between the processes. """forstep_nameinself.dag:step:"Step"=self.dag.get_step(step_name)["step"]input_queue=manager.Queue()self.dag.set_step_attr(step.name,"input_queue",input_queue)# Set `pipeline` to `None` as in some Python environments the pipeline is not# picklable and it will raise an error when trying to send the step to the process.# `TypeError: cannot pickle 'code' object`step.pipeline=Noneprocess_wrapper=_ProcessWrapper(step=step,input_queue=input_queue,output_queue=output_queue,shared_info=shared_info,)pool.apply_async(process_wrapper.run,callback=self._finished_callback,error_callback=self._error_callback,)# type: ignoredef_error_callback(self,e:BaseException)->None:"""Error callback that will be called when an error occurs in a `Step` process. Args: e: The exception raised by the process. 
"""global_SUBPROCESS_EXCEPTION# First we check that the exception is a `_ProcessWrapperException`, otherwise, we# print it out and stop the pipeline, since some errors may be unhandledifnotisinstance(e,_ProcessWrapperException):self._logger.error(f"β Failed with an unhandled exception: {e}")self._stop()returnife.is_load_error:self._logger.error(f"β Failed to load step '{e.step.name}': {e.message}")withself.shared_info[_STEPS_LOADED_LOCK_KEY]:self.shared_info[_STEPS_LOADED_KEY]=[_STEPS_LOADED_ERROR_CODE]_SUBPROCESS_EXCEPTION=e.subprocess_exception_SUBPROCESS_EXCEPTION.__traceback__=tblib.Traceback.from_string(e.formatted_traceback).as_traceback()return# If the step is global, is not in the last trophic level and has no successors,# then we can ignore the error and continue executing the pipelineif(e.step.is_globalandnotself.dag.step_in_last_trophic_level(e.step.name)andlist(self.dag.get_step_successors(e.step.name))==[]):self._logger.error(f"β An error occurred when running global step '{e.step.name}' with no"" successors and not in the last trophic level. Pipeline execution can"f" continue. Error will be ignored.")self._logger.error(f"Subprocess traceback:\n\n{e.formatted_traceback}")return# Global step with successors failedself._logger.error(f"An error occurred in global step '{e.step.name}'")self._logger.error(f"Subprocess traceback:\n\n{e.formatted_traceback}")self._cache()self._stop()def_finished_callback(self,step_name:str)->None:"""Callback that will be called when a `Step` process finishes. Args: step_name: The name of the step that finished. """with_STEPS_FINISHED_LOCK:_STEPS_FINISHED.add(step_name)def_check_step_not_loaded_or_finished(self,step_name:str)->bool:"""Checks if a step is not loaded or already finished. Args: step_name: The name of the step. Returns: `True` if the step is not loaded or already finished, `False` otherwise. """with_STEPS_LOADED_LOCK:ifstep_namenotin_STEPS_LOADED:returnTruewith_STEPS_FINISHED_LOCK:ifstep_namein_STEPS_FINISHED:returnTruereturnFalsedef_stop(self)->None:"""Stops the pipeline execution. It will first send `None` to the input queues of all the steps and then wait until the output queue is empty i.e. all the steps finished processing the batches that were sent before the stop flag. Then it will send `None` to the output queue to notify the pipeline to stop."""global_STOP_CALLEDwith_STOP_CALLED_LOCK:if_STOP_CALLED:self._logger.warning("π Stop has already been called. Ignoring subsequent calls and waiting"" for the pipeline to finish...")return_STOP_CALLED=Trueself._logger.debug(f"Steps loaded before calling `stop`: {_STEPS_LOADED}")self._logger.info("π Stopping pipeline. Waiting for steps to finish processing batches...")self._logger.debug("Sending `None` to the output queue to notify stop...")self.output_queue.put(None)def_handle_keyboard_interrupt(self)->None:"""Handles KeyboardInterrupt signal sent during the Pipeline.run method. It will try to call self._stop (if the pipeline didn't started yet, it won't have any effect), and if the pool is already started, will close it before exiting the program. """defsignal_handler(signumber:int,frame:Any)->None:self._stop()signal.signal(signal.SIGINT,signal_handler)