Source code for pycev

################################################################################
"""
pycev: A compressed event record reader for SEL CEV files.

Author(s):
  - Joe Stanley: joe_stanley@selinc.com  engineerjoe440@gmail.com

Homepage: https://github.com/engineerjoe440/pycev

SEL Protocol Application Guide:
    https://selinc.com/api/download/5026/

SEL Compressed ASCII (CASCII) Application Guide:
    https://selinc.com/api/download/5021/

───────────────────────────────────────────────────────────────────────────────

MIT License

Copyright (c) 2020 Joe Stanley

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
################################################################################

# Standard Imports
import os
import re
import inspect
from typing import Union
import warnings
import datetime as dt
from collections import namedtuple

# Describe Package for External Interpretation
# `_name_` and `_version_` are plain module-level strings; `__version__` is
# bound to the very same string object so that both spellings always agree.
_name_ = "pycev"
_version_ = "0.1.0"
__version__ = _version_  # Alias the Version String


# Define Global Strings
# Marker row on which `split_event_and_relay_data` divides the file text into
# the event record and the relay settings.
EVENT_SETTINGS_SEP = '"SETTINGS","02E1"'
# Heading names looked up in `Cev._properties` to obtain the analog and the
# digital samples-per-cycle values (see `Cev._eval_samples_per_cycle`).
ANALOG_SAMPLES = "SAM/CYC_A"
DIGITAL_SAMPLES = "SAM/CYC_D"
# Character searched for in the column following the analog values in order to
# identify the data row on which the event was triggered.
TRIGGER_KEY_CHAR = ">"
# Heading name looked up in `Cev._properties` for the nominal frequency; the
# reader falls back to 60.0 when the heading is absent.
FREQUENCY_KEY = "FREQ"


# Custom Exceptions and Warnings
class MalformedChecksumFailure(UserWarning):
    """
    Record data appears to be malformed, and fails checksum validation.

    Issued as a warning (not raised) while the record rows are validated, and
    reports the line numbers of the rows whose checksum did not match.
    """

class UnexpectedFileExtension(UserWarning):
    """
    File does not appear to use expected "CEV" extension.

    Issued as a warning (not raised) when the upper-cased extension of the
    file-path being loaded does not contain "CEV"; loading still proceeds.
    """

class MalformedHeadingDataMismatch(Exception):
    """
    CEV is Malformed - Heading and Data Section Lengths do not Match.

    Raised while parsing the record when a heading row and the value row that
    follows it do not split into the same number of comma-separated columns.
    """

class MalformedNoFIDFound(Exception):
    """
    CEV is Malformed - FID String Could not be Located.

    Raised when the loaded `fid` value cannot be split on '=' to separate the
    'FID' specifier from the firmware identification string itself.
    """

class MalformedNoSampleNumberFound(Exception):
    """
    CEV is Malformed - Number of Samples Could not be Located.

    Raised when the samples-per-cycle headings ("SAM/CYC_A" / "SAM/CYC_D") are
    missing from the properties gathered out of the record headings.
    """


# Define Function to Interpret Row-Wise Checksum for Validity
# Return structure of `row_wise_checksum`. Built once at import time so that
# every call shares one type instead of re-creating the class for each row.
_ChecksumRow = namedtuple('Row', ['data', 'validity'])


def row_wise_checksum(row_data, constrain=True):
    """
    Identify the data and validate it with included checksum.

    Events should contain rows of data, where each row appears as:

        "some,data,in,the,event","checksum"

    Knowing this format, this function will separate the data from the
    checksum, and use the checksum to evaluate the validity of the data. The
    checksum is the sum of the character codes of everything that precedes
    the quoted checksum field (including the comma in front of it).

    The resulting data will be returned as a namedtuple as:
    Row('data':str, 'validity':bool)

    Parameters
    ----------
    row_data:   str
                The string containing all row data in form described above.
                Carriage-return and newline characters are discarded.
    constrain:  bool, optional
                Control to limit the calculated sum to 16 bits before it is
                compared against the checksum from the row, defaults to True.

    Returns
    -------
    row:        namedtuple
                The namedtuple containing the separated data and validity
                marker in the form of: Row('data':str, 'validity':bool).
                A row whose checksum field is not valid hexadecimal text is
                reported with `validity=False` rather than raising.
    """
    # Clean Input Data
    row_data = row_data.replace('\r', '').replace('\n', '')
    # The last six characters are the quoted checksum field: `"XXXX"`
    row_contents = row_data[:-6]  # Remove Checksum Characters
    checksum_str = row_data[-6:].replace('"', '')  # Keep Checksum Only
    # Evaluate Checksum as INT
    try:
        checksum_int = int.from_bytes(
            bytes.fromhex(checksum_str),
            byteorder='big',
            signed=False,
        )
    except ValueError:
        # Checksum text is not hexadecimal, the row can never validate
        checksum_int = None
    # Evaluate the Checksum
    checksum = sum(map(ord, row_contents))
    # Cap the Value if Needed
    if constrain:
        checksum &= 0xffff  # Bit-wise AND with 16-bit maximum
    # Remove Trailing Comma if Present
    if row_contents.endswith(','):
        row_contents = row_contents[:-1]  # Trim Comma
    # Pack Structure
    valid = (checksum_int is not None) and (checksum == checksum_int)
    return _ChecksumRow(data=row_contents, validity=valid)
# Define Function to Split Event Data from Relay Settings
# Return structure of `split_event_and_relay_data`, built once at import time.
_EventData = namedtuple('EventData', ['record', 'settings'])


def split_event_and_relay_data(data):
    """
    Split the Event Data from Relay Settings.

    This function accepts the full text from a CEV file, and separates the
    event-record data from the relay's configuration settings. These two
    separated items are returned as a named-tuple which is formed as:
    EventData('record':str, 'settings':str)

    The text is divided at the first occurrence of the settings marker row
    (`EVENT_SETTINGS_SEP`). When the marker is not present at all, the whole
    text is treated as the event record and 'settings' is an empty string.

    Parameters
    ----------
    data:       str
                The string containing all data read from the CEV file.

    Returns
    -------
    event:      namedtuple
                The namedtuple containing the 'record' and 'settings'
                information, each being the so-described elements from the
                event record which have been read. namedtuple is of the form:
                EventData('record':str, 'settings':str) where 'record' is the
                event information, and 'settings' is the relay's configuration
                settings.
    """
    # Gather the Components from the Event Record; `partition` always yields
    # exactly three parts, so a missing or repeated marker cannot break the
    # unpacking the way a bare `split` would.
    record, _, settings = data.partition(EVENT_SETTINGS_SEP)
    # Prepare Structure
    return _EventData(record=record, settings=settings)
# Define the Primary Class
class Cev():
    """
    SEL CEV File Reader.

    This class serves to provide functionality to read SEL (Schweitzer
    Engineering Laboratories) Compressed EVent records (CEV files) and grant
    pragmatic access to the key components of these records. The methods,
    attributes, and properties of this class serve to expose data in a manner
    that supports scientific analysis and allow common data-science tools such
    as NumPy, Matplotlib, Pandas, and others to conveniently interpret
    information. This all is in an effort to support data scientists and
    engineers to make empowered, intelligent decisions about the power systems
    which they are responsible for.

    Within this module, this class (`Cev`) is also aliased as CEV for
    programming convenience, and to allow users to access the functionality in
    a format that is consistent with their work.

    Parameters
    ----------
    file:       str, optional
                String describing the relative or fully qualified path to the
                CEV file that should be read. Optionally used during class
                initialization, may also be loaded using the `load` method.
                A bytes or path-like path, or a file-like object providing a
                `read` method, is accepted as well.
    data:       str, optional
                String describing all contents of the CEV file as read.
                Optionally used during class initialization to pre-load the
                contents of the CEV file. May also be given as bytes.
    **kwargs:   dict, optional
                Additional optional parameters which may be specified during
                class initialization. Common arguments include:

                - encoding: str, file encoding such as 'utf-8'
                - decode_opt: str, file decoding option such as 'strict',
                  'ignore', 'replace', or 'backslashreplace' as defined by
                  standard str.decode method (also accepted under the name
                  `decode_option`).
                - ignore_warnings: bool, control to ignore warnings

                Class initialization may include one or more kwargs, but none
                are required.

    Attributes
    ----------
    analog_channels:        list of list of float
                            List of lists containing the analog samples for
                            each channel.
    analog_channel_ids:     list of str
                            List of the analog channel names whose index
                            values correspond directly to the channel datasets
                            in `analog_channels`.
    analog_count:           int
                            Number of analog channels present in CEV.
    channels_count:         int
                            Total number of analog and digital (status)
                            channels in CEV.
    data:                   str
                            Full string context of the entire CEV record;
                            includes both the event information and relay
                            settings that were included with the record.
    digital_channels:       list of list of int
                            List of lists containing the states (0 or 1) for
                            each digital (status) channel present in CEV. This
                            list presents itself as an alias to the
                            `status_channels` class attribute.
    digital_channel_ids:    list of str
                            List of the digital (status) channel names whose
                            index values correspond directly to the channel
                            datasets in `digital_channels`. This list presents
                            itself as an alias to the `status_channel_ids`
                            class attribute.
    digital_count:          int
                            Number of digital channels present in the CEV.
    fid:                    str
                            Relay firmware identification string; does not
                            include the 'FID=' specifier.
    frequency:              float
                            The recorded nominal frequency present in the CEV.
    raw_fid:                str
                            "Raw" relay firmware identification string;
                            includes the 'FID=' specifier to lead the string.
    record:                 str
                            Event record data-sub-section contents; contains
                            only the event-related data and heading fields of
                            the CEV that was loaded.
    record_lines:           list of str
                            Row-wise split contents of the record with all
                            newline and carriage-return characters removed.
    settings:               str
                            Relay settings data-sub-section contents; contains
                            only the relay settings portion of the CEV that
                            was loaded.
    status_channels:        list of list of int
                            List of lists containing the states (0 or 1) for
                            each status (digital) channel present in CEV. This
                            list is aliased to the `digital_channels` class
                            attribute.
    status_channel_ids:     list of str
                            List of the status (digital) channel names whose
                            index values correspond directly to the channel
                            datasets in `status_channels`. This list is
                            aliased to the `digital_channel_ids` class
                            attribute.
    status_count:           int
                            Number of status channels present in the CEV.
    time:                   list of datetime
                            Timestamps evaluated for the samples of the
                            record, spaced by the analog sample period.
    trigger_time:           datetime
                            Date-time structure indicating when the event was
                            "triggered" by protection logic in the relay.

    Examples
    --------
    >>> from pycev import CEV
    >>> # Load a file and parse, directly.
    >>> record = CEV(file="./event-report.cev")
    >>> print("Trigger time = {}s".format(record.trigger_time))
    """

    def __init__(self, file: str = None, data: Union[str, bytes] = None,
                 **kwargs):
        """
        Prepare CEV Reader.

        Class initialization with optional data input methods for file-path
        and raw data as either a string or bytes.
        """
        # Handle Encoding Type if Provided
        encoding = kwargs.get('encoding')
        # Handle Decoding Option if Provided (two spellings are accepted)
        self._decode_opt = kwargs.get(
            'decode_opt', kwargs.get('decode_option', 'strict')
        )
        # Handle Warnings Override
        self.ignore_warnings = kwargs.get('ignore_warnings', False)
        # Prepare Defaults
        self.data = ''
        self.record = ''
        self.settings = ''
        self.record_lines = []
        self.fid = ''
        self.raw_fid = ''
        self.time = []
        self.trigger_time = dt.datetime(1970, 1, 1)  # Default to Epoch
        self.channels_count = 0
        self.analog_channels = []
        self.analog_channel_ids = []
        self.status_channels = []
        self.status_channel_ids = []
        self.digital_channels = self.status_channels  # Alias the Digitals
        self.digital_channel_ids = self.status_channel_ids  # Alias Digitals
        self.analog_count = 0
        self.status_count = 0
        self.digital_count = 0
        self.frequency = 0.0
        self.year = 1970
        self.month = 1
        self.day = 1
        self.hour = 0
        self.min = 0
        self.sec = 0
        self.msec = 0
        self._ignored_channels = []
        self._trig_column = -1
        self._trig_row = 0
        self._analog_samp_timedelta = None
        self._digital_samp_timedelta = None
        self._properties = {}  # Empty Dictionary of the Keys
        # Prepare Data or File if Provided
        if isinstance(file, (str, bytes, os.PathLike)):
            self.load(file=file, encoding=encoding)
        elif hasattr(file, "read"):  # Probably a file-like object?
            self.load_data(data=file.read(), encoding=encoding)
        elif isinstance(data, (str, bytes)):
            self.load_data(data=data, encoding=encoding)

    # Define Simple Method to Identify Class Keys
    def _keys(self):
        """Capture Class Attributes as Keys."""
        return self.__dict__.keys()

    # Define Simple File Extension Validator
    def _validate_extension(self, file):
        """Validate Extension is of *.CEV Format."""
        if not os.path.exists(file):
            raise FileNotFoundError(
                "Argument `file` must be a valid file-path to a CEV file."
            )
        _, ext = os.path.splitext(file)
        if 'CEV' not in ext.upper() and not self.ignore_warnings:
            # Throw Warning to User, attributed to the caller of `load`.
            # `warnings.warn` is used so that standard warning filters apply.
            warnings.warn(
                'File does not appear to use "CEV" extension,'
                + f' instead is "{ext}".',
                category=UnexpectedFileExtension,
                stacklevel=3,
            )
        # Proceed Without Exception

    # Define Simple Decoder for Data
    def _decode(self, data, encoding):
        """
        Simply Decode the Data Using the Specified Encoding Format.

        Text passes through untouched; bytes are decoded with `encoding`
        (UTF-8 when no encoding is given) and the configured decode option.
        """
        if isinstance(data, (bytes, bytearray)):
            return data.decode(encoding or 'utf-8', self._decode_opt)
        return data

    # Define Function to Prepare Record and Validate Checksums
    def _prepare_and_validate_record(self):
        """
        Simply Split the Record and Settings, then Evaluate Checksums.

        Returns True when every row validated, otherwise False. Rows that
        fail are still kept in `record_lines`; a `MalformedChecksumFailure`
        warning lists their line numbers unless warnings are ignored.
        """
        # Split Data
        self.record, self.settings = split_event_and_relay_data(self.data)
        # Start from a clean slate so that re-loading does not accumulate rows
        self.record_lines = []
        invalid_lines = []
        # Evaluate Record Checksums
        for number, line in enumerate(self.record.split('\n'), start=1):
            if not line.strip():
                continue  # Blank row (also covers a lone carriage-return)
            # Collect Line Data
            content, validity = row_wise_checksum(line)
            self.record_lines.append(content)
            if not validity:
                invalid_lines.append(number)
        # Throw Warning to User, attributed to the caller of `load_data`
        if invalid_lines and (not self.ignore_warnings):
            warnings.warn(
                'Record data appears to be malformed, '
                'and fails checksum validation for lines: '
                + ", ".join(str(number) for number in invalid_lines),
                category=MalformedChecksumFailure,
                stacklevel=3,
            )
        # Return the Validity Signal
        return not invalid_lines

    # Define Internal Test to Identify Header
    def _is_header(self, row_data):
        """Test Function to Evaluate Whether Row is Header."""
        return row_data.startswith('"') and row_data.endswith('"')

    # Define Internal Test to Identify Data Row
    def _is_data(self, row_data):
        """Test Function to Evaluate Whether Row isn't Header."""
        return not self._is_header(row_data=row_data)

    # Define Primary Parsing Function
    def _parse_record(self):
        """
        Primary Parsing Function to Interpret the CEV.

        Works on `record_lines` and fills in the heading properties, the
        channel names, the channel counts and the per-channel sample lists.
        """
        # Discard the products of any earlier parse so a reader can be reused
        self._properties = {}
        self._ignored_channels = []
        self._trig_column = -1
        self._trig_row = 0
        self.analog_channel_ids = []
        self.status_channel_ids = []
        # Leading heading/value row pairs come first
        i_row = self._parse_heading_pairs()
        # Following the Primary Header Content, a Single Header Remains
        # with the Analog and Digital Channel Names.
        analog_columns, status_bits = self._parse_channel_names(
            self.record_lines[i_row]
        )
        i_row += 1  # Increment Past the Data Heading Column
        # Characterize Number of Channels
        self.analog_count = len(self.analog_channel_ids)
        self.status_count = len(self.status_channel_ids)
        self.digital_count = self.status_count
        self.channels_count = self.analog_count + self.status_count
        # Build the Channel Lists According to Sizes
        self.analog_channels = [[] for _ in range(self.analog_count)]
        self.status_channels = [[] for _ in range(self.status_count)]
        # The lists were rebound above, so the digital aliases are renewed
        self.digital_channels = self.status_channels
        self.digital_channel_ids = self.status_channel_ids
        # Iterate over Data Rows to Load Channels
        self._parse_samples(
            self.record_lines[i_row:], analog_columns, status_bits
        )

    def _parse_heading_pairs(self):
        """Load heading/value row pairs; return the channel-name row index."""
        lines = self.record_lines
        attributes = vars(self)
        # Operate on "Row-Pairs" with two Rows at Once to Pair Key with Value
        # Start with Row-Index-Zero (first row), and Assuming Header
        i_row = 0
        # Only Validate the First Row, Since Second Row Should Contain FID
        header = self._is_header(lines[i_row])
        while header:
            # Clean and Split the Heading and Content
            heading_row = lines[i_row].replace('"', '').split(',')
            content_row = lines[i_row + 1].replace('"', '').split(',')
            if len(heading_row) != len(content_row):
                raise MalformedHeadingDataMismatch(
                    "CEV may be malformed, heading and data length don't match"
                    f" ({len(heading_row)} headings: {heading_row},"
                    f" {len(content_row)} values: {content_row})"
                )
            # Load the Data into Class Keys
            for key, value in zip(heading_row, content_row):
                key_lower = key.lower()
                if callable(attributes.get(key_lower)):
                    continue  # Don't Overwrite a Callable!
                if ('/' not in key) and ('(' not in key):
                    # Valid Class Variable Name, Load Directly
                    attributes[key_lower] = value
                # Store the Data as a Property
                self._properties[key] = value
            # Check Next Group
            header = (
                self._is_header(lines[i_row + 2])
                and self._is_data(lines[i_row + 3])
                and self._is_header(lines[i_row + 4])
            )
            # Increment Row Index
            i_row += 2
        return i_row

    def _parse_channel_names(self, heading_line):
        """
        Sort the channel names into analog and status (digital) names.

        Returns the data-row column of every named analog channel, and the
        bit position (within the digital hex string) of every named status
        channel, both ordered like the respective list of channel names.
        """
        analog_columns = []
        status_bits = []
        bit_position = 0
        is_analog = True  # First Channel from Left is Analog
        # Split on either a comma (',') or a space (' ')
        for index, token in enumerate(re.split(r',| ', heading_line)):
            # Check if Trig Channel
            if token == '"TRIG"':
                is_analog = False
                self._ignored_channels.append(index)
                self._trig_column = index
                continue  # Don't Track the TRIG Channel
            if token == '':
                continue  # Don't Track Empty Channel Names
            # Remove Double Quotes
            name = token.replace('"', '')
            if name == '*':
                # Unused channel: no name is tracked, but a digital
                # placeholder still occupies one bit of the hex string.
                self._ignored_channels.append(index)
                if not is_analog:
                    bit_position += 1
                continue
            # Channel Must be Valid, Append Name to Either Analog or Digital
            if is_analog:
                self.analog_channel_ids.append(name)
                analog_columns.append(index)
            else:
                self.status_channel_ids.append(name)
                status_bits.append(bit_position)
                bit_position += 1
        return analog_columns, status_bits

    def _parse_samples(self, rows, analog_columns, status_bits):
        """Append the samples of each data row to the channel lists."""
        # The trigger marker sits in the TRIG column, directly after the
        # analog values; the digital hex string is the column after that.
        if self._trig_column >= 0:
            trig_column = self._trig_column
        else:
            trig_column = self.analog_count
        digital_column = trig_column + 1
        for offset, row in enumerate(rows):
            columns = row.split(',')
            # Track Analog Quantities
            for samples, column in zip(self.analog_channels, analog_columns):
                samples.append(float(columns[column]))
            # Identify Trigger Data Row
            if TRIGGER_KEY_CHAR in columns[trig_column]:
                self._trig_row = offset
            # Format the Digitals: every pair of hex characters is one byte,
            # expanded to its eight bits with the most significant bit first.
            hex_string = columns[digital_column].replace('"', '')
            bits = []
            for position in range(0, len(hex_string), 2):
                byte = int(hex_string[position:position + 2], base=16)
                bits.extend((byte >> shift) & 1 for shift in range(7, -1, -1))
            # Track Digital Quantities
            for samples, bit in zip(self.status_channels, status_bits):
                samples.append(bits[bit])

    # Define Event Trigger Time Evaluator
    def _eval_trigger_time(self):
        """Use Time Information to Identify Trigger Time."""
        self.trigger_time = dt.datetime(
            year=int(self.year),
            month=int(self.month),
            day=int(self.day),
            hour=int(self.hour),
            minute=int(self.min),
            second=int(self.sec),
            microsecond=int(self.msec) * 1000,
        )

    # Define Frequency Identifier
    def _eval_frequency(self):
        """Identify and load the system nominal frequency."""
        self.frequency = float(self._properties.get(FREQUENCY_KEY, 60.0))

    # Define FID Cleaner
    def _clean_fid(self):
        """Store the 'raw' FID in a New Variable, and Clean Existing FID."""
        raw_fid = self.fid
        try:
            # Keep everything after the first '=' of the 'FID=...' string
            cleaned = raw_fid.split('=', 1)[1]
        except (AttributeError, IndexError) as err:
            raise MalformedNoFIDFound(
                "Failed to load relay FID from CEV"
            ) from err
        self.raw_fid = raw_fid
        self.fid = cleaned

    # Define Samples-Per-Cycle Evaluator
    def _eval_samples_per_cycle(self):
        """Identify the Samples/Cycle Indicators, Calculate the Deltas."""
        try:
            # Extract the Number of Samples per Cycle, Evaluate Milliseconds
            ms_per_cyc = 1000 / self.frequency
            analog_ms = ms_per_cyc / float(self._properties[ANALOG_SAMPLES])
            digital_ms = ms_per_cyc / float(self._properties[DIGITAL_SAMPLES])
        except KeyError as err:
            raise MalformedNoSampleNumberFound(
                "Failed to identify number of samples per cycle"
            ) from err
        # Prepare the TimeDeltas
        self._analog_samp_timedelta = dt.timedelta(milliseconds=analog_ms)
        self._digital_samp_timedelta = dt.timedelta(milliseconds=digital_ms)

    # Define Timestamp Loader
    def _eval_timestamps(self):
        """Evaluate event timestamps."""
        step = self._analog_samp_timedelta
        # Calculate the first timestamp, back-dated from the trigger row
        first = self.trigger_time - step * self._trig_row
        # One timestamp per sample of the first analog channel; the first
        # timestamp is always present, even for a record without samples.
        if self.analog_channels:
            count = len(self.analog_channels[0])
        else:
            count = 0
        self.time = [first + step * k for k in range(max(count, 1))]

    # Define File Loader Method
    def load(self, file: str, encoding: str = None):
        """
        *CEV File Loader Method*.

        Use this method to load a CEV file, and parse its contents into the
        valuable class attributes and structure.

        Parameters
        ----------
        file:       str
                    String describing the relative or fully qualified path to
                    the CEV file that should be read. Optionally used during
                    class initialization, may also be loaded using the `load`
                    method. A bytes or path-like path, or a file-like object
                    providing a `read` method, is accepted as well.
        encoding:   str, optional
                    String specifying the encoding format (if required) in
                    which the file is stored. This may be used for files of
                    format 'utf-8', for example.

        See Also
        --------
        load_data : Load data which has already been read from a file, or is
                    presented as a stream.

        Raises
        ------
        FileNotFoundError
            If the file cannot be located on the system
        TypeError
            If `file` is neither a path nor an object with a `read` method
        MalformedHeadingDataMismatch
            If the file contains a header and content row pair which do not
            share an equal number of columns.

        Warns
        -----
        UnexpectedFileExtension
            If the uppercase-cast file extension is not ".CEV"
        MalformedChecksumFailure
            If any of the CEV line-wise checksums do not evaluate successfully

        Examples
        --------
        >>> from pycev import CEV
        >>> # Load a file and parse, directly.
        >>> record = CEV()  # Create the parser instance
        >>> record.load(file="./event-report.cev")
        >>> print("Trigger time = {}s".format(record.trigger_time))
        """
        # Switch between Handling File-Path, or File-Like-Object
        if isinstance(file, (str, bytes, os.PathLike)):
            path = os.fsdecode(file)
            # Validate file Extension
            self._validate_extension(path)
            # Read File with Encoding
            with open(path, 'r', encoding=encoding,
                      errors=self._decode_opt) as file_obj:
                contents = file_obj.read()  # Gather ALL Data From File
            encoding = None  # Text is already decoded at this point
        elif hasattr(file, "read"):
            contents = file.read()
        else:
            raise TypeError(
                "Argument `file` must be a file-path or a file-like object."
            )
        # Process the Data and Load Record
        self.load_data(data=contents, encoding=encoding)

    # Define Data Loader Method
    def load_data(self, data: Union[str, bytes], encoding: str = None):
        """
        *CEV Data Loader Method*.

        Use this method to load the data from a CEV file which has already
        been read, or data which is being streamed to the class (i.e., an
        active connection to a relay). This method will parse the data and
        load the class attributes and structures appropriately.

        Parameters
        ----------
        data:       [str, bytes]
                    String of the full file content. When `None` is given,
                    the content already held in the `data` attribute is
                    parsed instead.
        encoding:   str, optional
                    String specifying the encoding format (if required) in
                    which the file is stored. This may be used for files of
                    format 'utf-8', for example.

        See Also
        --------
        load : Load data from a CEV file.

        Raises
        ------
        MalformedHeadingDataMismatch
            If the file contains a header and content row pair which do not
            share an equal number of columns.
        MalformedNoFIDFound
            If the relay FID cannot be identified.
        MalformedNoSampleNumberFound
            If the number of samples per cycle cannot be identified.

        Warns
        -----
        MalformedChecksumFailure
            If any of the CEV line-wise checksums do not evaluate successfully

        Examples
        --------
        >>> from pycev import CEV
        >>> # Load a file and parse, directly.
        >>> record = CEV()  # Create the parser instance
        >>> with open("./event-report.cev", 'r') as file:
        ...     data = file.read()
        >>> record.load_data(data=data)
        >>> print("Trigger time = {}s".format(record.trigger_time))
        """
        # `data=None` means: parse whatever is already stored in `self.data`
        if data is not None:
            # Decode Data As Needed
            self.data = self._decode(data, encoding=encoding)
        # Prepare Record Information
        self._prepare_and_validate_record()
        # Parse the Record
        self._parse_record()
        # Determine the Trigger Time
        self._eval_trigger_time()
        # Identify Nominal System Frequency
        self._eval_frequency()
        # Clean FID
        self._clean_fid()
        # Rationalize Number of Samples per Cycle
        self._eval_samples_per_cycle()
        # Evaluate the Timestamps
        self._eval_timestamps()

    # Define Method to Access the Analog Channel by Name
    def get_analog(self, channel_name: str):
        """
        *Extract an analog channel by name*.

        Use this method to return the list of analog values associated with
        the particular analog channel with the specified name.

        Parameters
        ----------
        channel_name:   str
                        Name of the analog channel which should be extracted.

        Returns
        -------
        channel:        list of float
                        The analog channel values in a zero-based list.

        Raises
        ------
        ValueError
            If no analog channel carries the specified name.

        See Also
        --------
        get_status : Collect the digital channel status for a specified name.
        get_digital : Collect the digital channel status for a specified name.

        Examples
        --------
        >>> from pycev import CEV
        >>> # Load a file and parse, directly.
        >>> record = CEV()  # Create the parser instance
        >>> record.load(file="./event-report.cev")
        >>> record.get_analog("FREQ")
        [...]
        """
        # Identify the Analog Channel Index
        channel_index = self.analog_channel_ids.index(channel_name)
        # Return the Analog Channel
        return self.analog_channels[channel_index]

    # Define Method to Access the Digital Channel by Name
    def get_status(self, channel_name: str):
        """
        *Extract a digital channel by name*.

        Use this method to return the list of digital values associated with
        the particular digital channel with the specified name.

        Parameters
        ----------
        channel_name:   str
                        Name of the digital channel which should be extracted.

        Returns
        -------
        channel:        list of int
                        The digital channel values (0 or 1) in a zero-based
                        list.

        Raises
        ------
        ValueError
            If no digital channel carries the specified name.

        See Also
        --------
        get_analog : Collect the analog channel status for a specified name.
        get_digital : Collect the digital channel status for a specified name.

        Examples
        --------
        >>> from pycev import CEV
        >>> # Load a file and parse, directly.
        >>> record = CEV()  # Create the parser instance
        >>> record.load(file="./event-report.cev")
        >>> record.get_status("TRIPLED")
        [...]
        >>> record.get_digital("TRIPLED")
        [...]
        """
        # Identify the Digital Channel Index
        channel_index = self.status_channel_ids.index(channel_name)
        # Return the Digital Channel
        return self.status_channels[channel_index]

    # Alias `get_status` to `get_digital`
    get_digital = get_status
# Alias the Class: `Cev` to `CEV` for Convenience CEV = Cev # END