Source code for pywebchannel.GeneratorWatcher

import importlib.util
import inspect
import io
import os
import sys
from typing import Optional, List, Dict

from PySide6.QtCore import QObject, QFileSystemWatcher, QDir, QFileInfo, Slot

from pywebchannel.CodeAnalyzer import Interface, CodeAnalyzer
from pywebchannel.Utils import Logger, Utils, Generator


class GeneratorWatcher(QFileSystemWatcher):
    """Watch Python files and keep their generated TypeScript files up to date.

    Directories are registered with ``addDirectory``; every Python file found in
    them is added to the watch list. When a watched file changes, the matching
    TypeScript file is regenerated; when it disappears, the TypeScript file is
    deleted.

    Attributes:
        watchTargetDirMap (Dict[str, str]): Maps the absolute path of a watched
            source directory to the absolute path of its target directory.
    """

    def __init__(self, parent: Optional[QObject] = None):
        """Create the watcher and connect its change signals to the slots below.

        Args:
            parent (Optional[QObject], optional): The parent object of the
                GeneratorWatcher. Defaults to None.
        """
        super().__init__(parent)

        self.directoryChanged.connect(self.onDirectoryChanged)
        self.fileChanged.connect(self.onFileChanged)

        self.watchTargetDirMap: Dict[str, str] = {}

    @Slot(str)
    def onDirectoryChanged(self, dirPath: str) -> None:
        """Start watching Python files of a changed directory that are not watched yet.

        Args:
            dirPath (str): The path of the changed directory.
        """
        directory = QDir(dirPath)

        # Snapshot the watched files once; a set makes the membership test cheap.
        alreadyWatched = set(self._getWatchFilesIn(directory))

        for pFile in self._getPythonFiles(directory):
            if pFile not in alreadyWatched:
                self.addFile(pFile)

    @Slot(str)
    def onFileChanged(self, filePath: str) -> None:
        """Regenerate or delete the TypeScript file that belongs to a changed file.

        Args:
            filePath (str): The path of the changed file.
        """
        if QFileInfo(filePath).exists():
            self._update_typescript(filePath)
        else:
            # The Python file is gone, so its generated counterpart is removed too.
            self._delete_typescript(filePath)

    def addDirectory(self, dirPathToWatch: str, dirTargetPath: str) -> None:
        """Add a directory, and the Python files inside it, to the watch list.

        Nothing is added (only a message is logged) when either directory does
        not exist, when the directory is already watched, or when the underlying
        ``addPath`` call fails.

        Args:
            dirPathToWatch (str): The path of the directory to watch.
            dirTargetPath (str): The path of the target directory to generate
                typescript files.
        """
        directory = QDir(dirPathToWatch)
        absPath = directory.absolutePath()

        targetDirectory = QDir(dirTargetPath)
        targetAbsPath = targetDirectory.absolutePath()

        if not directory.exists():
            Logger.error(f"Watch Directory '{absPath}' does not exist")
            return

        if not targetDirectory.exists():
            Logger.error(f"Target Directory '{targetAbsPath}' does not exist")
            return

        if absPath in self.directories():
            Logger.warning(f"Directory, '{absPath}', is already in watch list")
            return

        if not self.addPath(absPath):
            Logger.warning(f"Directory, '{absPath}', could not added into watch list")
            return

        Logger.info(f"Directory '{absPath}' added into watch list")

        # The mapping must exist before the files are added, because adding a
        # file immediately triggers a conversion that looks the target up.
        self.watchTargetDirMap[absPath] = targetAbsPath

        for pFile in self._getPythonFiles(directory):
            self.addFile(pFile)

    def addFile(self, filePath: str) -> None:
        """Add a file to the watch list and run ``onFileChanged`` for it once.

        Nothing is added (only a message is logged) when the file does not
        exist, is already watched, or when the underlying ``addPath`` call fails.

        Args:
            filePath (str): The path of the file to watch.
        """
        file = QFileInfo(filePath)

        if not file.exists():
            Logger.error(f"File '{filePath}' does not exist")
            return

        absFilePath = file.absoluteFilePath()

        if absFilePath in self.files():
            Logger.warning(f"File, '{filePath}', is already in watch list")
            return

        if not self.addPath(absFilePath):
            Logger.warning(f"File, '{filePath}', could not added into watch list")
            return

        Logger.info(f"File '{filePath}' added into watch list")

        # Process the file right away instead of waiting for its first change.
        self.onFileChanged(filePath)

    def _getWatchFilesIn(self, directory: QDir) -> List[str]:
        """Return the watched files whose path lies below the given directory.

        Args:
            directory (QDir): The directory to search.

        Returns:
            List[str]: The list of watch files in the directory.
        """
        dirPath = directory.absolutePath()

        # Compare against "<dir>/" rather than "<dir>" so that a sibling such as
        # "/a/src2" is not mistaken for being inside "/a/src".
        prefix = dirPath if dirPath.endswith("/") else dirPath + "/"

        return [file for file in self.files() if file.startswith(prefix)]

    @staticmethod
    def _getPythonFiles(directory: QDir) -> List[str]:
        """Return the absolute paths of the convertible Python files in a directory.

        ``__init__.py`` and ``ControllerBase.py`` are always left out.

        Args:
            directory (QDir): The directory to search.

        Returns:
            List[str]: The list of python files in the directory.
        """
        # Each name is excluded independently; a missing "__init__.py" must not
        # prevent "ControllerBase.py" from being skipped.
        excluded = ("__init__.py", "ControllerBase.py")
        basePath = directory.absolutePath()

        return [
            f"{basePath}/{name}"
            for name in directory.entryList(["*.py"])
            if name not in excluded
        ]

    # The methods below convert Python code to TypeScript code.
    # They use the importlib, inspect, os, io, and sys modules together with the
    # Logger, CodeAnalyzer, Generator, Interface, and Utils classes.

    def getOutputFilePath(self, filePath):
        """Get the output file path for the TypeScript file.

        The folder part of ``filePath`` (everything before the last ``/``) is
        looked up in ``watchTargetDirMap``. If it is not mapped, a warning is
        logged and the source folder itself is used.

        Args:
            filePath (str): The input file path for the Python file.

        Returns:
            str: The output file path for the TypeScript file.
        """
        # rpartition yields an empty folder (instead of a mangled one) when the
        # path contains no "/" at all.
        sourceFolder, separator, sourceName = filePath.rpartition("/")

        if sourceFolder in self.watchTargetDirMap:
            targetFolder = self.watchTargetDirMap[sourceFolder]
        else:
            Logger.warning(f"No target folder mapped for '{sourceFolder}'")
            targetFolder = sourceFolder

        # Swap only the extension; ".py" appearing elsewhere in the name stays.
        stem, _extension = os.path.splitext(sourceName)
        targetSourceName = stem + ".ts"

        # A bare file name without any folder stays a bare file name.
        if not separator and not targetFolder:
            return targetSourceName

        return targetFolder + "/" + targetSourceName

    def _delete_typescript(self, filePath: str) -> None:
        """Delete the TypeScript file corresponding to the Python file, if present.

        Args:
            filePath (str): The input file path for the Python file.
        """
        outputFile = self.getOutputFilePath(filePath)

        if os.path.exists(outputFile):
            os.remove(outputFile)
            Logger.info(f"Typescript file, {outputFile} has been deleted")

    @staticmethod
    def __get_members(filePath: str):
        """Import the Python file and return the members it defines itself.

        The file is executed as a module named ``"MODULE_NAME"``; members whose
        ``__module__`` equals that name are kept, everything else is dropped.

        Args:
            filePath (str): The input file path for the Python file.

        Returns:
            list: A list of tuples of member names and objects.

        Raises:
            ImportError: If no import spec or loader can be created for the file.
        """
        spec = importlib.util.spec_from_file_location("MODULE_NAME", filePath)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot create an import spec for '{filePath}'")

        module = importlib.util.module_from_spec(spec)

        # The module is registered before execution and left registered.
        # NOTE(review): presumably later analysis resolves names through
        # sys.modules - confirm before removing the entry after loading.
        sys.modules["MODULE_NAME"] = module
        spec.loader.exec_module(module)

        members_in_file = []
        for entry in inspect.getmembers(module):
            # noinspection PyBroadException
            try:
                belongs_to_file = entry[1].__module__ == "MODULE_NAME"
            except Exception:
                # Deliberately best effort: members without a usable
                # ``__module__`` simply do not belong to the file.
                continue

            if belongs_to_file:
                members_in_file.append(entry)

        return members_in_file

    def _update_typescript(self, filePath: str) -> None:
        """Update the TypeScript file corresponding to the Python file.

        If the Python file cannot be imported, an error is logged and the
        existing TypeScript file is left untouched.

        Args:
            filePath (str): The input file path for the Python file.
        """
        # A watched file is often saved in a half-edited state; a failing import
        # must not propagate out of the Qt slot that called this method.
        try:
            members = self.__get_members(filePath)
        except Exception as error:
            Logger.error(f"Failed to load '{filePath}': {error}")
            return

        interfaceMap: Dict[str, Interface] = {}
        deps = []

        for memberName, MetaClass in members:
            analyzer = CodeAnalyzer(MetaClass)

            if not analyzer.isAcceptable():
                Logger.warning(
                    f"The class/function, '{memberName}' cannot be processed. It must be a class derived from "
                    f"'Controller' or 'BaseModel' from Pydantic"
                )
                continue

            interfaceMap[memberName] = analyzer.run()
            deps.extend(interfaceMap[memberName].dependencies())

        # Drop duplicates (keeping first-seen order), TypeScript primitives and
        # names that are defined in this very file.
        deps = [
            dep
            for dep in dict.fromkeys(deps)
            if not Utils.isTypescriptPrimitive(dep) and dep not in interfaceMap
        ]

        codeLines = [*Generator.header(), *Generator.imports(deps)]
        for name, interface in interfaceMap.items():
            codeLines.extend(Generator.interface(name, interface))

        outputFile = self.getOutputFilePath(filePath)

        # Write UTF-8 explicitly so the result does not depend on the platform's
        # default encoding.
        with io.open(outputFile, mode="w", encoding="utf-8") as f:
            f.write("\n".join(codeLines))

        Logger.info(f"Conversion completed and written into '{outputFile}'")