API

CondaCreator([conda_root, conda_envs, …]) Create Conda Env
CondaCreator.create_env(env_name[, …]) Create zipped directory of a conda environment
CondaCreator.zip_env
YARNAPI(rm, rm_port[, scheme, gateway_path, …]) REST interface to YARN
YARNAPI.apps App IDs known to YARN
YARNAPI.app_containers([app_id, info]) Get list of container information for given app.
YARNAPI.logs(app_id[, shell, retries, delay]) Collect logs from RM (if running); with shell=True, collect logs from HDFS after job completion
YARNAPI.container_status(container_id) Ask the YARN shell about the given container
YARNAPI.status(app_id) Get status of an application
YARNAPI.kill_all([knit_only]) Kill a set of applications
YARNAPI.kill(app_id) Method to kill a yarn application
Knit([autodetect, upload_always, hdfs_home, …]) Connection to HDFS/YARN.
Knit.start(cmd[, num_containers, …]) Method to start a yarn app with a distributed shell
Knit.logs([shell]) Collect logs from RM (if running); with shell=True, collect logs from HDFS after job completion
Knit.status() Get status of an application
Knit.kill() Method to kill a yarn application
Knit.create_env(env_name[, packages, …]) Create zipped directory of a conda environment
class knit.core.Knit(autodetect=True, upload_always=False, hdfs_home=None, knit_home='/home/docs/checkouts/readthedocs.org/user_builds/knit/checkouts/stable/knit/java_libs', hdfs=None, pars=None, **kwargs)[source]

Connection to HDFS/YARN. Launches a single “application” master with a number of worker containers.

Parameter priority (nn, nn_port, rm, rm_port): values passed to __init__ take priority. If autodetect=True, Knit will attempt to fill in the remaining values from system configuration files; fallback values are used if this fails.

Parameters:
nn: str

Namenode hostname/ip

nn_port: int

Namenode Port (default: 9000)

rm: str

Resource Manager hostname

rm_port: int

Resource Manager port (default: 8088)

lang: str

Environment variable language setting, required for click to successfully read from the shell. (default: ‘C.UTF-8’)

user: str (‘root’)

The user name from point of view of HDFS. This is only used when checking for the existence of knit files on HDFS, since they are stored in the user’s home directory.

hdfs_home: str

Explicit location of a writable directory in HDFS to store files. Defaults to the user's home directory: hdfs://user/<username>/

replication_factor: int (3)

Replication factor for files uploaded to HDFS (default: 3)

autodetect: bool

Autodetect configuration

upload_always: bool (default: False)

If True, always upload the conda environment zip; otherwise, check for the file’s existence in HDFS (using the hdfs3 library, if present) and skip the upload if the file there matches the local one in size and is newer.

knit_home: str

Location of knit’s jar

hdfs: HDFileSystem instance or None

Used for checking files in HDFS.

Note: for now, only one Knit instance can live in a single process because
of how py4j interfaces with the JVM.

Examples

>>> k = Knit()
>>> app_id = k.start('sleep 100', num_containers=5, memory=1024)
add_containers(num_containers=1, virtual_cores=1, memory=128)[source]

Method to add containers to an already running yarn app

Parameters:
num_containers: int

Number of containers YARN should request (default: 1). A container should be requested with the number of cores it can saturate, i.e. the average number of threads it expects to have runnable at a time.

virtual_cores: int

Number of virtual cores per container (default: 1). A node’s capacity should be configured with virtual cores equal to its number of physical cores.

memory: int

Memory per container (default: 128). The unit for memory is megabytes.
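
Examples

A minimal sketch, assuming an application has already been started with start():

>>> k = Knit()
>>> app_id = k.start('sleep 100', num_containers=1, memory=128)
>>> k.add_containers(num_containers=2, virtual_cores=1, memory=256)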
check_needs_upload(path)[source]

Upload is needed if the file does not exist in HDFS or the HDFS copy is older than the local file
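
Examples

A minimal sketch (the path is a hypothetical local environment zip):

>>> k.check_needs_upload('/tmp/dev.zip')  # hypothetical local zip path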

static create_env(env_name, packages=None, remove=False, channels=None, conda_pars=None)[source]

Create zipped directory of a conda environment

Parameters:
env_name : str
packages : list
conda_root: str

Location of conda installation. If None, will download miniconda and produce an isolated environment.

remove : bool

Remove any existing conda environment of this name before creating

channels : list of str

conda channels to use (defaults to your conda setup)

conda_pars: dict

Further parameters to pass to CondaCreator

Returns:
path: str

path to zipped conda environment

Examples

>>> k = Knit()
>>> pkg_path = k.create_env(env_name='dev',
...                         packages=['distributed', 'dask', 'pandas'])
get_container_statuses()[source]

Get status info for each container

Returns dict where the values are the raw text output.
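
Examples

A minimal sketch of inspecting the statuses of a running application's containers:

>>> statuses = k.get_container_statuses()
>>> for cid, text in statuses.items():
...     print(cid, text)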

get_containers()[source]

Method to return active containers

Returns:
container_list: List

List of dicts with each container’s details
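
Examples

A minimal sketch; the exact keys of each container dict are not documented here:

>>> containers = k.get_containers()
>>> len(containers)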

kill()[source]

Method to kill a yarn application

Returns:
bool:

True if successful, False otherwise.
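
Examples

A minimal sketch, assuming an application is running:

>>> killed = k.kill()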

list_envs()[source]

List knit conda environments already in HDFS

Looks in staging directory for zip-files

Returns:
list of dict

Details for each zip-file.
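
Examples

A minimal sketch; the keys of each dict are not specified here:

>>> envs = k.list_envs()
>>> len(envs)
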
logs(shell=False)[source]

Collect logs from RM (if running). With shell=True, collect logs from HDFS after job completion.

Parameters:
shell: bool

Shell out to yarn CLI (default False)

Returns:
log: dictionary

logs from each container (when possible)
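
Examples

A minimal sketch, collecting logs from the RM while running and from HDFS after completion:

>>> live = k.logs()
>>> k.wait_for_completion(timeout=60)
>>> final = k.logs(shell=True)
>>> k.print_logs(shell=True)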

print_logs(shell=False)[source]

Print a more console-friendly version of logs()

remove_containers(container_id)[source]

Method to remove containers from a running yarn app

Calls removeContainers in ApplicationMaster.scala

Be careful removing the …0001 container: this is where the ApplicationMaster is running.

Parameters:
container_id: str

Returns:
None
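
Examples

A sketch of removing the most recently listed container; the 'id' key is an assumption about the container dict layout:

>>> containers = k.get_containers()
>>> k.remove_containers(containers[-1]['id'])  # 'id' key assumed; do not remove the ...0001 container
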
runtime_status()[source]

Get runtime status of an application

Returns:
str:

status of application
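
Examples

A minimal sketch of polling until the application leaves the running state; the 'RUNNING' state string follows YARN's conventions and is an assumption here:

>>> import time
>>> while k.runtime_status() == 'RUNNING':  # 'RUNNING' state value assumed
...     time.sleep(5)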

start(cmd, num_containers=1, virtual_cores=1, memory=128, files=None, envvars=None, app_name='knit', queue='default', checks=True)[source]

Method to start a yarn app with a distributed shell

Parameters:
cmd: str

command to run in each yarn container

num_containers: int

Number of containers YARN should request (default: 1). A container should be requested with the number of cores it can saturate, i.e. the average number of threads it expects to have runnable at a time.

virtual_cores: int

Number of virtual cores per container (default: 1). A node’s capacity should be configured with virtual cores equal to its number of physical cores.

memory: int

Memory per container (default: 128). The unit for memory is megabytes.

files: list

list of files to be included in each container. If starting with hdfs://, these are assumed to already exist in HDFS and do not need uploading. Otherwise, if hdfs3 is installed, the existence of the file on HDFS will be checked to see if upload is needed. Files ending with .zip will be decompressed in the container before launch as a directory with the same name as the file: if myarc.zip contains files inside a directory stuff/, in the container they will appear at ./myarc.zip/stuff/* .

envvars: dict

Environment variables to pass to AM and workers. Both keys and values must be strings only.

app_name: str

Application name shown in YARN (default: “knit”)

queue: str

RM Queue to use while scheduling (default: “default”)

checks: bool (default: True)

Whether to run pre-flight checks before submitting app to YARN

Returns:
applicationId: str

A yarn application ID string
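
Examples

A sketch extending the class-level example with files and environment variables; the zip path and variable name are hypothetical:

>>> app_id = k.start('sleep 100', num_containers=2, virtual_cores=1,
...                  memory=512, files=['hdfs://path/to/dev.zip'],
...                  envvars={'MY_SETTING': '1'}, app_name='my-knit-app')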

status()[source]

Get status of an application

Returns:
status: dictionary

status of application
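
Examples

A minimal sketch; the exact keys of the returned dictionary depend on what YARN reports:

>>> info = k.status()
>>> sorted(info)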

wait_for_completion(timeout=10)[source]

Wait for completion of the yarn application

Returns:
bool:

True if successful, False otherwise
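
Examples

A minimal sketch, blocking for up to 600 seconds and killing the app if it has not finished:

>>> completed = k.wait_for_completion(timeout=600)
>>> if not completed:
...     k.kill()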

DaskYARNCluster
DaskYARNCluster.start
DaskYARNCluster.stop
DaskYARNCluster.close
DaskYARNCluster.add_workers
DaskYARNCluster.remove_worker
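
A rough usage sketch of DaskYARNCluster; the import path knit.dask_yarn and the constructor/start parameters (env, nworkers, memory, cpus) are assumptions not documented in the table above, so check the DaskYARNCluster docstring:

>>> from knit.dask_yarn import DaskYARNCluster  # import path assumed
>>> from dask.distributed import Client
>>> cluster = DaskYARNCluster(env='/path/to/dev.zip')  # 'env' parameter assumed
>>> cluster.start(nworkers=4, memory=1024, cpus=2)     # parameter names assumed
>>> client = Client(cluster)
>>> cluster.close()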