import time import numpy as np import matplotlib.pyplot as plt import copy from mopso import MOPSO from surrogate_handler import SurrogateHandler # --- EXTENDED CLASS (Inheritance) --- class SmartMOPSO(MOPSO): def __init__(self, model_type=None, **kwargs): super().__init__(**kwargs) # Initialize Surrogate Handler if model_type is provided self.use_surrogate = (model_type is not None) if self.use_surrogate: self.surrogate_handler = SurrogateHandler(model_type) # Pre-fill with initial particle data for p in self.particles: self.surrogate_handler.add_data(p.x, p.f_current[1]) def iterate(self): train_freq = 10 # Retrain every 10 iterations # Check if retraining is needed if self.use_surrogate and (self.t % train_freq == 0): self.surrogate_handler.train() # Determine if AI prediction should be used use_ai = (self.use_surrogate and self.surrogate_handler.is_trained and self.t % train_freq != 0) # Main loop (overriding original logic to manage control flow) for t in range(self.t): self.select_leader() for i in range(self.n): # Movement self.particles[i].update_velocity(self.leader.x, self.c1, self.c2, self.w) self.particles[i].update_position() self.particles[i].keep_boudaries(self.A_max) if use_ai: # 1. Fast exact calculation (f1, f3) f1 = self.particles[i].f1(self.prices) f3 = self.particles[i].f3() # 2. Slow prediction (f2) via AI f2_pred = self.surrogate_handler.predict(self.particles[i].x) # 3. Inject scores without running the expensive 'updating_socs' self.particles[i].f_current = [f1, f2_pred, f3] else: # Standard Calculation (Slow & Exact) self.particles[i].updating_socs(self.socs, self.capacities) self.particles[i].evaluate(self.prices, self.socs, self.socs_req, self.times) # Capture data for AI training if self.use_surrogate: self.surrogate_handler.add_data(self.particles[i].x, self.particles[i].f_current[1]) self.particles[i].update_best() self.update_archive() # --- EXECUTION FUNCTION --- def run_scenario(scenario_name, model_type=None): print(f"\n--- Launching Scenario: {scenario_name} ---") start_time = time.time() # Simulation parameters params = { 'f_weights': [1,1,1], 'A_max': 500, 'price_mean': 0.15, 'price_std': 0.05, 'capacities': [50]*10, 'n': 20, 't': 50, 'w': 0.4, 'c1': 2.0, 'c2': 2.0, 'nb_vehicles': 10, 'delta_t': 60, 'nb_of_ticks': 72 } # Instantiate extended class optimizer = SmartMOPSO(model_type=model_type, **params) # Run simulation optimizer.iterate() end_time = time.time() duration = end_time - start_time # Retrieve best f2 (e.g., from archive) best_f2 = min([p.f_current[1] for p in optimizer.archive]) if optimizer.archive else 0 print(f"Finished in {duration:.2f} seconds.") print(f"Best f2 found: {best_f2:.4f}") return duration, best_f2 # --- MAIN --- if __name__ == "__main__": results = {} # 1. Without Surrogate (Baseline) d1, f1_score = run_scenario("No AI", model_type=None) results['No-AI'] = (d1, f1_score) # 2. With MLP d2, f2_score = run_scenario("With MLP", model_type='mlp') results['MLP'] = (d2, f2_score) # 3. With Random Forest d3, f3_score = run_scenario("With Random Forest", model_type='rf') results['RF'] = (d3, f3_score) # --- DISPLAY RESULTS --- print("\n=== SUMMARY ===") print(f"{'Mode':<15} | {'Time (s)':<10} | {'Best f2':<10}") print("-" * 45) for k, v in results.items(): print(f"{k:<15} | {v[0]:<10.2f} | {v[1]:<10.4f}")