API

Knit([nn, nn_port, rm, rm_port, autodetect, ...])
    Connection to HDFS/YARN
Knit.start(cmd[, num_containers, ...])
    Method to start a YARN app with a distributed shell
Knit.logs([shell])
    Collect logs from RM (if running)
Knit.status()
    Get status of an application
Knit.kill([timeout])
    Method to kill a YARN application
Knit.create_env(env_name[, packages, ...])
    Create zipped directory of a conda environment
class knit.core.Knit(nn='localhost', nn_port=8020, rm='localhost', rm_port=8088, autodetect=False, validate=True)[source]

Connection to HDFS/YARN

Parameters:

nn: str

Namenode hostname/ip

nn_port: int

Namenode port (default: 8020)

rm: str

Resource Manager hostname

rm_port: str

Resource Manager port (default: 8088)

autodetect: bool

Autodetect NN/RM IP/Ports

validate: bool

Validate that the provided NN/RM IPs/ports match the detected configuration files

Examples

>>> k = Knit()
>>> app_id = k.start('sleep 100', num_containers=5, memory=1024)

add_containers(num_containers=1, virtual_cores=1, memory=128)[source]

Method to add containers to an already running yarn app

Parameters:

num_containers: int

Number of containers YARN should request (default: 1). A container should be requested with the number of cores it can saturate, i.e. the average number of threads it expects to have runnable at a time.

virtual_cores: int

Number of virtual cores per container (default: 1). A node’s capacity should be configured with virtual cores equal to its number of physical cores.

memory: int

Memory per container (default: 128). The unit for memory is megabytes.
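
Examples

A minimal sketch, assuming a running YARN cluster and an application already started via start() on this Knit instance; the command and container sizes are arbitrary:

>>> k = Knit()
>>> app_id = k.start('sleep 100', num_containers=1, memory=256)
>>> k.add_containers(num_containers=2, virtual_cores=1, memory=256)
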
static create_env(env_name, packages=None, conda_root=None, remove=False)[source]

Create zipped directory of a conda environment

Parameters:

env_name : str

name of the conda environment to create

packages : list

packages to include in the environment

conda_root : str, optional

path to an existing conda installation to use when building the environment

remove : bool

remove possible conda environment before creating

Returns:

path: str

path to zipped conda environment

Examples

>>> k = Knit()
>>> pkg_path = k.create_env(env_name='dev',
...                         packages=['distributed', 'dask', 'pandas'])
kill(timeout=10)[source]

Method to kill a yarn application

Parameters:

timeout: int

Time in seconds to wait for completion before killing (default: 10)

Returns:

bool:

True if successful, False otherwise.
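
Examples

A minimal sketch, assuming an application previously started via start() on this Knit instance; the True result assumes the kill succeeds:

>>> k = Knit()
>>> app_id = k.start('sleep 100')
>>> k.kill(timeout=10)
True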

logs(shell=False)[source]

Collect logs from RM (if running). With shell=True, collect logs from HDFS after job completion.

Parameters:

shell: bool

Shell out to the yarn CLI (default: False)

Returns:

log: dictionary

logs from each container (when possible)
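
Examples

A minimal sketch, assuming an application started via start() on this Knit instance; shell=True assumes the job has already completed and the yarn CLI is available:

>>> k = Knit()
>>> app_id = k.start('sleep 100')
>>> running_logs = k.logs()             # from the RM while the app runs
>>> finished_logs = k.logs(shell=True)  # via the yarn CLI after completion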

runtime_status()[source]

Get runtime status of an application

Returns:

str:

status of application
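
Examples

A minimal sketch, assuming an application started via start(); the returned string depends on the application's current YARN state:

>>> k = Knit()
>>> app_id = k.start('sleep 100')
>>> k.runtime_status()
'RUNNING'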

start(cmd, num_containers=1, virtual_cores=1, memory=128, env='', files=[], app_name='knit', queue='default')[source]

Method to start a yarn app with a distributed shell

Parameters:

cmd: str

command to run in each yarn container

num_containers: int

Number of containers YARN should request (default: 1). A container should be requested with the number of cores it can saturate, i.e. the average number of threads it expects to have runnable at a time.

virtual_cores: int

Number of virtual cores per container (default: 1). A node’s capacity should be configured with virtual cores equal to its number of physical cores.

memory: int

Memory per container (default: 128). The unit for memory is megabytes.

env: str

Full path to a zipped Python environment

files: list

List of files to be included in each container

app_name: str

Application name shown in YARN (default: “knit”)

queue: str

RM queue to use while scheduling (default: “default”)

Returns:

applicationId: str

A yarn application ID string
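
Examples

A minimal sketch combining create_env() with start(); the package list, the file 'data.csv', and the app name 'knit-demo' are hypothetical, and the zipped environment is assumed to contain everything the command needs:

>>> k = Knit()
>>> env_path = k.create_env(env_name='dev', packages=['dask', 'pandas'])
>>> app_id = k.start('sleep 100', num_containers=2, memory=512,
...                  env=env_path, files=['data.csv'], app_name='knit-demo')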

status()[source]

Get status of an application

Returns:

log: dictionary

status of application
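
Examples

A minimal sketch, assuming an application started via start() on this Knit instance:

>>> k = Knit()
>>> app_id = k.start('sleep 100')
>>> info = k.status()  # dictionary describing the application's current state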

wait_for_completion(timeout=10)[source]

Wait for completion of the yarn application

Parameters:

timeout: int

Maximum time in seconds to wait for completion (default: 10)

Returns:

bool:

True if successful, False otherwise
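
Examples

A minimal sketch, assuming a short-lived application started via start(); the True result assumes the app finishes within the timeout:

>>> k = Knit()
>>> app_id = k.start('sleep 5')
>>> k.wait_for_completion(timeout=30)
True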