Open Observatory Pipeline
This is the Open Observatory data processing pipeline.
Setup
Edit invoke.yaml based on invoke.yaml.example so that it contains all the relevant tokens.
Also install all the Python requirements listed in requirements.txt.
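For example, a minimal setup could look like this (a sketch that assumes you have cloned this repository and have pip available):
cp invoke.yaml.example invoke.yaml
pip install -r requirements.txt
Then open invoke.yaml in an editor and fill in the tokens described in the Configuration section below.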
How to run the pipeline tasks
To run on the AWS cloud do:
invoke start_computer
If you would like to run the task that adds report headers to the postgres
database on the current machine, run (after having installed the requirements in
requirements-computer.txt):
invoke add_headers_to_db
To re-run the data pipeline from scratch you should first clean all the streams by running:
invoke clean_streams
Then, for every date you wish to import streams for, run:
invoke add_headers_to_db --workers=NUMBER_OF_CORES $YEAR
When doing this on AWS it's ideal to have one machine running per year.
This can be achieved by running the start_computer task like so:
invoke start_computer --invoke-command="add_headers_to_db --workers=32 --halt --date-interval=2012"
invoke start_computer --invoke-command="add_headers_to_db --workers=32 --halt --date-interval=2013"
invoke start_computer --invoke-command="add_headers_to_db --workers=32 --halt --date-interval=2014"
invoke start_computer --invoke-command="add_headers_to_db --workers=32 --halt --date-interval=2015"
Configuration
Before running the pipeline you should configure it by editing the invoke.yaml
file. An example configuration file is provided in invoke.yaml.example.
The settings you should probably be editing are the following:
core
- tmp_dir: The directory that should be used to store temporary files.
- ssh_private_key_file: The SSH private key that luigi should use for sshing into ssh:// machines.
- ooni_pipeline_path: The location on the EC2 instance where to look for the ooni-pipeline repository.
aws
- access_key_id: Your AWS access key ID, used for spinning up EC2 instances.
- secret_access_key: Your AWS secret access key.
- ssh_private_key_file: The private key that will be used for sshing into the started machines.
postgres
- host: The hostname of your postgres instance.
- database: The database name.
- username: The username to use when logging in.
- password: The password to use when logging in.
- table: The database table to write report headers to.
ooni
- bridge_db_path: The path to a bridge_db.json file containing mappings between bridge IPs, their hashes and the ring they were taken from (this is required for the sanitisation of bridge_reachability reports).
spark
- spark_submit: The path to where the spark-submit command can be found.
- master: The name of the yarn master node.
papertrail
- hostname: The hostname of the papertrail logging backend.
- port: The port of the papertrail logging backend.
kafka
This is currently not used.
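To make this concrete, here is a hypothetical invoke.yaml excerpt. It assumes the sections above map to top-level YAML keys, and every value shown is a placeholder you must replace with your own (see invoke.yaml.example for the authoritative list of keys):
core:
  tmp_dir: /tmp/ooni-pipeline
  ssh_private_key_file: private/ooni-pipeline.pem
  ooni_pipeline_path: /home/ubuntu/ooni-pipeline
aws:
  access_key_id: YOUR_ACCESS_KEY_ID
  secret_access_key: YOUR_SECRET_ACCESS_KEY
  ssh_private_key_file: private/ooni-pipeline.pem
postgres:
  host: YOUR_POSTGRES_HOST
  database: ooni_pipeline
  username: postgres
  password: CHANGEME
  table: metrics
ooni:
  bridge_db_path: private/bridge_db.json
papertrail:
  hostname: YOUR_PAPERTRAIL_HOST
  port: 514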
List of tasks
Tasks are run using pyinvoke and are defined inside of tasks.py.
Generate streams
This task can be run via:
invoke generate_streams --date-interval=DATE_INTERVAL [--src=URI]
[--workers=NUM --dst-private=URI --dst-public=URI]
[--halt]
The purpose of this task is to take the YAML reports that are located at the
address specified by the src URI and move them over into the private and
public buckets after applying some transformations and sanitisations to them.
The transformations consist in particular of partitioning the data by date and
converting the reports to JSON. This means that all the reports from 2019-10-11 will
end up in a JSON file named 2019-10-11.json.
Each line of the JSON file will contain the full report header and an extra key used to identify whether it is a header or a measurement.
The reason for splitting the data into daily buckets is to avoid random seeking as much as possible.
- date-interval
The date range that should be taken into consideration when running the
generate_streams task. If no date range is specified it will run against all
the dates.
The format for the date range is that of the luigi DateInterval module.
For example: 2019-10 is the full month of October 2019, while
2019-10-29-2019-10-31 covers the 29th and 30th of October 2019.
- src: default: s3n://ooni-private/reports-raw/yaml/
Where the reports should be read from. This is considered the master dataset of the pipeline.
- workers: default: 16
The number of CPU workers to use when running the operations.
- dst-private: default: s3n://ooni-private/
The target location in which to place the processed JSON streams. They will end
up inside of $URI/reports-raw/streams/.
- dst-public: default: s3n://ooni-public/
The target location in which to place the sanitised and processed JSON streams.
They will end up inside of $URI/reports-sanitised/streams/.
- halt: default: disabled
This is an optional flag that indicates whether or not the machine should be halted when done.
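For example, a hypothetical run over a single month that keeps the default source and destination buckets:
invoke generate_streams --date-interval=2019-10 --workers=16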
Upload reports
This task can be run via:
invoke upload_reports --src=URI [--dst=URI --workers=INT --move --halt]
This task is responsible for moving or copying the reports from a certain
incoming AWS bucket to the private ooni-pipeline bucket, ready to be
processed by the generate_streams task.
It will look inside of src for all files ending with .yaml.
The files will be renamed when moving them over to the dst directory using the
following format: {date}-{asn}-{test_name}-{df_version}-{ext}.
- src: From where the reports should be copied or moved. In the ooni-pipeline this is set to s3://ooni-incoming/, which is the incoming bucket.
- dst: default: s3n://ooni-private/reports-raw/yaml/
Where the reports should be moved or copied to.
- workers: default: 16
The number of CPU workers to use when running the operations.
- move
If the source file should be deleted once it has been successfully copied.
- halt: default: disabled
This is an optional flag that indicates whether or not the machine should be halted when done.
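For example, a hypothetical invocation that moves the reports out of the incoming bucket and into the default private bucket:
invoke upload_reports --src=s3://ooni-incoming/ --workers=16 --move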
List reports
This task can be run via:
invoke list_reports --path=URI
This will list all the files that appear to be OONI reports in a certain directory.
- path: default: s3n://ooni-private/reports-raw/yaml/
The path to list reports inside of.
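For example, to list the reports in the default private bucket:
invoke list_reports --path=s3n://ooni-private/reports-raw/yaml/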
Clean streams
This task can be run via:
invoke clean_streams --dst-private=URI --dst-public=URI
This will delete all the files that are generated by the generate_streams task.
In particular these files are:
- PRIVATE/reports-raw/streams
- PRIVATE/reports-sanitised/yaml
- PRIVATE/reports-sanitised/streams
The arguments are:
- dst-private: default: s3n://ooni-private/
The directory that contains the raw reports.
- dst-public: default: s3n://ooni-public/
The directory that contains the public reports.
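For example, to clean the streams in the default private and public buckets:
invoke clean_streams --dst-private=s3n://ooni-private/ --dst-public=s3n://ooni-public/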
Add headers to DB
This task can be run via:
invoke add_headers_to_db --date-interval=DATE_INTERVAL [--src=URI]
[--workers=NUM --dst-private=URI --dst-public=URI]
[--halt]
When no date is specified it will run upload_reports on s3n://ooni-incoming,
moving the reports it finds there, and then run the add_headers_to_db batch
operation on these incoming reports.
When a date range is specified it will run the batch operation on that date range.
The batch operation will sanitise the YAML reports, generating their streams, and
then add the report headers to the database.
For the schema of the database see the avro specification inside of
pipeline/helpers/report.py.
- date-interval
The date range that should be taken into consideration when running the
add_headers_to_db task. If no date range is specified it will run against all
the dates.
The format for the date range is that of the luigi DateInterval module.
For example: 2019-10 is the full month of October 2019, while
2019-10-29-2019-10-31 covers the 29th and 30th of October 2019.
- src: default: s3n://ooni-private/reports-raw/yaml/
Where the reports should be read from. This is considered the master dataset of the pipeline.
- workers: default: 16
The number of CPU workers to use when running the operations.
- dst-private: default: s3n://ooni-private/
The target location in which to place the processed JSON streams. They will end
up inside of $URI/reports-raw/streams/.
- dst-public: default: s3n://ooni-public/
The target location in which to place the sanitised and processed JSON streams.
They will end up inside of $URI/reports-sanitised/streams/.
- halt: default: disabled
This is an optional flag that indicates whether or not the machine should be halted when done.
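For example, two hypothetical invocations: the first processes whatever is sitting in the incoming bucket, the second re-runs the batch operation over October 2019:
invoke add_headers_to_db --workers=16
invoke add_headers_to_db --workers=16 --date-interval=2019-10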
Sync reports
This task can be run via:
invoke sync_reports --srcs=URI [--dst-private=URI --workers=INT --halt]
This task is responsible for moving the reports from a certain set of sources to the incoming bucket. It is usually run on collectors to place the data they have gathered into the incoming AWS bucket.
It will look inside of the srcs directories for all files ending with .yaml.
- srcs: default: ssh://root@bouncer.infra.ooni.nu/data/bouncer/archive
The source directories in which to look for OONI reports.
- dst-private: default: s3n://ooni-incoming/
Where the reports should be moved to.
- workers: default: 16
The number of CPU workers to use when running the operations.
- halt: default: disabled
This is an optional flag that indicates whether or not the machine should be halted when done.
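For example, a hypothetical invocation that pulls reports from a collector and drops them into the incoming bucket:
invoke sync_reports --srcs=ssh://root@bouncer.infra.ooni.nu/data/bouncer/archive --dst-private=s3n://ooni-incoming/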
Start computer
This task can be run via:
invoke start_computer [--private-key=PATH --instance-type=INSTANCE_TYPE]
[--invoke-command=INVOKE_COMMAND]
- private-key: default: private/ooni-pipeline.pem
The private key to be used to ssh into the machine.
- instance-type: default: c3.8xlarge
The type of AWS EC2 instance to start. A full list of them can be found here: https://aws.amazon.com/ec2/instance-types/
- invoke-command: default: add_headers_to_db --workers=32 --halt
The invoke command to be run once the machine is started. Remember that it is usually important to also pass --halt to avoid extra costs.
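For example, a hypothetical invocation that starts a smaller instance type and halts it once the work is done:
invoke start_computer --instance-type=c3.4xlarge --invoke-command="add_headers_to_db --workers=16 --halt"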
Spark apps
This task can be run via:
invoke spark_apps [--private-key=PATH --instance-type=INSTANCE_TYPE --invoke-command=INVOKE_COMMAND]
This task will run the batch spark-based apps on a hadoop cluster. The current
batch operations are responsible for inspecting the sanitised streams bucketed
by date located inside of --src, generating some database views based upon them,
and writing a processed JSON file inside of --dst to indicate that a certain
date has been processed.
- date-interval
The format for the date range is that of the luigi DateInterval module.
For example: 2019-10 is the full month of October 2019, while
2019-10-29-2019-10-31 covers the 29th and 30th of October 2019.
- src: default: s3n://ooni-public/reports-sanitised/streams/
Where to read the JSON streams from.
- dst: default:
s3n://ooni-public/processed/
Where to write a file to indicate a certain date has been processed.
- workers: default: 3
The number of CPU workers to use when running the operations.
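For example, a hypothetical invocation that processes a single month, assuming the options listed above map directly to command line flags:
invoke spark_apps --date-interval=2019-10 --workers=3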
Spark submit
This task is a work in progress and is not thoroughly tested; it is for running spark scripts on a hadoop cluster.