

Track stats of job listing datasets, before and after transformation into the common schema.


DatasetStatsCounter(self, dataset_id, quarter)

Accumulate Dataset ETL statistics for a quarter to show the presence and absence of different fields, and the total count of rows


dataset_id (string) A dataset id
quarter (string) The quarter being analyzed


DatasetStatsAggregator(self, dataset_id, s3_conn)

Aggregate Dataset ETL statistics up to the dataset level


dataset_id (string) A dataset id
s3_conn (boto.Connection) an s3 connection


GlobalStatsAggregator(self, s3_conn)

Aggregate Dataset ETL statistics up to the global level


s3_conn (boto.Connection) an s3 connection


Track field value distribution of common schema job postings


FieldValueCounter(self, quarter, field_values)

Accumulate field distribution statistics for common schema job postings


quarter (string) The quarter being analyzed

field_values (list) each entry should be either

    1. a field key
    2. a tuple, first value field key, second value function to fetch value or values from document
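The two accepted entry shapes can be illustrated with a minimal sketch. This is not the library's implementation: `count_field_values` and the sample documents are hypothetical, and the sketch only assumes that a bare key reads the value straight from the document while a tuple supplies its own getter.

```python
from collections import Counter


def count_field_values(documents, field_values):
    """Count values per field; each entry is a key or a (key, getter) tuple."""
    counts = {}
    for entry in field_values:
        if isinstance(entry, tuple):
            key, getter = entry
        else:
            # A bare key: read the value directly from the document
            key, getter = entry, (lambda doc, k=entry: doc.get(k))
        counter = counts.setdefault(key, Counter())
        for doc in documents:
            value = getter(doc)
            if value is None:
                continue
            # A getter may return one value or several
            values = value if isinstance(value, (list, tuple, set)) else [value]
            counter.update(values)
    return counts


docs = [
    {"title": "Cook", "skills": ["knife", "grill"]},
    {"title": "Cook", "skills": ["grill"]},
]
result = count_field_values(docs, ["title", ("skills", lambda d: d["skills"])])
```

Here `result["title"]` counts whole titles, while `result["skills"]` counts each skill returned by the getter.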


Aggregation functions that can be used with pandas dataframes


listy_n_most_common(*params, **kwparams)

Expects each item to be iterable, each sub-item to be addable
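As a rough illustration of this kind of aggregation, the sketch below flattens a collection of iterables and returns the most frequent sub-items. The function name `n_most_common_flat`, its argument order, and the use of `collections.Counter` (which requires hashable sub-items) are assumptions made for the example, not the library's actual behavior.

```python
from collections import Counter
from itertools import chain


def n_most_common_flat(n, items):
    """Return the n most common sub-items across a collection of iterables."""
    counts = Counter(chain.from_iterable(items))
    return [value for value, _ in counts.most_common(n)]


top = n_most_common_flat(
    2, [["python", "sql"], ["python"], ["sql", "excel", "python"]]
)
```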


AggregateFunction(self, returns)

Wrap a function with an attribute that indicates the return type name
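One way such a wrapper could work is a decorator class that stores the return type name as a function attribute. The class name `ReturnsTagged` and the attribute name `returns` are hypothetical choices for this sketch, and the real class may differ.

```python
class ReturnsTagged:
    """Hypothetical decorator: attach a return type name to a function."""

    def __init__(self, returns):
        self.returns = returns

    def __call__(self, function):
        # Tag the function in place and hand it back unchanged otherwise
        function.returns = self.returns
        return function


@ReturnsTagged(returns="int")
def count_items(values):
    return len(list(values))
```

Downstream code can then read `count_items.returns` to label an aggregated column without calling the function.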


A variety of common-schema job posting collections.

Each class in this module should implement a generator that yields job postings (in the common schema, as a JSON string), and has a 'metadata' attribute so any users of the job postings can inspect meaningful metadata about the postings.


JobPostingCollectionFromS3(self, s3_conn, s3_paths, extra_metadata=None)

Stream job postings from S3.

Expects each file to be stored in JSON format, one job posting per line. Each path given in s3_paths will be iterated through as a prefix, so job postings may be partitioned under that prefix however you choose. It will look in every file under that prefix.


import json
from airflow.hooks import S3Hook
from skills_ml.job_postings.common_schema import JobPostingCollectionFromS3

s3_conn = S3Hook().get_conn()
job_postings_generator = JobPostingCollectionFromS3(s3_conn, s3_paths='my-bucket/job_postings_common_schema')
for job_posting in job_postings_generator:
    # Each job posting is in the common schema
    print(json.dumps(job_posting))


  • s3_conn: a boto s3 connection
  • s3_paths: path to the job listings. There may be multiple paths


JobPostingCollectionSample(self, num_records:int=50)

Stream a finite number of job postings stored within the library.


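import json
from skills_ml.job_postings.common_schema import JobPostingCollectionSample

job_postings = JobPostingCollectionSample()
for job_posting in job_postings:
    # Each job posting is in the common schema
    print(json.dumps(job_posting))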

Meant to provide a dependency-less example of common schema job postings
for introduction to the library

    num_records (int): The maximum number of records to return. Defaults to 50 (all postings available)

<h2 id="skills_ml.job_postings.common_schema.generate_job_postings_from_s3">generate_job_postings_from_s3</h2>

generate_job_postings_from_s3(s3_conn, s3_prefix:str) -> Generator[Dict[str, Any], NoneType, NoneType]

Stream all job listings from S3

Args:

  • s3_conn: a boto s3 connection
  • s3_prefix: path to the job listings.


string in json format representing the next job listing
    Refer to sample_job_listing.json for example structure


generate_job_postings_from_s3_multiple_prefixes(s3_conn, s3_prefixes:str) -> Generator[Dict[str, Any], NoneType, NoneType]

Chain the generators of a list of multiple quarters

Args:

  • s3_conn: a boto s3 connection
  • s3_prefixes: paths to job listings


a single generator that chains together the generators for all of the given prefixes
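The chaining idea can be sketched with `itertools.chain.from_iterable`. The helper name `chain_prefixes`, the `fetch` callable, and the in-memory `fake_store` are stand-ins invented for this example; no S3 access is involved and this is not necessarily how the library implements it.

```python
from itertools import chain


def chain_prefixes(fetch, prefixes):
    """Lazily yield every item from fetch(prefix), one prefix after another."""
    return chain.from_iterable(fetch(prefix) for prefix in prefixes)


# Hypothetical in-memory stand-in for per-prefix job posting listings
fake_store = {"2011Q1": [{"id": 1}, {"id": 2}], "2011Q2": [{"id": 3}]}
postings = list(
    chain_prefixes(lambda prefix: iter(fake_store[prefix]), ["2011Q1", "2011Q2"])
)
```

Because the outer generator expression is lazy, the generator for a later prefix is only created once the earlier ones are exhausted.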


batches_generator(iterable, batch_size)

Batch generator

Args:

  • iterable: an iterable
  • batch_size: batch size
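A common way to batch an arbitrary iterable is with `itertools.islice`, sketched below. The name `batches` and the choice to yield each batch as a list are assumptions; the source does not say whether the library yields lists or nested generators.

```python
from itertools import islice


def batches(iterable, batch_size):
    """Yield successive lists of up to batch_size items from iterable."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


chunks = list(batches(range(5), 2))
```

The final batch may be shorter than `batch_size` when the input length is not an exact multiple.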



Retrieve the occupation from the job posting

First checks the custom 'onet_soc_code' key, then the standard 'occupationalCategory' key, and falls back to the unknown occupation
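The fallback order described above can be sketched as follows. The function name `occupation_of` and the `UNKNOWN_OCCUPATION` placeholder value are hypothetical; the actual unknown-occupation value used by the library is not given here.

```python
UNKNOWN_OCCUPATION = "unknown"  # hypothetical placeholder value


def occupation_of(job_posting):
    """Prefer 'onet_soc_code', then 'occupationalCategory', else unknown."""
    for key in ("onet_soc_code", "occupationalCategory"):
        value = job_posting.get(key)
        if value:
            return value
    return UNKNOWN_OCCUPATION
```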


Encapsulates the computation of some piece of data for job postings, to make aggregation and tabular datasets easy to produce


JobPostingComputedProperty(self, storage, partition_func=None)

Base class for computers of job posting properties.

Using this class, expensive computations can be performed once, stored on S3 per job posting in partitions, and reused in different aggregations.

The base class takes care of all of the serialization and partitioning, leaving subclasses to implement a function for computing the property of a single posting and metadata describing the output of this function.

Subclasses must implement

- _compute_func_on_one to produce a callable that takes in a single
    job posting and returns JSON-serializable output representing the computation target.
    This function can produce objects that are kept in scope and reused,
    so properties that require a large object (e.g. a trained classifier) to do their
    computation work can be downloaded from S3 here without requiring the I/O work
    to be done over and over. (See .computers.SOCClassifyProperty for illustration)
- property_name attribute (string) that is used when saving the computed properties
- property_columns attribute (list) of ComputedPropertyColumns that
    map to the column names output by `_compute_func_on_one`


storage (skills_ml.storage.Store) A storage object in which to store the cached properties.
partition_func (callable, optional) A function that takes a job posting and
    outputs a string that should be used as a partition key. Must be deterministic.
    Defaults to the 'datePosted' value

    The caches will be namespaced by the property name and partition function
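To make the compute-once, partitioned-cache idea concrete, here is a deliberately simplified in-memory analogue. Everything in it is hypothetical: the class `CachedProperty`, the methods `compute_on_collection` and `cached_partition`, the `id` and `title` posting fields, and the use of a plain dict in place of a storage object. It only mirrors the described contract (a `_compute_func_on_one` that returns a callable, a `property_name`, and a deterministic partition function defaulting to 'datePosted').

```python
class CachedProperty:
    """Hypothetical in-memory analogue of a partitioned property cache."""

    property_name = "title_length"

    def __init__(self, storage, partition_func=None):
        self.storage = storage  # a plain dict standing in for a real store
        self.partition_func = partition_func or (lambda p: p["datePosted"])
        self.compute_calls = 0

    def _compute_func_on_one(self):
        # Expensive setup (e.g. loading a model) would happen once, here
        def compute(posting):
            self.compute_calls += 1
            return len(posting["title"])

        return compute

    def compute_on_collection(self, postings):
        compute = self._compute_func_on_one()
        for posting in postings:
            # Caches are namespaced by property name and partition key
            cache_key = (self.property_name, self.partition_func(posting))
            partition = self.storage.setdefault(cache_key, {})
            if posting["id"] not in partition:
                partition[posting["id"]] = compute(posting)

    def cached_partition(self, partition_key):
        return self.storage.get((self.property_name, partition_key), {})


store = {}
prop = CachedProperty(store)
sample_postings = [
    {"id": "a", "title": "Cook", "datePosted": "2017-01-01"},
    {"id": "b", "title": "Baker", "datePosted": "2017-01-02"},
]
prop.compute_on_collection(sample_postings)
prop.compute_on_collection(sample_postings)  # second pass reuses the cache
```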


ComputedPropertyColumn(self, name, description, compatible_aggregate_function_paths=None)

Metadata about a specific output column of a computed property


name (string) The name of the column
description (string) A description of the column and how it was populated.
  • compatible_aggregate_function_paths (dict, optional): If this property is meant to be used in aggregations, map string function paths to descriptions of what the function is computing for this column. All function paths should be compatible with pandas.agg (one argument, an iterable), though multi-argument functions can be used in conjunction with functools.partial


Aggregate job posting computed properties into tabular datasets


df_for_properties_and_keys(computed_properties, keys)

Assemble a dataframe with the raw data from many computed properties and keys


computed_properties (list of JobPostingComputedProperty)
keys (list of strs)
  • Returns: pandas.DataFrame


expand_array_col_to_many_cols(base_col, func, aggregation)

Expand an array column created as the result of an .aggregate call into many columns


base_col (string) The name of the base column (before .aggregate)
func (function) The base function that was aggregated on
aggregation (pandas.DataFrame) The post-aggregation dataframe
  • Returns: pandas.DataFrame, minus the array column and plus columns for each array value



Deals with the possibility of functools.partial being applied to a given function. Allows access to the decorated 'return' attribute whether or not it is also a partial function


aggregate_function (callable) Either a raw function or a functools.partial object
  • Returns: callable
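A `functools.partial` object does not copy attributes from the function it wraps, which is why an unwrapping step like this is needed. The sketch below shows one way to reach the underlying function; the names `base_func` and `top_n`, and the attribute name `returns`, are assumptions for illustration.

```python
from functools import partial


def base_func(aggregate_function):
    """Return the underlying function, unwrapping any functools.partial."""
    while isinstance(aggregate_function, partial):
        aggregate_function = aggregate_function.func
    return aggregate_function


def top_n(n, values):
    return sorted(values, reverse=True)[:n]


top_n.returns = "list"  # stands in for the decorated return-type attribute

top_two = partial(top_n, 2)
```

`top_two` itself has no `returns` attribute, but `base_func(top_two).returns` does.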


aggregation_for_properties_and_keys(grouping_properties, aggregate_properties, aggregate_functions, keys)

Assemble an aggregation dataframe for given partition keys


grouping_properties (list of JobPostingComputedProperty)
    Properties to form the primary key of the aggregation
aggregate_properties (list of JobPostingComputedProperty)
    Properties to be aggregated over the primary key
aggregate_functions (dict) A lookup of aggregate functions
    to be applied for each aggregate column
keys (list of str) The desired partition keys for the aggregation to cover
  • Returns: pandas.DataFrame indexed on the grouping properties, covering all data from the given keys


aggregate_properties(out_filename, grouping_properties, aggregate_properties, aggregate_functions, storage, aggregation_name)

Aggregates computed properties and stores the resulting CSV


out_filename (string) The desired filename (without path) for the .csv
grouping_properties (list of JobPostingComputedProperty)
    Properties to form the primary key of the aggregation
aggregate_properties (list of JobPostingComputedProperty)
    Properties to be aggregated over the primary key
aggregate_functions (dict) A lookup of aggregate functions
    to be applied for each aggregate column
storage (storage object, as for JobPostingComputedProperty) The base storage location in which to store aggregations
aggregation_name (string) The name of this particular aggregation
  • Returns: nothing


Various computers of job posting properties. Each class is generally a generic algorithm (such as skill extraction or occupation classification) paired with enough configuration to run on its own.


TitleCleanPhaseOne(self, storage, partition_func=None)

Perform one phase of job title cleaning: lowercase/remove punctuation


TitleCleanPhaseTwo(self, storage, partition_func=None)

Perform two phases of job title cleaning

  1. lowercase/remove punctuation
  2. Remove geography information


Geography(self, geo_querier, *args, **kwargs)

Produce a geography by querying a given JobGeographyQuerier




SOCClassifyProperty(self, classifier_obj, *args, **kwargs)

Classify the SOC code from a trained classifier


classifier_obj (object, optional) An object to use as a classifier.
    If not sent, one will be downloaded from S3


GivenSOC(self, storage, partition_func=None)

Assign the SOC code given by the partner


HourlyPay(self, storage, partition_func=None)

The pay given in the baseSalary field if salaryFrequency is hourly


YearlyPay(self, storage, partition_func=None)

The pay given in the baseSalary field if salaryFrequency is yearly


SkillCounts(self, skill_extractor, *args, **kwargs)

Adds top skill counts from a skill extractor

skill_extractor (skills_ml.algorithms.skill_extractors.base.SkillExtractorBase) A skill extractor object


PostingIdPresent(self, storage, partition_func=None)

Records job posting ids. Used for counting job postings



CorpusCreator(self, job_posting_generator=None, document_schema_fields=['description', 'experienceRequirements', 'qualifications', 'skills'], raw=False)

A base class for objects that convert common schema job listings into a document-level corpus suitable for use by machine learning algorithms or specific tasks.


from skills_ml.job_postings.common_schema import JobPostingCollectionSample
from skills_ml.job_postings.corpora.basic import CorpusCreator

job_postings_generator = JobPostingCollectionSample()

# Default will include all the cleaned job postings
corpus = CorpusCreator(job_postings_generator)

# For getting the raw job postings without any cleaning
corpus = CorpusCreator(job_postings_generator, raw=True)


  • job_posting_generator (generator): an iterable that generates JSON strings. Each string is expected to represent a job listing conforming to the common schema. See sample_job_listing.json for an example of this schema
  • document_schema_fields (list): a list of schema fields to be included
  • raw (bool): a flag whether to return the raw documents or transformed documents


(dict): a dictionary with only the selected fields as keys and the corresponding raw/cleaned values


SimpleCorpusCreator(self, job_posting_generator=None, document_schema_fields=['description', 'experienceRequirements', 'qualifications', 'skills'], raw=False)

An object that transforms job listing documents by picking important schema fields and returns them as one large lowercased string


Doc2VecGensimCorpusCreator(self, job_posting_generator, document_schema_fields=['description', 'experienceRequirements', 'qualifications', 'skills'], *args, **kwargs)

Corpus for training Gensim Doc2Vec.

An object that transforms job listing documents by picking important schema fields and yields them as one large cleaned array of words


from skills_ml.job_postings.common_schema import JobPostingCollectionSample
from skills_ml.job_postings.corpora.basic import Doc2VecGensimCorpusCreator

job_postings_generator = JobPostingCollectionSample()

corpus = Doc2VecGensimCorpusCreator(job_postings_generator)

    job_posting_generator (generator): a job posting generator
    document_schema_fields (list): a list of schema fields to be included

<h2 id="skills_ml.job_postings.corpora.Word2VecGensimCorpusCreator">Word2VecGensimCorpusCreator</h2>

Word2VecGensimCorpusCreator(self, job_posting_generator, document_schema_fields=['description', 'experienceRequirements', 'qualifications', 'skills'], *args, **kwargs)

An object that transforms job listing documents by picking important schema fields and yields them as one large cleaned array of words


JobCategoryCorpusCreator(self, job_posting_generator=None, document_schema_fields=['description', 'experienceRequirements', 'qualifications', 'skills'], raw=False)

An object that extracts the label of each job listing document, which could be the onet soc code or occupationalCategory, and yields it as a lowercased string


SectionExtractWord2VecCorpusCreator(self, section_regex, *args, **kwargs)

Only return the contents of the configured section headers.

Heavily utilizes skills_ml.algorithms.nlp.section_extract. For more detail on how to define 'sections', refer to its docstring.


RawCorpusCreator(self, job_posting_generator, document_schema_fields=['description', 'experienceRequirements', 'qualifications', 'skills'])

An object that yields the joined raw string of each job posting


Filtering streamed job postings


soc_major_group_filter(major_groups:List) -> Callable

Return a function that checks the O*NET SOC code of a job posting (if it is present) against the configured major groups.


JobPostingFilterer(self, job_posting_generator:Generator[Dict[str, Any], NoneType, NoneType], filter_funcs:List[Callable])

Filter common schema job postings through a number of filtering functions


  • job_posting_generator: An iterable of job postings (each in dict form)
  • filter_funcs: A list of filtering functions, each taking in a job posting document (as dict) and returning a boolean instructing whether or not the posting passes the filter
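The filtering contract can be sketched as a predicate factory plus a generator that keeps postings passing every predicate. This is an illustration only: the names `major_group_filter` and `filter_postings` are hypothetical, the major group is assumed to be the first two characters of an 'onet_soc_code' value, and postings without a SOC code are assumed not to pass (the source does not say how a missing code is treated).

```python
def major_group_filter(major_groups):
    """Build a predicate on the first two characters of 'onet_soc_code'."""
    allowed = set(major_groups)

    def passes(job_posting):
        soc_code = job_posting.get("onet_soc_code")
        # Assumption: postings without a SOC code do not pass
        return bool(soc_code) and soc_code[:2] in allowed

    return passes


def filter_postings(job_postings, filter_funcs):
    """Yield only the postings for which every filter returns True."""
    for posting in job_postings:
        if all(func(posting) for func in filter_funcs):
            yield posting


sample = [
    {"id": 1, "onet_soc_code": "11-1011.00"},
    {"id": 2, "onet_soc_code": "35-2014.00"},
    {"id": 3},
]
kept = list(filter_postings(sample, [major_group_filter(["11", "13"])]))
```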


Extracting geographies from job posting datasets



Convert a job posting to a geocode-ready search string

Includes city and state if present, or just city


job_posting (dict) A job posting in schema.org/JobPosting json form
  • Returns: (string) A geocode-ready search string
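A minimal sketch of this conversion, assuming the schema.org layout in which the city and state live under `jobLocation.address` as `addressLocality` and `addressRegion`. The function name `geocode_search_string`, the ", " separator, and the empty-string result for postings without a city are assumptions for the example.

```python
def geocode_search_string(job_posting):
    """Build 'City, State' when both are present, otherwise just the city."""
    address = job_posting.get("jobLocation", {}).get("address", {})
    city = address.get("addressLocality")
    state = address.get("addressRegion")
    if city and state:
        return "{}, {}".format(city, state)
    # Assumption: fall back to an empty string when no city is available
    return city or ""
```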



JobGeographyQuerier(self, /, *args, **kwargs)

Base class for retrievers/computers of geography data from job postings

The main interface is query(job_posting), which returns a tuple the same length as self.output_columns.

Subclasses must implement

output_columns (property/attribute): a collection of two-tuples with a name and description
    for each column output by the querier
name (property/attribute) a name of the querier
_query(job_posting) to take a job posting object and return
    a tuple of the same length as self.output_columns.


Look up the CBSA for a job posting from a census crosswalk (job location -> Census Place -> Census UA -> Census CBSA)


JobCBSAFromGeocodeQuerier(self, geocoder, cbsa_finder)

Queries the Core-Based Statistical Area for a job

This object delegates the CBSA-finding algorithm to a passed-in finder. In practice, you can look at the skills_ml.algorithms.geocoders.cbsa module for an example of how this can be generated.

Instead, this object focuses on the job posting-centric logic necessary, such as converting the job posting to the form needed to use the cache and dealing with different kinds of cache misses.


cbsa_finder (dict) A mapping of geocoding search strings to
    (CBSA FIPS, CBSA Name) tuples



Queries the Core-Based Statistical Area for a job using a census crosswalk

First looks up a Place or County Subdivision by the job posting's state and city. If it finds a result, it will then take the Urbanized Area for that Place or County Subdivision and find CBSAs associated with it.

Queries return all hits, so there may be multiple CBSAs for a given query.




Import USAJobs postings into the Open Skills common schema


USAJobsTransformer(self, bucket_name=None, prefix=None, **kwargs)



VirginiaTransformer(self, bucket_name=None, prefix=None, **kwargs)


Sample job postings


JobSampler(self, job_posting_generator, k, weights=None, key=<function JobSampler.<lambda>>, random_state=None)

Job posting sampler using reservoir sampling methods

It takes a job posting generator as an input. To sample based on weights, one should specify a weight dictionary.


  • job_posting_generator (iterator): Job posting iterator to sample from.
  • k (int): number of documents to sample
  • weights (dict): a dictionary that has key-value pairs as label-weighting pairs. It expects every label in the iterator to be present as a key in the weights dictionary. For example, weights = {'11': 2, '13': 1}. In this case, the label/key is the occupation major group and the value is the weight you want to sample with.
  • key (callable): a function to be called on each element to associate to the key of weights dictionary
  • random_state (int): the seed used by the random number generator
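For readers unfamiliar with reservoir sampling, the unweighted variant (often called Algorithm R) can be sketched as below. This is a generic illustration rather than the JobSampler implementation: the function name `reservoir_sample` is hypothetical, and the weighted sampling that JobSampler supports through `weights` and `key` is not covered.

```python
import random


def reservoir_sample(iterable, k, random_state=None):
    """Uniformly sample up to k items from a stream in a single pass."""
    rng = random.Random(random_state)
    reservoir = []
    for index, item in enumerate(iterable):
        if index < k:
            # Fill the reservoir with the first k items
            reservoir.append(item)
        else:
            # Replace a random slot with probability k / (index + 1)
            j = rng.randint(0, index)
            if j < k:
                reservoir[j] = item
    return reservoir


sample_a = reservoir_sample(range(100), 5, random_state=42)
sample_b = reservoir_sample(range(100), 5, random_state=42)
```

The stream is never held in memory as a whole, which is what makes the approach suitable for large job posting generators.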