diff --git a/.github/workflows/formatting.yml b/.github/workflows/formatting.yml
new file mode 100644
index 0000000..5e40970
--- /dev/null
+++ b/.github/workflows/formatting.yml
@@ -0,0 +1,17 @@
+name: formatting action
+on:
+ # Trigger the workflow on push or pull request,
+ # but only for the main branch
+ push:
+ branches:
+ - main
+ pull_request:
+ branches:
+ - main
+jobs:
+ linter_name:
+ name: formatting
+ runs-on: ubuntu-latest
+ steps:
+ - uses: SuffolkLITLab/ALActions/black-formatting@main
+ # - uses: SuffolkLITLab/ALActions/docsig@main
\ No newline at end of file
diff --git a/docassemble/InterviewStats/__init__.py b/docassemble/InterviewStats/__init__.py
index ed7d50e..43a1e95 100644
--- a/docassemble/InterviewStats/__init__.py
+++ b/docassemble/InterviewStats/__init__.py
@@ -1 +1 @@
-__version__ = '0.5.3'
+__version__ = "0.5.3"
diff --git a/docassemble/InterviewStats/cache_geography.py b/docassemble/InterviewStats/cache_geography.py
index 6df2cf9..a9a8bdc 100644
--- a/docassemble/InterviewStats/cache_geography.py
+++ b/docassemble/InterviewStats/cache_geography.py
@@ -17,10 +17,10 @@ def download_file(url: str, local_file: str) -> str:
Download a file from an arbitrary URL to a local file
"""
# https://stackoverflow.com/a/16696317
- log('Downloading {} to {}'.format(url, local_file))
+ log("Downloading {} to {}".format(url, local_file))
with requests.get(url, stream=True) as r:
r.raise_for_status()
- with local_file.open(mode='wb') as f:
+ with local_file.open(mode="wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return local_file
@@ -31,26 +31,31 @@ def get_fips_code(state_abbrev: str) -> str:
Gets the FIPS (Federal Information Processing Standards) code for a state
abbreviation, like 'MA'.
"""
- if state_abbrev.lower() == 'us':
- return 'us'
- states = cenpy.explorer.fips_table('STATE')
- return states[states['State Abbreviation'] == state_abbrev.upper()].iat[0, 1]
+ if state_abbrev.lower() == "us":
+ return "us"
+ states = cenpy.explorer.fips_table("STATE")
+ return states[states["State Abbreviation"] == state_abbrev.upper()].iat[0, 1]
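+
+# Illustrative: cenpy's FIPS table maps postal abbreviations to FIPS codes,
+# so get_fips_code("MA") should return "25" (Massachusetts).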
-base_url = 'https://www2.census.gov/geo/tiger/GENZ2019/shp/'
+base_url = "https://www2.census.gov/geo/tiger/GENZ2019/shp/"
-def get_boundary_file(state_abbrev: str, layer_type: str, resolution: str='500k') -> str:
- return 'cb_2019_{}_{}_{}.zip'.format(
- get_fips_code(state_abbrev), layer_type, resolution)
+def get_boundary_file(
+ state_abbrev: str, layer_type: str, resolution: str = "500k"
+) -> str:
+ return "cb_2019_{}_{}_{}.zip".format(
+ get_fips_code(state_abbrev), layer_type, resolution
+ )
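+
+# Illustrative, assuming get_fips_code("MA") yields "25":
+#   get_boundary_file("MA", "tract") -> "cb_2019_25_tract_500k.zip"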
def saved_dir() -> Path:
cdir = Path(__file__).resolve().parent
- return cdir.joinpath('data/sources')
+ return cdir.joinpath("data/sources")
-def download_shapes(state_abbrev: str, layer_type: str, resolution: str='500k') -> bool:
+def download_shapes(
+ state_abbrev: str, layer_type: str, resolution: str = "500k"
+) -> bool:
"""
See https://www2.census.gov/geo/tiger/GENZ2019/2019_file_name_def.pdf?#
for layer type (entity name)
@@ -64,24 +69,24 @@ def download_shapes(state_abbrev: str, layer_type: str, resolution: str='500k')
def get_zips() -> gpd.GeoDataFrame:
- full_file_path = saved_dir().joinpath(get_boundary_file('us', 'zcta510'))
+ full_file_path = saved_dir().joinpath(get_boundary_file("us", "zcta510"))
if not full_file_path.exists():
log("{} doesn't exist, downloading".format(full_file_path))
- download_shapes('us', 'zcta510')
+ download_shapes("us", "zcta510")
# TODO(brycew): consider a bounding box: it's 2x as fast with one, but
# trying to read bounding boxes from a different shp file is slower
- return gpd.read_file('zip://' + str(full_file_path))
+ return gpd.read_file("zip://" + str(full_file_path))
def get_tracts(state_abbrevs):
# TODO(brycew): finish
- all_state_path = saved_dir().joinpath(get_boundary_file('us', 'state', '20m'))
+ all_state_path = saved_dir().joinpath(get_boundary_file("us", "state", "20m"))
# if not Path(all_state_path).exists():
# download_shapes('us', 'state')
- full_file_path = saved_dir().joinpath(get_boundary_file('us', 'tract'))
+ full_file_path = saved_dir().joinpath(get_boundary_file("us", "tract"))
if not full_file_path.exists():
- download_shapes('us', 'tract')
- #state_shapes = gpd.read_file('zip://' + all_state_path)
+ download_shapes("us", "tract")
+ # state_shapes = gpd.read_file('zip://' + all_state_path)
# bounds = tuple(reduce(lambda l1, l2: np.minimum(l1, l2),
# state_shapes['geometry'].bounds[['minx', 'miny']].values)) + \
# tuple(reduce(lambda l1, l2: np.maximum(l1, l2),
diff --git a/docassemble/InterviewStats/data_report.py b/docassemble/InterviewStats/data_report.py
index 960de95..a482003 100644
--- a/docassemble/InterviewStats/data_report.py
+++ b/docassemble/InterviewStats/data_report.py
@@ -14,11 +14,14 @@
from .snapshot_statistics import *
-__all__ = ['get_column_values_list',
- 'get_col_val_count',
- 'processing_data',
- 'save_random_records',
- 'phrase']
+__all__ = [
+ "get_column_values_list",
+ "get_col_val_count",
+ "processing_data",
+ "save_random_records",
+ "phrase",
+]
+
def processing_data(raw_data=None, headings=None):
"""
@@ -30,120 +33,147 @@ def processing_data(raw_data=None, headings=None):
# 1. Build filtered field-type lists
# 1.1 Get field-types column from raw_data
field_type_full_dict = {}
- field_type_full_dict = raw_data[0]['field_type_list']
+ field_type_full_dict = raw_data[0]["field_type_list"]
records_count = len(raw_data)
-
+
    # 1.2. Identify multi-valued field types for graphing; this is the list of original question variable names, not the flattened column headings in the database.
multi_choices_fld_list = []
for k, v in field_type_full_dict.items():
- if v in ['checkboxes', 'multiselect']:
- multi_choices_fld_list.append(k)
-
+ if v in ["checkboxes", "multiselect"]:
+ multi_choices_fld_list.append(k)
+
# 1.2.1 Set multi-valued headings to the database column headings (un-flatten them)
- multi_choices_heading_dict = {}
- for fld in multi_choices_fld_list:
- multi_choices_heading_dict[fld] = [v for v in headings if fld in v] #{fld: [fld_subkeys]}
-
+ multi_choices_heading_dict = {}
+ for fld in multi_choices_fld_list:
+ multi_choices_heading_dict[fld] = [
+ v for v in headings if fld in v
+ ] # {fld: [fld_subkeys]}
+
    # 1.3. Set the single-value field list - remove text/numeric fields and multi-valued fields from field_type_full_list
single_value_fld_list = list()
for k, v in field_type_full_dict.items():
- if v not in ['text', 'area', 'number', 'integer', 'currency'] and k not in multi_choices_fld_list:
- single_value_fld_list.append(k)
-
+ if (
+ v not in ["text", "area", "number", "integer", "currency"]
+ and k not in multi_choices_fld_list
+ ):
+ single_value_fld_list.append(k)
+
# 2. Build data dictionaries for tabular data arrays
- answer_counts_multi = dict()
- possible_answers_multi = list() # Question's possible answers
- answer_counts_single = dict()
- possible_answers_single = list() # Question's possible answers
-
+ answer_counts_multi = dict()
+ possible_answers_multi = list() # Question's possible answers
+ answer_counts_single = dict()
+ possible_answers_single = list() # Question's possible answers
+
# 2.1 Build data for multi-valued questions
- for fld in multi_choices_fld_list:
- # Merge flattened columns into multi_sub_dict
- # subkeys are "choices" names of the question variable
- sub_dict = dict()
- for row, subkeys in multi_choices_heading_dict.items():
- if row == fld:
- for k in subkeys:
- # Remove prefix portion from each subkey
- original_k = k.replace(fld + "_", '')
- # Get answer data for each subkey
- col = get_column_values_list(raw_data, k)
- # Get answer's count and % for each subkey
- sub_dict[original_k] = get_col_val_count(records_count, column = col)
-
- # Save answer's "count / %" for fld
- answer_counts_multi[fld] = {k: v for k, v in sub_dict.items()}
- # Possible_answers_single will be used as index in the table display
- possible_answers_multi.append([k for k in sub_dict.keys()])
-
+ for fld in multi_choices_fld_list:
+ # Merge flattened columns into multi_sub_dict
+        # subkeys are the "choices" names of the question variable
+ sub_dict = dict()
+ for row, subkeys in multi_choices_heading_dict.items():
+ if row == fld:
+ for k in subkeys:
+ # Remove prefix portion from each subkey
+ original_k = k.replace(fld + "_", "")
+ # Get answer data for each subkey
+ col = get_column_values_list(raw_data, k)
+ # Get answer's count and % for each subkey
+ sub_dict[original_k] = get_col_val_count(records_count, column=col)
+
+ # Save answer's "count / %" for fld
+ answer_counts_multi[fld] = {k: v for k, v in sub_dict.items()}
+        # possible_answers_multi will be used as the index in the table display
+ possible_answers_multi.append([k for k in sub_dict.keys()])
+
# 2.2 Build data for single value questions
for fld in single_value_fld_list:
- # Get list of unique values for fld
- unique_col = get_column_values(raw_data, fld)
- # Get list of complete data for fld
- col = get_column_values_list(raw_data, fld)
- # Save answer's "count / %" for fld
- answer_counts_single[fld] = get_col_val_count(records_count, labels=unique_col, column = col)
- # Possible_answers_single will be used as index in the table display
- possible_answers_single.append([t for t in unique_col])
-
+ # Get list of unique values for fld
+ unique_col = get_column_values(raw_data, fld)
+ # Get list of complete data for fld
+ col = get_column_values_list(raw_data, fld)
+ # Save answer's "count / %" for fld
+ answer_counts_single[fld] = get_col_val_count(
+ records_count, labels=unique_col, column=col
+ )
+        # possible_answers_single will be used as the index in the table display
+ possible_answers_single.append([t for t in unique_col])
+
# 3. Return results - separate multi-valued fields from single-valued fields
- return [answer_counts_multi, possible_answers_multi, answer_counts_single, possible_answers_single]
-
-def get_column_values_list(records, column) -> list:
+ return [
+ answer_counts_multi,
+ possible_answers_multi,
+ answer_counts_single,
+ possible_answers_single,
+ ]
+
+
+def get_column_values_list(records, column) -> list:
if not records or not column:
return []
return [record.get(column) for record in records]
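+# Illustrative: get_column_values_list([{"zip": "02108"}, {"zip": "01002"}], "zip")
+# returns ["02108", "01002"]; records missing the column yield None via record.get().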
-
-def get_col_val_count(records_count=None, label=None, labels=None, column=None):
- if labels: # Single-valued fields
- results = dict()
- for val in labels:
- cnt = column.count(val)
- pct = "%.1f%%" % round(cnt*100/records_count, 1)
- results[val] = f'{cnt} / {pct}'
-
- else: # Multi-choices fields, count True for each label/subkey
- cnt = 0
- for item in column:
- if item:
- cnt += 1
- pct = "%.1f%%" % round(cnt*100/records_count, 1)
- results = f'{cnt} / {pct}'
+
+
+def get_col_val_count(records_count=None, label=None, labels=None, column=None):
+ if labels: # Single-valued fields
+ results = dict()
+ for val in labels:
+ cnt = column.count(val)
+ pct = "%.1f%%" % round(cnt * 100 / records_count, 1)
+ results[val] = f"{cnt} / {pct}"
+
+    else:  # Multi-choice fields: count True values for each label/subkey
+ cnt = 0
+ for item in column:
+ if item:
+ cnt += 1
+ pct = "%.1f%%" % round(cnt * 100 / records_count, 1)
+ results = f"{cnt} / {pct}"
return results
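+# Illustrative: each entry is a "count / percent" string, e.g.
+#   get_col_val_count(10, labels=["a", "b"], column=["a", "a", "b"])
+#   returns {"a": "2 / 20.0%", "b": "1 / 10.0%"}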
-def save_random_records(number_of_records, title = '', seed_data_list = None, date_input=None, tags=None):
+
+def save_random_records(
+ number_of_records, title="", seed_data_list=None, date_input=None, tags=None
+):
"""
    Generate random survey-type records for testing data reports.
- """
- for index in range(number_of_records):
- type_dict = dict()
- field_dict = dict()
- for k, v in seed_data_list.items():
- type_dict[k] = list(v)[1]
- field_dict[k] = list(v)[0] # Field name without quotes
-
- data_to_save = dict()
- data_to_save['title'] = title
- data_to_save['field_type_list'] = type_dict
-
- for k, v in type_dict.items():
- # If a field is of checkboxes type, flatten its subkeys
- # so that each subkey/value pair is saved in its own column.
- if v in ['checkboxes', 'multiselect']:
- for label in field_dict[k]:
- data_to_save[k + '_' + label] = (random.choice(field_dict[k]) == label)
- else:
- data_to_save[k] = random.choice(field_dict[k])
-
- record_date = random.choice(date_input)
-
- filename = get_current_info().get('yaml_filename', None)
- random_uid = random_alphanumeric(32)
- new_entry = JsonStorage(filename=filename, key=random_uid, data=data_to_save, tags=tags, modtime=record_date, persistent=False)
- JsonDb.add(new_entry)
+ """
+ for index in range(number_of_records):
+ type_dict = dict()
+ field_dict = dict()
+ for k, v in seed_data_list.items():
+ type_dict[k] = list(v)[1]
+ field_dict[k] = list(v)[0] # Field name without quotes
+
+ data_to_save = dict()
+ data_to_save["title"] = title
+ data_to_save["field_type_list"] = type_dict
+
+ for k, v in type_dict.items():
+ # If a field is of checkboxes type, flatten its subkeys
+ # so that each subkey/value pair is saved in its own column.
+ if v in ["checkboxes", "multiselect"]:
+ for label in field_dict[k]:
+ data_to_save[k + "_" + label] = (
+ random.choice(field_dict[k]) == label
+ )
+ else:
+ data_to_save[k] = random.choice(field_dict[k])
+
+ record_date = random.choice(date_input)
+
+ filename = get_current_info().get("yaml_filename", None)
+ random_uid = random_alphanumeric(32)
+ new_entry = JsonStorage(
+ filename=filename,
+ key=random_uid,
+ data=data_to_save,
+ tags=tags,
+ modtime=record_date,
+ persistent=False,
+ )
+ JsonDb.add(new_entry)
JsonDb.commit()
-
-def phrase(input:str) -> str:
- base = {'eq': 'on', 'gt': 'after', 'lt': 'before', 'between': 'between'}
- return base[input]
\ No newline at end of file
+
+
+def phrase(input: str) -> str:
+ base = {"eq": "on", "gt": "after", "lt": "before", "between": "between"}
+ return base[input]
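+
+# Illustrative: phrase("gt") returns "after"; an unrecognized comparator raises KeyError.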
diff --git a/docassemble/InterviewStats/generate_test_data.py b/docassemble/InterviewStats/generate_test_data.py
index 26a7503..bd57559 100644
--- a/docassemble/InterviewStats/generate_test_data.py
+++ b/docassemble/InterviewStats/generate_test_data.py
@@ -11,24 +11,28 @@
JsonDb = db.session
-__all__ = ['write_random_user', 'delete_data_from_interview']
+__all__ = ["write_random_user", "delete_data_from_interview"]
+
def write_random_user(data_iterator, filename=None, tags=None):
- if not filename:
- filename = get_current_info().get('yaml_filename', None)
- for data in data_iterator:
- random_uid = random_alphanumeric(32)
- new_entry = JsonStorage(filename=filename, key=random_uid, data=data, tags=tags, persistent=False)
- JsonDb.add(new_entry)
- JsonDb.commit()
-
+ if not filename:
+ filename = get_current_info().get("yaml_filename", None)
+ for data in data_iterator:
+ random_uid = random_alphanumeric(32)
+ new_entry = JsonStorage(
+ filename=filename, key=random_uid, data=data, tags=tags, persistent=False
+ )
+ JsonDb.add(new_entry)
+ JsonDb.commit()
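+
+# Illustrative usage with hypothetical data:
+#   write_random_user([{"zip": "02108"}], tags="demo")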
+
+
def delete_data_from_interview(filename=None):
- if not filename:
- filename = get_current_info().get('yaml_filename', None)
- conn = variables_snapshot_connection()
- cur = conn.cursor()
- # use a parameterized query to prevent SQL injection
- query = "DELETE FROM jsonstorage WHERE filename=%(filename)s"
- cur.execute(query, {'filename': filename})
- conn.commit()
- conn.close()
\ No newline at end of file
+ if not filename:
+ filename = get_current_info().get("yaml_filename", None)
+ conn = variables_snapshot_connection()
+ cur = conn.cursor()
+ # use a parameterized query to prevent SQL injection
+ query = "DELETE FROM jsonstorage WHERE filename=%(filename)s"
+ cur.execute(query, {"filename": filename})
+ conn.commit()
+ conn.close()
diff --git a/docassemble/InterviewStats/snapshot_geography.py b/docassemble/InterviewStats/snapshot_geography.py
index 245a5b7..a53db8c 100644
--- a/docassemble/InterviewStats/snapshot_geography.py
+++ b/docassemble/InterviewStats/snapshot_geography.py
@@ -1,9 +1,18 @@
#!/usr/bin/env python3
from bokeh.plotting import figure
-from bokeh.models import GeoJSONDataSource, LinearColorMapper, ColorBar, \
- WheelZoomTool, DataTable, ColumnDataSource, \
- TableColumn, PanTool, SaveTool, HoverTool
+from bokeh.models import (
+ GeoJSONDataSource,
+ LinearColorMapper,
+ ColorBar,
+ WheelZoomTool,
+ DataTable,
+ ColumnDataSource,
+ TableColumn,
+ PanTool,
+ SaveTool,
+ HoverTool,
+)
from bokeh.tile_providers import Vendors, get_provider
from bokeh.resources import CDN
from bokeh.palettes import brewer
@@ -26,12 +35,17 @@
import sys
import cenpy
-__all__ = ['get_filters_from_strings', 'make_usage_map',
- 'write_standalone_usage_map', 'get_embedable_usage_map']
+__all__ = [
+ "get_filters_from_strings",
+ "make_usage_map",
+ "write_standalone_usage_map",
+ "get_embedable_usage_map",
+]
DesiredFilter = Tuple[str, str, Any]
 FunctionalFilter = Tuple[str, Callable[[Any, Any], bool], Any]
+
def get_filters_from_strings(filters: List[DesiredFilter]) -> List[FunctionalFilter]:
"""
Make binary operator functions from the given string.
@@ -48,14 +62,15 @@ def get_filters_from_strings(filters: List[DesiredFilter]) -> List[FunctionalFil
# middle element in filters will now be binary operator function
x = make_usage_map(df, col_name, filters)
"""
- filt_map = {'eq': operator.eq,
- 'ne': operator.ne,
- 'lt': operator.lt,
- 'gt': operator.gt,
- 'le': operator.le,
- 'ge': operator.ge,
- 'any': lambda x, y: operator.and_(x, np.full(x.shape, True))
- }
+ filt_map = {
+ "eq": operator.eq,
+ "ne": operator.ne,
+ "lt": operator.lt,
+ "gt": operator.gt,
+ "le": operator.le,
+ "ge": operator.ge,
+ "any": lambda x, y: operator.and_(x, np.full(x.shape, True)),
+ }
return [(filt[0], filt_map[filt[1]], filt[2]) for filt in filters]
@@ -64,35 +79,43 @@ def get_filters_from_strings(filters: List[DesiredFilter]) -> List[FunctionalFil
def grab_geography(agg_df, geo_col: str, time_col: str) -> Optional[gpd.GeoDataFrame]:
global all_zip_shapes
- if geo_col.lower() == 'zip':
- agg_zip_codes = agg_df['zip']
+ if geo_col.lower() == "zip":
+ agg_zip_codes = agg_df["zip"]
if all_zip_shapes.empty:
start = timeit.time.time()
all_zip_shapes = get_zips()
end = timeit.time.time()
- log('Grabbed {} zips in {} seconds'.format(
- len(all_zip_shapes), end - start))
+ log(
+ "Grabbed {} zips in {} seconds".format(len(all_zip_shapes), end - start)
+ )
# Gets rid of zips not in US, and those we're not using
- geo_loc_counts = all_zip_shapes.merge(agg_df, how='inner',
- left_on='GEOID10', right_on='zip').drop(columns=[time_col])
- geo_loc_counts = geo_loc_counts.to_crs('EPSG:3857') # for tile mapping
+ geo_loc_counts = all_zip_shapes.merge(
+ agg_df, how="inner", left_on="GEOID10", right_on="zip"
+ ).drop(columns=[time_col])
+ geo_loc_counts = geo_loc_counts.to_crs("EPSG:3857") # for tile mapping
return geo_loc_counts
else:
# If not doing zip codes, you can use the State code to filter things.
# fips = cenpy.explorer.fips_table('STATE').set_index('State Abbreviation').loc['MA']['FIPS Code']
- print('ERROR: locations besides zips not supported right now')
+ print("ERROR: locations besides zips not supported right now")
return None
-def make_bokeh_map(geosource, geo_loc_counts, col_name: str='zip'):
+def make_bokeh_map(geosource, geo_loc_counts, col_name: str = "zip"):
zoom_tool = WheelZoomTool(zoom_on_axis=False)
tools = [PanTool(), zoom_tool, SaveTool()]
- TOOLTIPS = [(col_name, '@{}'.format(col_name)),
- ('Number of users', '@{}'.format(col_name + '_counts') + '{0}')]
- map_plot = figure(title='Submissions by ' + col_name,
- x_axis_type='mercator', y_axis_type='mercator',
- tooltips=TOOLTIPS, tools=tools)
- map_plot.sizing_mode = 'stretch_width'
+ TOOLTIPS = [
+ (col_name, "@{}".format(col_name)),
+ ("Number of users", "@{}".format(col_name + "_counts") + "{0}"),
+ ]
+ map_plot = figure(
+ title="Submissions by " + col_name,
+ x_axis_type="mercator",
+ y_axis_type="mercator",
+ tooltips=TOOLTIPS,
+ tools=tools,
+ )
+ map_plot.sizing_mode = "stretch_width"
map_plot.toolbar.active_scroll = zoom_tool
# https://docs.bokeh.org/en/latest/docs/reference/tile_providers.html#bokeh-tile-providers
@@ -100,52 +123,84 @@ def make_bokeh_map(geosource, geo_loc_counts, col_name: str='zip'):
# Settled on brewer for colors: https://colorbrewer2.org
# Was considering `colorcet`, but https://arxiv.org/pdf/1509.03700v1.pdf says stick with brewer
- palette = list(reversed(brewer['YlGnBu'][5])) # Gets yellow as low and blue as high
- vals = geo_loc_counts[col_name + '_counts']
+ palette = list(reversed(brewer["YlGnBu"][5])) # Gets yellow as low and blue as high
+ vals = geo_loc_counts[col_name + "_counts"]
if vals.empty:
max_val = 10
else:
max_val = max(vals)
color_mapper = LinearColorMapper(palette=palette, low=0, high=max_val)
- map_plot.patches('xs', 'ys', source=geosource,
- fill_color={'field': col_name + '_counts', 'transform': color_mapper},
- line_color='black', line_width=0.5, fill_alpha=0.5)
- color_bar = ColorBar(color_mapper=color_mapper, label_standoff=8, height=20,
- border_line_color=None, location=(0, 0),
- orientation='horizontal')
- map_plot.add_layout(color_bar, 'below')
+ map_plot.patches(
+ "xs",
+ "ys",
+ source=geosource,
+ fill_color={"field": col_name + "_counts", "transform": color_mapper},
+ line_color="black",
+ line_width=0.5,
+ fill_alpha=0.5,
+ )
+ color_bar = ColorBar(
+ color_mapper=color_mapper,
+ label_standoff=8,
+ height=20,
+ border_line_color=None,
+ location=(0, 0),
+ orientation="horizontal",
+ )
+ map_plot.add_layout(color_bar, "below")
return map_plot
def make_bokeh_date_histogram(date_series):
# Make a plot that handles making a histogram of submission times
- ridge_plots = figure(title='When did users submit?', x_axis_label='Date',
- y_axis_label='Count', x_axis_type='datetime', toolbar_location=None)
- ridge_plots.sizing_mode = 'stretch_width'
+ ridge_plots = figure(
+ title="When did users submit?",
+ x_axis_label="Date",
+ y_axis_label="Count",
+ x_axis_type="datetime",
+ toolbar_location=None,
+ )
+ ridge_plots.sizing_mode = "stretch_width"
ridge_plots.toolbar.active_drag = None
ridge_plots.toolbar.active_scroll = None
hist, edges = np.histogram(date_series, density=False, bins=50)
- hist_df = pd.DataFrame({'amount': hist, 'left': pd.to_datetime(edges[:-1], unit='s'),
- 'right': pd.to_datetime(edges[1:], unit='s')})
+ hist_df = pd.DataFrame(
+ {
+ "amount": hist,
+ "left": pd.to_datetime(edges[:-1], unit="s"),
+ "right": pd.to_datetime(edges[1:], unit="s"),
+ }
+ )
fmt_str = "%b %d, %y, %r"
- hist_df['interval'] = ["{} to {}".format(left.strftime(fmt_str), right.strftime(fmt_str))
- for left, right in zip(hist_df["left"], hist_df["right"])]
+ hist_df["interval"] = [
+ "{} to {}".format(left.strftime(fmt_str), right.strftime(fmt_str))
+ for left, right in zip(hist_df["left"], hist_df["right"])
+ ]
ridge_source = ColumnDataSource(hist_df)
- ridge_plots.quad(top='amount', bottom=0, left='left', right='right', fill_color='orange',
- line_color='black', alpha=0.8, source=ridge_source)
- hover = HoverTool(tooltips=[('Interval', '@interval'), ('Count', '@amount')])
+ ridge_plots.quad(
+ top="amount",
+ bottom=0,
+ left="left",
+ right="right",
+ fill_color="orange",
+ line_color="black",
+ alpha=0.8,
+ source=ridge_source,
+ )
+ hover = HoverTool(tooltips=[("Interval", "@interval"), ("Count", "@amount")])
ridge_plots.add_tools(hover)
return ridge_plots
-def make_bokeh_table(geosource, col_name: str='zip'):
+def make_bokeh_table(geosource, col_name: str = "zip"):
# Make a table of the numerical values that we can sort by
dt_columns = [
TableColumn(field=col_name, title=col_name),
- TableColumn(field=col_name + '_percent',
- title='Percentage of users in ' + col_name),
- TableColumn(field=col_name + '_counts', title='Number of Users in ' + col_name)
+ TableColumn(
+ field=col_name + "_percent", title="Percentage of users in " + col_name
+ ),
+        TableColumn(field=col_name + "_counts", title="Number of users in " + col_name),
]
data_table = DataTable(source=geosource, columns=dt_columns)
data_table.selectable = False
@@ -156,7 +211,12 @@ def make_bokeh_table(geosource, col_name: str='zip'):
return data_table
-def make_usage_map(loc_df, geo_col: str='zip', time_col: str='modtime', filters: List[FunctionalFilter]=[]):
+def make_usage_map(
+ loc_df,
+ geo_col: str = "zip",
+ time_col: str = "modtime",
+ filters: List[FunctionalFilter] = [],
+):
"""
     Returns a bokeh layout with a choropleth map of the locations we've received
     Expects the dataframe to contain `geo_col` and `time_col` columns
@@ -172,7 +232,7 @@ def make_usage_map(loc_df, geo_col: str='zip', time_col: str='modtime', filters:
if has_geo_col:
to_rm = loc_df[geo_col].str.len() == 0
loc_df = loc_df.drop(to_rm.index[to_rm])
- loc_df[geo_col + '_counts'] = 1
+ loc_df[geo_col + "_counts"] = 1
for it_filter in filters:
to_rm = ~it_filter[1](loc_df[it_filter[0]], it_filter[2])
@@ -182,10 +242,14 @@ def make_usage_map(loc_df, geo_col: str='zip', time_col: str='modtime', filters:
loc_df[time_col] = pd.to_datetime(loc_df[time_col])
if has_geo_col:
- agg_df = loc_df.groupby(geo_col).agg({geo_col + '_counts': 'sum',
- time_col: 'max'}).reset_index(level=0)
- agg_df[geo_col + '_percent'] = agg_df[geo_col +
- '_counts'] / agg_df[geo_col + '_counts'].sum()
+ agg_df = (
+ loc_df.groupby(geo_col)
+ .agg({geo_col + "_counts": "sum", time_col: "max"})
+ .reset_index(level=0)
+ )
+ agg_df[geo_col + "_percent"] = (
+ agg_df[geo_col + "_counts"] / agg_df[geo_col + "_counts"].sum()
+ )
geo_loc_counts = grab_geography(agg_df, geo_col, time_col)
if geo_loc_counts is None:
@@ -193,7 +257,7 @@ def make_usage_map(loc_df, geo_col: str='zip', time_col: str='modtime', filters:
geosource = ColumnDataSource(agg_df)
else:
outer_end = timeit.time.time()
- log('Processed {} in {} seconds'.format(geo_col, outer_end - outer_start))
+ log("Processed {} in {} seconds".format(geo_col, outer_end - outer_start))
geosource = GeoJSONDataSource(geojson=geo_loc_counts.to_json())
start = timeit.time.time()
@@ -201,18 +265,21 @@ def make_usage_map(loc_df, geo_col: str='zip', time_col: str='modtime', filters:
if has_geo_col:
all_components.append(make_bokeh_map(geosource, geo_loc_counts, geo_col))
if has_time_col:
- all_components.append(make_bokeh_date_histogram([x.timestamp() for x in loc_df[time_col]]))
+ all_components.append(
+ make_bokeh_date_histogram([x.timestamp() for x in loc_df[time_col]])
+ )
if has_geo_col:
all_components.append(make_bokeh_table(geosource, geo_col))
end = timeit.time.time()
- log('Made plots in {} seconds'.format(end - start))
+ log("Made plots in {} seconds".format(end - start))
return column(all_components)
def get_embedable_usage_map(layout):
import bokeh
+
script, div = components(layout)
- bokeh_version = bokeh.__version__ if str(bokeh.__version__) != '' else '2.3.0'
+ bokeh_version = bokeh.__version__ if str(bokeh.__version__) != "" else "2.3.0"
# from https://docs.bokeh.org/en/latest/docs/user_guide/embed.html#components
inline_cdn = """
@@ -225,19 +292,21 @@ def get_embedable_usage_map(layout):
def write_standalone_usage_map(layout, output_file: str):
- html = file_html(layout, CDN, 'All Data')
- with open('{}'.format(output_file), 'w') as f:
+ html = file_html(layout, CDN, "All Data")
+ with open("{}".format(output_file), "w") as f:
f.write(html)
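+
+# Illustrative: write_standalone_usage_map(layout, "usage_map.html") renders the
+# layout to a standalone HTML page using Bokeh's CDN resources.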
def main(argv: List[str]):
if len(argv) < 3:
-        print('Need <csv file> <output html file>')
+        print("Need <csv file> <output html file>")
return
- loc_df = pd.read_csv(argv[1], dtype='str')
- layout = make_usage_map(loc_df, 'zip', 'modtime') # , [('state', operator.eq, 'MA')])
+ loc_df = pd.read_csv(argv[1], dtype="str")
+ layout = make_usage_map(
+ loc_df, "zip", "modtime"
+ ) # , [('state', operator.eq, 'MA')])
write_standalone_usage_map(layout, argv[2])
-if __name__ == '__main__':
+if __name__ == "__main__":
main(sys.argv)
diff --git a/docassemble/InterviewStats/snapshot_statistics.py b/docassemble/InterviewStats/snapshot_statistics.py
index 1d04827..c9d7505 100644
--- a/docassemble/InterviewStats/snapshot_statistics.py
+++ b/docassemble/InterviewStats/snapshot_statistics.py
@@ -1,7 +1,20 @@
from docassemble.base.util import variables_snapshot_connection, interview_menu
from typing import List
-__all__ = ['get_filenames', 'get_summary_stats', 'get_stats', 'get_columns', 'get_column_values', 'get_combined_filename_list', "get_overall_stats", "get_summary_stats_by_filename", 'shorten_filename', 'get_session_summary_stats_by_filename', 'get_session_overall_stats']
+__all__ = [
+ "get_filenames",
+ "get_summary_stats",
+ "get_stats",
+ "get_columns",
+ "get_column_values",
+ "get_combined_filename_list",
+ "get_overall_stats",
+ "get_summary_stats_by_filename",
+ "shorten_filename",
+ "get_session_summary_stats_by_filename",
+ "get_session_overall_stats",
+]
+
def get_filenames():
conn = variables_snapshot_connection()
@@ -21,7 +34,13 @@ def get_combined_filename_list():
found_match = False
for interview in interview_filenames:
if interview["filename"] == json_interview[0]:
- combined_interviews.append({interview["filename"]: interview.get("title", interview["filename"]) })
+ combined_interviews.append(
+ {
+ interview["filename"]: interview.get(
+ "title", interview["filename"]
+ )
+ }
+ )
found_match = True
continue
if not found_match:
@@ -41,27 +60,28 @@ def shorten_filename(filename: str, max_length: int = 20) -> str:
Returns a string safe for display (contains \u200b zero-width spaces to allow wrapping).
"""
if not filename:
- return ''
+ return ""
# Remove package prefix before ':'
- name = filename.split(':', 1)[-1]
+ name = filename.split(":", 1)[-1]
# Remove .yml suffix
- if name.endswith('.yml'):
+ if name.endswith(".yml"):
name = name[:-4]
# Replace path-like slashes with just the last component
- if '/' in name:
- name = name.split('/')[-1]
+ if "/" in name:
+ name = name.split("/")[-1]
# Insert zero-width space after underscores to allow wrapping
- name = name.replace('_', '_' + '\u200b')
+ name = name.replace("_", "_" + "\u200b")
# Collapse middle if too long
if len(name) > max_length:
keep = max_length - 3
front_end = (keep + 1) // 2
back_start = keep // 2
- name = name[:front_end] + '...' + name[-back_start:]
+ name = name[:front_end] + "..." + name[-back_start:]
return name
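+# Illustrative: shorten_filename("docassemble.demo:data/questions/questions.yml")
+# returns "questions" (package prefix, path, and ".yml" suffix stripped).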
+
def get_summary_stats(filename: str):
conn = variables_snapshot_connection()
with conn.cursor() as cur:
@@ -74,7 +94,7 @@ def get_summary_stats(filename: str):
AND
tags IS DISTINCT FROM 'metadata'
"""
- cur.execute(query, {'filename': filename})
+ cur.execute(query, {"filename": filename})
val = cur.fetchone()
conn.close()
return val
@@ -111,7 +131,7 @@ def get_summary_stats_by_filename():
# Some connection objects require a cursor() for execution
result = None
- if result is not None and hasattr(result, 'mappings'):
+ if result is not None and hasattr(result, "mappings"):
rows = list(result.mappings())
results = [dict(r) for r in rows]
else:
@@ -126,7 +146,7 @@ def get_summary_stats_by_filename():
# Normalize None counts to 0 for count columns
for r in results:
- for key in ('count_30d', 'count_90d', 'count_365d', 'count_all'):
+ for key in ("count_30d", "count_90d", "count_365d", "count_all"):
if r.get(key) is None:
r[key] = 0
return results
@@ -140,9 +160,9 @@ def get_overall_stats():
val = cur.fetchone()
conn.close()
return val
-
-
-def get_stats(filename: str, column:str=None):
+
+
+def get_stats(filename: str, column: str = None):
conn = variables_snapshot_connection()
with conn.cursor() as cur:
# use a parameterized query to prevent SQL injection
@@ -157,14 +177,14 @@ def get_stats(filename: str, column:str=None):
WHERE filename=%(filename)s
AND
tags IS DISTINCT FROM 'metadata'"""
- cur.execute(query, {'filename': filename})
+ cur.execute(query, {"filename": filename})
records = list()
for record in cur:
# Add modtime to the all stats
- record[1]['modtime'] = record[0]
+ record[1]["modtime"] = record[0]
# Note that this is normally empty or 'metadata'
# in store_variables_snapshot() this is the `key` parameter
- record[1]['tags'] = record[2]
+ record[1]["tags"] = record[2]
if column:
if column in record[1]:
records.append(record[1][column])
@@ -176,6 +196,7 @@ def get_stats(filename: str, column:str=None):
conn.close()
return records
+
def get_columns(records):
if not records:
return []
@@ -185,11 +206,12 @@ def get_columns(records):
else:
return []
+
def get_column_values(records, column) -> set:
if not records or not column:
         return set()
return set([record.get(column) for record in records])
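+# Illustrative: get_column_values([{"a": 1}, {"a": 2}, {"a": 1}], "a") returns {1, 2}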
-
+
def get_session_summary_stats_by_filename(filter_step1: bool = True):
"""Return session-based summary stats grouped by filename.
@@ -231,16 +253,16 @@ def get_session_summary_stats_by_filename(filter_step1: bool = True):
conn = variables_snapshot_connection()
try:
try:
- result = conn.execute(query, {'filter_step1': filter_step1})
+ result = conn.execute(query, {"filter_step1": filter_step1})
except Exception:
result = None
- if result is not None and hasattr(result, 'mappings'):
+ if result is not None and hasattr(result, "mappings"):
rows = list(result.mappings())
results = [dict(r) for r in rows]
else:
with conn.cursor() as cur:
- cur.execute(query, {'filter_step1': filter_step1})
+ cur.execute(query, {"filter_step1": filter_step1})
rows = cur.fetchall()
cols = [d[0] for d in cur.description]
results = [dict(zip(cols, row)) for row in rows]
@@ -248,7 +270,7 @@ def get_session_summary_stats_by_filename(filter_step1: bool = True):
conn.close()
for r in results:
- for key in ('count_30d', 'count_90d', 'count_365d', 'count_all'):
+ for key in ("count_30d", "count_90d", "count_365d", "count_all"):
if r.get(key) is None:
r[key] = 0
return results
@@ -275,8 +297,7 @@ def get_session_overall_stats(filter_step1: bool = True):
conn = variables_snapshot_connection()
with conn.cursor() as cur:
- cur.execute(query, {'filter_step1': filter_step1})
+ cur.execute(query, {"filter_step1": filter_step1})
val = cur.fetchone()
conn.close()
return val
-
\ No newline at end of file