# -*- coding: utf-8 -*-
# Copyright (C) 2014-2024 EDF
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# See https://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# Author : Konstantin Leontev (OpenCascade S.A.S)

import inspect
from traceback import format_exc

from qtsalome import QApplication, pyqtSignal, QThread, Qt

from .geomrepairadv_logger import logger


class Worker(QThread):
    """
    Creates a thread to run a given target function
    with a progress dialog as a parent.

    Signals:
        progress_update(int): emitted with the current progress value.
        thread_failed(): emitted when the target raised an exception.
        work_completed(): emitted when the target returned normally.
    """

    progress_update = pyqtSignal(int)
    thread_failed = pyqtSignal()
    work_completed = pyqtSignal()

    def __init__(self, parent=None, target=None, args=None):
        """
        Args:
            parent: object used as the QThread parent. When given, its
                setValue(), on_failed() and on_completed() methods are
                connected to the worker signals.
            target: callable invoked in run() as target(args, progress_emitter).
                Its source must be retrievable by inspect.getsourcelines().
            args: opaque argument object passed through to the target.
        """
        super().__init__(parent)

        # Set target function and its arguments
        self.target = target
        self.args = args

        # Set a variable for result
        self.result = None

        # Update a progress bar each time we receive an update signal.
        # The signature allows parent=None, so only wire the slots
        # when there is actually a parent to receive them.
        if parent is not None:
            self.progress_update.connect(parent.setValue)
            self.thread_failed.connect(parent.on_failed)
            self.work_completed.connect(parent.on_completed)

        # Calculate total amount of lines in executed function:
        # getsourcelines() returns (list_of_lines, first_line_number).
        source_lines, first_line = inspect.getsourcelines(target)
        total_lines = len(source_lines)

        # Set a progress emitter to update the progress from the target
        self.progress_emitter = ProgressEmitter(self.progress_update,
                                                total_lines,
                                                first_line)

    def run(self):
        """
        Runs the given target function.

        The return value of the target is stored in self.result.
        Any exception raised by the target is logged and reported
        through the thread_failed signal instead of being propagated.
        """
        try:
            # Wait mode cursor
            # NOTE(review): this is called from run(), i.e. presumably from
            # the worker thread rather than the GUI thread - confirm that
            # this is safe for the Qt version in use.
            QApplication.setOverrideCursor(Qt.WaitCursor)

            self.result = self.target(self.args, self.progress_emitter)

            # Reset the progress when finished
            self.progress_update.emit(100)
            self.work_completed.emit()
        except Exception:
            logger.error(format_exc())
            self.thread_failed.emit()
        finally:
            QApplication.restoreOverrideCursor()

    def terminate(self):
        """
        Overrides default terminate() to add some clean up.
        """
        super().terminate()

        # Termination doesn't call a final block inside run()
        QApplication.restoreOverrideCursor()

    def get_result(self):
        """
        Returns result of the execution or None if the execution failed.
        """
        return self.result


class ProgressEmitter():
    """
    Helper class to reduce code repetition while update progress
    from a function executed in a separated thread.

    The progress value is derived from the line number of the caller
    relative to the first line of the target function.
    """

    def __init__(self, progress_update, total_lines, first_line):
        """
        Args:
            progress_update: signal-like object with an emit(int) method.
            total_lines: number of source lines of the target function.
            first_line: line number where the target function starts.
        """
        self.progress_update = progress_update
        self.first_line = first_line

        # Number of source lines that correspond to one percent of progress.
        # Guard against an empty source so emit() never divides by zero.
        self.progress_percent = max(total_lines, 1) / 100.0
        logger.debug('self.progress_percent: %f', self.progress_percent)

    def emit(self):
        """
        Call this method in a target function to update a progress value
        based on a currently executed line number.

        It must be called directly from the body of the target function,
        because the line number of the immediate caller is used.
        """
        # Take the caller's frame directly instead of inspect.stack(),
        # which builds frame info for the entire call stack on every call.
        frame = inspect.currentframe()
        try:
            caller = frame.f_back if frame is not None else None
            if caller is None:
                # Frame introspection is not available: nothing to report.
                return
            line = caller.f_lineno
        finally:
            # Drop the reference to our own frame to avoid a reference cycle.
            del frame

        logger.debug('line: %d', line)

        progress_value = (line - self.first_line) / self.progress_percent
        logger.debug('progress_value: %d', progress_value)

        self.progress_update.emit(int(progress_value))