API

Knit([nn, nn_port, rm, rm_port, user, …]) Connection to HDFS/YARN.
Knit.start(cmd[, num_containers, …]) Method to start a YARN app with a distributed shell
Knit.logs([shell]) Collect logs from RM (if running)
Knit.status() Get status of an application
Knit.kill() Method to kill a YARN application
Knit.create_env(env_name[, packages, …]) Create zipped directory of a conda environment
class knit.core.Knit(nn=None, nn_port=None, rm=None, rm_port=None, user='root', replication_factor=3, autodetect=False, validate=True, upload_always=False)[source]

Connection to HDFS/YARN. Launches a single “application” master with a number of worker containers.

Parameter resolution (nn, nn_port, rm, rm_port): parameters passed explicitly to __init__ take priority. If autodetect=True, Knit will attempt to fill in the others from system configuration files; fallback values are used if this also fails.
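That priority order can be sketched as a small helper (illustrative only; `resolve` and its arguments are not part of the knit API):

```python
def resolve(explicit, detected, fallback):
    """Pick a configuration value by priority: an explicit argument
    wins, then an autodetected value, then a hard-coded fallback."""
    if explicit is not None:
        return explicit
    if detected is not None:
        return detected
    return fallback

# An explicitly passed namenode port always wins over a detected one:
print(resolve(9000, 8020, 8020))   # -> 9000
print(resolve(None, 8020, 9000))   # -> 8020
print(resolve(None, None, 9000))   # -> 9000
```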

Parameters:

nn: str

Namenode hostname/ip

nn_port: int

Namenode Port (default: 9000)

rm: str

Resource Manager hostname

rm_port: int

Resource Manager port (default: 8088)

user: str (‘root’)

The user name from point of view of HDFS. This is only used when checking for the existence of knit files on HDFS, since they are stored in the user’s home directory.

replication_factor: int (3)

Replication factor for files uploaded to HDFS (default: 3)

autodetect: bool

Autodetect NN/RM IP/Ports

validate: bool

Validate entered NN/RM IP/Ports match detected config file

upload_always: bool (False)

If True, always upload the conda environment zip; otherwise, check for the file’s existence in HDFS (using the hdfs3 library, if present) and skip the upload if the remote copy matches the local file in size and is newer.

Examples

>>> k = Knit()
>>> app_id = k.start('sleep 100', num_containers=5, memory=1024)
add_containers(num_containers=1, virtual_cores=1, memory=128)[source]

Method to add containers to an already running YARN app

Parameters:

num_containers: int

Number of containers YARN should request (default: 1). A container should be requested with the number of cores it can saturate, i.e. the average number of threads it expects to have runnable at a time.

virtual_cores: int

Number of virtual cores per container (default: 1). A node’s capacity should be configured with virtual cores equal to its number of physical cores.

memory: int

Memory per container, in megabytes (default: 128).
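As a minimal sketch of the sizing guidance above (a hypothetical helper, not part of the knit API; the name `container_request` and its arguments are illustrative):

```python
def container_request(total_threads, threads_per_container, mb_per_container=128):
    """Turn a desired worker-thread budget into a YARN container request,
    following the guidance above: give each container as many virtual
    cores as threads it will keep runnable, and request enough
    containers to cover the total."""
    num_containers = -(-total_threads // threads_per_container)  # ceil division
    return {"num_containers": num_containers,
            "virtual_cores": threads_per_container,
            "memory": mb_per_container}  # megabytes

req = container_request(total_threads=10, threads_per_container=4, mb_per_container=1024)
print(req)  # {'num_containers': 3, 'virtual_cores': 4, 'memory': 1024}
```

The resulting dict happens to match this method’s keyword arguments, so it could be unpacked as `k.add_containers(**req)`.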
check_env_needs_upload(env_path)[source]

Upload is needed if the zip file does not exist in HDFS or is older than the local copy
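The check can be sketched as follows (illustrative only; `needs_upload` is a hypothetical stand-in, and `remote_size`/`remote_mtime` represent the metadata knit would obtain via hdfs3):

```python
import os

def needs_upload(local_path, remote_size, remote_mtime, upload_always=False):
    """Return True if the local zip should be (re)uploaded to HDFS:
    always when forced, when no remote copy exists, or when the
    remote copy differs in size or is older than the local file."""
    if upload_always:
        return True
    if remote_size is None:  # no copy in HDFS yet
        return True
    st = os.stat(local_path)
    same_size = st.st_size == remote_size
    remote_newer = remote_mtime >= st.st_mtime
    return not (same_size and remote_newer)
```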

static create_env(env_name, packages=None, conda_root=None, remove=False)[source]

Create zipped directory of a conda environment

Parameters:

env_name : str

Name of the conda environment to create

packages : list

Packages to install into the environment

conda_root : str, optional

remove : bool

Remove any existing environment of the same name before creating

Returns:

path: str

path to zipped conda environment

Examples

>>> k = Knit()
>>> pkg_path = k.create_env(env_name='dev',
...                         packages=['distributed', 'dask', 'pandas'])
get_container_statuses()[source]

Get status info for each container via the YARN CLI

Returns a dict whose values are the raw text output.

get_containers()[source]

Method to return active containers

Calls getContainers in Client.scala, which returns comma-delimited container IDs

Returns:

container_list: List

List of str

kill()[source]

Method to kill a YARN application

Returns:

bool:

True if successful, False otherwise.

list_envs()[source]

List knit conda environments already in HDFS

Looks in location /user/{user}/.knitDeps/ for zip files

Returns:

list of dict

Details for each zip file
logs(shell=False)[source]

Collect logs from the RM (if the application is running). With shell=True, collect logs from HDFS after job completion.

Parameters:

shell: bool

Shell out to yarn CLI (default False)

Returns:

log: dictionary

logs from each container (when possible)

remove_containers(container_id)[source]

Method to remove containers from a running YARN app

Calls removeContainers in ApplicationMaster.scala

Be careful removing the …0001 container: this is where the ApplicationMaster is running.

Parameters:

container_id: str

Returns:

None
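To stay clear of that container, the IDs returned by get_containers can be filtered before removal (a hypothetical helper, not part of the knit API; the IDs shown follow the standard YARN container ID format, in which the first container hosts the ApplicationMaster):

```python
def removable(container_ids):
    """Drop the first (..._000001) container, which hosts the
    ApplicationMaster, before calling remove_containers."""
    return [cid for cid in container_ids if not cid.endswith("_000001")]

ids = ["container_1453495632701_0001_01_000001",   # ApplicationMaster
       "container_1453495632701_0001_01_000002",
       "container_1453495632701_0001_01_000003"]
print(removable(ids))  # only the two worker containers remain
```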
runtime_status()[source]

Get runtime status of an application

Returns:

str:

status of application

start(cmd, num_containers=1, virtual_cores=1, memory=128, env='', files=[], app_name='knit', queue='default')[source]

Method to start a YARN app with a distributed shell

Parameters:

cmd: str

command to run in each yarn container

num_containers: int

Number of containers YARN should request (default: 1). A container should be requested with the number of cores it can saturate, i.e. the average number of threads it expects to have runnable at a time.

virtual_cores: int

Number of virtual cores per container (default: 1). A node’s capacity should be configured with virtual cores equal to its number of physical cores.

memory: int

Memory per container, in megabytes (default: 128).

env: str

Full path to a zipped Python environment

files: list

List of files to be included in each container

app_name: str

Application name shown in YARN (default: “knit”)

queue: str

RM queue to use when scheduling (default: “default”)

Returns:

applicationId: str

A yarn application ID string

status()[source]

Get status of an application

Returns:

log: dictionary

status of application

wait_for_completion(timeout=10)[source]

Wait for completion of the YARN application

Returns:

bool:

True if successful, False otherwise
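A polling loop in the spirit of this method can be sketched as follows (an illustration, not knit’s implementation; `get_status` is injected so the sketch runs without a YARN cluster, and the terminal states listed are the standard YARN ones):

```python
import time

def wait_for(get_status, timeout=10, poll_interval=1):
    """Repeatedly call get_status() until it reports a terminal YARN
    state or the timeout (in seconds) elapses. Returns True if a
    terminal state was reached, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_status() in ("FINISHED", "FAILED", "KILLED"):
            return True
        time.sleep(poll_interval)
    return False

# Simulate an application that finishes on the third poll:
states = iter(["RUNNING", "RUNNING", "FINISHED"])
print(wait_for(lambda: next(states), timeout=5, poll_interval=0.01))  # True
```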