module puppeteer.communication.communicator;

import puppeteer.puppeteer;
import puppeteer.var_monitor_utils;

import puppeteer.logging.ipuppeteer_logger;

import puppeteer.communication.icommunicator;
import puppeteer.communication.communicator_messages;
import puppeteer.communication.communication_exception;

import std.stdio;
import std.exception;
import std.concurrency;
import std.datetime;
import std.conv;

import core.thread;
import core.atomic;

/**
 * Talks to a puppet over a serial port on behalf of a Puppeteer.
 *
 * All serial I/O happens in a worker thread spawned by startCommunication.
 * The public methods only forward messages to that worker, which is looked
 * up through the std.concurrency name registry under workerTidName.
 *
 * Params:
 *  VarMonitorTypes = types of the puppet's internal variables that can be monitored
 */
shared class Communicator(VarMonitorTypes...) : ICommunicator!VarMonitorTypes
{
    // NOTE(review): the increment in the constructor is not atomic; confirm
    // communicators are never constructed concurrently from several threads.
    static int next_id = 0;
    private int id;

    /// Name under which this communicator's worker thread is registered.
    @property
    private string workerTidName()
    {
        enum nameBase = "puppeteer.communicator";

        return nameBase ~ to!string(id);
    }

    /// Tid currently registered under workerTidName, as returned by locate.
    @property
    private Tid workerTid()
    {
        return locate(workerTidName);
    }

    /* End of ugly fix */

    /// Returns: true while a worker thread is registered for this communicator.
    @property
    public bool isCommunicationOngoing()
    {
        return workerTid != Tid.init;
    }

    protected Puppeteer!VarMonitorTypes connectedPuppeteer;

    this()
    {
        id = next_id++;
    }

    /**
     * Spawns the worker thread and blocks until it reports whether the
     * connection with the puppet could be established.
     *
     * Params:
     *  puppeteer = puppeteer that will receive the values read from the puppet
     *  devFilename = serial device to open
     *  baudRate = baud rate used to open the port
     *  parity = parity used to open the port
     *  logFilename = forwarded to the worker thread
     *
     * Returns: true if the puppet answered the handshake, false otherwise.
     *
     * Throws: CommunicationException if a communication is already ongoing.
     */
    public bool startCommunication(shared Puppeteer!VarMonitorTypes puppeteer, string devFilename, BaudRate baudRate, Parity parity, string logFilename)
    {
        enforce!CommunicationException(!isCommunicationOngoing);

        connectedPuppeteer = puppeteer;

        auto workerTid = spawn(&communicationLoop, devFilename, baudRate, parity, logFilename);
        register(workerTidName, workerTid);

        auto msg = receiveOnly!CommunicationEstablishedMessage();

        if(!msg.success)
            connectedPuppeteer = null;

        return msg.success;
    }

    /**
     * Asks the worker thread to stop and blocks until it confirms that the
     * communication has ended.
     */
    public void endCommunication()
    {
        enforceCommunicationOngoing();

        workerTid.send(EndCommunicationMessage());
        receiveOnly!CommunicationEndedMessage();

        connectedPuppeteer = null;
    }

    /**
     * Requests the puppet to start or stop monitoring an analog input pin.
     *
     * Params:
     *  pin = pin whose monitoring status changes
     *  shouldMonitor = true to start monitoring, false to stop
     */
    public void changeAIMonitorStatus(ubyte pin, bool shouldMonitor)
    {
        enforceCommunicationOngoing();
        workerTid.send(PinMonitorMessage(shouldMonitor ?
                                         PinMonitorMessage.Action.start :
                                         PinMonitorMessage.Action.stop, pin));
    }

    mixin(unrollChangeVarMonitorStatusMethods!VarMonitorTypes);

    /**
     * Requests the puppet to start or stop monitoring an internal variable.
     *
     * Params:
     *  VarType = type of the monitored variable
     *  varIndex = index of the variable
     *  shouldMonitor = true to start monitoring, false to stop
     */
    public void changeVarMonitorStatus(VarType)(ubyte varIndex, bool shouldMonitor)
    {
        enforceCommunicationOngoing();
        workerTid.send(VarMonitorMessage(shouldMonitor ?
                                         VarMonitorMessage.Action.start :
                                         VarMonitorMessage.Action.stop,
                                         varIndex,
                                         varMonitorTypeCode!VarType));
    }

    /**
     * Requests the puppet to set a PWM value on a pin.
     *
     * Params:
     *  pin = pin to write to
     *  pwmValue = value to set
     */
    public void setPWMValue(ubyte pin, ubyte pwmValue)
    {
        enforceCommunicationOngoing();
        workerTid.send(SetPWMMessage(pin, pwmValue));
    }

    /**
     * Body of the worker thread.
     *
     * Opens the serial port, performs the ready handshake with the puppet,
     * reports the outcome to the owner thread and then alternates between
     * reading bytes from the port and handling messages from the owner until
     * an EndCommunicationMessage arrives.
     *
     * Params:
     *  fileName = serial device to open
     *  baudRate = baud rate used to open the port
     *  parity = parity used to open the port
     *  logFilename = currently unused by this function
     */
    private void communicationLoop(string fileName, immutable BaudRate baudRate, immutable Parity parity, string logFilename)
    {
        enum receiveTimeoutMs = 10;
        enum bytesReadAtOnce = 1;

        // Every command, in both directions, starts with this byte
        enum ubyte commandControlByte = 0xff;

        bool shouldContinue = true;

        ISerialPort arduinoSerialPort;
        IPuppeteerLogger logger;
        scope(exit) destroy(logger);

        void handlePinMonitorMessage(PinMonitorMessage msg)
        {
            void sendStartMonitoringPinCmd(ISerialPort serialPort, ubyte pin)
            {
                debug writeln("Sending startMonitoringCommand for pin "~to!string(pin));
                serialPort.write([commandControlByte, 0x01, pin]);
            }

            void sendStopMonitoringPinCmd(ISerialPort serialPort, ubyte pin)
            {
                debug writeln("Sending stopMonitoringCommand for pin "~to!string(pin));
                serialPort.write([commandControlByte, 0x02, pin]);
            }

            final switch(msg.action) with (PinMonitorMessage.Action)
            {
                case start:
                    sendStartMonitoringPinCmd(arduinoSerialPort, msg.pin);
                    break;

                case stop:
                    sendStopMonitoringPinCmd(arduinoSerialPort, msg.pin);
                    break;
            }
        }

        void handleEndCommunicationMessage(EndCommunicationMessage msg)
        {
            shouldContinue = false;
        }

        void handleVarMonitorMessage(VarMonitorMessage msg)
        {
            void sendStartMonitoringVariableCmd(ISerialPort serialPort, VarMonitorTypeCode typeCode, byte varIndex)
            {
                debug writeln("Sending startMonitoringVariableCommand for type ", typeCode, " and index ", varIndex);
                serialPort.write([commandControlByte, 0x05, typeCode, varIndex]);
            }

            void sendStopMonitoringVariableCmd(ISerialPort serialPort, VarMonitorTypeCode typeCode, byte varIndex)
            {
                debug writeln("Sending stopMonitoringVariableCommand for type ", typeCode, " and index ", varIndex);
                serialPort.write([commandControlByte, 0x06, typeCode, varIndex]);
            }

            final switch(msg.action) with (VarMonitorMessage.Action)
            {
                case start:
                    sendStartMonitoringVariableCmd(arduinoSerialPort, msg.varTypeCode, msg.varIndex);
                    break;

                case stop:
                    sendStopMonitoringVariableCmd(arduinoSerialPort, msg.varTypeCode, msg.varIndex);
                    break;
            }
        }

        void handleSetPWMMessage(SetPWMMessage msg)
        {
            void sendSetPWMCmd(ISerialPort serialPort, ubyte pin, ubyte value)
            {
                debug writeln("Sending setPWMCommand for pin "~to!string(pin)~" and value "~to!string(value));
                serialPort.write([commandControlByte, 0x04, pin, value]);
            }

            sendSetPWMCmd(arduinoSerialPort, msg.pin, msg.value);
        }

        // Started once the handshake succeeds; used to timestamp every read
        StopWatch timer;

        // Appends the freshly read bytes to a buffer and, once the buffer
        // holds a complete command, decodes it and removes it from the buffer.
        void handleReadBytes(ubyte[] readBytes)
        in
        {
            assert(readBytes !is null);
            assert(readBytes.length != 0);
        }
        body
        {
            enum ReadCommands : ubyte
            {
                analogMonitor = 0x1,
                varMonitor = 0x2,
                error = 0xfe
            }

            // command layout: [analogMonitor, pin, value high byte, value low byte]
            void handleAnalogMonitorCommand(ubyte[] command)
            in
            {
                assert(command !is null);
                assert(command.length == 4);
                assert(command[0] == ReadCommands.analogMonitor);
            }
            body
            {
                long readMilliseconds = timer.peek().msecs;

                debug(2) writeln("Handling analogMonitorCommand ", command);

                ubyte pin = command[1];

                enum arduinoAnalogReadMax = 1023;
                enum arduinoAnalogReference = 5;
                enum ubytePossibleValues = 256;

                ushort encodedValue = command[2] * ubytePossibleValues + command[3];
                float readValue = arduinoAnalogReference * to!float(encodedValue) / arduinoAnalogReadMax;

                connectedPuppeteer.emitAIRead(pin, readValue, readMilliseconds);
            }

            // command layout: [varMonitor, type code, variable index, data...]
            void handleVarMonitorCommand(ubyte[] command)
            in
            {
                assert(command !is null);
                assert(command.length == 5);
                assert(command[0] == ReadCommands.varMonitor);
            }
            body
            {
                long readMilliseconds = timer.peek().msecs;
                debug(2) writeln("Handling varMonitorCommand ", command);

                void handleData(VarType)(ubyte varIndex, ubyte[] data)
                if(connectedPuppeteer.canMonitor!VarType)
                {
                    VarType decodeData(VarType : short)(ubyte[] data) pure
                    {
                        enum ubytePossibleValues = 256;
                        // The two bytes combine into a value in 0..65535. to!short
                        // would throw on anything above short.max, which made every
                        // negative reading fail, so reinterpret the 16 bits instead.
                        // NOTE(review): assumes two's-complement, high byte first.
                        return cast(short)(data[0] * ubytePossibleValues + data[1]);
                    }

                    auto receivedData = decodeData!VarType(data);

                    // Use the timestamp taken when the command started being handled
                    connectedPuppeteer.emitVarRead!VarType(varIndex, receivedData, readMilliseconds);
                }

                // Maps a type code to the handleData instantiation for that type,
                // or null when the code matches none of VarMonitorTypes.
                void delegate (ubyte, ubyte[]) selectDelegate(VarMonitorTypeCode typeCode)
                {
                    string generateSwitch()
                    {
                        string str = "switch (typeCode) with (VarMonitorTypeCode) {";

                        foreach(varType; VarMonitorTypes)
                        {
                            str ~= "case " ~ to!string(varMonitorTypeCode!varType) ~ ": return &handleData!" ~ varType.stringof ~ ";";
                        }

                        str ~= "default: return null; }" ;

                        return str;
                    }

                    mixin(generateSwitch());
                }

                try
                {
                    auto handler = selectDelegate(to!VarMonitorTypeCode(command[1]));
                    if(handler)
                        handler(command[2], command[3..$]);
                }catch(ConvException e)
                {
                    writeln("Received invalid varMonitor type: ",e);
                }
            }

            static ubyte[] readBuffer = [];

            readBuffer ~= readBytes;

            void popReadBuffer(size_t elements = 1)
            {
                readBuffer = readBuffer.length >= elements ? readBuffer[elements..$] : [];
            }

            if(readBuffer[0] != commandControlByte)
            {
                debug writeln("Received corrupt command. Discarding first byte and returning");
                popReadBuffer();
                return;
            }

            // The command byte has not arrived yet
            if(readBuffer.length < 2)
                return;

            //Try to make sense out of the readBytes
            switch(readBuffer[1])
            {
                case ReadCommands.analogMonitor:
                    if(readBuffer.length < 5)
                        return;

                    handleAnalogMonitorCommand(readBuffer[1..5]);
                    popReadBuffer(5);
                    break;

                case ReadCommands.varMonitor:
                    if(readBuffer.length < 6)
                        return;

                    handleVarMonitorCommand(readBuffer[1..6]);
                    popReadBuffer(6);
                    break;

                case ReadCommands.error:
                    writeln("Error received!");
                    // Consume the control and command bytes. Leaving them in the
                    // buffer made every later byte report the same error again
                    // and blocked all further decoding.
                    // NOTE(review): assumes the error command carries no payload.
                    popReadBuffer(2);
                    break;

                default:
                    // Report the command byte; readBuffer[0] is always the control byte
                    writeln("Unhandled ubyte command received: ", readBuffer[1], ". Cleaning command buffer.");
                    readBuffer = [];
            }
        }

        enum portReadTimeoutMs = 200;
        arduinoSerialPort = ISerialPort.getInstance(fileName, parity, baudRate, portReadTimeoutMs);
        if(!arduinoSerialPort.open())
        {
            ownerTid.send(CommunicationEstablishedMessage(false));
            return;
        }

        //Some puppets seem to need some time between port opening and communication start
        Thread.sleep(dur!"seconds"(1));

        enum ubyte[] puppeteerReadyCommand = [0x0, 0x0];
        arduinoSerialPort.write([commandControlByte] ~ puppeteerReadyCommand);

        //Wait for the puppet to answer it is ready
        {
            enum ubyte[] puppetReadyCommand = [0x0, 0x0];
            ubyte[] cache = [];
            enum msBetweenChecks = 100;

            int readCounter = 0;
            enum readsUntilFailure = 30;

            while(true)
            {
                ubyte[] readBytes = arduinoSerialPort.read(1);

                if(readBytes !is null)
                {
                    cache ~= readBytes;
                    debug writeln("handshake cache is currently ", cache);

                    if(cache.length == 3)
                    {
                        if(cache == [commandControlByte] ~ puppetReadyCommand)
                            break; //Ready!
                        else
                            cache = cache[1..$]; //pop front and continue
                    }
                }

                if(++readCounter > readsUntilFailure)
                {
                    // The port was opened above; release it before giving up
                    arduinoSerialPort.close();
                    ownerTid.send(CommunicationEstablishedMessage(false));
                    return;
                }

                Thread.sleep(dur!"msecs"(msBetweenChecks));
            }
        }

        ownerTid.send(CommunicationEstablishedMessage(true));
        timer.start();

        do
        {
            ubyte[] readBytes = arduinoSerialPort.read(bytesReadAtOnce);

            if(readBytes !is null)
            {
                debug(3) writeln("Read bytes ", readBytes);
                handleReadBytes(readBytes);
            }

            // Handles at most one pending message from the owner per iteration
            receiveTimeout(msecs(receiveTimeoutMs), &handleEndCommunicationMessage,
                    &handlePinMonitorMessage,
                    &handleVarMonitorMessage,
                    &handleSetPWMMessage);

        }while(shouldContinue);

        void sendPuppeteerClosedCmd(ISerialPort serialPort)
        {
            debug writeln("Sending puppeteerClosedCommand");
            serialPort.write([commandControlByte, 0x99]);
        }
        sendPuppeteerClosedCmd(arduinoSerialPort);

        arduinoSerialPort.close();

        ownerTid.send(CommunicationEndedMessage());
    }
}