#/usr/bin/env python
# -*- coding: utf-8
import tkinter as tk
import serial, serial.tools.list_ports
import time
from threading import Thread, Event
from functools import partial
import queue
WIDTH = 270
HEIGHT = 100
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
UPDATE_INTERVAL = 2000
def __init__(self, parent, serial_thread_client, queue, width, height):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.parent = parent
self.serial_thread_client = serial_thread_client
self.queue = queue
self.width = width
self.height = height
self.device = None
self.update_interval = self.UPDATE_INTERVAL
self.measure_counter = self.MEASURE_COUNTER
self.after_id = None
self.measuring_data = list()
self.display_conf = {"cap" : (
self.width / 2,
self.height / 2,
"{0:2.2f} uf",
"system 18 bold",
"blue",
"cap"),
"counter" : (
40,
self.height - 10,
"COUNTS: {0}",
"system 7 bold",
"green",
"counter"),
"error" : (
self.width / 2,
10,
"{0}",
"system 5 bold",
"red",
"error"),
"wait" :(
self.width / 2,
self.height / 2,
"{0}",
"system 18 bold",
"magenta",
"cap")}
self.menubar = tk.Menu()
parent.config(menu=self.menubar)
self.menubar.add_cascade(label="PORTS", menu = self.get_ports(
self.menubar))
self.display = tk.Canvas(self, width = width, height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display(0, "cap")
self.update_cap_display(self.measure_counter, "counter")
button_frame = tk.Frame(self)
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
self.buttons[0].config(state = "disabled")
def measure_start(self):
self.menubar.entryconfig("PORTS", state = "disabled")
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.measuring_data = list()
self.serial_thread_client.start(self.device)
self.measure()
def get_ports(self, menubar):
port_menu = tk.Menu(menubar, tearoff = 0)
for serial_ports in serial.tools.list_ports.comports():
device, desc, hwid = serial_ports
port_menu.add_command(label = "{0}:{1}:{2}".format(
device, desc, hwid), command = partial(
self.set_device, device))
return port_menu
def set_device(self, device):
self.buttons[0].config(state = "normal")
self.device = device
def measure_stop(self):
self.menubar.entryconfig("PORTS", state = "normal")
for button in self.buttons:
button.config(state = "active")
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(self.measure_counter, "counter")
self.serial_thread_client.stop()
if self.after_id:
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(self.measure_counter, "counter")
def update_cap_display(self, text, tag):
width, height, text_conf, font, color, tag = self.display_conf[tag]
self.display.delete(tag)
self.display.create_text(width, height, text = text_conf.format(text),
font = font, fill = color,tag = tag)
def measure(self):
measure_time = 1
if not self.queue.empty():
while True:
try:
capacity, measure_time, error = self.queue.get_nowait()
except queue.Empty:
break
if error:
self.update_cap_display(capacity, "error")
self.measure_stop()
else:
self.measuring_data.append(capacity)
self.update_cap_display(capacity, "cap")
self.update_cap_display(self.measure_counter, "counter")
self.measure_counter -= 1
else:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!", "wait")
if self.measure_counter > 0:
self.after_id = self.after(int(self.update_interval
* measure_time if isinstance(measure_time, float)
else 1), self.measure)
else:
if len(self.measuring_data) > 1:
self.update_cap_display(sum(self.measuring_data)\
/ len(self.measuring_data), "cap")
self.measure_stop()
else:
self.measure_stop()
def release(self):
if self.after_id:
self.measure_stop()
self.parent.destroy()
class SerialThreadedClient(object):
CALIBRATION = 150
def __init__(self, queue):
self.queue = queue
self.run_event = None
self.thread = None
def __enter__(self):
return self
def __exit__(self, *args):
self.stop()
def stop(self):
if self.thread:
self.run_event.set()
self.thread.join()
self.thread = None
def start(self, device):
self.run_event = Event()
self.thread = Thread(target=self.worker_thread, args=[device],
daemon = True)
self.thread.start()
def worker_thread(self, device):
measure_time = 0
while not self.run_event.is_set():
try:
with serial.Serial(device) as ser:
start_time = time.time()
ser.setRTS(True)
while not ser.getDSR():
measure_time = time.time() - start_time
ser.setRTS(False)
self.queue.put((measure_time * 1000 - self.CALIBRATION\
* measure_time, measure_time, False))\
if measure_time > 0 else\
self.queue.put(("capacity < 1 µf or not connected",
False, True))
except (serial.SerialException, IOError) as error:
self.queue.put((error, False, True))
self.run_event.wait(measure_time * 3)
def main(queue):
root = tk.Tk()
root.title("CAPACITANCE")
root.resizable(0, 0)
queue = queue.Queue()
with SerialThreadedClient(queue) as serial_thread_client:
capacitance_ui = CapacitanceUI(root, serial_thread_client, queue,
WIDTH, HEIGHT)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main(queue)