import time import pandas as pd import numpy as np import matplotlib.pyplot as plt import copy from mopso import MOPSO from surrogate_handler import SurrogateHandler # --- EXTENDED CLASS (Inheritance) --- class SmartMOPSO(MOPSO): def __init__(self, model_type=None, **kwargs): super().__init__(**kwargs) # Initialize Surrogate Handler if model_type is provided self.use_surrogate = (model_type is not None) if self.use_surrogate: self.surrogate_handler = SurrogateHandler(model_type) # Pre-fill with initial particle data for p in self.particles: self.surrogate_handler.add_data(p.x, p.f_current[1]) def iterate(self): train_freq = 10 # Retrain every 10 iterations # Check if retraining is needed if self.use_surrogate and (self.t % train_freq == 0): self.surrogate_handler.train() # Determine if AI prediction should be used use_ai = (self.use_surrogate and self.surrogate_handler.is_trained and self.t % train_freq != 0) # Main loop (overriding original logic to manage control flow) for t in range(self.t): self.select_leader() for i in range(self.n): # Movement self.particles[i].update_velocity(self.leader.x, self.c1, self.c2, self.w) self.particles[i].update_position() self.particles[i].keep_boudaries(self.A_max) if use_ai: # 1. Fast exact calculation (f1, f3) f1 = self.particles[i].f1(self.prices) f3 = self.particles[i].f3() # 2. Slow prediction (f2) via AI f2_pred = self.surrogate_handler.predict(self.particles[i].x) # 3. Inject scores without running the expensive 'updating_socs' self.particles[i].f_current = [f1, f2_pred, f3] else: # Standard Calculation (Slow & Exact) self.particles[i].updating_socs(self.socs, self.capacities) self.particles[i].evaluate(self.prices, self.socs, self.socs_req, self.times) # Capture data for AI training if self.use_surrogate: self.surrogate_handler.add_data(self.particles[i].x, self.particles[i].f_current[1]) self.particles[i].update_best() self.update_archive() def calculate_elec_prices(csv_file:str, sep:str=';'): elec_df = pd.read_csv(filepath_or_buffer=csv_file, sep=sep) # Mean of Winter and Summer of 2025 electric prices (Euros/MWh) elec_mean = (elec_df['Winter 2025'].mean() + elec_df['Summer 2025'].mean())/2 # Standard variation of Winter and Summer of 2025 electric prices (Euros/MWh) elec_std = (elec_df['Winter 2025'].std() + elec_df['Summer 2025'].std())/2 print(f'Electricity prices:\n - Mean: ${elec_mean}€/Mwh\n - Std: ${elec_std}€/Mwh') return elec_mean, elec_std def generate_capacities(csv_file:str, nb_vehicles:int, seed:int=42, sep:str=';'): cap_df = pd.read_csv(filepath_or_buffer=csv_file, sep=sep) # Getting back all kind of battery capacities with unique values all_capacities = cap_df['Battery Capacity kwh'].dropna().unique() # Extracting random values for generating the array of capacities capacities = pd.Series(all_capacities).sample(n=nb_vehicles, random_state=seed) print(f'Capacities of vehicles (kwh): ${capacities}') return capacities # --- EXECUTION FUNCTION --- def run_scenario(scenario_name, model_type=None): elec_price_csv = 'data/elec_prices.csv' capacity_csv = 'data/vehicle_capacity.csv' # Simulation parameters N = 20 # Number of vehicles T = 30 # Number of iterations (for the particles) W = 0.4 # Inertia (for exploration) C1 = 0.3 # Individual trust C2 = 0.2 # Social trust ARC_SIZE = 10 # Archive size P_MEAN, P_STD = calculate_elec_prices(elec_price_csv) CAPACITIES = generate_capacities(capacity_csv, N) NB_TICKS = 48 DELTA = 60 A_MAX = 0 X_MAX = 0 X_MIN = 0 print(f"\n--- Launching Scenario: {scenario_name} ---") start_time = time.time() # Simulation parameters params = { 'A_max': 500, 'price_mean': 0.15, 'price_std': 0.05, 'capacities': [50]*10, 'n': 20, 't': 50, 'w': 0.4, 'c1': 2.0, 'c2': 2.0, 'nb_vehicles': 10, 'delta_t': 60, 'nb_of_ticks': 72 } # Instantiate extended class optimizer = SmartMOPSO(model_type=model_type, **params) # Run simulation optimizer.iterate() end_time = time.time() duration = end_time - start_time # Retrieve best f2 (e.g. from archive) best_f2 = min([p.f_best[1] for p in optimizer.archive]) if optimizer.archive else 0 print(f"Finished in {duration:.2f} seconds.") print(f"Best f2 found: {best_f2:.4f}") return duration, best_f2 # --- MAIN --- if __name__ == "__main__": results = {} # 1. Without Surrogate (Baseline) d1, f1_score = run_scenario("No AI", model_type=None) results['No-AI'] = (d1, f1_score) # 2. With MLP d2, f2_score = run_scenario("With MLP", model_type='mlp') results['MLP'] = (d2, f2_score) # 3. With Random Forest d3, f3_score = run_scenario("With Random Forest", model_type='rf') results['RF'] = (d3, f3_score) # --- DISPLAY RESULTS --- print("\n=== SUMMARY ===") print(f"{'Mode':<15} | {'Time (s)':<10} | {'Best f2':<10}") print("-" * 45) for k, v in results.items(): print(f"{k:<15} | {v[0]:<10.2f} | {v[1]:<10.4f}")