diff --git a/scripts/cellmeta.py b/scripts/cellmeta.py new file mode 100644 index 00000000..53e11d65 --- /dev/null +++ b/scripts/cellmeta.py @@ -0,0 +1,138 @@ +from chargefile import ChargeFile, SearchDirection + +CYCLES_PER_STEP = 4 +STEP_COUNT = 12 + + +def charge_cylces_in_step(globalstep: int): + cyclepoint = globalstep % STEP_COUNT + if cyclepoint == 0: + if (globalstep / STEP_COUNT) % 10 == 0: + return 1 + else: + return 0 + if cyclepoint == 9: + return 1 + if cyclepoint == 11: + return CYCLES_PER_STEP + return 0 + + +def charge_cycles_at_step(globalstep: int,): + count = 0 + for i in range(globalstep): + count += charge_cylces_in_step(i) + return count + + +def thermal_cylces_in_step(globalstep: int, substep: int = -1): + cyclepoint = globalstep % STEP_COUNT + if cyclepoint == 0: + if (globalstep / STEP_COUNT) % 10 == 0: + return 0 + else: + return CYCLES_PER_STEP + if cyclepoint == 2: + return CYCLES_PER_STEP + if cyclepoint == 4: + return CYCLES_PER_STEP + if cyclepoint == 6: + return CYCLES_PER_STEP + if cyclepoint == 8: + return CYCLES_PER_STEP + if cyclepoint == 10: + return CYCLES_PER_STEP + if cyclepoint == 11: + return 1 + return 0 + + +def thermal_cycles_at_step(globalstep: int, substep: int): + count = 0 + for i in range(globalstep - 1): + count += thermal_cylces_in_step(globalstep) + count += thermal_cylces_in_step(globalstep, substep) + return count + + +non_charge_cycle_cell = list(range(4, 7)) +non_thermal_cycle_cell = list(range(11, 21)) +cell_thermal_range = { + 0: [35, 55], + 1: [35, 55], + 2: [35, 55], + 3: [35, 55], + 4: [35, 55], + 5: [35, 55], + 6: [35, 55], + 7: [35, 45], + 8: [35, 45], + 9: [35, 45], + 10: [35, 45], + 11: [35, 35], + 12: [35, 35], + 13: [35, 35], + 14: [45, 45], + 15: [45, 45], + 16: [45, 45], + 17: [35, 55], + 18: [35, 55], + 19: [35, 55], + 20: [35, 55], +} + +cell_group_table = { + 0: 0, + 1: 0, + 2: 0, + 3: 0, + 4: 1, + 5: 1, + 6: 1, + 7: 2, + 8: 2, + 9: 2, + 10: 2, + 11: 3, + 12: 3, + 13: 3, + 14: 4, + 15: 4, + 16: 4, + 17: 5, + 18: 5, + 19: 5, + 20: 5, +} + + +class CellMeta: + def __init__(self, cellid: int, globalstep: int, substep: int, charge_files: list[ChargeFile], total_cells: int): + closest_avg = None + closest_charge = None + if cellid not in non_charge_cycle_cell: + closest_avg = ChargeFile.FindClosest(charge_files, globalstep, -1) + closest_charge = ChargeFile.FindClosest(charge_files, globalstep, cellid) + if closest_charge is not None: + assert closest_charge.cell == cellid + + total_charge_cells = 0 + for i in range(total_cells): + if i not in non_charge_cycle_cell: + total_charge_cells += 1 + + self.cell_group = cell_group_table[cellid] + self.charge_cycles = charge_cycles_at_step(globalstep) if cellid not in non_charge_cycle_cell else 0 + self.thermal_cycles = thermal_cycles_at_step(globalstep, substep) if cellid not in non_thermal_cycle_cell else 0 + self.last_avg_cap = abs(closest_avg.capacity) / total_charge_cells if closest_avg is not None else -1 + self.last_avg_cap_step = closest_avg.step if closest_avg is not None else -1 + self.last_cap = abs(closest_charge.capacity) if closest_charge is not None else -1 + self.last_cap_step = closest_charge.step if closest_charge is not None else -1 + self.thermal_range = cell_thermal_range[cellid] + if cellid not in non_charge_cycle_cell: + self.soc = ChargeFile.GetSoc(charge_files, globalstep, cellid, total_charge_cells) + self.cap_esitmate = ChargeFile.GetCapacityEsitmate(charge_files, globalstep, cellid, total_charge_cells) + else: + self.soc = -1 + self.cap_esitmate = -1 + self.soc_estimate = -1 diff --git a/scripts/chargefile.py b/scripts/chargefile.py new file mode 100644 index 00000000..e3ad4006 --- /dev/null +++ b/scripts/chargefile.py @@ -0,0 +1,177 @@ +import csv + +from parseerror import ParseError +import os +import enum + + +class SearchDirection(enum.Enum): + CLOSEST = 0 + PREVIOUS_ONLY = 1 + FORWARD_ONLY = 2 + + +def calc_capacity(charge_curve: list[dict]): + capacity = 0.0 + prev_time = -1 + prev_current = -1 + total_t = 0 + for entry in charge_curve: + if prev_time > 0: + delta_s = entry['time'] - prev_time + current = (entry['current'] + prev_current) / 2 + capacity += current * (delta_s / (60.0 * 60.0)) + total_t += delta_s + prev_time = entry['time'] + prev_current = entry['current'] + return capacity + + +class ChargeFile: + def __init__(self, filename: str): + self.start_voltage = 0.0 + self.end_voltage = 0.0 + self.capacity = 0.0 + self.cell = -1 + self.discharge = False + self.current = 0.0 + self.full_cycle = False + self.step = 0 + + if os.path.split(filename)[1].startswith("single_cell_charge") or os.path.split(filename)[1].startswith("single_cell_discharge"): + tokens = filename.split('.')[0].split('_') + self.step = int(tokens[-2]) + self.cell = int(tokens[-1]) + elif os.path.split(filename)[1].startswith("charge_for"): + self.step = int(filename.split('.')[0].split('_')[-1]) + else: + raise ParseError(f"File name {os.path.split(filename)[1]} not in the expected sheme for ChargeFile") + + with open(filename, newline='') as csvfile: + reader = csv.reader(csvfile, delimiter=',', quotechar='"') + reader.__next__() + timestr = reader.__next__()[0] + if timestr != "time": + raise ParseError(f"Expected time got {timestr}") + charge_curve = list() + for row in reader: + charge_curve.append({'time': int(row[0]), 'voltage': float(row[1]), 'current': float(row[2])}) + self.current = charge_curve[int(len(charge_curve) / 2)]['current'] + self.discharge = self.current < 0 + self.start_voltage = charge_curve[0]['voltage'] + self.end_voltage = charge_curve[-1]['voltage'] + self.capacity = calc_capacity(charge_curve) + self.full_cycle = self.start_voltage > 4.05 and self.end_voltage < 3.15 or self.start_voltage < 3.15 and self.end_voltage > 4.05 + + @staticmethod + def FindClosest(charge_files: list, step: int, cellid: int = -1, full_cycle=True, direction=SearchDirection.CLOSEST): + closest_file = None + for charge_file in charge_files: + if charge_file.cell != cellid: + continue + if direction == SearchDirection.PREVIOUS_ONLY and charge_file.step > step: + continue + if direction == SearchDirection.FORWARD_ONLY and charge_file.step < step: + continue + if not full_cycle or charge_file.full_cycle: + if closest_file is not None: + if abs(step - closest_file.step) > abs(step - charge_file.step): + closest_file = charge_file + elif abs(step - closest_file.step) == abs(step - charge_file.step) and step > closest_file.step and not closest_file.discharge: + if (step > closest_file.step and not closest_file.discharge) or (step < closest_file.step and closest_file.discharge): + closest_file = charge_file + else: + closest_file = charge_file + return closest_file + + @staticmethod + def GetSoc(charge_files: list, step: int, cellid: int, cell_count: int) -> float: + + common_closest_full = ChargeFile.FindClosest(charge_files, step, -1, True, SearchDirection.PREVIOUS_ONLY) + specific_closest_full = ChargeFile.FindClosest(charge_files, step, cellid, True, SearchDirection.PREVIOUS_ONLY) + + if specific_closest_full is None and common_closest_full is None: + return -1.0 + + if common_closest_full is None: + closest_full = specific_closest_full + elif specific_closest_full is None: + closest_full = common_closest_full + elif step - specific_closest_full.step < step - common_closest_full.step: + closest_full = specific_closest_full + else: + closest_full = common_closest_full + + full_cap = closest_full.capacity + if closest_full.cell == -1: + full_cap = full_cap / cell_count + + if closest_full.discharge: + charge_counter = 0.0 + else: + charge_counter = full_cap + + accepted_count = 0 + end_voltage = closest_full.end_voltage + + for charge_file in charge_files: + if charge_file.step <= step and charge_file.step > closest_full.step: + accepted_count += 1 + if charge_file.cell == -1: + charge_counter += charge_file.capacity / cell_count + else: + charge_counter += charge_file.capacity + end_voltage = charge_file.end_voltage + if end_voltage > 4.15: + charge_counter = full_cap + elif end_voltage < 3.15: + charge_counter = 0 + + soc = charge_counter / abs(full_cap) + + if soc > 1.05 or soc < -0.05: + return -1 + + assert not (end_voltage < 3.4 and soc > 0.8) + assert not (end_voltage > 4.0 and soc < 0.6) + assert not (soc < -0.1 or soc > 1.1) + + return soc + + def GetCommonCapacityEstimate(charge_files: list, step: int) -> tuple[float, int] | None: + prev_charge = ChargeFile.FindClosest(charge_files, step, -1, True, SearchDirection.PREVIOUS_ONLY) + next_charge = ChargeFile.FindClosest(charge_files, step, -1, True, SearchDirection.FORWARD_ONLY) + + if prev_charge is None and next_charge is None: + return None + if prev_charge is None: + return (abs(next_charge.capacity), next_charge.step - step) + if next_charge is None: + return (abs(prev_charge.capacity), step - prev_charge.step) + + return ((abs(next_charge.capacity) - abs(prev_charge.capacity)) * ((step - prev_charge.step) / (next_charge.step - prev_charge.step)) + abs(prev_charge.capacity), + min(step - prev_charge.step, next_charge.step - step)) + + def GetCapacityEsitmate(charge_files: list, step: int, cellid: int, cell_count: int) -> float: + prev_charge = ChargeFile.FindClosest(charge_files, step, cellid, True, SearchDirection.PREVIOUS_ONLY) + next_charge = ChargeFile.FindClosest(charge_files, step, cellid, True, SearchDirection.FORWARD_ONLY) + + common_cap = ChargeFile.GetCommonCapacityEstimate(charge_files, step) + if prev_charge is None and next_charge is None: + if common_cap is None: + return -1 + return common_cap[0] / cell_count + + if prev_charge is not None and next_charge is not None: + single_charge_estimate = (abs(next_charge.capacity) - abs(prev_charge.capacity)) * ((step - prev_charge.step) / (next_charge.step - prev_charge.step)) + single_charge_estimate += abs(prev_charge.capacity) + if common_cap is None or min(step - prev_charge.step, next_charge.step - step) < common_cap[1]: + return single_charge_estimate + common_cap_at_prev = ChargeFile.GetCommonCapacityEstimate(charge_files, prev_charge.step) + common_cap_at_next = ChargeFile.GetCommonCapacityEstimate(charge_files, next_charge.step) + avg_delta = ((abs(prev_charge.capacity) - common_cap_at_prev[0] / cell_count) + (abs(next_charge.capacity) - common_cap_at_next[0] / cell_count)) / 2.0 + return (common_cap[0] / cell_count) + avg_delta + + singe_charge = prev_charge if prev_charge is not None else next_charge + common_cap_at_single = ChargeFile.GetCommonCapacityEstimate(charge_files, singe_charge.step) + return (common_cap[0] / cell_count) + (abs(singe_charge.capacity) - common_cap_at_single[0] / cell_count) diff --git a/scripts/createdataset.py b/scripts/createdataset.py new file mode 100644 index 00000000..7904fcb8 --- /dev/null +++ b/scripts/createdataset.py @@ -0,0 +1,65 @@ +import argparse +import os +from tqdm import tqdm +import tarfile + +from chargefile import ChargeFile +from spectrafile import SpectraFile +from soc_estimation import add_soc_estimate + + +if __name__ == "__main__": + parser = argparse.ArgumentParser("KissExpiramentCreateDataset") + parser.add_argument('--data', '-d', required=True, help="Data input directory") + parser.add_argument('--out', '-o', required=True, help="output directory") + args = parser.parse_args() + + filenames = [f for f in os.listdir(args.data) if os.path.isfile(os.path.join(args.data, f))] + charge_filenames = [f for f in filenames if f.startswith("charge") or f.startswith("single_cell_")] + spectra_filenames = [f for f in filenames if not f.startswith("charge") and not f.startswith("single_cell_") and not f.startswith("voltage_equlaization_") and f != "expiramentlog.csv"] + + print(f"found {len(spectra_filenames)} spectra") + print(f"found {len(charge_filenames)} charge/discharge sequences") + + if not os.path.exists(args.out + ".tmp"): + os.makedirs(args.out + ".tmp") + + charge_files = list() + for filename in charge_filenames: + charge_files.append(ChargeFile(os.path.join(args.data, filename))) + + cells = set() + for filename in tqdm(spectra_filenames, desc="Finding cells"): + tokens = filename.split('.')[0].split('-') + cellid = int(tokens[1]) + cells.add(cellid) + + print(f"{len(cells)} cells where involved") + + spectras = list() + + for filename in tqdm(spectra_filenames, desc="Resolveing data"): + tokens = filename.split('.')[0].split('-') + step = int(tokens[0]) + cellid = int(tokens[1]) + substep = int(tokens[2]) + sf = SpectraFile(os.path.join(args.data, filename), cellid, step, substep, charge_files, len(cells)) + spectras.append(sf) + + add_soc_estimate(spectras) + + for spectra in spectras: + spectra.write(args.out + ".tmp") + + try: + os.remove(f"{args.out}.tar") + except FileNotFoundError: + pass + tar = tarfile.open(f"{args.out}.tar", mode="x") + for filename in tqdm(os.listdir(args.out + ".tmp"), desc="Saveing data"): + path = os.path.join(args.out + ".tmp", filename) + tar.add(path, arcname=os.path.split(path)[-1]) + os.remove(path) + os.rmdir(args.out + ".tmp") + tar.close() + diff --git a/scripts/extractmeta.py b/scripts/extractmeta.py new file mode 100644 index 00000000..2d36e774 --- /dev/null +++ b/scripts/extractmeta.py @@ -0,0 +1,35 @@ +#!/bin/python + +import tarfile +from tqdm import tqdm +from eisgenerator import EisSpectra +import csv +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser("KissExpiramentExtractMeta") + parser.add_argument('--data', '-d', required=True, help="Data input tar file") + parser.add_argument('--out', '-o', required=True, help="output file") + args = parser.parse_args() + + with open(args.out, 'w', newline='') as outfile: + csvwriter = csv.writer(outfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) + with tarfile.open(args.data, mode="r") as tar: + master_labels = None + rows = list() + for file_info in tqdm(tar, desc="Extracting Metadata", total=len(list(tar))): + if file_info.isfile(): + filestr = tar.extractfile(file_info).read() + spectra = EisSpectra.loadFromString(filestr) + if master_labels is None: + master_labels = spectra.labelNames + master_labels_copy = master_labels.copy() + for i in range(len(master_labels_copy)): + print(master_labels_copy[i]) + master_labels_copy[i] = master_labels_copy[i].strip(' "') + csvwriter.writerow(master_labels_copy) + elif master_labels != spectra.labelNames: + print(f"Error: not all files in {args.data} have the same labelNames") + exit(1) + csvwriter.writerow(spectra.labels) + tar.close() diff --git a/scripts/parseerror.py b/scripts/parseerror.py new file mode 100644 index 00000000..8809072a --- /dev/null +++ b/scripts/parseerror.py @@ -0,0 +1,3 @@ +class ParseError(Exception): + def __init__(self, message): + self.message = message diff --git a/scripts/soc_estimation.py b/scripts/soc_estimation.py new file mode 100644 index 00000000..7375c734 --- /dev/null +++ b/scripts/soc_estimation.py @@ -0,0 +1,32 @@ +from scipy.optimize import curve_fit +from scipy.interpolate import splrep, splev +import csv +import argparse +import numpy +import matplotlib.pyplot as plt +from eisgenerator import EisSpectra +import io +import tarfile +from tqdm import tqdm +from spectrafile import SpectraFile + + +def add_soc_estimate(spectras: list[SpectraFile]): + data = [list(), list()] + + for spectra in spectras: + if not spectra.meta.soc <= 0: + data[0].append(spectra.ocv) + data[1].append(spectra.meta.soc) + + ndata = numpy.asarray(data) + ndata.sort(1) + + knots = 9 + qs = numpy.linspace(0, 1, knots)[1:-1] + knots = numpy.quantile(ndata[0], qs) + tck = splrep(ndata[0], ndata[1], t=knots, k=3) + estimates = splev(ndata[0], tck) + + for spectra in spectras: + spectra.meta.soc_estimate = splev(spectra.ocv, tck) diff --git a/scripts/spectrafile.py b/scripts/spectrafile.py new file mode 100644 index 00000000..84a0754b --- /dev/null +++ b/scripts/spectrafile.py @@ -0,0 +1,40 @@ +import os + +from cellmeta import CellMeta +from eisgenerator import EisSpectra +from parseerror import ParseError +from chargefile import ChargeFile + + +class SpectraFile: + def __init__(self, filename: str, cellid: int, step: int, substep: int, charge_files: list[ChargeFile], total_cells: int): + self.cellid = cellid + self.step = step + self.substep = substep + self.filename = filename + self.temperature = -1.0 + self.ocv = -1.0 + self.meta = CellMeta(cellid, step, substep, charge_files, total_cells) + self.filename = os.path.split(filename)[1] + + self.spectra = EisSpectra.loadFromDisk(filename) + header = self.spectra.header.split('"')[1].split(',') + self.temperature = float(header[2]) + self.ocv = float(header[3]) + + if int(header[0]) != step or int(header[1]) != cellid: + raise ParseError(f"file name and file content of SpectraFile {filename} do not match") + + def write(self, directory: str): + metaList = [float(self.step), float(self.substep), float(self.cellid), float(self.meta.cell_group), float(self.temperature), float(self.ocv), + float(self.meta.charge_cycles), float(self.meta.thermal_cycles), float(self.meta.last_avg_cap), float(self.meta.last_avg_cap_step), + float(self.meta.last_cap), float(self.meta.last_cap_step), float(self.meta.cap_esitmate), float(self.meta.soc), float(self.meta.soc_estimate)] + self.spectra.setLabels(metaList) + self.spectra.model = "Unkown" + meta_dsc_strings = ["step", "substep", "cellid", "cell_group", "temparature", "ocv", "charge_cycles", "thermal_cycles", + "last_avg_cap", "last_avg_step", "last_cap", "last_cap_step", "cap_estimate", "soc", "soc_estimate"] + self.spectra.headerDescription = "File origin" + self.spectra.header = "CoinCellHell mesurement file" + self.spectra.labelNames = meta_dsc_strings + self.spectra.saveToDisk(os.path.join(directory, self.filename)) +