API

CondaCreator([conda_root, conda_envs, …]) Create conda environments
CondaCreator.create_env(env_name[, …]) Create zipped directory of a conda environment
CondaCreator.zip_env(env_path) Zip env directory
YARNAPI(rm, rm_port[, scheme]) Interface to the YARN ResourceManager
YARNAPI.apps App IDs known to YARN
YARNAPI.app_containers([app_id, info]) Get list of container information for given app.
YARNAPI.logs(app_id[, shell, retries, delay]) Collect logs from RM (if running)
YARNAPI.container_status(container_id) Ask the YARN shell about the given container
YARNAPI.status(app_id) Get status of an application
YARNAPI.kill_all([knit_only]) Kill a set of applications
YARNAPI.kill(app_id) Method to kill a yarn application
Knit([autodetect, upload_always, hdfs_home, …]) Connection to HDFS/YARN.
Knit.start(cmd[, num_containers, …]) Method to start a yarn app with a distributed shell
Knit.logs([shell]) Collect logs from RM (if running)
Knit.status() Get status of an application
Knit.kill() Method to kill a yarn application
Knit.create_env(env_name[, packages, …]) Create zipped directory of a conda environment
class knit.core.Knit(autodetect=True, upload_always=False, hdfs_home=None, knit_home='<knit-install-path>/java_libs', pars=None, **kwargs)[source]

Connection to HDFS/YARN. Launches a single “application” master with a number of worker containers.

Parameter precedence (nn, nn_port, rm, rm_port): values passed to __init__ take priority. If autodetect=True, Knit will attempt to fill in the others from system configuration files; fallback values are used if this fails.

Parameters:

nn: str

Namenode hostname/IP

nn_port: int

Namenode Port (default: 9000)

rm: str

Resource Manager hostname

rm_port: int

Resource Manager port (default: 8088)

user: str (default: ‘root’)

The user name from the point of view of HDFS. This is only used when checking for the existence of knit files on HDFS, since they are stored in the user’s home directory.

hdfs_home: str

Explicit location of a writable directory in HDFS to store files. Defaults to the user’s home directory, /user/<username>/.

replication_factor: int

Replication factor for files uploaded to HDFS (default: 3)

autodetect: bool

Autodetect configuration

upload_always: bool (default: False)

If True, always upload the conda environment zip; otherwise, check for the file’s existence in HDFS (using the hdfs3 library, if present) and skip the upload if the HDFS copy matches the local file in size and is newer.

knit_home: str

Location of knit’s JAR file

Note: for now, only one Knit instance can live in a single process, because of how py4j interfaces with the JVM.

Examples

>>> k = Knit()
>>> app_id = k.start('sleep 100', num_containers=5, memory=1024)
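
A sketch of connecting with explicit addresses rather than autodetection (the hostnames here are illustrative):

>>> k = Knit(autodetect=False, nn='namenode.example.com', nn_port=9000,
...          rm='resourcemanager.example.com', rm_port=8088)
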
add_containers(num_containers=1, virtual_cores=1, memory=128)[source]

Method to add containers to an already running yarn app

Parameters:

num_containers: int

Number of containers YARN should request (default: 1). A container should be requested with the number of cores it can saturate, i.e. the average number of threads it expects to have runnable at a time.

virtual_cores: int

Number of virtual cores per container (default: 1). A node’s capacity should be configured with virtual cores equal to its number of physical cores.

memory: int

Memory per container, in megabytes (default: 128).
check_env_needs_upload(env_path)[source]

Upload is needed if the zip file does not exist in HDFS, or if the HDFS copy is older than the local file

static create_env(env_name, packages=None, remove=False, channels=None, conda_pars=None)[source]

Create zipped directory of a conda environment

Parameters:

env_name : str

Name of the environment to create

packages : list

Conda packages to install into the environment

conda_root: str

Location of the conda installation. If None, will download miniconda and produce an isolated environment.

remove : bool

Remove any existing conda environment of this name before creating

channels : list of str

conda channels to use (defaults to your conda setup)

conda_pars: dict

Further parameters to pass to CondaCreator

Returns:

path: str

path to zipped conda environment

Examples

>>> k = Knit()
>>> pkg_path = k.create_env(env_name='dev',
...                         packages=['distributed', 'dask', 'pandas'])
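
The resulting zip can then be passed to start() so each container runs from the same Python environment. A sketch, assuming the zip unpacks inside the container under the environment name (the command is illustrative):

>>> app_id = k.start('dev/bin/python -c "import dask"',
...                  env=pkg_path)  # the 'dev/...' path inside the container is an assumption
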
get_container_statuses()[source]

Get status info for each container

Returns a dict where the values are the raw text output.

get_containers()[source]

Method to return active containers

Returns:

container_list: List

List of dicts with each container’s details
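
A minimal usage sketch (the exact keys of each dict depend on the YARN response):

>>> for info in k.get_containers():
...     print(info)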

hdfs

An instance of HDFileSystem

Useful for checking on the contents of the staging directory. Will be automatically generated using this instance’s configuration, but you can instead set self._hdfs directly if necessary.

Note: if the namenode/port is not defined in the conf, will not attempt a connection, since it can take a while trying to connect to localhost:8020.
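
For example, listing the staging directory with hdfs3 (the path shown is illustrative; see hdfs_home above):

>>> k.hdfs.ls('/user/myuser/.knitDeps')  # staging path is an assumption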

kill()[source]

Method to kill a yarn application

Returns:

bool:

True if successful, False otherwise.

list_envs()[source]

List knit conda environments already in HDFS

Looks in the staging directory for zip-files

Returns:

list of dict

Details for each zip-file.

logs(shell=False)[source]

Collect logs from the RM (if running). With shell=True, collect logs from HDFS after job completion.

Parameters:

shell: bool

Shell out to yarn CLI (default False)

Returns:

log: dictionary

logs from each container (when possible)

print_logs(shell=False)[source]

Print out a more console-friendly version of logs()
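
A usage sketch (assumes the application has finished and the yarn CLI is on the path):

>>> logdict = k.logs(shell=True)  # one entry per container, where available
>>> k.print_logs(shell=True)      # console-friendly printout of the same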

remove_containers(container_id)[source]

Method to remove containers from a running yarn app

Calls removeContainers in ApplicationMaster.scala

Be careful removing the …0001 container: this is where the ApplicationMaster is running.

Parameters:

container_id: str

Returns:

None
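
A hypothetical sketch of removing the most recently listed container (the 'id' key is an assumption about the dicts returned by get_containers()):

>>> containers = k.get_containers()
>>> k.remove_containers(containers[-1]['id'])  # avoid the ...0001 container
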
runtime_status()[source]

Get runtime status of an application

Returns:

str:

status of application

start(cmd, num_containers=1, virtual_cores=1, memory=128, env='', files=[], app_name='knit', queue='default', checks=True)[source]

Method to start a yarn app with a distributed shell

Parameters:

cmd: str

command to run in each yarn container

num_containers: int

Number of containers YARN should request (default: 1). A container should be requested with the number of cores it can saturate, i.e. the average number of threads it expects to have runnable at a time.

virtual_cores: int

Number of virtual cores per container (default: 1). A node’s capacity should be configured with virtual cores equal to its number of physical cores.

memory: int

Memory per container, in megabytes (default: 128).

env: str

Full path to a zipped Python environment

files: list

List of files to be included in each container

app_name: str

Application name shown in YARN (default: “knit”)

queue: str

RM queue to use while scheduling (default: “default”)

checks: bool (default: True)

Whether to run pre-flight checks before submitting the app to YARN

Returns:

applicationId: str

A yarn application ID string
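
A fuller invocation sketch (all paths and names are illustrative):

>>> app_id = k.start('env/bin/python myscript.py',
...                  num_containers=2, memory=512,
...                  env='/local/path/env.zip',   # zip produced by create_env
...                  files=['myscript.py'])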

status()[source]

Get status of an application

Returns:

log: dictionary

status of application

wait_for_completion(timeout=10)[source]

Wait for completion of the yarn application

Returns:

bool:

True if the application completed within the timeout, False otherwise
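
A shutdown sketch (the timeout value is illustrative):

>>> if not k.wait_for_completion(timeout=60):
...     k.kill()  # force-kill if the app did not finish in time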

DaskYARNCluster Dask cluster running in YARN containers
DaskYARNCluster.start Start the cluster and launch worker containers
DaskYARNCluster.stop Stop the running YARN application
DaskYARNCluster.close Stop the application and clean up
DaskYARNCluster.add_workers Add worker containers to the running cluster
DaskYARNCluster.remove_worker Remove a specific worker container