Source code for knit.core

from __future__ import absolute_import, division, print_function

import atexit
import os
import logging
import socket
import select
import signal
import platform
import requests
from subprocess import Popen, PIPE, call
import struct
import time
import weakref

from .conf import get_config, DEFAULT_KNIT_HOME
from .env import CondaCreator
from .exceptions import KnitException, YARNException
from .yarn_api import YARNAPI
from .utils import triple_slash

from py4j.protocol import Py4JError
from py4j.java_gateway import JavaGateway, GatewayClient
from py4j.java_collections import MapConverter, ListConverter

logger = logging.getLogger(__name__)
on_windows = platform.system() == "Windows"


def read_int(stream):
    """Read a 4-byte big-endian (network byte order) integer from a stream."""
    length = stream.read(4)
    if not length:
        raise EOFError
    return struct.unpack("!i", length)[0]
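
# Illustrative sketch (not part of the library): the Java gateway reports its
# ephemeral port back over the callback socket as a 4-byte big-endian integer,
# which read_int() unpacks. A round-trip example, using an arbitrary port:
#
#     import io
#     buf = io.BytesIO(struct.pack("!i", 25333))
#     assert read_int(buf) == 25333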


class Knit(object):
    """
    Connection to HDFS/YARN. Launches a single "application" master with a
    number of worker containers.

    Parameter definition (nn, nn_port, rm, rm_port): parameters given to
    __init__ take priority. If autodetect=True, Knit will attempt to fill
    out the others from system configuration files; fallback values are
    provided if this fails.

    Parameters
    ----------
    nn: str
        Namenode hostname/ip
    nn_port: int
        Namenode port (default: 9000)
    rm: str
        Resource Manager hostname
    rm_port: int
        Resource Manager port (default: 8088)
    lang: str
        Environment variable language setting, required for ``click`` to
        successfully read from the shell. (default: 'C.UTF-8')
    user: str ('root')
        The user name from the point of view of HDFS. This is only used when
        checking for the existence of knit files on HDFS, since they are
        stored in the user's home directory.
    hdfs_home: str
        Explicit location of a writable directory in HDFS to store files.
        Defaults to the user's home: hdfs://user/<username>/
    replication_factor: int (3)
        Replication factor for files uploaded to HDFS (default: 3)
    autodetect: bool
        Autodetect configuration
    upload_always: bool (=False)
        If True, will always upload the conda environment zip; otherwise
        will attempt to check for the file's existence in HDFS (using the
        hdfs3 library, if present) and not upload if the remote file matches
        the local one in size and is newer.
    knit_home: str
        Location of knit's jar
    hdfs: HDFileSystem instance or None
        Used for checking files in HDFS.

    Note: for now, only one Knit instance can live in a single process,
    because of how py4j interfaces with the JVM.

    Examples
    --------

    >>> k = Knit()
    >>> app_id = k.start('sleep 100', num_containers=5, memory=1024)
    """
    JAR_FILE = "knit-1.0-SNAPSHOT.jar"
    JAVA_APP = "io.continuum.knit.Client"
    _instances = weakref.WeakSet()

    def __init__(self, autodetect=True, upload_always=False, hdfs_home=None,
                 knit_home=DEFAULT_KNIT_HOME, hdfs=None, pars=None, **kwargs):
        self.conf = get_config(autodetect=autodetect, pars=pars, **kwargs)
        gateway_path = self.conf.get('gateway_path', '')
        kerb = self.conf.get(
            'hadoop.http.authentication.type', '') == 'kerberos'
        if not kerb and self.conf.get('hadoop.http.authentication.simple.'
                                      'anonymous.allowed', '') == 'false':
            if 'password' not in self.conf:
                raise KnitException('Simple auth required: please supply '
                                    '`password=`.')
            pw = self.conf['password']
        else:
            pw = None
        if self.conf.get('yarn.http.policy', '').upper() == "HTTPS_ONLY":
            self.yarn_api = YARNAPI(self.conf['rm'],
                                    self.conf['rm_port_https'],
                                    scheme='https',
                                    gateway_path=gateway_path,
                                    kerberos=kerb,
                                    username=self.conf['user'],
                                    password=pw)
        else:
            self.yarn_api = YARNAPI(self.conf['rm'],
                                    self.conf['rm_port'],
                                    gateway_path=gateway_path,
                                    kerberos=kerb,
                                    username=self.conf['user'],
                                    password=pw)

        self.KNIT_HOME = knit_home
        self.upload_always = upload_always
        self.lang = self.conf.get('lang', 'C.UTF-8')
        self.hdfs_home = hdfs_home or self.conf.get(
            'dfs.user.home.base.dir', '/user/' + self.conf['user'])
        self.client_gateway = None

        # must set KNIT_HOME ENV for YARN App
        os.environ['KNIT_HOME'] = self.KNIT_HOME
        os.environ['REPLICATION_FACTOR'] = str(
            self.conf['replication_factor'])
        os.environ['HDFS_KNIT_DIR'] = self.hdfs_home

        self.client = None
        self.master = None
        self.app_id = None
        self.proc = None
        self.hdfs = hdfs
        self._instances.add(self)

    def __repr__(self):
        return "Knit<RM={0}:{1}>".format(self.conf['rm'],
                                         self.conf['rm_port'])

    @property
    def JAR_FILE_PATH(self):
        return os.path.join(self.KNIT_HOME, self.JAR_FILE)

    def _pre_flight_checks(self, num_containers, virtual_cores, memory,
                           files, queue):
        """Some checks to see if the app is possible to schedule

        This depends on YARN's allocation reporting, which does not
        necessarily reflect the true amount of resources on the cluster.
        Other failure modes, such as a full disc, are not likely to be
        caught here.
        """
        try:
            # check response from RM
            met = self.yarn_api.cluster_metrics()
        except YARNException:
            raise
        except requests.RequestException as e:
            if isinstance(e, requests.Timeout):
                m = 'Connection timeout'
            else:
                m = 'Connection error'
            raise YARNException(m + ' when talking to the '
                                'YARN REST server at {}. This can mean that '
                                'the server/port values are wrong, that you '
                                'are using the wrong protocol (http/https) '
                                'or that you need to route through a proxy.'
                                ''.format(self.yarn_api.url))
        if met['activeNodes'] < 1:
            raise KnitException('No node managers active')
        # What if we simply don't have the full yarn-site.xml available?
        mmin = int(self.conf.get('yarn.scheduler.minimum-allocation-mb',
                                 1024))
        # 300MB default allocation for AM in client.scala
        mem = (max(300, mmin) + num_containers * max(memory, mmin))
        if met['availableMB'] < mem:
            raise KnitException('Memory estimate for app (%iMB) exceeds '
                                'cluster capacity (%iMB)'
                                % (mem, met['availableMB']))
        c = 1 + num_containers * virtual_cores
        if met['availableVirtualCores'] < c:
            raise KnitException('vCPU request for app (%i) exceeds cluster '
                                'capacity (%i)'
                                % (c, met['availableVirtualCores']))
        nodes = self.yarn_api.nodes()
        if all((max(mmin, memory) > n['availMemoryMB']) and
               (virtual_cores > n['availableVirtualCores'])
               for n in nodes):
            # cannot test without multiple nodemanagers
            raise KnitException('No NodeManager can fit any single '
                                'container')
        if self.hdfs:
            df = self.hdfs.df()
            cap = (df['capacity'] - df['used']) // 2**20
            fs = [self.JAR_FILE_PATH] + [f for f in files
                                         if not f.startswith('hdfs://')]
            need = sum(os.stat(f).st_size for f in fs) // 2**20
            # NB: if replication > 1 this might not be enough
            if cap < need:
                raise KnitException('HDFS space requirement (%iMB) exceeds '
                                    'capacity (%iMB)' % (need, cap))
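
    # Illustrative sketch (hostnames and ports are assumptions, not
    # defaults): constructing a client against explicit addresses instead of
    # autodetected Hadoop configuration:
    #
    #     k = Knit(autodetect=False, user='hdfs',
    #              nn='namenode.example.com', nn_port=9000,
    #              rm='resourcemanager.example.com', rm_port=8088)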

    def start(self, cmd, num_containers=1, virtual_cores=1, memory=128,
              files=None, envvars=None, app_name="knit", queue="default",
              checks=True):
        """
        Method to start a YARN app with a distributed shell

        Parameters
        ----------
        cmd: str
            Command to run in each YARN container
        num_containers: int
            Number of containers YARN should request (default: 1)
            A container should be requested with the number of cores it can
            saturate, i.e., the average number of threads it expects to have
            runnable at a time.
        virtual_cores: int
            Number of virtual cores per container (default: 1)
            A node's capacity should be configured with virtual cores equal
            to its number of physical cores.
        memory: int
            Memory per container, in megabytes (default: 128)
        files: list
            List of files to be included in each container. If a path starts
            with `hdfs://`, assume it already exists in HDFS and doesn't
            need uploading. Otherwise, if hdfs3 is installed, the existence
            of the file on HDFS will be checked to see if an upload is
            needed. Files ending with `.zip` will be decompressed in the
            container before launch as a directory with the same name as the
            file: if myarc.zip contains files inside a directory stuff/, in
            the container they will appear at ./myarc.zip/stuff/* .
        envvars: dict
            Environment variables to pass to AM *and* workers. Both keys and
            values must be strings only.
        app_name: str
            Application name shown in YARN (default: "knit")
        queue: str
            RM queue to use while scheduling (default: "default")
        checks: bool = True
            Whether to run pre-flight checks before submitting the app to
            YARN

        Returns
        -------
        applicationId: str
            A YARN application ID string
        """
        files = files or []
        envvars = envvars or {'KNIT_LANG': self.lang}
        for k, v in envvars.items():
            if not isinstance(k, str) or not isinstance(v, str):
                raise ValueError('Environment must contain only strings '
                                 '(%s)' % ((k, v),))
        if self.app_id:
            raise ValueError('Already started')
        if not isinstance(memory, int):
            raise KnitException("Memory argument must be an integer")
        if files:
            if not isinstance(files, list):
                raise KnitException("File argument must be a list of "
                                    "strings")

        if checks:
            self._pre_flight_checks(num_containers, virtual_cores, memory,
                                    files, queue)

        # From https://github.com/apache/spark/blob/d83c2f9f0b08d6d5d369d9fae04cdb15448e7f0d/python/pyspark/java_gateway.py
        # thank you spark

        # Socket for PythonGatewayServer to communicate its port to us
        callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        callback_socket.bind(('127.0.0.1', 0))
        callback_socket.listen(1)
        callback_host, callback_port = callback_socket.getsockname()

        if not os.path.exists(self.JAR_FILE_PATH):
            raise KnitException('JAR file %s does not exist - please build '
                                'with maven' % self.JAR_FILE_PATH)

        args = ["hadoop", "jar", self.JAR_FILE_PATH, self.JAVA_APP,
                "--callbackHost", str(callback_host),
                "--callbackPort", str(callback_port)]

        # Launch the Java gateway.
        # We open a pipe to stdin so that the Java gateway can die when the
        # pipe is broken
        if not on_windows:
            # Don't send ctrl-c / SIGINT to the Java gateway:
            def preexec_func():
                signal.signal(signal.SIGINT, signal.SIG_IGN)
            proc = Popen(args, stdin=PIPE, preexec_fn=preexec_func)
        else:
            # preexec_fn not supported on Windows
            proc = Popen(args, stdin=PIPE)
        self.proc = proc

        gateway_port = None
        # We use select() here in order to avoid blocking indefinitely if
        # the subprocess dies before connecting
        long_timeout = 60
        while (gateway_port is None and proc.poll() is None
               and long_timeout > 0):
            timeout = 1  # (seconds)
            readable, _, _ = select.select([callback_socket], [], [],
                                           timeout)
            if callback_socket in readable:
                gateway_connection = callback_socket.accept()[0]
                # Determine which ephemeral port the server started on:
                gateway_port = read_int(
                    gateway_connection.makefile(mode="rb"))
                gateway_connection.close()
                callback_socket.close()
            long_timeout -= 1
        if gateway_port is None:
            raise Exception("The JVM Knit client failed to launch "
                            "successfully. Check that java is installed and "
                            "the Knit JAR file exists.")

        gateway = JavaGateway(GatewayClient(port=gateway_port),
                              auto_convert=True)
        self.client = gateway.entry_point
        self.client_gateway = gateway
        logger.debug("Files submitted: %s" % files)
        upfiles = [f for f in files
                   if (not f.startswith('hdfs://')
                       and self.check_needs_upload(f))]
        logger.debug("Files to upload: %s" % upfiles)
        jfiles = ListConverter().convert(upfiles, gateway._gateway_client)
        jenv = MapConverter().convert(envvars, gateway._gateway_client)
        self.app_id = self.client.start(jfiles, jenv, app_name, queue)

        # Wait for AM to appear
        long_timeout = 100
        master_rpcport = -1
        while master_rpcport == -1:
            master_rpcport = self.client.masterRPCPort()
            time.sleep(0.2)
            long_timeout -= 0.2
            if long_timeout < 0:
                break
        if master_rpcport in [-1, 'N/A']:
            raise Exception(
                """The application master JVM process failed to report back.
                This can mean:
 - that the YARN cluster cannot schedule adequate resources - check
   k.yarn_api.cluster_metrics() and other diagnostic methods;
 - that the ApplicationMaster crashed - check the application logs,
   k.logs();
 - that the cluster is otherwise unhealthy - check the RM and NN logs
   (use k.yarn_api.system_logs() to find these on a one-node system)
""")
        master_rpchost = self.client.masterRPCHost()

        gateway = JavaGateway(GatewayClient(
            address=master_rpchost, port=master_rpcport), auto_convert=True)
        self.master = gateway.entry_point
        rfiles = [triple_slash(f) if f.startswith('hdfs://')
                  else '/'.join(['hdfs://', self.hdfs_home, '.knitDeps',
                                 os.path.basename(f)])
                  for f in files]
        logger.debug("Resource files: %s" % rfiles)
        jfiles = ListConverter().convert(rfiles, gateway._gateway_client)
        jenv = MapConverter().convert(envvars, gateway._gateway_client)
        self.master.init(jfiles, jenv, cmd, num_containers, virtual_cores,
                         memory)

        return self.app_id
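
    # Illustrative sketch: launching workers inside a shipped conda
    # environment. ``env.zip`` is a hypothetical archive produced by
    # ``create_env`` below, and ``my_worker``/``MY_SETTING`` are
    # placeholders. Per the docstring above, the zip unpacks in each
    # container under a directory of the same name:
    #
    #     app_id = k.start('env.zip/bin/python -m my_worker',
    #                      num_containers=5, virtual_cores=2, memory=2048,
    #                      files=['env.zip'],
    #                      envvars={'MY_SETTING': '1'})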

    def add_containers(self, num_containers=1, virtual_cores=1, memory=128):
        """
        Method to add containers to an already running YARN app

        Parameters
        ----------
        num_containers: int
            Number of containers YARN should request (default: 1)
            A container should be requested with the number of cores it can
            saturate, i.e., the average number of threads it expects to have
            runnable at a time.
        virtual_cores: int
            Number of virtual cores per container (default: 1)
            A node's capacity should be configured with virtual cores equal
            to its number of physical cores.
        memory: int
            Memory per container, in megabytes (default: 128)
        """
        self.master.addContainers(num_containers, virtual_cores, memory)
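
    # Illustrative sketch: scale a running app up by two more containers,
    # each with one virtual core and 1GB of memory:
    #
    #     k.add_containers(num_containers=2, virtual_cores=1, memory=1024)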

    def get_containers(self):
        """
        Method to return active containers

        Returns
        -------
        container_list: List
            List of dicts with each container's details
        """
        if self.app_id:
            return self.yarn_api.app_containers(self.app_id)
        else:
            raise KnitException('Cannot get containers, app has not started')

    def get_container_statuses(self):
        """Get status info for each container

        Returns a dict mapping container ID to its state string.
        """
        return {c['id']: c['state'] for c in self.get_containers()}
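
    # The returned mapping pairs a YARN container ID with its state, e.g.
    # (values illustrative only):
    #
    #     {'container_1500000000000_0001_01_000002': 'RUNNING'}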

    def remove_containers(self, container_id):
        """
        Method to remove containers from a running YARN app

        Calls removeContainers in ApplicationMaster.scala

        Be careful removing the ...0001 container: this is where the
        ApplicationMaster is running.

        Parameters
        ----------
        container_id: str

        Returns
        -------
        None
        """
        if container_id not in self.get_container_statuses():
            raise KnitException('Attempt to remove container not owned by '
                                'this app: ' + container_id)
        self.master.removeContainer(str(container_id))

    @staticmethod
    def create_env(env_name, packages=None, remove=False, channels=None,
                   conda_pars=None):
        """
        Create a zipped directory of a conda environment

        Parameters
        ----------
        env_name : str
        packages : list
        remove : bool
            Remove possible conda environment before creating
        channels : list of str
            Conda channels to use (defaults to your conda setup)
        conda_pars : dict
            Further parameters to pass to CondaCreator

        Returns
        -------
        path: str
            Path to the zipped conda environment

        Examples
        --------

        >>> k = Knit()
        >>> pkg_path = k.create_env(env_name='dev',
        ...                         packages=['distributed', 'dask',
        ...                                   'pandas'])
        """
        channels = channels or []
        c = CondaCreator(channels=channels, **(conda_pars or {}))
        return c.create_env(env_name, packages=packages, remove=remove)
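
    # Illustrative sketch: the returned path is typically handed straight to
    # ``start(files=[path])``, so the zipped environment ships with the app
    # and unpacks in-container as a directory named after the zip file (here
    # ``dev.zip``, per the ``start`` docstring); ``my_script`` is a
    # placeholder:
    #
    #     path = Knit.create_env(env_name='dev', packages=['dask'])
    #     app_id = k.start('dev.zip/bin/python -m my_script',
    #                      files=[path], num_containers=2)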

    def logs(self, shell=False):
        """
        Collect logs from RM (if running)

        With shell=True, collect logs from HDFS after job completion

        Parameters
        ----------
        shell: bool
            Shell out to yarn CLI (default False)

        Returns
        -------
        log: dictionary
            Logs from each container (when possible)
        """
        if self.app_id:
            return self.yarn_api.logs(self.app_id, shell=shell)
        else:
            raise KnitException('Cannot get logs, app not started')

    def print_logs(self, shell=False):
        """Print out a more console-friendly version of logs()"""
        for l, v in self.logs(shell).items():
            print('\n### Container ', l, ', id ', v.get('id', 'None'),
                  ' ###\n')
            for part in ['stdout', 'stderr']:
                print('##', part, '##')
                print(v[part])

    def wait_for_completion(self, timeout=10):
        """
        Wait for completion of the YARN application

        Returns
        -------
        bool:
            True if successful, False otherwise
        """
        cur_status = self.runtime_status()
        while cur_status not in ['FAILED', 'KILLED', 'FINISHED']:
            time.sleep(0.2)
            timeout -= 0.2
            cur_status = self.runtime_status()
            if timeout < 0:
                break
        return timeout > 0
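
    # Illustrative sketch: block for up to a minute, then force-kill the app
    # if it is still running:
    #
    #     if not k.wait_for_completion(timeout=60):
    #         k.kill()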

    def kill(self):
        """
        Method to kill a YARN application

        Returns
        -------
        bool:
            True if successful, False otherwise.
        """
        if self.client is None:
            # never started, can't stop - should be warning or exception?
            return False
        try:
            self.client.kill()
        except Py4JError:
            logger.debug("Error while attempting to kill", exc_info=1)
            # fallback
            self.yarn_api.kill(self.app_id)
        if self.proc is not None:
            self.client_gateway.shutdown()
            if on_windows:
                call(["cmd", "/c", "taskkill", "/f", "/t", "/pid",
                      str(self.proc.pid)])
            self.proc.terminate()
            self.proc.communicate()
            self.proc = None
        self.client = None
        out = self.runtime_status() == 'KILLED'
        return out

    def __del__(self):
        if self.app_id is not None:
            try:
                self.kill()
            except:
                pass
        self.app_id = None

    def status(self):
        """Get status of an application

        Returns
        -------
        log: dictionary
            Status of the application
        """
        if self.app_id:
            return self.yarn_api.apps_info(self.app_id)
        else:
            raise KnitException("Cannot get status, app not started")

    def runtime_status(self):
        """Get runtime status of an application

        Returns
        -------
        str:
            Status of the application
        """
        try:
            return self.yarn_api.state(self.app_id)
        except:
            return "NONE"

    def list_envs(self):
        """List knit conda environments already in HDFS

        Looks in the staging directory for zip-files

        Returns: list of dict
            Details for each zip-file.
        """
        if self.hdfs:
            files = self.hdfs.ls(self.hdfs_home + '/.knitDeps/', True)
            return [f for f in files if f['name'].endswith('.zip')]
        else:
            raise ImportError('Set the `hdfs` attribute to be able to list '
                              'environments.')

    def check_needs_upload(self, path):
        """Upload is needed if file does not exist in HDFS or is older"""
        if self.upload_always:
            return True
        fn = '/'.join([self.hdfs_home, '.knitDeps', os.path.basename(path)])
        if self.hdfs and self.hdfs.exists(fn):
            st = os.stat(path)
            size = st.st_size
            t = st.st_mtime
            info = self.hdfs.info(fn)
            if info['size'] == size and t < info['last_mod']:
                return False
            else:
                return True
        else:
            return True
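
    # In other words (a sketch of the rule above): with upload_always=False,
    # a local file is re-uploaded only when it is missing from
    # <hdfs_home>/.knitDeps, differs in size, or is newer than the HDFS copy.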

    @classmethod
    def _cleanup(cls):
        # called on program exit to destroy lingering connections/apps
        for instance in cls._instances:
            instance.kill()

atexit.register(Knit._cleanup)