|
| 1 | +import time |
| 2 | +from typing import Dict, List |
| 3 | +import json |
| 4 | + |
| 5 | +from confluent_kafka import Producer, Consumer |
| 6 | +from DEVSKernel.KafkaDEVS.MS4Me.ms4me_kafka_wire_adapters import StandardWireAdapter |
| 7 | +from DEVSKernel.KafkaDEVS.logconfig import coord_kafka_logger |
| 8 | +from Patterns.Proxy import AbstractStreamProxy, AbstractReceiverProxy |
| 9 | +from DEVSKernel.KafkaDEVS.MS4Me.ms4me_kafka_messages import BaseMessage |
| 10 | + |
class KafkaStreamProxy(AbstractStreamProxy):
    """Concrete sending proxy built on a Kafka Producer.

    Encapsulates all the logic for publishing typed DEVS messages to Kafka.
    """

    def __init__(self, bootstrap_servers: str, wire_adapter=None):
        """Initialise the Kafka sending proxy.

        Args:
            bootstrap_servers: Address of the Kafka broker.
            wire_adapter: Serialisation adapter (default: StandardWireAdapter).
        """
        self._producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "enable.idempotence": True,
            "acks": "all",
            "max.in.flight.requests.per.connection": 5,
            "retries": 10,
        })
        self.wire = wire_adapter or StandardWireAdapter
        self._logger = coord_kafka_logger

    def _on_delivery(self, err, kafka_msg):
        """Delivery-report callback: surface broker-side send failures.

        Without a callback, produce() errors are dropped silently even though
        the producer is configured for idempotent, acks=all delivery.
        """
        if err is not None:
            self._logger.error(
                "Delivery failed: topic=%s error=%s", kafka_msg.topic(), err
            )

    def send_message(self, topic: str, msg: BaseMessage):
        """Send a typed DEVS message to a Kafka topic (synchronously).

        Args:
            topic: Destination Kafka topic.
            msg: Typed DEVS message to send.
        """
        payload = json.dumps(msg.to_dict()).encode("utf-8")

        # on_delivery lets us detect and log broker-side failures; flush()
        # preserves the original synchronous per-message delivery contract.
        self._producer.produce(topic, value=payload, on_delivery=self._on_delivery)
        self._producer.flush()

        self._logger.debug("OUT: topic=%s value=%s", topic, payload)

    def flush(self):
        """Force immediate delivery of all pending messages."""
        self._producer.flush()

    def close(self):
        """Cleanly shut down the Kafka producer (drains pending messages)."""
        self._producer.flush()
        self._logger.info("KafkaStreamProxy closed")
| 59 | + |
| 60 | + |
class KafkaReceiverProxy(AbstractReceiverProxy):
    """Concrete receiving proxy built on a Kafka Consumer.

    Encapsulates all the logic for receiving and processing Kafka messages.
    """

    def __init__(self, bootstrap_servers: str, group_id: str, wire_adapter=None):
        """Initialise the Kafka receiving proxy.

        Args:
            bootstrap_servers: Address of the Kafka broker.
            group_id: Consumer-group identifier.
            wire_adapter: Deserialisation adapter (default: StandardWireAdapter).
        """
        self._consumer = Consumer({
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "latest",
            "enable.auto.commit": True,
            "session.timeout.ms": 30000,
            "max.poll.interval.ms": 300000,
        })
        self.wire = wire_adapter or StandardWireAdapter
        self._logger = coord_kafka_logger
        self._subscribed_topics: List[str] = []

    def subscribe(self, topics: List[str]):
        """Subscribe to a list of Kafka topics.

        Args:
            topics: Names of the topics to listen on.
        """
        self._consumer.subscribe(topics)
        # Copy so later mutation of the caller's list cannot desync our record.
        self._subscribed_topics = list(topics)
        self._logger.info("Subscribed to topics: %s", topics)

    def receive_messages(self, pending: List, timeout: float) -> Dict:
        """Wait for and collect one message from each expected worker.

        Args:
            pending: DEVS models from which a reply is expected.
            timeout: Maximum wait time in seconds.

        Returns:
            Mapping of each model to the message received from it.

        Raises:
            TimeoutError: If not all expected messages arrive in time.
        """
        received = {}
        # monotonic() is immune to wall-clock adjustments (NTP, DST),
        # unlike time.time(), so the deadline cannot jump.
        deadline = time.monotonic() + timeout

        # Work on a copy so the caller's list is not mutated.
        remaining = list(pending)

        while remaining:
            # Cap the poll timeout by the time actually left so we never
            # overshoot the caller's deadline (a fixed 0.5 s poll could).
            left = deadline - time.monotonic()
            if left <= 0:
                break
            msg = self._consumer.poll(timeout=min(0.5, left))
            if msg is None:
                continue
            if msg.error():
                # Log instead of silently dropping consumer-side errors.
                self._logger.warning("Consumer error: %s", msg.error())
                continue

            try:
                data = json.loads(msg.value().decode("utf-8"))

                self._logger.debug(
                    "IN: topic=%s value=%s",
                    msg.topic(),
                    json.dumps(data),
                )

                # Deserialise the DEVS message.
                devs_msg = self.wire.from_wire(data)
                model_name = data.get('sender')

                if not model_name:
                    self._logger.warning("Message without sender field: %s", data)
                    continue

                # Find and remove the matching model.
                for i, model in enumerate(remaining):
                    if model.getBlockModel().label == model_name:
                        matched_model = remaining.pop(i)
                        received[matched_model] = devs_msg
                        break

            except json.JSONDecodeError as e:
                self._logger.error("JSON decode error: %s", e)
            except Exception as e:
                self._logger.error("Error processing message: %s", e)

        if remaining:
            missing_labels = [m.getBlockModel().label for m in remaining]
            raise TimeoutError(
                f"Kafka timeout: missing responses from models {missing_labels}"
            )

        return received

    def purge_old_messages(self, max_seconds: float = 2.0) -> int:
        """Drain old messages already sitting in the subscribed topics.

        Args:
            max_seconds: Maximum time to spend purging.

        Returns:
            Number of messages purged.
        """
        flushed = 0
        # Monotonic clock for the same clock-jump reason as receive_messages.
        start_flush = time.monotonic()

        self._logger.info("Purging old messages...")

        while time.monotonic() - start_flush < max_seconds:
            msg = self._consumer.poll(timeout=0.1)
            if msg is None:
                # Queue is empty: stop early rather than spin until max_seconds.
                break
            if not msg.error():
                flushed += 1

        if flushed > 0:
            self._logger.info("Flushed %s old messages", flushed)

        return flushed

    def close(self):
        """Cleanly shut down the Kafka consumer."""
        self._consumer.close()
        self._logger.info("KafkaReceiverProxy closed")
0 commit comments