#/usr/bin/env python
# -*- coding: utf-8
import tkinter as tk
import numpy as np
import pyaudio
from threading import Thread, Event
from functools import partial
import queue
MEASURE_TIME = 0.1
class ReadLineInAudioThreadClient(object):
def __init__(self, capacitance_ne555, queue):
self.queue = queue
self.run_event = None
self.thread = None
self.capacitance_ne555 = capacitance_ne555
def stop(self):
if self.thread:
self.run_event.set()
self.thread.join()
self.thread = None
def start(self, measure_time):
self.run_event = Event()
self.thread = Thread(target=self.worker_thread, args=[measure_time],
daemon = True)
self.thread.start()
def calibrate_cap(self):
self.calibrate = True
def worker_thread(self, measure_time):
self.calibrate = False if not self.calibrate\
else self.capacitance_ne555.calibrate()
self.capacitance_ne555.start_stream()
while not self.run_event.is_set():
self.queue.put((self.capacitance_ne555.measure()))
self.run_event.wait(measure_time)
class CapacitanceNE555(object):
RA = 10 # kOhm
RB = 50 # kOhm
RESISTANCE = (RA + RB * 2) * 0.69
def __init__(self,
rate = 22050,
frames_per_buffer = 1024):
self.py_audio = pyaudio.PyAudio()
self.stream = None
self.stream_data = None
self.ref_capacity = None
self.ref_frequency = None
self.rate = rate
self.frames_per_buffer = frames_per_buffer
def __enter__(self):
return self
def __exit__(self, *args):
self.release()
def start_stream(self):
self.stream = self.py_audio.open(format= pyaudio.paInt16,
channels = 1,
rate = self.rate,
frames_per_buffer = \
self.frames_per_buffer,
input = True,
stream_callback = self.callback)
def stop_stream(self):
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
def callback(self, in_data, frame_count, time_info, status):
self.stream_data = in_data
return (in_data, pyaudio.paContinue)
def calibrate(self):
self.ref_capacity = None
def measure(self):
if self.stream_data:
np_data = np.frombuffer(self.stream_data,
dtype = np.int16)
def get_capacity(ref_capacity = 0):
frequency = np.abs(np.fft.fftfreq(
np_data.size)[np.argmax(
np.abs(np.fft.fft(
np_data))
**2)]) * self.rate
return (1 / frequency / self.RESISTANCE * 1000000000\
- ref_capacity, frequency) if frequency else (0, 0)
if self.ref_capacity:
capacity, frequency = get_capacity(self.ref_capacity)
return (capacity, frequency, self.ref_capacity,
self.ref_frequency), np_data
else:
self.ref_capacity, self.ref_frequency = get_capacity()
return (0, self.ref_frequency, self.ref_capacity,
self.ref_frequency), np_data
def release(self):
if self.stream:
self.stop_stream()
self.py_audio.terminate()
class CapacitanceUI(tk.LabelFrame):
WIDTH = 300
HEIGHT = 180
def __init__(self,
parent,
capacitance_ne555,
width = WIDTH,
height = HEIGHT,
measure_time = MEASURE_TIME,
signal_strength = 100):
tk.LabelFrame.__init__(self,
parent,
text = "CAPACITANCE",
relief = "solid")
self.queue = queue.Queue()
self.read_line_in_audio_thread_client = \
ReadLineInAudioThreadClient(capacitance_ne555, self.queue)
self.parent = parent
self.width = width
self.height = height
self.measure_time = measure_time
self.signal_strength = signal_strength
self.after_id = None
self.y = 0
self.display_conf = {"cap" : (
self.width / 2,
self.height / 2 - 30,
("pf", "nf"),
"system 27 bold",
"blue",
"",
"cap"),
"freq" : (
self.width / 2,
20,
("Hz", "kHz"),
"system 15 bold",
"green",
"",
"freq"),
"ref_cap" :(
self.width / 2 + 70,
self.height - 10,
("pf", "nf"),
"system 12 bold",
"darkorange",
"REF: ",
"ref_cap"),
"ref_freq" :(
self.width / 2 - 70,
self.height - 10,
("Hz", "kHz"),
"system 12 bold",
"darkorange",
"REF: ",
"ref_freq")}
self.display = tk.Canvas(self, width = width, height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_display("cap", 0)
self.start_button = tk.Button(self, text = "START", width = 7,
command = self.start_stop_measure)
self.start_button.pack(side = tk.LEFT, padx = 30, pady = 5)
self.calibrate_button = tk.Button(self, text = "CALIBRATE", width = 7,
command = self.calibrate)
self.calibrate_button.pack(side = tk.RIGHT, padx = 30, pady = 5)
self.calibrate()
def calibrate(self):
for x in range(self.width):
self.display.delete("freq_line{0}".format(x))
self.start_button.config(state = "disabled")
self.read_line_in_audio_thread_client.calibrate_cap()
self.start_measure()
self.after_id = self.after(int(self.measure_time * 4000),
self.stop_measure)
def update_display(self, tag, value = None):
width, height, units, font, color, info, tag = self.display_conf[tag]
if value or value == 0:
if str(value).isalpha():
text = value
else:
value = "{0:.0f} {1}".format(value, units[0])\
if value < 999 else\
"{0:.3f} {1}".format(value / 1000, units[1])
self.display.delete(tag)
self.display.create_text(width,
height,
text = "{0}{1}".format(info ,value),
font = font,
fill = color,
tag = tag)
else:
self.display.delete(tag)
def update_signal_line(self, points, tag, fill = "navy"):
self.display.delete(tag)
x, y, x_2, y_2 = points
self.display.create_line(x, y, x_2, y_2,
fill = fill,
tag = tag)
def start_stop_measure(self):
self.stop_measure() if self.after_id else self.start_measure()
def start_measure(self):
self.calibrate_button.config(state = "disabled")
self.start_button.config(text = "STOP")
self.read_line_in_audio_thread_client.start(self.measure_time)
self.measure()
def stop_measure(self):
self.after_cancel(self.after_id)
self.after_id = None
self.start_button.config(state = "normal")
self.calibrate_button.config(state = "normal")
self.start_button.config(text = "START")
self.read_line_in_audio_thread_client.stop()
def measure(self):
if self.after_id:
if not self.queue.empty():
while True:
try:
try:
values, stream_data = self.queue.get_nowait()
except TypeError as er:
print(er)
else:
for value, tag in zip(values, ("cap",
"freq",
"ref_cap",
"ref_freq")):
self.update_display(tag, value)
x, y = 0, self.y
for data in stream_data[: self.width]:
data = data * (self.signal_strength / 50000)
self.update_signal_line((x,
self.y
+ self.height - 60,
x + 1,
data
+ self.height - 60),
"freq_line{0}"\
.format(x))
x += 1
self.y = data
except queue.Empty:
break
self.after_id = self.after(int(self.measure_time * 1000),
self.measure)
def release(self):
if self.after_id:
self.stop_measure()
self.parent.destroy()
def main():
root = tk.Tk()
root.title("CAPACITANCE")
root.resizable(0, 0)
with CapacitanceNE555() as capacitance_ne555:
capacitance_ui = CapacitanceUI(root, capacitance_ne555)
capacitance_ui.pack(expand=tk.YES, padx = 5, pady = 5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
#/usr/bin/env python
# -*- coding: utf-8
import tkinter as tk
import numpy as np
import pyaudio
from threading import Thread, Event
from functools import partial
import queue
MEASURE_TIME = 0.1
class ReadLineInAudioThreadClient(object):
def __init__(self, capacitance_ne555, queue):
self.queue = queue
self.run_event = None
self.thread = None
self.capacitance_ne555 = capacitance_ne555
def stop(self):
if self.thread:
self.run_event.set()
self.thread.join()
self.thread = None
def start(self, measure_time):
self.run_event = Event()
self.thread = Thread(target=self.worker_thread, args=[measure_time],
daemon = True)
self.thread.start()
def calibrate_cap(self):
self.calibrate = True
def worker_thread(self, measure_time):
self.calibrate = False if not self.calibrate\
else self.capacitance_ne555.calibrate()
self.capacitance_ne555.start_stream()
while not self.run_event.is_set():
self.queue.put((self.capacitance_ne555.measure()))
self.run_event.wait(measure_time)
class CapacitanceNE555(object):
RA = 10 # kOhm
RB = 50 # kOhm
RESISTANCE = (RA + RB * 2) * 0.69
def __init__(self,
rate = 22050,
frames_per_buffer = 1024):
self.py_audio = pyaudio.PyAudio()
self.stream = None
self.stream_data = None
self.ref_capacity = None
self.ref_frequency = None
self.rate = rate
self.frames_per_buffer = frames_per_buffer
def __enter__(self):
return self
def __exit__(self, *args):
self.release()
def start_stream(self):
self.stream = self.py_audio.open(format= pyaudio.paInt16,
channels = 1,
rate = self.rate,
frames_per_buffer = \
self.frames_per_buffer,
input = True,
stream_callback = self.callback)
def stop_stream(self):
if self.stream:
self.stream.stop_stream()
self.stream.close()
self.stream = None
def callback(self, in_data, frame_count, time_info, status):
self.stream_data = in_data
return (in_data, pyaudio.paContinue)
def calibrate(self):
self.ref_capacity = None
def measure(self):
if self.stream_data:
np_data = np.frombuffer(self.stream_data,
dtype = np.int16)
def get_capacity(ref_capacity = 0):
frequency = np.abs(np.fft.fftfreq(
np_data.size)[np.argmax(
np.abs(np.fft.fft(
np_data))
**2)]) * self.rate
return (1 / frequency / self.RESISTANCE * 1000000000\
- ref_capacity, frequency) if frequency else (0, 0)
if self.ref_capacity:
capacity, frequency = get_capacity(self.ref_capacity)
return (capacity, frequency, self.ref_capacity,
self.ref_frequency), np_data
else:
self.ref_capacity, self.ref_frequency = get_capacity()
return (0, self.ref_frequency, self.ref_capacity,
self.ref_frequency), np_data
def release(self):
if self.stream:
self.stop_stream()
self.py_audio.terminate()
class CapacitanceUI(tk.LabelFrame):
WIDTH = 300
HEIGHT = 180
def __init__(self,
parent,
capacitance_ne555,
width = WIDTH,
height = HEIGHT,
measure_time = MEASURE_TIME,
signal_strength = 100):
tk.LabelFrame.__init__(self,
parent,
text = "CAPACITANCE",
relief = "solid")
self.queue = queue.Queue()
self.read_line_in_audio_thread_client = \
ReadLineInAudioThreadClient(capacitance_ne555, self.queue)
self.parent = parent
self.width = width
self.height = height
self.measure_time = measure_time
self.signal_strength = signal_strength
self.after_id = None
self.y = 0
self.display_conf = {"cap" : (
self.width / 2,
self.height / 2 - 30,
("pf", "nf"),
"Arial 27 bold",
"blue",
"",
"cap"),
"freq" : (
self.width / 2,
20,
("Hz", "kHz"),
"Arial 15 bold",
"green",
"",
"freq"),
"ref_cap" :(
self.width / 2 + 70,
self.height - 10,
("pf", "nf"),
"Arial 12 bold",
"darkorange",
"REF: ",
"ref_cap"),
"ref_freq" :(
self.width / 2 - 70,
self.height - 10,
("Hz", "kHz"),
"Arial 12 bold",
"darkorange",
"REF: ",
"ref_freq")}
self.display = tk.Canvas(self, width = width, height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_display("cap", 0)
self.start_button = tk.Button(self, text = "START", width = 7,
command = self.start_stop_measure)
self.start_button.pack(side = tk.LEFT, padx = 30, pady = 5)
self.calibrate_button = tk.Button(self, text = "CALIBRATE", width = 9,
command = self.calibrate)
self.calibrate_button.pack(side = tk.RIGHT, padx = 30, pady = 5)
self.calibrate()
def calibrate(self):
for x in range(self.width):
self.display.delete("freq_line{0}".format(x))
self.start_button.config(state = "disabled")
self.read_line_in_audio_thread_client.calibrate_cap()
self.start_measure()
self.after_id = self.after(int(self.measure_time * 4000),
self.stop_measure)
def update_display(self, tag, value = None):
width, height, units, font, color, info, tag = self.display_conf[tag]
if value or value == 0:
if str(value).isalpha():
text = value
else:
value = "{0:.0f} {1}".format(value, units[0])\
if value < 999 else\
"{0:.3f} {1}".format(value / 1000, units[1])
self.display.delete(tag)
self.display.create_text(width,
height,
text = "{0}{1}".format(info ,value),
font = font,
fill = color,
tag = tag)
else:
self.display.delete(tag)
def update_signal_line(self, points, tag, fill = "navy"):
self.display.delete(tag)
x, y, x_2, y_2 = points
self.display.create_line(x, y, x_2, y_2,
fill = fill,
tag = tag)
def start_stop_measure(self):
self.stop_measure() if self.after_id else self.start_measure()
def start_measure(self):
self.calibrate_button.config(state = "disabled")
self.start_button.config(text = "STOP")
self.read_line_in_audio_thread_client.start(self.measure_time)
self.measure()
def stop_measure(self):
self.after_cancel(self.after_id)
self.after_id = None
self.start_button.config(state = "normal")
self.calibrate_button.config(state = "normal")
self.start_button.config(text = "START")
self.read_line_in_audio_thread_client.stop()
def measure(self):
if self.after_id:
if not self.queue.empty():
while True:
try:
try:
values, stream_data = self.queue.get_nowait()
except TypeError as er:
print(er)
else:
for value, tag in zip(values, ("cap",
"freq",
"ref_cap",
"ref_freq")):
self.update_display(tag, value)
x, y = 0, self.y
for data in stream_data[: self.width]:
data = data * (self.signal_strength / 50000)
self.update_signal_line((x,
self.y
+ self.height - 60,
x + 1,
data
+ self.height - 60),
"freq_line{0}"\
.format(x))
x += 1
self.y = data
except queue.Empty:
break
self.after_id = self.after(int(self.measure_time * 1000),
self.measure)
def release(self):
if self.after_id:
self.stop_measure()
self.parent.destroy()
def main():
root = tk.Tk()
root.title("CAPACITANCE")
root.resizable(0, 0)
with CapacitanceNE555() as capacitance_ne555:
capacitance_ui = CapacitanceUI(root, capacitance_ne555)
capacitance_ui.pack(expand=tk.YES, padx = 5, pady = 5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()