Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/formatting.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Runs the SuffolkLITLab/ALActions black-formatting action on pushes and
# pull requests that target the main branch.
name: formatting action
on:
# Trigger the workflow on push or pull request,
# but only for the main branch
push:
branches:
- main
pull_request:
branches:
- main
jobs:
linter_name:
name: formatting
runs-on: ubuntu-latest
steps:
# NOTE(review): the action is referenced by its moving `main` branch rather
# than a tag or commit SHA - confirm that this is intended.
- uses: SuffolkLITLab/ALActions/black-formatting@main
# The docsig step below is commented out, so it does not run.
# - uses: SuffolkLITLab/ALActions/docsig@main
2 changes: 1 addition & 1 deletion docassemble/InterviewStats/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.5.3'
__version__ = "0.5.3"
43 changes: 24 additions & 19 deletions docassemble/InterviewStats/cache_geography.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ def download_file(url: str, local_file: str) -> str:
Download a file from an arbitrary URL to a local file
"""
# https://stackoverflow.com/a/16696317
log('Downloading {} to {}'.format(url, local_file))
log("Downloading {} to {}".format(url, local_file))
with requests.get(url, stream=True) as r:
r.raise_for_status()
with local_file.open(mode='wb') as f:
with local_file.open(mode="wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return local_file
Expand All @@ -31,26 +31,31 @@ def get_fips_code(state_abbrev: str) -> str:
Gets the FIPS (Federal Information Processing Standards) code for a state
abbreviation, like 'MA'.
"""
if state_abbrev.lower() == 'us':
return 'us'
states = cenpy.explorer.fips_table('STATE')
return states[states['State Abbreviation'] == state_abbrev.upper()].iat[0, 1]
if state_abbrev.lower() == "us":
return "us"
states = cenpy.explorer.fips_table("STATE")
return states[states["State Abbreviation"] == state_abbrev.upper()].iat[0, 1]


base_url = 'https://www2.census.gov/geo/tiger/GENZ2019/shp/'
base_url = "https://www2.census.gov/geo/tiger/GENZ2019/shp/"


def get_boundary_file(state_abbrev: str, layer_type: str, resolution: str='500k') -> str:
return 'cb_2019_{}_{}_{}.zip'.format(
get_fips_code(state_abbrev), layer_type, resolution)
def get_boundary_file(
    state_abbrev: str, layer_type: str, resolution: str = "500k"
) -> str:
    """
    Build the name of the 2019 boundary zip file, which has the form
    ``cb_2019_<fips code>_<layer type>_<resolution>.zip``.

    The FIPS code is looked up from `state_abbrev` with `get_fips_code`.
    """
    fips_code = get_fips_code(state_abbrev)
    return f"cb_2019_{fips_code}_{layer_type}_{resolution}.zip"


def saved_dir() -> Path:
cdir = Path(__file__).resolve().parent
return cdir.joinpath('data/sources')
return cdir.joinpath("data/sources")


def download_shapes(state_abbrev: str, layer_type: str, resolution: str='500k') -> bool:
def download_shapes(
state_abbrev: str, layer_type: str, resolution: str = "500k"
) -> bool:
"""
See https://www2.census.gov/geo/tiger/GENZ2019/2019_file_name_def.pdf?#
for layer type (entity name)
Expand All @@ -64,24 +69,24 @@ def download_shapes(state_abbrev: str, layer_type: str, resolution: str='500k')


def get_zips() -> gpd.GeoDataFrame:
full_file_path = saved_dir().joinpath(get_boundary_file('us', 'zcta510'))
full_file_path = saved_dir().joinpath(get_boundary_file("us", "zcta510"))
if not full_file_path.exists():
log("{} doesn't exist, downloading".format(full_file_path))
download_shapes('us', 'zcta510')
download_shapes("us", "zcta510")
# TODO(brycew): consider a bounding box: it's 2x as fast with one, but
# trying to read bounding boxes from a different shp file is slower
return gpd.read_file('zip://' + str(full_file_path))
return gpd.read_file("zip://" + str(full_file_path))


def get_tracts(state_abbrevs):
# TODO(brycew): finish
all_state_path = saved_dir().joinpath(get_boundary_file('us', 'state', '20m'))
all_state_path = saved_dir().joinpath(get_boundary_file("us", "state", "20m"))
# if not Path(all_state_path).exists():
# download_shapes('us', 'state')
full_file_path = saved_dir().joinpath(get_boundary_file('us', 'tract'))
full_file_path = saved_dir().joinpath(get_boundary_file("us", "tract"))
if not full_file_path.exists():
download_shapes('us', 'tract')
#state_shapes = gpd.read_file('zip://' + all_state_path)
download_shapes("us", "tract")
# state_shapes = gpd.read_file('zip://' + all_state_path)
# bounds = tuple(reduce(lambda l1, l2: np.minimum(l1, l2),
# state_shapes['geometry'].bounds[['minx', 'miny']].values)) + \
# tuple(reduce(lambda l1, l2: np.maximum(l1, l2),
Expand Down
232 changes: 131 additions & 101 deletions docassemble/InterviewStats/data_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

from .snapshot_statistics import *

__all__ = ['get_column_values_list',
'get_col_val_count',
'processing_data',
'save_random_records',
'phrase']
# Names exported when this module is star-imported (`from ... import *`).
__all__ = [
"get_column_values_list",
"get_col_val_count",
"processing_data",
"save_random_records",
"phrase",
]


def processing_data(raw_data=None, headings=None):
"""
Expand All @@ -30,120 +33,147 @@ def processing_data(raw_data=None, headings=None):
# 1. Build filtered field-type lists
# 1.1 Get field-types column from raw_data
field_type_full_dict = {}
field_type_full_dict = raw_data[0]['field_type_list']
field_type_full_dict = raw_data[0]["field_type_list"]
records_count = len(raw_data)

# 1.2. Identify multi-valued type fields for graph, this is the list of original question variable names, not the flattened column headings in the database.
multi_choices_fld_list = []
for k, v in field_type_full_dict.items():
if v in ['checkboxes', 'multiselect']:
multi_choices_fld_list.append(k)
if v in ["checkboxes", "multiselect"]:
multi_choices_fld_list.append(k)

# 1.2.1 Set multi-valued headings to the database column headings (un-flatten them)
multi_choices_heading_dict = {}
for fld in multi_choices_fld_list:
multi_choices_heading_dict[fld] = [v for v in headings if fld in v] #{fld: [fld_subkeys]}

multi_choices_heading_dict = {}
for fld in multi_choices_fld_list:
multi_choices_heading_dict[fld] = [
v for v in headings if fld in v
] # {fld: [fld_subkeys]}

# 1.3. Set single value field list - remove text/numeric fields and multi-valued fields from field_type_full_dict
single_value_fld_list = list()
for k, v in field_type_full_dict.items():
if v not in ['text', 'area', 'number', 'integer', 'currency'] and k not in multi_choices_fld_list:
single_value_fld_list.append(k)

if (
v not in ["text", "area", "number", "integer", "currency"]
and k not in multi_choices_fld_list
):
single_value_fld_list.append(k)

# 2. Build data dictionaries for tabular data arrays
answer_counts_multi = dict()
possible_answers_multi = list() # Question's possible answers
answer_counts_single = dict()
possible_answers_single = list() # Question's possible answers
answer_counts_multi = dict()
possible_answers_multi = list() # Question's possible answers
answer_counts_single = dict()
possible_answers_single = list() # Question's possible answers

# 2.1 Build data for multi-valued questions
for fld in multi_choices_fld_list:
# Merge flattened columns into multi_sub_dict
# subkeys are "choices" names of the question variable
sub_dict = dict()
for row, subkeys in multi_choices_heading_dict.items():
if row == fld:
for k in subkeys:
# Remove prefix portion from each subkey
original_k = k.replace(fld + "_", '')
# Get answer data for each subkey
col = get_column_values_list(raw_data, k)
# Get answer's count and % for each subkey
sub_dict[original_k] = get_col_val_count(records_count, column = col)

# Save answer's "count / %" for fld
answer_counts_multi[fld] = {k: v for k, v in sub_dict.items()}
# Possible_answers_single will be used as index in the table display
possible_answers_multi.append([k for k in sub_dict.keys()])
for fld in multi_choices_fld_list:
# Merge flattened columns into multi_sub_dict
# subkeys are "choices" names of the question variable
sub_dict = dict()
for row, subkeys in multi_choices_heading_dict.items():
if row == fld:
for k in subkeys:
# Remove prefix portion from each subkey
original_k = k.replace(fld + "_", "")
# Get answer data for each subkey
col = get_column_values_list(raw_data, k)
# Get answer's count and % for each subkey
sub_dict[original_k] = get_col_val_count(records_count, column=col)

# Save answer's "count / %" for fld
answer_counts_multi[fld] = {k: v for k, v in sub_dict.items()}
# possible_answers_multi will be used as index in the table display
possible_answers_multi.append([k for k in sub_dict.keys()])

# 2.2 Build data for single value questions
for fld in single_value_fld_list:
# Get list of unique values for fld
unique_col = get_column_values(raw_data, fld)
# Get list of complete data for fld
col = get_column_values_list(raw_data, fld)
# Save answer's "count / %" for fld
answer_counts_single[fld] = get_col_val_count(records_count, labels=unique_col, column = col)
# Possible_answers_single will be used as index in the table display
possible_answers_single.append([t for t in unique_col])

# Get list of unique values for fld
unique_col = get_column_values(raw_data, fld)
# Get list of complete data for fld
col = get_column_values_list(raw_data, fld)
# Save answer's "count / %" for fld
answer_counts_single[fld] = get_col_val_count(
records_count, labels=unique_col, column=col
)
# Possible_answers_single will be used as index in the table display
possible_answers_single.append([t for t in unique_col])

# 3. Return results - separate multi-valued fields from single-valued fields
return [answer_counts_multi, possible_answers_multi, answer_counts_single, possible_answers_single]

def get_column_values_list(records, column) -> list:
return [
answer_counts_multi,
possible_answers_multi,
answer_counts_single,
possible_answers_single,
]


def get_column_values_list(records, column) -> list:
    """
    Collect the value stored under `column` from every record, in order.

    Returns an empty list when `records` or `column` is empty or None.
    A record that lacks the column contributes None.
    """
    if records and column:
        values = []
        for entry in records:
            values.append(entry.get(column))
        return values
    return []

def get_col_val_count(records_count=None, label=None, labels=None, column=None):
if labels: # Single-valued fields
results = dict()
for val in labels:
cnt = column.count(val)
pct = "%.1f%%" % round(cnt*100/records_count, 1)
results[val] = f'{cnt} / {pct}'

else: # Multi-choices fields, count True for each label/subkey
cnt = 0
for item in column:
if item:
cnt += 1
pct = "%.1f%%" % round(cnt*100/records_count, 1)
results = f'{cnt} / {pct}'


def get_col_val_count(records_count=None, label=None, labels=None, column=None):
    """
    Summarize how often answers occur in a column as "count / percent" text.

    Parameters:
        records_count: total number of records, used as the denominator of
            the percentage. A missing or zero total yields "0.0%" instead of
            raising an error.
        label: unused; kept so existing callers keep working.
        labels: the distinct answers of a single-valued field. When given
            and non-empty, each answer is counted separately.
        column: the stored values of the column. None is treated as an
            empty column.

    Returns:
        A dict of {answer: "count / percent"} when `labels` is non-empty,
        otherwise a single "count / percent" string that counts the truthy
        entries of `column`.
    """
    values = column if column is not None else []

    def summarize(cnt):
        # Guard the division: with no records there is no meaningful share.
        share = cnt * 100 / records_count if records_count else 0.0
        pct = "%.1f%%" % round(share, 1)
        return f"{cnt} / {pct}"

    if labels:  # Single-valued fields
        return {val: summarize(values.count(val)) for val in labels}

    # Multi-choices fields, count True for each label/subkey
    return summarize(sum(1 for item in values if item))

def save_random_records(number_of_records, title = '', seed_data_list = None, date_input=None, tags=None):

def save_random_records(
number_of_records, title="", seed_data_list=None, date_input=None, tags=None
):
"""
Generate random survey type records for testing data reports.
"""
for index in range(number_of_records):
type_dict = dict()
field_dict = dict()
for k, v in seed_data_list.items():
type_dict[k] = list(v)[1]
field_dict[k] = list(v)[0] # Field name without quotes

data_to_save = dict()
data_to_save['title'] = title
data_to_save['field_type_list'] = type_dict

for k, v in type_dict.items():
# If a field is of checkboxes type, flatten its subkeys
# so that each subkey/value pair is saved in its own column.
if v in ['checkboxes', 'multiselect']:
for label in field_dict[k]:
data_to_save[k + '_' + label] = (random.choice(field_dict[k]) == label)
else:
data_to_save[k] = random.choice(field_dict[k])

record_date = random.choice(date_input)

filename = get_current_info().get('yaml_filename', None)
random_uid = random_alphanumeric(32)
new_entry = JsonStorage(filename=filename, key=random_uid, data=data_to_save, tags=tags, modtime=record_date, persistent=False)
JsonDb.add(new_entry)
"""
for index in range(number_of_records):
type_dict = dict()
field_dict = dict()
for k, v in seed_data_list.items():
type_dict[k] = list(v)[1]
field_dict[k] = list(v)[0] # Field name without quotes

data_to_save = dict()
data_to_save["title"] = title
data_to_save["field_type_list"] = type_dict

for k, v in type_dict.items():
# If a field is of checkboxes type, flatten its subkeys
# so that each subkey/value pair is saved in its own column.
if v in ["checkboxes", "multiselect"]:
for label in field_dict[k]:
data_to_save[k + "_" + label] = (
random.choice(field_dict[k]) == label
)
else:
data_to_save[k] = random.choice(field_dict[k])

record_date = random.choice(date_input)

filename = get_current_info().get("yaml_filename", None)
random_uid = random_alphanumeric(32)
new_entry = JsonStorage(
filename=filename,
key=random_uid,
data=data_to_save,
tags=tags,
modtime=record_date,
persistent=False,
)
JsonDb.add(new_entry)
JsonDb.commit()

def phrase(input:str) -> str:
base = {'eq': 'on', 'gt': 'after', 'lt': 'before', 'between': 'between'}
return base[input]


def phrase(input: str) -> str:
    """
    Translate a comparison keyword ("eq", "gt", "lt" or "between") into the
    word used to describe it.

    Raises KeyError for any other keyword.
    """
    return {"eq": "on", "gt": "after", "lt": "before", "between": "between"}[input]
Loading
Loading