Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 22 additions & 32 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from machine import SPI, Pin
import utime
import math
import tool.filter
import tool.filterbank

# ==========================================================
# Constants setting
Expand Down Expand Up @@ -37,14 +37,14 @@
LED = led.Led()

# Frequency filter
FILTER = tool.filter.Filter(SAMPLING_HZ)
FILTERBANK = tool.filterbank.Filterbank(SAMPLING_HZ)


# ==========================================================
# Variable setting
# ==========================================================
# Data from A/D
row_data = [[0] * DATA_SIZE, [0] * DATA_SIZE, [0] * DATA_SIZE]
raw_data = [[0] * DATA_SIZE, [0] * DATA_SIZE, [0] * DATA_SIZE]

# Adjusted data
offsetted_data = [[0] * DATA_SIZE, [0] * DATA_SIZE, [0] * DATA_SIZE]
Expand All @@ -56,9 +56,6 @@
# 3-axis adjust value
offset = [0.0, 0.0, 0.0]

# Adujusting time( offset_counter / sampling rate = sec)
offset_counter = DATA_SIZE * 5

# The system is adjusted
is_offsetted = False

Expand Down Expand Up @@ -102,34 +99,35 @@ def get_seismic_intensity_class(scale: float):
def read_acceleration(axis: int):
# Read 3-axis data from acceleration sensor
a = ADC.read(axis)
index = frame % len(row_data[axis])
index = frame % len(raw_data[axis])

# Remove outliers
if 0 < a < 4096 and a != 1024 and a != 2048 and a != 3072:
row_data[axis][index] = a
raw_data[axis][index] = a
else:
row_data[axis][index] = int(offset[axis])
raw_data[axis][index] = int(offset[axis])

# Set offset value if not adjusted.
if is_offsetted == False:
offset[axis] = sum(row_data[axis]) / len(row_data[axis])
offsetted_data[axis][index] = int(row_data[axis][index] - offset[axis])
offset[axis] = raw_data[axis][index]

offsetted_data[axis][index] = int(raw_data[axis][index] - offset[axis]) / 1024 * 981


def set_filtered_data(axis: int):
# Calculate with the latest 5 frames.
offsetted_dataset = [0.0] * 5
# Calculate with the latest 3 frames.
offsetted_dataset = [0.0] * 3
for i in range(len(offsetted_dataset)):
index = (frame - len(offsetted_dataset) + i + 1) % len(row_data[axis])
index = (frame - len(offsetted_dataset) + i + 1) % DATA_SIZE
# G to Gal.
offsetted_dataset[i] = offsetted_data[axis][index] / 1024 * 980
index = frame % len(row_data[axis])
filtered_data[axis][index] = FILTER.exec(offsetted_dataset).pop()
offsetted_dataset[i] = offsetted_data[axis][index]
index = frame % DATA_SIZE
filtered_data[axis][index] = FILTERBANK.exec(axis, offsetted_dataset)


def set_composite_gal():
# Vector composition of 3-axis data
index = frame % len(filtered_data[0])
index = frame % DATA_SIZE
__composite_gal[index] = math.sqrt(
filtered_data[0][index] ** 2 +
filtered_data[1][index] ** 2 +
Expand All @@ -154,7 +152,6 @@ def set_composite_gal():
# LED updated every second
if frame % SAMPLING_HZ == 0:
__gal = sorted(__composite_gal)[-int(SAMPLING_HZ * 0.3)]
print("Gal:", __gal)
if 0 < __gal:
scale = round(2.0 * math.log10(__gal) + 0.94, 2)
sic = get_seismic_intensity_class(scale)
Expand All @@ -174,31 +171,24 @@ def set_composite_gal():
elif frame % (SAMPLING_HZ/4) == 0 and sic < latest_sic and is_offsetted:
# Blink the LED corresponding to the seismic intensity of past earthquakes.
LED.toggle(latest_sic)
elif is_offsetted == False:
# Make the LED wave during zero point correction.
v = int(offset_counter * 100 / (SAMPLING_HZ * 5)) % 10
LED.brink_scale(v, v)

# Initialize the frame after 60 seconds
# Reinitialize the frame after 60 seconds
if SAMPLING_HZ * 60 <= frame:
frame = 0

frame += 1
current_time = utime.time_ns()
sleep_time = 1 / SAMPLING_HZ - (current_time - start_time)/1000000.0
if 0 < sleep_time:
utime.sleep(sleep_time)

if offset_counter <= 0 and is_offsetted == False:
if is_offsetted == False:
is_offsetted = True
print("offset complete!!")
elif 0 < offset_counter:
offset_counter -= 1

if 0 < sleep_time:
utime.sleep(sleep_time)

if ADJUST_START_PIN.value() == Pin.PULL_UP and is_offsetted:
# When the zero point adjust button is pressed, the correction is started.
is_offsetted = False
offset_counter = SAMPLING_HZ * 5
latest_time = utime.time()
latest_sic = 0
print("offset start!!")
print("reset")
136 changes: 11 additions & 125 deletions tool/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,132 +2,18 @@


class Filter():
def __init__(self, sampling_hz):
self.__dt = 1.0 / sampling_hz
self.__f0 = 0.45
self.__f1 = 7.0
self.__f2 = 0.5
self.__f3 = 12.0
self.__f4 = 20.0
self.__f5 = 30.0
self.__h2a = 1.0
self.__h2b = 0.75
self.__h3 = 0.9
self.__h4 = 0.6
self.__h5 = 0.6
self.__g = 1.262
self.__pi = math.pi
def __init__(self, coefs):
self.a = coefs[0]
self.b = coefs[1]
self.output_data = [0,0,0]

def __func_A14(self, hc, fc, input_data):
# A14
omega_c = 2 * self.__pi * fc
a0 = 12 / (self.__dt * self.__dt) + (12 * hc * omega_c) / \
self.__dt + (omega_c * omega_c)
a1 = 10 * (omega_c * omega_c) - 24 / (self.__dt * self.__dt)
a2 = 12 / (self.__dt * self.__dt) - (12 * hc * omega_c) / \
self.__dt + (omega_c * omega_c)
b0 = omega_c * omega_c
b1 = 10 * (omega_c * omega_c)
b2 = omega_c * omega_c
def filter_new_sample(self,input_data):

return self.__func_A15(a0, a1, a2, b0, b1, b2, input_data)
for i in range(2,0,-1):
self.output_data[i] = self.output_data[i-1]

def __func_A15(self, a0, a1, a2, b0, b1, b2, input_data:list[float]):
output_data = []
self.output_data[0] = ( -self.a[1] * self.output_data[1] - self.a[2] * self.output_data[2] + self.b[0] *
input_data[0] + self.b[1] * input_data[1] + self.b[2] * input_data[2] ) / self.a[0]

return self.output_data

# 1つめ
k1 = (b0 * input_data[0]) / a0
output_data.append(k1)

# 2つめ
k2 = (-a1 * output_data[0] + b0 *
input_data[1] + b1 * input_data[0]) / a0
output_data.append(k2)

# 3つめ以降
for k in range(2, len(input_data)):
value = (-a1 * output_data[k-1] - a2 * output_data[k-2] + b0 *
input_data[k] + b1 * input_data[k-1] + b2 * input_data[k-2]) / a0
output_data.append(value)

return output_data

def __filter01(self, input_data):
fa1 = self.__f0
fa2 = self.__f1

# A11
omega_a1 = 2 * self.__pi * fa1
omega_a2 = 2 * self.__pi * fa2

a0 = 8 / (self.__dt * self.__dt) + (4 * omega_a1 + 2 * omega_a2) / \
self.__dt + omega_a1 * omega_a2
a1 = 2 * omega_a1 * omega_a2 - 16 / (self.__dt * self.__dt)
a2 = 8 / (self.__dt * self.__dt) - (4 * omega_a1 + 2 * omega_a2) / \
self.__dt + omega_a1 * omega_a2
b0 = 4 / (self.__dt * self.__dt) + 2 * omega_a2 / self.__dt
b1 = -8 / (self.__dt * self.__dt)
b2 = 4 / (self.__dt * self.__dt) - 2 * omega_a2 / self.__dt

return self.__func_A15(a0, a1, a2, b0, b1, b2, input_data)

def __filter02(self, input_data):
fa3 = self.__f1

# A12
omega_a3 = 2 * self.__pi * fa3
a0 = 16 / (self.__dt * self.__dt) + 17 * omega_a3 / self.__dt + (omega_a3 * omega_a3)
a1 = 2 * omega_a3 * omega_a3 - 32 / (self.__dt * self.__dt)
a2 = 16 / (self.__dt * self.__dt) - 17 * omega_a3 / self.__dt + (omega_a3 * omega_a3)
b0 = 4 / (self.__dt * self.__dt) + 8.5 * omega_a3 / self.__dt + (omega_a3 * omega_a3)
b1 = 2 * omega_a3 * omega_a3 - 8 / (self.__dt * self.__dt)
b2 = 4 / (self.__dt * self.__dt) - 8.5 * omega_a3 / self.__dt + (omega_a3 * omega_a3)

return self.__func_A15(a0, a1, a2, b0, b1, b2, input_data)

def __filter03(self, input_data):
hb1 = self.__h2a
hb2 = self.__h2b
fb = self.__f2

# A13
omega_b = 2 * self.__pi * fb
a0 = 12 / (self.__dt * self.__dt) + (12 * hb2 * omega_b) / \
self.__dt + (omega_b * omega_b)
a1 = 10 * (omega_b * omega_b) - 24 / (self.__dt * self.__dt)
a2 = 12 / (self.__dt * self.__dt) - (12 + hb2 * omega_b) / \
self.__dt + (omega_b * omega_b)
b0 = 12 / (self.__dt * self.__dt) + (12 * hb1 * omega_b) / \
self.__dt + (omega_b * omega_b)
b1 = 10 * (omega_b * omega_b) - 24 / (self.__dt * self.__dt)
b2 = 12 / (self.__dt * self.__dt) - (12 * hb1 * omega_b) / \
self.__dt + (omega_b * omega_b)

return self.__func_A15(a0, a1, a2, b0, b1, b2, input_data)

def __filter04(self, input_data):
hc = self.__h3
fc = self.__f3
return self.__func_A14(hc, fc, input_data)

def __filter05(self, input_data):
hc = self.__h4
fc = self.__f4
return self.__func_A14(hc, fc, input_data)

def __filter06(self, input_data):
hc = self.__h5
fc = self.__f5
return self.__func_A14(hc, fc, input_data)

def __filter07(self, input_data):
output_data = []
gd = self.__g

for k in range(0, len(input_data)):
output_data.append(input_data[k] * gd)

return output_data

def exec(self, input_data: list[float]):
return self.__filter07(self.__filter06(self.__filter05(self.__filter04(self.__filter03(self.__filter02(self.__filter01(input_data)))))))
Loading