1 module puppeteer.communication.communicator;
2 
3 import puppeteer.puppeteer;
4 import puppeteer.var_monitor_utils;
5 
6 import puppeteer.logging.ipuppeteer_logger;
7 
8 import puppeteer.communication.icommunicator;
9 import puppeteer.communication.communicator_messages;
10 import puppeteer.communication.communication_exception;
11 
12 import std.stdio;
13 import std.exception;
14 import std.concurrency;
15 import std.datetime;
16 import std.conv;
17 
18 import core.thread;
19 import core.atomic;
20 
/**
 * Talks to a puppet over a serial port from a dedicated worker thread.
 *
 * The owner thread calls the public methods; each of them forwards a message
 * to the worker thread spawned by startCommunication, which runs
 * communicationLoop. Data read from the puppet is reported back through the
 * connected Puppeteer (emitAIRead / emitVarRead).
 */
shared class Communicator(VarMonitorTypes...) : ICommunicator!VarMonitorTypes
{
    // Source of unique instance ids. Explicitly shared so it can be updated
    // atomically from any thread.
    static shared int next_id = 0;
    private int id;

    /*
     * The worker Tid is not stored as a field; it is registered under a
     * per-instance name and looked up on demand.
     * NOTE(review): presumably a workaround for keeping a Tid inside a shared
     * class (the original comment called this an "ugly fix") -- confirm.
     */

    /// Name under which this instance's worker thread is registered.
    @property
    private string workerTidName()
    {
        enum nameBase = "puppeteer.communicator";

        return nameBase ~ to!string(id);
    }

    /// Tid of the registered worker thread, or Tid.init if none is registered.
    @property
    private Tid workerTid()
    {
        return locate(workerTidName);
    }

    /// True while a worker thread is registered under this instance's name.
    @property
    public bool isCommunicationOngoing()
    {
        return workerTid != Tid.init;
    }

    protected Puppeteer!VarMonitorTypes connectedPuppeteer;

    this()
    {
        // atomicOp returns the updated value; subtract one to obtain the value
        // this instance reserved. A plain `next_id++` is a non-atomic
        // read-modify-write and could hand the same id to two instances.
        id = atomicOp!"+="(next_id, 1) - 1;
    }

    /**
     * Spawns the worker thread, registers it and blocks until it reports
     * whether the handshake with the puppet succeeded.
     *
     * Params:
     *      puppeteer = receiver of the values read from the puppet
     *      devFilename = serial device to open
     *      baudRate = serial baud rate
     *      parity = serial parity
     *      logFilename = forwarded to the worker thread
     *
     * Returns: true if communication was established.
     *
     * Throws: CommunicationException if communication is already ongoing.
     */
    public bool startCommunication(shared Puppeteer!VarMonitorTypes puppeteer, string devFilename, BaudRate baudRate, Parity parity, string logFilename)
    {
        enforce!CommunicationException(!isCommunicationOngoing);

        connectedPuppeteer = puppeteer;

        // Named differently from the workerTid property to avoid shadowing it.
        auto spawnedTid = spawn(&communicationLoop, devFilename, baudRate, parity, logFilename);
        register(workerTidName, spawnedTid);

        auto msg = receiveOnly!CommunicationEstablishedMessage();

        if(!msg.success)
            connectedPuppeteer = null;

        return msg.success;
    }

    /**
     * Asks the worker thread to stop and blocks until it confirms.
     */
    public void endCommunication()
    {
        enforceCommunicationOngoing();

        workerTid.send(EndCommunicationMessage());
        receiveOnly!CommunicationEndedMessage();

        connectedPuppeteer = null;
    }

    /**
     * Requests the puppet to start or stop monitoring an analog input pin.
     */
    public void changeAIMonitorStatus(ubyte pin, bool shouldMonitor)
    {
        enforceCommunicationOngoing();
        workerTid.send(PinMonitorMessage(shouldMonitor ?
                                            PinMonitorMessage.Action.start :
                                            PinMonitorMessage.Action.stop, pin));
    }

    mixin(unrollChangeVarMonitorStatusMethods!VarMonitorTypes);

    /**
     * Requests the puppet to start or stop monitoring the internal variable
     * of type VarType at the given index.
     */
    public void changeVarMonitorStatus(VarType)(ubyte varIndex, bool shouldMonitor)
    {
        enforceCommunicationOngoing();
        workerTid.send(VarMonitorMessage(shouldMonitor ?
                                            VarMonitorMessage.Action.start :
                                            VarMonitorMessage.Action.stop,
                                            varIndex,
                                            varMonitorTypeCode!VarType));
    }

    /**
     * Requests the puppet to set the PWM value of a pin.
     */
    public void setPWMValue(ubyte pin, ubyte pwmValue)
    {
        enforceCommunicationOngoing();
        workerTid.send(SetPWMMessage(pin, pwmValue));
    }

    /**
     * Body of the worker thread.
     *
     * Opens the serial port, performs the ready handshake with the puppet,
     * reports the outcome to the owner thread and then alternates between
     * reading bytes from the port and handling messages from the owner until
     * an EndCommunicationMessage arrives.
     */
    private void communicationLoop(string fileName, immutable BaudRate baudRate, immutable Parity parity, string logFilename)
    {
        enum receiveTimeoutMs = 10;
        enum bytesReadAtOnce = 1;

        // Every command, in both directions, starts with this byte.
        enum ubyte commandControlByte = 0xff;

        bool shouldContinue = true;

        ISerialPort arduinoSerialPort;
        IPuppeteerLogger logger;
        scope(exit) destroy(logger);

        // Forwards a start/stop pin monitoring request to the puppet.
        void handlePinMonitorMessage(PinMonitorMessage msg)
        {
            void sendStartMonitoringPinCmd(ISerialPort serialPort, ubyte pin)
            {
                debug writeln("Sending startMonitoringCommand for pin "~to!string(pin));
                serialPort.write([commandControlByte, 0x01, pin]);
            }

            void sendStopMonitoringPinCmd(ISerialPort serialPort, ubyte pin)
            {
                debug writeln("Sending stopMonitoringCommand for pin "~to!string(pin));
                serialPort.write([commandControlByte, 0x02, pin]);
            }

            final switch(msg.action) with (PinMonitorMessage.Action)
            {
                case start:
                    sendStartMonitoringPinCmd(arduinoSerialPort, msg.pin);
                    break;

                case stop:
                    sendStopMonitoringPinCmd(arduinoSerialPort, msg.pin);
                    break;
            }
        }

        // Makes the main loop exit after the current iteration.
        void handleEndCommunicationMessage(EndCommunicationMessage msg)
        {
            shouldContinue = false;
        }

        // Forwards a start/stop variable monitoring request to the puppet.
        void handleVarMonitorMessage(VarMonitorMessage msg)
        {
            void sendStartMonitoringVariableCmd(ISerialPort serialPort, VarMonitorTypeCode typeCode, byte varIndex)
            {
                debug writeln("Sending startMonitoringVariableCommand for type ", typeCode, " and index ", varIndex);
                serialPort.write([commandControlByte, 0x05, typeCode, varIndex]);
            }

            void sendStopMonitoringVariableCmd(ISerialPort serialPort, VarMonitorTypeCode typeCode, byte varIndex)
            {
                debug writeln("Sending stopMonitoringVariableCommand for type ", typeCode, " and index ", varIndex);
                serialPort.write([commandControlByte, 0x06, typeCode, varIndex]);
            }

            final switch(msg.action) with (VarMonitorMessage.Action)
            {
                case start:
                    sendStartMonitoringVariableCmd(arduinoSerialPort, msg.varTypeCode, msg.varIndex);
                    break;

                case stop:
                    sendStopMonitoringVariableCmd(arduinoSerialPort, msg.varTypeCode, msg.varIndex);
                    break;
            }
        }

        // Forwards a PWM value change to the puppet.
        void handleSetPWMMessage(SetPWMMessage msg)
        {
            void sendSetPWMCmd(ISerialPort serialPort, ubyte pin, ubyte value)
            {
                debug writeln("Sending setPWMCommand for pin "~to!string(pin)~" and value "~to!string(value));
                serialPort.write([commandControlByte, 0x04, pin, value]);
            }

            sendSetPWMCmd(arduinoSerialPort, msg.pin, msg.value);
        }

        // Started once the handshake succeeds; timestamps every emitted read.
        StopWatch timer;

        // Appends freshly read bytes to the pending buffer and, when a full
        // command is available at its front, decodes and dispatches it.
        void handleReadBytes(ubyte[] readBytes)
        in
        {
            assert(readBytes !is null);
            assert(readBytes.length != 0);
        }
        body
        {
            enum ReadCommands : ubyte
            {
                analogMonitor = 0x1,
                varMonitor = 0x2,
                error = 0xfe
            }

            // command layout: [analogMonitor, pin, value high byte, value low byte]
            void handleAnalogMonitorCommand(ubyte[] command)
            in
            {
                assert(command !is null);
                assert(command.length == 4);
                assert(command[0] == ReadCommands.analogMonitor);
            }
            body
            {
                long readMilliseconds = timer.peek().msecs;

                debug(2) writeln("Handling analogMonitorCommand ", command);

                ubyte pin = command[1];

                enum arduinoAnalogReadMax = 1023;
                enum arduinoAnalogReference = 5;
                enum ubytePossibleValues = 256;

                ushort encodedValue = command[2] * ubytePossibleValues + command[3];
                // Scale the raw reading to the analog reference range.
                float readValue =  arduinoAnalogReference * to!float(encodedValue) / arduinoAnalogReadMax;

                connectedPuppeteer.emitAIRead(pin, readValue, readMilliseconds);
            }

            // command layout: [varMonitor, type code, variable index, data...]
            void handleVarMonitorCommand(ubyte[] command)
            in
            {
                assert(command !is null);
                assert(command.length == 5);
                assert(command[0] == ReadCommands.varMonitor);
            }
            body
            {
                long readMilliseconds = timer.peek().msecs;
                debug(2) writeln("Handling varMonitorCommand ", command);

                void handleData(VarType)(ubyte varIndex, ubyte[] data)
                if(connectedPuppeteer.canMonitor!VarType)
                {
                    VarType decodeData(VarType : short)(ubyte[] data) pure
                    {
                        // Reinterpret the two big-endian bytes as a two's
                        // complement 16 bit value. A range-checked conversion
                        // would throw for every value with the high bit set.
                        // NOTE(review): assumes the puppet sends the raw bytes
                        // of a signed 16 bit integer -- confirm against the
                        // puppet firmware.
                        return cast(short)((data[0] << 8) | data[1]);
                    }

                    auto receivedData = decodeData!VarType(data);

                    // Use the timestamp captured when the command arrived.
                    connectedPuppeteer.emitVarRead!VarType(varIndex, receivedData, readMilliseconds);
                }

                // Maps a received type code to the handleData instantiation
                // for the matching type, or null when no type matches.
                void delegate (ubyte, ubyte[]) selectDelegate(VarMonitorTypeCode typeCode)
                {
                    // Builds, at compile time, a switch with one case per
                    // monitorable type.
                    string generateSwitch()
                    {
                        string str = "switch (typeCode) with (VarMonitorTypeCode) {";

                        foreach(varType; VarMonitorTypes)
                        {
                            str ~= "case " ~ to!string(varMonitorTypeCode!varType) ~ ": return &handleData!" ~ varType.stringof ~ ";";
                        }

                        str ~= "default: return null; }" ;

                        return str;
                    }

                    mixin(generateSwitch());
                }

                try
                {
                    auto handler = selectDelegate(to!VarMonitorTypeCode(command[1]));
                    if(handler)
                        handler(command[2], command[3..$]);
                }catch(ConvException e)
                {
                    writeln("Received invalid varMonitor type: ",e);
                }
            }

            // Bytes received so far that do not yet form a complete command.
            static ubyte[] readBuffer = [];

            readBuffer ~= readBytes;

            void popReadBuffer(size_t elements = 1)
            {
                readBuffer = readBuffer.length >= elements ? readBuffer[elements..$] : [];
            }

            if(readBuffer[0] != commandControlByte)
            {
                debug writeln("Received corrupt command. Discarding first byte and returning");
                popReadBuffer();
                return;
            }

            // The command byte has not arrived yet.
            if(readBuffer.length < 2)
                return;

            //Try to make sense out of the readBytes
            switch(readBuffer[1])
            {
                case ReadCommands.analogMonitor:
                    if(readBuffer.length < 5)
                        return;

                    handleAnalogMonitorCommand(readBuffer[1..5]);
                    popReadBuffer(5);
                    break;

                case ReadCommands.varMonitor:
                    if(readBuffer.length < 6)
                        return;

                    handleVarMonitorCommand(readBuffer[1..6]);
                    popReadBuffer(6);
                    break;

                case ReadCommands.error:
                    writeln("Error received!");
                    // Drop the control and command bytes. Leaving them in the
                    // buffer would make every later read hit this case again
                    // and no further command would ever be parsed.
                    popReadBuffer(2);
                    break;

                default:
                    // Report the command byte, not the control byte before it.
                    writeln("Unhandled ubyte command received: ", readBuffer[1], ". Cleaning command buffer.");
                    readBuffer = [];
            }
        }

        enum portReadTimeoutMs = 200;
        arduinoSerialPort = ISerialPort.getInstance(fileName, parity, baudRate, portReadTimeoutMs);
        if(!arduinoSerialPort.open())
        {
            ownerTid.send(CommunicationEstablishedMessage(false));
            return;
        }

        //Some puppets seems to need some time between port opening and communication start
        Thread.sleep(dur!"seconds"(1));

        enum ubyte[] puppeteerReadyCommand = [0x0, 0x0];
        arduinoSerialPort.write([commandControlByte] ~ puppeteerReadyCommand);

        //Wait for the puppet to answer it is ready
        {
            enum ubyte[] puppetReadyCommand = [0x0, 0x0];
            ubyte[] cache = [];
            enum msBetweenChecks = 100;

            int readCounter = 0;
            enum readsUntilFailure = 30;

            while(true)
            {
                ubyte[] readBytes = arduinoSerialPort.read(1);

                if(readBytes !is null)
                {
                    cache ~= readBytes;
                    debug writeln("handshake cache is currently ", cache);

                    // Sliding window over the last three received bytes.
                    if(cache.length == 3)
                    {
                        if(cache == [commandControlByte] ~ puppetReadyCommand)
                            break; //Ready!
                        else
                            cache = cache[1..$]; //pop front and continue
                    }
                }

                if(++readCounter > readsUntilFailure)
                {
                    // The port was opened above; release it before giving up.
                    arduinoSerialPort.close();
                    ownerTid.send(CommunicationEstablishedMessage(false));
                    return;
                }

                Thread.sleep(dur!"msecs"(msBetweenChecks));
            }
        }

        ownerTid.send(CommunicationEstablishedMessage(true));
        timer.start();

        do
        {
            ubyte[] readBytes = arduinoSerialPort.read(bytesReadAtOnce);

            if(readBytes !is null)
            {
                debug(3) writeln("Read bytes ", readBytes);
                handleReadBytes(readBytes);
            }

            // Handle at most one pending message from the owner thread.
            receiveTimeout(msecs(receiveTimeoutMs), &handleEndCommunicationMessage,
                    &handlePinMonitorMessage,
                    &handleVarMonitorMessage,
                    &handleSetPWMMessage);

        }while(shouldContinue);

        void sendPuppeteerClosedCmd(ISerialPort serialPort)
        {
            debug writeln("Sending puppeteerClosedCommand");
            serialPort.write([commandControlByte, 0x99]);
        }
        sendPuppeteerClosedCmd(arduinoSerialPort);

        arduinoSerialPort.close();

        ownerTid.send(CommunicationEndedMessage());
    }
}