import csv
import enum
import os

from parseerror import ParseError


class SearchDirection(enum.Enum):
    """Side of the requested step that ChargeFile.FindClosest may search."""

    CLOSEST = 0
    PREVIOUS_ONLY = 1
    FORWARD_ONLY = 2


def calc_capacity(charge_curve: list[dict]):
    """Integrate the current of a charge curve over time (trapezoidal rule).

    Args:
        charge_curve: Samples in chronological order; every sample needs a
            'time' and a 'current' key.

    Returns:
        The signed integral of current over time as a float. Time differences
        are divided by 3600, so for times in seconds and currents in ampere
        the result is in ampere hours. Fewer than two samples give 0.0.
    """
    capacity = 0.0
    # Pair every sample with its successor instead of using a sentinel for
    # "no previous sample", so a curve whose first timestamp is 0 (or
    # negative) still contributes its first interval.
    for prev_entry, entry in zip(charge_curve, charge_curve[1:]):
        delta_s = entry['time'] - prev_entry['time']
        current = (entry['current'] + prev_entry['current']) / 2
        capacity += current * (delta_s / (60.0 * 60.0))
    return capacity


class ChargeFile:
    """Summary of one charge or discharge curve read from a CSV file.

    The step (and for single-cell files the cell index) is taken from the
    file name; voltages, current and capacity are derived from the samples.
    """

    def __init__(self, filename: str):
        """Parse *filename* and derive the summary values.

        Accepted base names start with "single_cell_charge" or
        "single_cell_discharge" (ending in ``_<step>_<cell>``) or with
        "charge_for" (ending in ``_<step>``).

        Args:
            filename: Path of the CSV file. The first line is skipped, the
                second line must start with the column name "time", and the
                remaining rows hold time, voltage and current.

        Raises:
            ParseError: If the file name does not follow the scheme, the
                header is missing or wrong, or there are no sample rows.
            ValueError: If step, cell or a sample value is not numeric.
        """
        self.start_voltage = 0.0  # voltage of the first sample
        self.end_voltage = 0.0    # voltage of the last sample
        self.capacity = 0.0       # signed result of calc_capacity()
        self.cell = -1            # cell index, -1 for files without one
        self.discharge = False    # True when the representative current is negative
        self.current = 0.0        # current of the middle sample
        self.full_cycle = False   # True when the curve spans empty <-> full
        self.step = 0             # step number taken from the file name

        # Only the base name carries step/cell; splitting the complete path
        # at '.' would break for paths such as "./data/charge_for_3.csv".
        basename = os.path.basename(filename)
        tokens = basename.split('.')[0].split('_')
        if basename.startswith(("single_cell_charge", "single_cell_discharge")):
            self.step = int(tokens[-2])
            self.cell = int(tokens[-1])
        elif basename.startswith("charge_for"):
            self.step = int(tokens[-1])
        else:
            raise ParseError(f"File name {basename} not in the expected sheme for ChargeFile")

        with open(filename, newline='') as csvfile:
            reader = csv.reader(csvfile, delimiter=',', quotechar='"')
            next(reader, None)  # the first line is not part of the table
            header = next(reader, None)
            timestr = header[0] if header else None
            if timestr != "time":
                raise ParseError(f"Expected time got {timestr}")
            charge_curve = [
                {'time': int(row[0]), 'voltage': float(row[1]), 'current': float(row[2])}
                for row in reader
            ]

        if not charge_curve:
            raise ParseError(f"File {basename} contains no charge curve samples")

        # The middle sample is used as the representative current of the file.
        self.current = charge_curve[len(charge_curve) // 2]['current']
        self.discharge = self.current < 0
        self.start_voltage = charge_curve[0]['voltage']
        self.end_voltage = charge_curve[-1]['voltage']
        self.capacity = calc_capacity(charge_curve)
        # A full cycle runs from above 4.05 V to below 3.15 V or the reverse.
        self.full_cycle = (
            (self.start_voltage > 4.05 and self.end_voltage < 3.15)
            or (self.start_voltage < 3.15 and self.end_voltage > 4.05)
        )

    @staticmethod
    def FindClosest(charge_files: list, step: int, cellid: int = -1,
                    full_cycle: bool = True,
                    direction: SearchDirection = SearchDirection.CLOSEST):
        """Return the file of cell *cellid* whose step is nearest to *step*.

        Args:
            charge_files: ChargeFile objects to search.
            step: Step the result should be close to.
            cellid: Only files with exactly this ``cell`` value are considered
                (-1 selects the files without a cell index).
            full_cycle: If True only files flagged ``full_cycle`` qualify.
            direction: PREVIOUS_ONLY ignores files after *step*, FORWARD_ONLY
                ignores files before *step*; a file exactly at *step*
                qualifies in every direction.

        Returns:
            The selected ChargeFile or None if no file qualifies.
        """
        closest_file = None
        for charge_file in charge_files:
            if charge_file.cell != cellid:
                continue
            if direction == SearchDirection.PREVIOUS_ONLY and charge_file.step > step:
                continue
            if direction == SearchDirection.FORWARD_ONLY and charge_file.step < step:
                continue
            if full_cycle and not charge_file.full_cycle:
                continue

            if closest_file is None:
                closest_file = charge_file
                continue

            best_distance = abs(step - closest_file.step)
            distance = abs(step - charge_file.step)
            if distance < best_distance:
                closest_file = charge_file
            elif (distance == best_distance and step > closest_file.step
                    and not closest_file.discharge):
                # Tie: a current pick that lies before the step and is a
                # charge is replaced by the equally distant candidate.
                # NOTE(review): the mirrored case (current pick after the
                # step and a discharge) is not handled - confirm intent.
                closest_file = charge_file
        return closest_file

    @staticmethod
    def GetSoc(charge_files: list, step: int, cellid: int, cell_count: int) -> float:
        """Estimate the state of charge of cell *cellid* at *step*.

        The most recent full cycle at or before *step* (common or specific to
        the cell) defines the usable capacity and the starting charge; the
        capacities of all later files up to *step* are then added up.
        Capacities of files without a cell index are divided by *cell_count*.

        Args:
            charge_files: ChargeFile objects of the data set, in any order.
            step: Step at which the state of charge is wanted.
            cellid: Index of the cell of interest.
            cell_count: Divisor applied to capacities of files with cell -1.

        Returns:
            The state of charge as a fraction of the reference capacity, or
            -1.0 if no reference full cycle exists, the reference capacity is
            zero, or the result lies outside [-0.05, 1.05].

        Raises:
            AssertionError: If the result contradicts the last end voltage.
        """
        common_closest_full = ChargeFile.FindClosest(
            charge_files, step, -1, True, SearchDirection.PREVIOUS_ONLY)
        specific_closest_full = ChargeFile.FindClosest(
            charge_files, step, cellid, True, SearchDirection.PREVIOUS_ONLY)

        candidates = [ref for ref in (common_closest_full, specific_closest_full)
                      if ref is not None]
        if not candidates:
            return -1.0
        # Both candidates lie at or before step, so the larger step is the
        # closer one; max() keeps the common file when both steps are equal.
        closest_full = max(candidates, key=lambda ref: ref.step)

        full_cap = closest_full.capacity
        if closest_full.cell == -1:
            full_cap = full_cap / cell_count
        # Capacities are signed (negative for a discharge); the usable
        # capacity is the magnitude.
        usable_cap = abs(full_cap)
        if usable_cap == 0:
            return -1.0

        # After a full discharge the cell is empty, after a full charge full.
        charge_counter = 0.0 if closest_full.discharge else full_cap
        end_voltage = closest_full.end_voltage

        # Only files that involve this cell count, and they are applied in
        # step order so the result does not depend on the order of the list.
        later_files = sorted(
            (charge_file for charge_file in charge_files
             if closest_full.step < charge_file.step <= step
             and charge_file.cell in (-1, cellid)),
            key=lambda charge_file: charge_file.step)
        for charge_file in later_files:
            if charge_file.cell == -1:
                charge_counter += charge_file.capacity / cell_count
            else:
                charge_counter += charge_file.capacity
            end_voltage = charge_file.end_voltage
            # An end voltage at either limit re-anchors the counter.
            if end_voltage > 4.15:
                charge_counter = usable_cap
            elif end_voltage < 3.15:
                charge_counter = 0.0

        soc = charge_counter / usable_cap
        if soc > 1.05 or soc < -0.05:
            return -1.0

        # Consistency checks between the last end voltage and the result.
        assert not (end_voltage < 3.4 and soc > 0.8)
        assert not (end_voltage > 4.0 and soc < 0.6)
        return soc

    @staticmethod
    def _interpolate_capacity(prev_charge, next_charge, step: int) -> float:
        """Linearly interpolate the absolute capacity between two files."""
        prev_cap = abs(prev_charge.capacity)
        span = next_charge.step - prev_charge.step
        if span == 0:
            # Both references sit on the same step; nothing to interpolate.
            return prev_cap
        fraction = (step - prev_charge.step) / span
        return (abs(next_charge.capacity) - prev_cap) * fraction + prev_cap

    @staticmethod
    def GetCommonCapacityEstimate(charge_files: list, step: int) -> tuple[float, int] | None:
        """Estimate the capacity at *step* from full cycles with cell -1.

        Args:
            charge_files: ChargeFile objects to search.
            step: Step for which the capacity is wanted.

        Returns:
            None if there is no full-cycle file with cell -1, otherwise a
            tuple of the absolute capacity (interpolated linearly when a
            file exists on both sides of *step*) and the distance in steps
            to the nearest file used.
        """
        prev_charge = ChargeFile.FindClosest(
            charge_files, step, -1, True, SearchDirection.PREVIOUS_ONLY)
        next_charge = ChargeFile.FindClosest(
            charge_files, step, -1, True, SearchDirection.FORWARD_ONLY)

        if prev_charge is None and next_charge is None:
            return None
        if prev_charge is None:
            return (abs(next_charge.capacity), next_charge.step - step)
        if next_charge is None:
            return (abs(prev_charge.capacity), step - prev_charge.step)
        return (ChargeFile._interpolate_capacity(prev_charge, next_charge, step),
                min(step - prev_charge.step, next_charge.step - step))

    @staticmethod
    def GetCapacityEsitmate(charge_files: list, step: int, cellid: int, cell_count: int) -> float:
        """Estimate the capacity of cell *cellid* at *step*.

        Full cycles of the cell itself are combined with the estimate from
        GetCommonCapacityEstimate: whichever source has a reference closer to
        *step* provides the value; when the common estimate is used, it is
        corrected by the offset observed between the cell and the common
        estimate at the cell's own reference steps.

        Args:
            charge_files: ChargeFile objects to search.
            step: Step for which the capacity is wanted.
            cellid: Index of the cell of interest.
            cell_count: Divisor applied to the common capacity estimate.

        Returns:
            The estimated absolute capacity, or -1 if no full cycle exists.
        """
        prev_charge = ChargeFile.FindClosest(
            charge_files, step, cellid, True, SearchDirection.PREVIOUS_ONLY)
        next_charge = ChargeFile.FindClosest(
            charge_files, step, cellid, True, SearchDirection.FORWARD_ONLY)
        common_cap = ChargeFile.GetCommonCapacityEstimate(charge_files, step)

        if prev_charge is None and next_charge is None:
            if common_cap is None:
                return -1
            return common_cap[0] / cell_count

        if prev_charge is not None and next_charge is not None:
            single_charge_estimate = ChargeFile._interpolate_capacity(
                prev_charge, next_charge, step)
            single_distance = min(step - prev_charge.step, next_charge.step - step)
            if common_cap is None or single_distance < common_cap[1]:
                return single_charge_estimate

            # The common estimate is closer: shift it by the mean offset of
            # this cell relative to the common estimate at both references.
            common_cap_at_prev = ChargeFile.GetCommonCapacityEstimate(
                charge_files, prev_charge.step)
            common_cap_at_next = ChargeFile.GetCommonCapacityEstimate(
                charge_files, next_charge.step)
            avg_delta = ((abs(prev_charge.capacity) - common_cap_at_prev[0] / cell_count)
                         + (abs(next_charge.capacity) - common_cap_at_next[0] / cell_count)) / 2.0
            return (common_cap[0] / cell_count) + avg_delta

        single_charge = prev_charge if prev_charge is not None else next_charge
        if common_cap is None:
            # No common reference to correct against; use the cell's own one.
            return abs(single_charge.capacity)
        common_cap_at_single = ChargeFile.GetCommonCapacityEstimate(
            charge_files, single_charge.step)
        return (common_cap[0] / cell_count) + (
            abs(single_charge.capacity) - common_cap_at_single[0] / cell_count)