Example Pipeline
Our first task will be to convert a ZIP code table (stored as a comma-separated file) into a set of county-level entries.
The input file looks like:
ZIP,COUNTYNAME,STATE,STCOUNTYFP,CLASSFP
36003,Autauga County,AL,01001,H1
36006,Autauga County,AL,01001,H1
36067,Autauga County,AL,01001,H1
36066,Autauga County,AL,01001,H1
36703,Autauga County,AL,01001,H1
36701,Autauga County,AL,01001,H1
36091,Autauga County,AL,01001,H1
First is the header of the pipeline. This declares the unique name of the pipeline and its output directory.
name: zipcode_map
outdir: ./
docs: Converts zipcode TSV into graph elements
Next, the configuration is declared. In this case the only input is the zipcode file.
There is a default value, so the pipeline can be invoked without passing in
any parameters. However, to apply this pipeline to a new input file, the
input parameter zipcode can be used to define the source file.
config:
  schema: ../covid19_datadictionary/gdcdictionary/schemas/
  zipcode: ../data/ZIP-COUNTY-FIPS_2017-06.csv
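The default-and-override behaviour described above can be pictured with a minimal Python sketch. The function name `resolve_config` and the replacement file name are hypothetical, chosen only for illustration; this is not SIFTER's own code.

```python
# Defaults copied from the config stanza above.
DEFAULT_CONFIG = {
    "schema": "../covid19_datadictionary/gdcdictionary/schemas/",
    "zipcode": "../data/ZIP-COUNTY-FIPS_2017-06.csv",
}

def resolve_config(defaults, params=None):
    """Return the defaults, with any user-supplied parameters taking priority."""
    resolved = dict(defaults)          # copy so the defaults stay untouched
    resolved.update(params or {})      # user parameters override defaults
    return resolved

# No parameters: the default input file is used.
default_run = resolve_config(DEFAULT_CONFIG)

# Hypothetical override pointing the pipeline at another file.
custom_run = resolve_config(DEFAULT_CONFIG, {"zipcode": "other_zipcodes.csv"})
```

In both cases the schema entry keeps its default, because only zipcode was overridden.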
The inputs section declares data input sources. In this pipeline, there is
only one input, which runs the table loader.
inputs:
  tableLoad:
    input: "{{config.zipcode}}"
    sep: ","
tableLoad operates on the input file named in the inputs stanza. SIFTER string
parsing is based on the Mustache template system. To access the string that was
passed in, the template is {{config.zipcode}}. The separator in the input file
is a comma (,), so that is also passed in as a parameter to the extractor.
The tableLoad extractor opens the file and generates one message for
every row in the file. It uses the header of the file to map the column values
into a dictionary. The first row would produce the message:
{
"ZIP" : "36003",
"COUNTYNAME" : "Autauga County",
"STATE" : "AL",
"STCOUNTYFP" : "01001",
"CLASSFP" : "H1"
}
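The row-to-message mapping can be reproduced with Python's standard csv module. This is a minimal sketch of the behaviour described above, not SIFTER's implementation; the name `table_load` is made up for the example.

```python
import csv
import io

def table_load(text, sep=","):
    """Yield one dict per data row, keyed by the names in the header row."""
    reader = csv.DictReader(io.StringIO(text), delimiter=sep)
    for row in reader:
        yield dict(row)

# The header and the first two rows of the input file shown earlier.
SAMPLE = (
    "ZIP,COUNTYNAME,STATE,STCOUNTYFP,CLASSFP\n"
    "36003,Autauga County,AL,01001,H1\n"
    "36006,Autauga County,AL,01001,H1\n"
)

messages = list(table_load(SAMPLE))
```

Every value stays a string, which is why the leading zero in the STCOUNTYFP code 01001 is preserved.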
The stream of messages is then passed into the steps listed in the transform
section of the tableLoad extractor.
For the current transform, we want to produce a single entry per STCOUNTYFP;
however, the file has a line per ZIP. We need to run a reduce transform
that collects rows together using a field key, which in this case is "{{row.STCOUNTYFP}}",
and then runs a function merge that takes two messages, merges them together,
and produces a single output message.
The two messages:
{ "ZIP" : "36003", "COUNTYNAME" : "Autauga County", "STATE" : "AL", "STCOUNTYFP" : "01001", "CLASSFP" : "H1"}
{ "ZIP" : "36006", "COUNTYNAME" : "Autauga County", "STATE" : "AL", "STCOUNTYFP" : "01001", "CLASSFP" : "H1"}
would be merged into the message:
{ "ZIP" : ["36003", "36006"], "COUNTYNAME" : "Autauga County", "STATE" : "AL", "STCOUNTYFP" : "01001", "CLASSFP" : "H1"}
The reduce transform step uses a block of Python code to describe the function.
The method field names the function, in this case merge, that will be used
as the reduce function.
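zipReduce:
  - from: zipcode
  - reduce:
      field: STCOUNTYFP
      method: merge
      python: >
        def merge(x,y):
          # use the list collected so far, or start one from this row's own ZIP,
          # so that an already merged message does not add its ZIP a second time
          a = x.get('zipcodes', [x['ZIP']])
          b = y.get('zipcodes', [y['ZIP']])
          x['zipcodes'] = a + b
          return x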
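To see what the reduce step does with such a function, the sketch below groups messages by a key field and folds each group with a merge in the spirit of the one above. It assumes a simple left-to-right fold; the order in which SIFTER actually calls the function is not stated here, and `reduce_by_field` is a hypothetical helper name.

```python
from functools import reduce

def merge(x, y):
    # Use the list collected so far, or start one from the row's own ZIP.
    a = x.get("zipcodes", [x["ZIP"]])
    b = y.get("zipcodes", [y["ZIP"]])
    x["zipcodes"] = a + b
    return x

def reduce_by_field(messages, field, func):
    """Group messages on `field`, then fold each group into one message."""
    groups = {}
    for msg in messages:
        groups.setdefault(msg[field], []).append(dict(msg))  # copy each row
    # A group with a single message is returned unchanged by reduce().
    return [reduce(func, group) for group in groups.values()]

# Three rows taken from the input file shown earlier.
rows = [
    {"ZIP": "36003", "COUNTYNAME": "Autauga County", "STATE": "AL", "STCOUNTYFP": "01001", "CLASSFP": "H1"},
    {"ZIP": "36006", "COUNTYNAME": "Autauga County", "STATE": "AL", "STCOUNTYFP": "01001", "CLASSFP": "H1"},
    {"ZIP": "36067", "COUNTYNAME": "Autauga County", "STATE": "AL", "STCOUNTYFP": "01001", "CLASSFP": "H1"},
]

counties = reduce_by_field(rows, "STCOUNTYFP", merge)
```

Note that this merge collects the values into a zipcodes field and leaves the ZIP field of the first row in place.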
The original messages produced by the loader have all of the information required
by the summary_location object type, as described by the JSON schema referenced
by the schema entry in the config stanza. However, the data is all under the wrong field names.
To remap the data, we use a project transformation that uses the template engine
to project data into new fields in the message. The template engine exposes the current
message data as the value row, so the value FIPS:{{row.STCOUNTYFP}} is mapped into the field id.
  - project:
      mapping:
        id: "FIPS:{{row.STCOUNTYFP}}"
        province_state: "{{row.STATE}}"
        summary_locations: "{{row.STCOUNTYFP}}"
        county: "{{row.COUNTYNAME}}"
        submitter_id: "{{row.STCOUNTYFP}}"
        type: summary_location
        projects: []
Using this projection, the message:
{
"ZIP" : ["36003", "36006"],
"COUNTYNAME" : "Autauga County",
"STATE" : "AL",
"STCOUNTYFP" : "01001",
"CLASSFP" : "H1"
}
would become
{
  "id" : "FIPS:01001",
  "province_state" : "AL",
  "summary_locations" : "01001",
  "county" : "Autauga County",
  "submitter_id" : "01001",
  "type" : "summary_location",
  "projects" : [],
  "ZIP" : ["36003", "36006"],
  "COUNTYNAME" : "Autauga County",
  "STATE" : "AL",
  "STCOUNTYFP" : "01001",
  "CLASSFP" : "H1"
}
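The projection can be approximated in a few lines of Python. This sketch covers only plain `{{dotted.path}}` substitution, a small subset of Mustache, and the helper names `render` and `project` are hypothetical rather than SIFTER's own.

```python
import re

def render(template, row):
    """Replace each {{row.FIELD}} placeholder with the matching message value."""
    def lookup(match):
        value = {"row": row}
        for key in match.group(1).strip().split("."):
            value = value[key]
        return str(value)
    return re.sub(r"\{\{(.*?)\}\}", lookup, template)

def project(row, mapping):
    """Add the mapped fields to a copy of the message, keeping the originals."""
    out = dict(row)
    for field, template in mapping.items():
        # Only strings are treated as templates; other values (such as the
        # empty list) are copied through as-is.
        out[field] = render(template, row) if isinstance(template, str) else template
    return out

MAPPING = {
    "id": "FIPS:{{row.STCOUNTYFP}}",
    "province_state": "{{row.STATE}}",
    "summary_locations": "{{row.STCOUNTYFP}}",
    "county": "{{row.COUNTYNAME}}",
    "submitter_id": "{{row.STCOUNTYFP}}",
    "type": "summary_location",
    "projects": [],
}

message = {
    "ZIP": ["36003", "36006"],
    "COUNTYNAME": "Autauga County",
    "STATE": "AL",
    "STCOUNTYFP": "01001",
    "CLASSFP": "H1",
}

projected = project(message, MAPPING)
```

As in the example above, the original fields remain in the message alongside the newly projected ones.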
Now that the data has been remapped, we pass it into the objectCreate
transformation, which reads in the schema for summary_location, checks the
message to make sure it matches, and then outputs it.
  - objectCreate:
      class: summary_location
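The kind of check this step performs can be illustrated with a hand-rolled validator. Everything schema-related below is an assumption: `HYPOTHETICAL_SCHEMA` is a stand-in invented for the example, not the real summary_location schema, and the sketch assumes a matching message is passed through unchanged.

```python
# Hypothetical stand-in; the real schema lives in the directory named by
# config.schema and is not reproduced here.
HYPOTHETICAL_SCHEMA = {
    "required": ["submitter_id", "type", "projects"],
    "properties": {"submitter_id": str, "type": str, "projects": list},
}

def object_create(message, schema, class_name):
    """Return the message if it fits the schema, otherwise raise an error."""
    if message.get("type") != class_name:
        raise ValueError("message is not of type " + class_name)
    missing = [key for key in schema["required"] if key not in message]
    if missing:
        raise ValueError("missing required fields: " + ", ".join(missing))
    for key, expected in schema["properties"].items():
        if key in message and not isinstance(message[key], expected):
            raise TypeError("field " + key + " has the wrong type")
    return message

candidate = {
    "id": "FIPS:01001",
    "submitter_id": "01001",
    "type": "summary_location",
    "projects": [],
}

checked = object_create(candidate, HYPOTHETICAL_SCHEMA, "summary_location")
```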
Outputs
The outputs section creates an output table with two columns connecting
ZIP values to STCOUNTYFP values. STCOUNTYFP is a county-level FIPS
code, used by the U.S. Census Bureau. A single FIPS code may contain many ZIP codes,
and we can use this table later for mapping IDs when loading the data into a database.
outputs:
  zip2fips:
    tableWrite:
      from:
      output: zip2fips
      columns:
        - ZIP
        - STCOUNTYFP
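Selecting two columns from each message and writing them out can be sketched with csv.DictWriter. Two things are assumed here because the stanza does not state them: that the writer receives one message per original row, and that the output is tab-separated. The name `table_write` is hypothetical.

```python
import csv
import io

def table_write(messages, columns, handle):
    """Write only the listed columns of each message, preceded by a header row."""
    writer = csv.DictWriter(
        handle,
        fieldnames=columns,
        extrasaction="ignore",   # silently drop fields that are not listed
        delimiter="\t",          # assumed output separator
        lineterminator="\n",
    )
    writer.writeheader()
    for msg in messages:
        writer.writerow(msg)

# Two rows taken from the input file shown earlier.
rows = [
    {"ZIP": "36003", "COUNTYNAME": "Autauga County", "STATE": "AL", "STCOUNTYFP": "01001", "CLASSFP": "H1"},
    {"ZIP": "36006", "COUNTYNAME": "Autauga County", "STATE": "AL", "STCOUNTYFP": "01001", "CLASSFP": "H1"},
]

buffer = io.StringIO()
table_write(rows, ["ZIP", "STCOUNTYFP"], buffer)
table_text = buffer.getvalue()
```

Each line of the resulting table pairs one ZIP code with its county FIPS code, which is the shape needed for a later ID lookup.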